Add support for authentication using an Azure.Core.TokenCredential (#197)

* support authentication using token credentials.

* address PR feedback

* use overload instead of optional argument to be compatible without requiring recompilation

* address PR feedback

* address PR feedback
This commit is contained in:
Sebastian Burckhardt 2022-11-09 07:44:29 -08:00 коммит произвёл GitHub
Родитель cf1e345881
Коммит ed721c67f0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
52 изменённых файлов: 1912 добавлений и 479 удалений

Просмотреть файл

@ -36,6 +36,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LoadGeneratorApp", "test\Lo
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloDTFx", "samples\HelloDTFx\HelloDTFx\HelloDTFx.csproj", "{EC293D85-91E3-4F78-8B1E-2C691315CE96}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDF", "samples\TokenCredentialDF\TokenCredentialDF.csproj", "{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDTFx", "samples\TokenCredentialDTFx\TokenCredentialDTFx.csproj", "{FBFF0814-E6C0-489A-ACCF-9D0699219621}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -78,6 +82,14 @@ Global
{EC293D85-91E3-4F78-8B1E-2C691315CE96}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EC293D85-91E3-4F78-8B1E-2C691315CE96}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EC293D85-91E3-4F78-8B1E-2C691315CE96}.Release|Any CPU.Build.0 = Release|Any CPU
{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}.Release|Any CPU.Build.0 = Release|Any CPU
{FBFF0814-E6C0-489A-ACCF-9D0699219621}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FBFF0814-E6C0-489A-ACCF-9D0699219621}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FBFF0814-E6C0-489A-ACCF-9D0699219621}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FBFF0814-E6C0-489A-ACCF-9D0699219621}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -92,6 +104,8 @@ Global
{2F4D331C-62E4-47E8-852E-163166944DF8} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
{EC293D85-91E3-4F78-8B1E-2C691315CE96} = {AB958467-9236-402E-833C-B8DE4841AB9F}
{B99AB043-47DC-467C-93CB-D6C69D3B1AD4} = {AB958467-9236-402E-833C-B8DE4841AB9F}
{FBFF0814-E6C0-489A-ACCF-9D0699219621} = {AB958467-9236-402E-833C-B8DE4841AB9F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {238A9613-5411-41CF-BDEC-168CCD5C03FB}

Просмотреть файл

@ -0,0 +1,44 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// Reference: https://docs.microsoft.com/en-us/azure/azure-functions/functions-dotnet-dependency-injection
[assembly: Microsoft.Azure.Functions.Extensions.DependencyInjection.FunctionsStartup(typeof(TokenCredentialDF.Startup))]
namespace TokenCredentialDF
{
using System;
using Azure.Identity;
using DurableTask.Netherite;
using DurableTask.Netherite.AzureFunctions;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
public class Startup : FunctionsStartup
{
public class MyConnectionResolver : CredentialBasedConnectionNameResolver
{
readonly INameResolver nameResolver;
public MyConnectionResolver(INameResolver nameResolver) : base(new DefaultAzureCredential())
{
this.nameResolver = nameResolver;
}
string Resolve(string name) =>
this.nameResolver.Resolve(name)
?? Environment.GetEnvironmentVariable(name)
?? throw new InvalidOperationException($"missing configuration setting '{name}'");
public override string GetStorageAccountName(string connectionName) => this.Resolve($"{connectionName}__accountName");
public override string GetEventHubsNamespaceName(string connectionName) => this.Resolve($"{connectionName}__eventHubsNamespaceName");
}
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.AddSingleton<DurableTask.Netherite.ConnectionResolver, MyConnectionResolver>();
}
}
}

Просмотреть файл

@ -0,0 +1,57 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace TokenCredentialDF
{
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Collections.Generic;
using System.Net;
/// <summary>
/// A simple Http Trigger that is useful for testing whether the service has started correctly and can execute orchestrations.
/// </summary>
public static class Hello
{
[FunctionName(nameof(Hello))]
public async static Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "hello")] HttpRequest req,
[DurableClient] IDurableClient client)
{
try
{
string orchestrationInstanceId = await client.StartNewAsync(nameof(HelloSequence));
TimeSpan timeout = TimeSpan.FromSeconds(30);
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, timeout);
}
catch (Exception e)
{
return new ObjectResult($"exception: {e}") { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
[FunctionName(nameof(HelloSequence))]
public static async Task<List<string>> HelloSequence([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var result = new List<string>
{
await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"),
await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"),
await context.CallActivityAsync<string>(nameof(SayHello), "London")
};
return result;
}
[FunctionName(nameof(SayHello))]
public static Task<string> SayHello([ActivityTrigger] IDurableActivityContext context)
{
return Task.FromResult(context.GetInput<string>());
}
}
}

Просмотреть файл

@ -0,0 +1,42 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace TokenCredentialDF
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Net;
/// <summary>
/// A simple Http Trigger that is useful for testing whether the functions host has started correctly.
/// </summary>
public static class Ping
{
[FunctionName(nameof(Ping))]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
[DurableClient(ConnectionName = "MyConnection")] IDurableClient client,
ILogger log)
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
log.LogInformation("C# HTTP trigger function processed a request.");
var is64bit = Environment.Is64BitProcess;
try
{
return new OkObjectResult($"Hello from {client} ({(is64bit ? "x64" : "x32")})\n");
}
catch (Exception e)
{
return new ObjectResult($"exception: {e}") { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
}
}

Просмотреть файл

@ -0,0 +1,24 @@
# Token Credential Sample
This sample demonstrates how to configure a custom connection resolver when using Netherite in an Azure Functions application.
## Why you may want to use a custom resolver
A custom connection resolver is appropriate if you want more control over how connection names are resolved to connection information. For example:
- It allows you to construct your own Token Credential.
- It allows you to use your own configuration logic for resolving connection names, as opposed to following the default mechanism defined by Azure Functions.
For this sample, we use the default Azure Identity credential as provided by the Azure.Identity package.
## Configuration Prerequisites
Before running this sample, you must
1. Create a new Azure Storage account, or reuse an existing one
2. Create a new Azure Event Hubs namespace, or reuse an existing one (that is not currently in use)
3. Make sure you have [Storage Blob Data Contributor](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles#storage-blob-data-contributor) and [Storage Table Data Contributor](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles#storage-table-data-contributor) permissions for the storage account.
4. Make sure you have [Azure Event Hubs Data Owner](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles#azure-event-hubs-data-owner) permission for the event hubs namespace.
5. Set `MyConnection__accountName` to contain the name of the storage account, using an environment variable or a function app configuration setting,
6. Set `MyConnection__eventHubsNamespaceName` to contain the name of the Event Hubs namespace, using an environment variable or a function app configuration setting.

Просмотреть файл

@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.1.3" />
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DurableTask.Netherite.AzureFunctions\DurableTask.Netherite.AzureFunctions.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,29 @@
{
"version": "2.0",
"logging": {
"logLevel": {
"DurableTask.Netherite": "Information",
"DurableTask.Netherite.FasterStorage": "Information",
"DurableTask.Netherite.EventHubsTransport": "Information"
},
"applicationInsights": {
"sampling": {
"isEnabled": false
}
}
},
"extensions": {
"http": {
"routePrefix": ""
},
"durableTask": {
"hubName": "tokencredentialsample",
"UseGracefulShutdown": true,
"storageProvider": {
"type": "Netherite",
"StorageConnectionName": "MyConnection",
"EventHubsConnectionName": "MyConnection"
}
}
}
}

Просмотреть файл

@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using DurableTask.Core;
public class HelloSequence : TaskOrchestration<List<string>, string>
{
public override async Task<List<string>> RunTask(OrchestrationContext context, string input)
{
var result = new List<string>
{
await context.ScheduleTask<string>(typeof(SayHello), "Tokyo"),
await context.ScheduleTask<string>(typeof(SayHello), "Seattle"),
await context.ScheduleTask<string>(typeof(SayHello), "London"),
};
return result;
}
}
public class SayHello : TaskActivity<string, string>
{
protected override string Execute(TaskContext context, string input)
{
return $"Hello, {input}!";
}
}

Просмотреть файл

@ -0,0 +1,76 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using Azure.Identity;
using DurableTask.Core;
using DurableTask.Netherite;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
// ----------- construct the Netherite orchestration service
Console.WriteLine("Starting Netherite...");
var netheriteSettings = new NetheriteOrchestrationServiceSettings()
{
HubName = "myhub",
PartitionCount = 4,
};
netheriteSettings.Validate(ConnectionResolver.FromTokenCredential(
new DefaultAzureCredential(),
Environment.GetEnvironmentVariable("AccountName") ?? throw new Exception("missing env var: AccountName"),
Environment.GetEnvironmentVariable("NamespaceName") ?? throw new Exception("missing env var: NamespaceName")
));
var loggerFactory = LoggerFactory.Create(builder =>
{
builder.AddSimpleConsole(options => options.SingleLine = true);
});
NetheriteOrchestrationService netherite = new NetheriteOrchestrationService(netheriteSettings, loggerFactory);
// ---------- create the task hub in storage, if it does not already exist
await ((IOrchestrationService) netherite).CreateIfNotExistsAsync();
// ---------- configure and start the DTFx worker
var worker = new TaskHubWorker(netherite, loggerFactory);
worker.AddTaskOrchestrations(typeof(HelloSequence));
worker.AddTaskActivities(typeof(SayHello));
await worker.StartAsync();
// ---------- configure the taskhub client
var client = new TaskHubClient(netherite);
// --------- start the orchestration, then wait for it to complete
Console.WriteLine("Starting the orchestration...");
OrchestrationInstance instance = await client.CreateOrchestrationInstanceAsync(typeof(HelloSequence), null);
Console.WriteLine("Waiting for completion...");
OrchestrationState taskResult = await client.WaitForOrchestrationAsync(instance, TimeSpan.FromSeconds(30), CancellationToken.None);
Console.WriteLine($"Result:\n{JsonConvert.SerializeObject(taskResult, Formatting.Indented)}\n");
// --------- shut down the service
Console.WriteLine("Press any key to shut down...");
Console.ReadKey();
Console.WriteLine($"Shutting down...");
await worker.StopAsync();
Console.WriteLine("Done.");

Просмотреть файл

@ -0,0 +1,19 @@
# Token Credential Sample
This sample demonstrates how to configure a connection resolver when using Netherite as the orchestration provider for a Durable Task project.
The connection resolver allows you to supply a Azure Token Credential instead of a connection string, which means you can use identity-based authentication.
For this sample, we use the default Azure Identity credential as provided by the Azure.Identity package.
## Configuration Prerequisites
Before running this sample, you must
1. Create a new Azure Storage account, or reuse an existing one
2. Create a new Azure Event Hubs namespace, or reuse an existing one (that is not currently in use)
3. Make sure you have [Storage Blob Data Contributor](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles#storage-blob-data-contributor) and [Storage Table Data Contributor](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles#storage-table-data-contributor) permissions for the storage account.
4. Make sure you have [Azure Event Hubs Data Owner](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles#azure-event-hubs-data-owner) permission for the event hubs namespace.
5. Set `AccountName` to contain the name of the storage account, using an environment variable or a function app configuration setting,
6. Set `NamespaceName` to contain the name of the Event Hubs namespace, using an environment variable or a function app configuration setting.

Просмотреть файл

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.7.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DurableTask.Netherite\DurableTask.Netherite.csproj" />
</ItemGroup>
</Project>

Просмотреть файл

@ -8,6 +8,7 @@ namespace DurableTask.Netherite.AzureFunctions
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Extensions.Azure;
@ -19,7 +20,7 @@ namespace DurableTask.Netherite.AzureFunctions
class BlobLogger
{
readonly DateTime starttime;
readonly CloudAppendBlob blob;
readonly Task<CloudAppendBlob> blob;
readonly object flushLock = new object();
readonly object lineLock = new object();
readonly ConcurrentQueue<MemoryStream> writebackQueue;
@ -30,16 +31,21 @@ namespace DurableTask.Netherite.AzureFunctions
readonly Timer timer;
#pragma warning restore IDE0052
public BlobLogger(string storageConnectionString, string hubName, string workerId)
public BlobLogger(ConnectionInfo storageConnection, string hubName, string workerId)
{
this.starttime = DateTime.UtcNow;
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageConnectionString);
CloudBlobClient client = storageAccount.CreateCloudBlobClient();
CloudBlobContainer container = client.GetContainerReference("logs");
container.CreateIfNotExists();
this.blob = container.GetAppendBlobReference($"{hubName}.{workerId}.{this.starttime:o}.log");
this.blob.CreateOrReplace();
this.blob = GetBlobAsync();
async Task<CloudAppendBlob> GetBlobAsync()
{
CloudStorageAccount storageAccount = await storageConnection.GetAzureStorageV11AccountAsync();
CloudBlobClient client = storageAccount.CreateCloudBlobClient();
CloudBlobContainer container = client.GetContainerReference("logs");
container.CreateIfNotExists();
var blob = container.GetAppendBlobReference($"{hubName}.{workerId}.{this.starttime:o}.log");
await blob.CreateOrReplaceAsync();
return blob;
}
this.memoryStream = new MemoryStream();
this.writer = new StreamWriter(this.memoryStream);
@ -89,7 +95,7 @@ namespace DurableTask.Netherite.AzureFunctions
{
// save to storage
toSave.Seek(0, SeekOrigin.Begin);
this.blob.AppendFromStream(toSave);
this.blob.GetAwaiter().GetResult().AppendFromStream(toSave);
toSave.Dispose();
}
}

Просмотреть файл

@ -0,0 +1,80 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite.AzureFunctions
{
using System;
using System.Collections.Generic;
using System.Text;
using Azure.Identity;
using Microsoft.Extensions.Logging.Abstractions;
#if !NETCOREAPP2_2
/// <summary>
/// Resolves connections using a token credential and a mapping from connection names to resource names.
/// </summary>
public abstract class CredentialBasedConnectionNameResolver : DurableTask.Netherite.ConnectionResolver
{
readonly Azure.Core.TokenCredential tokenCredential;
/// <summary>
/// Create a connection resolver that uses an Azure token credential.
/// </summary>
/// <param name="tokenCredential">The token credential to use.</param>
public CredentialBasedConnectionNameResolver(Azure.Core.TokenCredential tokenCredential)
{
this.tokenCredential = tokenCredential;
}
public abstract string GetStorageAccountName(string connectionName);
public abstract string GetEventHubsNamespaceName(string connectionName);
/// <inheritdoc/>
public override ConnectionInfo ResolveConnectionInfo(string taskHub, string connectionName, ResourceType recourceType)
{
switch (recourceType)
{
case ResourceType.BlobStorage:
case ResourceType.TableStorage:
string storageAccountName = this.GetStorageAccountName(connectionName);
if (string.IsNullOrEmpty(storageAccountName))
{
throw new ArgumentException("GetStorageAccountName returned invalid result");
}
return ConnectionInfo.FromTokenCredential(this.tokenCredential, storageAccountName, recourceType);
case ResourceType.PageBlobStorage:
return null; // same as blob storage
case ResourceType.EventHubsNamespace:
string eventHubsNamespaceName = this.GetEventHubsNamespaceName(connectionName);
if (string.IsNullOrEmpty(eventHubsNamespaceName))
{
throw new ArgumentException("GetEventHubsNamespaceName returned invalid result");
}
return ConnectionInfo.FromTokenCredential(this.tokenCredential, eventHubsNamespaceName, recourceType);
default:
throw new NotSupportedException("unknown resource type");
}
}
/// <inheritdoc/>
public override void ResolveLayerConfiguration(string connectionName, out StorageChoices storageChoice, out TransportChoices transportChoice)
{
if (TransportConnectionString.IsPseudoConnectionString(connectionName))
{
TransportConnectionString.Parse(connectionName, out storageChoice, out transportChoice);
}
else
{
storageChoice = StorageChoices.Faster;
transportChoice = TransportChoices.EventHubs;
}
}
}
#endif
}

Просмотреть файл

@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite.AzureFunctions
{
using System;
using System.Collections.Generic;
using System.Text;
using Azure.Identity;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging.Abstractions;
#if !NETCOREAPP2_2
/// <summary>
/// Resolves connections using a token credential and a mapping from connection names to resource names.
/// </summary>
class NameResolverBasedConnectionNameResolver : DurableTask.Netherite.ConnectionNameToConnectionStringResolver
{
public NameResolverBasedConnectionNameResolver(INameResolver nameResolver)
: base((string name) => nameResolver.Resolve(name))
{
}
}
#endif
}

Просмотреть файл

@ -13,6 +13,7 @@ namespace DurableTask.Netherite.AzureFunctions
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
@ -28,6 +29,8 @@ namespace DurableTask.Netherite.AzureFunctions
readonly DurableTaskOptions options;
readonly INameResolver nameResolver;
readonly IHostIdProvider hostIdProvider;
readonly IServiceProvider serviceProvider;
readonly DurableTask.Netherite.ConnectionResolver connectionResolver;
readonly bool inConsumption;
@ -44,7 +47,10 @@ namespace DurableTask.Netherite.AzureFunctions
public const string ProviderName = "Netherite";
public string Name => ProviderName;
// Called by the Azure Functions runtime dependency injection infrastructure
/// <summary>
/// This constructor should not be called directly. It is here only to avoid breaking changes.
/// </summary>
[Obsolete]
public NetheriteProviderFactory(
IOptions<DurableTaskOptions> extensionOptions,
ILoggerFactory loggerFactory,
@ -55,13 +61,34 @@ namespace DurableTask.Netherite.AzureFunctions
INameResolver nameResolver,
#pragma warning disable CS0612 // Type or member is obsolete
IPlatformInformation platformInfo)
#pragma warning restore CS0612 // Type or member is obsolete
: this(extensionOptions, loggerFactory, hostIdProvider, nameResolver, serviceProvider:null, ConnectionResolver.FromConnectionNameToConnectionStringResolver((s) => nameResolver.Resolve(s)), platformInfo)
{
}
/// <summary>
/// This constructor should not be called directly. The DI logic calls this when it constructs all the
/// durability provider factories for use by <see cref="Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskExtension"/>.
/// </summary>
[ActivatorUtilitiesConstructor]
public NetheriteProviderFactory(
IOptions<DurableTaskOptions> extensionOptions,
ILoggerFactory loggerFactory,
IHostIdProvider hostIdProvider,
INameResolver nameResolver,
IServiceProvider serviceProvider,
DurableTask.Netherite.ConnectionResolver connectionResolver,
#pragma warning disable CS0612 // Type or member is obsolete
IPlatformInformation platformInfo)
#pragma warning restore CS0612 // Type or member is obsolete
{
this.options = extensionOptions?.Value ?? throw new ArgumentNullException(nameof(extensionOptions));
this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver));
this.serviceProvider = serviceProvider;
this.hostIdProvider = hostIdProvider;
this.connectionResolver = connectionResolver;
this.inConsumption = platformInfo.IsInConsumptionPlan();
bool ReadBooleanSetting(string name) => this.options.StorageProvider.TryGetValue(name, out object objValue)
@ -114,15 +141,29 @@ namespace DurableTask.Netherite.AzureFunctions
netheriteSettings.HubName = taskHubNameOverride;
}
// connections for Netherite are resolved either via an injected custom resolver, or otherwise by resolving connection names to connection strings
if (!string.IsNullOrEmpty(connectionName))
{
int pos = connectionName.IndexOf(',');
if (pos == -1 || pos == 0 || pos == connectionName.Length - 1 || pos != connectionName.LastIndexOf(','))
if (this.connectionResolver is NameResolverBasedConnectionNameResolver)
{
throw new ArgumentException("For Netherite, connection name must contain both StorageConnectionName and EventHubsConnectionName, separated by a comma", "connectionName");
// the application does not define a custom connection resolver.
// We split the connection name into two connection names, one for storage and one for event hubs
int pos = connectionName.IndexOf(',');
if (pos == -1 || pos == 0 || pos == connectionName.Length - 1 || pos != connectionName.LastIndexOf(','))
{
throw new ArgumentException("For Netherite, connection name must contain both StorageConnectionName and EventHubsConnectionName, separated by a comma", "connectionName");
}
netheriteSettings.StorageConnectionName = connectionName.Substring(0, pos).Trim();
netheriteSettings.EventHubsConnectionName = connectionName.Substring(pos + 1).Trim();
}
else
{
// the application resolves connection names using a custom resolver,
// which can create connections for different resources as needed
netheriteSettings.StorageConnectionName = connectionName;
netheriteSettings.EventHubsConnectionName = connectionName;
}
netheriteSettings.StorageConnectionName = connectionName.Substring(0, pos).Trim();
netheriteSettings.EventHubsConnectionName = connectionName.Substring(pos + 1).Trim();
}
string runtimeLanguage = this.nameResolver.Resolve("FUNCTIONS_WORKER_RUNTIME");
@ -131,7 +172,8 @@ namespace DurableTask.Netherite.AzureFunctions
netheriteSettings.CacheOrchestrationCursors = false; // cannot resume orchestrations in the middle
}
netheriteSettings.Validate((name) => this.nameResolver.Resolve(name));
// validate the settings and resolve the connections
netheriteSettings.Validate(this.connectionResolver);
int randomProbability = 0;
bool attachFaultInjector =
@ -186,10 +228,10 @@ namespace DurableTask.Netherite.AzureFunctions
{
if (this.TraceToBlob && BlobLogger == null)
{
BlobLogger = new BlobLogger(settings.ResolvedStorageConnectionString, settings.HubName, settings.WorkerId);
BlobLogger = new BlobLogger(settings.BlobStorageConnection, settings.HubName, settings.WorkerId);
}
var service = new NetheriteOrchestrationService(settings, this.loggerFactory);
var service = new NetheriteOrchestrationService(settings, this.loggerFactory, this.serviceProvider);
service.OnStopping += () => CachedProviders.TryRemove(key, out var _);

Просмотреть файл

@ -9,6 +9,7 @@ namespace DurableTask.Netherite.AzureFunctions
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
class NetheriteProviderStartup : IWebJobsStartup
{
@ -16,6 +17,7 @@ namespace DurableTask.Netherite.AzureFunctions
{
#if !NETCOREAPP2_2
builder.Services.AddSingleton<IDurabilityProviderFactory, NetheriteProviderFactory>();
builder.Services.TryAddSingleton<ConnectionResolver, NameResolverBasedConnectionNameResolver>();
#else
builder.Services.AddSingleton<IDurabilityProviderFactory, NetheriteProviderPseudoFactory>();
#endif

Просмотреть файл

@ -1,6 +1,12 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("DurableTask.Netherite.Tests, PublicKey="
+ "0024000004800000940000000602000000240000525341310004000001000100e93024fd5f6737"
+ "b5f408b39a97e746f673e7c62d690716c6022ee8871b2bc5efcbd8015dd02253302196ded0d9fc"
+ "0c6bc0e84740f74ca828fed9aebf15272867019cf991484c06b00c7d7aacb34b40ed0ae633f043"
+ "2df41db65caa4b03f9a0e6974c15aeaa7db2acd902914c0628f0b77d9b7839f530ba04584aed92"
+ "87159ebd")]
[assembly: InternalsVisibleTo("DurableTask.Netherite.AzureFunctions, PublicKey="
+ "0024000004800000940000000602000000240000525341310004000001000100e93024fd5f6737"
+ "b5f408b39a97e746f673e7c62d690716c6022ee8871b2bc5efcbd8015dd02253302196ded0d9fc"
+ "0c6bc0e84740f74ca828fed9aebf15272867019cf991484c06b00c7d7aacb34b40ed0ae633f043"

Просмотреть файл

@ -0,0 +1,92 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using Microsoft.Azure.Storage;
using Microsoft.Extensions.Logging.Abstractions;
/// <summary>
/// Resolves connection names by using a mapping from connection names to connection strings,
/// or an already resolved connection string if specified inside the settings object.
/// Used for compatibility with old configuration scheme.
/// </summary>
class CompatibilityConnectionResolver : ConnectionNameToConnectionStringResolver
{
readonly NetheriteOrchestrationServiceSettings settings;
/// <summary>
/// Creates a connection string resolver for a given lookup function.
/// </summary>
/// <param name="connectionStringLookup">A function that maps connection names to connection strings.</param>
public CompatibilityConnectionResolver(NetheriteOrchestrationServiceSettings settings, Func<string, string> connectionStringLookup)
: base(connectionStringLookup)
{
this.settings = settings;
}
#pragma warning disable CS0618 // Type or member is obsolete
/// <inheritdoc/>
public override ConnectionInfo ResolveConnectionInfo(string taskHub, string connectionName, ResourceType recourceType)
{
switch (recourceType)
{
case ResourceType.EventHubsNamespace:
if (!string.IsNullOrEmpty(this.settings.ResolvedTransportConnectionString))
{
return ConnectionInfo.FromEventHubsConnectionString(this.settings.ResolvedTransportConnectionString);
}
else
{
break;
}
case ResourceType.BlobStorage:
case ResourceType.TableStorage:
if (!string.IsNullOrEmpty(this.settings.ResolvedStorageConnectionString))
{
return ConnectionInfo.FromStorageConnectionString(this.settings.ResolvedStorageConnectionString, recourceType);
}
else
{
break;
}
case ResourceType.PageBlobStorage:
var resolved = this.settings.ResolvedPageBlobStorageConnectionString ?? this.settings.ResolvedStorageConnectionString;
if (!string.IsNullOrEmpty(resolved))
{
return ConnectionInfo.FromStorageConnectionString(resolved, recourceType);
}
else
{
break;
}
}
return base.ResolveConnectionInfo(taskHub, connectionName, recourceType);
}
/// <inheritdoc/>
public override void ResolveLayerConfiguration(string connectionName, out StorageChoices storageChoice, out TransportChoices transportChoice)
{
if (TransportConnectionString.IsPseudoConnectionString(this.settings.ResolvedTransportConnectionString))
{
TransportConnectionString.Parse(this.settings.ResolvedTransportConnectionString, out storageChoice, out transportChoice);
}
else
{
base.ResolveLayerConfiguration(connectionName, out storageChoice, out transportChoice);
}
}
#pragma warning restore CS0618 // Type or member is obsolete
}
}

Просмотреть файл

@ -0,0 +1,148 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Storage;
/// <summary>
/// Internal abstraction used for capturing connection information and credentials.
/// Represents all different kinds of storage (blobs, tables, event hubs namespaces).
/// </summary>
public partial class ConnectionInfo
{
/// <summary>
/// The name of the resource.
/// </summary>
public string ResourceName { get; set; }
/// <summary>
/// A connection string for accessing this resource.
/// (may be null if using a <see cref="TokenCredential"/>).
/// </summary>
public string ConnectionString { get; set; }
/// <summary>
/// A token credential for accessing this resource.
/// (may be null if using a <see cref="ConnectionString"/>).
/// </summary>
public Azure.Core.TokenCredential TokenCredential { get; set; }
/// <summary>
/// The fully qualified name for the resource.
/// </summary>
public string HostName { get; set; }
/// <summary>
/// Scopes for the token.
/// </summary>
public string[] Scopes;
protected static readonly string[] s_storage_scopes = { "https://storage.azure.com/.default" };
protected static readonly string[] s_eventhubs_scopes = { "https://eventhubs.azure.net/.default" };
/// <summary>
/// Creates a connection info from an event hubs connection string
/// </summary>
/// <param name="connectionString">The connection string.</param>
/// <returns>The connection info.</returns>
public static ConnectionInfo FromEventHubsConnectionString(string connectionString)
{
var properties = Azure.Messaging.EventHubs.EventHubsConnectionStringProperties.Parse(connectionString);
string hostName = properties.FullyQualifiedNamespace;
string nameSpaceName = hostName.Split('.')[0];
return new ConnectionInfo()
{
ResourceName = nameSpaceName,
ConnectionString = connectionString,
TokenCredential = null,
HostName = hostName,
Scopes = s_eventhubs_scopes,
};
}
/// <summary>
/// Creates a connection info from a storage connection string
/// </summary>
/// <param name="connectionString">The connection string.</param>
/// <param name="resourceType">The resource type being accessed.</param>
/// <returns>The connection info.</returns>
public static ConnectionInfo FromStorageConnectionString(string connectionString, ConnectionResolver.ResourceType resourceType)
{
var cloudStorageAccount = CloudStorageAccount.Parse(connectionString);
return new ConnectionInfo()
{
ResourceName = cloudStorageAccount.Credentials.AccountName,
ConnectionString = connectionString,
TokenCredential = null,
HostName = GetEndpoint().Host,
Scopes = s_storage_scopes,
};
Uri GetEndpoint() => resourceType == ConnectionResolver.ResourceType.TableStorage
? cloudStorageAccount.TableEndpoint : cloudStorageAccount.BlobEndpoint;
}
/// <summary>
/// Creates a connection info from a token credential.
/// </summary>
/// <param name="tokenCredential">The token credential.</param>
/// <param name="name">The name of the resource (account name or namespace name).</param>
/// <param name="resourceType">The type of the resource.</param>
/// <returns></returns>
/// <returns>The connection info.</returns>
public static ConnectionInfo FromTokenCredential(Azure.Core.TokenCredential tokenCredential, string name, ConnectionResolver.ResourceType resourceType)
{
switch (resourceType)
{
case ConnectionResolver.ResourceType.EventHubsNamespace:
{
return new ConnectionInfo()
{
ResourceName = name,
ConnectionString = null,
TokenCredential = tokenCredential,
HostName = $"{name}.servicebus.windows.net",
Scopes = s_eventhubs_scopes,
};
}
case ConnectionResolver.ResourceType.BlobStorage:
case ConnectionResolver.ResourceType.PageBlobStorage:
return new ConnectionInfo()
{
ResourceName = name,
ConnectionString = null,
TokenCredential = tokenCredential,
HostName = $"{name}.blob.core.windows.net",
Scopes = s_storage_scopes,
};
case ConnectionResolver.ResourceType.TableStorage:
{
return new ConnectionInfo()
{
ResourceName = name,
ConnectionString = null,
TokenCredential = tokenCredential,
HostName = $"{name}.table.core.windows.net",
Scopes = s_storage_scopes,
};
}
default:
return null;
}
}
/// <summary>
/// When converting to a classic storage account, a renewal timer is associated with each CloudStorageAccount instance. We therefore use
/// a single instance to be shared by all.
/// </summary>
internal Task<Microsoft.Azure.Storage.CloudStorageAccount> CachedStorageAccountTask;
}
}

Просмотреть файл

@ -0,0 +1,219 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Net.Cache;
using System.Net.Http;
using System.Globalization;
using System.Security.Cryptography;
using System.Web;
using DurableTask.Netherite.EventHubsTransport;
using Microsoft.Azure.EventHubs;
using Azure.Core;
using System.Runtime.CompilerServices;
using Microsoft.Azure.EventHubs.Processor;
using Newtonsoft.Json.Serialization;
using DurableTask.Netherite.Faster;
/// <summary>
/// Utilities for constructing various SDK objects from a connection information.
/// </summary>
static class ConnectionInfoExtensions
{
/// <summary>
/// Returns a classic (v11 SDK) storage account object.
/// </summary>
/// <param name="connectionInfo">The connection info.</param>
/// <returns>A task for the storage account object.</returns>
/// <exception cref="FormatException">Thrown if the host name of the connection info is not of the expected format {ResourceName}.{HostNameSuffix}.</exception>
public static Task<Microsoft.Azure.Storage.CloudStorageAccount> GetAzureStorageV11AccountAsync(this ConnectionInfo connectionInfo)
{
// storage accounts run a token renewal timer, so we want to share a single instance
if (connectionInfo.CachedStorageAccountTask == null)
{
connectionInfo.CachedStorageAccountTask = GetAsync();
}
return connectionInfo.CachedStorageAccountTask;
async Task<Microsoft.Azure.Storage.CloudStorageAccount> GetAsync()
{
if (connectionInfo.ConnectionString != null)
{
return Microsoft.Azure.Storage.CloudStorageAccount.Parse(connectionInfo.ConnectionString);
}
else
{
var credentials = new Microsoft.Azure.Storage.Auth.StorageCredentials(await connectionInfo.ToLegacyCredentialAsync(CancellationToken.None));
// hostnames are generally structured like
// accountname.blob.core.windows.net
// accountname.table.core.windows.net
// databasename.table.cosmos.azure.com
int firstDot = connectionInfo.HostName.IndexOf('.');
int secondDot = connectionInfo.HostName.IndexOf('.', firstDot + 1);
string hostNameSuffix = connectionInfo.HostName.Substring(secondDot + 1);
return new Microsoft.Azure.Storage.CloudStorageAccount(
storageCredentials: credentials,
accountName: connectionInfo.ResourceName,
endpointSuffix: hostNameSuffix,
useHttps: true);
}
}
}
/// <summary>
/// Creates an Azure Storage table client for the v12 SDK.
/// </summary>
/// <param name="connectionInfo">The connection info.</param>
/// <param name="tableName">The table name.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns></returns>
public static Azure.Data.Tables.TableClient GetAzureStorageV12TableClientAsync(this ConnectionInfo connectionInfo, string tableName, CancellationToken cancellationToken = default)
{
if (connectionInfo.ConnectionString != null)
{
return new Azure.Data.Tables.TableClient(connectionInfo.ConnectionString, tableName);
}
else
{
return new Azure.Data.Tables.TableClient(new Uri($"https://{connectionInfo.HostName}/"), tableName, connectionInfo.TokenCredential);
}
}
/// <summary>
/// Creates an Event Hub client for the given connection info.
/// </summary>
/// <param name="connectionInfo">The connection info.</param>
/// <param name="eventHub">The event hub name.</param>
/// <returns></returns>
public static EventHubClient CreateEventHubClient(this ConnectionInfo connectionInfo, string eventHub)
{
if (connectionInfo.ConnectionString != null)
{
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionInfo.ConnectionString)
{
EntityPath = eventHub
};
return EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
}
else
{
Uri uri = new Uri($"sb://{connectionInfo.HostName}");
var tokenProvider = new EventHubsTokenProvider(connectionInfo);
return EventHubClient.CreateWithTokenProvider(uri, eventHub, tokenProvider);
}
}
/// <summary>
/// Creates an event processor host for the given connection info.
/// </summary>
/// <param name="connectionInfo">The connection info.</param>
/// <param name="hostName">The host name.</param>
/// <param name="eventHubPath">The event hub name.</param>
/// <param name="consumerGroupName">The consumer group name.</param>
/// <param name="checkpointStorage">A connection info for the checkpoint storage.</param>
/// <param name="leaseContainerName">The name of the lease container.</param>
/// <param name="storageBlobPrefix">A prefix for storing the blobs.</param>
/// <returns>An event processor host.</returns>
public static async Task<EventProcessorHost> GetEventProcessorHostAsync(
this ConnectionInfo connectionInfo,
string hostName,
string eventHubPath,
string consumerGroupName,
ConnectionInfo checkpointStorage,
string leaseContainerName,
string storageBlobPrefix)
{
if (connectionInfo.ConnectionString != null)
{
return new EventProcessorHost(
hostName,
eventHubPath,
consumerGroupName,
connectionInfo.ConnectionString,
checkpointStorage.ConnectionString,
leaseContainerName,
storageBlobPrefix);
}
else
{
var storageAccount = await checkpointStorage.GetAzureStorageV11AccountAsync();
return new EventProcessorHost(
new Uri($"sb://{connectionInfo.HostName}"),
eventHubPath,
consumerGroupName,
(ITokenProvider) (new EventHubsTokenProvider(connectionInfo)),
storageAccount,
leaseContainerName,
storageBlobPrefix);
}
}
class EventHubsTokenProvider : Microsoft.Azure.EventHubs.ITokenProvider
{
readonly ConnectionInfo info;
public EventHubsTokenProvider(ConnectionInfo info)
{
this.info = info;
}
static TimeSpan NextRefresh(AccessToken token)
{
DateTimeOffset now = DateTimeOffset.UtcNow;
return token.ExpiresOn - now - TimeSpan.FromMinutes(1); // refresh it a bit early.
}
async Task<SecurityToken> ITokenProvider.GetTokenAsync(string appliesTo, TimeSpan timeout)
{
TokenRequestContext request = new(this.info.Scopes);
AccessToken accessToken = await this.info.TokenCredential.GetTokenAsync(request, CancellationToken.None);
return new JsonSecurityToken(accessToken.Token, appliesTo);
}
}
/// <summary>
/// Adds the necessary authorization headers to a REST http request.
/// </summary>
/// <param name="connectionInfo">The connection info.</param>
/// <param name="request">The request object.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns></returns>
public static async Task AuthorizeHttpRequestMessage(this ConnectionInfo connectionInfo, HttpRequestMessage request, CancellationToken cancellationToken)
{
if (connectionInfo.ConnectionString != null)
{
// parse the eventhubs connection string to extract various parameters
var properties = Azure.Messaging.EventHubs.EventHubsConnectionStringProperties.Parse(connectionInfo.ConnectionString);
string resourceUri = properties.Endpoint.AbsoluteUri;
string keyName = properties.SharedAccessKeyName;
string key = properties.SharedAccessKey;
// create a token (from https://docs.microsoft.com/en-us/rest/api/eventhub/generate-sas-token#c)
TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
var week = 60 * 60 * 24 * 7;
var expiry = Convert.ToString((int)sinceEpoch.TotalSeconds + week);
string stringToSign = HttpUtility.UrlEncode(resourceUri) + "\n" + expiry;
HMACSHA256 hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
var sasToken = String.Format(CultureInfo.InvariantCulture, "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}", HttpUtility.UrlEncode(resourceUri), HttpUtility.UrlEncode(signature), expiry, keyName);
//add it to the request
request.Headers.Add("Authorization", sasToken);
}
else
{
var bearerToken = (await connectionInfo.ToLegacyCredentialAsync(cancellationToken)).Token;
request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", bearerToken);
}
}
}
}

Просмотреть файл

@ -0,0 +1,76 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using Microsoft.Azure.Storage;
using Microsoft.Extensions.Logging.Abstractions;
/// <summary>
/// Resolves connection names by using a mapping from connection names to connection strings.
/// </summary>
class ConnectionNameToConnectionStringResolver : ConnectionResolver
{
readonly Func<string, string> connectionStringLookup;
/// <summary>
/// Creates a connection string resolver for a given lookup function.
/// </summary>
/// <param name="connectionStringLookup">A function that maps connection names to connection strings.</param>
public ConnectionNameToConnectionStringResolver(Func<string, string> connectionStringLookup)
{
this.connectionStringLookup = connectionStringLookup;
}
/// <inheritdoc/>
public override ConnectionInfo ResolveConnectionInfo(string taskHub, string connectionName, ResourceType recourceType)
{
var connectionString = this.connectionStringLookup?.Invoke(connectionName);
if (connectionString == null)
{
return null;
}
else if (recourceType == ResourceType.EventHubsNamespace)
{
return ConnectionInfo.FromEventHubsConnectionString(connectionString);
}
else if (recourceType == ResourceType.PageBlobStorage)
{
return null; // this resolver does not support using a separate page blob connection
}
else
{
return ConnectionInfo.FromStorageConnectionString(connectionString, recourceType);
}
}
/// <inheritdoc/>
public override void ResolveLayerConfiguration(string connectionName, out StorageChoices storageChoice, out TransportChoices transportChoice)
{
if (TransportConnectionString.IsPseudoConnectionString(connectionName))
{
TransportConnectionString.Parse(connectionName, out storageChoice, out transportChoice);
}
else
{
var connectionString = this.connectionStringLookup?.Invoke(connectionName);
if (TransportConnectionString.IsPseudoConnectionString(connectionString))
{
TransportConnectionString.Parse(connectionString, out storageChoice, out transportChoice);
}
else
{
// the default settings are Faster and EventHubs
storageChoice = StorageChoices.Faster;
transportChoice = TransportChoices.EventHubs;
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,91 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
/// <summary>
/// An abstract class that represents a method for resolving named connections into ConnectionInfo objects, which can then be used to connect.
/// </summary>
public abstract class ConnectionResolver
{
/// <summary>
/// The cloud resources for which Netherite may require connection information
/// </summary>
public enum ResourceType
{
/// <summary>
/// The event hubs namespace. Used for connecting various logical components (partitions, clients, and the load monitor).
/// Not required if the layer configuration uses <see cref="TransportChoices.SingleHost"/>.
/// </summary>
EventHubsNamespace,
/// <summary>
/// The blob storage account, used for task hub storage and event hub consumer checkpoints.
/// Not required if the layer configuration uses <see cref="StorageChoices.Memory"/>.
/// </summary>
BlobStorage,
/// <summary>
/// The table storage account, used for publishing load information.
/// Not required if <see cref="NetheriteOrchestrationServiceSettings.LoadInformationAzureTableName"/> is set to null, or if
/// the layer configuration uses <see cref="StorageChoices.Memory"/>.
/// </summary>
TableStorage,
/// <summary>
/// The page blob storage account. Optional, can be used for storing page blobs separately from other blobs.
/// </summary>
PageBlobStorage,
}
/// <summary>
/// Attempts to resolves the given name to obtain connection information.
/// </summary>
/// <param name="taskHub">The name of the task hub.</param>
/// <param name="connectionName">The connection name.</param>
/// <param name="recourceType">The type of resource to which a connection is desired to be made.</param>
/// <returns>A ConnectionInfo with the required parameters to connect, or null if not found.</returns>
public abstract ConnectionInfo ResolveConnectionInfo(string taskHub, string connectionName, ResourceType recourceType);
/// <summary>
/// Determines the layers to use. For example, can configure full emulation by assigning <see cref="StorageChoices.Memory"/> and <see cref="TransportChoices.SingleHost"/>.
/// </summary>
/// <param name="connectionName">The connection name.</param>
/// <param name="storageChoice">The storage layer to use.</param>
/// <param name="transportChoice">The transport layer to use.</param>
public abstract void ResolveLayerConfiguration(string connectionName, out StorageChoices storageChoice, out TransportChoices transportChoice);
/// <summary>
/// Creates a connection resolver for a given connection string lookup function.
/// </summary>
/// <param name="connectionStringLookup">A function that maps connection names to connection strings.</param>
public static ConnectionResolver FromConnectionNameToConnectionStringResolver(Func<string, string> connectionStringLookup)
{
return new ConnectionNameToConnectionStringResolver(connectionStringLookup);
}
/// <summary>
/// Creates a connection resolver given a token credential, a storage account name, and an event hubs namespace name.
/// Applies the default suffixes (*.core.windows.net for storage accounts and *.servicebus.windows.net for event hubs).
/// </summary>
/// <param name="tokenCredential">The token credential to use.</param>
/// <param name="storageAccountName">The name of the storage account.</param>
/// <param name="eventHubNamespaceName">The name of the event hub namespace, or null if using the singlehost configuration.</param>
public static ConnectionResolver FromTokenCredential(Azure.Core.TokenCredential tokenCredential, string storageAccountName, string eventHubNamespaceName = null)
{
return new SimpleCredentialResolver(tokenCredential, storageAccountName, eventHubNamespaceName);
}
/// <summary>
/// Creates a connection resolver to use exclusively for in-memory emulation.
/// </summary>
public static ConnectionResolver ForInMemoryEmulation()
{
return new SimpleCredentialResolver(null, null, null);
}
}
}

Просмотреть файл

@ -0,0 +1,52 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
using Azure.Core;
using Microsoft.Azure.Storage.Auth;
using System.Threading.Tasks;
using System.Threading;
/// <summary>
/// A utility class for constructing a <see cref="Microsoft.Azure.Storage.Auth.TokenCredential"/>
/// from a <see cref="Azure.Core.TokenCredential"/>.
/// </summary>
static class CredentialShim
{
/// <summary>
/// Creates a <see cref="Microsoft.Azure.Storage.Auth.TokenCredential"/> from the connection info.
/// </summary>
/// <param name="connectionInfo">The connection info.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns></returns>
public static async Task<Microsoft.Azure.Storage.Auth.TokenCredential> ToLegacyCredentialAsync(this ConnectionInfo connectionInfo, CancellationToken cancellationToken)
{
AccessToken token = await GetTokenAsync(connectionInfo.TokenCredential, connectionInfo.Scopes, cancellationToken);
return new Microsoft.Azure.Storage.Auth.TokenCredential(token.Token, RenewTokenAsync, connectionInfo, NextRefresh(token));
}
static ValueTask<AccessToken> GetTokenAsync(
Azure.Core.TokenCredential credential, string[] scopes, CancellationToken cancellation)
{
TokenRequestContext request = new(scopes);
return credential.GetTokenAsync(request, cancellation);
}
static async Task<NewTokenAndFrequency> RenewTokenAsync(object state, CancellationToken cancellationToken)
{
ConnectionInfo connectionInfo = (ConnectionInfo)state;
AccessToken token = await GetTokenAsync(connectionInfo.TokenCredential, connectionInfo.Scopes, cancellationToken);
return new(token.Token, NextRefresh(token));
}
static TimeSpan NextRefresh(AccessToken token)
{
DateTimeOffset now = DateTimeOffset.UtcNow;
return token.ExpiresOn - now - TimeSpan.FromMinutes(1); // refresh it a bit early.
}
}
}

Просмотреть файл

@ -24,9 +24,9 @@ namespace DurableTask.Netherite
/// <param name="eventHubName">The name of the event hub.</param>
/// <param name="partitionCount">The number of partitions to create, if the event hub does not already exist.</param>
/// <returns>true if the event hub was created.</returns>
public static async Task<bool> EnsureEventHubExistsAsync(string connectionString, string eventHubName, int partitionCount)
public static async Task<bool> EnsureEventHubExistsAsync(ConnectionInfo info, string eventHubName, int partitionCount, CancellationToken cancellationToken)
{
var response = await SendHttpRequest(connectionString, eventHubName, partitionCount);
var response = await SendHttpRequest(info, eventHubName, partitionCount, cancellationToken);
if (response.StatusCode != System.Net.HttpStatusCode.Conflict)
{
response.EnsureSuccessStatusCode();
@ -40,9 +40,9 @@ namespace DurableTask.Netherite
/// <param name="connectionString">The SAS connection string for the namespace.</param>
/// <param name="eventHubName">The name of the event hub.</param>
/// <returns>true if the event hub was deleted.</returns>
public static async Task<bool> DeleteEventHubIfExistsAsync(string connectionString, string eventHubName)
public static async Task<bool> DeleteEventHubIfExistsAsync(ConnectionInfo info, string eventHubName, CancellationToken cancellationToken)
{
var response = await SendHttpRequest(connectionString, eventHubName, null);
var response = await SendHttpRequest(info, eventHubName, null, cancellationToken);
if (response.StatusCode != System.Net.HttpStatusCode.NotFound)
{
response.EnsureSuccessStatusCode();
@ -50,35 +50,12 @@ namespace DurableTask.Netherite
return response.StatusCode == System.Net.HttpStatusCode.OK;
}
static Task<HttpResponseMessage> SendHttpRequest(string connectionString, string eventHubName, int? partitionCount)
static async Task<HttpResponseMessage> SendHttpRequest(ConnectionInfo info, string eventHubName, int? partitionCount, CancellationToken cancellationToken)
{
// parse the eventhubs connection string to extract various parameters
var properties = Azure.Messaging.EventHubs.EventHubsConnectionStringProperties.Parse(connectionString);
string resource = properties.FullyQualifiedNamespace;
string resourceUri = properties.Endpoint.AbsoluteUri;
string name = properties.SharedAccessKeyName;
string key = properties.SharedAccessKey;
// create a token that allows us to create an eventhub
string sasToken = createToken(resourceUri, name, key);
// the following token creation code is taken from https://docs.microsoft.com/en-us/rest/api/eventhub/generate-sas-token#c
string createToken(string resourceUri, string name, string key)
{
TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
var week = 60 * 60 * 24 * 7;
var expiry = Convert.ToString((int)sinceEpoch.TotalSeconds + week);
string stringToSign = HttpUtility.UrlEncode(resourceUri) + "\n" + expiry;
HMACSHA256 hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
var sasToken = String.Format(CultureInfo.InvariantCulture, "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}", HttpUtility.UrlEncode(resourceUri), HttpUtility.UrlEncode(signature), expiry, name);
return sasToken;
}
// send an http request to create or delete the eventhub
HttpClient client = new HttpClient();
var request = new HttpRequestMessage();
request.RequestUri = new Uri($"https://{resource}/{eventHubName}?timeout=60&api-version=2014-01");
request.RequestUri = new Uri($"https://{info.HostName}/{eventHubName}?timeout=60&api-version=2014-01");
request.Method = partitionCount.HasValue ? HttpMethod.Put : HttpMethod.Delete;
if (partitionCount.HasValue)
{
@ -94,10 +71,12 @@ namespace DurableTask.Netherite
Encoding.UTF8,
"application/xml");
}
request.Headers.Add("Authorization", sasToken);
request.Headers.Add("Host", resource);
request.Headers.Add("Host", info.HostName);
return client.SendAsync(request);
// add an authorization header to the request
await info.AuthorizeHttpRequestMessage(request, cancellationToken);
return await client.SendAsync(request);
}
}
}

Просмотреть файл

@ -0,0 +1,74 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
using Azure.Core;
using Microsoft.Extensions.Logging.Abstractions;
/// <summary>
/// Resolves connections using a token credential, a storage account name, and an eventhubs namespace name.
/// </summary>
class SimpleCredentialResolver : ConnectionResolver
{
readonly Azure.Core.TokenCredential tokenCredential;
readonly string storageAccountName;
readonly string eventHubNamespaceName;
/// <summary>
/// Create a connection resolver that uses an Azure token credential.
/// </summary>
/// <param name="tokenCredential">The token credential to use.</param>
/// <param name="storageAccountName">The name of the storage account, or null if using in-memory emulation.</param>
/// <param name="eventHubNamespaceName">The name of the event hub namespace, or null if using the singlehost configuration.</param>
public SimpleCredentialResolver(Azure.Core.TokenCredential tokenCredential, string storageAccountName = null, string eventHubNamespaceName = null)
{
this.tokenCredential = tokenCredential;
this.storageAccountName = storageAccountName;
this.eventHubNamespaceName = eventHubNamespaceName;
}
/// <inheritdoc/>
public override ConnectionInfo ResolveConnectionInfo(string taskHub, string connectionName, ResourceType recourceType)
{
switch (recourceType)
{
case ResourceType.BlobStorage:
case ResourceType.TableStorage:
return ConnectionInfo.FromTokenCredential(this.tokenCredential, this.storageAccountName, recourceType);
case ResourceType.PageBlobStorage:
return null; // same as blob storage
case ResourceType.EventHubsNamespace:
return ConnectionInfo.FromTokenCredential(this.tokenCredential, this.eventHubNamespaceName, recourceType);
default:
throw new NotImplementedException("unknown resource type");
}
}
/// <inheritdoc/>
public override void ResolveLayerConfiguration(string connectionName, out StorageChoices storageChoice, out TransportChoices transportChoice)
{
if (string.IsNullOrEmpty(this.storageAccountName))
{
storageChoice = StorageChoices.Memory;
transportChoice = TransportChoices.SingleHost;
}
else if (string.IsNullOrEmpty(this.eventHubNamespaceName))
{
storageChoice = StorageChoices.Faster;
transportChoice = TransportChoices.SingleHost;
}
else
{
storageChoice = StorageChoices.Faster;
transportChoice = TransportChoices.EventHubs;
}
}
}
}

Просмотреть файл

@ -50,6 +50,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Azure.Core" Version="1.25.0" />
<PackageReference Include="Azure.Data.Tables" Version="12.6.1" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.7.2" />
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.2" />

Просмотреть файл

@ -15,6 +15,7 @@ namespace DurableTask.Netherite
[DataMember]
public long RequestId { get; set; }
[IgnoreDataMember]
public override EventId EventId => EventId.MakeClientResponseEventId(this.ClientId, this.RequestId);
}
}

Просмотреть файл

@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
/// <summary>
/// An exception that indicates configuration errors for Netherite.
/// </summary>
[Serializable()]
public class NetheriteConfigurationException : System.Exception
{
public NetheriteConfigurationException() : base() { }
public NetheriteConfigurationException(string message) : base(message) { }
public NetheriteConfigurationException(string message, System.Exception inner) : base(message, inner) { }
protected NetheriteConfigurationException(System.Runtime.Serialization.SerializationInfo info,
System.Runtime.Serialization.StreamingContext context) : base(info, context) { }
}
}

Просмотреть файл

@ -10,6 +10,7 @@ namespace DurableTask.Netherite
using DurableTask.Netherite.Faster;
using DurableTask.Netherite.Scaling;
using Microsoft.Azure.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
@ -29,8 +30,17 @@ namespace DurableTask.Netherite
DurableTask.Netherite.IOrchestrationServiceQueryClient,
TransportAbstraction.IHost
{
readonly TransportConnectionString.StorageChoices configuredStorage;
readonly TransportConnectionString.TransportChoices configuredTransport;
/// <summary>
/// The type of transport layer that was configured.
/// </summary>
public TransportChoices TransportChoice { get; }
/// <summary>
/// The type of storage layer that was configured.
/// </summary>
public StorageChoices StorageChoice { get; }
readonly ITransportLayer transport;
readonly IStorageLayer storage;
@ -43,6 +53,11 @@ namespace DurableTask.Netherite
/// </summary>
public const string LoggerCategoryName = "DurableTask.Netherite";
public ITransportLayer TransportLayer => this.transport;
internal IStorageLayer StorageLayer => this.storage;
public IServiceProvider ServiceProvider { get; }
CancellationTokenSource serviceShutdownSource;
Exception startupException;
Timer threadWatcher;
@ -88,15 +103,24 @@ namespace DurableTask.Netherite
#else
string configuration = "Release";
#endif
return $"NetheriteOrchestrationService on {this.configuredTransport}Transport and {this.configuredStorage}Storage, {configuration} build";
return $"NetheriteOrchestrationService on {this.Settings.TransportChoice}Transport and {this.Settings.StorageChoice}Storage, {configuration} build";
}
/// <summary>
/// Creates a new instance of the OrchestrationService with default settings
/// </summary>
public NetheriteOrchestrationService(NetheriteOrchestrationServiceSettings settings, ILoggerFactory loggerFactory)
: this(settings, loggerFactory, null)
{
}
/// <summary>
/// Creates a new instance of the OrchestrationService with default settings
/// </summary>
public NetheriteOrchestrationService(NetheriteOrchestrationServiceSettings settings, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
{
this.LoggerFactory = loggerFactory;
this.ServiceProvider = serviceProvider;
this.Settings = settings;
this.TraceHelper = new OrchestrationServiceTraceHelper(loggerFactory, settings.LogLevelLimit, settings.WorkerId, settings.HubName);
this.workItemTraceHelper = new WorkItemTraceHelper(loggerFactory, settings.WorkItemLogLevelLimit, settings.HubName);
@ -104,42 +128,57 @@ namespace DurableTask.Netherite
try
{
this.TraceHelper.TraceProgress("Reading configuration for transport and storage layers");
TransportConnectionString.Parse(this.Settings.ResolvedTransportConnectionString, out this.configuredStorage, out this.configuredTransport);
// determine a storage account name to be used for tracing
this.StorageAccountName = this.configuredStorage == TransportConnectionString.StorageChoices.Memory
? "Memory"
: CloudStorageAccount.Parse(this.Settings.ResolvedStorageConnectionString).Credentials.AccountName;
this.TraceHelper.StorageAccountName = this.workItemTraceHelper.StorageAccountName = this.StorageAccountName;
this.TraceHelper.TraceCreated(Environment.ProcessorCount, this.configuredTransport, this.configuredStorage);
switch (this.configuredStorage)
if (!settings.ResolutionComplete)
{
case TransportConnectionString.StorageChoices.Memory:
throw new NetheriteConfigurationException("settings must be validated before constructing orchestration service");
}
// determine a storage account name to be used for tracing
this.StorageAccountName = this.Settings.StorageAccountName;
this.TraceHelper.TraceCreated(Environment.ProcessorCount, this.Settings.TransportChoice, this.Settings.StorageChoice);
// construct the storage layer
switch (this.Settings.StorageChoice)
{
case StorageChoices.Memory:
this.storage = new MemoryStorageLayer(this.Settings, this.TraceHelper.Logger);
break;
case TransportConnectionString.StorageChoices.Faster:
case StorageChoices.Faster:
this.storage = new FasterStorageLayer(this.Settings, this.TraceHelper, this.LoggerFactory);
break;
default:
throw new NotImplementedException("no such storage choice");
case StorageChoices.Custom:
var storageLayerFactory = this.ServiceProvider?.GetService<IStorageLayerFactory>();
if (storageLayerFactory == null)
{
throw new NetheriteConfigurationException("could not find injected IStorageLayerFactory");
}
this.storage = storageLayerFactory.Create(this);
break;
}
switch (this.configuredTransport)
// construct the transport layer
switch (this.Settings.TransportChoice)
{
case TransportConnectionString.TransportChoices.SingleHost:
case TransportChoices.SingleHost:
this.transport = new SingleHostTransport.SingleHostTransportLayer(this, settings, this.storage, this.TraceHelper.Logger);
break;
case TransportConnectionString.TransportChoices.EventHubs:
case TransportChoices.EventHubs:
this.transport = new EventHubsTransport.EventHubsTransport(this, settings, this.storage, loggerFactory);
break;
default:
throw new NotImplementedException("no such transport choice");
case TransportChoices.Custom:
var transportLayerFactory = this.ServiceProvider?.GetService<ITransportLayerFactory>();
if (transportLayerFactory == null)
{
throw new NetheriteConfigurationException("could not find injected ITransportLayerFactory");
}
this.transport = transportLayerFactory.Create(this);
break;
}
this.workItemStopwatch.Start();
@ -168,19 +207,24 @@ namespace DurableTask.Netherite
/// <returns>true if autoscaling is supported, false otherwise</returns>
public bool TryGetScalingMonitor(out ScalingMonitor monitor)
{
if (this.configuredStorage == TransportConnectionString.StorageChoices.Faster
&& this.configuredTransport == TransportConnectionString.TransportChoices.EventHubs)
if (this.Settings.StorageChoice == StorageChoices.Faster
&& this.Settings.TransportChoice == TransportChoices.EventHubs)
{
try
{
ILoadPublisherService loadPublisher = string.IsNullOrEmpty(this.Settings.LoadInformationAzureTableName) ?
new AzureBlobLoadPublisher(this.Settings.BlobStorageConnection, this.Settings.HubName)
: new AzureTableLoadPublisher(this.Settings.TableStorageConnection, this.Settings.LoadInformationAzureTableName, this.Settings.HubName);
monitor = new ScalingMonitor(
this.Settings.ResolvedStorageConnectionString,
this.Settings.ResolvedTransportConnectionString,
loadPublisher,
this.Settings.EventHubsConnection,
this.Settings.LoadInformationAzureTableName,
this.Settings.HubName,
this.TraceHelper.TraceScaleRecommendation,
this.TraceHelper.TraceProgress,
this.TraceHelper.TraceError);
return true;
}
catch (Exception e)

Просмотреть файл

@ -4,6 +4,7 @@
namespace DurableTask.Netherite
{
using System;
using System.Runtime;
using DurableTask.Core;
using FASTER.core;
using Microsoft.Extensions.Logging;
@ -28,22 +29,11 @@ namespace DurableTask.Netherite
/// <summary>
/// Gets or sets the name for resolving the Eventhubs namespace connection string.
/// Alternatively, may contain special strings specifying the use of the emulator.
/// Pseudo-connection-strings "Memory" or "SingleHost" can be used to configure
/// in-memory emulation, or single-host configuration, respectively.
/// </summary>
public string EventHubsConnectionName { get; set; } = "EventHubsConnection";
/// <summary>
/// The resolved storage connection string. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
public string ResolvedStorageConnectionString { get; set; }
/// <summary>
/// The resolved event hubs connection string. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
public string ResolvedTransportConnectionString { get; set; }
/// <summary>
/// Gets or sets the identifier for the current worker.
/// </summary>
@ -172,20 +162,6 @@ namespace DurableTask.Netherite
/// </summary>
public int PackPartitionTaskMessages { get; set; } = 100;
/// <summary>
/// A name for resolving a storage connection string to be used specifically for the page blobs, or null if page blobs are to be stored in the default account.
/// </summary>
public string PageBlobStorageConnectionName { get; set; } = null;
/// <summary>
/// The resolved page blob storage connection string, or null if page blobs are to be stored in the default account. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
public string ResolvedPageBlobStorageConnectionString { get; set; }
[JsonIgnore]
internal bool UseSeparatePageBlobStorage => !string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString);
/// <summary>
/// Allows attaching additional checkers and debuggers during testing.
/// </summary>
@ -241,67 +217,143 @@ namespace DurableTask.Netherite
[JsonConverter(typeof(StringEnumConverter))]
public LogLevel LogLevelLimit { get; set; } = LogLevel.Debug;
#region Compatibility Shim
/// <summary>
/// The resolved storage connection string. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
[Obsolete("connections should be resolved by calling settings.Validate(ConnectionResolver resolver)")]
public string ResolvedStorageConnectionString { get; set; }
/// <summary>
/// The resolved event hubs connection string. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
[Obsolete("connections should be resolved by calling settings.Validate(ConnectionResolver resolver)")]
public string ResolvedTransportConnectionString { get; set; }
/// <summary>
/// A name for resolving a storage connection string to be used specifically for the page blobs, or null if page blobs are to be stored in the default account.
/// </summary>
[JsonProperty(DefaultValueHandling = DefaultValueHandling.Ignore)]
[Obsolete("connections should be resolved by calling settings.Validate(ConnectionResolver resolver)")]
public string PageBlobStorageConnectionName { get; set; } = null;
/// <summary>
/// The resolved page blob storage connection string, or null if page blobs are to be stored in the default account. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
[Obsolete("connections should be resolved by calling settings.Validate(ConnectionResolver resolver)")]
public string ResolvedPageBlobStorageConnectionString { get; set; }
class CompatibilityResolver : ConnectionResolver
{
readonly Func<string, string> nameResolver;
CompatibilityResolver(Func<string, string> nameResolver)
{
this.nameResolver = nameResolver;
}
public override ConnectionInfo ResolveConnectionInfo(string taskHub, string connectionName, ResourceType recourceType)
{
throw new NotImplementedException();
}
public override void ResolveLayerConfiguration(string connectionName, out StorageChoices storageChoice, out TransportChoices transportChoice)
{
throw new NotImplementedException();
}
}
#endregion
#region Parameters that are set during resolution
/// <summary>
/// The type of storage layer to be used
/// </summary>
[JsonIgnore]
public StorageChoices StorageChoice { get; protected set; }
/// <summary>
/// The type of transport layer to be used
/// </summary>
[JsonIgnore]
public TransportChoices TransportChoice { get; protected set; }
/// <summary>
/// The connection information for Azure Storage blobs.
/// If not explicitly set, this is populated during validation by resolving <see cref="StorageConnectionName"/>.
/// </summary>
[JsonIgnore]
public ConnectionInfo BlobStorageConnection { get; protected set; }
/// <summary>
/// The connection information for Azure Storage tables.
/// If not explicitly set, this is populated during validation by resolving <see cref="StorageConnectionName"/>.
/// </summary>
[JsonIgnore]
public ConnectionInfo TableStorageConnection { get; protected set; }
/// <summary>
/// The connection information for the event hubs namespace.
/// If not explicitly set, this is populated during validation by resolving <see cref="EventHubsConnectionName"/>.
/// </summary>
[JsonIgnore]
public ConnectionInfo EventHubsConnection { get; protected set; }
/// <summary>
/// The connection information for Azure Storage page blobs.
///This is usually null, which means the same <see cref="BlobStorageConnection"/> should be used for page blobs also.
/// </summary>
[JsonIgnore]
public ConnectionInfo PageBlobStorageConnection { get; protected set; }
/// <summary>
/// Whether the storage layer was configured to use a different connection for page blobs than for other blobs
/// </summary>
[JsonIgnore]
internal bool UseSeparatePageBlobStorage => this.PageBlobStorageConnection != null;
[JsonIgnore]
public string StorageAccountName
=> this.StorageChoice == StorageChoices.Memory ? "Memory" : this.BlobStorageConnection.ResourceName;
/// <summary>
/// Whether the settings have been validated and resolved
/// </summary>
[JsonIgnore]
public bool ResolutionComplete { get; protected set; }
#endregion
/// <summary>
/// Validates the settings, throwing exceptions if there are issues.
/// </summary>
/// <param name="nameResolver">Optionally, a resolver for connection names.</param>
public void Validate(Func<string, string> nameResolver = null)
public void Validate(Func<string, string> connectionNameToConnectionString = null)
{
this.Validate(new CompatibilityConnectionResolver(this, connectionNameToConnectionString));
}
/// <summary>
/// Validates the settings and resolves the connections, throwing exceptions if there are issues.
/// </summary>
/// <param name="resolver">A connection resolver.</param>
public void Validate(ConnectionResolver resolver)
{
if (string.IsNullOrEmpty(this.HubName))
{
throw new InvalidOperationException($"Must specify {nameof(this.HubName)} for Netherite storage provider.");
}
if (this.PartitionCount < 1 || this.PartitionCount > 32)
{
throw new ArgumentOutOfRangeException(nameof(this.PartitionCount));
throw new NetheriteConfigurationException($"Must specify {nameof(this.HubName)} for Netherite storage provider.");
}
ValidateTaskhubName(this.HubName);
if (string.IsNullOrEmpty(this.ResolvedTransportConnectionString))
if (this.PartitionCount < 1 || this.PartitionCount > 32)
{
if (string.IsNullOrEmpty(this.EventHubsConnectionName))
{
throw new InvalidOperationException($"Must specify {nameof(this.EventHubsConnectionName)} for Netherite storage provider.");
}
if (TransportConnectionString.IsPseudoConnectionString(this.EventHubsConnectionName))
{
this.ResolvedTransportConnectionString = this.EventHubsConnectionName;
}
else
{
if (nameResolver == null)
{
throw new InvalidOperationException($"Must either specify {nameof(this.ResolvedTransportConnectionString)}, or specify {nameof(this.EventHubsConnectionName)} and provide a nameResolver, to construct Netherite storage provider.");
}
this.ResolvedTransportConnectionString = nameResolver(this.EventHubsConnectionName);
if (string.IsNullOrEmpty(this.ResolvedTransportConnectionString))
{
throw new InvalidOperationException($"Could not resolve {nameof(this.EventHubsConnectionName)}:{this.EventHubsConnectionName} for Netherite storage provider.");
}
}
}
TransportConnectionString.Parse(this.ResolvedTransportConnectionString, out var storage, out var transport);
if (transport == TransportConnectionString.TransportChoices.EventHubs)
{
// validates the connection string
TransportConnectionString.EventHubsNamespaceName(this.ResolvedTransportConnectionString);
}
if (storage == TransportConnectionString.StorageChoices.Memory)
{
this.ResolvedStorageConnectionString = null;
this.ResolvedPageBlobStorageConnectionString = null;
}
else
{
this.ValidateAzureStorageConnectionStrings(nameResolver);
throw new ArgumentOutOfRangeException(nameof(this.PartitionCount));
}
if (this.MaxConcurrentOrchestratorFunctions <= 0)
@ -313,78 +365,90 @@ namespace DurableTask.Netherite
{
throw new ArgumentOutOfRangeException(nameof(this.MaxConcurrentActivityFunctions));
}
}
public void ValidateAzureStorageConnectionStrings(Func<string, string> nameResolver)
{
if (string.IsNullOrEmpty(this.ResolvedStorageConnectionString))
resolver.ResolveLayerConfiguration(this.EventHubsConnectionName, out var storage, out var transport);
this.StorageChoice = storage;
this.TransportChoice = transport;
if (this.TransportChoice == TransportChoices.EventHubs)
{
if (nameResolver == null)
// we need a valid event hubs connection
if (string.IsNullOrEmpty(this.EventHubsConnectionName) && resolver is ConnectionNameToConnectionStringResolver)
{
throw new InvalidOperationException($"Must either specify {nameof(this.ResolvedStorageConnectionString)}, or specify {nameof(this.StorageConnectionName)} and provide a nameResolver, to construct Netherite storage provider.");
throw new NetheriteConfigurationException($"Must specify {nameof(this.EventHubsConnectionName)} for Netherite storage provider.");
}
if (string.IsNullOrEmpty(this.StorageConnectionName))
{
throw new InvalidOperationException($"Must specify {nameof(this.StorageConnectionName)} for Netherite storage provider.");
}
this.ResolvedStorageConnectionString = nameResolver(this.StorageConnectionName);
if (string.IsNullOrEmpty(this.ResolvedStorageConnectionString))
{
throw new InvalidOperationException($"Could not resolve {nameof(this.StorageConnectionName)}:{this.StorageConnectionName} for Netherite storage provider.");
}
}
if (string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString)
&& !string.IsNullOrEmpty(this.PageBlobStorageConnectionName))
{
if (nameResolver == null)
{
throw new InvalidOperationException($"Must either specify {nameof(this.ResolvedPageBlobStorageConnectionString)}, or specify {nameof(this.PageBlobStorageConnectionName)} and provide a nameResolver, to construct Netherite storage provider.");
}
this.ResolvedPageBlobStorageConnectionString = nameResolver(this.PageBlobStorageConnectionName);
if (string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString))
{
throw new InvalidOperationException($"Could not resolve {nameof(this.PageBlobStorageConnectionName)}:{this.PageBlobStorageConnectionName} for Netherite storage provider.");
}
}
// make sure the connection string can be parsed correctly
try
{
Microsoft.Azure.Storage.CloudStorageAccount.Parse(this.ResolvedStorageConnectionString);
}
catch (Exception e)
{
throw new FormatException($"Could not parse the specified storage connection string for Netherite storage provider", e);
}
if (!string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString))
{
// make sure the connection string can be parsed correctly
try
{
Microsoft.Azure.Storage.CloudStorageAccount.Parse(this.ResolvedPageBlobStorageConnectionString);
this.EventHubsConnection = resolver.ResolveConnectionInfo(this.HubName, this.EventHubsConnectionName, ConnectionResolver.ResourceType.EventHubsNamespace);
}
catch (Exception e)
{
throw new FormatException($"Could not parse the specified page blob storage connection string for Netherite storage provider", e);
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.EventHubsConnectionName)}={this.EventHubsConnectionName} to create an eventhubs connection for Netherite storage provider: {e.Message}", e);
}
if (this.EventHubsConnection == null)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.EventHubsConnectionName)}={this.EventHubsConnectionName} to create an eventhubs connection for Netherite storage provider.");
}
}
if (this.StorageChoice == StorageChoices.Faster || this.TransportChoice == TransportChoices.EventHubs)
{
// we need a valid blob storage connection
if (string.IsNullOrEmpty(this.StorageConnectionName) && resolver is ConnectionNameToConnectionStringResolver)
{
throw new NetheriteConfigurationException($"Must specify {nameof(this.StorageConnectionName)} for Netherite storage provider.");
}
try
{
this.BlobStorageConnection = resolver.ResolveConnectionInfo(this.HubName, this.StorageConnectionName, ConnectionResolver.ResourceType.BlobStorage);
}
catch (Exception e)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.StorageConnectionName)}={this.StorageConnectionName} to create a blob storage connection for Netherite storage provider: {e.Message}", e);
}
if (this.BlobStorageConnection == null)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.StorageConnectionName)}={this.StorageConnectionName} to create a blob storage connection for Netherite storage provider.");
}
}
if (this.StorageChoice == StorageChoices.Faster && this.LoadInformationAzureTableName != null)
{
// we need a valid table storage connection
try
{
this.TableStorageConnection = resolver.ResolveConnectionInfo(this.HubName, this.StorageConnectionName, ConnectionResolver.ResourceType.TableStorage);
}
catch (Exception e)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.StorageConnectionName)}={this.StorageConnectionName} to create a table storage connection for Netherite storage provider: {e.Message}", e);
}
if (this.TableStorageConnection == null)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.StorageConnectionName)}={this.StorageConnectionName} to create a table storage connection for Netherite storage provider.");
}
}
if (this.StorageChoice == StorageChoices.Faster)
{
// some custom resolvers may specify a separate page blob connection, but usually this will be null
this.PageBlobStorageConnection = resolver.ResolveConnectionInfo(this.HubName, this.StorageConnectionName, ConnectionResolver.ResourceType.PageBlobStorage);
}
// we have completed validation and resolution
this.ResolutionComplete = true;
}
const int MinTaskHubNameSize = 1;
const int MinTaskHubNameSize = 3;
const int MaxTaskHubNameSize = 45;
public static void ValidateTaskhubName(string taskhubName)
{
if (taskhubName.Length < MinTaskHubNameSize || taskhubName.Length > MaxTaskHubNameSize)
{
throw new ArgumentException(GetTaskHubErrorString(taskhubName));
throw new NetheriteConfigurationException(GetTaskHubErrorString(taskhubName));
}
try
@ -394,7 +458,7 @@ namespace DurableTask.Netherite
}
catch (ArgumentException e)
{
throw new ArgumentException(GetTaskHubErrorString(taskhubName), e);
throw new NetheriteConfigurationException(GetTaskHubErrorString(taskhubName), e);
}
}

Просмотреть файл

@ -14,48 +14,12 @@ namespace DurableTask.Netherite
/// </summary>
public static class TransportConnectionString
{
/// <summary>
/// Configuration options for the storage component
/// </summary>
public enum StorageChoices
{
/// <summary>
/// Does not store any state to durable storage, just keeps it in memory.
/// Intended for testing scenarios.
/// </summary>
Memory = 0,
/// <summary>
/// Uses the Faster key-value store.
/// </summary>
Faster = 1,
}
/// <summary>
/// Configuration options for the transport component
/// </summary>
public enum TransportChoices
{
/// <summary>
/// Passes messages through memory and puts all partitions on a single host
/// Intended for testing scenarios.
/// </summary>
SingleHost = 0,
/// <summary>
/// Passes messages through eventhubs; can distribute over multiple machines via
/// the eventhubs EventProcessor.
/// </summary>
EventHubs = 1,
}
/// <summary>
/// Determines the components to use given a transport connection string.
/// </summary>
public static bool IsPseudoConnectionString(string connectionString)
{
switch (connectionString.ToLowerInvariant().Trim())
switch ((connectionString ?? "").ToLowerInvariant().Trim())
{
case "memory":
case "singlehost":

Просмотреть файл

@ -17,7 +17,7 @@ namespace DurableTask.Netherite.Scaling
class AzureBlobLoadPublisher : ILoadPublisherService
{
readonly string taskHubName;
readonly CloudBlobContainer blobContainer;
readonly Task<CloudBlobContainer> blobContainer;
readonly static JsonSerializerSettings serializerSettings = new JsonSerializerSettings()
{
@ -27,13 +27,18 @@ namespace DurableTask.Netherite.Scaling
int? numPartitions;
public AzureBlobLoadPublisher(string connectionString, string taskHubName)
public AzureBlobLoadPublisher(ConnectionInfo connectionInfo, string taskHubName)
{
var cloudStorageAccount = CloudStorageAccount.Parse(connectionString);
this.blobContainer = this.GetBlobContainer(connectionInfo, taskHubName);
this.taskHubName = taskHubName;
}
async Task<CloudBlobContainer> GetBlobContainer(ConnectionInfo connectionInfo, string taskHubName)
{
var cloudStorageAccount = await connectionInfo.GetAzureStorageV11AccountAsync();
CloudBlobClient serviceClient = cloudStorageAccount.CreateCloudBlobClient();
string containerName = BlobManager.GetContainerName(taskHubName);
this.blobContainer = serviceClient.GetContainerReference(containerName);
this.taskHubName = taskHubName;
return serviceClient.GetContainerReference(containerName);
}
public TimeSpan PublishInterval => TimeSpan.FromSeconds(10);
@ -46,12 +51,12 @@ namespace DurableTask.Netherite.Scaling
public Task PublishAsync(Dictionary<uint, PartitionLoadInfo> info, CancellationToken cancellationToken)
{
Task UploadPartitionInfo(uint partitionId, PartitionLoadInfo loadInfo)
async Task UploadPartitionInfo(uint partitionId, PartitionLoadInfo loadInfo)
{
var blobDirectory = this.blobContainer.GetDirectoryReference($"p{partitionId:D2}");
var blobDirectory = (await this.blobContainer).GetDirectoryReference($"p{partitionId:D2}");
var blob = blobDirectory.GetBlockBlobReference("loadinfo.json");
var json = JsonConvert.SerializeObject(loadInfo, Formatting.Indented, serializerSettings);
return blob.UploadTextAsync(json, cancellationToken);
await blob.UploadTextAsync(json, cancellationToken);
}
List<Task> tasks = info.Select(kvp => UploadPartitionInfo(kvp.Key, kvp.Value)).ToList();
@ -87,7 +92,7 @@ namespace DurableTask.Netherite.Scaling
{
// determine number of partitions of taskhub
var info = await this.ReadJsonBlobAsync<Netherite.Abstractions.TaskhubParameters>(
this.blobContainer.GetBlockBlobReference("taskhubparameters.json"),
(await this.blobContainer).GetBlockBlobReference("taskhubparameters.json"),
throwIfNotFound: true,
throwOnParseError: true,
cancellationToken).ConfigureAwait(false);
@ -98,7 +103,7 @@ namespace DurableTask.Netherite.Scaling
async Task<(uint, PartitionLoadInfo)> DownloadPartitionInfo(uint partitionId)
{
PartitionLoadInfo info = await this.ReadJsonBlobAsync<PartitionLoadInfo>(
this.blobContainer.GetDirectoryReference($"p{partitionId:D2}").GetBlockBlobReference("loadinfo.json"),
(await this.blobContainer).GetDirectoryReference($"p{partitionId:D2}").GetBlockBlobReference("loadinfo.json"),
throwIfNotFound: false,
throwOnParseError: true,
cancellationToken).ConfigureAwait(false);
@ -116,7 +121,7 @@ namespace DurableTask.Netherite.Scaling
{
// determine number of partitions of taskhub
var info = await this.ReadJsonBlobAsync<Netherite.Abstractions.TaskhubParameters>(
this.blobContainer.GetBlockBlobReference("taskhubparameters.json"),
(await this.blobContainer).GetBlockBlobReference("taskhubparameters.json"),
throwIfNotFound: false,
throwOnParseError: false,
cancellationToken).ConfigureAwait(false);
@ -133,7 +138,7 @@ namespace DurableTask.Netherite.Scaling
async Task DeletePartitionInfo(uint partitionId)
{
var blob = this.blobContainer.GetDirectoryReference($"p{partitionId:D2}").GetBlockBlobReference("loadinfo.json");
var blob = (await this.blobContainer).GetDirectoryReference($"p{partitionId:D2}").GetBlockBlobReference("loadinfo.json");
await BlobUtils.ForceDeleteAsync(blob).ConfigureAwait(false);
}

Просмотреть файл

@ -19,9 +19,9 @@ namespace DurableTask.Netherite.Scaling
readonly TableClient table;
readonly string taskHubName;
public AzureTableLoadPublisher(string connectionString, string tableName, string taskHubName)
public AzureTableLoadPublisher(ConnectionInfo connectionInfo, string tableName, string taskHubName)
{
this.table = new TableClient(connectionString, tableName);
this.table = connectionInfo.GetAzureStorageV12TableClientAsync(tableName, CancellationToken.None);
this.taskHubName = taskHubName;
}

Просмотреть файл

@ -18,19 +18,16 @@ namespace DurableTask.Netherite.Scaling
/// </summary>
public class ScalingMonitor
{
readonly string storageConnectionString;
readonly string eventHubsConnectionString;
readonly ConnectionInfo eventHubsConnection;
readonly string partitionLoadTableName;
readonly string taskHubName;
readonly TransportConnectionString.TransportChoices configuredTransport;
readonly ILoadPublisherService loadPublisher;
// public logging actions to enable collection of scale-monitor-related logging within the Netherite infrastructure
public Action<string, int, string> RecommendationTracer { get; }
public Action<string> InformationTracer { get; }
public Action<string, Exception> ErrorTracer { get; }
readonly ILoadPublisherService loadPublisher;
/// <summary>
/// The name of the taskhub.
/// </summary>
@ -39,13 +36,13 @@ namespace DurableTask.Netherite.Scaling
/// <summary>
/// Creates an instance of the scaling monitor, with the given parameters.
/// </summary>
/// <param name="storageConnectionString">The storage connection string.</param>
/// <param name="eventHubsConnectionString">The connection string for the transport layer.</param>
/// <param name="storageConnection">The storage connection info.</param>
/// <param name="eventHubsConnection">The connection info for event hubs.</param>
/// <param name="partitionLoadTableName">The name of the storage table with the partition load information.</param>
/// <param name="taskHubName">The name of the taskhub.</param>
public ScalingMonitor(
string storageConnectionString,
string eventHubsConnectionString,
ILoadPublisherService loadPublisher,
ConnectionInfo eventHubsConnection,
string partitionLoadTableName,
string taskHubName,
Action<string, int, string> recommendationTracer,
@ -56,21 +53,10 @@ namespace DurableTask.Netherite.Scaling
this.InformationTracer = informationTracer;
this.ErrorTracer = errorTracer;
this.storageConnectionString = storageConnectionString;
this.eventHubsConnectionString = eventHubsConnectionString;
this.loadPublisher = loadPublisher;
this.eventHubsConnection = eventHubsConnection;
this.partitionLoadTableName = partitionLoadTableName;
this.taskHubName = taskHubName;
TransportConnectionString.Parse(eventHubsConnectionString, out _, out this.configuredTransport);
if (!string.IsNullOrEmpty(partitionLoadTableName))
{
this.loadPublisher = new AzureTableLoadPublisher(storageConnectionString, partitionLoadTableName, taskHubName);
}
else
{
this.loadPublisher = new AzureBlobLoadPublisher(storageConnectionString, taskHubName);
}
}
/// <summary>
@ -230,25 +216,23 @@ namespace DurableTask.Netherite.Scaling
// next, check if any of the entries are not current, in the sense that their input queue position
// does not match the latest queue position
if (this.configuredTransport == TransportConnectionString.TransportChoices.EventHubs)
List<long> positions = await Netherite.EventHubsTransport.EventHubsConnections.GetQueuePositionsAsync(this.eventHubsConnection, EventHubsTransport.PartitionHub).ConfigureAwait(false);
if (positions == null)
{
List<long> positions = await Netherite.EventHubsTransport.EventHubsConnections.GetQueuePositionsAsync(this.eventHubsConnectionString, EventHubsTransport.PartitionHub).ConfigureAwait(false);
return "eventhubs is missing";
}
if (positions == null)
for (int i = 0; i < positions.Count; i++)
{
if (!loadInformation.TryGetValue((uint) i, out var loadInfo))
{
return "eventhubs is missing";
return $"P{i:D2} has no load information published yet";
}
for (int i = 0; i < positions.Count; i++)
if (positions[i] > loadInfo.InputQueuePosition)
{
if (!loadInformation.TryGetValue((uint) i, out var loadInfo))
{
return $"P{i:D2} has no load information published yet";
}
if (positions[i] > loadInfo.InputQueuePosition)
{
return $"P{i:D2} has input queue position {loadInfo.InputQueuePosition} which is {positions[(int)i] - loadInfo.InputQueuePosition} behind latest position {positions[i]}";
}
return $"P{i:D2} has input queue position {loadInfo.InputQueuePosition} which is {positions[(int)i] - loadInfo.InputQueuePosition} behind latest position {positions[i]}";
}
}
@ -267,4 +251,4 @@ namespace DurableTask.Netherite.Scaling
return null;
}
}
}
}

Просмотреть файл

@ -24,14 +24,15 @@ namespace DurableTask.Netherite.Faster
/// </summary>
partial class BlobManager : ICheckpointManager, ILogCommitManager
{
readonly NetheriteOrchestrationServiceSettings settings;
readonly uint partitionId;
readonly CancellationTokenSource shutDownOrTermination;
readonly CloudStorageAccount cloudStorageAccount;
readonly CloudStorageAccount pageBlobAccount;
readonly string taskHubPrefix;
readonly CloudBlobContainer blockBlobContainer;
readonly CloudBlobContainer pageBlobContainer;
CloudStorageAccount cloudStorageAccount;
CloudStorageAccount pageBlobAccount;
CloudBlobContainer blockBlobContainer;
CloudBlobContainer pageBlobContainer;
CloudBlockBlob eventLogCommitBlob;
CloudBlobDirectory pageBlobPartitionDirectory;
CloudBlobDirectory blockBlobPartitionDirectory;
@ -191,16 +192,16 @@ namespace DurableTask.Netherite.Faster
if (taskhubFormat.UseAlternateObjectStore != settings.UseAlternateObjectStore)
{
throw new InvalidOperationException("The Netherite configuration setting 'UseAlternateObjectStore' is incompatible with the existing taskhub.");
throw new NetheriteConfigurationException("The Netherite configuration setting 'UseAlternateObjectStore' is incompatible with the existing taskhub.");
}
if (taskhubFormat.FormatVersion != StorageFormatVersion.Last())
{
throw new InvalidOperationException($"The current storage format version (={StorageFormatVersion.Last()}) is incompatible with the existing taskhub (={taskhubFormat.FormatVersion}).");
throw new NetheriteConfigurationException($"The current storage format version (={StorageFormatVersion.Last()}) is incompatible with the existing taskhub (={taskhubFormat.FormatVersion}).");
}
}
catch (Exception e)
{
throw new InvalidOperationException("The taskhub has an incompatible storage format", e);
throw new NetheriteConfigurationException("The taskhub has an incompatible storage format", e);
}
}
@ -271,9 +272,7 @@ namespace DurableTask.Netherite.Faster
/// <param name="errorHandler">A handler for errors encountered in this partition</param>
/// <param name="psfGroupCount">Number of PSF groups to be created in FASTER</param>
public BlobManager(
CloudStorageAccount storageAccount,
CloudStorageAccount pageBlobAccount,
string localFilePath,
NetheriteOrchestrationServiceSettings settings,
string taskHubName,
string taskHubPrefix,
FaultInjector faultInjector,
@ -284,10 +283,7 @@ namespace DurableTask.Netherite.Faster
IPartitionErrorHandler errorHandler,
int psfGroupCount)
{
this.cloudStorageAccount = storageAccount;
this.pageBlobAccount = pageBlobAccount;
this.UseLocalFiles = (localFilePath != null);
this.LocalFileDirectoryForTestingAndDebugging = localFilePath;
this.settings = settings;
this.ContainerName = GetContainerName(taskHubName);
this.taskHubPrefix = taskHubPrefix;
this.FaultInjector = faultInjector;
@ -295,30 +291,20 @@ namespace DurableTask.Netherite.Faster
this.CheckpointInfo = new CheckpointInfo();
this.PsfCheckpointInfos = Enumerable.Range(0, psfGroupCount).Select(ii => new CheckpointInfo()).ToArray();
if (!this.UseLocalFiles)
if (!string.IsNullOrEmpty(settings.UseLocalDirectoryForPartitionStorage))
{
CloudBlobClient serviceClient = this.cloudStorageAccount.CreateCloudBlobClient();
this.blockBlobContainer = serviceClient.GetContainerReference(this.ContainerName);
if (pageBlobAccount == storageAccount)
{
this.pageBlobContainer = this.BlockBlobContainer;
}
else
{
serviceClient = this.pageBlobAccount.CreateCloudBlobClient();
this.pageBlobContainer = serviceClient.GetContainerReference(this.ContainerName);
}
this.UseLocalFiles = true;
this.LocalFileDirectoryForTestingAndDebugging = settings.UseLocalDirectoryForPartitionStorage;
this.LocalCheckpointManager = new LocalFileCheckpointManager(
this.CheckpointInfo,
this.LocalCheckpointDirectoryPath,
this.GetCheckpointCompletedBlobName());
}
else
{
this.LocalCheckpointManager = new LocalFileCheckpointManager(
this.CheckpointInfo,
this.LocalCheckpointDirectoryPath,
this.GetCheckpointCompletedBlobName());
this.UseLocalFiles = false;
}
this.TraceHelper = new FasterTraceHelper(logger, logLevelLimit, performanceLogger, this.partitionId, this.UseLocalFiles ? "none" : this.cloudStorageAccount.Credentials.AccountName, taskHubName);
this.TraceHelper = new FasterTraceHelper(logger, logLevelLimit, performanceLogger, this.partitionId, this.UseLocalFiles ? "none" : this.settings.StorageAccountName, taskHubName);
this.PartitionErrorHandler = errorHandler;
this.shutDownOrTermination = CancellationTokenSource.CreateLinkedTokenSource(errorHandler.Token);
}
@ -328,7 +314,7 @@ namespace DurableTask.Netherite.Faster
// For testing and debugging with local files
bool UseLocalFiles { get; }
LocalFileCheckpointManager LocalCheckpointManager { get; }
LocalFileCheckpointManager LocalCheckpointManager { get; set; }
string LocalFileDirectoryForTestingAndDebugging { get; }
string LocalDirectoryPath => $"{this.LocalFileDirectoryForTestingAndDebugging}\\{this.ContainerName}";
string LocalCheckpointDirectoryPath => $"{this.LocalDirectoryPath}\\chkpts{this.partitionId:D2}";
@ -351,6 +337,11 @@ namespace DurableTask.Netherite.Faster
{
if (this.UseLocalFiles)
{
this.LocalCheckpointManager = new LocalFileCheckpointManager(
this.CheckpointInfo,
this.LocalCheckpointDirectoryPath,
this.GetCheckpointCompletedBlobName());
Directory.CreateDirectory($"{this.LocalDirectoryPath}\\{this.PartitionFolderName}");
this.EventLogDevice = Devices.CreateLogDevice($"{this.LocalDirectoryPath}\\{this.PartitionFolderName}\\{EventLogBlobName}");
@ -365,6 +356,30 @@ namespace DurableTask.Netherite.Faster
}
else
{
this.cloudStorageAccount = await this.settings.BlobStorageConnection.GetAzureStorageV11AccountAsync();
if (this.settings.PageBlobStorageConnection != null)
{
this.pageBlobAccount = await this.settings.PageBlobStorageConnection.GetAzureStorageV11AccountAsync();
}
else
{
this.pageBlobAccount = this.cloudStorageAccount;
}
CloudBlobClient serviceClient = this.cloudStorageAccount.CreateCloudBlobClient();
this.blockBlobContainer = serviceClient.GetContainerReference(this.ContainerName);
if (this.pageBlobAccount == this.cloudStorageAccount)
{
this.pageBlobContainer = this.BlockBlobContainer;
}
else
{
serviceClient = this.pageBlobAccount.CreateCloudBlobClient();
this.pageBlobContainer = serviceClient.GetContainerReference(this.ContainerName);
}
await this.blockBlobContainer.CreateIfNotExistsAsync();
this.blockBlobPartitionDirectory = this.blockBlobContainer.GetDirectoryReference(this.PartitionFolderName);
@ -445,13 +460,13 @@ namespace DurableTask.Netherite.Faster
await this.LeaseMaintenanceLoopTask; // wait for loop to terminate cleanly
}
public static async Task DeleteTaskhubStorageAsync(CloudStorageAccount account, CloudStorageAccount pageBlobAccount, string localFileDirectoryPath, string taskHubName, string pathPrefix)
public static async Task DeleteTaskhubStorageAsync(NetheriteOrchestrationServiceSettings settings, string pathPrefix)
{
var containerName = GetContainerName(taskHubName);
var containerName = GetContainerName(settings.HubName);
if (!string.IsNullOrEmpty(localFileDirectoryPath))
if (!string.IsNullOrEmpty(settings.UseLocalDirectoryForPartitionStorage))
{
DirectoryInfo di = new DirectoryInfo($"{localFileDirectoryPath}\\{containerName}"); //TODO fine-grained deletion
DirectoryInfo di = new DirectoryInfo($"{settings.UseLocalDirectoryForPartitionStorage}\\{containerName}"); //TODO fine-grained deletion
if (di.Exists)
{
di.Delete(true);
@ -459,8 +474,9 @@ namespace DurableTask.Netherite.Faster
}
else
{
async Task DeleteContainerContents(CloudStorageAccount account)
async Task DeleteContainerContents(ConnectionInfo connectionInfo)
{
var account = await connectionInfo.GetAzureStorageV11AccountAsync();
CloudBlobClient serviceClient = account.CreateCloudBlobClient();
var blobContainer = serviceClient.GetContainerReference(containerName);
@ -495,11 +511,11 @@ namespace DurableTask.Netherite.Faster
// the same container.
}
await DeleteContainerContents(account);
await DeleteContainerContents(settings.BlobStorageConnection);
if (pageBlobAccount != account)
if (settings.PageBlobStorageConnection != null)
{
await DeleteContainerContents(pageBlobAccount);
await DeleteContainerContents(settings.PageBlobStorageConnection);
}
}
}

Просмотреть файл

@ -22,15 +22,12 @@ namespace DurableTask.Netherite.Faster
readonly NetheriteOrchestrationServiceSettings settings;
readonly OrchestrationServiceTraceHelper traceHelper;
readonly CloudStorageAccount storageAccount;
readonly string localFileDirectory;
readonly CloudStorageAccount pageBlobStorageAccount;
readonly ILogger logger;
readonly ILogger performanceLogger;
readonly MemoryTracker memoryTracker;
readonly CloudBlobContainer cloudBlobContainer;
readonly CloudBlockBlob taskhubParameters;
readonly Task<CloudBlobContainer> cloudBlobContainer;
readonly Task<CloudBlockBlob> taskhubParameters;
public ILoadPublisherService LoadPublisher { get;}
@ -52,27 +49,8 @@ namespace DurableTask.Netherite.Faster
this.settings = settings;
this.traceHelper = traceHelper;
string connectionString = settings.ResolvedStorageConnectionString;
string pageBlobConnectionString = settings.ResolvedPageBlobStorageConnectionString;
this.TestRuntimeAndLoading();
if (!string.IsNullOrEmpty(settings.UseLocalDirectoryForPartitionStorage))
{
this.localFileDirectory = settings.UseLocalDirectoryForPartitionStorage;
}
else
{
this.storageAccount = CloudStorageAccount.Parse(connectionString);
}
if (pageBlobConnectionString != connectionString && !string.IsNullOrEmpty(pageBlobConnectionString))
{
this.pageBlobStorageAccount = CloudStorageAccount.Parse(pageBlobConnectionString);
}
else
{
this.pageBlobStorageAccount = this.storageAccount;
}
this.logger = loggerFactory.CreateLogger($"{NetheriteOrchestrationService.LoggerCategoryName}.FasterStorage");
this.performanceLogger = loggerFactory.CreateLogger($"{NetheriteOrchestrationService.LoggerCategoryName}.FasterStorage.Performance");
@ -83,19 +61,29 @@ namespace DurableTask.Netherite.Faster
settings.TestHooks.CacheDebugger.MemoryTracker = this.memoryTracker;
}
var blobContainerName = GetContainerName(settings.HubName);
var cloudBlobClient = this.storageAccount.CreateCloudBlobClient();
this.cloudBlobContainer = cloudBlobClient.GetContainerReference(blobContainerName);
this.taskhubParameters = this.cloudBlobContainer.GetBlockBlobReference("taskhubparameters.json");
this.cloudBlobContainer = GetBlobContainerAsync();
async Task<CloudBlobContainer> GetBlobContainerAsync()
{
var blobContainerName = GetContainerName(settings.HubName);
var cloudBlobClient = (await settings.BlobStorageConnection.GetAzureStorageV11AccountAsync()).CreateCloudBlobClient();
return cloudBlobClient.GetContainerReference(blobContainerName);
}
this.taskhubParameters = GetTaskhubParametersAsync();
async Task<CloudBlockBlob> GetTaskhubParametersAsync()
{
var cloudBlobContainer = await this.cloudBlobContainer;
return cloudBlobContainer.GetBlockBlobReference("taskhubparameters.json");
}
this.traceHelper.TraceProgress("Creating LoadMonitor Service");
if (!string.IsNullOrEmpty(settings.LoadInformationAzureTableName))
{
this.LoadPublisher = new AzureTableLoadPublisher(settings.ResolvedStorageConnectionString, settings.LoadInformationAzureTableName, settings.HubName);
this.LoadPublisher = new AzureTableLoadPublisher(settings.TableStorageConnection, settings.LoadInformationAzureTableName, settings.HubName);
}
else
{
this.LoadPublisher = new AzureBlobLoadPublisher(settings.ResolvedStorageConnectionString, settings.HubName);
this.LoadPublisher = new AzureBlobLoadPublisher(settings.BlobStorageConnection, settings.HubName);
}
}
@ -117,14 +105,15 @@ namespace DurableTask.Netherite.Faster
// try load the taskhub parameters
try
{
var jsonText = await this.taskhubParameters.DownloadTextAsync();
var blob = await this.taskhubParameters;
var jsonText = await blob.DownloadTextAsync();
return JsonConvert.DeserializeObject<TaskhubParameters>(jsonText);
}
catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == (int)System.Net.HttpStatusCode.NotFound)
{
if (throwIfNotFound)
{
throw new InvalidOperationException($"The specified taskhub does not exist (TaskHub={this.settings.HubName}, StorageConnectionName={this.settings.StorageConnectionName}");
throw new NetheriteConfigurationException($"The specified taskhub does not exist (TaskHub={this.settings.HubName}, StorageConnectionName={this.settings.StorageConnectionName}");
}
else
{
@ -135,14 +124,14 @@ namespace DurableTask.Netherite.Faster
async Task<bool> IStorageLayer.CreateTaskhubIfNotExistsAsync()
{
bool containerCreated = await this.cloudBlobContainer.CreateIfNotExistsAsync();
bool containerCreated = await (await this.cloudBlobContainer).CreateIfNotExistsAsync();
if (containerCreated)
{
this.traceHelper.TraceProgress($"Created new blob container at {this.cloudBlobContainer.Uri}");
this.traceHelper.TraceProgress($"Created new blob container at {this.cloudBlobContainer.Result.Uri}");
}
else
{
this.traceHelper.TraceProgress($"Using existing blob container at {this.cloudBlobContainer.Uri}");
this.traceHelper.TraceProgress($"Using existing blob container at {this.cloudBlobContainer.Result.Uri}");
}
var taskHubParameters = new TaskhubParameters()
@ -166,13 +155,13 @@ namespace DurableTask.Netherite.Faster
new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.None });
var noOverwrite = AccessCondition.GenerateIfNoneMatchCondition("*");
await this.taskhubParameters.UploadTextAsync(jsonText, null, noOverwrite, null, null);
await (await this.taskhubParameters).UploadTextAsync(jsonText, null, noOverwrite, null, null);
this.traceHelper.TraceProgress("Created new taskhub");
// zap the partition hub so we start from zero queue positions
if (!TransportConnectionString.IsPseudoConnectionString(this.settings.ResolvedTransportConnectionString))
if (this.settings.TransportChoice == TransportChoices.EventHubs)
{
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.settings.ResolvedTransportConnectionString, EventHubsTransport.PartitionHub);
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, CancellationToken.None);
}
}
catch (StorageException e) when (BlobUtils.BlobAlreadyExists(e))
@ -197,20 +186,20 @@ namespace DurableTask.Netherite.Faster
await this.LoadPublisher.DeleteIfExistsAsync(CancellationToken.None).ConfigureAwait(false);
// delete the parameters file which deletes the taskhub logically
await BlobUtils.ForceDeleteAsync(this.taskhubParameters);
await BlobUtils.ForceDeleteAsync(await this.taskhubParameters);
// delete all the files/blobs in the directory/container that represents this taskhub
// If this does not complete successfully, some garbage may be left behind.
await BlobManager.DeleteTaskhubStorageAsync(this.storageAccount, this.pageBlobStorageAccount, this.localFileDirectory, parameters.TaskhubName, TaskhubPathPrefix(parameters));
await BlobManager.DeleteTaskhubStorageAsync(this.settings, TaskhubPathPrefix(parameters));
}
}
public static Task DeleteTaskhubStorageAsync(string connectionString, string pageBlobConnectionString, string localFileDirectory, string taskHubName, string pathPrefix)
{
var storageAccount = string.IsNullOrEmpty(connectionString) ? null : CloudStorageAccount.Parse(connectionString);
var pageBlobAccount = string.IsNullOrEmpty(pageBlobConnectionString) ? storageAccount : CloudStorageAccount.Parse(pageBlobConnectionString);
return BlobManager.DeleteTaskhubStorageAsync(storageAccount, pageBlobAccount, localFileDirectory, taskHubName, pathPrefix);
}
//public static Task DeleteTaskhubStorageAsync(string connectionString, string pageBlobConnectionString, string localFileDirectory, string taskHubName, string pathPrefix)
//{
// var storageAccount = string.IsNullOrEmpty(connectionString) ? null : CloudStorageAccount.Parse(connectionString);
// var pageBlobAccount = string.IsNullOrEmpty(pageBlobConnectionString) ? storageAccount : CloudStorageAccount.Parse(pageBlobConnectionString);
// return BlobManager.DeleteTaskhubStorageAsync(storageAccount, pageBlobAccount, localFileDirectory, taskHubName, pathPrefix);
//}
IPartitionState IStorageLayer.CreatePartitionState(TaskhubParameters parameters)
{

Просмотреть файл

@ -14,15 +14,15 @@ namespace DurableTask.Netherite.Faster
class PartitionStorage : IPartitionState
{
readonly NetheriteOrchestrationServiceSettings settings;
readonly string taskHubName;
readonly string pathPrefix;
readonly ILogger logger;
readonly ILogger performanceLogger;
readonly MemoryTracker memoryTracker;
readonly CloudStorageAccount storageAccount;
readonly string localFileDirectory;
readonly CloudStorageAccount pageBlobStorageAccount;
//readonly CloudStorageAccount storageAccount;
//readonly string localFileDirectory;
//readonly CloudStorageAccount pageBlobStorageAccount;
Partition partition;
BlobManager blobManager;
@ -41,31 +41,14 @@ namespace DurableTask.Netherite.Faster
public PartitionStorage(NetheriteOrchestrationServiceSettings settings, string pathPrefix, MemoryTracker memoryTracker, ILogger logger, ILogger performanceLogger)
{
this.settings = settings;
this.taskHubName = settings.HubName;
this.pathPrefix = pathPrefix;
this.logger = logger;
this.performanceLogger = performanceLogger;
this.memoryTracker = memoryTracker;
string connectionString = settings.ResolvedStorageConnectionString;
string pageBlobConnectionString = settings.ResolvedPageBlobStorageConnectionString;
if (!string.IsNullOrEmpty(settings.UseLocalDirectoryForPartitionStorage))
{
this.localFileDirectory = settings.UseLocalDirectoryForPartitionStorage;
}
else
{
this.storageAccount = CloudStorageAccount.Parse(connectionString);
}
if (pageBlobConnectionString != connectionString && !string.IsNullOrEmpty(pageBlobConnectionString))
{
this.pageBlobStorageAccount = CloudStorageAccount.Parse(pageBlobConnectionString);
}
else
{
this.pageBlobStorageAccount = this.storageAccount;
}
if (settings.TestHooks?.CacheDebugger != null)
{
@ -98,9 +81,7 @@ namespace DurableTask.Netherite.Faster
int psfCount = 0;
this.blobManager = new BlobManager(
this.storageAccount,
this.pageBlobStorageAccount,
this.localFileDirectory,
this.settings,
this.taskHubName,
this.pathPrefix,
partition.Settings.TestHooks?.FaultInjector,

Просмотреть файл

@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
interface IStorageLayerFactory
{
IStorageLayer Create(NetheriteOrchestrationService orchestrationService);
}
}

Просмотреть файл

@ -48,7 +48,7 @@ namespace DurableTask.Netherite
TaskhubGuid = Guid.NewGuid(),
CreationTimestamp = DateTime.UtcNow,
StorageFormat = String.Empty,
PartitionCount = 1,
PartitionCount = this.settings.PartitionCount,
};
return true;
}

Просмотреть файл

@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
/// <summary>
/// Configuration options for the storage layer
/// </summary>
public enum StorageChoices
{
/// <summary>
/// Does not store any state to durable storage, just keeps it in memory.
/// Intended for testing scenarios.
/// </summary>
Memory = 0,
/// <summary>
/// Uses the Faster key-value store.
/// </summary>
Faster = 1,
/// <summary>
/// Uses a custom dependency-injected storage layer
/// </summary>
Custom = 2,
}
}

Просмотреть файл

@ -29,7 +29,7 @@ namespace DurableTask.Netherite
this.serviceInstanceId = Guid.NewGuid();
}
public void TraceCreated(int processorCount, TransportConnectionString.TransportChoices transport, TransportConnectionString.StorageChoices storage)
public void TraceCreated(int processorCount, TransportChoices transport, StorageChoices storage)
{
if (this.logLevelLimit <= LogLevel.Information)
{

Просмотреть файл

@ -15,7 +15,7 @@ namespace DurableTask.Netherite.EventHubsTransport
class EventHubsConnections
{
readonly string connectionString;
readonly ConnectionInfo connectionInfo;
readonly string[] clientHubs;
readonly string partitionHub;
readonly string loadMonitorHub;
@ -29,7 +29,6 @@ namespace DurableTask.Netherite.EventHubsTransport
public const int NumClientChannels = 2;
public string Endpoint { get; private set; }
public DateTime CreationTimestamp { get; private set; }
public ConcurrentDictionary<int, EventHubsSender<PartitionUpdateEvent>> _partitionSenders = new ConcurrentDictionary<int, EventHubsSender<PartitionUpdateEvent>>();
@ -42,18 +41,18 @@ namespace DurableTask.Netherite.EventHubsTransport
int GetClientBucket(Guid clientId, int index) => (int)((Fnv1aHashHelper.ComputeHash(clientId.ToByteArray()) + index) % (uint)this.clientPartitions.Count);
public EventHubsConnections(
string connectionString,
ConnectionInfo connectionInfo,
string partitionHub,
string[] clientHubs,
string loadMonitorHub)
{
this.connectionString = connectionString;
this.connectionInfo = connectionInfo;
this.partitionHub = partitionHub;
this.clientHubs = clientHubs;
this.loadMonitorHub = loadMonitorHub;
}
public string Fingerprint => $"{this.Endpoint}{this.partitionHub}/{this.CreationTimestamp:o}";
public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}";
public async Task StartAsync(TaskhubParameters parameters)
{
@ -94,30 +93,26 @@ namespace DurableTask.Netherite.EventHubsTransport
async Task EnsureEventHubExistsAsync(string eventHubName, int partitionCount)
{
this.TraceHelper.LogDebug("Creating EventHub {name}", eventHubName);
bool success = await EventHubsUtil.EnsureEventHubExistsAsync(this.connectionString, eventHubName, partitionCount);
bool success = await EventHubsUtil.EnsureEventHubExistsAsync(this.connectionInfo, eventHubName, partitionCount, CancellationToken.None);
if (success)
{
this.TraceHelper.LogInformation("Created EventHub {name}", eventHubName);
this.TraceHelper.LogInformation("Created EventHub {name}", eventHubName, CancellationToken.None);
}
else
{
this.TraceHelper.LogDebug("Conflict on EventHub {name}", eventHubName);
this.TraceHelper.LogDebug("Conflict on EventHub {name}", eventHubName, CancellationToken.None);
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
internal async Task DeletePartitions()
{
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.connectionString, this.partitionHub);
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.connectionInfo, this.partitionHub, CancellationToken.None);
}
public async Task EnsurePartitionsAsync(int partitionCount, int retries = EventHubCreationRetries)
{
var connectionStringBuilder = new EventHubsConnectionStringBuilder(this.connectionString)
{
EntityPath = this.partitionHub
};
var client = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
var client = this.connectionInfo.CreateEventHubClient(this.partitionHub);
try
{
var runtimeInformation = await client.GetRuntimeInformationAsync();
@ -126,7 +121,6 @@ namespace DurableTask.Netherite.EventHubsTransport
{
// we were successful. Record information and create a flat list of partition partitions
this.partitionClient = client;
this.Endpoint = connectionStringBuilder.Endpoint.ToString();
this.CreationTimestamp = runtimeInformation.CreatedAt;
for (int i = 0; i < partitionCount; i++)
{
@ -138,7 +132,7 @@ namespace DurableTask.Netherite.EventHubsTransport
{
// we have to create a fresh one
this.TraceHelper.LogWarning("Deleting existing partition EventHub because of partition count mismatch.");
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.connectionString, this.partitionHub);
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.connectionInfo, this.partitionHub, CancellationToken.None);
await Task.Delay(TimeSpan.FromSeconds(10));
}
}
@ -183,13 +177,9 @@ namespace DurableTask.Netherite.EventHubsTransport
async Task<(EventHubClient, EventHubRuntimeInformation)> EnsureClientAsync(int i, int retries = EventHubCreationRetries)
{
var connectionStringBuilder = new EventHubsConnectionStringBuilder(this.connectionString)
{
EntityPath = this.clientHubs[i]
};
try
{
var client = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
var client = this.connectionInfo.CreateEventHubClient(this.clientHubs[i]);
var runtimeInformation = await client.GetRuntimeInformationAsync();
return (client, runtimeInformation);
}
@ -205,13 +195,9 @@ namespace DurableTask.Netherite.EventHubsTransport
async Task EnsureLoadMonitorAsync(int retries = EventHubCreationRetries)
{
// create loadmonitor client
var connectionStringBuilder = new EventHubsConnectionStringBuilder(this.connectionString)
{
EntityPath = loadMonitorHub,
};
try
{
this.loadMonitorClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
this.loadMonitorClient = this.connectionInfo.CreateEventHubClient(this.loadMonitorHub);
var runtimeInformation = await this.loadMonitorClient.GetRuntimeInformationAsync();
return;
}
@ -223,13 +209,9 @@ namespace DurableTask.Netherite.EventHubsTransport
await this.EnsureLoadMonitorAsync(retries - 1);
}
public static async Task<List<long>> GetQueuePositionsAsync(string connectionString, string partitionHub)
public static async Task<List<long>> GetQueuePositionsAsync(ConnectionInfo connectionInfo, string partitionHub)
{
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
EntityPath = partitionHub,
};
var client = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
var client = connectionInfo.CreateEventHubClient(partitionHub);
try
{
var runtimeInformation = await client.GetRuntimeInformationAsync();

Просмотреть файл

@ -28,7 +28,6 @@ namespace DurableTask.Netherite.EventHubsTransport
{
readonly TransportAbstraction.IHost host;
readonly NetheriteOrchestrationServiceSettings settings;
readonly CloudStorageAccount cloudStorageAccount;
readonly ILogger logger;
readonly EventHubsTraceHelper traceHelper;
readonly IStorageLayer storage;
@ -59,16 +58,15 @@ namespace DurableTask.Netherite.EventHubsTransport
{
if (storage is MemoryStorageLayer)
{
throw new InvalidOperationException($"Configuration error: in-memory storage cannot be used together with a real event hubs namespace");
throw new NetheriteConfigurationException($"Configuration error: in-memory storage cannot be used together with a real event hubs namespace");
}
this.host = host;
this.settings = settings;
this.cloudStorageAccount = CloudStorageAccount.Parse(this.settings.ResolvedStorageConnectionString);
this.storage = storage;
string namespaceName = TransportConnectionString.EventHubsNamespaceName(settings.ResolvedTransportConnectionString);
string namespaceName = settings.EventHubsConnection.ResourceName;
this.logger = EventHubsTraceHelper.CreateLogger(loggerFactory);
this.traceHelper = new EventHubsTraceHelper(this.logger, settings.TransportLogLevelLimit, null, this.cloudStorageAccount.Credentials.AccountName, settings.HubName, namespaceName);
this.traceHelper = new EventHubsTraceHelper(this.logger, settings.TransportLogLevelLimit, null, settings.StorageAccountName, settings.HubName, namespaceName);
this.ClientId = Guid.NewGuid();
}
@ -89,21 +87,22 @@ namespace DurableTask.Netherite.EventHubsTransport
// check that we are the correct taskhub!
if (this.parameters.TaskhubName != this.settings.HubName)
{
throw new InvalidOperationException($"The specified taskhub name does not match the task hub name in storage");
throw new NetheriteConfigurationException($"The specified taskhub name does not match the task hub name in storage");
}
this.taskhubGuid = this.parameters.TaskhubGuid.ToByteArray();
(string containerName, string path) = this.storage.GetTaskhubPathPrefix(this.parameters);
this.pathPrefix = path;
var cloudBlobClient = this.cloudStorageAccount.CreateCloudBlobClient();
var cloudStorageAccount = await this.settings.BlobStorageConnection.GetAzureStorageV11AccountAsync();
var cloudBlobClient = cloudStorageAccount.CreateCloudBlobClient();
this.cloudBlobContainer = cloudBlobClient.GetContainerReference(containerName);
this.partitionScript = this.cloudBlobContainer.GetBlockBlobReference("partitionscript.json");
// check that the storage format is supported
BlobManager.CheckStorageFormat(this.parameters.StorageFormat, this.settings);
this.connections = new EventHubsConnections(this.settings.ResolvedTransportConnectionString, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub)
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub)
{
Host = host,
TraceHelper = this.traceHelper,
@ -164,12 +163,11 @@ namespace DurableTask.Netherite.EventHubsTransport
string formattedCreationDate = this.connections.CreationTimestamp.ToString("o").Replace("/", "-");
this.eventProcessorHost = new EventProcessorHost(
this.eventProcessorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
Guid.NewGuid().ToString(),
EventHubsTransport.PartitionHub,
EventHubsTransport.PartitionConsumerGroup,
this.settings.ResolvedTransportConnectionString,
this.settings.ResolvedStorageConnectionString,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
$"{this.pathPrefix}eh-checkpoints/{(EventHubsTransport.PartitionHub)}/{formattedCreationDate}");
@ -191,8 +189,8 @@ namespace DurableTask.Netherite.EventHubsTransport
this.scriptedEventProcessorHost = new ScriptedEventProcessorHost(
EventHubsTransport.PartitionHub,
EventHubsTransport.PartitionConsumerGroup,
this.settings.ResolvedTransportConnectionString,
this.settings.ResolvedStorageConnectionString,
this.settings.EventHubsConnection,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
this.host,
this,
@ -211,12 +209,11 @@ namespace DurableTask.Netherite.EventHubsTransport
{
this.traceHelper.LogInformation("Registering LoadMonitor Host with EventHubs");
this.loadMonitorHost = new EventProcessorHost(
this.loadMonitorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
Guid.NewGuid().ToString(),
LoadMonitorHub,
LoadMonitorConsumerGroup,
this.settings.ResolvedTransportConnectionString,
this.settings.ResolvedStorageConnectionString,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
$"{this.pathPrefix}eh-checkpoints/{LoadMonitorHub}");
@ -382,14 +379,36 @@ namespace DurableTask.Netherite.EventHubsTransport
{
byte[] taskHubGuid = this.parameters.TaskhubGuid.ToByteArray();
TimeSpan longPollingInterval = TimeSpan.FromMinutes(1);
var backoffDelay = TimeSpan.Zero;
await this.clientConnectionsEstablished[index];
while (!this.shutdownSource.IsCancellationRequested)
{
this.traceHelper.LogTrace("Client{clientId}.ch{index} waiting for new packets", Client.GetShortId(this.ClientId), index);
IEnumerable<EventData> eventData = await receiver.ReceiveAsync(1000, longPollingInterval);
IEnumerable<EventData> eventData;
try
{
this.traceHelper.LogTrace("Client{clientId}.ch{index} waiting for new packets", Client.GetShortId(this.ClientId), index);
eventData = await receiver.ReceiveAsync(1000, longPollingInterval);
backoffDelay = TimeSpan.Zero;
}
catch (Exception exception) when (!this.shutdownSource.IsCancellationRequested)
{
if (backoffDelay < TimeSpan.FromSeconds(30))
{
backoffDelay = backoffDelay + backoffDelay + TimeSpan.FromSeconds(2);
}
// if we lose access to storage temporarily, we back off, but don't quit
this.traceHelper.LogError("Client{clientId}.ch{index} backing off for {backoffDelay} after error in receive loop: {exception}", Client.GetShortId(this.ClientId), index, backoffDelay, exception);
await Task.Delay(backoffDelay);
continue; // retry
}
if (eventData != null)
{

Просмотреть файл

@ -23,8 +23,8 @@ namespace DurableTask.Netherite.EventHubsTransport
{
readonly string eventHubPath;
readonly string consumerGroupName;
readonly string eventHubConnectionString;
readonly string storageConnectionString;
readonly ConnectionInfo eventHubConnection;
readonly ConnectionInfo storageConnection;
readonly string leaseContainerName;
readonly string workerId;
readonly TransportAbstraction.IHost host;
@ -41,8 +41,8 @@ namespace DurableTask.Netherite.EventHubsTransport
public ScriptedEventProcessorHost(
string eventHubPath,
string consumerGroupName,
string eventHubConnectionString,
string storageConnectionString,
ConnectionInfo eventHubConnection,
ConnectionInfo storageConnection,
string leaseContainerName,
TransportAbstraction.IHost host,
TransportAbstraction.ISender sender,
@ -54,8 +54,8 @@ namespace DurableTask.Netherite.EventHubsTransport
{
this.eventHubPath = eventHubPath;
this.consumerGroupName = consumerGroupName;
this.eventHubConnectionString = eventHubConnectionString;
this.storageConnectionString = storageConnectionString;
this.eventHubConnection = eventHubConnection;
this.storageConnection = storageConnection;
this.leaseContainerName = leaseContainerName;
this.host = host;
this.sender = sender;

Просмотреть файл

@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
public interface ITransportLayerFactory
{
ITransportLayer Create(NetheriteOrchestrationService orchestrationService);
}
}

Просмотреть файл

@ -38,7 +38,6 @@ namespace DurableTask.Netherite.SingleHostTransport
this.host = host;
this.settings = settings;
this.storage = storage;
TransportConnectionString.Parse(settings.ResolvedTransportConnectionString, out _, out _);
this.numberPartitions = (uint) settings.PartitionCount;
this.logger = logger;
this.faultInjector = settings.TestHooks?.FaultInjector;

Просмотреть файл

@ -0,0 +1,33 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
/// <summary>
/// Configuration options for the transport layer
/// </summary>
public enum TransportChoices
{
/// <summary>
/// Passes messages through memory and puts all partitions on a single host
/// Intended primarily for testing scenarios, but can also provide a cheap and efficient option for
/// workloads whose CPU and memory load do not require more than one worker.
/// </summary>
SingleHost = 0,
/// <summary>
/// Passes messages through eventhubs; can distribute over multiple machines via
/// the eventhubs EventProcessor.
/// </summary>
EventHubs = 1,
/// <summary>
/// Uses a custom dependency-injected transport layer
/// </summary>
Custom = 2,
}
}

Просмотреть файл

@ -87,8 +87,8 @@ namespace DurableTask.Netherite.Tests
public async Task EachScenarioOnce(bool restrictMemory)
{
var orchestrationTimeout = TimeSpan.FromMinutes(restrictMemory ? 10 : 5);
var startupTimeout = TimeSpan.FromMinutes(TransportConnectionString.IsPseudoConnectionString(this.settings.ResolvedTransportConnectionString) ? 1 : 3.5);
var shutDownTimeout = TimeSpan.FromMinutes(TransportConnectionString.IsPseudoConnectionString(this.settings.ResolvedTransportConnectionString) ? 0.1 : 3);
var startupTimeout = TimeSpan.FromMinutes(this.settings.UsesEmulation() ? 1 : 3.5);
var shutDownTimeout = TimeSpan.FromMinutes(this.settings.UsesEmulation() ? 0.1 : 3);
var totalTimeout = startupTimeout + orchestrationTimeout + shutDownTimeout;
using var _ = TestOrchestrationClient.WithExtraTime(TimeSpan.FromMinutes(restrictMemory ? 10 : 5));
@ -141,9 +141,9 @@ namespace DurableTask.Netherite.Tests
public async Task ScaleSmallScenarios(bool useReplayChecker, bool restrictMemory, int multiplicity)
{
var orchestrationTimeout = TimeSpan.FromMinutes((restrictMemory ? 10 : 5) + multiplicity * (restrictMemory ? 0.5 : 0.1));
var startupTimeout = TimeSpan.FromMinutes(TransportConnectionString.IsPseudoConnectionString(this.settings.ResolvedTransportConnectionString) ? 1 : 3.5);
var startupTimeout = TimeSpan.FromMinutes(this.settings.UsesEmulation() ? 1 : 3.5);
var testTimeout = orchestrationTimeout + TimeSpan.FromMinutes(multiplicity * 0.2);
var shutDownTimeout = TimeSpan.FromMinutes(TransportConnectionString.IsPseudoConnectionString(this.settings.ResolvedTransportConnectionString) ? 0.1 : 3);
var shutDownTimeout = TimeSpan.FromMinutes(this.settings.UsesEmulation() ? 0.1 : 3);
var totalTimeout = startupTimeout + testTimeout + shutDownTimeout;
using var _ = TestOrchestrationClient.WithExtraTime(orchestrationTimeout);

Просмотреть файл

@ -17,11 +17,11 @@ namespace DurableTask.Netherite.Tests
{
if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable(StorageConnectionName)))
{
throw new InvalidOperationException($"To run tests, environment must define '{StorageConnectionName}'");
throw new NetheriteConfigurationException($"To run tests, environment must define '{StorageConnectionName}'");
}
if (requiresTransportSpec && string.IsNullOrEmpty(Environment.GetEnvironmentVariable(EventHubsConnectionName)))
{
throw new InvalidOperationException($"To run tests, environment must define '{EventHubsConnectionName}'");
throw new NetheriteConfigurationException($"To run tests, environment must define '{EventHubsConnectionName}'");
}
}
@ -53,7 +53,7 @@ namespace DurableTask.Netherite.Tests
//settings.ResolvedStorageConnectionString = "";
//settings.UseLocalDirectoryForPartitionStorage = $"{Environment.GetEnvironmentVariable("temp")}\\FasterTestStorage";
settings.Validate((name) => Environment.GetEnvironmentVariable(name));
settings.Validate(new ConnectionNameToConnectionStringResolver((name) => Environment.GetEnvironmentVariable(name)));
settings.TestHooks = new TestHooks();
return settings;
@ -67,5 +67,9 @@ namespace DurableTask.Netherite.Tests
internal static TestOrchestrationHost GetTestOrchestrationHost(ILoggerFactory loggerFactory)
=> new TestOrchestrationHost(GetNetheriteOrchestrationServiceSettings(), loggerFactory);
internal static bool UsesEmulation(this NetheriteOrchestrationServiceSettings settings)
{
return TransportConnectionString.IsPseudoConnectionString(settings.EventHubsConnectionName);
}
}
}

Просмотреть файл

@ -27,7 +27,7 @@ namespace ScalingTests
ClientLogLevelLimit = LogLevel.Trace,
LoadMonitorLogLevelLimit = LogLevel.Trace,
};
settings.Validate((string connectionName) => Environment.GetEnvironmentVariable(connectionName));
settings.Validate(ConnectionResolver.FromConnectionNameToConnectionStringResolver((string connectionName) => Environment.GetEnvironmentVariable(connectionName)));
var loggerFactory = new LoggerFactory();
loggerFactory.AddProvider(new ConsoleLoggerProvider());
var logger = loggerFactory.CreateLogger("Main");