sync dev w/ main (#324)
* Update GH automation (#303)
* initial commit (#290)
* Revert "initial commit (#290)" (#314)
This reverts commit 3a2d193e57
.
* Bump Azure.Identity from 1.7.0 to 1.10.2 in /samples/TokenCredentialDTFx (#323)
Bumps [Azure.Identity](https://github.com/Azure/azure-sdk-for-net) from 1.7.0 to 1.10.2.
- [Release notes](https://github.com/Azure/azure-sdk-for-net/releases)
- [Commits](https://github.com/Azure/azure-sdk-for-net/compare/Azure.Identity_1.7.0...Azure.Identity_1.10.2)
---
updated-dependencies:
- dependency-name: Azure.Identity
dependency-type: direct:production
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Terminate partition when FASTER refuses to checkpoint for over a minute (#301)
---------
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Varshitha Bachu <vabachu@microsoft.com>
Co-authored-by: Sebastian Burckhardt <sburckha@microsoft.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
This commit is contained in:
Родитель
08e760aeaf
Коммит
fb4aa617ff
|
@ -57,6 +57,16 @@ configuration:
|
|||
reply: This issue has been marked as duplicate and has not had any activity for **1 day**. It will be closed for housekeeping purposes.
|
||||
- closeIssue
|
||||
eventResponderTasks:
|
||||
- if:
|
||||
- payloadType: Issues
|
||||
- and:
|
||||
- isOpen
|
||||
- not:
|
||||
and:
|
||||
- isLabeled
|
||||
then:
|
||||
- addLabel:
|
||||
label: "Needs: Triage :mag:"
|
||||
- if:
|
||||
- payloadType: Issue_Comment
|
||||
- isAction:
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
|
@ -8,7 +8,7 @@
|
|||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Azure.Identity" Version="1.7.0" />
|
||||
<PackageReference Include="Azure.Identity" Version="1.10.2" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
|
||||
<PackageReference Include="Microsoft.Azure.DurableTask.Netherite" Version="1.3.2" />
|
||||
</ItemGroup>
|
||||
|
|
|
@ -61,7 +61,7 @@ namespace DurableTask.Netherite
|
|||
this.host = host;
|
||||
}
|
||||
|
||||
public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning)
|
||||
public void HandleError(string context, string message, Exception? exception, bool terminatePartition, bool isWarning)
|
||||
{
|
||||
bool isFatal = exception != null && Utils.IsFatal(exception);
|
||||
|
||||
|
@ -94,7 +94,7 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
}
|
||||
|
||||
void TraceError(bool isWarning, string context, string message, Exception exception, bool terminatePartition)
|
||||
void TraceError(bool isWarning, string context, string message, Exception? exception, bool terminatePartition)
|
||||
{
|
||||
var logLevel = isWarning ? LogLevel.Warning : LogLevel.Error;
|
||||
if (this.logLevelLimit <= logLevel)
|
||||
|
|
|
@ -484,7 +484,11 @@ namespace DurableTask.Netherite.Faster
|
|||
public override async Task<long> RunCompactionAsync(long target)
|
||||
{
|
||||
string id = DateTime.UtcNow.ToString("O"); // for tracing purposes
|
||||
|
||||
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} is requesting to enter semaphore with {maxCompactionThreads.CurrentCount} threads available");
|
||||
await maxCompactionThreads.WaitAsync();
|
||||
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} entered semaphore");
|
||||
|
||||
try
|
||||
{
|
||||
long beginAddressBeforeCompaction = this.Log.BeginAddress;
|
||||
|
@ -499,20 +503,47 @@ namespace DurableTask.Netherite.Faster
|
|||
target - this.Log.BeginAddress,
|
||||
this.GetElapsedCompactionMilliseconds());
|
||||
|
||||
var tokenSource = new CancellationTokenSource();
|
||||
var timeoutTask = Task.Delay(TimeSpan.FromMinutes(10), tokenSource.Token);
|
||||
var tcs = new TaskCompletionSource<long>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var thread = TrackedThreads.MakeTrackedThread(RunCompaction, $"Compaction.{id}");
|
||||
thread.Start();
|
||||
|
||||
var winner = await Task.WhenAny(tcs.Task, timeoutTask);
|
||||
|
||||
if (winner == timeoutTask)
|
||||
{
|
||||
// compaction timed out. Terminate partition
|
||||
var exceptionMessage = $"Compaction {id} time out";
|
||||
this.partition.ErrorHandler.HandleError(nameof(RunCompactionAsync), exceptionMessage, e: null, terminatePartition: true, reportAsWarning: true);
|
||||
|
||||
// we need resolve the task to ensure the 'finally' block is executed which frees up another thread to start compating
|
||||
tcs.TrySetException(new OperationCanceledException(exceptionMessage));
|
||||
}
|
||||
else
|
||||
{
|
||||
// cancel the timeout task since compaction completed
|
||||
tokenSource.Cancel();
|
||||
}
|
||||
|
||||
await timeoutTask.ContinueWith(_ => tokenSource.Dispose());
|
||||
|
||||
// return result of compaction task
|
||||
return await tcs.Task;
|
||||
|
||||
void RunCompaction()
|
||||
{
|
||||
try
|
||||
{
|
||||
|
||||
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} started");
|
||||
|
||||
var session = this.CreateASession($"compaction-{id}", true);
|
||||
|
||||
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} obtained a FASTER session");
|
||||
using (this.TrackTemporarySession(session))
|
||||
{
|
||||
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} is invoking FASTER's compaction routine");
|
||||
long compactedUntil = session.Compact(target, CompactionType.Scan);
|
||||
|
||||
this.TraceHelper.FasterCompactionProgress(
|
||||
|
@ -525,17 +556,17 @@ namespace DurableTask.Netherite.Faster
|
|||
this.Log.BeginAddress - beginAddressBeforeCompaction,
|
||||
this.GetElapsedCompactionMilliseconds());
|
||||
|
||||
tcs.SetResult(compactedUntil);
|
||||
tcs.TrySetResult(compactedUntil);
|
||||
}
|
||||
}
|
||||
catch (Exception exception)
|
||||
when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
|
||||
{
|
||||
tcs.SetException(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken));
|
||||
tcs.TrySetException(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
tcs.SetException(e);
|
||||
tcs.TrySetException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -128,6 +128,15 @@ namespace DurableTask.Netherite.Faster
|
|||
}
|
||||
}
|
||||
|
||||
public void FasterProgress(Func<string> constructString)
|
||||
{
|
||||
if (this.logLevelLimit <= LogLevel.Debug)
|
||||
{
|
||||
var details = constructString();
|
||||
this.FasterProgress(details);
|
||||
}
|
||||
}
|
||||
|
||||
public void FasterStorageProgress(string details)
|
||||
{
|
||||
if (this.logLevelLimit <= LogLevel.Trace)
|
||||
|
|
|
@ -21,6 +21,7 @@ namespace DurableTask.Netherite.Faster
|
|||
readonly EffectTracker effectTracker;
|
||||
|
||||
bool isShuttingDown;
|
||||
DateTime? timeOfFirstRefusedCheckpoint;
|
||||
|
||||
public string InputQueueFingerprint { get; private set; }
|
||||
public (long,int) InputQueuePosition { get; private set; }
|
||||
|
@ -30,15 +31,15 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
// periodic index and store checkpointing
|
||||
CheckpointTrigger pendingCheckpointTrigger;
|
||||
Task pendingIndexCheckpoint;
|
||||
Task<(long, (long,int))> pendingStoreCheckpoint;
|
||||
Task? pendingIndexCheckpoint;
|
||||
Task<(long, (long,int))>? pendingStoreCheckpoint;
|
||||
(long,int) lastCheckpointedInputQueuePosition;
|
||||
long lastCheckpointedCommitLogPosition;
|
||||
long numberEventsSinceLastCheckpoint;
|
||||
DateTime timeOfNextIdleCheckpoint;
|
||||
|
||||
// periodic compaction
|
||||
Task<long?> pendingCompaction;
|
||||
Task<long?>? pendingCompaction;
|
||||
|
||||
// periodic load publishing
|
||||
PartitionLoadInfo loadInfo;
|
||||
|
@ -295,6 +296,50 @@ namespace DurableTask.Netherite.Faster
|
|||
Idle
|
||||
}
|
||||
|
||||
void LogCheckpointStats()
|
||||
{
|
||||
long inputQueuePositionLag = this.GetInputQueuePositionLag();
|
||||
|
||||
// since this is a pure function, we declare it as local static for improved performance
|
||||
static string ReportNullableTaskStatus(Task? t)
|
||||
{
|
||||
if (t == null)
|
||||
{
|
||||
return "is null";
|
||||
}
|
||||
else
|
||||
{
|
||||
return t.IsCompleted ? "is completed" : "is not completed";
|
||||
}
|
||||
};
|
||||
|
||||
// since the statistics log is only emitted if the trace level is at least "Debug",
|
||||
// we defer the construction of the string to relieve GC pressure
|
||||
string ConstructLogString()
|
||||
{
|
||||
|
||||
var log = $"Checkpoint statistics: " +
|
||||
$"LastCheckpointedCommitLogPosition={this.lastCheckpointedCommitLogPosition}, " +
|
||||
$"MaxNumberBytesBetweenCheckpoints={this.partition.Settings.MaxNumberBytesBetweenCheckpoints}, " +
|
||||
$"CommitLogPosition={this.CommitLogPosition}, " +
|
||||
$"NumberEventsSinceLastCheckpoint={this.numberEventsSinceLastCheckpoint}, " +
|
||||
$"MaxNumberEventsBetweenCheckpoints={this.partition.Settings.MaxNumberEventsBetweenCheckpoints}, " +
|
||||
$"InputQueuePositionLag={inputQueuePositionLag}," +
|
||||
$"TimeOfNextIdleCheckpoint={this.timeOfNextIdleCheckpoint}, " +
|
||||
$"TimeOfFirstRefusedCheckpoint={this.timeOfFirstRefusedCheckpoint}, " +
|
||||
$"PendingCompaction status={ReportNullableTaskStatus(this.pendingCompaction)}, " +
|
||||
$"PendingIndexCheckpoint status={ReportNullableTaskStatus(this.pendingIndexCheckpoint)}, " +
|
||||
$"PendingStoreCheckpoint status={ReportNullableTaskStatus(this.pendingStoreCheckpoint)}";
|
||||
return log;
|
||||
}
|
||||
this.traceHelper.FasterProgress(ConstructLogString);
|
||||
}
|
||||
|
||||
long GetInputQueuePositionLag()
|
||||
{
|
||||
return this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition);
|
||||
}
|
||||
|
||||
bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil)
|
||||
{
|
||||
// in a test setting, let the test decide when to checkpoint or compact
|
||||
|
@ -306,8 +351,7 @@ namespace DurableTask.Netherite.Faster
|
|||
trigger = CheckpointTrigger.None;
|
||||
compactUntil = null;
|
||||
|
||||
long inputQueuePositionLag =
|
||||
this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition);
|
||||
long inputQueuePositionLag = this.GetInputQueuePositionLag();
|
||||
|
||||
if (this.lastCheckpointedCommitLogPosition + this.partition.Settings.MaxNumberBytesBetweenCheckpoints <= this.CommitLogPosition)
|
||||
{
|
||||
|
@ -363,7 +407,114 @@ namespace DurableTask.Netherite.Faster
|
|||
var actual = (((earliest - offset - 1) / period) + 1) * period + offset;
|
||||
this.timeOfNextIdleCheckpoint = new DateTime(actual, DateTimeKind.Utc);
|
||||
}
|
||||
|
||||
|
||||
void StartCheckpointOrFailOnTimeout(Func<bool> checkpointRoutine, string messageOnError)
|
||||
{
|
||||
var checkpointStarted = checkpointRoutine.Invoke();
|
||||
if (checkpointStarted)
|
||||
{
|
||||
this.timeOfFirstRefusedCheckpoint = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
// track start of FASTER refusal to start checkpoint
|
||||
var currentTime = DateTime.UtcNow;
|
||||
this.timeOfFirstRefusedCheckpoint ??= currentTime;
|
||||
|
||||
// if the refusal to checkpoint started over a minute ago, terminate partition
|
||||
TimeSpan duration = currentTime - this.timeOfFirstRefusedCheckpoint.Value;
|
||||
if (duration > TimeSpan.FromMinutes(1))
|
||||
{
|
||||
messageOnError += $". FASTER first refused to checkpoint at '{this.timeOfFirstRefusedCheckpoint}'. Duration of refusal = {duration}. Terminating partition.";
|
||||
this.partition.ErrorHandler.HandleError(nameof(StartCheckpointOrFailOnTimeout), messageOnError, e: null, terminatePartition: true, reportAsWarning: false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async ValueTask RunCheckpointingStateMachine()
|
||||
{
|
||||
// handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none)
|
||||
if (this.pendingStoreCheckpoint != null)
|
||||
{
|
||||
if (this.pendingStoreCheckpoint.IsCompleted == true)
|
||||
{
|
||||
this.traceHelper.FasterProgress("Checkpointing state machine: pendingStorecheckpoint has completed.");
|
||||
(this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition)
|
||||
= await this.pendingStoreCheckpoint; // observe exceptions here
|
||||
|
||||
// force collection of memory used during checkpointing
|
||||
GC.Collect();
|
||||
|
||||
this.traceHelper.FasterProgress("Checkpointing state machine: resetting to initial state");
|
||||
// we have reached the end of the state machine transitions
|
||||
this.pendingStoreCheckpoint = null;
|
||||
this.pendingCheckpointTrigger = CheckpointTrigger.None;
|
||||
this.ScheduleNextIdleCheckpointTime();
|
||||
this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log);
|
||||
}
|
||||
}
|
||||
else if (this.pendingIndexCheckpoint != null)
|
||||
{
|
||||
if (this.pendingIndexCheckpoint.IsCompleted == true)
|
||||
{
|
||||
this.traceHelper.FasterProgress("Checkpointing state machine: pendingIndexCheckpoint has completed");
|
||||
await this.pendingIndexCheckpoint; // observe exceptions here
|
||||
|
||||
// the store checkpoint is next
|
||||
this.StartCheckpointOrFailOnTimeout(
|
||||
checkpointRoutine: () =>
|
||||
{
|
||||
var token = this.store.StartStoreCheckpoint(this.CommitLogPosition, this.InputQueuePosition, this.InputQueueFingerprint, null);
|
||||
if (token.HasValue)
|
||||
{
|
||||
this.traceHelper.FasterProgress("Checkpointing state machine: store checkpoint started");
|
||||
this.pendingIndexCheckpoint = null;
|
||||
this.pendingStoreCheckpoint = this.WaitForCheckpointAsync(false, token.Value, true);
|
||||
this.numberEventsSinceLastCheckpoint = 0;
|
||||
}
|
||||
var checkpointStarted = token.HasValue;
|
||||
return checkpointStarted;
|
||||
|
||||
},
|
||||
messageOnError: "Could not start store checkpoint before timeout");
|
||||
}
|
||||
}
|
||||
else if (this.pendingCompaction != null)
|
||||
{
|
||||
if (this.pendingCompaction.IsCompleted == true)
|
||||
{
|
||||
this.traceHelper.FasterProgress("Checkpointing state machine: pendingCompaction has completed");
|
||||
await this.pendingCompaction; // observe exceptions here
|
||||
|
||||
// force collection of memory used during compaction
|
||||
GC.Collect();
|
||||
|
||||
// the index checkpoint is next
|
||||
this.StartCheckpointOrFailOnTimeout(
|
||||
checkpointRoutine: () =>
|
||||
{
|
||||
var token = this.store.StartIndexCheckpoint();
|
||||
if (token.HasValue)
|
||||
{
|
||||
this.traceHelper.FasterProgress("Checkpointing state machine: index checkpoint started");
|
||||
this.pendingCompaction = null;
|
||||
this.pendingIndexCheckpoint = this.WaitForCheckpointAsync(true, token.Value, false);
|
||||
}
|
||||
var checkpointStarted = token.HasValue;
|
||||
return checkpointStarted;
|
||||
},
|
||||
messageOnError: "Could not start index checkpoint before timeout");
|
||||
}
|
||||
}
|
||||
else if (this.CheckpointDue(out var trigger, out long? compactUntil))
|
||||
{
|
||||
this.traceHelper.FasterProgress($"Checkpointing state machine: checkpoint is due. Trigger='{trigger}'. compactUntil='{compactUntil}'");
|
||||
this.pendingCheckpointTrigger = trigger;
|
||||
|
||||
this.pendingCompaction = this.RunCompactionAsync(compactUntil);
|
||||
}
|
||||
}
|
||||
|
||||
protected override async Task Process(IList<PartitionEvent> batch)
|
||||
{
|
||||
try
|
||||
|
@ -436,68 +587,15 @@ namespace DurableTask.Netherite.Faster
|
|||
this.store.AdjustCacheSize();
|
||||
|
||||
// handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none)
|
||||
if (this.pendingStoreCheckpoint != null)
|
||||
{
|
||||
if (this.pendingStoreCheckpoint.IsCompleted == true)
|
||||
{
|
||||
(this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition)
|
||||
= await this.pendingStoreCheckpoint; // observe exceptions here
|
||||
|
||||
// force collection of memory used during checkpointing
|
||||
GC.Collect();
|
||||
|
||||
// we have reached the end of the state machine transitions
|
||||
this.pendingStoreCheckpoint = null;
|
||||
this.pendingCheckpointTrigger = CheckpointTrigger.None;
|
||||
this.ScheduleNextIdleCheckpointTime();
|
||||
this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log);
|
||||
}
|
||||
}
|
||||
else if (this.pendingIndexCheckpoint != null)
|
||||
{
|
||||
if (this.pendingIndexCheckpoint.IsCompleted == true)
|
||||
{
|
||||
await this.pendingIndexCheckpoint; // observe exceptions here
|
||||
|
||||
// the store checkpoint is next
|
||||
var token = this.store.StartStoreCheckpoint(this.CommitLogPosition, this.InputQueuePosition, this.InputQueueFingerprint, null);
|
||||
if (token.HasValue)
|
||||
{
|
||||
this.pendingIndexCheckpoint = null;
|
||||
this.pendingStoreCheckpoint = this.WaitForCheckpointAsync(false, token.Value, true);
|
||||
this.numberEventsSinceLastCheckpoint = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (this.pendingCompaction != null)
|
||||
{
|
||||
if (this.pendingCompaction.IsCompleted == true)
|
||||
{
|
||||
await this.pendingCompaction; // observe exceptions here
|
||||
|
||||
// force collection of memory used during compaction
|
||||
GC.Collect();
|
||||
|
||||
// the index checkpoint is next
|
||||
var token = this.store.StartIndexCheckpoint();
|
||||
if (token.HasValue)
|
||||
{
|
||||
this.pendingCompaction = null;
|
||||
this.pendingIndexCheckpoint = this.WaitForCheckpointAsync(true, token.Value, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (this.CheckpointDue(out var trigger, out long? compactUntil))
|
||||
{
|
||||
this.pendingCheckpointTrigger = trigger;
|
||||
this.pendingCompaction = this.RunCompactionAsync(compactUntil);
|
||||
}
|
||||
await this.RunCheckpointingStateMachine();
|
||||
|
||||
// periodically publish the partition load information and the send/receive positions
|
||||
// also report checkpointing stats
|
||||
if (this.lastPublished + PublishInterval < DateTime.UtcNow)
|
||||
{
|
||||
this.lastPublished = DateTime.UtcNow;
|
||||
await this.PublishLoadAndPositions();
|
||||
this.LogCheckpointStats();
|
||||
}
|
||||
|
||||
if (this.partition.NumberPartitions() > 1 && this.partition.Settings.ActivityScheduler == ActivitySchedulerOptions.Locavore)
|
||||
|
@ -574,7 +672,6 @@ namespace DurableTask.Netherite.Faster
|
|||
if (target.HasValue)
|
||||
{
|
||||
target = await this.store.RunCompactionAsync(target.Value);
|
||||
|
||||
this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler);
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче