Fix fencing of checkpoints (#326)
* Fix fencing of checkpoints, which caused occasional concurrent modification exceptions and could cause history corruption
* Add more comments to explain sessions and fencing
This commit is contained in:
Родитель
fb4aa617ff
Коммит
7a6e760b59
|
@ -218,7 +218,14 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
this.singletons = new TrackedObject[TrackedObjectKey.NumberSingletonTypes];
|
||||
string suffix = DateTime.UtcNow.ToString("O");
|
||||
// The main session is the session used by all reads and writes performed by the store worker.
|
||||
// Using a single session means that the store worker sees a consistent state at all times.
|
||||
// A single session is sufficient since the store worker performs only one operation at a time.
|
||||
this.mainSession = this.CreateASession($"main-{suffix}", false);
|
||||
// Since queries may require scans that can take a significant time, it is not advisable to perform
|
||||
// queries on the main session. We therefore create a pool of read-only sessions available for queries.
|
||||
// Query sessions may see a slightly staler state of the store than the main session, i.e. they may not contain
|
||||
// all the same updates.
|
||||
for (int i = 0; i < this.querySessions.Length; i++)
|
||||
{
|
||||
this.querySessions[i] = this.CreateASession($"query{i:D2}-{suffix}", true);
|
||||
|
@ -416,10 +423,40 @@ namespace DurableTask.Netherite.Faster
|
|||
this.fht.Log.ShiftBeginAddress(shiftBeginAddress.Value);
|
||||
}
|
||||
|
||||
long versionBeforeCheckpoint = this.mainSession.Version;
|
||||
|
||||
if (this.fht.TryInitiateHybridLogCheckpoint(out var token, CheckpointType.FoldOver))
|
||||
{
|
||||
// According to Badrish, refreshing the main session here ensures proper fencing with respect to that session.
|
||||
this.mainSession.Refresh();
|
||||
// After the checkpoint is initiated, any subsequent writes done in the mainSession must create a later version of the
|
||||
// object than the one being checkpointed. This is important to guarantee that the checkpoint contains an atomic snapshot of all objects
|
||||
// at the time the checkpoint is initiated.
|
||||
//
|
||||
// according to Badrish the loop below ensures this desired "fencing" of updates.
|
||||
// It works because the mainSession is the only session that updates tracked objects.
|
||||
// So, by waiting for it to advance its version, we make sure any later writes do not race
|
||||
// with the checkpointing thread.
|
||||
//
|
||||
// This is expected to complete very quickly; to avoid hanging
|
||||
// the store worker indefinitely should there be bugs, we add a timeout after
|
||||
// which we terminate and recover the partition.
|
||||
//
|
||||
Stopwatch stopwatch = Stopwatch.StartNew();
|
||||
TimeSpan timeLimit = TimeSpan.FromMinutes(1);
|
||||
while (stopwatch.Elapsed < timeLimit)
|
||||
{
|
||||
this.mainSession.Refresh();
|
||||
if (this.mainSession.Version > versionBeforeCheckpoint)
|
||||
{
|
||||
break;
|
||||
}
|
||||
System.Threading.Thread.Sleep(5);
|
||||
}
|
||||
if (this.mainSession.Version == versionBeforeCheckpoint)
|
||||
{
|
||||
string message = $"FASTER did not advance version of main session after initiating checkpoint for over {timeLimit}. Terminating partition.";
|
||||
this.partition.ErrorHandler.HandleError(nameof(StartStoreCheckpoint), message, e: null, terminatePartition: true, reportAsWarning: false);
|
||||
return null;
|
||||
}
|
||||
|
||||
byte[] serializedSingletons = Serializer.SerializeSingletons(this.singletons);
|
||||
this.persistSingletonsTask = this.blobManager.PersistSingletonsAsync(serializedSingletons, token);
|
||||
|
|
Загрузка…
Ссылка в новой задаче