From 40a1f8c68fe540148331e8aa16492d9301c9e36d Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Mon, 22 Apr 2024 13:07:27 -0700 Subject: [PATCH 01/10] Deduplicate CreationRequestReceived events (#381) * always deduplicate CreationRequestReceived events * Update src/DurableTask.Netherite/PartitionState/InstanceState.cs Co-authored-by: David Justo --------- Co-authored-by: David Justo --- .../PartitionState/InstanceState.cs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.Netherite/PartitionState/InstanceState.cs b/src/DurableTask.Netherite/PartitionState/InstanceState.cs index 4f8240b..76351fb 100644 --- a/src/DurableTask.Netherite/PartitionState/InstanceState.cs +++ b/src/DurableTask.Netherite/PartitionState/InstanceState.cs @@ -27,6 +27,9 @@ namespace DurableTask.Netherite [DataMember] public List Waiters { get; set; } + [DataMember] + public string CreationRequestEventId { get; set; } + [IgnoreDataMember] public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Instance, this.InstanceId); @@ -39,12 +42,19 @@ namespace DurableTask.Netherite public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) { + if (creationRequestReceived.EventIdString == this.CreationRequestEventId) + { + // we have already processed this event - it must be a duplicate delivery. Ignore it. + return; + }; + bool exists = this.OrchestrationState != null; - bool filterDuplicate = exists + + bool previousExecutionWithDedupeStatus = exists && creationRequestReceived.DedupeStatuses != null && creationRequestReceived.DedupeStatuses.Contains(this.OrchestrationState.OrchestrationStatus); - if (!filterDuplicate) + if (!previousExecutionWithDedupeStatus) { var ee = creationRequestReceived.ExecutionStartedEvent; @@ -65,6 +75,7 @@ namespace DurableTask.Netherite ScheduledStartTime = ee.ScheduledStartTime }; this.OrchestrationStateSize = DurableTask.Netherite.SizeUtils.GetEstimatedSize(this.OrchestrationState); + this.CreationRequestEventId = creationRequestReceived.EventIdString; // queue the message in the session, or start a timer if delayed if (!ee.ScheduledStartTime.HasValue) @@ -87,7 +98,7 @@ namespace DurableTask.Netherite { ClientId = creationRequestReceived.ClientId, RequestId = creationRequestReceived.RequestId, - Succeeded = !filterDuplicate, + Succeeded = !previousExecutionWithDedupeStatus, ExistingInstanceOrchestrationStatus = this.OrchestrationState?.OrchestrationStatus, }; } @@ -211,6 +222,8 @@ namespace DurableTask.Netherite effects.AddDeletion(TrackedObjectKey.History(this.InstanceId)); this.OrchestrationState = null; + this.OrchestrationStateSize = 0; + this.CreationRequestEventId = null; this.Waiters = null; } From 687071326d51e2b3af23a290bcbabe4132895c4e Mon Sep 17 00:00:00 2001 From: Varshitha Bachu Date: Mon, 22 Apr 2024 14:43:30 -0700 Subject: [PATCH 02/10] Upgrade FASTER dependency to v2.6.4 (#344) * initial commit * added comment * added readCopyOptions * addressed PR feedback * updated CheckpointVersionShift() * updated Netherite version * Update src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs * Update src/DurableTask.Netherite/DurableTask.Netherite.csproj * update to FASTER v2.6.3 * update to FASTER 2.6.4 --------- Co-authored-by: Sebastian Burckhardt --- src/DurableTask.Netherite/DurableTask.Netherite.csproj | 4 ++-- .../StorageLayer/Faster/AzureBlobs/BlobManager.cs | 9 +++++++-- .../Faster/AzureBlobs/LocalFileCheckpointManager.cs | 5 +++++ .../StorageLayer/Faster/FasterKV.cs | 7 ++++++- .../StorageLayer/Faster/FasterStorageProvider.cs | 1 + .../DurableTask.Netherite.Tests.csproj | 1 - 6 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index 60f36d7..faced2a 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -56,9 +56,9 @@ - - + + diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs index 2ca2478..d2871f1 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs @@ -124,7 +124,7 @@ namespace DurableTask.Netherite.Faster MutableFraction = tuningParameters?.StoreLogMutableFraction ?? 0.9, SegmentSizeBits = segmentSizeBits, PreallocateLog = false, - ReadFlags = ReadFlags.None, + ReadCopyOptions = default, // is overridden by the per-session configuration ReadCacheSettings = null, // no read cache MemorySizeBits = memorySizeBits, }; @@ -886,7 +886,7 @@ namespace DurableTask.Netherite.Faster #region ILogCommitManager - void ILogCommitManager.Commit(long beginAddress, long untilAddress, byte[] commitMetadata, long commitNum) + void ILogCommitManager.Commit(long beginAddress, long untilAddress, byte[] commitMetadata, long commitNum, bool forceWriteMetadata) { try { @@ -1482,5 +1482,10 @@ namespace DurableTask.Netherite.Faster }); } } + + public void CheckpointVersionShift(long oldVersion, long newVersion) + { + // no-op + } } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/LocalFileCheckpointManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/LocalFileCheckpointManager.cs index 61b1fc2..282ac9d 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/LocalFileCheckpointManager.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/LocalFileCheckpointManager.cs @@ -91,5 +91,10 @@ namespace DurableTask.Netherite.Faster void IDisposable.Dispose() => this.localCheckpointManager.Dispose(); + + public void CheckpointVersionShift(long oldVersion, long newVersion) + { + // no-op + } } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs index 65b6840..8d99045 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs @@ -182,7 +182,12 @@ namespace DurableTask.Netherite.Faster ClientSession> CreateASession(string id, bool isScan) { var functions = new Functions(this.partition, this, this.cacheTracker, isScan); - return this.fht.NewSession(functions, id, readFlags: (isScan ? ReadFlags.None : ReadFlags.CopyReadsToTail)); + + ReadCopyOptions readCopyOptions = isScan + ? new ReadCopyOptions(ReadCopyFrom.None, ReadCopyTo.None) + : new ReadCopyOptions(ReadCopyFrom.AllImmutable, ReadCopyTo.MainLog); + + return this.fht.NewSession(functions, id, default, readCopyOptions); } public IDisposable TrackTemporarySession(ClientSession> session) diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs index 7d7bf28..c652e2d 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs @@ -142,6 +142,7 @@ namespace DurableTask.Netherite.Faster this.traceHelper.TraceProgress($"Using existing blob container at {this.cloudBlobContainer.Result.Uri}"); } + var taskHubParameters = new TaskhubParameters() { TaskhubName = this.settings.HubName, diff --git a/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj b/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj index 1c2cead..88b5fb3 100644 --- a/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj +++ b/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj @@ -9,7 +9,6 @@ - From 7ca60baec28db2778fc1fd4e838e046ff7cc54e1 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Mon, 22 Apr 2024 14:43:44 -0700 Subject: [PATCH 03/10] update to v1.5.0 (#386) --- README.md | 2 +- .../DurableTask.Netherite.AzureFunctions.csproj | 4 ++-- src/DurableTask.Netherite/DurableTask.Netherite.csproj | 4 ++-- .../Functions.Worker.Extensions.DurableTask.Netherite.csproj | 4 ++-- .../Properties/AssemblyInfo.cs | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 46f843f..cae4893 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ For some other considerations about how to choose the engine, see [the documenta ## Status -The current version of Netherite is *1.4.3*. Netherite supports almost all of the DT and DF APIs. +The current version of Netherite is *1.5.0*. Netherite supports almost all of the DT and DF APIs. Some notable differences to the default Azure Table storage provider include: - Instance queries and purge requests are not issued directly against Azure Storage, but are processed by the function app. Thus, the performance (latency and throughput) of queries heavily depends on diff --git a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj index 62bbd73..658658c 100644 --- a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj +++ b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj @@ -25,8 +25,8 @@ 1 - 4 - 3 + 5 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index faced2a..cd88b39 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -25,8 +25,8 @@ 1 - 4 - 3 + 5 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 diff --git a/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj b/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj index 4b5d5d4..59a6d14 100644 --- a/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj +++ b/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj @@ -26,8 +26,8 @@ 1 - 4 - 3 + 5 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 diff --git a/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs b/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs index bdff52c..70f63c1 100644 --- a/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs +++ b/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs @@ -4,4 +4,4 @@ using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; // This must be updated when updating the version of the package -[assembly: ExtensionInformation("Microsoft.Azure.DurableTask.Netherite.AzureFunctions", "1.4.3", true)] \ No newline at end of file +[assembly: ExtensionInformation("Microsoft.Azure.DurableTask.Netherite.AzureFunctions", "1.5.0", true)] \ No newline at end of file From 24efd5478d9982fdbcc93e4ab27470910733c9a9 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Fri, 26 Apr 2024 09:31:15 -0700 Subject: [PATCH 04/10] Add support for isolated entities (#358) --- ...urableTask.Netherite.AzureFunctions.csproj | 4 +- .../NetheriteProviderFactory.cs | 22 +- .../DurableTask.Netherite.csproj | 2 +- .../OrchestrationService/InstanceQuery.cs | 11 +- .../NetheriteOrchestrationService.cs | 235 +++++++++++++++++- .../NetheriteOrchestrationServiceSettings.cs | 32 ++- .../OrchestrationWorkItem.cs | 2 + .../OrchestrationService/Partition.cs | 14 +- .../PartitionState/SessionsState.cs | 2 +- .../StorageLayer/Faster/FasterKV.cs | 8 +- .../Tracing/WorkItemTraceHelper.cs | 3 +- ...Task.Netherite.AzureFunctions.Tests.csproj | 3 +- test/LoadGeneratorApp/LoadGeneratorApp.csproj | 2 +- test/PerformanceTests/PerformanceTests.csproj | 2 +- 14 files changed, 303 insertions(+), 39 deletions(-) diff --git a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj index 658658c..5d2950b 100644 --- a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj +++ b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj @@ -51,8 +51,8 @@ - - + + diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs index d465d84..abb7875 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs @@ -32,6 +32,7 @@ namespace DurableTask.Netherite.AzureFunctions readonly IServiceProvider serviceProvider; readonly DurableTask.Netherite.ConnectionResolver connectionResolver; + readonly bool usesNewPassthroughMiddlewareForEntities; readonly bool inConsumption; // the following are boolean options that can be specified in host.json, @@ -96,6 +97,14 @@ namespace DurableTask.Netherite.AzureFunctions this.TraceToConsole = ReadBooleanSetting(nameof(this.TraceToConsole)); this.TraceToBlob = ReadBooleanSetting(nameof(this.TraceToBlob)); + + WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType(); + if (runtimeType == WorkerRuntimeType.DotNetIsolated || + runtimeType == WorkerRuntimeType.Java || + runtimeType == WorkerRuntimeType.Custom) + { + this.usesNewPassthroughMiddlewareForEntities = true; + } } NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(string taskHubNameOverride = null, string connectionName = null) @@ -109,12 +118,19 @@ namespace DurableTask.Netherite.AzureFunctions // different defaults for key configuration values. int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount; int maxConcurrentActivitiesDefault = this.inConsumption ? 20 : 25 * Environment.ProcessorCount; + int maxConcurrentEntitiesDefault = this.inConsumption ? 20 : 25 * Environment.ProcessorCount; int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000; // The following defaults are only applied if the customer did not explicitely set them on `host.json` - this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault; - this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault; - this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault; + this.options.MaxConcurrentOrchestratorFunctions ??= maxConcurrentOrchestratorsDefault; + this.options.MaxConcurrentActivityFunctions ??= maxConcurrentActivitiesDefault; + this.options.MaxConcurrentEntityFunctions ??= maxConcurrentEntitiesDefault; + this.options.MaxEntityOperationBatchSize ??= maxEntityOperationBatchSizeDefault; + + if (this.usesNewPassthroughMiddlewareForEntities) + { + netheriteSettings.UseSeparateQueueForEntityWorkItems = true; + } // copy all applicable fields from both the options and the storageProvider options JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options), netheriteSettings); diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index cd88b39..9682550 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -56,7 +56,7 @@ - + diff --git a/src/DurableTask.Netherite/OrchestrationService/InstanceQuery.cs b/src/DurableTask.Netherite/OrchestrationService/InstanceQuery.cs index 683f785..65006a5 100644 --- a/src/DurableTask.Netherite/OrchestrationService/InstanceQuery.cs +++ b/src/DurableTask.Netherite/OrchestrationService/InstanceQuery.cs @@ -52,6 +52,11 @@ namespace DurableTask.Netherite [DataMember] internal bool PrefetchHistory { get; set; } + /// + /// Whether to exclude entities from the results. + /// + [DataMember] + internal bool ExcludeEntities { get; set; } /// /// Construct an instance query with the given parameters. @@ -77,9 +82,6 @@ namespace DurableTask.Netherite internal bool HasRuntimeStatus => this.RuntimeStatus != null && this.RuntimeStatus.Length > 0; - internal bool IsSet => this.HasRuntimeStatus || !string.IsNullOrWhiteSpace(this.InstanceIdPrefix) - || !(this.CreatedTimeFrom is null) || !(this.CreatedTimeTo is null); - internal bool Matches(OrchestrationState targetState) { if (targetState == null) @@ -88,7 +90,8 @@ namespace DurableTask.Netherite return (!this.HasRuntimeStatus || this.RuntimeStatus.Contains(targetState.OrchestrationStatus)) && (string.IsNullOrWhiteSpace(this.InstanceIdPrefix) || targetState.OrchestrationInstance.InstanceId.StartsWith(this.InstanceIdPrefix)) && (!this.CreatedTimeFrom.HasValue || targetState.CreatedTime >= this.CreatedTimeFrom.Value) - && (!this.CreatedTimeTo.HasValue || targetState.CreatedTime <= this.CreatedTimeTo.Value); + && (!this.CreatedTimeTo.HasValue || targetState.CreatedTime <= this.CreatedTimeTo.Value) + && (!this.ExcludeEntities || !DurableTask.Core.Common.Entities.IsEntityInstance(targetState.OrchestrationInstance.InstanceId)); } } } diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs index 5b60c87..8c5e94e 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs @@ -5,6 +5,7 @@ namespace DurableTask.Netherite { using DurableTask.Core; using DurableTask.Core.Common; + using DurableTask.Core.Entities; using DurableTask.Core.History; using DurableTask.Netherite.Abstractions; using DurableTask.Netherite.Faster; @@ -24,10 +25,12 @@ namespace DurableTask.Netherite /// Local partition of the distributed orchestration service. /// public class NetheriteOrchestrationService : - DurableTask.Core.IOrchestrationService, + DurableTask.Core.IOrchestrationService, DurableTask.Core.IOrchestrationServiceClient, DurableTask.Core.IOrchestrationServicePurgeClient, + DurableTask.Core.Query.IOrchestrationServiceQueryClient, DurableTask.Netherite.IOrchestrationServiceQueryClient, + DurableTask.Core.Entities.IEntityOrchestrationService, TransportAbstraction.IHost { /// @@ -43,6 +46,7 @@ namespace DurableTask.Netherite readonly ITransportLayer transport; readonly IStorageLayer storage; + readonly EntityBackendQueriesImplementation EntityBackendQueries; readonly WorkItemTraceHelper workItemTraceHelper; @@ -88,6 +92,8 @@ namespace DurableTask.Netherite internal WorkItemQueue ActivityWorkItemQueue { get; private set; } internal WorkItemQueue OrchestrationWorkItemQueue { get; private set; } + internal WorkItemQueue EntityWorkItemQueue { get; private set; } + internal LoadPublishWorker LoadPublisher { get; private set; } internal ILoggerFactory LoggerFactory { get; } @@ -124,7 +130,8 @@ namespace DurableTask.Netherite this.Settings = settings; this.TraceHelper = new OrchestrationServiceTraceHelper(loggerFactory, settings.LogLevelLimit, settings.WorkerId, settings.HubName); this.workItemTraceHelper = new WorkItemTraceHelper(loggerFactory, settings.WorkItemLogLevelLimit, settings.HubName); - + this.EntityBackendQueries = new EntityBackendQueriesImplementation(this); + try { this.TraceHelper.TraceProgress("Reading configuration for transport and storage layers"); @@ -379,6 +386,7 @@ namespace DurableTask.Netherite this.ActivityWorkItemQueue = new WorkItemQueue(); this.OrchestrationWorkItemQueue = new WorkItemQueue(); + this.EntityWorkItemQueue = new WorkItemQueue(); this.TraceHelper.TraceProgress($"Started client"); @@ -475,6 +483,7 @@ namespace DurableTask.Netherite await this.transport.StopAsync(fatalExceptionObserved: false); this.ActivityWorkItemQueue.Dispose(); + this.EntityWorkItemQueue.Dispose(); this.OrchestrationWorkItemQueue.Dispose(); } @@ -539,7 +548,7 @@ namespace DurableTask.Netherite TransportAbstraction.IPartition TransportAbstraction.IHost.AddPartition(uint partitionId, TransportAbstraction.ISender batchSender) { var partition = new Partition(this, partitionId, this.GetPartitionId, this.GetNumberPartitions, batchSender, this.Settings, this.StorageAccountName, - this.ActivityWorkItemQueue, this.OrchestrationWorkItemQueue, this.LoadPublisher, this.workItemTraceHelper); + this.ActivityWorkItemQueue, this.OrchestrationWorkItemQueue, this.EntityWorkItemQueue, this.LoadPublisher, this.workItemTraceHelper); return partition; } @@ -551,7 +560,7 @@ namespace DurableTask.Netherite IPartitionErrorHandler TransportAbstraction.IHost.CreateErrorHandler(uint partitionId) { - return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this); + return new PartitionErrorHandler((int)partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this); } void TransportAbstraction.IHost.TraceWarning(string message) @@ -744,7 +753,30 @@ namespace DurableTask.Netherite /// async Task IOrchestrationServiceQueryClient.QueryOrchestrationStatesAsync(InstanceQuery instanceQuery, int pageSize, string continuationToken, CancellationToken cancellationToken) - => await (await this.GetClientAsync().ConfigureAwait(false)).QueryOrchestrationStatesAsync(instanceQuery, pageSize, continuationToken, cancellationToken).ConfigureAwait(false); + { + return await (await this.GetClientAsync().ConfigureAwait(false)).QueryOrchestrationStatesAsync(instanceQuery, pageSize, continuationToken, cancellationToken).ConfigureAwait(false); + } + + /// + async Task DurableTask.Core.Query.IOrchestrationServiceQueryClient.GetOrchestrationWithQueryAsync( + DurableTask.Core.Query.OrchestrationQuery query, + CancellationToken cancellationToken) + { + InstanceQuery instanceQuery = new() + { + CreatedTimeFrom = query.CreatedTimeFrom, + CreatedTimeTo = query.CreatedTimeTo, + ExcludeEntities = query.ExcludeEntities, + FetchInput = query.FetchInputsAndOutputs, + InstanceIdPrefix = query.InstanceIdPrefix, + PrefetchHistory = false, + RuntimeStatus = query.RuntimeStatus?.ToArray(), + }; + + Client client = await this.GetClientAsync().ConfigureAwait(false); + InstanceQueryResult result = await client.QueryOrchestrationStatesAsync(instanceQuery, query.PageSize, query.ContinuationToken, cancellationToken).ConfigureAwait(false); + return new DurableTask.Core.Query.OrchestrationQueryResult(result.Instances.ToList(), result.ContinuationToken); + } /// async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(string instanceId) @@ -754,24 +786,191 @@ namespace DurableTask.Netherite async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter) => new PurgeResult(await (await this.GetClientAsync()).PurgeInstanceHistoryAsync(purgeInstanceFilter.CreatedTimeFrom, purgeInstanceFilter.CreatedTimeTo, purgeInstanceFilter.RuntimeStatus)); + /// + EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries => this.EntityBackendQueries; + + class EntityBackendQueriesImplementation : EntityBackendQueries + { + readonly NetheriteOrchestrationService service; + + public EntityBackendQueriesImplementation(NetheriteOrchestrationService netheriteOrchestrationService) + { + this.service = netheriteOrchestrationService; + } + public override async Task GetEntityAsync(EntityId id, bool includeState = false, bool includeTransient = false, CancellationToken cancellation = default) + { + string instanceId = id.ToString(); + OrchestrationState state = await(await this.service.GetClientAsync().ConfigureAwait(false)) + .GetOrchestrationStateAsync(this.service.GetPartitionId(instanceId.ToString()), instanceId, fetchInput: includeState, false).ConfigureAwait(false); + + return this.GetEntityMetadata(state, includeState, includeTransient); + } + + public override async Task QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation) + { + string adjustedPrefix = string.IsNullOrEmpty(filter.InstanceIdStartsWith) ? "@" : filter.InstanceIdStartsWith; + + if (adjustedPrefix[0] != '@') + { + return new EntityQueryResult() + { + Results = new List(), + ContinuationToken = null, + }; + } + + var condition = new InstanceQuery() + { + InstanceIdPrefix = adjustedPrefix, + CreatedTimeFrom = filter.LastModifiedFrom, + CreatedTimeTo = filter.LastModifiedTo, + FetchInput = filter.IncludeState, + PrefetchHistory = false, + ExcludeEntities = false, + }; + + List metadataList = new List(); + + InstanceQueryResult result = await (await this.service.GetClientAsync().ConfigureAwait(false)) + .QueryOrchestrationStatesAsync(condition, filter.PageSize ?? 200, filter.ContinuationToken, cancellation).ConfigureAwait(false); + + foreach(var entry in result.Instances) + { + var metadata = this.GetEntityMetadata(entry, filter.IncludeState, filter.IncludeTransient); + if (metadata.HasValue) + { + metadataList.Add(metadata.Value); + } + } + + return new EntityQueryResult() + { + Results = metadataList, + ContinuationToken = result.ContinuationToken, + }; + } + + public override async Task CleanEntityStorageAsync(CleanEntityStorageRequest request = default, CancellationToken cancellation = default) + { + if (!request.ReleaseOrphanedLocks) + { + // there is no need to do anything since deletion is implicit + return new CleanEntityStorageResult(); + } + + var condition = new InstanceQuery() + { + InstanceIdPrefix = "@", + FetchInput = false, + PrefetchHistory = false, + ExcludeEntities = false, + }; + + var client = await this.service.GetClientAsync().ConfigureAwait(false); + + string continuationToken = null; + int orphanedLocksReleased = 0; + + // list all entities (without fetching the input) and for each locked one, + // check if the lock owner is still running. If not, release the lock. + do + { + var page = await client.QueryOrchestrationStatesAsync(condition, 500, continuationToken, cancellation).ConfigureAwait(false); + + // The checks run in parallel for all entities in the page + List tasks = new List(); + foreach (var state in page.Instances) + { + EntityStatus status = ClientEntityHelpers.GetEntityStatus(state.Status); + if (status != null && status.LockedBy != null) + { + tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy)); + } + } + + async Task CheckForOrphanedLockAndFixIt(OrchestrationState state, string lockOwner) + { + uint partitionId = this.service.GetPartitionId(lockOwner); + + OrchestrationState ownerState + = await client.GetOrchestrationStateAsync(partitionId, lockOwner, fetchInput: false, fetchOutput: false); + + bool OrchestrationIsRunning(OrchestrationStatus? status) + => status != null && (status == OrchestrationStatus.Running || status == OrchestrationStatus.Suspended); + + if (!OrchestrationIsRunning(ownerState?.OrchestrationStatus)) + { + // the owner is not a running orchestration. Send a lock release. + EntityMessageEvent eventToSend = ClientEntityHelpers.EmitUnlockForOrphanedLock(state.OrchestrationInstance, lockOwner); + await client.SendTaskOrchestrationMessageBatchAsync( + this.service.GetPartitionId(state.OrchestrationInstance.InstanceId), + new TaskMessage[] { eventToSend.AsTaskMessage() }); + + Interlocked.Increment(ref orphanedLocksReleased); + } + } + + // wait for all of the checks to finish before moving on to the next page. + await Task.WhenAll(tasks); + } + while (continuationToken != null); + + return new CleanEntityStorageResult() + { + EmptyEntitiesRemoved = 0, + OrphanedLocksReleased = orphanedLocksReleased, + }; + } + + EntityMetadata? GetEntityMetadata(OrchestrationState state, bool includeState, bool includeTransient) + { + if (state != null) + { + // determine the status of the entity by deserializing the custom status field + EntityStatus status = ClientEntityHelpers.GetEntityStatus(state.Status); + + if (status?.EntityExists == true || includeTransient) + { + return new EntityMetadata() + { + EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId), + LastModifiedTime = state.CreatedTime, + SerializedState = (includeState && status?.EntityExists == true) ? ClientEntityHelpers.GetEntityState(state.Input) : null, + LockedBy = status?.LockedBy, + BacklogQueueSize = status?.BacklogQueueSize ?? 0, + }; + } + } + + return null; + } + } + /******************************/ // Task orchestration methods /******************************/ - async Task IOrchestrationService.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan receiveTimeout, - CancellationToken cancellationToken) + Task IOrchestrationService.LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) + => this.LockNextWorkItemInternal(this.OrchestrationWorkItemQueue, receiveTimeout, cancellationToken); + + Task IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) + => this.LockNextWorkItemInternal(this.OrchestrationWorkItemQueue, receiveTimeout, cancellationToken); + + Task IEntityOrchestrationService.LockNextEntityWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) + => this.LockNextWorkItemInternal(this.EntityWorkItemQueue, receiveTimeout, cancellationToken); + + async Task LockNextWorkItemInternal(WorkItemQueue workItemQueue, TimeSpan receiveTimeout, CancellationToken cancellationToken) { - var nextOrchestrationWorkItem = await this.OrchestrationWorkItemQueue.GetNext(receiveTimeout, cancellationToken).ConfigureAwait(false); + var nextOrchestrationWorkItem = await workItemQueue.GetNext(receiveTimeout, cancellationToken).ConfigureAwait(false); if (nextOrchestrationWorkItem != null) { nextOrchestrationWorkItem.MessageBatch.WaitingSince = null; this.workItemTraceHelper.TraceWorkItemStarted( - nextOrchestrationWorkItem.Partition.PartitionId, - WorkItemTraceHelper.WorkItemType.Orchestration, + nextOrchestrationWorkItem.Partition.PartitionId, + nextOrchestrationWorkItem.WorkItemType, nextOrchestrationWorkItem.MessageBatch.WorkItemId, nextOrchestrationWorkItem.MessageBatch.InstanceId, nextOrchestrationWorkItem.Type.ToString(), @@ -857,7 +1056,7 @@ namespace DurableTask.Netherite // It's unavoidable by design, but let's at least create a warning. this.workItemTraceHelper.TraceWorkItemDiscarded( partition.PartitionId, - WorkItemTraceHelper.WorkItemType.Orchestration, + orchestrationWorkItem.WorkItemType, messageBatch.WorkItemId, workItem.InstanceId, "", @@ -899,7 +1098,7 @@ namespace DurableTask.Netherite this.workItemTraceHelper.TraceWorkItemCompleted( partition.PartitionId, - WorkItemTraceHelper.WorkItemType.Orchestration, + orchestrationWorkItem.WorkItemType, messageBatch.WorkItemId, workItem.InstanceId, batchProcessedEvent.OrchestrationStatus, @@ -1077,5 +1276,15 @@ namespace DurableTask.Netherite int IOrchestrationService.MaxConcurrentTaskActivityWorkItems => this.Settings.MaxConcurrentActivityFunctions; int IOrchestrationService.TaskActivityDispatcherCount => this.Settings.ActivityDispatcherCount; + + EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties => new EntityBackendProperties() + { + EntityMessageReorderWindow = TimeSpan.Zero, + MaxConcurrentTaskEntityWorkItems = this.Settings.MaxConcurrentEntityFunctions, + MaxEntityOperationBatchSize = this.Settings.MaxEntityOperationBatchSize, + MaximumSignalDelayTime = TimeSpan.MaxValue, + SupportsImplicitEntityDeletion = true, + UseSeparateQueueForEntityWorkItems = this.Settings.UseSeparateQueueForEntityWorkItems, + }; } } \ No newline at end of file diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index 46a14f8..52fc254 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -18,7 +18,8 @@ namespace DurableTask.Netherite public class NetheriteOrchestrationServiceSettings { /// - /// The name of the taskhub. Matches Microsoft.Azure.WebJobs.Extensions.DurableTask. + /// The name of the taskhub. + /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions. /// public string HubName { get; set; } @@ -57,19 +58,40 @@ namespace DurableTask.Netherite public Faster.BlobManager.FasterTuningParameters FasterTuningParameters { get; set; } = null; /// - /// Gets or sets the maximum number of work items that can be processed concurrently on a single node. + /// Gets or sets the maximum number of activity work items that can be processed concurrently on a single node. /// The default value is 100. - /// Matches Microsoft.Azure.WebJobs.Extensions.DurableTask. + /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions. /// public int MaxConcurrentActivityFunctions { get; set; } = 100; /// - /// Gets or sets the maximum number of orchestrations that can be processed concurrently on a single node. + /// Gets or sets the maximum number of orchestration work items that can be processed concurrently on a single node. /// The default value is 100. - /// Matches Microsoft.Azure.WebJobs.Extensions.DurableTask. + /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions. /// public int MaxConcurrentOrchestratorFunctions { get; set; } = 100; + /// + /// Gets or sets the maximum number of entity work items that can be processed concurrently on a single node. + /// The default value is 100. + /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions. + /// + public int MaxConcurrentEntityFunctions { get; set; } = 100; + + /// + /// Whether to use separate work item queues for entities and orchestrators. + /// This defaults to false, to maintain compatility with legacy front ends. + /// Newer front ends explicitly set this to true. + /// + public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; + + /// + /// Gets or sets the maximum number of entity operations that are processed as a single batch. + /// The default value is 1000. + /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions. + /// + public int MaxEntityOperationBatchSize { get; set; } = 1000; + /// /// Gets or sets the number of dispatchers used to dispatch orchestrations. /// diff --git a/src/DurableTask.Netherite/OrchestrationService/OrchestrationWorkItem.cs b/src/DurableTask.Netherite/OrchestrationService/OrchestrationWorkItem.cs index eab9bf6..11cb5f0 100644 --- a/src/DurableTask.Netherite/OrchestrationService/OrchestrationWorkItem.cs +++ b/src/DurableTask.Netherite/OrchestrationService/OrchestrationWorkItem.cs @@ -32,6 +32,8 @@ namespace DurableTask.Netherite public override bool RestoreOriginalRuntimeStateDuringCompletion => false; + public WorkItemTraceHelper.WorkItemType WorkItemType => DurableTask.Core.Common.Entities.IsEntityInstance(this.InstanceId) ? WorkItemTraceHelper.WorkItemType.Entity : WorkItemTraceHelper.WorkItemType.Orchestration; + public OrchestrationWorkItem(Partition partition, OrchestrationMessageBatch messageBatch, List previousHistory = null, string customStatus = null) { this.Partition = partition; diff --git a/src/DurableTask.Netherite/OrchestrationService/Partition.cs b/src/DurableTask.Netherite/OrchestrationService/Partition.cs index 7f641cc..a90343e 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Partition.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Partition.cs @@ -46,6 +46,7 @@ namespace DurableTask.Netherite public TransportAbstraction.ISender BatchSender { get; private set; } public WorkItemQueue ActivityWorkItemQueue { get; private set; } public WorkItemQueue OrchestrationWorkItemQueue { get; private set; } + public WorkItemQueue EntityWorkItemQueue { get; private set; } public LoadPublishWorker LoadPublisher { get; private set; } public BatchTimer PendingTimers { get; private set; } @@ -70,6 +71,7 @@ namespace DurableTask.Netherite string storageAccountName, WorkItemQueue activityWorkItemQueue, WorkItemQueue orchestrationWorkItemQueue, + WorkItemQueue entityWorkItemQueue, LoadPublishWorker loadPublisher, WorkItemTraceHelper workItemTraceHelper) @@ -83,6 +85,7 @@ namespace DurableTask.Netherite this.StorageAccountName = storageAccountName; this.ActivityWorkItemQueue = activityWorkItemQueue; this.OrchestrationWorkItemQueue = orchestrationWorkItemQueue; + this.EntityWorkItemQueue = entityWorkItemQueue; this.LoadPublisher = loadPublisher; this.TraceHelper = new PartitionTraceHelper(host.TraceHelper.Logger, settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this.PartitionId); this.EventTraceHelper = new EventTraceHelper(host.LoggerFactory, settings.EventLogLevelLimit, this); @@ -326,14 +329,21 @@ namespace DurableTask.Netherite { this.WorkItemTraceHelper.TraceWorkItemQueued( this.PartitionId, - WorkItemTraceHelper.WorkItemType.Orchestration, + item.WorkItemType, item.MessageBatch.WorkItemId, item.InstanceId, item.Type.ToString(), item.EventCount, WorkItemTraceHelper.FormatMessageIdList(item.MessageBatch.TracedMessages)); - this.OrchestrationWorkItemQueue.Add(item); + if (this.Settings.UseSeparateQueueForEntityWorkItems && item.WorkItemType == WorkItemTraceHelper.WorkItemType.Entity) + { + this.EntityWorkItemQueue.Add(item); + } + else + { + this.OrchestrationWorkItemQueue.Add(item); + } } } } diff --git a/src/DurableTask.Netherite/PartitionState/SessionsState.cs b/src/DurableTask.Netherite/PartitionState/SessionsState.cs index f37f096..02f25e3 100644 --- a/src/DurableTask.Netherite/PartitionState/SessionsState.cs +++ b/src/DurableTask.Netherite/PartitionState/SessionsState.cs @@ -327,7 +327,7 @@ namespace DurableTask.Netherite { this.Partition.WorkItemTraceHelper.TraceWorkItemDiscarded( this.Partition.PartitionId, - WorkItemTraceHelper.WorkItemType.Orchestration, + DurableTask.Core.Common.Entities.IsEntityInstance(evt.InstanceId) ? WorkItemTraceHelper.WorkItemType.Entity : WorkItemTraceHelper.WorkItemType.Orchestration, evt.WorkItemId, evt.InstanceId, session != null ? this.GetSessionPosition(session) : null, "session was replaced"); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs index 8d99045..193f0cd 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs @@ -974,8 +974,8 @@ namespace DurableTask.Netherite.Faster { while (enumerator.MoveNext()) { - if (!string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) - && !enumerator.Current.StartsWith(instanceQuery.InstanceIdPrefix)) + if ((!string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) && !enumerator.Current.StartsWith(instanceQuery.InstanceIdPrefix)) + || (instanceQuery.ExcludeEntities && DurableTask.Core.Common.Entities.IsEntityInstance(enumerator.Current))) { // the instance does not match the prefix continue; @@ -1194,8 +1194,8 @@ namespace DurableTask.Netherite.Faster scanned++; //this.partition.EventDetailTracer?.TraceEventProcessingDetail($"found instance {key.InstanceId}"); - if (string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) - || key.Val.InstanceId.StartsWith(instanceQuery.InstanceIdPrefix)) + if ((string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) || key.Val.InstanceId.StartsWith(instanceQuery.InstanceIdPrefix)) + || (instanceQuery.ExcludeEntities && DurableTask.Core.Common.Entities.IsEntityInstance(key.Val.InstanceId))) { //this.partition.EventDetailTracer?.TraceEventProcessingDetail($"reading instance {key.InstanceId}"); diff --git a/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs b/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs index 8c8e390..0def4ca 100644 --- a/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs +++ b/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs @@ -57,7 +57,8 @@ namespace DurableTask.Netherite None, Client, Activity, - Orchestration + Orchestration, + Entity, } public enum ClientStatus diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj index a2b96cf..fdc40ef 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj @@ -9,6 +9,7 @@ + @@ -24,7 +25,7 @@ - + diff --git a/test/LoadGeneratorApp/LoadGeneratorApp.csproj b/test/LoadGeneratorApp/LoadGeneratorApp.csproj index 512d9cc..6c7c37a 100644 --- a/test/LoadGeneratorApp/LoadGeneratorApp.csproj +++ b/test/LoadGeneratorApp/LoadGeneratorApp.csproj @@ -5,7 +5,7 @@ - + diff --git a/test/PerformanceTests/PerformanceTests.csproj b/test/PerformanceTests/PerformanceTests.csproj index 38d9dee..f0a0807 100644 --- a/test/PerformanceTests/PerformanceTests.csproj +++ b/test/PerformanceTests/PerformanceTests.csproj @@ -6,7 +6,7 @@ - + From f8b5634746585c92a59fa0019c39930925c28aa9 Mon Sep 17 00:00:00 2001 From: Naiyuan Tian <110135109+nytian@users.noreply.github.com> Date: Tue, 30 Apr 2024 13:49:52 -0700 Subject: [PATCH 05/10] Update Durable Dependencies to Latest Version (#389) * Update DurableTask.Netherite.AzureFunctions.csproj * increase durabletask extension ver and patch * incre durabletask.core ver * remove typo --- samples/Hello_Netherite_with_DotNetCore/HelloDF.csproj | 2 +- .../DurableTask.Netherite.AzureFunctions.csproj | 6 +++--- src/DurableTask.Netherite/DurableTask.Netherite.csproj | 4 ++-- ...Functions.Worker.Extensions.DurableTask.Netherite.csproj | 2 +- .../DurableTask.Netherite.AzureFunctions.Tests.csproj | 4 ++-- test/LoadGeneratorApp/LoadGeneratorApp.csproj | 2 +- test/PerformanceTests/PerformanceTests.csproj | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/samples/Hello_Netherite_with_DotNetCore/HelloDF.csproj b/samples/Hello_Netherite_with_DotNetCore/HelloDF.csproj index 20ad7d4..abe2989 100644 --- a/samples/Hello_Netherite_with_DotNetCore/HelloDF.csproj +++ b/samples/Hello_Netherite_with_DotNetCore/HelloDF.csproj @@ -5,7 +5,7 @@ - + diff --git a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj index 5d2950b..d26eb1f 100644 --- a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj +++ b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj @@ -26,7 +26,7 @@ 1 5 - 0 + 1 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 @@ -51,8 +51,8 @@ - - + + diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index 9682550..2988cad 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -26,7 +26,7 @@ 1 5 - 0 + 1 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 @@ -56,7 +56,7 @@ - + diff --git a/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj b/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj index 59a6d14..1ea2842 100644 --- a/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj +++ b/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj @@ -27,7 +27,7 @@ 1 5 - 0 + 1 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj index fdc40ef..3c535f9 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj @@ -9,7 +9,7 @@ - + @@ -25,7 +25,7 @@ - + diff --git a/test/LoadGeneratorApp/LoadGeneratorApp.csproj b/test/LoadGeneratorApp/LoadGeneratorApp.csproj index 6c7c37a..31c4404 100644 --- a/test/LoadGeneratorApp/LoadGeneratorApp.csproj +++ b/test/LoadGeneratorApp/LoadGeneratorApp.csproj @@ -5,7 +5,7 @@ - + diff --git a/test/PerformanceTests/PerformanceTests.csproj b/test/PerformanceTests/PerformanceTests.csproj index f0a0807..6ae9559 100644 --- a/test/PerformanceTests/PerformanceTests.csproj +++ b/test/PerformanceTests/PerformanceTests.csproj @@ -6,7 +6,7 @@ - + From beb84a0a7c52246164588c280ccc0ef0d55a3409 Mon Sep 17 00:00:00 2001 From: Jaliya Udagedara Date: Sat, 4 May 2024 11:31:10 +1200 Subject: [PATCH 06/10] Cleanup unnecessary references. (#388) --- .../DurableTask.Netherite.AzureFunctions.Tests.csproj | 2 -- test/PerformanceTests/PerformanceTests.csproj | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj index 3c535f9..6dbd5c9 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj @@ -9,7 +9,6 @@ - @@ -25,7 +24,6 @@ - diff --git a/test/PerformanceTests/PerformanceTests.csproj b/test/PerformanceTests/PerformanceTests.csproj index 6ae9559..a20d999 100644 --- a/test/PerformanceTests/PerformanceTests.csproj +++ b/test/PerformanceTests/PerformanceTests.csproj @@ -6,10 +6,9 @@ - - + From aa124cdf5046759c8bffabba4a167b1087b039f1 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 7 May 2024 16:30:59 -0700 Subject: [PATCH 07/10] rev FASTER.Core to 2.6.5 (#395) --- src/DurableTask.Netherite/DurableTask.Netherite.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index 2988cad..4629c4b 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -57,7 +57,7 @@ - + From 51c6b957c7f7c10ec8120b9619c916ea5730e587 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 9 May 2024 15:19:36 -0700 Subject: [PATCH 08/10] Auto-generate AssemblyInfo + refactor version into common props. (#393) --- DurableTask.Netherite.sln | 5 +++- ...urableTask.Netherite.AzureFunctions.csproj | 14 +++-------- .../DurableTask.Netherite.csproj | 14 ++--------- ...er.Extensions.DurableTask.Netherite.csproj | 24 +++++++++---------- .../Properties/AssemblyInfo.cs | 7 ------ src/common.props | 14 +++++++++++ 6 files changed, 35 insertions(+), 43 deletions(-) delete mode 100644 src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs create mode 100644 src/common.props diff --git a/DurableTask.Netherite.sln b/DurableTask.Netherite.sln index 4297106..08ee2a5 100644 --- a/DurableTask.Netherite.sln +++ b/DurableTask.Netherite.sln @@ -24,6 +24,9 @@ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PerformanceTests", "test\PerformanceTests\PerformanceTests.csproj", "{DD1E1B3F-4FA2-4F3A-9AE1-6B2A0B864AAF}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{D33AB157-04B9-4BAD-B580-C3C87C17828C}" + ProjectSection(SolutionItems) = preProject + src\common.props = src\common.props + EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}" EndProject @@ -41,7 +44,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDF", "sample EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TokenCredentialDTFx", "samples\TokenCredentialDTFx\TokenCredentialDTFx.csproj", "{FBFF0814-E6C0-489A-ACCF-9D0699219621}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Functions.Worker.Extensions.DurableTask.Netherite", "src\Functions.Worker.Extensions.DurableTask.Netherite\Functions.Worker.Extensions.DurableTask.Netherite.csproj", "{3E17402B-3F65-4E5B-B752-48AD56B81208}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Functions.Worker.Extensions.DurableTask.Netherite", "src\Functions.Worker.Extensions.DurableTask.Netherite\Functions.Worker.Extensions.DurableTask.Netherite.csproj", "{3E17402B-3F65-4E5B-B752-48AD56B81208}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj index d26eb1f..3df2e8d 100644 --- a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj +++ b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj @@ -22,17 +22,9 @@ icon.png - - - 1 - 5 - 1 - $(MajorVersion).$(MinorVersion).$(PatchVersion) - - $(MajorVersion).0.0.0 - .$(GITHUB_RUN_NUMBER) - $(VersionPrefix)$(BuildSuffix) - + + + diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index 4629c4b..91e629f 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -22,18 +22,8 @@ true - - - 1 - 5 - 1 - $(MajorVersion).$(MinorVersion).$(PatchVersion) - - $(MajorVersion).0.0.0 - .$(GITHUB_RUN_NUMBER) - $(VersionPrefix)$(BuildSuffix) - - + + diff --git a/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj b/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj index 1ea2842..7806b68 100644 --- a/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj +++ b/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj @@ -22,18 +22,8 @@ icon.png - - - - 1 - 5 - 1 - $(MajorVersion).$(MinorVersion).$(PatchVersion) - - $(MajorVersion).0.0.0 - .$(GITHUB_RUN_NUMBER) - $(VersionPrefix)$(BuildSuffix) - + + @@ -50,4 +40,14 @@ + + + + <_Parameter1>Microsoft.Azure.DurableTask.Netherite.AzureFunctions + <_Parameter2>$(PackageVersion) + <_Parameter3>true + <_Parameter3_IsLiteral>true + + + diff --git a/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs b/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs deleted file mode 100644 index 70f63c1..0000000 --- a/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; - -// This must be updated when updating the version of the package -[assembly: ExtensionInformation("Microsoft.Azure.DurableTask.Netherite.AzureFunctions", "1.5.0", true)] \ No newline at end of file diff --git a/src/common.props b/src/common.props new file mode 100644 index 0000000..200b1b5 --- /dev/null +++ b/src/common.props @@ -0,0 +1,14 @@ + + + + + 1 + 5 + 1 + $(MajorVersion).$(MinorVersion).$(PatchVersion) + + $(MajorVersion).0.0.0 + .$(GITHUB_RUN_NUMBER) + $(VersionPrefix)$(BuildSuffix) + + \ No newline at end of file From 122885a5f85557844b5f11f958ab4295bd96d767 Mon Sep 17 00:00:00 2001 From: Jaliya Udagedara Date: Fri, 31 May 2024 10:52:22 +1200 Subject: [PATCH 09/10] Add initial integration tests for Durable Entities. (#391) --- .../CoreScenarios.cs | 104 ++++++++++++++++++ .../IntegrationTestBase.cs | 2 +- 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/CoreScenarios.cs b/test/DurableTask.Netherite.AzureFunctions.Tests/CoreScenarios.cs index ecd3423..88d7dcc 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/CoreScenarios.cs +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/CoreScenarios.cs @@ -63,6 +63,44 @@ namespace DurableTask.Netherite.AzureFunctions.Tests actual: status.Output?.ToString(Formatting.None)); } + [Fact] + public async Task CanOrchestrateEntities() + { + DurableOrchestrationStatus status = await this.RunOrchestrationAsync(nameof(Functions.OrchestrateCounterEntity)); + Assert.Equal(OrchestrationRuntimeStatus.Completed, status.RuntimeStatus); + Assert.Equal(7, (int)status.Output); + } + + [Fact] + public async Task CanClientInteractWithEntities() + { + IDurableClient client = await this.GetDurableClientAsync(); + + var entityId = new EntityId(nameof(Functions.Counter), Guid.NewGuid().ToString("N")); + EntityStateResponse result = await client.ReadEntityStateAsync(entityId); + Assert.False(result.EntityExists); + + await Task.WhenAll( + client.SignalEntityAsync(entityId, "incr"), + client.SignalEntityAsync(entityId, "incr"), + client.SignalEntityAsync(entityId, "incr"), + client.SignalEntityAsync(entityId, "add", 4)); + + await Task.Delay(TimeSpan.FromSeconds(5)); + + result = await client.ReadEntityStateAsync(entityId); + Assert.True(result.EntityExists); + Assert.Equal(7, result.EntityState); + } + + [Fact] + public async Task CanOrchestrationInteractWithEntities() + { + DurableOrchestrationStatus status = await this.RunOrchestrationAsync(nameof(Functions.IncrementThenGet)); + Assert.Equal(OrchestrationRuntimeStatus.Completed, status.RuntimeStatus); + Assert.Equal(1, (int)status.Output); + } + static class Functions { [FunctionName(nameof(Sequence))] @@ -99,6 +137,72 @@ namespace DurableTask.Netherite.AzureFunctions.Tests [FunctionName(nameof(IntToString))] public static string IntToString([ActivityTrigger] int input) => input.ToString(); + + + [FunctionName(nameof(OrchestrateCounterEntity))] + public static async Task OrchestrateCounterEntity( + [OrchestrationTrigger] IDurableOrchestrationContext ctx) + { + var entityId = new EntityId(nameof(Counter), ctx.NewGuid().ToString("N")); + ctx.SignalEntity(entityId, "incr"); + ctx.SignalEntity(entityId, "incr"); + ctx.SignalEntity(entityId, "incr"); + ctx.SignalEntity(entityId, "add", 4); + + using (await ctx.LockAsync(entityId)) + { + int result = await ctx.CallEntityAsync(entityId, "get"); + return result; + } + } + + [FunctionName(nameof(Counter))] + public static void Counter([EntityTrigger] IDurableEntityContext ctx) + { + int current = ctx.GetState(); + switch (ctx.OperationName) + { + case "incr": + ctx.SetState(current + 1); + break; + case "add": + int amount = ctx.GetInput(); + ctx.SetState(current + amount); + break; + case "get": + ctx.Return(current); + break; + case "set": + amount = ctx.GetInput(); + ctx.SetState(amount); + break; + case "delete": + ctx.DeleteState(); + break; + default: + throw new NotImplementedException("No such entity operation"); + } + } + + [FunctionName(nameof(IncrementThenGet))] + public static async Task IncrementThenGet([OrchestrationTrigger] IDurableOrchestrationContext context) + { + // Key needs to be pseudo-random to avoid conflicts with multiple test runs. + string key = context.NewGuid().ToString().Substring(0, 8); + EntityId entityId = new EntityId(nameof(Counter), key); + + context.SignalEntity(entityId, "add", 1); + + // Invoking a sub-orchestration as a regression test for https://github.com/microsoft/durabletask-mssql/issues/146 + return await context.CallSubOrchestratorAsync(nameof(GetEntityAsync), entityId); + } + + [FunctionName(nameof(GetEntityAsync))] + public static async Task GetEntityAsync([OrchestrationTrigger] IDurableOrchestrationContext context) + { + EntityId entityId = context.GetInput(); + return await context.CallEntityAsync(entityId, "get"); + } } } } diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/IntegrationTestBase.cs b/test/DurableTask.Netherite.AzureFunctions.Tests/IntegrationTestBase.cs index db1dba7..74d31d0 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/IntegrationTestBase.cs +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/IntegrationTestBase.cs @@ -140,7 +140,7 @@ namespace DurableTask.Netherite.AzureFunctions.Tests return await client.PurgeInstanceHistoryAsync(default, default, null); } - async Task GetDurableClientAsync() + protected async Task GetDurableClientAsync() { var clientRef = new IDurableClient[1]; await this.CallFunctionAsync(nameof(ClientFunctions.GetDurableClient), "clientRef", clientRef); From f3e5f2621f05d615d52fd6005bf9c6d6d811ce99 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 30 May 2024 16:41:36 -0700 Subject: [PATCH 10/10] Add code mirror pipeline (#399) * add code mirror pipeline * move within eng/ci as per convention --- eng/ci/code-mirror.yml | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 eng/ci/code-mirror.yml diff --git a/eng/ci/code-mirror.yml b/eng/ci/code-mirror.yml new file mode 100644 index 0000000..f884e10 --- /dev/null +++ b/eng/ci/code-mirror.yml @@ -0,0 +1,19 @@ +trigger: + branches: + include: + # These are the branches we'll mirror to our internal ADO instance + # Keep this set limited as appropriate (don't mirror individual user branches). + - main + +resources: + repositories: + - repository: eng + type: git + name: engineering + ref: refs/tags/release + +variables: + - template: ci/variables/cfs.yml@eng + +extends: + template: ci/code-mirror.yml@eng