Add a sample for running an orchestration with DTFx. (#180)

* Add a sample for running an orchestration with DTFx.
* update validation of settings to be more comprehensive and useful in a DTFx context
This commit is contained in:
Sebastian Burckhardt 2022-07-26 09:46:05 -07:00 коммит произвёл GitHub
Родитель 113fc4addc
Коммит fd878e96ab
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 185 добавлений и 6 удалений

Просмотреть файл

@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29806.167
# Visual Studio Version 17
VisualStudioVersion = 17.2.32630.192
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DurableTask.Netherite", "src\DurableTask.Netherite\DurableTask.Netherite.csproj", "{FC7FB0AF-2322-4356-AF64-A8E2EB7D1EF8}"
EndProject
@ -34,6 +34,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScaleMonitor", "test\ScaleM
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LoadGeneratorApp", "test\LoadGeneratorApp\LoadGeneratorApp.csproj", "{78B360B8-3A41-4DC0-A300-94A0FABC1FB0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloDTFx", "samples\HelloDTFx\HelloDTFx\HelloDTFx.csproj", "{EC293D85-91E3-4F78-8B1E-2C691315CE96}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -72,6 +74,10 @@ Global
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0}.Release|Any CPU.Build.0 = Release|Any CPU
{EC293D85-91E3-4F78-8B1E-2C691315CE96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EC293D85-91E3-4F78-8B1E-2C691315CE96}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EC293D85-91E3-4F78-8B1E-2C691315CE96}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EC293D85-91E3-4F78-8B1E-2C691315CE96}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -85,6 +91,7 @@ Global
{654DA6B4-2E2F-4386-BB9F-7CE5A13998DE} = {AB958467-9236-402E-833C-B8DE4841AB9F}
{2F4D331C-62E4-47E8-852E-163166944DF8} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
{EC293D85-91E3-4F78-8B1E-2C691315CE96} = {AB958467-9236-402E-833C-B8DE4841AB9F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {238A9613-5411-41CF-BDEC-168CCD5C03FB}

Просмотреть файл

@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using DurableTask.Core;
public class HelloSequence : TaskOrchestration<List<string>, string>
{
public override async Task<List<string>> RunTask(OrchestrationContext context, string input)
{
var result = new List<string>
{
await context.ScheduleTask<string>(typeof(SayHello), "Tokyo"),
await context.ScheduleTask<string>(typeof(SayHello), "Seattle"),
await context.ScheduleTask<string>(typeof(SayHello), "London"),
};
return result;
}
}
public class SayHello : TaskActivity<string, string>
{
protected override string Execute(TaskContext context, string input)
{
return $"Hello, {input}!";
}
}

Просмотреть файл

@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\DurableTask.Netherite\DurableTask.Netherite.csproj" />
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using DurableTask.Core;
using DurableTask.Netherite;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
// ----------- construct the Netherite orchestration service
Console.WriteLine("Starting Netherite...");
var netheriteSettings = new NetheriteOrchestrationServiceSettings()
{
HubName = "myhub",
PartitionCount = 4,
// we explicitly specify the two required connection strings here.
// Another option would be to use a connection name resolver when calling Validate().
ResolvedStorageConnectionString = "UseDevelopmentStorage=true;",
ResolvedTransportConnectionString = "MemoryF",
};
netheriteSettings.Validate();
var loggerFactory = LoggerFactory.Create(builder =>
{
builder.AddSimpleConsole(options => options.SingleLine = true);
});
NetheriteOrchestrationService netherite = new NetheriteOrchestrationService(netheriteSettings, loggerFactory);
// ---------- create the task hub in storage, if it does not already exist
await ((IOrchestrationService) netherite).CreateIfNotExistsAsync();
// ---------- configure and start the DTFx worker
var worker = new TaskHubWorker(netherite, loggerFactory);
worker.AddTaskOrchestrations(typeof(HelloSequence));
worker.AddTaskActivities(typeof(SayHello));
await worker.StartAsync();
// ---------- configure the taskhub client
var client = new TaskHubClient(netherite);
// --------- start the orchestration, then wait for it to complete
Console.WriteLine("Starting the orchestration...");
OrchestrationInstance instance = await client.CreateOrchestrationInstanceAsync(typeof(HelloSequence), null);
Console.WriteLine("Waiting for completion...");
OrchestrationState taskResult = await client.WaitForOrchestrationAsync(instance, TimeSpan.FromSeconds(30), CancellationToken.None);
Console.WriteLine($"Result:\n{JsonConvert.SerializeObject(taskResult, Formatting.Indented)}\n");
// --------- shut down the service
Console.WriteLine("Press any key to shut down...");
Console.ReadKey();
Console.WriteLine($"Shutting down...");
await worker.StopAsync();
Console.WriteLine("Done.");

Просмотреть файл

@ -244,11 +244,21 @@ namespace DurableTask.Netherite
/// <summary>
/// Validates the settings, throwing exceptions if there are issues.
/// </summary>
/// <param name="nameResolver">The resolver for environment variables.</param>
public void Validate(Func<string,string> nameResolver)
/// <param name="nameResolver">Optionally, a resolver for connection names.</param>
public void Validate(Func<string,string> nameResolver = null)
{
if (string.IsNullOrEmpty(this.HubName))
{
throw new InvalidOperationException($"Must specify {nameof(this.HubName)} for Netherite storage provider.");
}
if (string.IsNullOrEmpty(this.ResolvedStorageConnectionString))
{
if (nameResolver == null)
{
throw new InvalidOperationException($"Must either specify {nameof(this.ResolvedStorageConnectionString)}, or specify {nameof(this.StorageConnectionName )} and provide a nameResolver, to construct Netherite storage provider.");
}
if (string.IsNullOrEmpty(this.StorageConnectionName))
{
throw new InvalidOperationException($"Must specify {nameof(this.StorageConnectionName)} for Netherite storage provider.");
@ -265,6 +275,11 @@ namespace DurableTask.Netherite
if (string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString)
&& !string.IsNullOrEmpty(this.PageBlobStorageConnectionName))
{
if (nameResolver == null)
{
throw new InvalidOperationException($"Must either specify {nameof(this.ResolvedPageBlobStorageConnectionString)}, or specify {nameof(this.PageBlobStorageConnectionName)} and provide a nameResolver, to construct Netherite storage provider.");
}
this.ResolvedPageBlobStorageConnectionString = nameResolver(this.PageBlobStorageConnectionName);
if (string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString))
@ -286,6 +301,11 @@ namespace DurableTask.Netherite
}
else
{
if (nameResolver == null)
{
throw new InvalidOperationException($"Must either specify {nameof(this.ResolvedTransportConnectionString)}, or specify {nameof(this.EventHubsConnectionName)} and provide a nameResolver, to construct Netherite storage provider.");
}
this.ResolvedTransportConnectionString = nameResolver(this.EventHubsConnectionName);
if (string.IsNullOrEmpty(this.ResolvedTransportConnectionString))
@ -302,10 +322,36 @@ namespace DurableTask.Netherite
throw new ArgumentOutOfRangeException(nameof(this.PartitionCount));
}
if (storage != TransportConnectionString.StorageChoices.Memory)
{
// make sure the connection string can be parsed correctly
try
{
Microsoft.Azure.Storage.CloudStorageAccount.Parse(this.ResolvedStorageConnectionString);
}
catch (Exception e)
{
throw new FormatException($"Could not parse the specified storage connection string for Netherite storage provider", e);
}
if (!string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString))
{
// make sure the connection string can be parsed correctly
try
{
Microsoft.Azure.Storage.CloudStorageAccount.Parse(this.ResolvedPageBlobStorageConnectionString);
}
catch (Exception e)
{
throw new FormatException($"Could not parse the specified page blob storage connection string for Netherite storage provider", e);
}
}
}
if (transport == TransportConnectionString.TransportChoices.EventHubs)
{
// validates the connection string
TransportConnectionString.EventHubsNamespaceName(this.ResolvedTransportConnectionString);
TransportConnectionString.EventHubsNamespaceName(this.ResolvedTransportConnectionString);
}
if (this.MaxConcurrentOrchestratorFunctions <= 0)

Просмотреть файл

@ -88,7 +88,7 @@ namespace DurableTask.Netherite
}
catch(Exception e)
{
throw new FormatException("Could not parse the specified Eventhubs namespace connection string.", e);
throw new FormatException("Could not parse the specified Eventhubs namespace connection string for the Netherite storage provider.", e);
}
}
}