Update to latest Azure Storage Tables client (#195)
* update to latest Azure Storage Tables client * Apply suggestions from code review Co-authored-by: Jacob Viau <javia@microsoft.com> * use fail-fast when parsing records, add unit tests, and fix a number of bugs * Add some .ConfigureAwait(false) to AzureBlobLoadPublisher * address PR feedback. Co-authored-by: Jacob Viau <javia@microsoft.com>
This commit is contained in:
Родитель
0333d908f0
Коммит
cf1e345881
|
@ -50,8 +50,8 @@
|
|||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Azure.Data.Tables" Version="12.6.1" />
|
||||
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.7.2" />
|
||||
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.8" />
|
||||
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.2" />
|
||||
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" />
|
||||
<PackageReference Include="Microsoft.FASTER.Core" Version="2.0.16" />
|
||||
|
|
|
@ -6,7 +6,6 @@ namespace DurableTask.Netherite
|
|||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Documents.SystemFunctions;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
// For indicating and initiating termination, and for tracing errors and warnings relating to a partition.
|
||||
|
|
|
@ -18,7 +18,13 @@ namespace DurableTask.Netherite.Scaling
|
|||
{
|
||||
readonly string taskHubName;
|
||||
readonly CloudBlobContainer blobContainer;
|
||||
|
||||
|
||||
readonly static JsonSerializerSettings serializerSettings = new JsonSerializerSettings()
|
||||
{
|
||||
TypeNameHandling = TypeNameHandling.None,
|
||||
MissingMemberHandling = MissingMemberHandling.Ignore,
|
||||
};
|
||||
|
||||
int? numPartitions;
|
||||
|
||||
public AzureBlobLoadPublisher(string connectionString, string taskHubName)
|
||||
|
@ -32,15 +38,9 @@ namespace DurableTask.Netherite.Scaling
|
|||
|
||||
public TimeSpan PublishInterval => TimeSpan.FromSeconds(10);
|
||||
|
||||
public Task DeleteIfExistsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
// not needed since this blob is stored together with the taskhub storage
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task CreateIfNotExistsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
// not needed since this blob is stored together with the taskhub storage
|
||||
// not needed since the blobs are stored in the taskhub's container
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
|
@ -50,12 +50,35 @@ namespace DurableTask.Netherite.Scaling
|
|||
{
|
||||
var blobDirectory = this.blobContainer.GetDirectoryReference($"p{partitionId:D2}");
|
||||
var blob = blobDirectory.GetBlockBlobReference("loadinfo.json");
|
||||
var json = JsonConvert.SerializeObject(loadInfo, Formatting.Indented);
|
||||
var json = JsonConvert.SerializeObject(loadInfo, Formatting.Indented, serializerSettings);
|
||||
return blob.UploadTextAsync(json, cancellationToken);
|
||||
}
|
||||
|
||||
List<Task> tasks = info.Select(kvp => UploadPartitionInfo(kvp.Key, kvp.Value)).ToList();
|
||||
return Task.WhenAll(tasks);
|
||||
return Task.WhenAll(tasks);
|
||||
}
|
||||
|
||||
public async Task<T> ReadJsonBlobAsync<T>(CloudBlockBlob blob, bool throwIfNotFound, bool throwOnParseError, CancellationToken token) where T : class
|
||||
{
|
||||
try
|
||||
{
|
||||
var jsonText = await blob.DownloadTextAsync(token).ConfigureAwait(false);
|
||||
return JsonConvert.DeserializeObject<T>(jsonText);
|
||||
}
|
||||
catch (StorageException e) when (!throwIfNotFound && e.RequestInformation?.HttpStatusCode == 404)
|
||||
{
|
||||
// container or blob does not exist
|
||||
}
|
||||
catch (JsonException) when (!throwOnParseError)
|
||||
{
|
||||
// cannot parse content of blob
|
||||
}
|
||||
catch(StorageException e) when (e.InnerException is OperationCanceledException operationCanceledException)
|
||||
{
|
||||
throw new OperationCanceledException("Blob read was canceled.", operationCanceledException);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public async Task<Dictionary<uint, PartitionLoadInfo>> QueryAsync(CancellationToken cancellationToken)
|
||||
|
@ -63,31 +86,59 @@ namespace DurableTask.Netherite.Scaling
|
|||
if (!this.numPartitions.HasValue)
|
||||
{
|
||||
// determine number of partitions of taskhub
|
||||
var blob = this.blobContainer.GetBlockBlobReference("taskhubparameters.json");
|
||||
var jsonText = await blob.DownloadTextAsync().ConfigureAwait(false);
|
||||
var info = await this.ReadJsonBlobAsync<Netherite.Abstractions.TaskhubParameters>(
|
||||
this.blobContainer.GetBlockBlobReference("taskhubparameters.json"),
|
||||
throwIfNotFound: true,
|
||||
throwOnParseError: true,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
/* Unmerged change from project 'DurableTask.Netherite (netcoreapp3.1)'
|
||||
Before:
|
||||
var info = JsonConvert.DeserializeObject<EventHubs.TaskhubParameters>(jsonText);
|
||||
After:
|
||||
var info = JsonConvert.DeserializeObject<TaskhubParameters>(jsonText);
|
||||
*/
|
||||
var info = JsonConvert.DeserializeObject<Netherite.Abstractions.TaskhubParameters>(jsonText);
|
||||
this.numPartitions = info.PartitionCount;
|
||||
}
|
||||
|
||||
async Task<(uint,PartitionLoadInfo)> DownloadPartitionInfo(uint partitionId)
|
||||
async Task<(uint, PartitionLoadInfo)> DownloadPartitionInfo(uint partitionId)
|
||||
{
|
||||
var blobDirectory = this.blobContainer.GetDirectoryReference($"p{partitionId:D2}");
|
||||
var blob = blobDirectory.GetBlockBlobReference("loadinfo.json");
|
||||
string json = await blob.DownloadTextAsync(cancellationToken);
|
||||
PartitionLoadInfo info = JsonConvert.DeserializeObject<PartitionLoadInfo>(json);
|
||||
PartitionLoadInfo info = await this.ReadJsonBlobAsync<PartitionLoadInfo>(
|
||||
this.blobContainer.GetDirectoryReference($"p{partitionId:D2}").GetBlockBlobReference("loadinfo.json"),
|
||||
throwIfNotFound: false,
|
||||
throwOnParseError: true,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
return (partitionId, info);
|
||||
}
|
||||
|
||||
var tasks = Enumerable.Range(0, this.numPartitions.Value).Select(partitionId => DownloadPartitionInfo((uint) partitionId)).ToList();
|
||||
await Task.WhenAll(tasks);
|
||||
return tasks.Select(task => task.Result).ToDictionary(pair => pair.Item1, pair => pair.Item2);
|
||||
var tasks = Enumerable.Range(0, this.numPartitions.Value).Select(partitionId => DownloadPartitionInfo((uint)partitionId)).ToList();
|
||||
await Task.WhenAll(tasks).ConfigureAwait(false);
|
||||
return tasks.Select(task => task.Result).Where(pair => pair.Item2 != null).ToDictionary(pair => pair.Item1, pair => pair.Item2);
|
||||
}
|
||||
|
||||
public async Task DeleteIfExistsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (!this.numPartitions.HasValue)
|
||||
{
|
||||
// determine number of partitions of taskhub
|
||||
var info = await this.ReadJsonBlobAsync<Netherite.Abstractions.TaskhubParameters>(
|
||||
this.blobContainer.GetBlockBlobReference("taskhubparameters.json"),
|
||||
throwIfNotFound: false,
|
||||
throwOnParseError: false,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (info == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
this.numPartitions = info.PartitionCount;
|
||||
}
|
||||
}
|
||||
|
||||
async Task DeletePartitionInfo(uint partitionId)
|
||||
{
|
||||
var blob = this.blobContainer.GetDirectoryReference($"p{partitionId:D2}").GetBlockBlobReference("loadinfo.json");
|
||||
await BlobUtils.ForceDeleteAsync(blob).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
var tasks = Enumerable.Range(0, this.numPartitions.Value).Select(partitionId => DeletePartitionInfo((uint)partitionId)).ToList();
|
||||
await Task.WhenAll(tasks).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,8 +3,11 @@
|
|||
|
||||
namespace DurableTask.Netherite.Scaling
|
||||
{
|
||||
using Microsoft.Azure.Cosmos.Table;
|
||||
using Azure;
|
||||
using Azure.Data.Tables;
|
||||
using Microsoft.Extensions.Azure;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
|
@ -13,13 +16,12 @@ namespace DurableTask.Netherite.Scaling
|
|||
|
||||
class AzureTableLoadPublisher : ILoadPublisherService
|
||||
{
|
||||
readonly CloudTable table;
|
||||
readonly TableClient table;
|
||||
readonly string taskHubName;
|
||||
|
||||
public AzureTableLoadPublisher(string connectionString, string tableName, string taskHubName)
|
||||
{
|
||||
var account = CloudStorageAccount.Parse(connectionString);
|
||||
this.table = account.CreateCloudTableClient().GetTableReference(tableName);
|
||||
this.table = new TableClient(connectionString, tableName);
|
||||
this.taskHubName = taskHubName;
|
||||
}
|
||||
|
||||
|
@ -27,90 +29,78 @@ namespace DurableTask.Netherite.Scaling
|
|||
|
||||
public async Task DeleteIfExistsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (! await this.table.ExistsAsync().ConfigureAwait(false))
|
||||
try
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var query = new TableQuery<PartitionInfoEntity>().Where(TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, this.taskHubName));
|
||||
TableContinuationToken continuationToken = null;
|
||||
do
|
||||
{
|
||||
var batch = await this.table.ExecuteQuerySegmentedAsync<PartitionInfoEntity>(query, continuationToken, null, null, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (batch.Count() > 0)
|
||||
var tableBatch = new List<TableTransactionAction>();
|
||||
await foreach (var e in this.table.QueryAsync<PartitionInfoEntity>(x => x.PartitionKey == this.taskHubName, cancellationToken: cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
// delete all entities in this batch. Max partition number is 32 so it always fits.
|
||||
TableBatchOperation tableBatch = new TableBatchOperation();
|
||||
|
||||
foreach (var e in batch)
|
||||
{
|
||||
tableBatch.Add(TableOperation.Delete(e));
|
||||
}
|
||||
|
||||
await this.table.ExecuteBatchAsync(tableBatch).ConfigureAwait(false);
|
||||
tableBatch.Add(new TableTransactionAction(TableTransactionActionType.Delete, e));
|
||||
}
|
||||
if (tableBatch.Count > 0)
|
||||
{
|
||||
await this.table.SubmitTransactionAsync(tableBatch, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
while (continuationToken != null);
|
||||
catch(Azure.RequestFailedException e) when (e.Status == 404) // table may not exist
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
public async Task CreateIfNotExistsAsync(CancellationToken cancellationToken)
|
||||
public Task CreateIfNotExistsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (await this.table.ExistsAsync().ConfigureAwait(false))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await this.table.CreateAsync().ConfigureAwait(false);
|
||||
return this.table.CreateIfNotExistsAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public Task PublishAsync(Dictionary<uint, PartitionLoadInfo> info, CancellationToken cancellationToken)
|
||||
{
|
||||
TableBatchOperation tableBatch = new TableBatchOperation();
|
||||
var tableBatch = new List<TableTransactionAction>();
|
||||
foreach(var kvp in info)
|
||||
{
|
||||
tableBatch.Add(TableOperation.InsertOrReplace(new PartitionInfoEntity(this.taskHubName, kvp.Key, kvp.Value)));
|
||||
tableBatch.Add(new TableTransactionAction(TableTransactionActionType.UpsertReplace, new PartitionInfoEntity(this.taskHubName, kvp.Key, kvp.Value)));
|
||||
}
|
||||
if (tableBatch.Count > 0)
|
||||
{
|
||||
return this.table.SubmitTransactionAsync(tableBatch, cancellationToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
return this.table.ExecuteBatchAsync(tableBatch, null, null, cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<Dictionary<uint, PartitionLoadInfo>> QueryAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var query = new TableQuery<PartitionInfoEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, this.taskHubName));
|
||||
TableContinuationToken continuationToken = null;
|
||||
Dictionary<uint, PartitionLoadInfo> result = new Dictionary<uint, PartitionLoadInfo>();
|
||||
do
|
||||
await foreach (var e in this.table.QueryAsync<PartitionInfoEntity>(x => x.PartitionKey == this.taskHubName, cancellationToken: cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
var batch = await this.table.ExecuteQuerySegmentedAsync<PartitionInfoEntity>(query, continuationToken, null, null, cancellationToken).ConfigureAwait(false);
|
||||
foreach (var e in batch)
|
||||
result.Add(uint.Parse(e.RowKey), new PartitionLoadInfo()
|
||||
{
|
||||
int.TryParse(e.CachePct, out int cachePct);
|
||||
double.TryParse(e.MissRate, out double missRatePct);
|
||||
result.Add(e.PartitionId, new PartitionLoadInfo()
|
||||
{
|
||||
WorkItems = e.WorkItems,
|
||||
Activities = e.Activities,
|
||||
Timers = e.Timers,
|
||||
Requests = e.Requests,
|
||||
Wakeup = e.NextTimer,
|
||||
Outbox = e.Outbox,
|
||||
Instances = e.Instances,
|
||||
InputQueuePosition = e.InputQueuePosition,
|
||||
CommitLogPosition = e.CommitLogPosition,
|
||||
WorkerId = e.WorkerId,
|
||||
LatencyTrend = e.LatencyTrend,
|
||||
MissRate = missRatePct / 100,
|
||||
CachePct = cachePct,
|
||||
CacheMB = e.CacheMB,
|
||||
});
|
||||
}
|
||||
WorkItems = e.WorkItems,
|
||||
Activities = e.Activities,
|
||||
Timers = e.Timers,
|
||||
Requests = e.Requests,
|
||||
Wakeup = e.NextTimer,
|
||||
Outbox = e.Outbox,
|
||||
Instances = e.Instances,
|
||||
InputQueuePosition = e.InputQueuePosition,
|
||||
CommitLogPosition = e.CommitLogPosition,
|
||||
WorkerId = e.WorkerId,
|
||||
LatencyTrend = e.LatencyTrend,
|
||||
MissRate = double.Parse(e.MissRate.Substring(0, e.MissRate.Length - 1)) / 100,
|
||||
CachePct = int.Parse(e.CachePct.Substring(0, e.CachePct.Length - 1)),
|
||||
CacheMB = e.CacheMB,
|
||||
});
|
||||
}
|
||||
while (continuationToken != null);
|
||||
return result;
|
||||
}
|
||||
|
||||
public class PartitionInfoEntity : TableEntity
|
||||
public class PartitionInfoEntity : ITableEntity
|
||||
{
|
||||
public string PartitionKey { get; set; } // TaskHub name
|
||||
public string RowKey { get; set; } // partitionId
|
||||
public ETag ETag { get; set; }
|
||||
public DateTimeOffset? Timestamp { get; set; }
|
||||
|
||||
public int WorkItems { get; set; }
|
||||
public int Activities { get; set; }
|
||||
public int Timers { get; set; }
|
||||
|
@ -130,22 +120,21 @@ namespace DurableTask.Netherite.Scaling
|
|||
{
|
||||
}
|
||||
|
||||
// constructor for creating a deletion query; only the partition and row keys matter
|
||||
public PartitionInfoEntity(string taskHubName, uint partitionId)
|
||||
: base(taskHubName, partitionId.ToString("D2"))
|
||||
{
|
||||
this.ETag = "*"; // no conditions when deleting
|
||||
this.PartitionKey = taskHubName;
|
||||
this.RowKey = partitionId.ToString("D2");
|
||||
this.ETag = ETag.All; // no conditions
|
||||
}
|
||||
|
||||
// constructor for updating load information
|
||||
public PartitionInfoEntity(string taskHubName, uint partitionId, PartitionLoadInfo info)
|
||||
: base(taskHubName, partitionId.ToString("D2"))
|
||||
: this(taskHubName, partitionId)
|
||||
{
|
||||
this.WorkItems = info.WorkItems;
|
||||
this.Activities = info.Activities;
|
||||
this.Timers = info.Timers;
|
||||
this.Requests = info.Requests;
|
||||
this.NextTimer = info.Wakeup;
|
||||
this.NextTimer = info.Wakeup.HasValue ? info.Wakeup.Value.ToUniversalTime() : null;
|
||||
this.Outbox = info.Outbox;
|
||||
this.Instances = info.Instances;
|
||||
this.InputQueuePosition = info.InputQueuePosition;
|
||||
|
@ -155,12 +144,7 @@ namespace DurableTask.Netherite.Scaling
|
|||
this.MissRate = $"{info.MissRate*100:f2}%";
|
||||
this.CachePct = $"{info.CachePct}%";
|
||||
this.CacheMB = info.CacheMB;
|
||||
|
||||
this.ETag = "*"; // no conditions when inserting, replace existing
|
||||
}
|
||||
|
||||
public string TaskHubName => this.PartitionKey;
|
||||
public uint PartitionId => uint.Parse(this.RowKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,8 +12,6 @@ namespace DurableTask.Netherite.Faster
|
|||
using DurableTask.Netherite.Abstractions;
|
||||
using DurableTask.Netherite.EventHubsTransport;
|
||||
using DurableTask.Netherite.Scaling;
|
||||
using Microsoft.Azure.Documents;
|
||||
using Microsoft.Azure.Documents.SystemFunctions;
|
||||
using Microsoft.Azure.Storage;
|
||||
using Microsoft.Azure.Storage.Blob;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
@ -195,12 +193,12 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
if (parameters != null)
|
||||
{
|
||||
// first, delete the parameters file which deletes the taskhub logically
|
||||
await BlobUtils.ForceDeleteAsync(this.taskhubParameters);
|
||||
|
||||
// delete load information
|
||||
await this.LoadPublisher.DeleteIfExistsAsync(CancellationToken.None).ConfigureAwait(false);
|
||||
|
||||
// delete the parameters file which deletes the taskhub logically
|
||||
await BlobUtils.ForceDeleteAsync(this.taskhubParameters);
|
||||
|
||||
// delete all the files/blobs in the directory/container that represents this taskhub
|
||||
// If this does not complete successfully, some garbage may be left behind.
|
||||
await BlobManager.DeleteTaskhubStorageAsync(this.storageAccount, this.pageBlobStorageAccount, this.localFileDirectory, parameters.TaskhubName, TaskhubPathPrefix(parameters));
|
||||
|
|
|
@ -7,7 +7,6 @@ namespace DurableTask.Netherite.SingleHostTransport
|
|||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Extensions.Azure;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.OData.Edm.Vocabularies;
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
|
|
|
@ -14,7 +14,6 @@ namespace DurableTask.Netherite.Tests
|
|||
using System.Threading.Tasks;
|
||||
using DurableTask.Core;
|
||||
using DurableTask.Core.Exceptions;
|
||||
using Microsoft.Azure.Cosmos.Table;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using Xunit;
|
||||
|
|
|
@ -14,7 +14,6 @@ namespace DurableTask.Netherite.Tests
|
|||
using System.Threading.Tasks;
|
||||
using DurableTask.Core;
|
||||
using DurableTask.Core.Exceptions;
|
||||
using Microsoft.Azure.Cosmos.Table;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using Xunit;
|
||||
|
|
|
@ -0,0 +1,263 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
namespace DurableTask.Netherite.Tests
|
||||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Runtime.Serialization;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Azure.Data.Tables;
|
||||
using DurableTask.Core;
|
||||
using DurableTask.Netherite.Faster;
|
||||
using DurableTask.Netherite.Scaling;
|
||||
using Microsoft.Azure.Storage.Blob;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using Newtonsoft.Json.Serialization;
|
||||
using Xunit;
|
||||
using Xunit.Abstractions;
|
||||
using static DurableTask.Netherite.Tests.ScenarioTests;
|
||||
|
||||
[Collection("NetheriteTests")]
|
||||
[Trait("AnyTransport", "false")]
|
||||
public class LoadPublisherTests
|
||||
{
|
||||
string tableName;
|
||||
readonly List<string> taskHubs = new List<string>();
|
||||
|
||||
IStorageLayer GetFreshTaskHub(string tableName)
|
||||
{
|
||||
var settings = TestConstants.GetNetheriteOrchestrationServiceSettings("SingleHost");
|
||||
this.tableName = settings.LoadInformationAzureTableName = tableName;
|
||||
settings.HubName = $"{nameof(LoadPublisherTests)}-{Guid.NewGuid()}";
|
||||
this.taskHubs.Add(settings.HubName);
|
||||
var loggerFactory = new LoggerFactory();
|
||||
return new FasterStorageLayer(settings, new OrchestrationServiceTraceHelper(loggerFactory, LogLevel.None, "test", settings.HubName), loggerFactory);
|
||||
}
|
||||
|
||||
async Task NothingLeftBehind()
|
||||
{
|
||||
if (this.tableName == null)
|
||||
{
|
||||
foreach (var taskHub in this.taskHubs)
|
||||
{
|
||||
// there should not be anything left inside the blob container
|
||||
var blobContainerName = taskHub.ToLowerInvariant() + "-storage";
|
||||
var cloudBlobClient = Microsoft.Azure.Storage.CloudStorageAccount.Parse(Environment.GetEnvironmentVariable(TestConstants.StorageConnectionName)).CreateCloudBlobClient();
|
||||
var cloudBlobContainer = cloudBlobClient.GetContainerReference(blobContainerName);
|
||||
if (await cloudBlobContainer.ExistsAsync())
|
||||
{
|
||||
var allBlobs = cloudBlobContainer.ListBlobs().ToList();
|
||||
Assert.Empty(allBlobs);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// there should not be anything left in the table
|
||||
var tableClient = new TableClient(Environment.GetEnvironmentVariable(TestConstants.StorageConnectionName), this.tableName);
|
||||
try
|
||||
{
|
||||
await foreach (var e in tableClient.QueryAsync<TableEntity>())
|
||||
{
|
||||
Assert.Fail("table is not empty");
|
||||
}
|
||||
}
|
||||
catch (Azure.RequestFailedException e) when (e.Status == 404) // table may not exist
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(false)]
|
||||
[InlineData(true)]
|
||||
public async Task IdempotentCreationAndDeletion(bool useBlobs)
|
||||
{
|
||||
string tableName = $"Test{Guid.NewGuid():N}";
|
||||
IStorageLayer taskhub = this.GetFreshTaskHub(useBlobs ? null : tableName);
|
||||
|
||||
// create taskhub and load publisher
|
||||
await taskhub.CreateTaskhubIfNotExistsAsync();
|
||||
|
||||
var parameters = await taskhub.TryLoadTaskhubAsync(true);
|
||||
|
||||
// this is superfluous since it is already implied by taskhub
|
||||
await taskhub.LoadPublisher.CreateIfNotExistsAsync(CancellationToken.None);
|
||||
await taskhub.LoadPublisher.DeleteIfExistsAsync(CancellationToken.None);
|
||||
|
||||
// delete taskhub and load publisher
|
||||
await taskhub.DeleteTaskhubAsync();
|
||||
|
||||
// check that nothing is left in storage
|
||||
await this.NothingLeftBehind();
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(false)]
|
||||
[InlineData(true)]
|
||||
public async Task DeleteNonexisting(bool useBlobs)
|
||||
{
|
||||
IStorageLayer taskhub = this.GetFreshTaskHub(useBlobs ? null : "IDoNotExist");
|
||||
await taskhub.LoadPublisher.DeleteIfExistsAsync(CancellationToken.None); // should not throw
|
||||
|
||||
// check that nothing is left in storage
|
||||
await this.NothingLeftBehind();
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(false)]
|
||||
[InlineData(true)]
|
||||
public async Task PopulateAndQuery(bool useBlobs)
|
||||
{
|
||||
IStorageLayer taskhub = this.GetFreshTaskHub(useBlobs ? null : $"Test{Guid.NewGuid():N}");
|
||||
|
||||
// create taskhub and load publisher
|
||||
await taskhub.CreateTaskhubIfNotExistsAsync();
|
||||
|
||||
// publish empty
|
||||
await taskhub.LoadPublisher.PublishAsync(new Dictionary<uint, PartitionLoadInfo>(), CancellationToken.None);
|
||||
|
||||
// publish two
|
||||
await taskhub.LoadPublisher.PublishAsync(new Dictionary<uint, PartitionLoadInfo>() { {0, this.Create("A") }, {1, this.Create("B") } }, CancellationToken.None);
|
||||
|
||||
// publish another two
|
||||
await taskhub.LoadPublisher.PublishAsync(new Dictionary<uint, PartitionLoadInfo>() { {0, this.Create("C") }, {2, this.Create("D") } }, CancellationToken.None);
|
||||
|
||||
// cancel a publishing
|
||||
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => taskhub.LoadPublisher.PublishAsync(new Dictionary<uint, PartitionLoadInfo>() { { 1, this.Create("Y") }, { 2, this.Create("Z") } }, new CancellationToken(true)));
|
||||
|
||||
// do conflicting publishes
|
||||
Parallel.For(0, 10, i =>
|
||||
{
|
||||
taskhub.LoadPublisher.PublishAsync(new Dictionary<uint, PartitionLoadInfo>() { { 3, this.Create("E") } }, CancellationToken.None).Wait();
|
||||
});
|
||||
|
||||
// read all
|
||||
var results = await taskhub.LoadPublisher.QueryAsync(CancellationToken.None);
|
||||
|
||||
this.Check(results[0], "C");
|
||||
this.Check(results[1], "B");
|
||||
this.Check(results[2], "D");
|
||||
this.Check(results[3], "E");
|
||||
|
||||
// cancel a read
|
||||
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => taskhub.LoadPublisher.QueryAsync(new CancellationToken(true)));
|
||||
|
||||
// delete taskhub and load publisher
|
||||
await taskhub.DeleteTaskhubAsync();
|
||||
|
||||
// check that nothing is left in storage
|
||||
await this.NothingLeftBehind();
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(false)]
|
||||
[InlineData(true)]
|
||||
public async Task DeleteOne(bool useBlobs)
|
||||
{
|
||||
string tableName = useBlobs ? null : $"Test{Guid.NewGuid():N}";
|
||||
|
||||
IStorageLayer taskhub1 = this.GetFreshTaskHub(tableName);
|
||||
IStorageLayer taskhub2 = this.GetFreshTaskHub(tableName);
|
||||
|
||||
// create taskhubs and load publishers
|
||||
await Task.WhenAll(taskhub1.CreateTaskhubIfNotExistsAsync(), taskhub2.CreateTaskhubIfNotExistsAsync());
|
||||
|
||||
// publish two
|
||||
await Task.WhenAll(
|
||||
taskhub1.LoadPublisher.PublishAsync(new Dictionary<uint, PartitionLoadInfo>() { { 0, this.Create("A") }, { 1, this.Create("B") } }, CancellationToken.None),
|
||||
taskhub2.LoadPublisher.PublishAsync(new Dictionary<uint, PartitionLoadInfo>() { { 0, this.Create("A") }, { 1, this.Create("B") } }, CancellationToken.None));
|
||||
|
||||
// delete one
|
||||
await taskhub1.DeleteTaskhubAsync();
|
||||
|
||||
// check that stuff was deleted
|
||||
var queryResults = await taskhub1.LoadPublisher.QueryAsync(CancellationToken.None);
|
||||
Assert.Empty(queryResults);
|
||||
|
||||
// check that the other task hub is still there
|
||||
var results = await taskhub2.LoadPublisher.QueryAsync(CancellationToken.None);
|
||||
this.Check(results[0], "A");
|
||||
this.Check(results[1], "B");
|
||||
|
||||
await taskhub2.DeleteTaskhubAsync();
|
||||
|
||||
// check that nothing is left in storage
|
||||
await this.NothingLeftBehind();
|
||||
}
|
||||
|
||||
PartitionLoadInfo Create(string worker)
|
||||
{
|
||||
return new PartitionLoadInfo()
|
||||
{
|
||||
WorkerId = worker,
|
||||
Activities = 11,
|
||||
CacheMB = 1.1,
|
||||
CachePct = 33,
|
||||
CommitLogPosition = 64,
|
||||
InputQueuePosition = 1231,
|
||||
Instances = 3,
|
||||
LatencyTrend = "IIIII",
|
||||
MissRate = 0.1,
|
||||
Outbox = 44,
|
||||
Requests = 55,
|
||||
Timers = 66,
|
||||
Wakeup = DateTime.Parse("2022-10-08T17:00:44.7400082Z").ToUniversalTime(),
|
||||
WorkItems = 77
|
||||
};
|
||||
}
|
||||
|
||||
void Check(PartitionLoadInfo actual, string worker)
|
||||
{
|
||||
var expected = this.Create(worker);
|
||||
Assert.Equal(expected.WorkerId, actual.WorkerId);
|
||||
Assert.Equal(expected.Activities, actual.Activities);
|
||||
Assert.Equal(expected.CacheMB, actual.CacheMB);
|
||||
Assert.Equal(expected.CachePct, actual.CachePct);
|
||||
Assert.Equal(expected.CommitLogPosition, actual.CommitLogPosition);
|
||||
Assert.Equal(expected.InputQueuePosition, actual.InputQueuePosition);
|
||||
Assert.Equal(expected.Instances, actual.Instances);
|
||||
Assert.Equal(expected.LatencyTrend, actual.LatencyTrend);
|
||||
Assert.Equal(expected.MissRate, actual.MissRate);
|
||||
Assert.Equal(expected.Outbox, actual.Outbox);
|
||||
Assert.Equal(expected.Requests, actual.Requests);
|
||||
Assert.Equal(expected.Timers, actual.Timers);
|
||||
Assert.Equal(expected.Wakeup, actual.Wakeup);
|
||||
Assert.Equal(expected.WorkItems, actual.WorkItems);
|
||||
Assert.Equal(expected.WorkItems, actual.WorkItems);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PublishToMissingTable()
|
||||
{
|
||||
IStorageLayer taskhub = this.GetFreshTaskHub("IDoNotExist");
|
||||
await Assert.ThrowsAnyAsync<Azure.RequestFailedException>(() => taskhub.LoadPublisher.PublishAsync(new Dictionary<uint, PartitionLoadInfo>() { { 1, this.Create("Y") } }, CancellationToken.None));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PublishToMissingTaskHub()
|
||||
{
|
||||
IStorageLayer taskhub = this.GetFreshTaskHub(null);
|
||||
await Assert.ThrowsAnyAsync<Microsoft.Azure.Storage.StorageException>(() => taskhub.LoadPublisher.PublishAsync(new Dictionary<uint, PartitionLoadInfo>() { { 1, this.Create("Y") } }, CancellationToken.None));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueryFromMissingTable()
|
||||
{
|
||||
IStorageLayer taskhub = this.GetFreshTaskHub("IDoNotExist");
|
||||
await Assert.ThrowsAnyAsync<Azure.RequestFailedException>(() => taskhub.LoadPublisher.QueryAsync(CancellationToken.None));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueryFromMissingTaskHub()
|
||||
{
|
||||
IStorageLayer taskhub = this.GetFreshTaskHub(null);
|
||||
await Assert.ThrowsAnyAsync<Microsoft.Azure.Storage.StorageException>(() => taskhub.LoadPublisher.QueryAsync(CancellationToken.None));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -4,7 +4,9 @@
|
|||
namespace DurableTask.Netherite.Tests
|
||||
{
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.ComponentModel;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
|
@ -12,9 +14,10 @@ namespace DurableTask.Netherite.Tests
|
|||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Azure.Data.Tables;
|
||||
using Castle.Core.Resource;
|
||||
using DurableTask.Core;
|
||||
using DurableTask.Core.Exceptions;
|
||||
using Microsoft.Azure.Cosmos.Table;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using Xunit;
|
||||
|
@ -1816,47 +1819,40 @@ namespace DurableTask.Netherite.Tests
|
|||
|
||||
internal class WriteTableRow : TaskActivity<Tuple<string, string>, string>
|
||||
{
|
||||
static CloudTable cachedTable;
|
||||
static TableClient tableClient;
|
||||
|
||||
internal static CloudTable TestCloudTable
|
||||
internal static TableClient TestCloudTable
|
||||
{
|
||||
get
|
||||
{
|
||||
if (cachedTable == null)
|
||||
if (tableClient == null)
|
||||
{
|
||||
CloudTable table = CloudStorageAccount
|
||||
.Parse(Environment.GetEnvironmentVariable(TestConstants.StorageConnectionName))
|
||||
.CreateCloudTableClient()
|
||||
.GetTableReference("TestTable");
|
||||
TableClient table = new TableClient(Environment.GetEnvironmentVariable(TestConstants.StorageConnectionName), "TestTable");
|
||||
table.CreateIfNotExistsAsync().Wait();
|
||||
cachedTable = table;
|
||||
tableClient = table;
|
||||
}
|
||||
|
||||
return cachedTable;
|
||||
return tableClient;
|
||||
}
|
||||
}
|
||||
|
||||
protected override string Execute(TaskContext context, Tuple<string, string> rowData)
|
||||
{
|
||||
var entity = new DynamicTableEntity(
|
||||
partitionKey: rowData.Item1,
|
||||
rowKey: $"{rowData.Item2}.{Guid.NewGuid():N}");
|
||||
TestCloudTable.ExecuteAsync(TableOperation.Insert(entity)).Wait();
|
||||
var entity = new TableEntity(rowData.Item1, $"{rowData.Item2}.{Guid.NewGuid():N}");
|
||||
WriteTableRow.TestCloudTable.AddEntity(entity);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
internal class CountTableRows : TaskActivity<string, int>
|
||||
{
|
||||
protected override int Execute(TaskContext context, string partitionKey)
|
||||
{
|
||||
var query = new TableQuery<DynamicTableEntity>().Where(
|
||||
TableQuery.GenerateFilterCondition(
|
||||
"PartitionKey",
|
||||
QueryComparisons.Equal,
|
||||
partitionKey));
|
||||
|
||||
return WriteTableRow.TestCloudTable.ExecuteQuerySegmentedAsync(query, null).GetAwaiter().GetResult().Count();
|
||||
var asyncPageable = WriteTableRow.TestCloudTable.QueryAsync<TableEntity>($"PartitionKey eq '{partitionKey}'");
|
||||
var count = asyncPageable.CountAsync().GetAwaiter().GetResult();
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче