From e11afe7594bcd43b6aa6e2ab8d6210366073139a Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 1 Nov 2023 15:55:34 -0700 Subject: [PATCH] address PR feedback --- .../OrchestrationService/Partition.cs | 10 +++-- .../Util/PartitionTimeout.cs | 43 ++++++++++++------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/DurableTask.Netherite/OrchestrationService/Partition.cs b/src/DurableTask.Netherite/OrchestrationService/Partition.cs index cf21629..7f641cc 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Partition.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Partition.cs @@ -55,7 +55,10 @@ namespace DurableTask.Netherite // A little helper property that allows us to conventiently check the condition for low-level event tracing public EventTraceHelper EventDetailTracer => this.EventTraceHelper.IsTracingAtMostDetailedLevel ? this.EventTraceHelper : null; - static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(5); + // We use this semaphore to limit how many partitions can be starting up at the same time on the same host + // because starting up a partition may temporarily consume a lot of CPU, I/O, and memory + const int ConcurrentStartsLimit = 5; + static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(ConcurrentStartsLimit); public Partition( NetheriteOrchestrationService host, @@ -108,7 +111,7 @@ namespace DurableTask.Netherite // before we start the partition, we have to acquire the MaxConcurrentStarts semaphore // (to prevent a host from being overwhelmed by the simultaneous start of too many partitions) - this.TraceHelper.TracePartitionProgress("Waiting", ref this.LastTransition, this.CurrentTimeMs, ""); + this.TraceHelper.TracePartitionProgress("Waiting", ref this.LastTransition, this.CurrentTimeMs, $"max={ConcurrentStartsLimit} available={MaxConcurrentStarts.CurrentCount}"); await MaxConcurrentStarts.WaitAsync(); try @@ -117,9 +120,8 @@ namespace DurableTask.Netherite (long, int) inputQueuePosition; - using (new PartitionTimeout(errorHandler, "partition startup", TimeSpan.FromMinutes(this.Settings.PartitionStartupTimeoutMinutes))) + await using (new PartitionTimeout(errorHandler, "partition startup", TimeSpan.FromMinutes(this.Settings.PartitionStartupTimeoutMinutes))) { - // create or restore partition state from last snapshot // create the state this.State = ((TransportAbstraction.IHost)this.host).StorageLayer.CreatePartitionState(parameters); diff --git a/src/DurableTask.Netherite/Util/PartitionTimeout.cs b/src/DurableTask.Netherite/Util/PartitionTimeout.cs index 7272e34..e4c32b6 100644 --- a/src/DurableTask.Netherite/Util/PartitionTimeout.cs +++ b/src/DurableTask.Netherite/Util/PartitionTimeout.cs @@ -11,9 +11,9 @@ namespace DurableTask.Netherite /// /// A utility class for terminating the partition if some task takes too long. - /// Implemented as a disposable, with used to cancel the timeout. + /// Implemented as a disposable, with used to cancel the timeout. /// - class PartitionTimeout : IDisposable + class PartitionTimeout : IAsyncDisposable { readonly CancellationTokenSource tokenSource; readonly Task timeoutTask; @@ -21,26 +21,39 @@ namespace DurableTask.Netherite public PartitionTimeout(IPartitionErrorHandler errorHandler, string task, TimeSpan timeout) { this.tokenSource = new CancellationTokenSource(); - this.timeoutTask = Task.Delay(timeout, this.tokenSource.Token); + this.timeoutTask = Task.Run(async () => + { + try + { + await Task.Delay(timeout, this.tokenSource.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // we did not time out + return; + } - // if the timeout tasks runs to completion without being cancelled, terminate the partition - this.timeoutTask.ContinueWith( - _ => errorHandler.HandleError( - $"{nameof(PartitionTimeout)}", - $"{task} timed out after {timeout}", - e: null, - terminatePartition: true, - reportAsWarning: false), - TaskContinuationOptions.OnlyOnRanToCompletion); + errorHandler.HandleError( + $"{nameof(PartitionTimeout)}", + $"{task} timed out after {timeout}", + e: null, + terminatePartition: true, + reportAsWarning: false); + }); } - public void Dispose() + public async ValueTask DisposeAsync() { // cancel the timeout task (if it has not already completed) this.tokenSource.Cancel(); - // dispose the token source after the timeout task has completed - this.timeoutTask.ContinueWith(_ => this.tokenSource.Dispose()); + // wait for the timeouttask to complete here, so we can be sure that the + // decision about the timeout firing or not firing has been made + // before we leave this method + await this.timeoutTask.ConfigureAwait(false); + + // we can dispose the token source now since the timeoutTask is completed + this.tokenSource.Dispose(); } } }