This commit is contained in:
sebastianburckhardt 2023-11-01 15:55:34 -07:00
Родитель 2a64e9eba8
Коммит e11afe7594
2 изменённых файлов: 34 добавлений и 19 удалений

Просмотреть файл

@ -55,7 +55,10 @@ namespace DurableTask.Netherite
// A little helper property that allows us to conventiently check the condition for low-level event tracing
public EventTraceHelper EventDetailTracer => this.EventTraceHelper.IsTracingAtMostDetailedLevel ? this.EventTraceHelper : null;
static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(5);
// We use this semaphore to limit how many partitions can be starting up at the same time on the same host
// because starting up a partition may temporarily consume a lot of CPU, I/O, and memory
const int ConcurrentStartsLimit = 5;
static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(ConcurrentStartsLimit);
public Partition(
NetheriteOrchestrationService host,
@ -108,7 +111,7 @@ namespace DurableTask.Netherite
// before we start the partition, we have to acquire the MaxConcurrentStarts semaphore
// (to prevent a host from being overwhelmed by the simultaneous start of too many partitions)
this.TraceHelper.TracePartitionProgress("Waiting", ref this.LastTransition, this.CurrentTimeMs, "");
this.TraceHelper.TracePartitionProgress("Waiting", ref this.LastTransition, this.CurrentTimeMs, $"max={ConcurrentStartsLimit} available={MaxConcurrentStarts.CurrentCount}");
await MaxConcurrentStarts.WaitAsync();
try
@ -117,9 +120,8 @@ namespace DurableTask.Netherite
(long, int) inputQueuePosition;
using (new PartitionTimeout(errorHandler, "partition startup", TimeSpan.FromMinutes(this.Settings.PartitionStartupTimeoutMinutes)))
await using (new PartitionTimeout(errorHandler, "partition startup", TimeSpan.FromMinutes(this.Settings.PartitionStartupTimeoutMinutes)))
{
// create or restore partition state from last snapshot
// create the state
this.State = ((TransportAbstraction.IHost)this.host).StorageLayer.CreatePartitionState(parameters);

Просмотреть файл

@ -11,9 +11,9 @@ namespace DurableTask.Netherite
/// <summary>
/// A utility class for terminating the partition if some task takes too long.
/// Implemented as a disposable, with <see cref="IDisposable.Dispose"> used to cancel the timeout.
/// Implemented as a disposable, with <see cref="IAsyncDisposable.DisposeAsync"> used to cancel the timeout.
/// </summary>
class PartitionTimeout : IDisposable
class PartitionTimeout : IAsyncDisposable
{
readonly CancellationTokenSource tokenSource;
readonly Task timeoutTask;
@ -21,26 +21,39 @@ namespace DurableTask.Netherite
public PartitionTimeout(IPartitionErrorHandler errorHandler, string task, TimeSpan timeout)
{
this.tokenSource = new CancellationTokenSource();
this.timeoutTask = Task.Delay(timeout, this.tokenSource.Token);
this.timeoutTask = Task.Run(async () =>
{
try
{
await Task.Delay(timeout, this.tokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// we did not time out
return;
}
// if the timeout tasks runs to completion without being cancelled, terminate the partition
this.timeoutTask.ContinueWith(
_ => errorHandler.HandleError(
$"{nameof(PartitionTimeout)}",
$"{task} timed out after {timeout}",
e: null,
terminatePartition: true,
reportAsWarning: false),
TaskContinuationOptions.OnlyOnRanToCompletion);
errorHandler.HandleError(
$"{nameof(PartitionTimeout)}",
$"{task} timed out after {timeout}",
e: null,
terminatePartition: true,
reportAsWarning: false);
});
}
public void Dispose()
public async ValueTask DisposeAsync()
{
// cancel the timeout task (if it has not already completed)
this.tokenSource.Cancel();
// dispose the token source after the timeout task has completed
this.timeoutTask.ContinueWith(_ => this.tokenSource.Dispose());
// wait for the timeouttask to complete here, so we can be sure that the
// decision about the timeout firing or not firing has been made
// before we leave this method
await this.timeoutTask.ConfigureAwait(false);
// we can dispose the token source now since the timeoutTask is completed
this.tokenSource.Dispose();
}
}
}