add DTFx sample and use simple credential resolver

This commit is contained in:
sebastianburckhardt 2022-10-12 13:52:59 -07:00
Родитель f52e5d9a20
Коммит 25c1ea640c
9 изменённых файлов: 126 добавлений и 23 удалений

Просмотреть файл

@ -36,7 +36,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LoadGeneratorApp", "test\Lo
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloDTFx", "samples\HelloDTFx\HelloDTFx\HelloDTFx.csproj", "{EC293D85-91E3-4F78-8B1E-2C691315CE96}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TokenCredentialDF", "samples\TokenCredentialDF\TokenCredentialDF.csproj", "{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDF", "samples\TokenCredentialDF\TokenCredentialDF.csproj", "{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDTFx", "samples\TokenCredentialDTFx\TokenCredentialDTFx.csproj", "{FBFF0814-E6C0-489A-ACCF-9D0699219621}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -84,6 +86,10 @@ Global
{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B99AB043-47DC-467C-93CB-D6C69D3B1AD4}.Release|Any CPU.Build.0 = Release|Any CPU
{FBFF0814-E6C0-489A-ACCF-9D0699219621}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FBFF0814-E6C0-489A-ACCF-9D0699219621}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FBFF0814-E6C0-489A-ACCF-9D0699219621}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FBFF0814-E6C0-489A-ACCF-9D0699219621}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -99,6 +105,7 @@ Global
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
{EC293D85-91E3-4F78-8B1E-2C691315CE96} = {AB958467-9236-402E-833C-B8DE4841AB9F}
{B99AB043-47DC-467C-93CB-D6C69D3B1AD4} = {AB958467-9236-402E-833C-B8DE4841AB9F}
{FBFF0814-E6C0-489A-ACCF-9D0699219621} = {AB958467-9236-402E-833C-B8DE4841AB9F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {238A9613-5411-41CF-BDEC-168CCD5C03FB}

Просмотреть файл

@ -8,6 +8,7 @@ namespace PerformanceTests
using System;
using Azure.Identity;
using DurableTask.Netherite;
using DurableTask.Netherite.AzureFunctions;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.DependencyInjection;
@ -15,7 +16,7 @@ namespace PerformanceTests
public class Startup : FunctionsStartup
{
public class MyConnectionResolver : TokenCredentialResolver
public class MyConnectionResolver : CredentialBasedConnectionNameResolver
{
readonly INameResolver nameResolver;
@ -29,7 +30,7 @@ namespace PerformanceTests
?? Environment.GetEnvironmentVariable(name)
?? throw new InvalidOperationException($"missing configuration setting '{name}'");
public override string GetStorageAccountName(string connectionName) => this.Resolve($"{connectionName}__storageAccountName");
public override string GetStorageAccountName(string connectionName) => this.Resolve($"{connectionName}__accountName");
public override string GetEventHubsNamespaceName(string connectionName) => this.Resolve($"{connectionName}__eventHubsNamespaceName");
}

Просмотреть файл

@ -17,7 +17,7 @@
"routePrefix": ""
},
"durableTask": {
"hubName": "tokentest",
"hubName": "tokencredentialsample",
"UseGracefulShutdown": true,
"storageProvider": {
"type": "Netherite",

Просмотреть файл

@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using DurableTask.Core;
@ -28,4 +27,3 @@ public class SayHello : TaskActivity<string, string>
}
}

Просмотреть файл

@ -1,12 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using Azure.Identity;
using DurableTask.Core;
using DurableTask.Netherite;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
// ----------- construct the Netherite orchestration service
Console.WriteLine("Starting Netherite...");
@ -15,14 +15,13 @@ var netheriteSettings = new NetheriteOrchestrationServiceSettings()
{
HubName = "myhub",
PartitionCount = 4,
// we explicitly specify the two required connection strings here.
// Another option would be to use a connection name resolver when calling Validate().
ResolvedStorageConnectionString = "UseDevelopmentStorage=true;",
ResolvedTransportConnectionString = "SingleHost",
};
netheriteSettings.Validate();
netheriteSettings.Validate(new SimpleCredentialResolver(
new DefaultAzureCredential(),
Environment.GetEnvironmentVariable("AccountName") ?? throw new Exception("missing env var: AccountName"),
Environment.GetEnvironmentVariable("NamespaceName") ?? throw new Exception("missing env var: NamespaceName")
));
var loggerFactory = LoggerFactory.Create(builder =>
{
@ -74,4 +73,4 @@ Console.WriteLine($"Shutting down...");
await worker.StopAsync();
Console.WriteLine("Done.");
Console.WriteLine("Done.");

Просмотреть файл

@ -8,8 +8,11 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.DurableTask.Netherite" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DurableTask.Netherite\DurableTask.Netherite.csproj" />
</ItemGroup>
</Project>

Просмотреть файл

@ -1,16 +1,17 @@
namespace DurableTask.Netherite
namespace DurableTask.Netherite.AzureFunctions
{
using System;
using System.Collections.Generic;
using System.Text;
using Azure.Identity;
using Microsoft.Azure.Storage.Blob.Protocol;
using Microsoft.Extensions.Logging.Abstractions;
#if !NETCOREAPP2_2
/// <summary>
/// Resolves connections using a token credential and a mapping from connection names to resource names.
/// </summary>
public abstract class TokenCredentialResolver : ConnectionResolver
public abstract class CredentialBasedConnectionNameResolver : DurableTask.Netherite.ConnectionResolver
{
readonly Azure.Core.TokenCredential tokenCredential;
@ -18,7 +19,7 @@
/// Create a connection resolver that uses an Azure token credential.
/// </summary>
/// <param name="tokenCredential">The token credential to use.</param>
public TokenCredentialResolver(Azure.Core.TokenCredential tokenCredential)
public CredentialBasedConnectionNameResolver(Azure.Core.TokenCredential tokenCredential)
{
this.tokenCredential = tokenCredential;
}
@ -71,4 +72,6 @@
}
}
}
#endif
}

Просмотреть файл

@ -0,0 +1,71 @@
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
using Azure.Identity;
using Microsoft.Extensions.Logging.Abstractions;
/// <summary>
/// Resolves connections using a token credential, a storage account name, and an eventhubs namespace name.
/// </summary>
public class SimpleCredentialResolver : ConnectionResolver
{
readonly Azure.Core.TokenCredential tokenCredential;
readonly string storageAccountName;
readonly string eventHubNamespaceName;
/// <summary>
/// Create a connection resolver that uses an Azure token credential.
/// </summary>
/// <param name="tokenCredential">The token credential to use.</param>
/// <param name="storageAccountName">The name of the storage account, or null if using in-memory emulation.</param>
/// <param name="eventHubNamespaceName">The name of the event hub namespace, or null if using the singlehost configuration.</param>
public SimpleCredentialResolver(Azure.Core.TokenCredential tokenCredential, string storageAccountName = null, string eventHubNamespaceName = null)
{
this.tokenCredential = tokenCredential;
this.storageAccountName = storageAccountName;
this.eventHubNamespaceName = eventHubNamespaceName;
}
/// <inheritdoc/>
public override ConnectionInfo ResolveConnectionInfo(string taskHub, string connectionName, ResourceType recourceType)
{
switch (recourceType)
{
case ResourceType.BlobStorage:
case ResourceType.TableStorage:
return ConnectionInfo.FromTokenCredential(this.tokenCredential, this.storageAccountName, recourceType);
case ResourceType.PageBlobStorage:
return null; // same as blob storage
case ResourceType.EventHubsNamespace:
return ConnectionInfo.FromTokenCredential(this.tokenCredential, this.eventHubNamespaceName, recourceType);
default:
throw new NotImplementedException("unknown resource type");
}
}
/// <inheritdoc/>
public override void ResolveLayerConfiguration(string connectionName, out StorageChoices storageChoice, out TransportChoices transportChoice)
{
if (string.IsNullOrEmpty(this.storageAccountName))
{
storageChoice = StorageChoices.Memory;
transportChoice = TransportChoices.SingleHost;
}
else if (string.IsNullOrEmpty(this.eventHubNamespaceName))
{
storageChoice = StorageChoices.Faster;
transportChoice = TransportChoices.SingleHost;
}
else
{
storageChoice = StorageChoices.Faster;
transportChoice = TransportChoices.EventHubs;
}
}
}
}

Просмотреть файл

@ -28,7 +28,6 @@ namespace DurableTask.Netherite.EventHubsTransport
{
readonly TransportAbstraction.IHost host;
readonly NetheriteOrchestrationServiceSettings settings;
readonly CloudStorageAccount cloudStorageAccount;
readonly ILogger logger;
readonly EventHubsTraceHelper traceHelper;
readonly IStorageLayer storage;
@ -380,14 +379,36 @@ namespace DurableTask.Netherite.EventHubsTransport
{
byte[] taskHubGuid = this.parameters.TaskhubGuid.ToByteArray();
TimeSpan longPollingInterval = TimeSpan.FromMinutes(1);
var backoffDelay = TimeSpan.Zero;
await this.clientConnectionsEstablished[index];
while (!this.shutdownSource.IsCancellationRequested)
{
this.traceHelper.LogTrace("Client{clientId}.ch{index} waiting for new packets", Client.GetShortId(this.ClientId), index);
IEnumerable<EventData> eventData = await receiver.ReceiveAsync(1000, longPollingInterval);
IEnumerable<EventData> eventData;
try
{
this.traceHelper.LogTrace("Client{clientId}.ch{index} waiting for new packets", Client.GetShortId(this.ClientId), index);
eventData = await receiver.ReceiveAsync(1000, longPollingInterval);
backoffDelay = TimeSpan.Zero;
}
catch (Exception exception) when (!this.shutdownSource.IsCancellationRequested)
{
if (backoffDelay < TimeSpan.FromSeconds(30))
{
backoffDelay = backoffDelay + backoffDelay + TimeSpan.FromSeconds(2);
}
// if we lose access to storage temporarily, we back off, but don't quit
this.traceHelper.LogError("Client{clientId}.ch{index} backing off for {backoffDelay} after error in receive loop: {exception}", Client.GetShortId(this.ClientId), index, backoffDelay, exception);
await Task.Delay(backoffDelay);
continue; // retry
}
if (eventData != null)
{