Periodic removal of checkpoints (#119)

* use path prefix to solve atomicity of taskhub creation/deletion

* implement periodic removal of obsolete checkpoints
This commit is contained in:
Sebastian Burckhardt 2022-01-25 15:27:33 -08:00 коммит произвёл GitHub
Родитель ba56fa4b23
Коммит 2499572f7f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 165 добавлений и 19 удалений

Просмотреть файл

@ -61,7 +61,7 @@ namespace DurableTask.Netherite.Faster
public CloudBlobContainer BlockBlobContainer => this.blockBlobContainer;
public CloudBlobContainer PageBlobContainer => this.pageBlobContainer;
public int PartitionId => (int) this.partitionId;
public int PartitionId => (int)this.partitionId;
public IPartitionErrorHandler PartitionErrorHandler { get; private set; }
@ -119,8 +119,8 @@ namespace DurableTask.Netherite.Faster
static readonly int[] StorageFormatVersion = new int[] {
1, //initial version
2, //0.7.0-beta changed singleton storage, and adds dequeue count
3, //changed naming of files
};
3, //changed organization of files
};
public static string GetStorageFormat(NetheriteOrchestrationServiceSettings settings)
{
@ -128,8 +128,8 @@ namespace DurableTask.Netherite.Faster
{
UseAlternateObjectStore = settings.UseAlternateObjectStore,
FormatVersion = StorageFormatVersion.Last(),
},
serializerSettings);
},
serializerSettings);
}
[JsonObject]
@ -142,12 +142,12 @@ namespace DurableTask.Netherite.Faster
// the following can be changed between versions
[JsonProperty("UseAlternateObjectStore", DefaultValueHandling=DefaultValueHandling.Ignore)]
[JsonProperty("UseAlternateObjectStore", DefaultValueHandling = DefaultValueHandling.Ignore)]
public bool? UseAlternateObjectStore { get; set; }
}
static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings()
{
{
TypeNameHandling = TypeNameHandling.None,
MissingMemberHandling = MissingMemberHandling.Ignore,
CheckAdditionalContent = false,
@ -169,7 +169,7 @@ namespace DurableTask.Netherite.Faster
throw new InvalidOperationException($"The current storage format version (={StorageFormatVersion.Last()}) is incompatible with the existing taskhub (={taskhubFormat.FormatVersion}).");
}
}
catch(Exception e)
catch (Exception e)
{
throw new InvalidOperationException("The taskhub has an incompatible storage format", e);
}
@ -190,7 +190,7 @@ namespace DurableTask.Netherite.Faster
throw new NotImplementedException("Purges are handled directly on recovery, not via FASTER");
}
public void OnRecovery(Guid indexToken, Guid logToken)
public void OnRecovery(Guid indexToken, Guid logToken)
{
// we handle cleanup of old checkpoints somewhere else
}
@ -284,7 +284,7 @@ namespace DurableTask.Netherite.Faster
{
this.LocalCheckpointManager = new LocalFileCheckpointManager(
this.CheckpointInfo,
this.LocalCheckpointDirectoryPath,
this.LocalCheckpointDirectoryPath,
this.GetCheckpointCompletedBlobName());
}
@ -459,7 +459,7 @@ namespace DurableTask.Netherite.Faster
if (pageBlobAccount != account)
{
await DeleteContainerContents(pageBlobAccount);
}
}
}
}
@ -717,23 +717,154 @@ namespace DurableTask.Netherite.Faster
this.TraceHelper.LeaseProgress("Blob manager stopped");
}
public async Task RemoveObsoleteCheckpoints()
{
if (this.UseLocalFiles)
{
//TODO
return;
}
else
{
this.TraceHelper.FasterProgress("Removing obsolete checkpoints");
var tasks = new List<Task<(int,int)>>();
tasks.Add(RemoveObsoleteCheckpoints(this.blockBlobPartitionDirectory.GetDirectoryReference(cprCheckpointPrefix)));
tasks.Add(RemoveObsoleteCheckpoints(this.blockBlobPartitionDirectory.GetDirectoryReference(indexCheckpointPrefix)));
if (this.pageBlobPartitionDirectory != this.blockBlobPartitionDirectory)
{
tasks.Add(RemoveObsoleteCheckpoints(this.pageBlobPartitionDirectory.GetDirectoryReference(cprCheckpointPrefix)));
tasks.Add(RemoveObsoleteCheckpoints(this.pageBlobPartitionDirectory.GetDirectoryReference(indexCheckpointPrefix)));
}
await Task.WhenAll(tasks);
this.TraceHelper.FasterProgress($"Removed {tasks.Select(t => t.Result.Item1).Sum()} checkpoint directories containing {tasks.Select(t => t.Result.Item2).Sum()} blobs");
async Task<(int,int)> RemoveObsoleteCheckpoints(CloudBlobDirectory directory)
{
IEnumerable<IListBlobItem> results = null;
await this.PerformWithRetriesAsync(
BlobManager.AsynchronousStorageWriteMaxConcurrency,
true,
"CloudBlobDirectory.ListBlobsSegmentedAsync",
"RemoveObsoleteCheckpoints",
"",
directory.Prefix,
1000,
false,
async (numAttempts) =>
{
var response = await directory.ListBlobsSegmentedAsync(
useFlatBlobListing: false,
BlobListingDetails.None, 5, null, null, null);
results = response.Results.ToList();
return results.Count();
});
string postFix1 = $"{cprCheckpointPrefix}{this.CheckpointInfo.LogToken.ToString()}/";
string postFix2 = $"{indexCheckpointPrefix}{this.CheckpointInfo.IndexToken.ToString()}/";
var deletionTasks = new List<Task<int>>();
foreach (var item in results)
{
if (item is CloudBlobDirectory cloudBlobDirectory)
{
if (!cloudBlobDirectory.Prefix.EndsWith(postFix1)
&& !cloudBlobDirectory.Prefix.EndsWith(postFix2))
{
deletionTasks.Add(DeleteCheckpointDirectory(cloudBlobDirectory));
}
}
}
await Task.WhenAll(deletionTasks);
return (deletionTasks.Count, deletionTasks.Select(t => t.Result).Sum());
}
async Task<int> DeleteCheckpointDirectory(CloudBlobDirectory directory)
{
BlobContinuationToken continuationToken = null;
var deletionTasks = new List<Task>();
int count = 0;
do
{
BlobResultSegment listingResult = null;
await this.PerformWithRetriesAsync(
BlobManager.AsynchronousStorageWriteMaxConcurrency,
false,
"CloudBlobDirectory.ListBlobsSegmentedAsync",
"DeleteCheckpointDirectory",
"",
directory.Prefix,
1000,
false,
async (numAttempts) =>
{
var response = await directory.ListBlobsSegmentedAsync(
useFlatBlobListing: true,
BlobListingDetails.None, 5, continuationToken, null, null);
listingResult = response;
return listingResult.Results.Count();
});
continuationToken = listingResult.ContinuationToken;
foreach (var item in listingResult.Results)
{
if (item is CloudBlob blob)
{
count++;
deletionTasks.Add(
this.PerformWithRetriesAsync(
BlobManager.AsynchronousStorageWriteMaxConcurrency,
false,
"BlobUtils.ForceDeleteAsync",
"DeleteCheckpointDirectory",
"",
blob.Name,
1000,
false,
async (numAttempts) => (await BlobUtils.ForceDeleteAsync(blob) ? 1 : 0)));
}
}
await Task.WhenAll(deletionTasks);
}
while (continuationToken != null);
return count;
}
}
}
#region Blob Name Management
string GetCheckpointCompletedBlobName() => $"last-checkpoint.json";
string GetCheckpointCompletedBlobName() => "last-checkpoint.json";
string GetIndexCheckpointMetaBlobName(Guid token) => $"index-checkpoints/{token}/info.dat";
const string indexCheckpointPrefix = "index-checkpoints/";
(string, string) GetPrimaryHashTableBlobName(Guid token) => ($"index-checkpoints/{token}", "ht.dat");
const string cprCheckpointPrefix = "cpr-checkpoints/";
string GetHybridLogCheckpointMetaBlobName(Guid token) => $"cpr-checkpoints/{token}/info.dat";
string GetIndexCheckpointMetaBlobName(Guid token) => $"{indexCheckpointPrefix}{token}/info.dat";
(string, string) GetLogSnapshotBlobName(Guid token) => ($"cpr-checkpoints/{token}", "snapshot.dat");
(string, string) GetPrimaryHashTableBlobName(Guid token) => ($"{indexCheckpointPrefix}{token}", "ht.dat");
(string, string) GetObjectLogSnapshotBlobName(Guid token) => ($"cpr-checkpoints/{token}", "snapshot.obj.dat");
string GetHybridLogCheckpointMetaBlobName(Guid token) => $"{cprCheckpointPrefix}{token}/info.dat";
(string, string) GetDeltaLogSnapshotBlobName(Guid token) => ($"cpr-checkpoints/{token}", "snapshot.delta.dat");
(string, string) GetLogSnapshotBlobName(Guid token) => ($"{cprCheckpointPrefix}{token}", "snapshot.dat");
string GetSingletonsSnapshotBlobName(Guid token) => $"cpr-checkpoints/{token}/singletons.dat";
(string, string) GetObjectLogSnapshotBlobName(Guid token) => ($"{cprCheckpointPrefix}{token}", "snapshot.obj.dat");
(string, string) GetDeltaLogSnapshotBlobName(Guid token) => ($"{cprCheckpointPrefix}{token}", "snapshot.delta.dat");
string GetSingletonsSnapshotBlobName(Guid token) => $"{cprCheckpointPrefix}{token}/singletons.dat";
#endregion

Просмотреть файл

@ -151,6 +151,12 @@ namespace DurableTask.Netherite.Faster
return true;
}
public override Task RemoveObsoleteCheckpoints()
{
//TODO
return Task.CompletedTask;
}
public async override ValueTask CompleteCheckpointAsync()
{
await this.checkpointTask.ConfigureAwait(false);

Просмотреть файл

@ -178,6 +178,11 @@ namespace DurableTask.Netherite.Faster
}
}
public override async Task RemoveObsoleteCheckpoints()
{
await this.blobManager.RemoveObsoleteCheckpoints();
}
public async static Task RunOnDedicatedThreadAsync(Func<Task> asyncAction)
{
Task<Task> tasktask = new Task<Task>(() => asyncAction());

Просмотреть файл

@ -420,6 +420,8 @@ namespace DurableTask.Netherite.Faster
this.traceHelper.FasterCheckpointPersisted(checkpointToken, description, commitLogPosition, inputQueuePosition, stopwatch.ElapsedMilliseconds);
await this.store.RemoveObsoleteCheckpoints();
this.Notify();
return (commitLogPosition, inputQueuePosition);
}

Просмотреть файл

@ -23,6 +23,8 @@ namespace DurableTask.Netherite.Faster
public abstract bool TakeFullCheckpoint(long commitLogPosition, long inputQueuePosition, out Guid checkpointGuid);
public abstract Task RemoveObsoleteCheckpoints();
public abstract Guid? StartIndexCheckpoint();
public abstract Guid? StartStoreCheckpoint(long commitLogPosition, long inputQueuePosition);