use path prefix to solve atomicity of taskhub creation/deletion (#118)

This commit is contained in:
Sebastian Burckhardt 2022-01-25 13:20:28 -08:00 коммит произвёл GitHub
Родитель a9e6dbca23
Коммит ba56fa4b23
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 180 добавлений и 26 удалений

Просмотреть файл

@ -3,6 +3,7 @@
namespace DurableTask.Netherite
{
using System;
using System.Threading.Tasks;
/// <summary>
@ -20,6 +21,6 @@ namespace DurableTask.Netherite
/// Deletes all partition states.
/// </summary>
/// <returns></returns>
Task DeleteAllPartitionStatesAsync();
Task DeleteTaskhubAsync(string pathPrefix);
}
}

Просмотреть файл

@ -25,6 +25,11 @@ namespace DurableTask.Netherite
/// </summary>
uint NumberPartitions { set; }
/// <summary>
/// Assigned by the transport backend to inform the host about the file/blob paths for the partition state in storage.
/// </summary>
string PathPrefix { set; }
/// <summary>
/// Returns the storage provider for storing the partition states.
/// </summary>

Просмотреть файл

@ -64,6 +64,8 @@ namespace DurableTask.Netherite
internal NetheriteOrchestrationServiceSettings Settings { get; private set; }
internal uint NumberPartitions { get; private set; }
uint TransportAbstraction.IHost.NumberPartitions { set => this.NumberPartitions = value; }
internal string PathPrefix { get; private set; }
string TransportAbstraction.IHost.PathPrefix { set => this.PathPrefix = value; }
internal string StorageAccountName { get; private set; }
internal WorkItemQueue<ActivityWorkItem> ActivityWorkItemQueue { get; private set; }
@ -207,14 +209,20 @@ namespace DurableTask.Netherite
return new MemoryStorage(this.TraceHelper.Logger);
case TransportConnectionString.StorageChoices.Faster:
return new Faster.FasterStorage(this.Settings.ResolvedStorageConnectionString, this.Settings.ResolvedPageBlobStorageConnectionString, this.Settings.UseLocalDirectoryForPartitionStorage, this.Settings.HubName, this.LoggerFactory);
return new Faster.FasterStorage(
this.Settings.ResolvedStorageConnectionString,
this.Settings.ResolvedPageBlobStorageConnectionString,
this.Settings.UseLocalDirectoryForPartitionStorage,
this.Settings.HubName,
this.PathPrefix,
this.LoggerFactory);
default:
throw new NotImplementedException("no such storage choice");
}
}
async Task IStorageProvider.DeleteAllPartitionStatesAsync()
async Task IStorageProvider.DeleteTaskhubAsync(string pathPrefix)
{
if (!(this.LoadMonitorService is null))
await this.LoadMonitorService.DeleteIfExistsAsync(CancellationToken.None).ConfigureAwait(false);
@ -230,7 +238,8 @@ namespace DurableTask.Netherite
this.Settings.ResolvedStorageConnectionString,
this.Settings.ResolvedPageBlobStorageConnectionString,
this.Settings.UseLocalDirectoryForPartitionStorage,
this.Settings.HubName).ConfigureAwait(false);
this.Settings.HubName,
pathPrefix).ConfigureAwait(false);
break;
default:

Просмотреть файл

@ -29,6 +29,7 @@ namespace DurableTask.Netherite.Faster
readonly CancellationTokenSource shutDownOrTermination;
readonly CloudStorageAccount cloudStorageAccount;
readonly CloudStorageAccount pageBlobAccount;
readonly string taskHubPrefix;
readonly CloudBlobContainer blockBlobContainer;
readonly CloudBlobContainer pageBlobContainer;
@ -118,6 +119,7 @@ namespace DurableTask.Netherite.Faster
static readonly int[] StorageFormatVersion = new int[] {
1, //initial version
2, //0.7.0-beta changed singleton storage, and adds dequeue count
3, //changed naming of files
};
public static string GetStorageFormat(NetheriteOrchestrationServiceSettings settings)
@ -245,6 +247,7 @@ namespace DurableTask.Netherite.Faster
CloudStorageAccount pageBlobAccount,
string localFilePath,
string taskHubName,
string taskHubPrefix,
FaultInjector faultInjector,
ILogger logger,
Microsoft.Extensions.Logging.LogLevel logLevelLimit,
@ -256,6 +259,7 @@ namespace DurableTask.Netherite.Faster
this.UseLocalFiles = (localFilePath != null);
this.LocalFileDirectoryForTestingAndDebugging = localFilePath;
this.ContainerName = GetContainerName(taskHubName);
this.taskHubPrefix = taskHubPrefix;
this.FaultInjector = faultInjector;
this.partitionId = partitionId;
this.CheckpointInfo = new CheckpointInfo();
@ -289,7 +293,7 @@ namespace DurableTask.Netherite.Faster
this.shutDownOrTermination = CancellationTokenSource.CreateLinkedTokenSource(errorHandler.Token);
}
string PartitionFolderName => $"p{this.partitionId:D2}";
string PartitionFolderName => $"{this.taskHubPrefix}p{this.partitionId:D2}";
string PsfGroupFolderName(int groupOrdinal) => $"psfgroup.{groupOrdinal:D3}";
// For testing and debugging with local files
@ -400,13 +404,13 @@ namespace DurableTask.Netherite.Faster
await this.LeaseMaintenanceLoopTask; // wait for loop to terminate cleanly
}
public static async Task DeleteTaskhubStorageAsync(CloudStorageAccount account, CloudStorageAccount pageBlobAccount, string localFileDirectoryPath, string taskHubName)
public static async Task DeleteTaskhubStorageAsync(CloudStorageAccount account, CloudStorageAccount pageBlobAccount, string localFileDirectoryPath, string taskHubName, string pathPrefix)
{
var containerName = GetContainerName(taskHubName);
if (!string.IsNullOrEmpty(localFileDirectoryPath))
{
DirectoryInfo di = new DirectoryInfo($"{localFileDirectoryPath}\\{containerName}");
DirectoryInfo di = new DirectoryInfo($"{localFileDirectoryPath}\\{containerName}"); //TODO fine-grained deletion
if (di.Exists)
{
di.Delete(true);
@ -421,16 +425,33 @@ namespace DurableTask.Netherite.Faster
if (await blobContainer.ExistsAsync())
{
// do a complete deletion of all contents of this directory
var tasks = blobContainer.ListBlobs(null, true)
.Where(blob => blob.GetType() == typeof(CloudBlob) || blob.GetType().BaseType == typeof(CloudBlob))
.Select(blob => BlobUtils.ForceDeleteAsync((CloudBlob)blob))
.ToArray();
await Task.WhenAll(tasks);
BlobContinuationToken continuationToken = null;
var deletionTasks = new List<Task>();
do
{
var listingResult = await blobContainer.ListBlobsSegmentedAsync(
pathPrefix,
useFlatBlobListing: true,
BlobListingDetails.None, 50, continuationToken, null, null);
continuationToken = listingResult.ContinuationToken;
foreach (var result in listingResult.Results)
{
if (result is CloudBlob blob)
{
deletionTasks.Add(BlobUtils.ForceDeleteAsync(blob));
}
}
await Task.WhenAll(deletionTasks);
}
while (continuationToken != null);
}
// We are not deleting the container itself because it creates problems when trying to recreate
// the same container soon afterwards so we leave an empty container behind. Oh well.
// the same container.
}
await DeleteContainerContents(account);

Просмотреть файл

@ -18,6 +18,7 @@ namespace DurableTask.Netherite.Faster
readonly string localFileDirectory;
readonly CloudStorageAccount pageBlobStorageAccount;
readonly string taskHubName;
readonly string pathPrefix;
readonly ILogger logger;
Partition partition;
@ -32,7 +33,7 @@ namespace DurableTask.Netherite.Faster
internal FasterTraceHelper TraceHelper { get; private set; }
public FasterStorage(string connectionString, string pageBlobConnectionString, string localFileDirectory, string taskHubName, ILoggerFactory loggerFactory)
public FasterStorage(string connectionString, string pageBlobConnectionString, string localFileDirectory, string taskHubName, string pathPrefix, ILoggerFactory loggerFactory)
{
if (!string.IsNullOrEmpty(localFileDirectory))
{
@ -51,14 +52,15 @@ namespace DurableTask.Netherite.Faster
this.pageBlobStorageAccount = this.storageAccount;
}
this.taskHubName = taskHubName;
this.pathPrefix = pathPrefix;
this.logger = loggerFactory.CreateLogger($"{NetheriteOrchestrationService.LoggerCategoryName}.FasterStorage");
}
public static Task DeleteTaskhubStorageAsync(string connectionString, string pageBlobConnectionString, string localFileDirectory, string taskHubName)
public static Task DeleteTaskhubStorageAsync(string connectionString, string pageBlobConnectionString, string localFileDirectory, string taskHubName, string pathPrefix)
{
var storageAccount = string.IsNullOrEmpty(connectionString) ? null : CloudStorageAccount.Parse(connectionString);
var pageBlobAccount = string.IsNullOrEmpty(pageBlobConnectionString) ? storageAccount : CloudStorageAccount.Parse(pageBlobConnectionString);
return BlobManager.DeleteTaskhubStorageAsync(storageAccount, pageBlobAccount, localFileDirectory, taskHubName);
return BlobManager.DeleteTaskhubStorageAsync(storageAccount, pageBlobAccount, localFileDirectory, taskHubName, pathPrefix);
}
async Task<T> TerminationWrapper<T>(Task<T> what)
@ -90,6 +92,7 @@ namespace DurableTask.Netherite.Faster
this.pageBlobStorageAccount,
this.localFileDirectory,
this.taskHubName,
this.pathPrefix,
partition.Settings.TestHooks?.FaultInjector,
this.logger,
this.partition.Settings.StorageLogLevelLimit,

Просмотреть файл

@ -75,6 +75,10 @@ namespace DurableTask.Netherite.EventHubs
public static string ClientConsumerGroup = "$Default";
public static string LoadMonitorConsumerGroup = "$Default";
// The path prefix is used to prevent some issues (races, partial deletions) when recreating a taskhub of the same name.
// The prefix is the full string form of the taskhub Guid followed by a '/' separator.
public static string TaskhubPathPrefix(Guid taskhubGuid) => $"{taskhubGuid.ToString()}/";
static string GetContainerName(string taskHubName) => taskHubName.ToLowerInvariant() + "-storage";
async Task<TaskhubParameters> TryLoadExistingTaskhubAsync()
@ -169,13 +173,17 @@ namespace DurableTask.Netherite.EventHubs
async Task DeleteAsync()
{
if (await this.taskhubParameters.ExistsAsync())
{
await BlobUtils.ForceDeleteAsync(this.taskhubParameters);
}
var parameters = await this.TryLoadExistingTaskhubAsync();
// todo delete consumption checkpoints
await this.host.StorageProvider.DeleteAllPartitionStatesAsync();
if (parameters != null)
{
// first, delete the parameters file which deletes the taskhub logically
await BlobUtils.ForceDeleteAsync(this.taskhubParameters);
// delete all the files/blobs in the directory/container that represents this taskhub
// If this does not complete successfully, some garbage may be left behind.
await this.host.StorageProvider.DeleteTaskhubAsync(TaskhubPathPrefix(parameters.TaskhubGuid));
}
}
async Task StartAsync()
@ -197,6 +205,7 @@ namespace DurableTask.Netherite.EventHubs
BlobManager.CheckStorageFormat(this.parameters.StorageFormat, this.settings);
this.host.NumberPartitions = (uint)this.parameters.StartPositions.Length;
this.host.PathPrefix = TaskhubPathPrefix(this.parameters.TaskhubGuid);
this.connections = new EventHubsConnections(this.settings.ResolvedTransportConnectionString, this.parameters.PartitionHubs, this.parameters.ClientHubs, LoadMonitorHub)
{
@ -268,7 +277,8 @@ namespace DurableTask.Netherite.EventHubs
EventHubsTransport.PartitionConsumerGroup,
this.settings.ResolvedTransportConnectionString,
this.settings.ResolvedStorageConnectionString,
this.cloudBlobContainer.Name);
this.cloudBlobContainer.Name,
$"{TaskhubPathPrefix(this.parameters.TaskhubGuid)}eh-checkpoints/{(PartitionHubs[0])}");
var processorOptions = new EventProcessorOptions()
{
@ -327,7 +337,7 @@ namespace DurableTask.Netherite.EventHubs
this.settings.ResolvedTransportConnectionString,
this.settings.ResolvedStorageConnectionString,
this.cloudBlobContainer.Name,
LoadMonitorHub);
$"{TaskhubPathPrefix(this.parameters.TaskhubGuid)}eh-checkpoints/{LoadMonitorHub}");
var processorOptions = new EventProcessorOptions()
{

Просмотреть файл

@ -61,7 +61,7 @@ namespace DurableTask.Netherite.Emulated
Task ITaskHub.DeleteAsync()
{
this.clientQueues = null;
return this.host.StorageProvider.DeleteAllPartitionStatesAsync();
return this.host.StorageProvider.DeleteTaskhubAsync("");
}
async Task<bool> ITaskHub.ExistsAsync()

Просмотреть файл

@ -0,0 +1,105 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite.Tests
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.History;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Xunit;
using Xunit.Abstractions;
/// <summary>
/// Tests for the taskhub lifecycle: creating a taskhub, deleting it, and creating it again under the same name.
/// </summary>
[Collection("NetheriteTests")]
[Trait("AnyTransport", "true")]
public class TaskhubTests : IDisposable
{
    readonly SingleHostFixture.TestTraceListener traceListener;
    readonly ILoggerFactory loggerFactory;
    readonly XunitLoggerProvider provider;
    readonly Action<string> output;
    ITestOutputHelper outputHelper;

    /// <summary>
    /// Wires the logger factory and a trace listener so that their output is written to the xunit output helper.
    /// </summary>
    /// <param name="outputHelper">The xunit output helper for the current test.</param>
    public TaskhubTests(ITestOutputHelper outputHelper)
    {
        this.outputHelper = outputHelper;

        // the null-conditional access makes this a no-op once Dispose has cleared the helper
        this.output = (string message) => this.outputHelper?.WriteLine(message);

        this.loggerFactory = new LoggerFactory();
        this.provider = new XunitLoggerProvider();
        this.loggerFactory.AddProvider(this.provider);

        this.traceListener = new SingleHostFixture.TestTraceListener();
        Trace.Listeners.Add(this.traceListener);
        this.traceListener.Output = this.output;
    }

    /// <summary>
    /// Clears the output helper reference and unregisters the trace listener added by the constructor.
    /// </summary>
    public void Dispose()
    {
        this.outputHelper = null;
        Trace.Listeners.Remove(this.traceListener);
    }

    /// <summary>
    /// Create a taskhub, delete it, and create it again.
    /// </summary>
    /// <param name="deleteTwice">If true, the first taskhub is deleted twice in a row; the second deletion should be idempotent.</param>
    [Theory]
    [InlineData(false)]
    [InlineData(true)]
    public async Task CreateDeleteCreate(bool deleteTwice)
    {
        var settings = TestConstants.GetNetheriteOrchestrationServiceSettings();

        // a fresh hub name per test run; both passes below reuse these settings, and therefore the same name
        settings.HubName = $"{nameof(TaskhubTests)}-{Guid.NewGuid()}";

        // Runs one complete service lifecycle against the shared settings:
        // create, start, query (expecting no orchestrations), stop, and then delete the given number of times.
        async Task RunLifecycleAsync(int numberOfDeletions)
        {
            // start the service
            var service = new NetheriteOrchestrationService(settings, this.loggerFactory);
            var orchestrationService = (IOrchestrationService)service;
            var queryClient = (IOrchestrationServiceQueryClient)service;
            await orchestrationService.CreateAsync();
            await orchestrationService.StartAsync();

            // run a query
            var states = await queryClient.GetAllOrchestrationStatesAsync(CancellationToken.None);
            Assert.Empty(states);

            // stop and delete the service
            await orchestrationService.StopAsync();
            for (int i = 0; i < numberOfDeletions; i++)
            {
                await orchestrationService.DeleteAsync();
            }
        }

        // first pass: optionally delete again, which should be idempotent
        await RunLifecycleAsync(deleteTwice ? 2 : 1);

        // run the service a second time, then stop and delete it again
        await RunLifecycleAsync(1);
    }
}
}