diff --git a/DurableTask.Netherite.sln b/DurableTask.Netherite.sln index 80e97ef..b18c069 100644 --- a/DurableTask.Netherite.sln +++ b/DurableTask.Netherite.sln @@ -36,7 +36,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LoadGeneratorApp", "test\Lo EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloDTFx", "samples\HelloDTFx\HelloDTFx\HelloDTFx.csproj", "{EC293D85-91E3-4F78-8B1E-2C691315CE96}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TokenCredentialDF", "samples\TokenCredentialDF\TokenCredentialDF.csproj", "{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDF", "samples\TokenCredentialDF\TokenCredentialDF.csproj", "{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDTFx", "samples\TokenCredentialDTFx\TokenCredentialDTFx.csproj", "{FBFF0814-E6C0-489A-ACCF-9D0699219621}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -84,6 +86,10 @@ Global {B99AB043-47DC-467C-93CB-D6C69D3B1AD4}.Debug|Any CPU.Build.0 = Debug|Any CPU {B99AB043-47DC-467C-93CB-D6C69D3B1AD4}.Release|Any CPU.ActiveCfg = Release|Any CPU {B99AB043-47DC-467C-93CB-D6C69D3B1AD4}.Release|Any CPU.Build.0 = Release|Any CPU + {FBFF0814-E6C0-489A-ACCF-9D0699219621}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FBFF0814-E6C0-489A-ACCF-9D0699219621}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FBFF0814-E6C0-489A-ACCF-9D0699219621}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FBFF0814-E6C0-489A-ACCF-9D0699219621}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -99,6 +105,7 @@ Global {78B360B8-3A41-4DC0-A300-94A0FABC1FB0} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B} {EC293D85-91E3-4F78-8B1E-2C691315CE96} = {AB958467-9236-402E-833C-B8DE4841AB9F} {B99AB043-47DC-467C-93CB-D6C69D3B1AD4} = {AB958467-9236-402E-833C-B8DE4841AB9F} + {FBFF0814-E6C0-489A-ACCF-9D0699219621} = {AB958467-9236-402E-833C-B8DE4841AB9F} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {238A9613-5411-41CF-BDEC-168CCD5C03FB} diff --git a/samples/TokenCredentialDF/FunctionsStartup.cs b/samples/TokenCredentialDF/FunctionsStartup.cs index 32287f1..06021d4 100644 --- a/samples/TokenCredentialDF/FunctionsStartup.cs +++ b/samples/TokenCredentialDF/FunctionsStartup.cs @@ -8,6 +8,7 @@ namespace PerformanceTests using System; using Azure.Identity; using DurableTask.Netherite; + using DurableTask.Netherite.AzureFunctions; using Microsoft.Azure.Functions.Extensions.DependencyInjection; using Microsoft.Azure.WebJobs; using Microsoft.Extensions.DependencyInjection; @@ -15,7 +16,7 @@ namespace PerformanceTests public class Startup : FunctionsStartup { - public class MyConnectionResolver : TokenCredentialResolver + public class MyConnectionResolver : CredentialBasedConnectionNameResolver { readonly INameResolver nameResolver; @@ -29,7 +30,7 @@ namespace PerformanceTests ?? Environment.GetEnvironmentVariable(name) ?? throw new InvalidOperationException($"missing configuration setting '{name}'"); - public override string GetStorageAccountName(string connectionName) => this.Resolve($"{connectionName}__storageAccountName"); + public override string GetStorageAccountName(string connectionName) => this.Resolve($"{connectionName}__accountName"); public override string GetEventHubsNamespaceName(string connectionName) => this.Resolve($"{connectionName}__eventHubsNamespaceName"); } diff --git a/samples/TokenCredentialDF/host.json b/samples/TokenCredentialDF/host.json index f89603e..d3d8388 100644 --- a/samples/TokenCredentialDF/host.json +++ b/samples/TokenCredentialDF/host.json @@ -17,7 +17,7 @@ "routePrefix": "" }, "durableTask": { - "hubName": "tokentest", + "hubName": "tokencredentialsample", "UseGracefulShutdown": true, "storageProvider": { "type": "Netherite", diff --git a/samples/TokenCredentialDTFx/HelloCities.cs b/samples/TokenCredentialDTFx/HelloCities.cs index 58d69c0..2a85557 100644 --- a/samples/TokenCredentialDTFx/HelloCities.cs +++ b/samples/TokenCredentialDTFx/HelloCities.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. - using System; using DurableTask.Core; @@ -28,4 +27,3 @@ public class SayHello : TaskActivity } } - \ No newline at end of file diff --git a/samples/TokenCredentialDTFx/Program.cs b/samples/TokenCredentialDTFx/Program.cs index 718c8b9..c27550e 100644 --- a/samples/TokenCredentialDTFx/Program.cs +++ b/samples/TokenCredentialDTFx/Program.cs @@ -1,12 +1,12 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using Azure.Identity; using DurableTask.Core; using DurableTask.Netherite; using Microsoft.Extensions.Logging; using Newtonsoft.Json; - // ----------- construct the Netherite orchestration service Console.WriteLine("Starting Netherite..."); @@ -15,14 +15,13 @@ var netheriteSettings = new NetheriteOrchestrationServiceSettings() { HubName = "myhub", PartitionCount = 4, - - // we explicitly specify the two required connection strings here. - // Another option would be to use a connection name resolver when calling Validate(). - ResolvedStorageConnectionString = "UseDevelopmentStorage=true;", - ResolvedTransportConnectionString = "SingleHost", }; -netheriteSettings.Validate(); +netheriteSettings.Validate(new SimpleCredentialResolver( + new DefaultAzureCredential(), + Environment.GetEnvironmentVariable("AccountName") ?? throw new Exception("missing env var: AccountName"), + Environment.GetEnvironmentVariable("NamespaceName") ?? throw new Exception("missing env var: NamespaceName") +)); var loggerFactory = LoggerFactory.Create(builder => { @@ -74,4 +73,4 @@ Console.WriteLine($"Shutting down..."); await worker.StopAsync(); -Console.WriteLine("Done."); +Console.WriteLine("Done."); \ No newline at end of file diff --git a/samples/TokenCredentialDTFx/HelloDTFx.csproj b/samples/TokenCredentialDTFx/TokenCredentialDTFx.csproj similarity index 74% rename from samples/TokenCredentialDTFx/HelloDTFx.csproj rename to samples/TokenCredentialDTFx/TokenCredentialDTFx.csproj index 6bfb29f..b434263 100644 --- a/samples/TokenCredentialDTFx/HelloDTFx.csproj +++ b/samples/TokenCredentialDTFx/TokenCredentialDTFx.csproj @@ -8,8 +8,11 @@ - + + + + diff --git a/src/DurableTask.Netherite/Connections/TokenCredentialResolver.cs b/src/DurableTask.Netherite.AzureFunctions/TokenCredentialResolver.cs similarity index 91% rename from src/DurableTask.Netherite/Connections/TokenCredentialResolver.cs rename to src/DurableTask.Netherite.AzureFunctions/TokenCredentialResolver.cs index c874f64..aea7167 100644 --- a/src/DurableTask.Netherite/Connections/TokenCredentialResolver.cs +++ b/src/DurableTask.Netherite.AzureFunctions/TokenCredentialResolver.cs @@ -1,16 +1,17 @@ -namespace DurableTask.Netherite +namespace DurableTask.Netherite.AzureFunctions { using System; using System.Collections.Generic; using System.Text; using Azure.Identity; - using Microsoft.Azure.Storage.Blob.Protocol; using Microsoft.Extensions.Logging.Abstractions; +#if !NETCOREAPP2_2 + /// /// Resolves connections using a token credential and a mapping from connection names to resource names. /// - public abstract class TokenCredentialResolver : ConnectionResolver + public abstract class CredentialBasedConnectionNameResolver : DurableTask.Netherite.ConnectionResolver { readonly Azure.Core.TokenCredential tokenCredential; @@ -18,7 +19,7 @@ /// Create a connection resolver that uses an Azure token credential. /// /// The token credential to use. - public TokenCredentialResolver(Azure.Core.TokenCredential tokenCredential) + public CredentialBasedConnectionNameResolver(Azure.Core.TokenCredential tokenCredential) { this.tokenCredential = tokenCredential; } @@ -71,4 +72,6 @@ } } } + +#endif } \ No newline at end of file diff --git a/src/DurableTask.Netherite/Connections/SimpleCredentialResolver.cs b/src/DurableTask.Netherite/Connections/SimpleCredentialResolver.cs new file mode 100644 index 0000000..b502eb4 --- /dev/null +++ b/src/DurableTask.Netherite/Connections/SimpleCredentialResolver.cs @@ -0,0 +1,71 @@ +namespace DurableTask.Netherite +{ + using System; + using System.Collections.Generic; + using System.Text; + using Azure.Identity; + using Microsoft.Extensions.Logging.Abstractions; + + /// + /// Resolves connections using a token credential, a storage account name, and an eventhubs namespace name. + /// + public class SimpleCredentialResolver : ConnectionResolver + { + readonly Azure.Core.TokenCredential tokenCredential; + readonly string storageAccountName; + readonly string eventHubNamespaceName; + + /// + /// Create a connection resolver that uses an Azure token credential. + /// + /// The token credential to use. + /// The name of the storage account, or null if using in-memory emulation. + /// The name of the event hub namespace, or null if using the singlehost configuration. + public SimpleCredentialResolver(Azure.Core.TokenCredential tokenCredential, string storageAccountName = null, string eventHubNamespaceName = null) + { + this.tokenCredential = tokenCredential; + this.storageAccountName = storageAccountName; + this.eventHubNamespaceName = eventHubNamespaceName; + } + + /// + public override ConnectionInfo ResolveConnectionInfo(string taskHub, string connectionName, ResourceType recourceType) + { + switch (recourceType) + { + case ResourceType.BlobStorage: + case ResourceType.TableStorage: + return ConnectionInfo.FromTokenCredential(this.tokenCredential, this.storageAccountName, recourceType); + + case ResourceType.PageBlobStorage: + return null; // same as blob storage + + case ResourceType.EventHubsNamespace: + return ConnectionInfo.FromTokenCredential(this.tokenCredential, this.eventHubNamespaceName, recourceType); + + default: + throw new NotImplementedException("unknown resource type"); + } + } + + /// + public override void ResolveLayerConfiguration(string connectionName, out StorageChoices storageChoice, out TransportChoices transportChoice) + { + if (string.IsNullOrEmpty(this.storageAccountName)) + { + storageChoice = StorageChoices.Memory; + transportChoice = TransportChoices.SingleHost; + } + else if (string.IsNullOrEmpty(this.eventHubNamespaceName)) + { + storageChoice = StorageChoices.Faster; + transportChoice = TransportChoices.SingleHost; + } + else + { + storageChoice = StorageChoices.Faster; + transportChoice = TransportChoices.EventHubs; + } + } + } +} \ No newline at end of file diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs index f4d5ca7..1dd5c47 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs @@ -28,7 +28,6 @@ namespace DurableTask.Netherite.EventHubsTransport { readonly TransportAbstraction.IHost host; readonly NetheriteOrchestrationServiceSettings settings; - readonly CloudStorageAccount cloudStorageAccount; readonly ILogger logger; readonly EventHubsTraceHelper traceHelper; readonly IStorageLayer storage; @@ -380,14 +379,36 @@ namespace DurableTask.Netherite.EventHubsTransport { byte[] taskHubGuid = this.parameters.TaskhubGuid.ToByteArray(); TimeSpan longPollingInterval = TimeSpan.FromMinutes(1); + var backoffDelay = TimeSpan.Zero; await this.clientConnectionsEstablished[index]; while (!this.shutdownSource.IsCancellationRequested) { - this.traceHelper.LogTrace("Client{clientId}.ch{index} waiting for new packets", Client.GetShortId(this.ClientId), index); - - IEnumerable eventData = await receiver.ReceiveAsync(1000, longPollingInterval); + IEnumerable eventData; + + try + { + this.traceHelper.LogTrace("Client{clientId}.ch{index} waiting for new packets", Client.GetShortId(this.ClientId), index); + + eventData = await receiver.ReceiveAsync(1000, longPollingInterval); + + backoffDelay = TimeSpan.Zero; + } + catch (Exception exception) when (!this.shutdownSource.IsCancellationRequested) + { + if (backoffDelay < TimeSpan.FromSeconds(30)) + { + backoffDelay = backoffDelay + backoffDelay + TimeSpan.FromSeconds(2); + } + + // if we lose access to storage temporarily, we back off, but don't quit + this.traceHelper.LogError("Client{clientId}.ch{index} backing off for {backoffDelay} after error in receive loop: {exception}", Client.GetShortId(this.ClientId), index, backoffDelay, exception); + + await Task.Delay(backoffDelay); + + continue; // retry + } if (eventData != null) {