validate partition count for eventhubs

This commit is contained in:
Sebastian Burckhardt 2020-11-03 14:12:56 -08:00
Родитель 705600d002
Коммит d067c6b667
1 изменённых файлов: 14 добавлений и 12 удалений

Просмотреть файл

@ -33,7 +33,6 @@ namespace DurableTask.Netherite.EventHubs
{
readonly TransportAbstraction.IHost host;
readonly NetheriteOrchestrationServiceSettings settings;
readonly EventHubsConnections connections;
readonly CloudStorageAccount cloudStorageAccount;
readonly EventHubsTraceHelper traceHelper;
@ -41,6 +40,8 @@ namespace DurableTask.Netherite.EventHubs
TransportAbstraction.IClient client;
TaskhubParameters parameters;
byte[] taskhubGuid;
EventHubsConnections connections;
Task clientEventLoopTask = Task.CompletedTask;
CancellationTokenSource shutdownSource;
@ -59,12 +60,6 @@ namespace DurableTask.Netherite.EventHubs
string namespaceName = TransportConnectionString.EventHubsNamespaceName(settings.EventHubsConnectionString);
this.traceHelper = new EventHubsTraceHelper(loggerFactory, settings.TransportLogLevelLimit, this.cloudStorageAccount.Credentials.AccountName, settings.HubName, namespaceName);
this.ClientId = Guid.NewGuid();
this.connections = new EventHubsConnections(settings.EventHubsConnectionString, EventHubsTransport.PartitionHubs, EventHubsTransport.ClientHubs)
{
Host = host,
TraceHelper = this.traceHelper,
UseJsonPackets = settings.UseJsonPackets,
};
var blobContainerName = GetContainerName(settings.HubName);
var cloudBlobClient = this.cloudStorageAccount.CreateCloudBlobClient();
this.cloudBlobContainer = cloudBlobClient.GetContainerReference(blobContainerName);
@ -149,16 +144,24 @@ namespace DurableTask.Netherite.EventHubs
// load the taskhub parameters
var jsonText = await this.taskhubParameters.DownloadTextAsync().ConfigureAwait(false);
this.parameters = JsonConvert.DeserializeObject<TaskhubParameters>(jsonText);
this.taskhubGuid = this.parameters.TaskhubGuid.ToByteArray();
// check that we are the correct taskhub!
if (this.parameters.TaskhubName != this.settings.HubName)
{
throw new InvalidOperationException($"The specified taskhub name does not match the task hub name in {this.taskhubParameters.Name}");
}
this.host.NumberPartitions = (uint) this.parameters.StartPositions.Length;
await this.connections.StartAsync();
this.connections = new EventHubsConnections(this.settings.EventHubsConnectionString, this.parameters.PartitionHubs, this.parameters.ClientHubs)
{
Host = host,
TraceHelper = this.traceHelper,
UseJsonPackets = this.settings.UseJsonPackets,
};
await this.connections.StartAsync(this.parameters.StartPositions.Length);
this.client = this.host.AddClient(this.ClientId, this.parameters.TaskhubGuid, this);
@ -220,7 +223,6 @@ namespace DurableTask.Netherite.EventHubs
async Task ITaskHub.StopAsync(bool isForced)
{
this.parameters = null;
this.traceHelper.LogInformation("Shutting down EventHubsBackend");
this.traceHelper.LogDebug("Stopping client event loop");
this.shutdownSource.Cancel();
@ -257,13 +259,13 @@ namespace DurableTask.Netherite.EventHubs
{
case ClientEvent clientEvent:
var clientId = clientEvent.ClientId;
var clientSender = this.connections.GetClientSender(clientEvent.ClientId, this.parameters.TaskhubGuid.ToByteArray());
var clientSender = this.connections.GetClientSender(clientEvent.ClientId, this.taskhubGuid);
clientSender.Submit(clientEvent);
break;
case PartitionEvent partitionEvent:
var partitionId = partitionEvent.PartitionId;
var partitionSender = this.connections.GetPartitionSender((int) partitionId, this.parameters.TaskhubGuid.ToByteArray());
var partitionSender = this.connections.GetPartitionSender((int) partitionId, this.taskhubGuid);
partitionSender.Submit(partitionEvent);
break;