From d067c6b667c9f1717da44082b6ef8877cad3ffb4 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Tue, 3 Nov 2020 14:12:56 -0800 Subject: [PATCH] validate partition count for eventhubs --- .../EventHubs/EventHubsTransport.cs | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/DurableTask.Netherite/TransportProviders/EventHubs/EventHubsTransport.cs b/src/DurableTask.Netherite/TransportProviders/EventHubs/EventHubsTransport.cs index 2e611ea..c113e46 100644 --- a/src/DurableTask.Netherite/TransportProviders/EventHubs/EventHubsTransport.cs +++ b/src/DurableTask.Netherite/TransportProviders/EventHubs/EventHubsTransport.cs @@ -33,7 +33,6 @@ namespace DurableTask.Netherite.EventHubs { readonly TransportAbstraction.IHost host; readonly NetheriteOrchestrationServiceSettings settings; - readonly EventHubsConnections connections; readonly CloudStorageAccount cloudStorageAccount; readonly EventHubsTraceHelper traceHelper; @@ -41,6 +40,8 @@ namespace DurableTask.Netherite.EventHubs TransportAbstraction.IClient client; TaskhubParameters parameters; + byte[] taskhubGuid; + EventHubsConnections connections; Task clientEventLoopTask = Task.CompletedTask; CancellationTokenSource shutdownSource; @@ -59,12 +60,6 @@ namespace DurableTask.Netherite.EventHubs string namespaceName = TransportConnectionString.EventHubsNamespaceName(settings.EventHubsConnectionString); this.traceHelper = new EventHubsTraceHelper(loggerFactory, settings.TransportLogLevelLimit, this.cloudStorageAccount.Credentials.AccountName, settings.HubName, namespaceName); this.ClientId = Guid.NewGuid(); - this.connections = new EventHubsConnections(settings.EventHubsConnectionString, EventHubsTransport.PartitionHubs, EventHubsTransport.ClientHubs) - { - Host = host, - TraceHelper = this.traceHelper, - UseJsonPackets = settings.UseJsonPackets, - }; var blobContainerName = GetContainerName(settings.HubName); var cloudBlobClient = this.cloudStorageAccount.CreateCloudBlobClient(); this.cloudBlobContainer = cloudBlobClient.GetContainerReference(blobContainerName); @@ -149,16 +144,24 @@ namespace DurableTask.Netherite.EventHubs // load the taskhub parameters var jsonText = await this.taskhubParameters.DownloadTextAsync().ConfigureAwait(false); this.parameters = JsonConvert.DeserializeObject(jsonText); + this.taskhubGuid = this.parameters.TaskhubGuid.ToByteArray(); // check that we are the correct taskhub! if (this.parameters.TaskhubName != this.settings.HubName) { throw new InvalidOperationException($"The specified taskhub name does not match the task hub name in {this.taskhubParameters.Name}"); } - + this.host.NumberPartitions = (uint) this.parameters.StartPositions.Length; - await this.connections.StartAsync(); + this.connections = new EventHubsConnections(this.settings.EventHubsConnectionString, this.parameters.PartitionHubs, this.parameters.ClientHubs) + { + Host = host, + TraceHelper = this.traceHelper, + UseJsonPackets = this.settings.UseJsonPackets, + }; + + await this.connections.StartAsync(this.parameters.StartPositions.Length); this.client = this.host.AddClient(this.ClientId, this.parameters.TaskhubGuid, this); @@ -220,7 +223,6 @@ namespace DurableTask.Netherite.EventHubs async Task ITaskHub.StopAsync(bool isForced) { - this.parameters = null; this.traceHelper.LogInformation("Shutting down EventHubsBackend"); this.traceHelper.LogDebug("Stopping client event loop"); this.shutdownSource.Cancel(); @@ -257,13 +259,13 @@ namespace DurableTask.Netherite.EventHubs { case ClientEvent clientEvent: var clientId = clientEvent.ClientId; - var clientSender = this.connections.GetClientSender(clientEvent.ClientId, this.parameters.TaskhubGuid.ToByteArray()); + var clientSender = this.connections.GetClientSender(clientEvent.ClientId, this.taskhubGuid); clientSender.Submit(clientEvent); break; case PartitionEvent partitionEvent: var partitionId = partitionEvent.PartitionId; - var partitionSender = this.connections.GetPartitionSender((int) partitionId, this.parameters.TaskhubGuid.ToByteArray()); + var partitionSender = this.connections.GetPartitionSender((int) partitionId, this.taskhubGuid); partitionSender.Submit(partitionEvent); break;