Remove deleted instance and history objects from Faster (#28)

* implement proper deletion of tracked objects

* add assertion
This commit is contained in:
Sebastian Burckhardt 2021-03-30 19:03:30 -07:00 коммит произвёл GitHub
Родитель 5e8f9cc544
Коммит e29e8effca
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 54 добавлений и 24 удалений

Просмотреть файл

@ -17,15 +17,19 @@ namespace DurableTask.Netherite
class EffectTracker : List<TrackedObjectKey>
{
readonly Func<TrackedObjectKey, EffectTracker, ValueTask> applyToStore;
readonly Func<IEnumerable<TrackedObjectKey>, ValueTask> removeFromStore;
readonly Func<(long, long)> getPositions;
readonly System.Diagnostics.Stopwatch stopWatch;
readonly HashSet<TrackedObjectKey> deletedKeys;
public EffectTracker(Partition partition, Func<TrackedObjectKey, EffectTracker, ValueTask> applyToStore, Func<(long, long)> getPositions)
public EffectTracker(Partition partition, Func<TrackedObjectKey, EffectTracker, ValueTask> applyToStore, Func<IEnumerable<TrackedObjectKey>, ValueTask> removeFromStore, Func<(long, long)> getPositions)
{
this.Partition = partition;
this.applyToStore = applyToStore;
this.removeFromStore = removeFromStore;
this.getPositions = getPositions;
this.stopWatch = new System.Diagnostics.Stopwatch();
this.deletedKeys = new HashSet<TrackedObjectKey>();
this.stopWatch.Start();
}
@ -57,6 +61,11 @@ namespace DurableTask.Netherite
trackedObject.Process(this.Effect, this);
}
public void AddDeletion(TrackedObjectKey key)
{
this.deletedKeys.Add(key);
}
public async Task ProcessUpdate(PartitionUpdateEvent updateEvent)
{
(long commitLogPosition, long inputQueuePosition) = this.getPositions();
@ -99,6 +108,12 @@ namespace DurableTask.Netherite
this.RemoveAt(startPos);
}
if (this.deletedKeys.Count > 0)
{
await this.removeFromStore(this.deletedKeys);
this.deletedKeys.Clear();
}
this.Effect = null;
}
catch (OperationCanceledException)

Просмотреть файл

@ -43,14 +43,6 @@ namespace DurableTask.Netherite
return $"History InstanceId={this.InstanceId} ExecutionId={this.ExecutionId} Events={this.History.Count}";
}
void DeleteHistory()
{
this.History = null;
this.Episode = 0;
this.ExecutionId = null;
this.CachedOrchestrationWorkItem = null;
}
public void Process(BatchProcessed evt, EffectTracker effects)
{
// can add events to the history, or replace it with a new history
@ -99,15 +91,5 @@ namespace DurableTask.Netherite
}
}
}
public void Process(DeletionRequestReceived deletionRequestReceived, EffectTracker effects)
{
this.DeleteHistory();
}
public void Process(PurgeBatchIssued purgeBatchIssued, EffectTracker effects)
{
this.DeleteHistory();
}
}
}

Просмотреть файл

@ -138,11 +138,13 @@ namespace DurableTask.Netherite
if (this.OrchestrationState != null
&& (!deletionRequest.CreatedTime.HasValue || deletionRequest.CreatedTime.Value == this.OrchestrationState.CreatedTime))
{
this.OrchestrationState = null;
numberInstancesDeleted++;
// we also delete this instance's history, and pending operations on it
effects.Add(TrackedObjectKey.History(this.InstanceId));
// delete instance object and history object
effects.AddDeletion(this.Key);
effects.AddDeletion(TrackedObjectKey.History(this.InstanceId));
// also delete all task messages headed for this instance
effects.Add(TrackedObjectKey.Sessions);
}
@ -163,9 +165,11 @@ namespace DurableTask.Netherite
if (this.OrchestrationState != null
&& purgeBatchIssued.InstanceQuery.Matches(this.OrchestrationState))
{
this.OrchestrationState = null;
purgeBatchIssued.Purged.Add(this.InstanceId);
effects.Add(TrackedObjectKey.History(this.InstanceId));
// delete instance object and history object
effects.AddDeletion(this.Key);
effects.AddDeletion(TrackedObjectKey.History(this.InstanceId));
}
}
}

Просмотреть файл

@ -367,6 +367,12 @@ namespace DurableTask.Netherite.Faster
}
}
public override ValueTask RemoveKeys(IEnumerable<TrackedObjectKey> keys)
{
// TODO
throw new NotImplementedException();
}
#region storage access operation
CloudBlockBlob GetBlob(TrackedObjectKey key)

Просмотреть файл

@ -512,6 +512,16 @@ namespace DurableTask.Netherite.Faster
}
}
public override ValueTask RemoveKeys(IEnumerable<TrackedObjectKey> keys)
{
foreach (var key in keys)
{
this.partition.Assert(!key.IsSingleton);
this.mainSession.Delete(key);
}
return default;
}
IAsyncEnumerable<OrchestrationState> ScanOrchestrationStates(
EffectTracker effectTracker,
PartitionQueryEvent queryEvent)

Просмотреть файл

@ -60,6 +60,7 @@ namespace DurableTask.Netherite.Faster
this.effectTracker = new EffectTracker(
this.partition,
(key, tracker) => store.ProcessEffectOnTrackedObject(key, tracker),
(keys) => store.RemoveKeys(keys),
() => (this.CommitLogPosition, this.InputQueuePosition)
);
}

Просмотреть файл

@ -48,6 +48,8 @@ namespace DurableTask.Netherite.Faster
public abstract ValueTask ProcessEffectOnTrackedObject(FasterKV.Key k, EffectTracker tracker);
public abstract ValueTask RemoveKeys(IEnumerable<TrackedObjectKey> keys);
public StoreStatistics StoreStats { get; } = new StoreStatistics();
public class StoreStatistics

Просмотреть файл

@ -113,6 +113,7 @@ namespace DurableTask.Netherite
var effects = new EffectTracker(
this.partition,
this.ApplyToStore,
this.RemoveFromStore,
() => (this.commitPosition, this.inputQueuePosition)
);
@ -183,6 +184,15 @@ namespace DurableTask.Netherite
return default;
}
public ValueTask RemoveFromStore(IEnumerable<TrackedObjectKey> keys)
{
foreach (var key in keys)
{
this.trackedObjects.TryRemove(key, out _);
}
return default;
}
public Task Prefetch(IEnumerable<TrackedObjectKey> keys)
{
return Task.CompletedTask;