add delay when starting partitions repeatedly fails

This commit is contained in:
sebastianburckhardt 2023-04-28 08:13:10 -07:00
Родитель 6e1ec94b6d
Коммит f9f96d2194
1 изменённых файлов: 21 добавлений и 2 удалений

Просмотреть файл

@ -63,6 +63,7 @@ namespace DurableTask.Netherite.EventHubsTransport
public TransportAbstraction.IPartition Partition;
public Task<PartitionIncarnation> Next;
public long NextPacketToReceive;
public int SuccessiveStartupFailures;
}
readonly Dictionary<string, MemoryStream> reassembly = new Dictionary<string, MemoryStream>();
@ -155,7 +156,24 @@ namespace DurableTask.Netherite.EventHubsTransport
{
// we are now becoming the current incarnation
this.currentIncarnation = prior.Next;
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} is restarting partition (incarnation {incarnation}) soon", this.eventHubName, this.eventHubPartition, c.Incarnation);
// sometimes we can get stuck into a loop of failing attempts to reincarnate a partition.
// We don't want to waste CPU and pollute the logs, but we als can't just give up because
// the failures can be transient. Thus we back off the retry pace.
TimeSpan addedDelay =
(prior.SuccessiveStartupFailures < 2) ? TimeSpan.Zero
: (prior.SuccessiveStartupFailures < 10) ? TimeSpan.FromSeconds(1)
: (prior.SuccessiveStartupFailures < 100) ? TimeSpan.FromSeconds(5)
: (prior.SuccessiveStartupFailures < 200) ? TimeSpan.FromSeconds(10)
: TimeSpan.FromMinutes(1);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} is restarting partition (incarnation {incarnation}) soon; addedDelay={addedDelay}", this.eventHubName, this.eventHubPartition, c.Incarnation, addedDelay);
if (addedDelay != TimeSpan.Zero)
{
await Task.Delay(addedDelay);
}
// we wait at most 20 seconds for the previous partition to terminate cleanly
int tries = 4;
@ -226,8 +244,9 @@ namespace DurableTask.Netherite.EventHubsTransport
// the partition startup was canceled
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} canceled partition startup (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, c.Incarnation);
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
c.SuccessiveStartupFailures = 1 + (prior?.SuccessiveStartupFailures ?? 0);
c.ErrorHandler.HandleError("EventHubsProcessor.StartPartitionAsync", "failed to start partition", e, true, false);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} failed during startup (incarnation {incarnation}): {exception}", this.eventHubName, this.eventHubPartition, c.Incarnation, e);
}