Nullify Task when The Stop is complete (#342)

* Nullfy Task when The Stop is complete

* Test for Re Register event processor

* Reset CancellationTokenSource
This commit is contained in:
David Revoledo 2018-11-08 20:25:19 -03:00 коммит произвёл Serkant Karaca
Родитель d19b55dcc9
Коммит 3b27eecb60
3 изменённых файлов: 73 добавлений и 19 удалений

Просмотреть файл

@ -14,9 +14,10 @@ namespace Microsoft.Azure.EventHubs.Processor
class PartitionManager
{
readonly EventProcessorHost host;
readonly CancellationTokenSource cancellationTokenSource;
readonly ConcurrentDictionary<string, PartitionPump> partitionPumps;
IList<string> partitionIds;
CancellationTokenSource cancellationTokenSource;
Task runTask;
internal PartitionManager(EventProcessorHost host)
@ -39,7 +40,7 @@ namespace Microsoft.Azure.EventHubs.Processor
this.partitionIds = runtimeInfo.PartitionIds.ToList();
}
catch (Exception e)
{
{
throw new EventProcessorConfigurationException("Encountered error while fetching the list of EventHub PartitionIds", e);
}
finally
@ -76,6 +77,10 @@ namespace Microsoft.Azure.EventHubs.Processor
{
await localRunTask.ConfigureAwait(false);
}
// once it is closed let's reset the task
this.runTask = null;
this.cancellationTokenSource = new CancellationTokenSource();
}
async Task RunAsync()
@ -97,7 +102,7 @@ namespace Microsoft.Azure.EventHubs.Processor
await this.RemoveAllPumpsAsync(CloseReason.Shutdown).ConfigureAwait(false);
}
catch (Exception e)
{
{
ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during shutdown", e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.PartitionManagerCleanup);
}
@ -110,16 +115,16 @@ namespace Microsoft.Azure.EventHubs.Processor
if (!await leaseManager.LeaseStoreExistsAsync().ConfigureAwait(false))
{
await RetryAsync(() => leaseManager.CreateLeaseStoreIfNotExistsAsync(), null, "Failure creating lease store for this Event Hub, retrying",
"Out of retries creating lease store for this Event Hub", EventProcessorHostActionStrings.CreatingLeaseStore, 5).ConfigureAwait(false);
"Out of retries creating lease store for this Event Hub", EventProcessorHostActionStrings.CreatingLeaseStore, 5).ConfigureAwait(false);
}
// else
// lease store already exists, no work needed
// Now make sure the leases exist
foreach (string id in await this.GetPartitionIdsAsync().ConfigureAwait(false))
{
await RetryAsync(() => leaseManager.CreateLeaseIfNotExistsAsync(id), id, "Failure creating lease for partition, retrying",
"Out of retries creating lease for partition", EventProcessorHostActionStrings.CreatingLease, 5).ConfigureAwait(false);
"Out of retries creating lease for partition", EventProcessorHostActionStrings.CreatingLease, 5).ConfigureAwait(false);
}
// Make sure the checkpoint store exists
@ -127,26 +132,26 @@ namespace Microsoft.Azure.EventHubs.Processor
if (!await checkpointManager.CheckpointStoreExistsAsync().ConfigureAwait(false))
{
await RetryAsync(() => checkpointManager.CreateCheckpointStoreIfNotExistsAsync(), null, "Failure creating checkpoint store for this Event Hub, retrying",
"Out of retries creating checkpoint store for this Event Hub", EventProcessorHostActionStrings.CreatingCheckpointStore, 5).ConfigureAwait(false);
"Out of retries creating checkpoint store for this Event Hub", EventProcessorHostActionStrings.CreatingCheckpointStore, 5).ConfigureAwait(false);
}
// else
// checkpoint store already exists, no work needed
// Now make sure the checkpoints exist
foreach (string id in await this.GetPartitionIdsAsync().ConfigureAwait(false))
{
await RetryAsync(() => checkpointManager.CreateCheckpointIfNotExistsAsync(id), id, "Failure creating checkpoint for partition, retrying",
"Out of retries creating checkpoint blob for partition", EventProcessorHostActionStrings.CreatingCheckpoint, 5).ConfigureAwait(false);
"Out of retries creating checkpoint blob for partition", EventProcessorHostActionStrings.CreatingCheckpoint, 5).ConfigureAwait(false);
}
}
// Throws if it runs out of retries. If it returns, action succeeded.
async Task RetryAsync(Func<Task> lambda, string partitionId, string retryMessage, string finalFailureMessage, string action, int maxRetries) // throws ExceptionWithAction
{
Exception finalException = null;
bool createdOK = false;
int retryCount = 0;
do
int retryCount = 0;
do
{
try
{
@ -224,9 +229,9 @@ namespace Microsoft.Azure.EventHubs.Processor
// Just log here, expired leases will be picked by same or another host anyway.
ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, lease.PartitionId, "Failed to renew lease.", renewResult.Exception?.Message);
this.host.EventProcessorOptions.NotifyOfException(
this.host.HostName,
lease.PartitionId,
renewResult.Exception,
this.host.HostName,
lease.PartitionId,
renewResult.Exception,
EventProcessorHostActionStrings.RenewingLease);
}
}, cancellationToken));
@ -444,7 +449,8 @@ namespace Microsoft.Azure.EventHubs.Processor
Dictionary<string, int> CountLeasesByOwner(IEnumerable<Lease> leases)
{
var counts = leases.GroupBy(lease => lease.Owner).Select(group => new {
var counts = leases.GroupBy(lease => lease.Owner).Select(group => new
{
Owner = group.Key,
Count = group.Count()
});

Просмотреть файл

@ -948,6 +948,31 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor
await RunGenericScenario(eventProcessorHost, epo);
}
[Fact]
[DisplayTestMethodName]
async Task ReRegisterEventProcessor()
{
var eventProcessorHost = new EventProcessorHost(
null, // Entity path will be picked from connection string.
PartitionReceiver.DefaultConsumerGroupName,
TestUtility.EventHubsConnectionString,
TestUtility.StorageConnectionString,
Guid.NewGuid().ToString());
// Calling register for the first time should succeed.
TestUtility.Log("Registering EventProcessorHost for the first time.");
await eventProcessorHost.RegisterEventProcessorAsync<SecondTestEventProcessor>();
// Unregister event processor should succed
TestUtility.Log("Registering EventProcessorHost for the first time.");
await eventProcessorHost.UnregisterEventProcessorAsync();
var epo = await GetOptionsAsync();
// Run a generic scenario with TestEventProcessor instead
await RunGenericScenario(eventProcessorHost, epo);
}
async Task<Dictionary<string, Tuple<string, DateTime>>> DiscoverEndOfStream()
{
var ehClient = EventHubClient.CreateFromConnectionString(TestUtility.EventHubsConnectionString);
@ -982,7 +1007,7 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor
try
{
TestUtility.Log($"Calling RegisterEventProcessorAsync");
TestUtility.Log("Calling RegisterEventProcessorAsync");
var processorFactory = new TestEventProcessorFactory();
processorFactory.OnCreateProcessor += (f, createArgs) =>
@ -1067,7 +1092,7 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor
async Task<EventProcessorOptions> GetOptionsAsync()
{
var partitions = await DiscoverEndOfStream();
return new EventProcessorOptions()
return new EventProcessorOptions
{
MaxBatchSize = 100,
InitialOffsetProvider = pId => EventPosition.FromOffset(partitions[pId].Item1)
@ -1080,7 +1105,7 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor
public ConcurrentDictionary<string, List<EventData>> ReceivedEvents = new ConcurrentDictionary<string, List<EventData>>();
public int NumberOfFailures = 0;
object listLock = new object();
readonly object listLock = new object();
public void AddEvents(string partitionId, IEnumerable<EventData> addEvents)
{

Просмотреть файл

@ -59,6 +59,29 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor
}
}
class SecondTestEventProcessor : IEventProcessor
{
Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
return Task.CompletedTask;
}
Task IEventProcessor.ProcessErrorAsync(PartitionContext context, Exception error)
{
return Task.CompletedTask;
}
Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
return Task.CompletedTask;
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
return Task.CompletedTask;
}
}
class TestEventProcessorFactory : IEventProcessorFactory
{
public event EventHandler<Tuple<PartitionContext, TestEventProcessor>> OnCreateProcessor;