This commit is contained in:
sebastianburckhardt 2023-04-20 09:35:10 -07:00
Родитель 67255a9072
Коммит 3b74b846cf
6 изменённых файлов: 89 добавлений и 28 удалений

Просмотреть файл

@ -19,12 +19,12 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly EventHubsSender<ClientEvent>[] channels;
int roundRobin;
public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, EventHubsTraceHelper traceHelper)
public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
{
this.channels = new Netherite.EventHubsTransport.EventHubsSender<ClientEvent>[senders.Length];
for (int i = 0; i < senders.Length; i++)
{
this.channels[i] = new EventHubsSender<ClientEvent>(host, taskHubGuid, senders[i], traceHelper);
this.channels[i] = new EventHubsSender<ClientEvent>(host, taskHubGuid, senders[i], shutdownToken, traceHelper);
}
}
@ -41,5 +41,10 @@ namespace DurableTask.Netherite.EventHubsTransport
var channel = this.channels.FirstOrDefault(this.Idle) ?? this.NextChannel();
channel.Submit(toSend);
}
public Task WaitForShutdownAsync()
{
return Task.WhenAll(this.channels.Select(sender => sender.WaitForShutdownAsync()));
}
}
}

Просмотреть файл

@ -19,6 +19,7 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly string[] clientHubs;
readonly string partitionHub;
readonly string loadMonitorHub;
readonly CancellationToken shutdownToken;
EventHubClient partitionClient;
List<EventHubClient> clientClients;
@ -44,12 +45,14 @@ namespace DurableTask.Netherite.EventHubsTransport
ConnectionInfo connectionInfo,
string partitionHub,
string[] clientHubs,
string loadMonitorHub)
string loadMonitorHub,
CancellationToken shutdownToken)
{
this.connectionInfo = connectionInfo;
this.partitionHub = partitionHub;
this.clientHubs = clientHubs;
this.loadMonitorHub = loadMonitorHub;
this.shutdownToken = shutdownToken;
}
public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}";
@ -62,30 +65,43 @@ namespace DurableTask.Netherite.EventHubsTransport
this.EnsureLoadMonitorAsync());
}
public async Task StopAsync()
public Task StopAsync()
{
IEnumerable<EventHubClient> Clients()
return Task.WhenAll(
this.StopClientClients(),
this.StopPartitionClients(),
this.StopLoadMonitorClients()
);
}
async Task StopClientClients()
{
await Task.WhenAll(this._clientSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());
if (this.clientClients != null)
{
if (this.partitionClient != null)
{
yield return this.partitionClient;
}
if (this.clientClients != null)
{
foreach (var client in this.clientClients)
{
yield return client;
}
}
if (this.loadMonitorHub != null)
{
yield return this.loadMonitorClient;
}
await Task.WhenAll(this.clientClients.Select(client => client.CloseAsync()).ToList());
}
}
await Task.WhenAll(Clients().Select(client => client.CloseAsync()).ToList());
async Task StopPartitionClients()
{
await Task.WhenAll(this._partitionSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());
if (this.partitionClient != null)
{
await this.partitionClient.CloseAsync();
}
}
async Task StopLoadMonitorClients()
{
await Task.WhenAll(this._loadMonitorSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());
if (this.loadMonitorHub != null)
{
await this.loadMonitorClient.CloseAsync();
}
}
const int EventHubCreationRetries = 5;
@ -267,6 +283,7 @@ namespace DurableTask.Netherite.EventHubsTransport
this.Host,
taskHubGuid,
partitionSender,
this.shutdownToken,
this.TraceHelper);
this.TraceHelper.LogDebug("Created PartitionSender {sender} from {clientId}", partitionSender.ClientId, client.ClientId);
return sender;
@ -290,6 +307,7 @@ namespace DurableTask.Netherite.EventHubsTransport
taskHubGuid,
clientId,
partitionSenders,
this.shutdownToken,
this.TraceHelper);
return sender;
});
@ -304,6 +322,7 @@ namespace DurableTask.Netherite.EventHubsTransport
this.Host,
taskHubGuid,
loadMonitorSender,
this.shutdownToken,
this.TraceHelper);
this.TraceHelper.LogDebug("Created LoadMonitorSender {sender} from {clientId}", loadMonitorSender.ClientId, this.loadMonitorClient.ClientId);
return sender;

Просмотреть файл

@ -27,8 +27,8 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
readonly Stopwatch stopwatch = new Stopwatch();
public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper)
{
this.host = host;
this.taskHubGuid = taskHubGuid;
@ -59,6 +59,7 @@ namespace DurableTask.Netherite.EventHubsTransport
async Task SendBatch(int lastPosition)
{
maybeSent = lastPosition;
this.cancellationToken.ThrowIfCancellationRequested();
this.stopwatch.Restart();
await this.sender.SendAsync(batch).ConfigureAwait(false);
this.stopwatch.Stop();
@ -140,6 +141,12 @@ namespace DurableTask.Netherite.EventHubsTransport
this.traceHelper.LogWarning("EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send due to message size, reducing to {maxMessageSize}kB",
this.eventHubName, this.eventHubPartition, this.maxMessageSize / 1024);
}
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
{
// normal during shutdown
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
return;
}
catch (Exception e)
{
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);

Просмотреть файл

@ -102,7 +102,7 @@ namespace DurableTask.Netherite.EventHubsTransport
// check that the storage format is supported, and load the relevant FASTER tuning parameters
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub)
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
{
Host = host,
TraceHelper = this.traceHelper,

Просмотреть файл

@ -26,8 +26,8 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
readonly Stopwatch stopwatch = new Stopwatch();
public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper)
{
this.host = host;
this.taskHubGuid = taskHubGuid;
@ -89,6 +89,7 @@ namespace DurableTask.Netherite.EventHubsTransport
int length = (int)(this.stream.Position);
var arraySegment = new ArraySegment<byte>(this.stream.GetBuffer(), 0, length);
var eventData = new EventData(arraySegment);
this.cancellationToken.ThrowIfCancellationRequested();
await this.sender.SendAsync(eventData);
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} sent packet ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
this.stream.Seek(0, SeekOrigin.Begin);
@ -106,6 +107,12 @@ namespace DurableTask.Netherite.EventHubsTransport
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} iteration padded to latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, this.stopwatch.ElapsedMilliseconds);
}
}
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
{
// normal during shutdown
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
return;
}
catch (Exception e)
{
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);

Просмотреть файл

@ -39,6 +39,8 @@ namespace DurableTask.Netherite
bool processingBatch;
public TimeSpan? ProcessingBatchSince => this.processingBatch ? this.stopwatch.Elapsed : null;
volatile TaskCompletionSource<object> shutdownCompletionSource;
/// <summary>
/// Constructor including a cancellation token.
/// </summary>
@ -93,6 +95,22 @@ namespace DurableTask.Netherite
return tcs.Task;
}
public Task WaitForShutdownAsync()
{
if (!this.cancellationToken.IsCancellationRequested)
{
throw new InvalidOperationException("must call this only after canceling the token");
}
if (this.shutdownCompletionSource == null)
{
Interlocked.CompareExchange(ref this.shutdownCompletionSource, new TaskCompletionSource<object>(), null);
this.NotifyInternal();
}
return this.shutdownCompletionSource.Task;
}
readonly List<T> batch = new List<T>();
readonly List<TaskCompletionSource<bool>> waiters = new List<TaskCompletionSource<bool>>();
IList<T> requeued = null;
@ -226,6 +244,11 @@ namespace DurableTask.Netherite
this.processingBatch = false;
previousBatch = this.batch.Count;
}
if (this.cancellationToken.IsCancellationRequested)
{
this.shutdownCompletionSource?.TrySetResult(null);
}
}
public void Resume()