implement faster partition shutdown, by not waiting for EventProcessorHost to initiate shutdown. (#14)

This commit is contained in:
Sebastian Burckhardt 2021-01-25 15:37:23 -08:00 коммит произвёл GitHub
Родитель c30a121db4
Коммит 2742fb3566
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 53 добавлений и 23 удалений

Просмотреть файл

@ -31,6 +31,8 @@ namespace DurableTask.Netherite.EventHubs
//private uint partitionId;
CancellationTokenSource eventProcessorShutdown;
// we set this task once shutdown has been initiated
Task shutdownTask = null;
// we occasionally checkpoint received packets with eventhubs. It is not required for correctness
// as we filter duplicates anyway, but it will help startup time.
@ -68,7 +70,8 @@ namespace DurableTask.Netherite.EventHubs
TaskhubParameters parameters,
PartitionContext partitionContext,
NetheriteOrchestrationServiceSettings settings,
EventHubsTraceHelper logger)
EventHubsTraceHelper logger,
CancellationToken shutdownToken)
{
this.host = host;
this.sender = sender;
@ -81,6 +84,10 @@ namespace DurableTask.Netherite.EventHubs
this.taskHubGuid = parameters.TaskhubGuid.ToByteArray();
this.partitionId = uint.Parse(this.eventHubPartition);
this.traceHelper = logger;
var _ = shutdownToken.Register(
() => { var _ = Task.Run(this.IdempotentShutdown); },
useSynchronizationContext: false);
}
Task IEventProcessor.OpenAsync(PartitionContext context)
@ -191,31 +198,54 @@ namespace DurableTask.Netherite.EventHubs
return c;
}
async Task IdempotentShutdown()
{
async Task ShutdownAsync()
{
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} is shutting down", this.eventHubName, this.eventHubPartition);
this.eventProcessorShutdown.Cancel(); // stops reincarnations
PartitionIncarnation current = await this.currentIncarnation.ConfigureAwait(false);
while (current != null && current.ErrorHandler.IsTerminated)
{
current = await current.Next.ConfigureAwait(false);
}
if (current == null)
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} already canceled or terminated", this.eventHubName, this.eventHubPartition);
}
else
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} stopping partition (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, current.Incarnation);
await current.Partition.StopAsync(false).ConfigureAwait(false);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} stopped partition (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, current.Incarnation);
}
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} is shut down", this.eventHubName, this.eventHubPartition);
}
using (await this.deliveryLock.LockAsync())
{
if (this.shutdownTask == null)
{
this.shutdownTask = ShutdownAsync();
}
}
await this.shutdownTask;
}
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} is closing", this.eventHubName, this.eventHubPartition);
this.eventProcessorShutdown.Cancel(); // stops the automatic partition restart
PartitionIncarnation current = await this.currentIncarnation.ConfigureAwait(false);
while (current != null && current.ErrorHandler.IsTerminated)
{
current = await current.Next.ConfigureAwait(false);
}
if (current == null)
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} already canceled or terminated", this.eventHubName, this.eventHubPartition);
}
else
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} stopping partition (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, current.Incarnation);
await current.Partition.StopAsync(false).ConfigureAwait(false);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} stopped partition (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, current.Incarnation);
}
await this.IdempotentShutdown();
await this.SaveEventHubsReceiverCheckpoint(context).ConfigureAwait(false);
this.deliveryLock.Dispose();
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} closed", this.eventHubName, this.eventHubPartition);

Просмотреть файл

@ -214,8 +214,8 @@ namespace DurableTask.Netherite.EventHubs
async Task ITaskHub.StopAsync(bool isForced)
{
this.traceHelper.LogInformation("Shutting down EventHubsBackend");
this.traceHelper.LogDebug("Stopping client event loop");
this.shutdownSource.Cancel();
this.shutdownSource.Cancel(); // initiates shutdown of client and of all partitions
this.traceHelper.LogDebug("Stopping client");
await this.client.StopAsync().ConfigureAwait(false);
this.traceHelper.LogDebug("Unregistering event processor");
@ -239,7 +239,7 @@ namespace DurableTask.Netherite.EventHubs
IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext partitionContext)
{
var processor = new EventHubsProcessor(this.host, this, this.parameters, partitionContext, this.settings, this.traceHelper);
var processor = new EventHubsProcessor(this.host, this, this.parameters, partitionContext, this.settings, this.traceHelper, this.shutdownSource.Token);
return processor;
}