Terminate partition when FASTER refuses to checkpoint for over a minute (#301)

This commit is contained in:
David Justo 2023-10-19 13:55:54 -07:00 коммит произвёл GitHub
Родитель 6effbf572c
Коммит ba15eed262
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 205 добавлений и 68 удалений

Просмотреть файл

@ -61,7 +61,7 @@ namespace DurableTask.Netherite
this.host = host;
}
public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning)
public void HandleError(string context, string message, Exception? exception, bool terminatePartition, bool isWarning)
{
bool isFatal = exception != null && Utils.IsFatal(exception);
@ -94,7 +94,7 @@ namespace DurableTask.Netherite
}
}
void TraceError(bool isWarning, string context, string message, Exception exception, bool terminatePartition)
void TraceError(bool isWarning, string context, string message, Exception? exception, bool terminatePartition)
{
var logLevel = isWarning ? LogLevel.Warning : LogLevel.Error;
if (this.logLevelLimit <= logLevel)

Просмотреть файл

@ -484,7 +484,11 @@ namespace DurableTask.Netherite.Faster
public override async Task<long> RunCompactionAsync(long target)
{
string id = DateTime.UtcNow.ToString("O"); // for tracing purposes
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} is requesting to enter semaphore with {maxCompactionThreads.CurrentCount} threads available");
await maxCompactionThreads.WaitAsync();
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} entered semaphore");
try
{
long beginAddressBeforeCompaction = this.Log.BeginAddress;
@ -499,20 +503,47 @@ namespace DurableTask.Netherite.Faster
target - this.Log.BeginAddress,
this.GetElapsedCompactionMilliseconds());
var tokenSource = new CancellationTokenSource();
var timeoutTask = Task.Delay(TimeSpan.FromMinutes(10), tokenSource.Token);
var tcs = new TaskCompletionSource<long>(TaskCreationOptions.RunContinuationsAsynchronously);
var thread = TrackedThreads.MakeTrackedThread(RunCompaction, $"Compaction.{id}");
thread.Start();
var winner = await Task.WhenAny(tcs.Task, timeoutTask);
if (winner == timeoutTask)
{
// compaction timed out. Terminate partition
var exceptionMessage = $"Compaction {id} time out";
this.partition.ErrorHandler.HandleError(nameof(RunCompactionAsync), exceptionMessage, e: null, terminatePartition: true, reportAsWarning: true);
// we need resolve the task to ensure the 'finally' block is executed which frees up another thread to start compating
tcs.TrySetException(new OperationCanceledException(exceptionMessage));
}
else
{
// cancel the timeout task since compaction completed
tokenSource.Cancel();
}
await timeoutTask.ContinueWith(_ => tokenSource.Dispose());
// return result of compaction task
return await tcs.Task;
void RunCompaction()
{
try
{
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} started");
var session = this.CreateASession($"compaction-{id}", true);
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} obtained a FASTER session");
using (this.TrackTemporarySession(session))
{
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} is invoking FASTER's compaction routine");
long compactedUntil = session.Compact(target, CompactionType.Scan);
this.TraceHelper.FasterCompactionProgress(
@ -525,17 +556,17 @@ namespace DurableTask.Netherite.Faster
this.Log.BeginAddress - beginAddressBeforeCompaction,
this.GetElapsedCompactionMilliseconds());
tcs.SetResult(compactedUntil);
tcs.TrySetResult(compactedUntil);
}
}
catch (Exception exception)
when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
{
tcs.SetException(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken));
tcs.TrySetException(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken));
}
catch (Exception e)
{
tcs.SetException(e);
tcs.TrySetException(e);
}
}
}

Просмотреть файл

@ -128,6 +128,15 @@ namespace DurableTask.Netherite.Faster
}
}
public void FasterProgress(Func<string> constructString)
{
if (this.logLevelLimit <= LogLevel.Debug)
{
var details = constructString();
this.FasterProgress(details);
}
}
public void FasterStorageProgress(string details)
{
if (this.logLevelLimit <= LogLevel.Trace)

Просмотреть файл

@ -21,6 +21,7 @@ namespace DurableTask.Netherite.Faster
readonly EffectTracker effectTracker;
bool isShuttingDown;
DateTime? timeOfFirstRefusedCheckpoint;
public string InputQueueFingerprint { get; private set; }
public (long,int) InputQueuePosition { get; private set; }
@ -30,15 +31,15 @@ namespace DurableTask.Netherite.Faster
// periodic index and store checkpointing
CheckpointTrigger pendingCheckpointTrigger;
Task pendingIndexCheckpoint;
Task<(long, (long,int))> pendingStoreCheckpoint;
Task? pendingIndexCheckpoint;
Task<(long, (long,int))>? pendingStoreCheckpoint;
(long,int) lastCheckpointedInputQueuePosition;
long lastCheckpointedCommitLogPosition;
long numberEventsSinceLastCheckpoint;
DateTime timeOfNextIdleCheckpoint;
// periodic compaction
Task<long?> pendingCompaction;
Task<long?>? pendingCompaction;
// periodic load publishing
PartitionLoadInfo loadInfo;
@ -281,6 +282,50 @@ namespace DurableTask.Netherite.Faster
Idle
}
void LogCheckpointStats()
{
long inputQueuePositionLag = this.GetInputQueuePositionLag();
// since this is a pure function, we declare it as local static for improved performance
static string ReportNullableTaskStatus(Task? t)
{
if (t == null)
{
return "is null";
}
else
{
return t.IsCompleted ? "is completed" : "is not completed";
}
};
// since the statistics log is only emitted if the trace level is at least "Debug",
// we defer the construction of the string to relieve GC pressure
string ConstructLogString()
{
var log = $"Checkpoint statistics: " +
$"LastCheckpointedCommitLogPosition={this.lastCheckpointedCommitLogPosition}, " +
$"MaxNumberBytesBetweenCheckpoints={this.partition.Settings.MaxNumberBytesBetweenCheckpoints}, " +
$"CommitLogPosition={this.CommitLogPosition}, " +
$"NumberEventsSinceLastCheckpoint={this.numberEventsSinceLastCheckpoint}, " +
$"MaxNumberEventsBetweenCheckpoints={this.partition.Settings.MaxNumberEventsBetweenCheckpoints}, " +
$"InputQueuePositionLag={inputQueuePositionLag}," +
$"TimeOfNextIdleCheckpoint={this.timeOfNextIdleCheckpoint}, " +
$"TimeOfFirstRefusedCheckpoint={this.timeOfFirstRefusedCheckpoint}, " +
$"PendingCompaction status={ReportNullableTaskStatus(this.pendingCompaction)}, " +
$"PendingIndexCheckpoint status={ReportNullableTaskStatus(this.pendingIndexCheckpoint)}, " +
$"PendingStoreCheckpoint status={ReportNullableTaskStatus(this.pendingStoreCheckpoint)}";
return log;
}
this.traceHelper.FasterProgress(ConstructLogString);
}
long GetInputQueuePositionLag()
{
return this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition);
}
bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil)
{
// in a test setting, let the test decide when to checkpoint or compact
@ -292,8 +337,7 @@ namespace DurableTask.Netherite.Faster
trigger = CheckpointTrigger.None;
compactUntil = null;
long inputQueuePositionLag =
this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition);
long inputQueuePositionLag = this.GetInputQueuePositionLag();
if (this.lastCheckpointedCommitLogPosition + this.partition.Settings.MaxNumberBytesBetweenCheckpoints <= this.CommitLogPosition)
{
@ -349,7 +393,114 @@ namespace DurableTask.Netherite.Faster
var actual = (((earliest - offset - 1) / period) + 1) * period + offset;
this.timeOfNextIdleCheckpoint = new DateTime(actual, DateTimeKind.Utc);
}
void StartCheckpointOrFailOnTimeout(Func<bool> checkpointRoutine, string messageOnError)
{
var checkpointStarted = checkpointRoutine.Invoke();
if (checkpointStarted)
{
this.timeOfFirstRefusedCheckpoint = null;
}
else
{
// track start of FASTER refusal to start checkpoint
var currentTime = DateTime.UtcNow;
this.timeOfFirstRefusedCheckpoint ??= currentTime;
// if the refusal to checkpoint started over a minute ago, terminate partition
TimeSpan duration = currentTime - this.timeOfFirstRefusedCheckpoint.Value;
if (duration > TimeSpan.FromMinutes(1))
{
messageOnError += $". FASTER first refused to checkpoint at '{this.timeOfFirstRefusedCheckpoint}'. Duration of refusal = {duration}. Terminating partition.";
this.partition.ErrorHandler.HandleError(nameof(StartCheckpointOrFailOnTimeout), messageOnError, e: null, terminatePartition: true, reportAsWarning: false);
}
}
}
async ValueTask RunCheckpointingStateMachine()
{
// handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none)
if (this.pendingStoreCheckpoint != null)
{
if (this.pendingStoreCheckpoint.IsCompleted == true)
{
this.traceHelper.FasterProgress("Checkpointing state machine: pendingStorecheckpoint has completed.");
(this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition)
= await this.pendingStoreCheckpoint; // observe exceptions here
// force collection of memory used during checkpointing
GC.Collect();
this.traceHelper.FasterProgress("Checkpointing state machine: resetting to initial state");
// we have reached the end of the state machine transitions
this.pendingStoreCheckpoint = null;
this.pendingCheckpointTrigger = CheckpointTrigger.None;
this.ScheduleNextIdleCheckpointTime();
this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log);
}
}
else if (this.pendingIndexCheckpoint != null)
{
if (this.pendingIndexCheckpoint.IsCompleted == true)
{
this.traceHelper.FasterProgress("Checkpointing state machine: pendingIndexCheckpoint has completed");
await this.pendingIndexCheckpoint; // observe exceptions here
// the store checkpoint is next
this.StartCheckpointOrFailOnTimeout(
checkpointRoutine: () =>
{
var token = this.store.StartStoreCheckpoint(this.CommitLogPosition, this.InputQueuePosition, this.InputQueueFingerprint, null);
if (token.HasValue)
{
this.traceHelper.FasterProgress("Checkpointing state machine: store checkpoint started");
this.pendingIndexCheckpoint = null;
this.pendingStoreCheckpoint = this.WaitForCheckpointAsync(false, token.Value, true);
this.numberEventsSinceLastCheckpoint = 0;
}
var checkpointStarted = token.HasValue;
return checkpointStarted;
},
messageOnError: "Could not start store checkpoint before timeout");
}
}
else if (this.pendingCompaction != null)
{
if (this.pendingCompaction.IsCompleted == true)
{
this.traceHelper.FasterProgress("Checkpointing state machine: pendingCompaction has completed");
await this.pendingCompaction; // observe exceptions here
// force collection of memory used during compaction
GC.Collect();
// the index checkpoint is next
this.StartCheckpointOrFailOnTimeout(
checkpointRoutine: () =>
{
var token = this.store.StartIndexCheckpoint();
if (token.HasValue)
{
this.traceHelper.FasterProgress("Checkpointing state machine: index checkpoint started");
this.pendingCompaction = null;
this.pendingIndexCheckpoint = this.WaitForCheckpointAsync(true, token.Value, false);
}
var checkpointStarted = token.HasValue;
return checkpointStarted;
},
messageOnError: "Could not start index checkpoint before timeout");
}
}
else if (this.CheckpointDue(out var trigger, out long? compactUntil))
{
this.traceHelper.FasterProgress($"Checkpointing state machine: checkpoint is due. Trigger='{trigger}'. compactUntil='{compactUntil}'");
this.pendingCheckpointTrigger = trigger;
this.pendingCompaction = this.RunCompactionAsync(compactUntil);
}
}
protected override async Task Process(IList<PartitionEvent> batch)
{
try
@ -420,68 +571,15 @@ namespace DurableTask.Netherite.Faster
this.store.AdjustCacheSize();
// handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none)
if (this.pendingStoreCheckpoint != null)
{
if (this.pendingStoreCheckpoint.IsCompleted == true)
{
(this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition)
= await this.pendingStoreCheckpoint; // observe exceptions here
// force collection of memory used during checkpointing
GC.Collect();
// we have reached the end of the state machine transitions
this.pendingStoreCheckpoint = null;
this.pendingCheckpointTrigger = CheckpointTrigger.None;
this.ScheduleNextIdleCheckpointTime();
this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log);
}
}
else if (this.pendingIndexCheckpoint != null)
{
if (this.pendingIndexCheckpoint.IsCompleted == true)
{
await this.pendingIndexCheckpoint; // observe exceptions here
// the store checkpoint is next
var token = this.store.StartStoreCheckpoint(this.CommitLogPosition, this.InputQueuePosition, this.InputQueueFingerprint, null);
if (token.HasValue)
{
this.pendingIndexCheckpoint = null;
this.pendingStoreCheckpoint = this.WaitForCheckpointAsync(false, token.Value, true);
this.numberEventsSinceLastCheckpoint = 0;
}
}
}
else if (this.pendingCompaction != null)
{
if (this.pendingCompaction.IsCompleted == true)
{
await this.pendingCompaction; // observe exceptions here
// force collection of memory used during compaction
GC.Collect();
// the index checkpoint is next
var token = this.store.StartIndexCheckpoint();
if (token.HasValue)
{
this.pendingCompaction = null;
this.pendingIndexCheckpoint = this.WaitForCheckpointAsync(true, token.Value, false);
}
}
}
else if (this.CheckpointDue(out var trigger, out long? compactUntil))
{
this.pendingCheckpointTrigger = trigger;
this.pendingCompaction = this.RunCompactionAsync(compactUntil);
}
await this.RunCheckpointingStateMachine();
// periodically publish the partition load information and the send/receive positions
// also report checkpointing stats
if (this.lastPublished + PublishInterval < DateTime.UtcNow)
{
this.lastPublished = DateTime.UtcNow;
await this.PublishLoadAndPositions();
this.LogCheckpointStats();
}
if (this.partition.NumberPartitions() > 1 && this.partition.Settings.ActivityScheduler == ActivitySchedulerOptions.Locavore)
@ -558,7 +656,6 @@ namespace DurableTask.Netherite.Faster
if (target.HasValue)
{
target = await this.store.RunCompactionAsync(target.Value);
this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler);
}