* revisit the mechanism for specifying alternate page blob storage: use more descriptive names, and implement deletion

* fix missing code.

* fix bug

* fix checkpoint devices so they use page blob storage as well.

(cherry picked from commit 1ea23e69899798e61f54eeba9dfeacbac2d4c483)
This commit is contained in:
Sebastian Burckhardt 2021-06-24 05:44:25 -07:00 коммит произвёл GitHub
Родитель fbe045449d
Коммит 6621e9a5a3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 111 добавлений и 57 удалений

Просмотреть файл

@ -184,7 +184,7 @@ namespace DurableTask.Netherite
return new MemoryStorage(this.Logger);
case TransportConnectionString.StorageChoices.Faster:
return new Faster.FasterStorage(this.Settings.ResolvedStorageConnectionString, this.Settings.PremiumStorageConnectionName, this.Settings.UseLocalDirectoryForPartitionStorage, this.Settings.HubName, this.LoggerFactory);
return new Faster.FasterStorage(this.Settings.ResolvedStorageConnectionString, this.Settings.ResolvedPageBlobStorageConnectionString, this.Settings.UseLocalDirectoryForPartitionStorage, this.Settings.HubName, this.LoggerFactory);
default:
throw new NotImplementedException("no such storage choice");
@ -203,7 +203,11 @@ namespace DurableTask.Netherite
break;
case TransportConnectionString.StorageChoices.Faster:
await Faster.FasterStorage.DeleteTaskhubStorageAsync(this.Settings.ResolvedStorageConnectionString, this.Settings.UseLocalDirectoryForPartitionStorage, this.Settings.HubName).ConfigureAwait(false);
await Faster.FasterStorage.DeleteTaskhubStorageAsync(
this.Settings.ResolvedStorageConnectionString,
this.Settings.ResolvedPageBlobStorageConnectionString,
this.Settings.UseLocalDirectoryForPartitionStorage,
this.Settings.HubName).ConfigureAwait(false);
break;
default:

Просмотреть файл

@ -161,12 +161,18 @@ namespace DurableTask.Netherite
public int PackPartitionTaskMessages { get; set; } = 100;
/// <summary>
/// Gets or sets the name used for resolving the premium Azure storage connection string, if used.
/// A name for resolving a storage connection string to be used specifically for the page blobs, or null if page blobs are to be stored in the default account.
/// </summary>
public string PremiumStorageConnectionName { get; set; } = null;
public string PageBlobStorageConnectionName { get; set; } = null;
/// <summary>
/// The resolved page blob storage connection string, or null if page blobs are to be stored in the default account. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
public string ResolvedPageBlobStorageConnectionString { get; set; }
[JsonIgnore]
internal bool UsePremiumStorage => !string.IsNullOrEmpty(this.PremiumStorageConnectionName);
internal bool UseSeparatePageBlobStorage => !string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString);
/// <summary>
/// A lower limit on the severity level of trace events emitted by the transport layer.
@ -224,6 +230,17 @@ namespace DurableTask.Netherite
}
}
if (string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString)
&& !string.IsNullOrEmpty(this.PageBlobStorageConnectionName))
{
this.ResolvedPageBlobStorageConnectionString = connectionStringResolver(this.PageBlobStorageConnectionName);
if (string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString))
{
throw new InvalidOperationException($"Could not resolve {nameof(this.PageBlobStorageConnectionName)}:{this.PageBlobStorageConnectionName} for Netherite storage provider.");
}
}
if (string.IsNullOrEmpty(this.ResolvedTransportConnectionString))
{
if (string.IsNullOrEmpty(this.EventHubsConnectionName))

Просмотреть файл

@ -28,7 +28,7 @@ namespace DurableTask.Netherite.Faster
readonly uint partitionId;
readonly CancellationTokenSource shutDownOrTermination;
readonly CloudStorageAccount cloudStorageAccount;
readonly CloudStorageAccount secondaryStorageAccount;
readonly CloudStorageAccount pageBlobAccount;
readonly CloudBlobContainer blockBlobContainer;
readonly CloudBlobContainer pageBlobContainer;
@ -70,7 +70,7 @@ namespace DurableTask.Netherite.Faster
internal const long HashTableSize = 1L << 14; // 16 k buckets, 1 GB
//internal const long HashTableSize = 1L << 14; // 8 M buckets, 512 GB
public FasterLogSettings EventLogSettings(bool usePremiumStorage) => new FasterLogSettings
public FasterLogSettings EventLogSettings(bool useSeparatePageBlobStorage) => new FasterLogSettings
{
LogDevice = this.EventLogDevice,
LogCommitManager = this.UseLocalFiles
@ -78,20 +78,20 @@ namespace DurableTask.Netherite.Faster
: (ILogCommitManager)this,
PageSizeBits = 21, // 2MB
SegmentSizeBits =
usePremiumStorage ? 35 // 32 GB
: 30, // 1 GB
useSeparatePageBlobStorage ? 35 // 32 GB
: 30, // 1 GB
MemorySizeBits = 22, // 2MB
};
public LogSettings StoreLogSettings(bool usePremiumStorage, uint numPartitions) => new LogSettings
public LogSettings StoreLogSettings(bool useSeparatePageBlobStorage, uint numPartitions) => new LogSettings
{
LogDevice = this.HybridLogDevice,
ObjectLogDevice = this.ObjectLogDevice,
PageSizeBits = 17, // 128kB
MutableFraction = 0.9,
SegmentSizeBits =
usePremiumStorage ? 35 // 32 GB
: 32, // 4 GB
useSeparatePageBlobStorage ? 35 // 32 GB
: 32, // 4 GB
CopyReadsToTail = true,
MemorySizeBits =
(numPartitions <= 1) ? 25 : // 32MB
@ -204,8 +204,8 @@ namespace DurableTask.Netherite.Faster
=> TimeSpan.FromSeconds(Math.Pow(2, (numAttempts - 1)));
// For tests only; TODO consider adding PSFs
internal BlobManager(CloudStorageAccount storageAccount, CloudStorageAccount secondaryStorageAccount, string localFileDirectory, string taskHubName, ILogger logger, Microsoft.Extensions.Logging.LogLevel logLevelLimit, uint partitionId, IPartitionErrorHandler errorHandler)
: this(storageAccount, secondaryStorageAccount, localFileDirectory, taskHubName, logger, logLevelLimit, partitionId, errorHandler, 0)
internal BlobManager(CloudStorageAccount storageAccount, CloudStorageAccount pageBlobAccount, string localFileDirectory, string taskHubName, ILogger logger, Microsoft.Extensions.Logging.LogLevel logLevelLimit, uint partitionId, IPartitionErrorHandler errorHandler)
: this(storageAccount, pageBlobAccount, localFileDirectory, taskHubName, logger, logLevelLimit, partitionId, errorHandler, 0)
{
}
@ -213,7 +213,7 @@ namespace DurableTask.Netherite.Faster
/// Create a blob manager.
/// </summary>
/// <param name="storageAccount">The cloud storage account, or null if using local file paths</param>
/// <param name="secondaryStorageAccount">Optionally, a secondary cloud storage accounts</param>
/// <param name="pageBlobAccount">The storage account to use for page blobs</param>
/// <param name="localFilePath">The local file path, or null if using cloud storage</param>
/// <param name="taskHubName">The name of the taskhub</param>
/// <param name="logger">A logger for logging</param>
@ -223,7 +223,7 @@ namespace DurableTask.Netherite.Faster
/// <param name="psfGroupCount">Number of PSF groups to be created in FASTER</param>
public BlobManager(
CloudStorageAccount storageAccount,
CloudStorageAccount secondaryStorageAccount,
CloudStorageAccount pageBlobAccount,
string localFilePath,
string taskHubName,
ILogger logger,
@ -232,7 +232,7 @@ namespace DurableTask.Netherite.Faster
int psfGroupCount)
{
this.cloudStorageAccount = storageAccount;
this.secondaryStorageAccount = secondaryStorageAccount;
this.pageBlobAccount = pageBlobAccount;
this.UseLocalFiles = (localFilePath != null);
this.LocalFileDirectoryForTestingAndDebugging = localFilePath;
this.ContainerName = GetContainerName(taskHubName);
@ -244,8 +244,16 @@ namespace DurableTask.Netherite.Faster
{
CloudBlobClient serviceClient = this.cloudStorageAccount.CreateCloudBlobClient();
this.blockBlobContainer = serviceClient.GetContainerReference(this.ContainerName);
serviceClient = this.secondaryStorageAccount.CreateCloudBlobClient();
this.pageBlobContainer = serviceClient.GetContainerReference(this.ContainerName);
if (pageBlobAccount == storageAccount)
{
this.pageBlobContainer = this.BlockBlobContainer;
}
else
{
serviceClient = this.pageBlobAccount.CreateCloudBlobClient();
this.pageBlobContainer = serviceClient.GetContainerReference(this.ContainerName);
}
}
else
{
@ -303,10 +311,18 @@ namespace DurableTask.Netherite.Faster
else
{
await this.blockBlobContainer.CreateIfNotExistsAsync();
await this.pageBlobContainer.CreateIfNotExistsAsync();
this.pageBlobPartitionDirectory = this.pageBlobContainer.GetDirectoryReference(this.PartitionFolderName);
this.blockBlobPartitionDirectory = this.blockBlobContainer.GetDirectoryReference(this.PartitionFolderName);
if (this.pageBlobContainer == this.blockBlobContainer)
{
this.pageBlobPartitionDirectory = this.blockBlobPartitionDirectory;
}
else
{
await this.pageBlobContainer.CreateIfNotExistsAsync();
this.pageBlobPartitionDirectory = this.pageBlobContainer.GetDirectoryReference(this.PartitionFolderName);
}
this.eventLogCommitBlob = this.blockBlobPartitionDirectory.GetBlockBlobReference(CommitBlobName);
AzureStorageDevice createDevice(string name) =>
@ -317,8 +333,9 @@ namespace DurableTask.Netherite.Faster
var objectLogDevice = createDevice(ObjectLogBlobName);
var psfLogDevices = (from groupOrdinal in Enumerable.Range(0, this.PsfGroupCount)
let psfDirectory = this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(groupOrdinal))
select new AzureStorageDevice(PsfHybridLogBlobName, psfDirectory.GetDirectoryReference(PsfHybridLogBlobName), psfDirectory.GetDirectoryReference(PsfHybridLogBlobName), this, true)).ToArray();
let psfblockDirectory = this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(groupOrdinal))
let psfpageDirectory = this.pageBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(groupOrdinal))
select new AzureStorageDevice(PsfHybridLogBlobName, psfblockDirectory.GetDirectoryReference(PsfHybridLogBlobName), psfpageDirectory.GetDirectoryReference(PsfHybridLogBlobName), this, true)).ToArray();
await this.AcquireOwnership();
@ -362,7 +379,7 @@ namespace DurableTask.Netherite.Faster
await this.LeaseMaintenanceLoopTask; // wait for loop to terminate cleanly
}
public static async Task DeleteTaskhubStorageAsync(CloudStorageAccount account, string localFileDirectoryPath, string taskHubName)
public static async Task DeleteTaskhubStorageAsync(CloudStorageAccount account, CloudStorageAccount pageBlobAccount, string localFileDirectoryPath, string taskHubName)
{
var containerName = GetContainerName(taskHubName);
@ -376,21 +393,31 @@ namespace DurableTask.Netherite.Faster
}
else
{
CloudBlobClient serviceClient = account.CreateCloudBlobClient();
var blobContainer = serviceClient.GetContainerReference(containerName);
if (await blobContainer.ExistsAsync())
async Task DeleteContainerContents(CloudStorageAccount account)
{
// do a complete deletion of all contents of this directory
var tasks = blobContainer.ListBlobs(null, true)
.Where(blob => blob.GetType() == typeof(CloudBlob) || blob.GetType().BaseType == typeof(CloudBlob))
.Select(blob => BlobUtils.ForceDeleteAsync((CloudBlob)blob))
.ToArray();
await Task.WhenAll(tasks);
CloudBlobClient serviceClient = account.CreateCloudBlobClient();
var blobContainer = serviceClient.GetContainerReference(containerName);
if (await blobContainer.ExistsAsync())
{
// do a complete deletion of all contents of this directory
var tasks = blobContainer.ListBlobs(null, true)
.Where(blob => blob.GetType() == typeof(CloudBlob) || blob.GetType().BaseType == typeof(CloudBlob))
.Select(blob => BlobUtils.ForceDeleteAsync((CloudBlob)blob))
.ToArray();
await Task.WhenAll(tasks);
}
// We are not deleting the container itself because it creates problems when trying to recreate
// the same container soon afterwards so we leave an empty container behind. Oh well.
}
// We are not deleting the container itself because it creates problems when trying to recreate
// the same container soon afterwards so we leave an empty container behind. Oh well.
await DeleteContainerContents(account);
if (pageBlobAccount != account)
{
await DeleteContainerContents(pageBlobAccount);
}
}
}
@ -975,16 +1002,23 @@ namespace DurableTask.Netherite.Faster
return result;
}
void GetPartitionDirectories(bool isPsf, int psfGroupOrdinal, string path, out CloudBlobDirectory blockBlobDir, out CloudBlobDirectory pageBlobDir)
{
var blockPartDir = isPsf ? this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(psfGroupOrdinal)) : this.blockBlobPartitionDirectory;
blockBlobDir = blockPartDir.GetDirectoryReference(path);
var pagePartDir = isPsf ? this.pageBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(psfGroupOrdinal)) : this.pageBlobPartitionDirectory;
pageBlobDir = pagePartDir.GetDirectoryReference(path);
}
internal IDevice GetIndexDevice(Guid indexToken, int psfGroupOrdinal)
{
var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal);
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexDevice Called on {tag}, indexToken={indexToken}");
var (path, blobName) = this.GetPrimaryHashTableBlobName(indexToken);
var partDir = isPsf ? this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(psfGroupOrdinal)) : this.blockBlobPartitionDirectory;
var blobDirectory = partDir.GetDirectoryReference(path);
var device = new AzureStorageDevice(blobName, blobDirectory, blobDirectory, this, false); // we don't need a lease since the token provides isolation
this.GetPartitionDirectories(isPsf, psfGroupOrdinal, path, out var blockBlobDir, out var pageBlobDir);
var device = new AzureStorageDevice(blobName, blockBlobDir, pageBlobDir, this, false); // we don't need a lease since the token provides isolation
device.StartAsync().Wait();
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexDevice Returned from {tag}, target={blobDirectory.Prefix}{blobName}");
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexDevice Returned from {tag}, target={blockBlobDir.Prefix}{blobName}");
return device;
}
@ -993,11 +1027,10 @@ namespace DurableTask.Netherite.Faster
var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal);
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotLogDevice Called on {tag}, token={token}");
var (path, blobName) = this.GetLogSnapshotBlobName(token);
var partDir = isPsf ? this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(psfGroupOrdinal)) : this.blockBlobPartitionDirectory;
var blobDirectory = partDir.GetDirectoryReference(path);
var device = new AzureStorageDevice(blobName, blobDirectory, blobDirectory, this, false); // we don't need a lease since the token provides isolation
this.GetPartitionDirectories(isPsf, psfGroupOrdinal, path, out var blockBlobDir, out var pageBlobDir);
var device = new AzureStorageDevice(blobName, blockBlobDir, pageBlobDir, this, false); // we don't need a lease since the token provides isolation
device.StartAsync().Wait();
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotLogDevice Returned from {tag}, blobDirectory={blobDirectory} blobName={blobName}");
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotLogDevice Returned from {tag}, blobDirectory={blockBlobDir} blobName={blobName}");
return device;
}
@ -1006,11 +1039,10 @@ namespace DurableTask.Netherite.Faster
var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal);
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotObjectLogDevice Called on {tag}, token={token}");
var (path, blobName) = this.GetObjectLogSnapshotBlobName(token);
var partDir = isPsf ? this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(psfGroupOrdinal)) : this.blockBlobPartitionDirectory;
var blobDirectory = partDir.GetDirectoryReference(path);
var device = new AzureStorageDevice(blobName, blobDirectory, blobDirectory, this, false); // we don't need a lease since the token provides isolation
this.GetPartitionDirectories(isPsf, psfGroupOrdinal, path, out var blockBlobDir, out var pageBlobDir);
var device = new AzureStorageDevice(blobName, blockBlobDir, pageBlobDir, this, false); // we don't need a lease since the token provides isolation
device.StartAsync().Wait();
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotObjectLogDevice Returned from {tag}, blobDirectory={blobDirectory} blobName={blobName}");
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotObjectLogDevice Returned from {tag}, blobDirectory={blockBlobDir} blobName={blobName}");
return device;
}

Просмотреть файл

@ -33,7 +33,7 @@ namespace DurableTask.Netherite.Faster
this.fht = new FasterKV<Key, Value>(
BlobManager.HashTableSize,
blobManager.StoreLogSettings(partition.Settings.UsePremiumStorage, partition.NumberPartitions()),
blobManager.StoreLogSettings(partition.Settings.UseSeparatePageBlobStorage, partition.NumberPartitions()),
blobManager.StoreCheckpointSettings,
new SerializerSettings<Key, Value>
{

Просмотреть файл

@ -16,7 +16,7 @@ namespace DurableTask.Netherite.Faster
public FasterLog(BlobManager blobManager, NetheriteOrchestrationServiceSettings settings)
{
this.log = new FASTER.core.FasterLog(blobManager.EventLogSettings(settings.UsePremiumStorage));
this.log = new FASTER.core.FasterLog(blobManager.EventLogSettings(settings.UseSeparatePageBlobStorage));
this.terminationToken = blobManager.PartitionErrorHandler.Token;
var _ = this.terminationToken.Register(

Просмотреть файл

@ -31,7 +31,7 @@ namespace DurableTask.Netherite.Faster
internal FasterTraceHelper TraceHelper { get; private set; }
public FasterStorage(string connectionString, string premiumStorageConnectionString, string localFileDirectory, string taskHubName, ILoggerFactory loggerFactory)
public FasterStorage(string connectionString, string pageBlobConnectionString, string localFileDirectory, string taskHubName, ILoggerFactory loggerFactory)
{
if (!string.IsNullOrEmpty(localFileDirectory))
{
@ -41,9 +41,9 @@ namespace DurableTask.Netherite.Faster
{
this.storageAccount = CloudStorageAccount.Parse(connectionString);
}
if (!string.IsNullOrEmpty(premiumStorageConnectionString))
if (pageBlobConnectionString != connectionString && !string.IsNullOrEmpty(pageBlobConnectionString))
{
this.pageBlobStorageAccount = CloudStorageAccount.Parse(premiumStorageConnectionString);
this.pageBlobStorageAccount = CloudStorageAccount.Parse(pageBlobConnectionString);
}
else
{
@ -53,10 +53,11 @@ namespace DurableTask.Netherite.Faster
this.logger = loggerFactory.CreateLogger($"{NetheriteOrchestrationService.LoggerCategoryName}.FasterStorage");
}
public static Task DeleteTaskhubStorageAsync(string connectionString, string localFileDirectory, string taskHubName)
public static Task DeleteTaskhubStorageAsync(string connectionString, string pageBlobConnectionString, string localFileDirectory, string taskHubName)
{
var storageAccount = string.IsNullOrEmpty(localFileDirectory) ? CloudStorageAccount.Parse(connectionString) : null;
return BlobManager.DeleteTaskhubStorageAsync(storageAccount, localFileDirectory, taskHubName);
var storageAccount = string.IsNullOrEmpty(connectionString) ? null : CloudStorageAccount.Parse(connectionString);
var pageBlobAccount = string.IsNullOrEmpty(pageBlobConnectionString) ? storageAccount : CloudStorageAccount.Parse(pageBlobConnectionString);
return BlobManager.DeleteTaskhubStorageAsync(storageAccount, pageBlobAccount, localFileDirectory, taskHubName);
}
public async Task<long> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler errorHandler, long firstInputQueuePosition)

Просмотреть файл

@ -35,7 +35,7 @@ namespace DurableTask.Netherite.Faster
this.traceHelper = traceHelper;
this.intakeWorker = new IntakeWorker(cancellationToken, this, partition.TraceHelper);
this.maxFragmentSize = (1 << this.blobManager.EventLogSettings(partition.Settings.UsePremiumStorage).PageSizeBits) - 64; // faster needs some room for header, 64 bytes is conservative
this.maxFragmentSize = (1 << this.blobManager.EventLogSettings(partition.Settings.UseSeparatePageBlobStorage).PageSizeBits) - 64; // faster needs some room for header, 64 bytes is conservative
}
public const byte first = 0x1;