fix various issues with client shutdown

This commit is contained in:
sebastianburckhardt 2023-03-17 10:20:13 -07:00
Родитель 4b9a83d345
Коммит 2aa4c49e6a
5 изменённых файлов: 137 добавлений и 31 удалений

Просмотреть файл

@ -23,12 +23,12 @@ namespace DurableTask.Netherite
class Client : TransportAbstraction.IClient
{
readonly NetheriteOrchestrationService host;
readonly CancellationToken shutdownToken;
readonly ClientTraceHelper traceHelper;
readonly string account;
readonly Guid taskHubGuid;
readonly WorkItemTraceHelper workItemTraceHelper;
readonly Stopwatch workItemStopwatch;
readonly CancellationTokenSource cts;
static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(5);
@ -37,6 +37,8 @@ namespace DurableTask.Netherite
long SequenceNumber; // for numbering requests that enter on this client
volatile bool allRemainingRequestsAreNowBeingCancelled = false; // set when entering the final stage of shutdown
readonly BatchTimer<PendingRequest> ResponseTimeouts;
readonly ConcurrentDictionary<long, PendingRequest> ResponseWaiters;
readonly Dictionary<string, (MemoryStream, int)> Fragments;
@ -59,8 +61,8 @@ namespace DurableTask.Netherite
this.workItemTraceHelper = workItemTraceHelper;
this.account = host.StorageAccountName;
this.BatchSender = batchSender;
this.shutdownToken = shutdownToken;
this.ResponseTimeouts = new BatchTimer<PendingRequest>(this.shutdownToken, this.Timeout, this.traceHelper.TraceTimerProgress);
this.cts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken);
this.ResponseTimeouts = new BatchTimer<PendingRequest>(this.cts.Token, this.Timeout, this.traceHelper.TraceTimerProgress);
this.ResponseWaiters = new ConcurrentDictionary<long, PendingRequest>();
this.Fragments = new Dictionary<string, (MemoryStream, int)>();
this.QueryResponses = new Dictionary<long, QueryResponseReceived>();
@ -71,10 +73,35 @@ namespace DurableTask.Netherite
this.traceHelper.TraceProgress("Started");
}
public Task StopAsync()
public async Task StopAsync()
{
this.traceHelper.TraceProgress("Stopped");
return Task.CompletedTask;
this.traceHelper.TraceProgress("Stopping");
// cancel the token, if not already cancelled.
this.cts.Cancel();
await this.ResponseTimeouts.StopAsync();
// We now enter the final stage of client shutdown, where we forcefully cancel
// all requests that have not completed yet. We do this as late as possible in the shutdown
// process, so that requests still have a chance to successfully complete as long as possible.
this.allRemainingRequestsAreNowBeingCancelled = true;
while (true)
{
var entry = this.ResponseWaiters.GetEnumerator();
if (entry.MoveNext())
{
entry.Current.Value.TryCancel(shutdownException);
}
else
{
break;
}
}
this.cts.Dispose();
this.traceHelper.TraceProgress("Stopped");
}
public void ReportTransportError(string message, Exception e)
@ -220,13 +247,14 @@ namespace DurableTask.Netherite
const long ticksPerBucket = 2 * TimeSpan.TicksPerSecond;
DateTime GetTimeoutBucket(TimeSpan timeout) => new DateTime((((DateTime.UtcNow + timeout).Ticks / ticksPerBucket) * ticksPerBucket), DateTimeKind.Utc);
Task<ClientEvent> PerformRequestWithTimeoutAndCancellation(CancellationToken token, IClientRequestEvent request, bool doneWhenSent)
Task<ClientEvent> PerformRequestWithTimeout(IClientRequestEvent request)
{
var partitionEvent = (PartitionEvent)request;
int timeoutId = this.ResponseTimeouts.GetFreshId();
var pendingRequest = new PendingRequest(request.RequestId, request.EventId, partitionEvent.PartitionId, this, request.TimeoutUtc, timeoutId);
this.ResponseWaiters.TryAdd(request.RequestId, pendingRequest);
this.ResponseTimeouts.Schedule(request.TimeoutUtc, pendingRequest, timeoutId);
this.CheckForShutdown();
DurabilityListeners.Register((Event)request, pendingRequest);
@ -235,6 +263,51 @@ namespace DurableTask.Netherite
return pendingRequest.Task;
}
async Task<ClientEvent> PerformRequestWithTimeoutAndCancellation(IClientRequestEvent request, CancellationToken token)
{
long requestId = request.RequestId;
using CancellationTokenRegistration _ = token.Register(() =>
{
if (this.ResponseWaiters.TryGetValue(requestId, out PendingRequest request))
{
var exception = new OperationCanceledException("Client request was cancelled by the application-provided cancellation token", token);
request.TryCancel(exception);
}
});
var partitionEvent = (PartitionEvent)request;
int timeoutId = this.ResponseTimeouts.GetFreshId();
var pendingRequest = new PendingRequest(requestId, request.EventId, partitionEvent.PartitionId, this, request.TimeoutUtc, timeoutId);
this.ResponseWaiters.TryAdd(requestId, pendingRequest);
this.ResponseTimeouts.Schedule(request.TimeoutUtc, pendingRequest, timeoutId);
this.CheckForShutdown();
DurabilityListeners.Register((Event)request, pendingRequest);
this.Send(partitionEvent);
return await pendingRequest.Task.ConfigureAwait(false);
}
void CheckForShutdown()
{
try
{
if (this.allRemainingRequestsAreNowBeingCancelled)
{
throw shutdownException;
}
}
catch (ObjectDisposedException)
{
throw shutdownException;
}
}
static readonly TimeoutException timeoutException = new TimeoutException("Client request timed out.");
static readonly OperationCanceledException shutdownException = new OperationCanceledException("Client request was cancelled because host is shutting down.");
internal class PendingRequest : TransportAbstraction.IDurabilityOrExceptionListener
{
readonly long requestId;
@ -245,7 +318,6 @@ namespace DurableTask.Netherite
readonly TaskCompletionSource<ClientEvent> continuation;
readonly double startTime;
static readonly TimeoutException timeoutException = new TimeoutException("Client request timed out.");
public Task<ClientEvent> Task => this.continuation.Task;
public (DateTime, int) TimeoutKey => this.timeoutKey;
@ -361,6 +433,15 @@ namespace DurableTask.Netherite
this.continuation.TrySetException(timeoutException);
}
}
public void TryCancel(OperationCanceledException exception)
{
if (this.client.ResponseWaiters.TryRemove(this.requestId, out var pendingRequest))
{
this.client.traceHelper.TraceTimerProgress($"cancelling ({this.timeoutKey.due:o},{this.timeoutKey.id})");
this.continuation.TrySetException(exception);
}
}
}
/******************************/
@ -393,7 +474,7 @@ namespace DurableTask.Netherite
"CreateOrchestration",
WorkItemTraceHelper.FormatEmptyMessageIdList());
var response = await this.PerformRequestWithTimeoutAndCancellation(this.shutdownToken, request, false).ConfigureAwait(false);
var response = await this.PerformRequestWithTimeout(request).ConfigureAwait(false);
var creationResponseReceived = (CreationResponseReceived)response;
if (!creationResponseReceived.Succeeded)
{
@ -435,7 +516,7 @@ namespace DurableTask.Netherite
WorkItemTraceHelper.FormatEmptyMessageIdList());
}
return this.PerformRequestWithTimeoutAndCancellation(this.shutdownToken, request, true);
return this.PerformRequestWithTimeout(request);
}
public async Task<OrchestrationState> WaitForOrchestrationAsync(
@ -463,7 +544,7 @@ namespace DurableTask.Netherite
try
{
var response = await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
var response = await this.PerformRequestWithTimeout(request).ConfigureAwait(false);
return ((WaitResponseReceived)response)?.OrchestrationState;
}
catch (TimeoutException)
@ -490,7 +571,7 @@ namespace DurableTask.Netherite
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
};
var response = await this.PerformRequestWithTimeoutAndCancellation(this.shutdownToken, request, false).ConfigureAwait(false);
var response = await this.PerformRequestWithTimeout(request).ConfigureAwait(false);
return ((StateResponseReceived)response)?.OrchestrationState;
}
@ -510,7 +591,7 @@ namespace DurableTask.Netherite
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
};
var response = (HistoryResponseReceived)await this.PerformRequestWithTimeoutAndCancellation(this.shutdownToken, request, false).ConfigureAwait(false);
var response = (HistoryResponseReceived)await this.PerformRequestWithTimeout(request).ConfigureAwait(false);
return (response?.ExecutionId, response?.History);
}
@ -746,7 +827,7 @@ namespace DurableTask.Netherite
{
var request = requestCreator(partitionId);
request.ContinuationToken = partitionPositions[partitionId];
var response = (TResponse)await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
var response = (TResponse)await this.PerformRequestWithTimeoutAndCancellation(request, cancellationToken).ConfigureAwait(false);
partitionPositions[partitionId] = response.ContinuationToken;
this.traceHelper.TraceQueryProgress(clientQueryId, request.EventIdString, partitionId, stopwatch.Elapsed, request.PageSize, response.Count, response.ContinuationToken);
return response;
@ -837,7 +918,7 @@ namespace DurableTask.Netherite
}
var stopwatch = Stopwatch.StartNew();
var response = (TResponse)await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
var response = (TResponse)await this.PerformRequestWithTimeoutAndCancellation(request, cancellationToken).ConfigureAwait(false);
this.traceHelper.TraceQueryProgress(clientQueryId, request.EventIdString, partitionId, stopwatch.Elapsed, request.PageSize, response.Count, response.ContinuationToken);
ResetRetries();
@ -904,7 +985,7 @@ namespace DurableTask.Netherite
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
};
return this.PerformRequestWithTimeoutAndCancellation(CancellationToken.None, request, true);
return this.PerformRequestWithTimeout(request);
}
public async Task<int> DeleteAllDataForOrchestrationInstance(uint partitionId, string instanceId)
@ -923,7 +1004,7 @@ namespace DurableTask.Netherite
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
};
var response = await this.PerformRequestWithTimeoutAndCancellation(this.shutdownToken, request, false).ConfigureAwait(false);
var response = await this.PerformRequestWithTimeout(request).ConfigureAwait(false);
return ((DeletionResponseReceived)response).NumberInstancesDeleted;
}
}

Просмотреть файл

@ -466,9 +466,6 @@ namespace DurableTask.Netherite
this.OnStopping?.Invoke();
this.checkedClient = null;
this.client = null;
if (this.serviceShutdownSource != null)
{
this.serviceShutdownSource.Cancel();

Просмотреть файл

@ -175,6 +175,9 @@ namespace DurableTask.Netherite
bool takeCheckpoint = this.Settings.TakeStateCheckpointWhenStoppingPartition && !quickly;
// wait for the timer loop to be stopped so we don't have timers firing during shutdown
await this.PendingTimers.StopAsync();
// for a clean shutdown we try to save some of the latest progress to storage and then release the lease
bool clean = true;
try

Просмотреть файл

@ -292,9 +292,6 @@ namespace DurableTask.Netherite.EventHubsTransport
this.traceHelper.LogInformation("Shutting down EventHubsBackend");
this.shutdownSource.Cancel(); // initiates shutdown of client and of all partitions
this.traceHelper.LogDebug("Stopping client");
await this.client.StopAsync();
if (this.hasWorkers)
{
this.traceHelper.LogDebug("Stopping partition and loadmonitor hosts");
@ -306,6 +303,9 @@ namespace DurableTask.Netherite.EventHubsTransport
this.traceHelper.LogDebug("Stopping client process loop");
await this.clientProcessTask;
this.traceHelper.LogDebug("Stopping client");
await this.client.StopAsync();
this.traceHelper.LogDebug("Closing connections");
await this.connections.StopAsync();

Просмотреть файл

@ -7,14 +7,16 @@ namespace DurableTask.Netherite
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class BatchTimer<T>
{
readonly CancellationToken cancellationToken;
readonly CancellationTokenSource cts;
readonly Action<List<T>> handler;
readonly SortedList<(DateTime due, int id), T> schedule;
readonly SemaphoreSlim notify;
readonly Action<string> tracer;
readonly TaskCompletionSource<bool> shutdownComplete;
readonly object thisLock; //TODO replace this class with a lock-free implementation
string name;
@ -22,23 +24,45 @@ namespace DurableTask.Netherite
public BatchTimer(CancellationToken token, Action<List<T>> handler, Action<string> tracer = null)
{
this.cancellationToken = token;
this.cts = CancellationTokenSource.CreateLinkedTokenSource(token);
this.handler = handler;
this.tracer = tracer;
this.schedule = new SortedList<(DateTime due, int id), T>();
this.notify = new SemaphoreSlim(0, int.MaxValue);
this.thisLock = new object();
token.Register(() => this.notify.Release());
this.shutdownComplete = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
public void Start(string name)
{
var thread = TrackedThreads.MakeTrackedThread(this.ExpirationCheckLoop, name);
var thread = TrackedThreads.MakeTrackedThread(() =>
{
try
{
this.ExpirationCheckLoop();
this.shutdownComplete.TrySetResult(true);
}
catch (OperationCanceledException)
{
// normal during shutdown
this.shutdownComplete.TrySetResult(true);
}
catch (Exception ex)
{
this.shutdownComplete.TrySetException(ex);
}
}, name);
this.name = name;
thread.Start();
}
public async Task StopAsync()
{
this.cts.Cancel();
await this.shutdownComplete.Task.ConfigureAwait(false);
this.notify.Dispose();
}
public int GetFreshId()
{
lock (this.thisLock)
@ -87,13 +111,14 @@ namespace DurableTask.Netherite
(DateTime due, int id) firstInBatch = default;
(DateTime due, int id) nextAfterBatch = default;
while (!this.cancellationToken.IsCancellationRequested)
while (!this.cts.Token.IsCancellationRequested)
{
// wait for the next expiration time or cleanup, but cut the wait short if notified
if (this.RequiresDelay(out int delay, out var due))
{
var startWait = DateTime.UtcNow;
this.notify.Wait(delay); // blocks thread until delay is over, or until notified
this.notify.Wait(delay, this.cts.Token); // blocks thread until delay is over, or until notified
this.tracer?.Invoke($"{this.name} is awakening at {(DateTime.UtcNow - due).TotalSeconds}s");
}
@ -103,7 +128,7 @@ namespace DurableTask.Netherite
while (this.schedule.Count > 0
&& next.Key.due <= DateTime.UtcNow
&& !this.cancellationToken.IsCancellationRequested)
&& !this.cts.Token.IsCancellationRequested)
{
this.schedule.RemoveAt(0);
batch.Add(next.Value);