diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs index c5f8657..8bd0247 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs @@ -14,9 +14,10 @@ namespace Microsoft.Azure.EventHubs.Processor class PartitionManager { readonly EventProcessorHost host; - readonly CancellationTokenSource cancellationTokenSource; readonly ConcurrentDictionary partitionPumps; + IList partitionIds; + CancellationTokenSource cancellationTokenSource; Task runTask; internal PartitionManager(EventProcessorHost host) @@ -39,7 +40,7 @@ namespace Microsoft.Azure.EventHubs.Processor this.partitionIds = runtimeInfo.PartitionIds.ToList(); } catch (Exception e) - { + { throw new EventProcessorConfigurationException("Encountered error while fetching the list of EventHub PartitionIds", e); } finally @@ -76,6 +77,10 @@ namespace Microsoft.Azure.EventHubs.Processor { await localRunTask.ConfigureAwait(false); } + + // once it is closed let's reset the task + this.runTask = null; + this.cancellationTokenSource = new CancellationTokenSource(); } async Task RunAsync() @@ -97,7 +102,7 @@ namespace Microsoft.Azure.EventHubs.Processor await this.RemoveAllPumpsAsync(CloseReason.Shutdown).ConfigureAwait(false); } catch (Exception e) - { + { ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during shutdown", e.ToString()); this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.PartitionManagerCleanup); } @@ -110,16 +115,16 @@ namespace Microsoft.Azure.EventHubs.Processor if (!await leaseManager.LeaseStoreExistsAsync().ConfigureAwait(false)) { await RetryAsync(() => leaseManager.CreateLeaseStoreIfNotExistsAsync(), null, "Failure creating lease store for this Event Hub, retrying", - "Out of retries creating lease store for this Event Hub", EventProcessorHostActionStrings.CreatingLeaseStore, 5).ConfigureAwait(false); + "Out of retries creating lease store for this Event Hub", EventProcessorHostActionStrings.CreatingLeaseStore, 5).ConfigureAwait(false); } // else // lease store already exists, no work needed - + // Now make sure the leases exist foreach (string id in await this.GetPartitionIdsAsync().ConfigureAwait(false)) { await RetryAsync(() => leaseManager.CreateLeaseIfNotExistsAsync(id), id, "Failure creating lease for partition, retrying", - "Out of retries creating lease for partition", EventProcessorHostActionStrings.CreatingLease, 5).ConfigureAwait(false); + "Out of retries creating lease for partition", EventProcessorHostActionStrings.CreatingLease, 5).ConfigureAwait(false); } // Make sure the checkpoint store exists @@ -127,26 +132,26 @@ namespace Microsoft.Azure.EventHubs.Processor if (!await checkpointManager.CheckpointStoreExistsAsync().ConfigureAwait(false)) { await RetryAsync(() => checkpointManager.CreateCheckpointStoreIfNotExistsAsync(), null, "Failure creating checkpoint store for this Event Hub, retrying", - "Out of retries creating checkpoint store for this Event Hub", EventProcessorHostActionStrings.CreatingCheckpointStore, 5).ConfigureAwait(false); + "Out of retries creating checkpoint store for this Event Hub", EventProcessorHostActionStrings.CreatingCheckpointStore, 5).ConfigureAwait(false); } // else // checkpoint store already exists, no work needed - + // Now make sure the checkpoints exist foreach (string id in await this.GetPartitionIdsAsync().ConfigureAwait(false)) { await RetryAsync(() => checkpointManager.CreateCheckpointIfNotExistsAsync(id), id, "Failure creating checkpoint for partition, retrying", - "Out of retries creating checkpoint blob for partition", EventProcessorHostActionStrings.CreatingCheckpoint, 5).ConfigureAwait(false); + "Out of retries creating checkpoint blob for partition", EventProcessorHostActionStrings.CreatingCheckpoint, 5).ConfigureAwait(false); } } - + // Throws if it runs out of retries. If it returns, action succeeded. async Task RetryAsync(Func lambda, string partitionId, string retryMessage, string finalFailureMessage, string action, int maxRetries) // throws ExceptionWithAction { Exception finalException = null; bool createdOK = false; - int retryCount = 0; - do + int retryCount = 0; + do { try { @@ -224,9 +229,9 @@ namespace Microsoft.Azure.EventHubs.Processor // Just log here, expired leases will be picked by same or another host anyway. ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, lease.PartitionId, "Failed to renew lease.", renewResult.Exception?.Message); this.host.EventProcessorOptions.NotifyOfException( - this.host.HostName, - lease.PartitionId, - renewResult.Exception, + this.host.HostName, + lease.PartitionId, + renewResult.Exception, EventProcessorHostActionStrings.RenewingLease); } }, cancellationToken)); @@ -444,7 +449,8 @@ namespace Microsoft.Azure.EventHubs.Processor Dictionary CountLeasesByOwner(IEnumerable leases) { - var counts = leases.GroupBy(lease => lease.Owner).Select(group => new { + var counts = leases.GroupBy(lease => lease.Owner).Select(group => new + { Owner = group.Key, Count = group.Count() }); diff --git a/test/Microsoft.Azure.EventHubs.Tests/Processor/ProcessorTestBase.cs b/test/Microsoft.Azure.EventHubs.Tests/Processor/ProcessorTestBase.cs index aaf5ff3..5e04cda 100644 --- a/test/Microsoft.Azure.EventHubs.Tests/Processor/ProcessorTestBase.cs +++ b/test/Microsoft.Azure.EventHubs.Tests/Processor/ProcessorTestBase.cs @@ -948,6 +948,31 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor await RunGenericScenario(eventProcessorHost, epo); } + [Fact] + [DisplayTestMethodName] + async Task ReRegisterEventProcessor() + { + var eventProcessorHost = new EventProcessorHost( + null, // Entity path will be picked from connection string. + PartitionReceiver.DefaultConsumerGroupName, + TestUtility.EventHubsConnectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + // Calling register for the first time should succeed. + TestUtility.Log("Registering EventProcessorHost for the first time."); + await eventProcessorHost.RegisterEventProcessorAsync(); + + // Unregister event processor should succed + TestUtility.Log("Registering EventProcessorHost for the first time."); + await eventProcessorHost.UnregisterEventProcessorAsync(); + + var epo = await GetOptionsAsync(); + + // Run a generic scenario with TestEventProcessor instead + await RunGenericScenario(eventProcessorHost, epo); + } + async Task>> DiscoverEndOfStream() { var ehClient = EventHubClient.CreateFromConnectionString(TestUtility.EventHubsConnectionString); @@ -982,7 +1007,7 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor try { - TestUtility.Log($"Calling RegisterEventProcessorAsync"); + TestUtility.Log("Calling RegisterEventProcessorAsync"); var processorFactory = new TestEventProcessorFactory(); processorFactory.OnCreateProcessor += (f, createArgs) => @@ -1067,7 +1092,7 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor async Task GetOptionsAsync() { var partitions = await DiscoverEndOfStream(); - return new EventProcessorOptions() + return new EventProcessorOptions { MaxBatchSize = 100, InitialOffsetProvider = pId => EventPosition.FromOffset(partitions[pId].Item1) @@ -1080,7 +1105,7 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor public ConcurrentDictionary> ReceivedEvents = new ConcurrentDictionary>(); public int NumberOfFailures = 0; - object listLock = new object(); + readonly object listLock = new object(); public void AddEvents(string partitionId, IEnumerable addEvents) { diff --git a/test/Microsoft.Azure.EventHubs.Tests/Processor/TestEventProcessor.cs b/test/Microsoft.Azure.EventHubs.Tests/Processor/TestEventProcessor.cs index 523be49..325ab9b 100644 --- a/test/Microsoft.Azure.EventHubs.Tests/Processor/TestEventProcessor.cs +++ b/test/Microsoft.Azure.EventHubs.Tests/Processor/TestEventProcessor.cs @@ -59,6 +59,29 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor } } + class SecondTestEventProcessor : IEventProcessor + { + Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason) + { + return Task.CompletedTask; + } + + Task IEventProcessor.ProcessErrorAsync(PartitionContext context, Exception error) + { + return Task.CompletedTask; + } + + Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable events) + { + return Task.CompletedTask; + } + + Task IEventProcessor.OpenAsync(PartitionContext context) + { + return Task.CompletedTask; + } + } + class TestEventProcessorFactory : IEventProcessorFactory { public event EventHandler> OnCreateProcessor;