revise handling of fatal exception, implement emergency shutdown path for host
This commit is contained in:
Родитель
ebda6e7fac
Коммит
bb19e5726b
|
@ -62,6 +62,11 @@ namespace DurableTask.Netherite
|
|||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
void TraceWarning(string message);
|
||||
|
||||
/// <summary>
|
||||
/// Called when some component observed a fatal exception. Host may take action to initiate a fast shutdown.
|
||||
/// </summary>
|
||||
void OnFatalExceptionObserved(Exception e);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
|
@ -472,7 +472,7 @@ namespace DurableTask.Netherite
|
|||
this.serviceShutdownSource.Dispose();
|
||||
this.serviceShutdownSource = null;
|
||||
|
||||
await this.transport.StopAsync();
|
||||
await this.transport.StopAsync(fatalExceptionObserved: false);
|
||||
|
||||
this.ActivityWorkItemQueue.Dispose();
|
||||
this.OrchestrationWorkItemQueue.Dispose();
|
||||
|
@ -551,7 +551,7 @@ namespace DurableTask.Netherite
|
|||
|
||||
IPartitionErrorHandler TransportAbstraction.IHost.CreateErrorHandler(uint partitionId)
|
||||
{
|
||||
return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName);
|
||||
return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this);
|
||||
}
|
||||
|
||||
void TransportAbstraction.IHost.TraceWarning(string message)
|
||||
|
@ -559,6 +559,25 @@ namespace DurableTask.Netherite
|
|||
this.TraceHelper.TraceWarning(message);
|
||||
}
|
||||
|
||||
/// <summary>
/// Called when some component observed a fatal exception. If
/// <c>Settings.EmergencyShutdownOnFatalExceptions</c> is set, starts a background task that
/// stops the transport (waiting at most 30 seconds), waits another 10 seconds, and then
/// terminates the process with exit code 333. Otherwise does nothing.
/// </summary>
/// <param name="e">The fatal exception that was observed; its type name and details are traced.</param>
void TransportAbstraction.IHost.OnFatalExceptionObserved(Exception e)
{
    if (!this.Settings.EmergencyShutdownOnFatalExceptions)
    {
        return;
    }

    // Runs detached from the caller: the component that reported the exception must not be blocked
    // by the shutdown sequence.
    Task.Run(async () =>
    {
        try
        {
            this.TraceHelper.TraceError($"OrchestrationService is initiating an emergency shutdown due to a fatal {e.GetType().FullName}", e);

            try
            {
                // try to stop the transport as quickly as possible, and don't wait longer than 30 seconds
                await Task.WhenAny(this.transport.StopAsync(fatalExceptionObserved: true), Task.Delay(TimeSpan.FromSeconds(30)));
            }
            catch (Exception stopException)
            {
                // a failure to even start stopping the transport must not cancel the emergency shutdown
                this.TraceHelper.TraceWarning($"OrchestrationService could not stop the transport during emergency shutdown: {stopException}");
            }

            this.TraceHelper.TraceWarning($"OrchestrationService is killing process in 10 seconds");
            await Task.Delay(TimeSpan.FromSeconds(10));
        }
        finally
        {
            // Nobody observes this task, so an exception above would otherwise be lost silently and
            // leave the failing process running. Always terminate once the shutdown path was entered.
            System.Environment.Exit(333);
        }
    });
}
|
||||
|
||||
/******************************/
|
||||
// client methods
|
||||
/******************************/
|
||||
|
|
|
@ -150,6 +150,12 @@ namespace DurableTask.Netherite
|
|||
/// </summary>
|
||||
public bool KeepInstanceIdsInMemory = true;
|
||||
|
||||
/// <summary>
|
||||
/// Whether to immediately shut down the transport layer and terminate the process when a fatal exception is observed.
|
||||
/// This is true by default, so that failing hosts leave quickly, which allows other hosts to recover the partitions sooner.
|
||||
/// </summary>
|
||||
public bool EmergencyShutdownOnFatalExceptions = true;
|
||||
|
||||
/// <summary>
|
||||
/// Forces steps to be persisted before applying their effects, disabling all pipelining.
|
||||
/// </summary>
|
||||
|
|
|
@ -4,8 +4,10 @@
|
|||
namespace DurableTask.Netherite
|
||||
{
|
||||
using System;
|
||||
using System.Net;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using DurableTask.Core.Common;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
// For indicating and initiating termination, and for tracing errors and warnings relating to a partition.
|
||||
|
@ -19,6 +21,7 @@ namespace DurableTask.Netherite
|
|||
readonly string account;
|
||||
readonly string taskHub;
|
||||
readonly TaskCompletionSource<object> shutdownComplete;
|
||||
readonly TransportAbstraction.IHost host;
|
||||
|
||||
public event Action OnShutdown;
|
||||
|
||||
|
@ -46,7 +49,7 @@ namespace DurableTask.Netherite
|
|||
const int TerminatedWithError = 1;
|
||||
const int TerminatedNormally = 2;
|
||||
|
||||
public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelLimit, string storageAccountName, string taskHubName)
|
||||
public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelLimit, string storageAccountName, string taskHubName, TransportAbstraction.IHost host)
|
||||
{
|
||||
this.cts = new CancellationTokenSource();
|
||||
this.partitionId = partitionId;
|
||||
|
@ -55,14 +58,19 @@ namespace DurableTask.Netherite
|
|||
this.account = storageAccountName;
|
||||
this.taskHub = taskHubName;
|
||||
this.shutdownComplete = new TaskCompletionSource<object>();
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning)
|
||||
{
|
||||
bool isFatal = exception != null && Utils.IsFatal(exception);
|
||||
|
||||
isWarning = isWarning && !isFatal;
|
||||
terminatePartition = terminatePartition || isFatal;
|
||||
|
||||
this.TraceError(isWarning, context, message, exception, terminatePartition);
|
||||
|
||||
// terminate this partition in response to the error
|
||||
|
||||
// if necessary, terminate this partition in response to the error
|
||||
if (terminatePartition && this.terminationStatus == NotTerminated)
|
||||
{
|
||||
if (Interlocked.CompareExchange(ref this.terminationStatus, TerminatedWithError, NotTerminated) == NotTerminated)
|
||||
|
@ -70,6 +78,12 @@ namespace DurableTask.Netherite
|
|||
this.Terminate();
|
||||
}
|
||||
}
|
||||
|
||||
// if fatal, notify host, because it may start a quick shutdown path in response
|
||||
if (isFatal)
|
||||
{
|
||||
this.host.OnFatalExceptionObserved(exception);
|
||||
}
|
||||
}
|
||||
|
||||
public void TerminateNormally()
|
||||
|
|
|
@ -640,7 +640,7 @@ namespace DurableTask.Netherite.Faster
|
|||
}
|
||||
continue;
|
||||
}
|
||||
catch (Exception e) when (!Utils.IsFatal(e))
|
||||
catch (Exception e)
|
||||
{
|
||||
this.PartitionErrorHandler.HandleError(nameof(AcquireOwnership), "Could not acquire partition lease", e, true, false);
|
||||
throw;
|
||||
|
@ -720,7 +720,7 @@ namespace DurableTask.Netherite.Faster
|
|||
// We lost the lease to someone else. Terminate ownership immediately.
|
||||
this.PartitionErrorHandler.HandleError(nameof(MaintenanceLoopAsync), "Lost partition lease", ex, true, true);
|
||||
}
|
||||
catch (Exception e) when (!Utils.IsFatal(e))
|
||||
catch (Exception e)
|
||||
{
|
||||
this.PartitionErrorHandler.HandleError(nameof(MaintenanceLoopAsync), "Could not maintain partition lease", e, true, false);
|
||||
}
|
||||
|
|
|
@ -472,7 +472,7 @@ namespace DurableTask.Netherite.Faster
|
|||
await Task.Delay(nextRetryIn);
|
||||
continue;
|
||||
}
|
||||
catch (Exception exception) when (!Utils.IsFatal(exception))
|
||||
catch (Exception exception)
|
||||
{
|
||||
this.blobManager.PartitionErrorHandler.HandleError(nameof(LoadAsync), "Could not read object from storage", exception, true, this.blobManager.PartitionErrorHandler.IsTerminated);
|
||||
throw;
|
||||
|
@ -562,7 +562,7 @@ namespace DurableTask.Netherite.Faster
|
|||
await Task.Delay(nextRetryIn);
|
||||
continue;
|
||||
}
|
||||
catch (Exception exception) when (!Utils.IsFatal(exception))
|
||||
catch (Exception exception)
|
||||
{
|
||||
this.blobManager?.HandleStorageError(nameof(StoreAsync), "could not write object to storage", blob.Name, exception, true, this.blobManager.PartitionErrorHandler.IsTerminated);
|
||||
throw;
|
||||
|
@ -592,7 +592,7 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
throw new OperationCanceledException("Partition was terminated.", this.terminationToken);
|
||||
}
|
||||
catch (Exception e) when (!Utils.IsFatal(e))
|
||||
catch (Exception e)
|
||||
{
|
||||
this.blobManager.PartitionErrorHandler.HandleError(nameof(WriteCheckpointIntention), "Failed to write checkpoint intention to storage", e, true, this.blobManager.PartitionErrorHandler.IsTerminated);
|
||||
throw;
|
||||
|
@ -616,7 +616,7 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
throw new OperationCanceledException("Partition was terminated.", this.terminationToken);
|
||||
}
|
||||
catch (Exception e) when (!Utils.IsFatal(e))
|
||||
catch (Exception e)
|
||||
{
|
||||
this.blobManager.PartitionErrorHandler.HandleError(nameof(RemoveCheckpointIntention), "Failed to remove checkpoint intention from storage", e, true, false);
|
||||
throw;
|
||||
|
@ -648,7 +648,7 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
throw new OperationCanceledException("Partition was terminated.", this.terminationToken);
|
||||
}
|
||||
catch (Exception e) when (!Utils.IsFatal(e))
|
||||
catch (Exception e)
|
||||
{
|
||||
this.blobManager.PartitionErrorHandler.HandleError(nameof(ReadCheckpointIntentions), "Failed to read checkpoint intentions from storage", e, true, false);
|
||||
throw;
|
||||
|
|
|
@ -679,7 +679,7 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
// partition is terminating
|
||||
}
|
||||
catch (Exception e) when (!Utils.IsFatal(e))
|
||||
catch (Exception e)
|
||||
{
|
||||
this.partition.ErrorHandler.HandleError(nameof(RunPrefetchSession), "PrefetchSession {sessionId} encountered exception", e, false, this.partition.ErrorHandler.IsTerminated);
|
||||
}
|
||||
|
|
|
@ -97,7 +97,7 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
this.blobBatchReceiver = new BlobBatchReceiver<PartitionEvent>(traceContext, this.traceHelper, this.settings, keepUntilConfirmed: true);
|
||||
|
||||
var _ = shutdownToken.Register(
|
||||
() => { var _ = Task.Run(() => this.IdempotentShutdown("shutdownToken", false)); },
|
||||
() => { var _ = Task.Run(() => this.IdempotentShutdown("shutdownToken", eventHubsTransport.FatalExceptionObserved)); },
|
||||
useSynchronizationContext: false);
|
||||
}
|
||||
|
||||
|
@ -253,7 +253,7 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
// the partition startup was canceled
|
||||
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} canceled partition startup (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, c.Incarnation);
|
||||
}
|
||||
catch (Exception e) when (!Utils.IsFatal(e))
|
||||
catch (Exception e)
|
||||
{
|
||||
c.SuccessiveStartupFailures = 1 + (prior?.SuccessiveStartupFailures ?? 0);
|
||||
c.ErrorHandler.HandleError("EventHubsProcessor.StartPartitionAsync", "failed to start partition", e, true, false);
|
||||
|
@ -338,11 +338,16 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
await context.CheckpointAsync(checkpoint);
|
||||
this.lastCheckpointedOffset = long.Parse(checkpoint.Offset);
|
||||
}
|
||||
catch (Exception e) when (!Utils.IsFatal(e))
|
||||
catch (Exception e)
|
||||
{
|
||||
// updating EventHubs checkpoints has been known to fail occasionally due to leases shifting around; since it is optional anyway
|
||||
// we don't want this exception to cause havoc
|
||||
this.traceHelper.LogWarning("EventHubsProcessor {eventHubName}/{eventHubPartition} failed to checkpoint receive position: {e}", this.eventHubName, this.eventHubPartition, e);
|
||||
|
||||
if (Utils.IsFatal(e))
|
||||
{
|
||||
this.host.OnFatalExceptionObserved(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -165,8 +165,13 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);
|
||||
this.traceHelper.LogWarning("EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send: {e}", this.eventHubName, this.eventHubPartition, e);
|
||||
senderException = e;
|
||||
|
||||
if (Utils.IsFatal(e))
|
||||
{
|
||||
this.host.OnFatalExceptionObserved(e);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
@ -221,9 +226,14 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
else
|
||||
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} has confirmed {confirmed}, requeued {requeued}, dropped {dropped} outbound events", this.eventHubName, this.eventHubPartition, confirmed, requeued, dropped);
|
||||
}
|
||||
catch (Exception exception) when (!Utils.IsFatal(exception))
|
||||
catch (Exception exception)
|
||||
{
|
||||
this.traceHelper.LogError("EventHubsSender {eventHubName}/{eventHubPartitionId} encountered an error while trying to confirm messages: {exception}", this.eventHubName, this.eventHubPartition, exception);
|
||||
|
||||
if (Utils.IsFatal(exception))
|
||||
{
|
||||
this.host.OnFatalExceptionObserved(exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,10 +53,14 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
CloudBlockBlob partitionScript;
|
||||
ScriptedEventProcessorHost scriptedEventProcessorHost;
|
||||
|
||||
int shutdownTriggered;
|
||||
|
||||
public Guid ClientId { get; private set; }
|
||||
|
||||
public string Fingerprint => this.connections.Fingerprint;
|
||||
|
||||
public bool FatalExceptionObserved { get; private set; }
|
||||
|
||||
public EventHubsTransport(TransportAbstraction.IHost host, NetheriteOrchestrationServiceSettings settings, IStorageLayer storage, ILoggerFactory loggerFactory)
|
||||
{
|
||||
if (storage is MemoryStorageLayer)
|
||||
|
@ -291,16 +295,20 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
}
|
||||
}
|
||||
|
||||
async Task ITransportLayer.StopAsync()
|
||||
async Task ITransportLayer.StopAsync(bool fatalExceptionObserved)
|
||||
{
|
||||
this.traceHelper.LogInformation("EventHubsTransport is shutting down");
|
||||
this.shutdownSource.Cancel(); // immediately initiates shutdown of client and of all partitions
|
||||
if (Interlocked.CompareExchange(ref this.shutdownTriggered, 1, 0) == 0)
|
||||
{
|
||||
this.traceHelper.LogInformation("EventHubsTransport is shutting down");
|
||||
this.FatalExceptionObserved = fatalExceptionObserved;
|
||||
this.shutdownSource.Cancel(); // immediately initiates shutdown of client and of all partitions
|
||||
|
||||
await Task.WhenAll(
|
||||
this.hasWorkers ? this.StopWorkersAsync() : Task.CompletedTask,
|
||||
this.StopClientsAndConnectionsAsync());
|
||||
await Task.WhenAll(
|
||||
this.hasWorkers ? this.StopWorkersAsync() : Task.CompletedTask,
|
||||
this.StopClientsAndConnectionsAsync());
|
||||
|
||||
this.traceHelper.LogInformation("EventHubsTransport is shut down");
|
||||
this.traceHelper.LogInformation("EventHubsTransport is shut down");
|
||||
}
|
||||
}
|
||||
|
||||
async Task StopWorkersAsync()
|
||||
|
|
|
@ -33,7 +33,8 @@ namespace DurableTask.Netherite
|
|||
/// <summary>
|
||||
/// Stops the transport backend.
|
||||
/// </summary>
|
||||
/// <param name="fatalExceptionObserved">Whether this stop was initiated because we have observed a fatal exception.</param>
|
||||
/// <returns>After the transport backend has stopped.</returns>
|
||||
Task StopAsync();
|
||||
Task StopAsync(bool fatalExceptionObserved);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,7 +95,7 @@ namespace DurableTask.Netherite.SingleHostTransport
|
|||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
async Task ITransportLayer.StopAsync()
|
||||
async Task ITransportLayer.StopAsync(bool fatalExceptionObserved)
|
||||
{
|
||||
var tasks = new List<Task>();
|
||||
tasks.Add(this.clientQueue.Client.StopAsync());
|
||||
|
|
Загрузка…
Ссылка в новой задаче