Implement clean shutdown of Event Hubs senders (#257)
* shutdown eventhubs senders * reorder the stopping of timeouts and clientrequests (to avoid ObjectDisposed exceptions)
This commit is contained in:
Родитель
0654d60c88
Коммит
d47dc356be
|
@ -80,8 +80,6 @@ namespace DurableTask.Netherite
|
|||
// cancel the token, if not already cancelled.
|
||||
this.cts.Cancel();
|
||||
|
||||
await this.ResponseTimeouts.StopAsync();
|
||||
|
||||
// We now enter the final stage of client shutdown, where we forcefully cancel
|
||||
// all requests that have not completed yet.
|
||||
this.allRemainingRequestsAreNowBeingCancelled = true;
|
||||
|
@ -98,6 +96,8 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
}
|
||||
|
||||
await this.ResponseTimeouts.StopAsync();
|
||||
|
||||
this.cts.Dispose();
|
||||
|
||||
this.traceHelper.TraceProgress("Stopped");
|
||||
|
|
|
@ -19,12 +19,12 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
readonly EventHubsSender<ClientEvent>[] channels;
|
||||
int roundRobin;
|
||||
|
||||
public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, EventHubsTraceHelper traceHelper)
|
||||
public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
|
||||
{
|
||||
this.channels = new Netherite.EventHubsTransport.EventHubsSender<ClientEvent>[senders.Length];
|
||||
for (int i = 0; i < senders.Length; i++)
|
||||
{
|
||||
this.channels[i] = new EventHubsSender<ClientEvent>(host, taskHubGuid, senders[i], traceHelper);
|
||||
this.channels[i] = new EventHubsSender<ClientEvent>(host, taskHubGuid, senders[i], shutdownToken, traceHelper);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -41,5 +41,10 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
var channel = this.channels.FirstOrDefault(this.Idle) ?? this.NextChannel();
|
||||
channel.Submit(toSend);
|
||||
}
|
||||
|
||||
public Task WaitForShutdownAsync()
|
||||
{
|
||||
return Task.WhenAll(this.channels.Select(sender => sender.WaitForShutdownAsync()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
readonly string[] clientHubs;
|
||||
readonly string partitionHub;
|
||||
readonly string loadMonitorHub;
|
||||
readonly CancellationToken shutdownToken;
|
||||
|
||||
EventHubClient partitionClient;
|
||||
List<EventHubClient> clientClients;
|
||||
|
@ -44,12 +45,14 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
ConnectionInfo connectionInfo,
|
||||
string partitionHub,
|
||||
string[] clientHubs,
|
||||
string loadMonitorHub)
|
||||
string loadMonitorHub,
|
||||
CancellationToken shutdownToken)
|
||||
{
|
||||
this.connectionInfo = connectionInfo;
|
||||
this.partitionHub = partitionHub;
|
||||
this.clientHubs = clientHubs;
|
||||
this.loadMonitorHub = loadMonitorHub;
|
||||
this.shutdownToken = shutdownToken;
|
||||
}
|
||||
|
||||
public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}";
|
||||
|
@ -62,30 +65,43 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
this.EnsureLoadMonitorAsync());
|
||||
}
|
||||
|
||||
public async Task StopAsync()
|
||||
public Task StopAsync()
|
||||
{
|
||||
IEnumerable<EventHubClient> Clients()
|
||||
return Task.WhenAll(
|
||||
this.StopClientClients(),
|
||||
this.StopPartitionClients(),
|
||||
this.StopLoadMonitorClients()
|
||||
);
|
||||
}
|
||||
|
||||
async Task StopClientClients()
|
||||
{
|
||||
await Task.WhenAll(this._clientSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());
|
||||
|
||||
if (this.clientClients != null)
|
||||
{
|
||||
if (this.partitionClient != null)
|
||||
{
|
||||
yield return this.partitionClient;
|
||||
}
|
||||
|
||||
if (this.clientClients != null)
|
||||
{
|
||||
foreach (var client in this.clientClients)
|
||||
{
|
||||
yield return client;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.loadMonitorHub != null)
|
||||
{
|
||||
yield return this.loadMonitorClient;
|
||||
}
|
||||
await Task.WhenAll(this.clientClients.Select(client => client.CloseAsync()).ToList());
|
||||
}
|
||||
}
|
||||
|
||||
await Task.WhenAll(Clients().Select(client => client.CloseAsync()).ToList());
|
||||
async Task StopPartitionClients()
|
||||
{
|
||||
await Task.WhenAll(this._partitionSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());
|
||||
|
||||
if (this.partitionClient != null)
|
||||
{
|
||||
await this.partitionClient.CloseAsync();
|
||||
}
|
||||
}
|
||||
|
||||
async Task StopLoadMonitorClients()
|
||||
{
|
||||
await Task.WhenAll(this._loadMonitorSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());
|
||||
|
||||
if (this.loadMonitorHub != null)
|
||||
{
|
||||
await this.loadMonitorClient.CloseAsync();
|
||||
}
|
||||
}
|
||||
|
||||
const int EventHubCreationRetries = 5;
|
||||
|
@ -267,6 +283,7 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
this.Host,
|
||||
taskHubGuid,
|
||||
partitionSender,
|
||||
this.shutdownToken,
|
||||
this.TraceHelper);
|
||||
this.TraceHelper.LogDebug("Created PartitionSender {sender} from {clientId}", partitionSender.ClientId, client.ClientId);
|
||||
return sender;
|
||||
|
@ -290,6 +307,7 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
taskHubGuid,
|
||||
clientId,
|
||||
partitionSenders,
|
||||
this.shutdownToken,
|
||||
this.TraceHelper);
|
||||
return sender;
|
||||
});
|
||||
|
@ -304,6 +322,7 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
this.Host,
|
||||
taskHubGuid,
|
||||
loadMonitorSender,
|
||||
this.shutdownToken,
|
||||
this.TraceHelper);
|
||||
this.TraceHelper.LogDebug("Created LoadMonitorSender {sender} from {clientId}", loadMonitorSender.ClientId, this.loadMonitorClient.ClientId);
|
||||
return sender;
|
||||
|
|
|
@ -27,8 +27,8 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
|
||||
readonly Stopwatch stopwatch = new Stopwatch();
|
||||
|
||||
public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
|
||||
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
|
||||
public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
|
||||
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper)
|
||||
{
|
||||
this.host = host;
|
||||
this.taskHubGuid = taskHubGuid;
|
||||
|
@ -59,6 +59,7 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
async Task SendBatch(int lastPosition)
|
||||
{
|
||||
maybeSent = lastPosition;
|
||||
this.cancellationToken.ThrowIfCancellationRequested();
|
||||
this.stopwatch.Restart();
|
||||
await this.sender.SendAsync(batch).ConfigureAwait(false);
|
||||
this.stopwatch.Stop();
|
||||
|
@ -140,6 +141,12 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
this.traceHelper.LogWarning("EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send due to message size, reducing to {maxMessageSize}kB",
|
||||
this.eventHubName, this.eventHubPartition, this.maxMessageSize / 1024);
|
||||
}
|
||||
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// normal during shutdown
|
||||
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
|
||||
return;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);
|
||||
|
|
|
@ -102,7 +102,7 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
// check that the storage format is supported, and load the relevant FASTER tuning parameters
|
||||
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);
|
||||
|
||||
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub)
|
||||
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
|
||||
{
|
||||
Host = host,
|
||||
TraceHelper = this.traceHelper,
|
||||
|
|
|
@ -26,8 +26,8 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
|
||||
readonly Stopwatch stopwatch = new Stopwatch();
|
||||
|
||||
public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
|
||||
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
|
||||
public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
|
||||
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper)
|
||||
{
|
||||
this.host = host;
|
||||
this.taskHubGuid = taskHubGuid;
|
||||
|
@ -89,6 +89,7 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
int length = (int)(this.stream.Position);
|
||||
var arraySegment = new ArraySegment<byte>(this.stream.GetBuffer(), 0, length);
|
||||
var eventData = new EventData(arraySegment);
|
||||
this.cancellationToken.ThrowIfCancellationRequested();
|
||||
await this.sender.SendAsync(eventData);
|
||||
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} sent packet ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
|
||||
this.stream.Seek(0, SeekOrigin.Begin);
|
||||
|
@ -106,6 +107,12 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} iteration padded to latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, this.stopwatch.ElapsedMilliseconds);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// normal during shutdown
|
||||
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
|
||||
return;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);
|
||||
|
|
|
@ -39,6 +39,8 @@ namespace DurableTask.Netherite
|
|||
bool processingBatch;
|
||||
public TimeSpan? ProcessingBatchSince => this.processingBatch ? this.stopwatch.Elapsed : null;
|
||||
|
||||
volatile TaskCompletionSource<object> shutdownCompletionSource;
|
||||
|
||||
/// <summary>
|
||||
/// Constructor including a cancellation token.
|
||||
/// </summary>
|
||||
|
@ -93,6 +95,22 @@ namespace DurableTask.Netherite
|
|||
return tcs.Task;
|
||||
}
|
||||
|
||||
public Task WaitForShutdownAsync()
|
||||
{
|
||||
if (!this.cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
throw new InvalidOperationException("must call this only after canceling the token");
|
||||
}
|
||||
|
||||
if (this.shutdownCompletionSource == null)
|
||||
{
|
||||
Interlocked.CompareExchange(ref this.shutdownCompletionSource, new TaskCompletionSource<object>(), null);
|
||||
this.NotifyInternal();
|
||||
}
|
||||
|
||||
return this.shutdownCompletionSource.Task;
|
||||
}
|
||||
|
||||
readonly List<T> batch = new List<T>();
|
||||
readonly List<TaskCompletionSource<bool>> waiters = new List<TaskCompletionSource<bool>>();
|
||||
IList<T> requeued = null;
|
||||
|
@ -226,6 +244,11 @@ namespace DurableTask.Netherite
|
|||
this.processingBatch = false;
|
||||
previousBatch = this.batch.Count;
|
||||
}
|
||||
|
||||
if (this.cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
this.shutdownCompletionSource?.TrySetResult(null);
|
||||
}
|
||||
}
|
||||
|
||||
public void Resume()
|
||||
|
|
Загрузка…
Ссылка в новой задаче