Implement a watchdog that quickly terminates CompletePending if it hangs
This commit is contained in:
Родитель
2ca5422bf6
Коммит
a35c6c8af9
|
@ -232,6 +232,17 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
TimeSpan limit = Debugger.IsAttached ? TimeSpan.FromMinutes(30) : TimeSpan.FromMinutes(3);
|
||||
|
||||
if (this.blobManager.PartitionErrorHandler.IsTerminated)
|
||||
{
|
||||
return; // partition is already terminated, no point in checking for hangs
|
||||
}
|
||||
|
||||
// check if a store worker is stuck in a specific place.
|
||||
if (this.storeWorker.WatchdogSeesExpiredDeadline())
|
||||
{
|
||||
this.blobManager.PartitionErrorHandler.HandleError("CheckForStuckWorkers", $"store worker deemed stuck by watchdog", null, true, false);
|
||||
}
|
||||
|
||||
// check if any of the workers got stuck in a processing loop
|
||||
Check("StoreWorker", this.storeWorker.ProcessingBatchSince);
|
||||
Check("LogWorker", this.logWorker.ProcessingBatchSince);
|
||||
|
|
|
@ -50,6 +50,20 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
CancellationTokenSource ioCompletionNotificationCancellation;
|
||||
|
||||
// Watchdog deadline, stored as UTC ticks. A value of 0 means no deadline is currently set.
// Only accessed through Interlocked operations (see StartWatchdog, StopWatchdog,
// and WatchdogSeesExpiredDeadline).
long Deadline = 0;
|
||||
/// <summary>
/// Arms the watchdog by recording an absolute deadline that lies
/// <paramref name="maxTime"/> after the current UTC time.
/// </summary>
/// <param name="maxTime">How long the guarded operation may run before the deadline counts as expired.</param>
void StartWatchdog(TimeSpan maxTime)
{
    // The deadline is kept as raw UTC ticks so that it fits in a single long.
    long expiryTicks = (DateTime.UtcNow + maxTime).Ticks;

    // Publish the new deadline atomically.
    Interlocked.Exchange(ref this.Deadline, expiryTicks);
}
|
||||
/// <summary>
/// Disarms the watchdog by atomically resetting the stored deadline to 0,
/// the value that <see cref="WatchdogSeesExpiredDeadline"/> treats as "no deadline set".
/// </summary>
void StopWatchdog() => Interlocked.Exchange(ref this.Deadline, 0);
|
||||
/// <summary>
/// Checks whether a watchdog deadline is currently set and has already passed.
/// </summary>
/// <returns>
/// <c>true</c> if a nonzero deadline is stored and the current UTC time is later than it;
/// <c>false</c> if no deadline is set or it has not yet been reached.
/// </returns>
public bool WatchdogSeesExpiredDeadline()
{
    // Atomic 64-bit read of the deadline ticks.
    long expiryTicks = Interlocked.Read(ref this.Deadline);

    if (expiryTicks == 0)
    {
        return false; // no deadline is set, so nothing can have expired
    }

    DateTime expiry = new DateTime(expiryTicks);
    return DateTime.UtcNow > expiry;
}
|
||||
|
||||
public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHelper traceHelper, BlobManager blobManager, CancellationToken cancellationToken)
|
||||
: base($"{nameof(StoreWorker)}{partition.PartitionId:D2}", true, 500, cancellationToken, partition.TraceHelper)
|
||||
|
@ -372,7 +386,9 @@ namespace DurableTask.Netherite.Faster
|
|||
}
|
||||
|
||||
// if there are IO responses ready to process, do that first
|
||||
this.StartWatchdog(TimeSpan.FromSeconds(10)); // This is a temporary mitigation for #251
|
||||
this.store.CompletePending();
|
||||
this.StopWatchdog();
|
||||
|
||||
// record the current time, for measuring latency in the event processing pipeline
|
||||
partitionEvent.IssuedTimestamp = this.partition.CurrentTimeMs;
|
||||
|
|
Загрузка…
Ссылка в новой задаче