Merge branch 'main' into pr/update-eh-sdk

# Conflicts:
#	src/DurableTask.Netherite/DurableTask.Netherite.csproj
This commit is contained in:
sebastianburckhardt 2024-06-04 10:46:24 -07:00
Родитель 49204de796 f3e5f2621f
Коммит 4348f77bbe
28 изменённых файлов: 503 добавлений и 99 удалений

Просмотреть файл

@ -24,6 +24,9 @@ EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PerformanceTests", "test\PerformanceTests\PerformanceTests.csproj", "{DD1E1B3F-4FA2-4F3A-9AE1-6B2A0B864AAF}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PerformanceTests", "test\PerformanceTests\PerformanceTests.csproj", "{DD1E1B3F-4FA2-4F3A-9AE1-6B2A0B864AAF}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{D33AB157-04B9-4BAD-B580-C3C87C17828C}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{D33AB157-04B9-4BAD-B580-C3C87C17828C}"
ProjectSection(SolutionItems) = preProject
src\common.props = src\common.props
EndProjectSection
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}"
EndProject EndProject
@ -41,7 +44,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDF", "sample
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDTFx", "samples\TokenCredentialDTFx\TokenCredentialDTFx.csproj", "{FBFF0814-E6C0-489A-ACCF-9D0699219621}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDTFx", "samples\TokenCredentialDTFx\TokenCredentialDTFx.csproj", "{FBFF0814-E6C0-489A-ACCF-9D0699219621}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Functions.Worker.Extensions.DurableTask.Netherite", "src\Functions.Worker.Extensions.DurableTask.Netherite\Functions.Worker.Extensions.DurableTask.Netherite.csproj", "{3E17402B-3F65-4E5B-B752-48AD56B81208}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Functions.Worker.Extensions.DurableTask.Netherite", "src\Functions.Worker.Extensions.DurableTask.Netherite\Functions.Worker.Extensions.DurableTask.Netherite.csproj", "{3E17402B-3F65-4E5B-B752-48AD56B81208}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution

Просмотреть файл

@ -63,7 +63,7 @@ For some other considerations about how to choose the engine, see [the documenta
## Status ## Status
The current version of Netherite is *1.4.3*. Netherite supports almost all of the DT and DF APIs. The current version of Netherite is *1.5.0*. Netherite supports almost all of the DT and DF APIs.
Some notable differences to the default Azure Table storage provider include: Some notable differences to the default Azure Table storage provider include:
- Instance queries and purge requests are not issued directly against Azure Storage, but are processed by the function app. Thus, the performance (latency and throughput) of queries heavily depends on - Instance queries and purge requests are not issued directly against Azure Storage, but are processed by the function app. Thus, the performance (latency and throughput) of queries heavily depends on

19
eng/ci/code-mirror.yml Normal file
Просмотреть файл

@ -0,0 +1,19 @@
trigger:
branches:
include:
# These are the branches we'll mirror to our internal ADO instance
# Keep this set limited as appropriate (don't mirror individual user branches).
- main
resources:
repositories:
- repository: eng
type: git
name: engineering
ref: refs/tags/release
variables:
- template: ci/variables/cfs.yml@eng
extends:
template: ci/code-mirror.yml@eng

Просмотреть файл

@ -5,7 +5,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Azure.DurableTask.Netherite.AzureFunctions" Version="1.4.2" /> <PackageReference Include="Microsoft.Azure.DurableTask.Netherite.AzureFunctions" Version="1.4.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.13.1" /> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.13.2" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.3.0" /> <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.3.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

Просмотреть файл

@ -22,17 +22,9 @@
<PackageIcon>icon.png</PackageIcon> <PackageIcon>icon.png</PackageIcon>
</PropertyGroup> </PropertyGroup>
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ --> <!-- Version can be edited in common.props -->
<PropertyGroup> <Import Project=".\..\common.props" />
<MajorVersion>1</MajorVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>3</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
<BuildSuffix Condition="'$(GITHUB_RUN_NUMBER)' != ''">.$(GITHUB_RUN_NUMBER)</BuildSuffix>
<FileVersion>$(VersionPrefix)$(BuildSuffix)</FileVersion>
</PropertyGroup>
<!-- Our netcoreapp2.2 target is a non-functional dummy target, so we don't need the warning --> <!-- Our netcoreapp2.2 target is a non-functional dummy target, so we don't need the warning -->
<PropertyGroup Condition=" '$(TargetFramework)' == 'netcoreapp2.2' "> <PropertyGroup Condition=" '$(TargetFramework)' == 'netcoreapp2.2' ">
@ -51,8 +43,8 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.15.1" /> <PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.16.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.12.0" /> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.13.2" />
</ItemGroup> </ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' != 'netcoreapp2.2' "> <ItemGroup Condition=" '$(TargetFramework)' != 'netcoreapp2.2' ">

Просмотреть файл

@ -32,6 +32,7 @@ namespace DurableTask.Netherite.AzureFunctions
readonly IServiceProvider serviceProvider; readonly IServiceProvider serviceProvider;
readonly DurableTask.Netherite.ConnectionResolver connectionResolver; readonly DurableTask.Netherite.ConnectionResolver connectionResolver;
readonly bool usesNewPassthroughMiddlewareForEntities;
readonly bool inConsumption; readonly bool inConsumption;
// the following are boolean options that can be specified in host.json, // the following are boolean options that can be specified in host.json,
@ -96,6 +97,14 @@ namespace DurableTask.Netherite.AzureFunctions
this.TraceToConsole = ReadBooleanSetting(nameof(this.TraceToConsole)); this.TraceToConsole = ReadBooleanSetting(nameof(this.TraceToConsole));
this.TraceToBlob = ReadBooleanSetting(nameof(this.TraceToBlob)); this.TraceToBlob = ReadBooleanSetting(nameof(this.TraceToBlob));
WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType();
if (runtimeType == WorkerRuntimeType.DotNetIsolated ||
runtimeType == WorkerRuntimeType.Java ||
runtimeType == WorkerRuntimeType.Custom)
{
this.usesNewPassthroughMiddlewareForEntities = true;
}
} }
NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(string taskHubNameOverride = null, string connectionName = null) NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(string taskHubNameOverride = null, string connectionName = null)
@ -109,12 +118,19 @@ namespace DurableTask.Netherite.AzureFunctions
// different defaults for key configuration values. // different defaults for key configuration values.
int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount; int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount;
int maxConcurrentActivitiesDefault = this.inConsumption ? 20 : 25 * Environment.ProcessorCount; int maxConcurrentActivitiesDefault = this.inConsumption ? 20 : 25 * Environment.ProcessorCount;
int maxConcurrentEntitiesDefault = this.inConsumption ? 20 : 25 * Environment.ProcessorCount;
int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000; int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000;
// The following defaults are only applied if the customer did not explicitely set them on `host.json` // The following defaults are only applied if the customer did not explicitely set them on `host.json`
this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault; this.options.MaxConcurrentOrchestratorFunctions ??= maxConcurrentOrchestratorsDefault;
this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault; this.options.MaxConcurrentActivityFunctions ??= maxConcurrentActivitiesDefault;
this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault; this.options.MaxConcurrentEntityFunctions ??= maxConcurrentEntitiesDefault;
this.options.MaxEntityOperationBatchSize ??= maxEntityOperationBatchSizeDefault;
if (this.usesNewPassthroughMiddlewareForEntities)
{
netheriteSettings.UseSeparateQueueForEntityWorkItems = true;
}
// copy all applicable fields from both the options and the storageProvider options // copy all applicable fields from both the options and the storageProvider options
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options), netheriteSettings); JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options), netheriteSettings);

Просмотреть файл

@ -22,18 +22,8 @@
<AllowUnsafeBlocks>true</AllowUnsafeBlocks> <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup> </PropertyGroup>
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ --> <!-- Version can be edited in common.props -->
<PropertyGroup> <Import Project=".\..\common.props" />
<MajorVersion>1</MajorVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>3</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
<BuildSuffix Condition="'$(GITHUB_RUN_NUMBER)' != ''">.$(GITHUB_RUN_NUMBER)</BuildSuffix>
<FileVersion>$(VersionPrefix)$(BuildSuffix)</FileVersion>
</PropertyGroup>
<ItemGroup> <ItemGroup>
<None Include="icon.png" Pack="true" PackagePath="\" /> <None Include="icon.png" Pack="true" PackagePath="\" />
</ItemGroup> </ItemGroup>
@ -56,8 +46,8 @@
<PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="5.11.2" /> <PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="5.11.2" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" /> <PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.16.0" /> <PackageReference Include="Azure.Storage.Blobs" Version="12.16.0" />
<PackageReference Include="Microsoft.FASTER.Core" Version="2.0.23" /> <PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.16.2" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.15.1" /> <PackageReference Include="Microsoft.FASTER.Core" Version="2.6.5" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" /> <PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" /> <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" /> <PackageReference Include="System.Threading.Channels" Version="4.7.1" />

Просмотреть файл

@ -52,6 +52,11 @@ namespace DurableTask.Netherite
[DataMember] [DataMember]
internal bool PrefetchHistory { get; set; } internal bool PrefetchHistory { get; set; }
/// <summary>
/// Whether to exclude entities from the results.
/// </summary>
[DataMember]
internal bool ExcludeEntities { get; set; }
/// <summary> /// <summary>
/// Construct an instance query with the given parameters. /// Construct an instance query with the given parameters.
@ -77,9 +82,6 @@ namespace DurableTask.Netherite
internal bool HasRuntimeStatus => this.RuntimeStatus != null && this.RuntimeStatus.Length > 0; internal bool HasRuntimeStatus => this.RuntimeStatus != null && this.RuntimeStatus.Length > 0;
internal bool IsSet => this.HasRuntimeStatus || !string.IsNullOrWhiteSpace(this.InstanceIdPrefix)
|| !(this.CreatedTimeFrom is null) || !(this.CreatedTimeTo is null);
internal bool Matches(OrchestrationState targetState) internal bool Matches(OrchestrationState targetState)
{ {
if (targetState == null) if (targetState == null)
@ -88,7 +90,8 @@ namespace DurableTask.Netherite
return (!this.HasRuntimeStatus || this.RuntimeStatus.Contains(targetState.OrchestrationStatus)) return (!this.HasRuntimeStatus || this.RuntimeStatus.Contains(targetState.OrchestrationStatus))
&& (string.IsNullOrWhiteSpace(this.InstanceIdPrefix) || targetState.OrchestrationInstance.InstanceId.StartsWith(this.InstanceIdPrefix)) && (string.IsNullOrWhiteSpace(this.InstanceIdPrefix) || targetState.OrchestrationInstance.InstanceId.StartsWith(this.InstanceIdPrefix))
&& (!this.CreatedTimeFrom.HasValue || targetState.CreatedTime >= this.CreatedTimeFrom.Value) && (!this.CreatedTimeFrom.HasValue || targetState.CreatedTime >= this.CreatedTimeFrom.Value)
&& (!this.CreatedTimeTo.HasValue || targetState.CreatedTime <= this.CreatedTimeTo.Value); && (!this.CreatedTimeTo.HasValue || targetState.CreatedTime <= this.CreatedTimeTo.Value)
&& (!this.ExcludeEntities || !DurableTask.Core.Common.Entities.IsEntityInstance(targetState.OrchestrationInstance.InstanceId));
} }
} }
} }

Просмотреть файл

@ -5,6 +5,7 @@ namespace DurableTask.Netherite
{ {
using DurableTask.Core; using DurableTask.Core;
using DurableTask.Core.Common; using DurableTask.Core.Common;
using DurableTask.Core.Entities;
using DurableTask.Core.History; using DurableTask.Core.History;
using DurableTask.Netherite.Abstractions; using DurableTask.Netherite.Abstractions;
using DurableTask.Netherite.Faster; using DurableTask.Netherite.Faster;
@ -27,7 +28,9 @@ namespace DurableTask.Netherite
DurableTask.Core.IOrchestrationService, DurableTask.Core.IOrchestrationService,
DurableTask.Core.IOrchestrationServiceClient, DurableTask.Core.IOrchestrationServiceClient,
DurableTask.Core.IOrchestrationServicePurgeClient, DurableTask.Core.IOrchestrationServicePurgeClient,
DurableTask.Core.Query.IOrchestrationServiceQueryClient,
DurableTask.Netherite.IOrchestrationServiceQueryClient, DurableTask.Netherite.IOrchestrationServiceQueryClient,
DurableTask.Core.Entities.IEntityOrchestrationService,
TransportAbstraction.IHost TransportAbstraction.IHost
{ {
/// <summary> /// <summary>
@ -43,6 +46,7 @@ namespace DurableTask.Netherite
readonly ITransportLayer transport; readonly ITransportLayer transport;
readonly IStorageLayer storage; readonly IStorageLayer storage;
readonly EntityBackendQueriesImplementation EntityBackendQueries;
readonly WorkItemTraceHelper workItemTraceHelper; readonly WorkItemTraceHelper workItemTraceHelper;
@ -88,6 +92,8 @@ namespace DurableTask.Netherite
internal WorkItemQueue<ActivityWorkItem> ActivityWorkItemQueue { get; private set; } internal WorkItemQueue<ActivityWorkItem> ActivityWorkItemQueue { get; private set; }
internal WorkItemQueue<OrchestrationWorkItem> OrchestrationWorkItemQueue { get; private set; } internal WorkItemQueue<OrchestrationWorkItem> OrchestrationWorkItemQueue { get; private set; }
internal WorkItemQueue<OrchestrationWorkItem> EntityWorkItemQueue { get; private set; }
internal LoadPublishWorker LoadPublisher { get; private set; } internal LoadPublishWorker LoadPublisher { get; private set; }
internal ILoggerFactory LoggerFactory { get; } internal ILoggerFactory LoggerFactory { get; }
@ -124,6 +130,7 @@ namespace DurableTask.Netherite
this.Settings = settings; this.Settings = settings;
this.TraceHelper = new OrchestrationServiceTraceHelper(loggerFactory, settings.LogLevelLimit, settings.WorkerId, settings.HubName); this.TraceHelper = new OrchestrationServiceTraceHelper(loggerFactory, settings.LogLevelLimit, settings.WorkerId, settings.HubName);
this.workItemTraceHelper = new WorkItemTraceHelper(loggerFactory, settings.WorkItemLogLevelLimit, settings.HubName); this.workItemTraceHelper = new WorkItemTraceHelper(loggerFactory, settings.WorkItemLogLevelLimit, settings.HubName);
this.EntityBackendQueries = new EntityBackendQueriesImplementation(this);
try try
{ {
@ -379,6 +386,7 @@ namespace DurableTask.Netherite
this.ActivityWorkItemQueue = new WorkItemQueue<ActivityWorkItem>(); this.ActivityWorkItemQueue = new WorkItemQueue<ActivityWorkItem>();
this.OrchestrationWorkItemQueue = new WorkItemQueue<OrchestrationWorkItem>(); this.OrchestrationWorkItemQueue = new WorkItemQueue<OrchestrationWorkItem>();
this.EntityWorkItemQueue = new WorkItemQueue<OrchestrationWorkItem>();
this.TraceHelper.TraceProgress($"Started client"); this.TraceHelper.TraceProgress($"Started client");
@ -475,6 +483,7 @@ namespace DurableTask.Netherite
await this.transport.StopAsync(fatalExceptionObserved: false); await this.transport.StopAsync(fatalExceptionObserved: false);
this.ActivityWorkItemQueue.Dispose(); this.ActivityWorkItemQueue.Dispose();
this.EntityWorkItemQueue.Dispose();
this.OrchestrationWorkItemQueue.Dispose(); this.OrchestrationWorkItemQueue.Dispose();
} }
@ -539,7 +548,7 @@ namespace DurableTask.Netherite
TransportAbstraction.IPartition TransportAbstraction.IHost.AddPartition(uint partitionId, TransportAbstraction.ISender batchSender) TransportAbstraction.IPartition TransportAbstraction.IHost.AddPartition(uint partitionId, TransportAbstraction.ISender batchSender)
{ {
var partition = new Partition(this, partitionId, this.GetPartitionId, this.GetNumberPartitions, batchSender, this.Settings, this.StorageAccountName, var partition = new Partition(this, partitionId, this.GetPartitionId, this.GetNumberPartitions, batchSender, this.Settings, this.StorageAccountName,
this.ActivityWorkItemQueue, this.OrchestrationWorkItemQueue, this.LoadPublisher, this.workItemTraceHelper); this.ActivityWorkItemQueue, this.OrchestrationWorkItemQueue, this.EntityWorkItemQueue, this.LoadPublisher, this.workItemTraceHelper);
return partition; return partition;
} }
@ -551,7 +560,7 @@ namespace DurableTask.Netherite
IPartitionErrorHandler TransportAbstraction.IHost.CreateErrorHandler(uint partitionId) IPartitionErrorHandler TransportAbstraction.IHost.CreateErrorHandler(uint partitionId)
{ {
return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this); return new PartitionErrorHandler((int)partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this);
} }
void TransportAbstraction.IHost.TraceWarning(string message) void TransportAbstraction.IHost.TraceWarning(string message)
@ -744,7 +753,30 @@ namespace DurableTask.Netherite
/// <inheritdoc /> /// <inheritdoc />
async Task<InstanceQueryResult> IOrchestrationServiceQueryClient.QueryOrchestrationStatesAsync(InstanceQuery instanceQuery, int pageSize, string continuationToken, CancellationToken cancellationToken) async Task<InstanceQueryResult> IOrchestrationServiceQueryClient.QueryOrchestrationStatesAsync(InstanceQuery instanceQuery, int pageSize, string continuationToken, CancellationToken cancellationToken)
=> await (await this.GetClientAsync().ConfigureAwait(false)).QueryOrchestrationStatesAsync(instanceQuery, pageSize, continuationToken, cancellationToken).ConfigureAwait(false); {
return await (await this.GetClientAsync().ConfigureAwait(false)).QueryOrchestrationStatesAsync(instanceQuery, pageSize, continuationToken, cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
async Task<DurableTask.Core.Query.OrchestrationQueryResult> DurableTask.Core.Query.IOrchestrationServiceQueryClient.GetOrchestrationWithQueryAsync(
DurableTask.Core.Query.OrchestrationQuery query,
CancellationToken cancellationToken)
{
InstanceQuery instanceQuery = new()
{
CreatedTimeFrom = query.CreatedTimeFrom,
CreatedTimeTo = query.CreatedTimeTo,
ExcludeEntities = query.ExcludeEntities,
FetchInput = query.FetchInputsAndOutputs,
InstanceIdPrefix = query.InstanceIdPrefix,
PrefetchHistory = false,
RuntimeStatus = query.RuntimeStatus?.ToArray(),
};
Client client = await this.GetClientAsync().ConfigureAwait(false);
InstanceQueryResult result = await client.QueryOrchestrationStatesAsync(instanceQuery, query.PageSize, query.ContinuationToken, cancellationToken).ConfigureAwait(false);
return new DurableTask.Core.Query.OrchestrationQueryResult(result.Instances.ToList(), result.ContinuationToken);
}
/// <inheritdoc /> /// <inheritdoc />
async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(string instanceId) async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(string instanceId)
@ -754,16 +786,183 @@ namespace DurableTask.Netherite
async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter) async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
=> new PurgeResult(await (await this.GetClientAsync()).PurgeInstanceHistoryAsync(purgeInstanceFilter.CreatedTimeFrom, purgeInstanceFilter.CreatedTimeTo, purgeInstanceFilter.RuntimeStatus)); => new PurgeResult(await (await this.GetClientAsync()).PurgeInstanceHistoryAsync(purgeInstanceFilter.CreatedTimeFrom, purgeInstanceFilter.CreatedTimeTo, purgeInstanceFilter.RuntimeStatus));
/// <inheritdoc />
EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries => this.EntityBackendQueries;
class EntityBackendQueriesImplementation : EntityBackendQueries
{
readonly NetheriteOrchestrationService service;
public EntityBackendQueriesImplementation(NetheriteOrchestrationService netheriteOrchestrationService)
{
this.service = netheriteOrchestrationService;
}
public override async Task<EntityMetadata?> GetEntityAsync(EntityId id, bool includeState = false, bool includeTransient = false, CancellationToken cancellation = default)
{
string instanceId = id.ToString();
OrchestrationState state = await(await this.service.GetClientAsync().ConfigureAwait(false))
.GetOrchestrationStateAsync(this.service.GetPartitionId(instanceId.ToString()), instanceId, fetchInput: includeState, false).ConfigureAwait(false);
return this.GetEntityMetadata(state, includeState, includeTransient);
}
public override async Task<EntityQueryResult> QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation)
{
string adjustedPrefix = string.IsNullOrEmpty(filter.InstanceIdStartsWith) ? "@" : filter.InstanceIdStartsWith;
if (adjustedPrefix[0] != '@')
{
return new EntityQueryResult()
{
Results = new List<EntityMetadata>(),
ContinuationToken = null,
};
}
var condition = new InstanceQuery()
{
InstanceIdPrefix = adjustedPrefix,
CreatedTimeFrom = filter.LastModifiedFrom,
CreatedTimeTo = filter.LastModifiedTo,
FetchInput = filter.IncludeState,
PrefetchHistory = false,
ExcludeEntities = false,
};
List<EntityMetadata> metadataList = new List<EntityMetadata>();
InstanceQueryResult result = await (await this.service.GetClientAsync().ConfigureAwait(false))
.QueryOrchestrationStatesAsync(condition, filter.PageSize ?? 200, filter.ContinuationToken, cancellation).ConfigureAwait(false);
foreach(var entry in result.Instances)
{
var metadata = this.GetEntityMetadata(entry, filter.IncludeState, filter.IncludeTransient);
if (metadata.HasValue)
{
metadataList.Add(metadata.Value);
}
}
return new EntityQueryResult()
{
Results = metadataList,
ContinuationToken = result.ContinuationToken,
};
}
public override async Task<CleanEntityStorageResult> CleanEntityStorageAsync(CleanEntityStorageRequest request = default, CancellationToken cancellation = default)
{
if (!request.ReleaseOrphanedLocks)
{
// there is no need to do anything since deletion is implicit
return new CleanEntityStorageResult();
}
var condition = new InstanceQuery()
{
InstanceIdPrefix = "@",
FetchInput = false,
PrefetchHistory = false,
ExcludeEntities = false,
};
var client = await this.service.GetClientAsync().ConfigureAwait(false);
string continuationToken = null;
int orphanedLocksReleased = 0;
// list all entities (without fetching the input) and for each locked one,
// check if the lock owner is still running. If not, release the lock.
do
{
var page = await client.QueryOrchestrationStatesAsync(condition, 500, continuationToken, cancellation).ConfigureAwait(false);
// The checks run in parallel for all entities in the page
List<Task> tasks = new List<Task>();
foreach (var state in page.Instances)
{
EntityStatus status = ClientEntityHelpers.GetEntityStatus(state.Status);
if (status != null && status.LockedBy != null)
{
tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy));
}
}
async Task CheckForOrphanedLockAndFixIt(OrchestrationState state, string lockOwner)
{
uint partitionId = this.service.GetPartitionId(lockOwner);
OrchestrationState ownerState
= await client.GetOrchestrationStateAsync(partitionId, lockOwner, fetchInput: false, fetchOutput: false);
bool OrchestrationIsRunning(OrchestrationStatus? status)
=> status != null && (status == OrchestrationStatus.Running || status == OrchestrationStatus.Suspended);
if (!OrchestrationIsRunning(ownerState?.OrchestrationStatus))
{
// the owner is not a running orchestration. Send a lock release.
EntityMessageEvent eventToSend = ClientEntityHelpers.EmitUnlockForOrphanedLock(state.OrchestrationInstance, lockOwner);
await client.SendTaskOrchestrationMessageBatchAsync(
this.service.GetPartitionId(state.OrchestrationInstance.InstanceId),
new TaskMessage[] { eventToSend.AsTaskMessage() });
Interlocked.Increment(ref orphanedLocksReleased);
}
}
// wait for all of the checks to finish before moving on to the next page.
await Task.WhenAll(tasks);
}
while (continuationToken != null);
return new CleanEntityStorageResult()
{
EmptyEntitiesRemoved = 0,
OrphanedLocksReleased = orphanedLocksReleased,
};
}
EntityMetadata? GetEntityMetadata(OrchestrationState state, bool includeState, bool includeTransient)
{
if (state != null)
{
// determine the status of the entity by deserializing the custom status field
EntityStatus status = ClientEntityHelpers.GetEntityStatus(state.Status);
if (status?.EntityExists == true || includeTransient)
{
return new EntityMetadata()
{
EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId),
LastModifiedTime = state.CreatedTime,
SerializedState = (includeState && status?.EntityExists == true) ? ClientEntityHelpers.GetEntityState(state.Input) : null,
LockedBy = status?.LockedBy,
BacklogQueueSize = status?.BacklogQueueSize ?? 0,
};
}
}
return null;
}
}
/******************************/ /******************************/
// Task orchestration methods // Task orchestration methods
/******************************/ /******************************/
async Task<TaskOrchestrationWorkItem> IOrchestrationService.LockNextTaskOrchestrationWorkItemAsync( Task<TaskOrchestrationWorkItem> IOrchestrationService.LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
TimeSpan receiveTimeout, => this.LockNextWorkItemInternal(this.OrchestrationWorkItemQueue, receiveTimeout, cancellationToken);
CancellationToken cancellationToken)
Task<TaskOrchestrationWorkItem> IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
=> this.LockNextWorkItemInternal(this.OrchestrationWorkItemQueue, receiveTimeout, cancellationToken);
Task<TaskOrchestrationWorkItem> IEntityOrchestrationService.LockNextEntityWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
=> this.LockNextWorkItemInternal(this.EntityWorkItemQueue, receiveTimeout, cancellationToken);
async Task<TaskOrchestrationWorkItem> LockNextWorkItemInternal(WorkItemQueue<OrchestrationWorkItem> workItemQueue, TimeSpan receiveTimeout, CancellationToken cancellationToken)
{ {
var nextOrchestrationWorkItem = await this.OrchestrationWorkItemQueue.GetNext(receiveTimeout, cancellationToken).ConfigureAwait(false); var nextOrchestrationWorkItem = await workItemQueue.GetNext(receiveTimeout, cancellationToken).ConfigureAwait(false);
if (nextOrchestrationWorkItem != null) if (nextOrchestrationWorkItem != null)
{ {
@ -771,7 +970,7 @@ namespace DurableTask.Netherite
this.workItemTraceHelper.TraceWorkItemStarted( this.workItemTraceHelper.TraceWorkItemStarted(
nextOrchestrationWorkItem.Partition.PartitionId, nextOrchestrationWorkItem.Partition.PartitionId,
WorkItemTraceHelper.WorkItemType.Orchestration, nextOrchestrationWorkItem.WorkItemType,
nextOrchestrationWorkItem.MessageBatch.WorkItemId, nextOrchestrationWorkItem.MessageBatch.WorkItemId,
nextOrchestrationWorkItem.MessageBatch.InstanceId, nextOrchestrationWorkItem.MessageBatch.InstanceId,
nextOrchestrationWorkItem.Type.ToString(), nextOrchestrationWorkItem.Type.ToString(),
@ -857,7 +1056,7 @@ namespace DurableTask.Netherite
// It's unavoidable by design, but let's at least create a warning. // It's unavoidable by design, but let's at least create a warning.
this.workItemTraceHelper.TraceWorkItemDiscarded( this.workItemTraceHelper.TraceWorkItemDiscarded(
partition.PartitionId, partition.PartitionId,
WorkItemTraceHelper.WorkItemType.Orchestration, orchestrationWorkItem.WorkItemType,
messageBatch.WorkItemId, messageBatch.WorkItemId,
workItem.InstanceId, workItem.InstanceId,
"", "",
@ -899,7 +1098,7 @@ namespace DurableTask.Netherite
this.workItemTraceHelper.TraceWorkItemCompleted( this.workItemTraceHelper.TraceWorkItemCompleted(
partition.PartitionId, partition.PartitionId,
WorkItemTraceHelper.WorkItemType.Orchestration, orchestrationWorkItem.WorkItemType,
messageBatch.WorkItemId, messageBatch.WorkItemId,
workItem.InstanceId, workItem.InstanceId,
batchProcessedEvent.OrchestrationStatus, batchProcessedEvent.OrchestrationStatus,
@ -1077,5 +1276,15 @@ namespace DurableTask.Netherite
int IOrchestrationService.MaxConcurrentTaskActivityWorkItems => this.Settings.MaxConcurrentActivityFunctions; int IOrchestrationService.MaxConcurrentTaskActivityWorkItems => this.Settings.MaxConcurrentActivityFunctions;
int IOrchestrationService.TaskActivityDispatcherCount => this.Settings.ActivityDispatcherCount; int IOrchestrationService.TaskActivityDispatcherCount => this.Settings.ActivityDispatcherCount;
EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties => new EntityBackendProperties()
{
EntityMessageReorderWindow = TimeSpan.Zero,
MaxConcurrentTaskEntityWorkItems = this.Settings.MaxConcurrentEntityFunctions,
MaxEntityOperationBatchSize = this.Settings.MaxEntityOperationBatchSize,
MaximumSignalDelayTime = TimeSpan.MaxValue,
SupportsImplicitEntityDeletion = true,
UseSeparateQueueForEntityWorkItems = this.Settings.UseSeparateQueueForEntityWorkItems,
};
} }
} }

Просмотреть файл

@ -18,7 +18,8 @@ namespace DurableTask.Netherite
public class NetheriteOrchestrationServiceSettings public class NetheriteOrchestrationServiceSettings
{ {
/// <summary> /// <summary>
/// The name of the taskhub. Matches Microsoft.Azure.WebJobs.Extensions.DurableTask. /// The name of the taskhub.
/// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions.
/// </summary> /// </summary>
public string HubName { get; set; } public string HubName { get; set; }
@ -57,19 +58,40 @@ namespace DurableTask.Netherite
public Faster.BlobManager.FasterTuningParameters FasterTuningParameters { get; set; } = null; public Faster.BlobManager.FasterTuningParameters FasterTuningParameters { get; set; } = null;
/// <summary> /// <summary>
/// Gets or sets the maximum number of work items that can be processed concurrently on a single node. /// Gets or sets the maximum number of activity work items that can be processed concurrently on a single node.
/// The default value is 100. /// The default value is 100.
/// Matches Microsoft.Azure.WebJobs.Extensions.DurableTask. /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions.
/// </summary> /// </summary>
public int MaxConcurrentActivityFunctions { get; set; } = 100; public int MaxConcurrentActivityFunctions { get; set; } = 100;
/// <summary> /// <summary>
/// Gets or sets the maximum number of orchestrations that can be processed concurrently on a single node. /// Gets or sets the maximum number of orchestration work items that can be processed concurrently on a single node.
/// The default value is 100. /// The default value is 100.
/// Matches Microsoft.Azure.WebJobs.Extensions.DurableTask. /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions.
/// </summary> /// </summary>
public int MaxConcurrentOrchestratorFunctions { get; set; } = 100; public int MaxConcurrentOrchestratorFunctions { get; set; } = 100;
/// <summary>
/// Gets or sets the maximum number of entity work items that can be processed concurrently on a single node.
/// The default value is 100.
/// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions.
/// </summary>
public int MaxConcurrentEntityFunctions { get; set; } = 100;
/// <summary>
/// Whether to use separate work item queues for entities and orchestrators.
/// This defaults to false, to maintain compatility with legacy front ends.
/// Newer front ends explicitly set this to true.
/// </summary>
public bool UseSeparateQueueForEntityWorkItems { get; set; } = false;
/// <summary>
/// Gets or sets the maximum number of entity operations that are processed as a single batch.
/// The default value is 1000.
/// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions.
/// </summary>
public int MaxEntityOperationBatchSize { get; set; } = 1000;
/// <summary> /// <summary>
/// Gets or sets the number of dispatchers used to dispatch orchestrations. /// Gets or sets the number of dispatchers used to dispatch orchestrations.
/// </summary> /// </summary>

Просмотреть файл

@ -32,6 +32,8 @@ namespace DurableTask.Netherite
public override bool RestoreOriginalRuntimeStateDuringCompletion => false; public override bool RestoreOriginalRuntimeStateDuringCompletion => false;
public WorkItemTraceHelper.WorkItemType WorkItemType => DurableTask.Core.Common.Entities.IsEntityInstance(this.InstanceId) ? WorkItemTraceHelper.WorkItemType.Entity : WorkItemTraceHelper.WorkItemType.Orchestration;
public OrchestrationWorkItem(Partition partition, OrchestrationMessageBatch messageBatch, List<HistoryEvent> previousHistory = null, string customStatus = null) public OrchestrationWorkItem(Partition partition, OrchestrationMessageBatch messageBatch, List<HistoryEvent> previousHistory = null, string customStatus = null)
{ {
this.Partition = partition; this.Partition = partition;

Просмотреть файл

@ -46,6 +46,7 @@ namespace DurableTask.Netherite
public TransportAbstraction.ISender BatchSender { get; private set; } public TransportAbstraction.ISender BatchSender { get; private set; }
public WorkItemQueue<ActivityWorkItem> ActivityWorkItemQueue { get; private set; } public WorkItemQueue<ActivityWorkItem> ActivityWorkItemQueue { get; private set; }
public WorkItemQueue<OrchestrationWorkItem> OrchestrationWorkItemQueue { get; private set; } public WorkItemQueue<OrchestrationWorkItem> OrchestrationWorkItemQueue { get; private set; }
public WorkItemQueue<OrchestrationWorkItem> EntityWorkItemQueue { get; private set; }
public LoadPublishWorker LoadPublisher { get; private set; } public LoadPublishWorker LoadPublisher { get; private set; }
public BatchTimer<PartitionEvent> PendingTimers { get; private set; } public BatchTimer<PartitionEvent> PendingTimers { get; private set; }
@ -70,6 +71,7 @@ namespace DurableTask.Netherite
string storageAccountName, string storageAccountName,
WorkItemQueue<ActivityWorkItem> activityWorkItemQueue, WorkItemQueue<ActivityWorkItem> activityWorkItemQueue,
WorkItemQueue<OrchestrationWorkItem> orchestrationWorkItemQueue, WorkItemQueue<OrchestrationWorkItem> orchestrationWorkItemQueue,
WorkItemQueue<OrchestrationWorkItem> entityWorkItemQueue,
LoadPublishWorker loadPublisher, LoadPublishWorker loadPublisher,
WorkItemTraceHelper workItemTraceHelper) WorkItemTraceHelper workItemTraceHelper)
@ -83,6 +85,7 @@ namespace DurableTask.Netherite
this.StorageAccountName = storageAccountName; this.StorageAccountName = storageAccountName;
this.ActivityWorkItemQueue = activityWorkItemQueue; this.ActivityWorkItemQueue = activityWorkItemQueue;
this.OrchestrationWorkItemQueue = orchestrationWorkItemQueue; this.OrchestrationWorkItemQueue = orchestrationWorkItemQueue;
this.EntityWorkItemQueue = entityWorkItemQueue;
this.LoadPublisher = loadPublisher; this.LoadPublisher = loadPublisher;
this.TraceHelper = new PartitionTraceHelper(host.TraceHelper.Logger, settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this.PartitionId); this.TraceHelper = new PartitionTraceHelper(host.TraceHelper.Logger, settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this.PartitionId);
this.EventTraceHelper = new EventTraceHelper(host.LoggerFactory, settings.EventLogLevelLimit, this); this.EventTraceHelper = new EventTraceHelper(host.LoggerFactory, settings.EventLogLevelLimit, this);
@ -326,14 +329,21 @@ namespace DurableTask.Netherite
{ {
this.WorkItemTraceHelper.TraceWorkItemQueued( this.WorkItemTraceHelper.TraceWorkItemQueued(
this.PartitionId, this.PartitionId,
WorkItemTraceHelper.WorkItemType.Orchestration, item.WorkItemType,
item.MessageBatch.WorkItemId, item.MessageBatch.WorkItemId,
item.InstanceId, item.InstanceId,
item.Type.ToString(), item.Type.ToString(),
item.EventCount, item.EventCount,
WorkItemTraceHelper.FormatMessageIdList(item.MessageBatch.TracedMessages)); WorkItemTraceHelper.FormatMessageIdList(item.MessageBatch.TracedMessages));
if (this.Settings.UseSeparateQueueForEntityWorkItems && item.WorkItemType == WorkItemTraceHelper.WorkItemType.Entity)
{
this.EntityWorkItemQueue.Add(item);
}
else
{
this.OrchestrationWorkItemQueue.Add(item); this.OrchestrationWorkItemQueue.Add(item);
} }
} }
}
} }

Просмотреть файл

@ -27,6 +27,9 @@ namespace DurableTask.Netherite
[DataMember] [DataMember]
public List<WaitRequestReceived> Waiters { get; set; } public List<WaitRequestReceived> Waiters { get; set; }
[DataMember]
public string CreationRequestEventId { get; set; }
[IgnoreDataMember] [IgnoreDataMember]
public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Instance, this.InstanceId); public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Instance, this.InstanceId);
@ -39,12 +42,19 @@ namespace DurableTask.Netherite
public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects)
{ {
if (creationRequestReceived.EventIdString == this.CreationRequestEventId)
{
// we have already processed this event - it must be a duplicate delivery. Ignore it.
return;
};
bool exists = this.OrchestrationState != null; bool exists = this.OrchestrationState != null;
bool filterDuplicate = exists
bool previousExecutionWithDedupeStatus = exists
&& creationRequestReceived.DedupeStatuses != null && creationRequestReceived.DedupeStatuses != null
&& creationRequestReceived.DedupeStatuses.Contains(this.OrchestrationState.OrchestrationStatus); && creationRequestReceived.DedupeStatuses.Contains(this.OrchestrationState.OrchestrationStatus);
if (!filterDuplicate) if (!previousExecutionWithDedupeStatus)
{ {
var ee = creationRequestReceived.ExecutionStartedEvent; var ee = creationRequestReceived.ExecutionStartedEvent;
@ -65,6 +75,7 @@ namespace DurableTask.Netherite
ScheduledStartTime = ee.ScheduledStartTime ScheduledStartTime = ee.ScheduledStartTime
}; };
this.OrchestrationStateSize = DurableTask.Netherite.SizeUtils.GetEstimatedSize(this.OrchestrationState); this.OrchestrationStateSize = DurableTask.Netherite.SizeUtils.GetEstimatedSize(this.OrchestrationState);
this.CreationRequestEventId = creationRequestReceived.EventIdString;
// queue the message in the session, or start a timer if delayed // queue the message in the session, or start a timer if delayed
if (!ee.ScheduledStartTime.HasValue) if (!ee.ScheduledStartTime.HasValue)
@ -87,7 +98,7 @@ namespace DurableTask.Netherite
{ {
ClientId = creationRequestReceived.ClientId, ClientId = creationRequestReceived.ClientId,
RequestId = creationRequestReceived.RequestId, RequestId = creationRequestReceived.RequestId,
Succeeded = !filterDuplicate, Succeeded = !previousExecutionWithDedupeStatus,
ExistingInstanceOrchestrationStatus = this.OrchestrationState?.OrchestrationStatus, ExistingInstanceOrchestrationStatus = this.OrchestrationState?.OrchestrationStatus,
}; };
} }
@ -211,6 +222,8 @@ namespace DurableTask.Netherite
effects.AddDeletion(TrackedObjectKey.History(this.InstanceId)); effects.AddDeletion(TrackedObjectKey.History(this.InstanceId));
this.OrchestrationState = null; this.OrchestrationState = null;
this.OrchestrationStateSize = 0;
this.CreationRequestEventId = null;
this.Waiters = null; this.Waiters = null;
} }

Просмотреть файл

@ -327,7 +327,7 @@ namespace DurableTask.Netherite
{ {
this.Partition.WorkItemTraceHelper.TraceWorkItemDiscarded( this.Partition.WorkItemTraceHelper.TraceWorkItemDiscarded(
this.Partition.PartitionId, this.Partition.PartitionId,
WorkItemTraceHelper.WorkItemType.Orchestration, DurableTask.Core.Common.Entities.IsEntityInstance(evt.InstanceId) ? WorkItemTraceHelper.WorkItemType.Entity : WorkItemTraceHelper.WorkItemType.Orchestration,
evt.WorkItemId, evt.InstanceId, evt.WorkItemId, evt.InstanceId,
session != null ? this.GetSessionPosition(session) : null, session != null ? this.GetSessionPosition(session) : null,
"session was replaced"); "session was replaced");

Просмотреть файл

@ -124,7 +124,7 @@ namespace DurableTask.Netherite.Faster
MutableFraction = tuningParameters?.StoreLogMutableFraction ?? 0.9, MutableFraction = tuningParameters?.StoreLogMutableFraction ?? 0.9,
SegmentSizeBits = segmentSizeBits, SegmentSizeBits = segmentSizeBits,
PreallocateLog = false, PreallocateLog = false,
ReadFlags = ReadFlags.None, ReadCopyOptions = default, // is overridden by the per-session configuration
ReadCacheSettings = null, // no read cache ReadCacheSettings = null, // no read cache
MemorySizeBits = memorySizeBits, MemorySizeBits = memorySizeBits,
}; };
@ -886,7 +886,7 @@ namespace DurableTask.Netherite.Faster
#region ILogCommitManager #region ILogCommitManager
void ILogCommitManager.Commit(long beginAddress, long untilAddress, byte[] commitMetadata, long commitNum) void ILogCommitManager.Commit(long beginAddress, long untilAddress, byte[] commitMetadata, long commitNum, bool forceWriteMetadata)
{ {
try try
{ {
@ -1482,5 +1482,10 @@ namespace DurableTask.Netherite.Faster
}); });
} }
} }
public void CheckpointVersionShift(long oldVersion, long newVersion)
{
// no-op
}
} }
} }

Просмотреть файл

@ -91,5 +91,10 @@ namespace DurableTask.Netherite.Faster
void IDisposable.Dispose() void IDisposable.Dispose()
=> this.localCheckpointManager.Dispose(); => this.localCheckpointManager.Dispose();
public void CheckpointVersionShift(long oldVersion, long newVersion)
{
// no-op
}
} }
} }

Просмотреть файл

@ -182,7 +182,12 @@ namespace DurableTask.Netherite.Faster
ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key, Value, EffectTracker, Output, object>> CreateASession(string id, bool isScan) ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key, Value, EffectTracker, Output, object>> CreateASession(string id, bool isScan)
{ {
var functions = new Functions(this.partition, this, this.cacheTracker, isScan); var functions = new Functions(this.partition, this, this.cacheTracker, isScan);
return this.fht.NewSession(functions, id, readFlags: (isScan ? ReadFlags.None : ReadFlags.CopyReadsToTail));
ReadCopyOptions readCopyOptions = isScan
? new ReadCopyOptions(ReadCopyFrom.None, ReadCopyTo.None)
: new ReadCopyOptions(ReadCopyFrom.AllImmutable, ReadCopyTo.MainLog);
return this.fht.NewSession(functions, id, default, readCopyOptions);
} }
public IDisposable TrackTemporarySession(ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key, Value, EffectTracker, Output, object>> session) public IDisposable TrackTemporarySession(ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key, Value, EffectTracker, Output, object>> session)
@ -969,8 +974,8 @@ namespace DurableTask.Netherite.Faster
{ {
while (enumerator.MoveNext()) while (enumerator.MoveNext())
{ {
if (!string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) if ((!string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) && !enumerator.Current.StartsWith(instanceQuery.InstanceIdPrefix))
&& !enumerator.Current.StartsWith(instanceQuery.InstanceIdPrefix)) || (instanceQuery.ExcludeEntities && DurableTask.Core.Common.Entities.IsEntityInstance(enumerator.Current)))
{ {
// the instance does not match the prefix // the instance does not match the prefix
continue; continue;
@ -1189,8 +1194,8 @@ namespace DurableTask.Netherite.Faster
scanned++; scanned++;
//this.partition.EventDetailTracer?.TraceEventProcessingDetail($"found instance {key.InstanceId}"); //this.partition.EventDetailTracer?.TraceEventProcessingDetail($"found instance {key.InstanceId}");
if (string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) if ((string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) || key.Val.InstanceId.StartsWith(instanceQuery.InstanceIdPrefix))
|| key.Val.InstanceId.StartsWith(instanceQuery.InstanceIdPrefix)) || (instanceQuery.ExcludeEntities && DurableTask.Core.Common.Entities.IsEntityInstance(key.Val.InstanceId)))
{ {
//this.partition.EventDetailTracer?.TraceEventProcessingDetail($"reading instance {key.InstanceId}"); //this.partition.EventDetailTracer?.TraceEventProcessingDetail($"reading instance {key.InstanceId}");

Просмотреть файл

@ -142,6 +142,7 @@ namespace DurableTask.Netherite.Faster
this.traceHelper.TraceProgress($"Using existing blob container at {this.cloudBlobContainer.Result.Uri}"); this.traceHelper.TraceProgress($"Using existing blob container at {this.cloudBlobContainer.Result.Uri}");
} }
var taskHubParameters = new TaskhubParameters() var taskHubParameters = new TaskhubParameters()
{ {
TaskhubName = this.settings.HubName, TaskhubName = this.settings.HubName,

Просмотреть файл

@ -57,7 +57,8 @@ namespace DurableTask.Netherite
None, None,
Client, Client,
Activity, Activity,
Orchestration Orchestration,
Entity,
} }
public enum ClientStatus public enum ClientStatus

Просмотреть файл

@ -22,18 +22,8 @@
<PackageIcon>icon.png</PackageIcon> <PackageIcon>icon.png</PackageIcon>
</PropertyGroup> </PropertyGroup>
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ --> <!-- Version can be edited in common.props -->
<!-- This version MUST be kept constant with DurableTask.Netherite.AzureFunctions --> <Import Project=".\..\common.props" />
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>3</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
<BuildSuffix Condition="'$(GITHUB_RUN_NUMBER)' != ''">.$(GITHUB_RUN_NUMBER)</BuildSuffix>
<FileVersion>$(VersionPrefix)$(BuildSuffix)</FileVersion>
</PropertyGroup>
<ItemGroup> <ItemGroup>
<None Include="icon.png" Pack="true" PackagePath="\" /> <None Include="icon.png" Pack="true" PackagePath="\" />
@ -50,4 +40,14 @@
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Abstractions" Version="1.1.0" /> <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Abstractions" Version="1.1.0" />
</ItemGroup> </ItemGroup>
<!-- This tells the .NET Isolated Worker SDK which WebJobs extension this package depends on -->
<ItemGroup>
<AssemblyAttribute Include="Microsoft.Azure.Functions.Worker.Extensions.Abstractions.ExtensionInformationAttribute">
<_Parameter1>Microsoft.Azure.DurableTask.Netherite.AzureFunctions</_Parameter1>
<_Parameter2>$(PackageVersion)</_Parameter2>
<_Parameter3>true</_Parameter3>
<_Parameter3_IsLiteral>true</_Parameter3_IsLiteral>
</AssemblyAttribute>
</ItemGroup>
</Project> </Project>

Просмотреть файл

@ -1,7 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
// This must be updated when updating the version of the package
[assembly: ExtensionInformation("Microsoft.Azure.DurableTask.Netherite.AzureFunctions", "1.4.3", true)]

14
src/common.props Normal file
Просмотреть файл

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ -->
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>5</MinorVersion>
<PatchVersion>1</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
<BuildSuffix Condition="'$(GITHUB_RUN_NUMBER)' != ''">.$(GITHUB_RUN_NUMBER)</BuildSuffix>
<FileVersion>$(VersionPrefix)$(BuildSuffix)</FileVersion>
</PropertyGroup>
</Project>

Просмотреть файл

@ -63,6 +63,44 @@ namespace DurableTask.Netherite.AzureFunctions.Tests
actual: status.Output?.ToString(Formatting.None)); actual: status.Output?.ToString(Formatting.None));
} }
[Fact]
public async Task CanOrchestrateEntities()
{
DurableOrchestrationStatus status = await this.RunOrchestrationAsync(nameof(Functions.OrchestrateCounterEntity));
Assert.Equal(OrchestrationRuntimeStatus.Completed, status.RuntimeStatus);
Assert.Equal(7, (int)status.Output);
}
[Fact]
public async Task CanClientInteractWithEntities()
{
IDurableClient client = await this.GetDurableClientAsync();
var entityId = new EntityId(nameof(Functions.Counter), Guid.NewGuid().ToString("N"));
EntityStateResponse<int> result = await client.ReadEntityStateAsync<int>(entityId);
Assert.False(result.EntityExists);
await Task.WhenAll(
client.SignalEntityAsync(entityId, "incr"),
client.SignalEntityAsync(entityId, "incr"),
client.SignalEntityAsync(entityId, "incr"),
client.SignalEntityAsync(entityId, "add", 4));
await Task.Delay(TimeSpan.FromSeconds(5));
result = await client.ReadEntityStateAsync<int>(entityId);
Assert.True(result.EntityExists);
Assert.Equal(7, result.EntityState);
}
[Fact]
public async Task CanOrchestrationInteractWithEntities()
{
DurableOrchestrationStatus status = await this.RunOrchestrationAsync(nameof(Functions.IncrementThenGet));
Assert.Equal(OrchestrationRuntimeStatus.Completed, status.RuntimeStatus);
Assert.Equal(1, (int)status.Output);
}
static class Functions static class Functions
{ {
[FunctionName(nameof(Sequence))] [FunctionName(nameof(Sequence))]
@ -99,6 +137,72 @@ namespace DurableTask.Netherite.AzureFunctions.Tests
[FunctionName(nameof(IntToString))] [FunctionName(nameof(IntToString))]
public static string IntToString([ActivityTrigger] int input) => input.ToString(); public static string IntToString([ActivityTrigger] int input) => input.ToString();
[FunctionName(nameof(OrchestrateCounterEntity))]
public static async Task<int> OrchestrateCounterEntity(
[OrchestrationTrigger] IDurableOrchestrationContext ctx)
{
var entityId = new EntityId(nameof(Counter), ctx.NewGuid().ToString("N"));
ctx.SignalEntity(entityId, "incr");
ctx.SignalEntity(entityId, "incr");
ctx.SignalEntity(entityId, "incr");
ctx.SignalEntity(entityId, "add", 4);
using (await ctx.LockAsync(entityId))
{
int result = await ctx.CallEntityAsync<int>(entityId, "get");
return result;
}
}
[FunctionName(nameof(Counter))]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int current = ctx.GetState<int>();
switch (ctx.OperationName)
{
case "incr":
ctx.SetState(current + 1);
break;
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(current + amount);
break;
case "get":
ctx.Return(current);
break;
case "set":
amount = ctx.GetInput<int>();
ctx.SetState(amount);
break;
case "delete":
ctx.DeleteState();
break;
default:
throw new NotImplementedException("No such entity operation");
}
}
[FunctionName(nameof(IncrementThenGet))]
public static async Task<int> IncrementThenGet([OrchestrationTrigger] IDurableOrchestrationContext context)
{
// Key needs to be pseudo-random to avoid conflicts with multiple test runs.
string key = context.NewGuid().ToString().Substring(0, 8);
EntityId entityId = new EntityId(nameof(Counter), key);
context.SignalEntity(entityId, "add", 1);
// Invoking a sub-orchestration as a regression test for https://github.com/microsoft/durabletask-mssql/issues/146
return await context.CallSubOrchestratorAsync<int>(nameof(GetEntityAsync), entityId);
}
[FunctionName(nameof(GetEntityAsync))]
public static async Task<int> GetEntityAsync([OrchestrationTrigger] IDurableOrchestrationContext context)
{
EntityId entityId = context.GetInput<EntityId>();
return await context.CallEntityAsync<int>(entityId, "get");
}
} }
} }
} }

Просмотреть файл

@ -24,7 +24,6 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\src\DurableTask.Netherite.AzureFunctions\DurableTask.Netherite.AzureFunctions.csproj" /> <ProjectReference Include="..\..\src\DurableTask.Netherite.AzureFunctions\DurableTask.Netherite.AzureFunctions.csproj" />
<ProjectReference Include="..\DurableTask.Netherite.Tests\DurableTask.Netherite.Tests.csproj" /> <ProjectReference Include="..\DurableTask.Netherite.Tests\DurableTask.Netherite.Tests.csproj" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.15.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

Просмотреть файл

@ -140,7 +140,7 @@ namespace DurableTask.Netherite.AzureFunctions.Tests
return await client.PurgeInstanceHistoryAsync(default, default, null); return await client.PurgeInstanceHistoryAsync(default, default, null);
} }
async Task<IDurableClient> GetDurableClientAsync() protected async Task<IDurableClient> GetDurableClientAsync()
{ {
var clientRef = new IDurableClient[1]; var clientRef = new IDurableClient[1];
await this.CallFunctionAsync(nameof(ClientFunctions.GetDurableClient), "clientRef", clientRef); await this.CallFunctionAsync(nameof(ClientFunctions.GetDurableClient), "clientRef", clientRef);

Просмотреть файл

@ -9,7 +9,6 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
<PackageReference Include="xunit" Version="2.4.2" /> <PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5"> <PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">

Просмотреть файл

@ -5,7 +5,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Azure.DurableTask.Netherite.AzureFunctions" Version="1.1.1" /> <PackageReference Include="Microsoft.Azure.DurableTask.Netherite.AzureFunctions" Version="1.1.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.12.0" /> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.13.2" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.1.3" /> <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.1.3" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
</ItemGroup> </ItemGroup>

Просмотреть файл

@ -6,7 +6,6 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.11.2" /> <PackageReference Include="Azure.Messaging.EventHubs" Version="5.11.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" /> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.12.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="5.1.2" /> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="5.1.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.4" /> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.4" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.1.3" /> <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.1.3" />