Validate partition table name on consumption plan (#207)
* disable the use of blob publishing (tablename null) on consumption plan * update blob publisher to use the right directories, fix null checks of table name to null or empty checks * fix log level for LoadMonitor
This commit is contained in:
Родитель
2f03f54759
Коммит
7079232c57
|
@ -175,6 +175,12 @@ namespace DurableTask.Netherite.AzureFunctions
|
|||
// validate the settings and resolve the connections
|
||||
netheriteSettings.Validate(this.connectionResolver);
|
||||
|
||||
// must always use AzureTableLoadPublisher on consumption plans
|
||||
if (string.IsNullOrEmpty(netheriteSettings.LoadInformationAzureTableName) && this.inConsumption)
|
||||
{
|
||||
throw new NetheriteConfigurationException("The Netherite setting LoadInformationAzureTableName must not be null or empty when running on a consumption plan");
|
||||
}
|
||||
|
||||
int randomProbability = 0;
|
||||
bool attachFaultInjector =
|
||||
(this.options.StorageProvider.TryGetValue("FaultInjectionProbability", out object value)
|
||||
|
|
|
@ -31,7 +31,7 @@ namespace DurableTask.Netherite
|
|||
|
||||
/// <summary>
|
||||
/// The table storage account, used for publishing load information.
|
||||
/// Not required if <see cref="NetheriteOrchestrationServiceSettings.LoadInformationAzureTableName"/> is set to null, or if
|
||||
/// Not required if <see cref="NetheriteOrchestrationServiceSettings.LoadInformationAzureTableName"/> is set to null or empty, or if
|
||||
/// the layer configuration uses <see cref="StorageChoices.Memory"/>.
|
||||
/// </summary>
|
||||
TableStorage,
|
||||
|
|
|
@ -29,7 +29,7 @@ namespace DurableTask.Netherite
|
|||
{
|
||||
if (this.logger.IsEnabled(LogLevel.Debug))
|
||||
{
|
||||
this.logger.LogInformation("LoadMonitor {details}", details);
|
||||
this.logger.LogDebug("LoadMonitor {details}", details);
|
||||
}
|
||||
if (EtwSource.Log.IsEnabled())
|
||||
{
|
||||
|
|
|
@ -47,7 +47,7 @@ namespace DurableTask.Netherite
|
|||
|
||||
/// <summary>
|
||||
/// Optionally, a name for an Azure Table to use for publishing load information. If set to null or empty,
|
||||
/// then Azure blobs are used instead.
|
||||
/// then Azure blobs are used instead. The use of Azure blobs is currently not supported on consumption plans, or on elastic premium plans without runtime scaling.
|
||||
/// </summary>
|
||||
public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions";
|
||||
|
||||
|
@ -419,7 +419,7 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
}
|
||||
|
||||
if (this.StorageChoice == StorageChoices.Faster && this.LoadInformationAzureTableName != null)
|
||||
if (this.StorageChoice == StorageChoices.Faster && !string.IsNullOrEmpty(this.LoadInformationAzureTableName))
|
||||
{
|
||||
// we need a valid table storage connection
|
||||
try
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
namespace DurableTask.Netherite.Scaling
|
||||
{
|
||||
using DurableTask.Netherite.Abstractions;
|
||||
using DurableTask.Netherite.Faster;
|
||||
using Microsoft.Azure.Storage;
|
||||
using Microsoft.Azure.Storage.Blob;
|
||||
|
@ -18,6 +19,7 @@ namespace DurableTask.Netherite.Scaling
|
|||
{
|
||||
readonly string taskHubName;
|
||||
readonly Task<CloudBlobContainer> blobContainer;
|
||||
TaskhubParameters parameters;
|
||||
|
||||
readonly static JsonSerializerSettings serializerSettings = new JsonSerializerSettings()
|
||||
{
|
||||
|
@ -25,8 +27,6 @@ namespace DurableTask.Netherite.Scaling
|
|||
MissingMemberHandling = MissingMemberHandling.Ignore,
|
||||
};
|
||||
|
||||
int? numPartitions;
|
||||
|
||||
public AzureBlobLoadPublisher(ConnectionInfo connectionInfo, string taskHubName)
|
||||
{
|
||||
this.blobContainer = this.GetBlobContainer(connectionInfo, taskHubName);
|
||||
|
@ -49,18 +49,17 @@ namespace DurableTask.Netherite.Scaling
|
|||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task PublishAsync(Dictionary<uint, PartitionLoadInfo> info, CancellationToken cancellationToken)
|
||||
async ValueTask<bool> LoadParameters(bool throwIfNotFound, CancellationToken cancellationToken)
|
||||
{
|
||||
async Task UploadPartitionInfo(uint partitionId, PartitionLoadInfo loadInfo)
|
||||
if (this.parameters == null)
|
||||
{
|
||||
var blobDirectory = (await this.blobContainer).GetDirectoryReference($"p{partitionId:D2}");
|
||||
var blob = blobDirectory.GetBlockBlobReference("loadinfo.json");
|
||||
var json = JsonConvert.SerializeObject(loadInfo, Formatting.Indented, serializerSettings);
|
||||
await blob.UploadTextAsync(json, cancellationToken);
|
||||
this.parameters = await this.ReadJsonBlobAsync<Netherite.Abstractions.TaskhubParameters>(
|
||||
(await this.blobContainer).GetBlockBlobReference("taskhubparameters.json"),
|
||||
throwIfNotFound: throwIfNotFound,
|
||||
throwOnParseError: throwIfNotFound,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
List<Task> tasks = info.Select(kvp => UploadPartitionInfo(kvp.Key, kvp.Value)).ToList();
|
||||
return Task.WhenAll(tasks);
|
||||
return this.parameters != null;
|
||||
}
|
||||
|
||||
public async Task<T> ReadJsonBlobAsync<T>(CloudBlockBlob blob, bool throwIfNotFound, bool throwOnParseError, CancellationToken token) where T : class
|
||||
|
@ -86,64 +85,54 @@ namespace DurableTask.Netherite.Scaling
|
|||
return null;
|
||||
}
|
||||
|
||||
public async Task PublishAsync(Dictionary<uint, PartitionLoadInfo> info, CancellationToken cancellationToken)
|
||||
{
|
||||
await this.LoadParameters(throwIfNotFound: true, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
async Task UploadPartitionInfo(uint partitionId, PartitionLoadInfo loadInfo)
|
||||
{
|
||||
var blobDirectory = (await this.blobContainer).GetDirectoryReference($"{this.parameters.TaskhubGuid}/p{partitionId:D2}");
|
||||
var blob = blobDirectory.GetBlockBlobReference("loadinfo.json");
|
||||
var json = JsonConvert.SerializeObject(loadInfo, Formatting.Indented, serializerSettings);
|
||||
await blob.UploadTextAsync(json, cancellationToken);
|
||||
}
|
||||
|
||||
List<Task> tasks = info.Select(kvp => UploadPartitionInfo(kvp.Key, kvp.Value)).ToList();
|
||||
await Task.WhenAll(tasks);
|
||||
}
|
||||
|
||||
public async Task<Dictionary<uint, PartitionLoadInfo>> QueryAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (!this.numPartitions.HasValue)
|
||||
{
|
||||
// determine number of partitions of taskhub
|
||||
var info = await this.ReadJsonBlobAsync<Netherite.Abstractions.TaskhubParameters>(
|
||||
(await this.blobContainer).GetBlockBlobReference("taskhubparameters.json"),
|
||||
throwIfNotFound: true,
|
||||
throwOnParseError: true,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
this.numPartitions = info.PartitionCount;
|
||||
}
|
||||
await this.LoadParameters(throwIfNotFound: true, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
async Task<(uint, PartitionLoadInfo)> DownloadPartitionInfo(uint partitionId)
|
||||
{
|
||||
PartitionLoadInfo info = await this.ReadJsonBlobAsync<PartitionLoadInfo>(
|
||||
(await this.blobContainer).GetDirectoryReference($"p{partitionId:D2}").GetBlockBlobReference("loadinfo.json"),
|
||||
(await this.blobContainer).GetDirectoryReference($"{this.parameters.TaskhubGuid}/p{partitionId:D2}").GetBlockBlobReference("loadinfo.json"),
|
||||
throwIfNotFound: false,
|
||||
throwOnParseError: true,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
return (partitionId, info);
|
||||
}
|
||||
|
||||
var tasks = Enumerable.Range(0, this.numPartitions.Value).Select(partitionId => DownloadPartitionInfo((uint)partitionId)).ToList();
|
||||
var tasks = Enumerable.Range(0, this.parameters.PartitionCount).Select(partitionId => DownloadPartitionInfo((uint)partitionId)).ToList();
|
||||
await Task.WhenAll(tasks).ConfigureAwait(false);
|
||||
return tasks.Select(task => task.Result).Where(pair => pair.Item2 != null).ToDictionary(pair => pair.Item1, pair => pair.Item2);
|
||||
}
|
||||
|
||||
public async Task DeleteIfExistsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (!this.numPartitions.HasValue)
|
||||
if (await this.LoadParameters(throwIfNotFound: false, cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
// determine number of partitions of taskhub
|
||||
var info = await this.ReadJsonBlobAsync<Netherite.Abstractions.TaskhubParameters>(
|
||||
(await this.blobContainer).GetBlockBlobReference("taskhubparameters.json"),
|
||||
throwIfNotFound: false,
|
||||
throwOnParseError: false,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (info == null)
|
||||
async Task DeletePartitionInfo(uint partitionId)
|
||||
{
|
||||
return;
|
||||
var blob = (await this.blobContainer).GetDirectoryReference($"{this.parameters.TaskhubGuid}/p{partitionId:D2}").GetBlockBlobReference("loadinfo.json");
|
||||
await BlobUtils.ForceDeleteAsync(blob).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
this.numPartitions = info.PartitionCount;
|
||||
}
|
||||
}
|
||||
|
||||
async Task DeletePartitionInfo(uint partitionId)
|
||||
{
|
||||
var blob = (await this.blobContainer).GetDirectoryReference($"p{partitionId:D2}").GetBlockBlobReference("loadinfo.json");
|
||||
await BlobUtils.ForceDeleteAsync(blob).ConfigureAwait(false);
|
||||
var tasks = Enumerable.Range(0, this.parameters.PartitionCount).Select(partitionId => DeletePartitionInfo((uint)partitionId)).ToList();
|
||||
await Task.WhenAll(tasks).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
var tasks = Enumerable.Range(0, this.numPartitions.Value).Select(partitionId => DeletePartitionInfo((uint)partitionId)).ToList();
|
||||
await Task.WhenAll(tasks).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,6 +87,9 @@
|
|||
// set this to true to attach replay checker
|
||||
//"AttachCacheDebugger": true,
|
||||
|
||||
// can change this to use a different table, or blobs
|
||||
//"LoadInformationAzureTableName": "",
|
||||
|
||||
// set this to "Scripted" to control the scenario with a partition script
|
||||
// or to "ClientOnly" to run only the client
|
||||
"PartitionManagement": "EventProcessorHost",
|
||||
|
|
Загрузка…
Ссылка в новой задаче