Fix closing the receiver can allow OnChangeRoleAsync to be called while the ReceivePumpAsync is still running (#296)
This commit is contained in:
Родитель
36c3087510
Коммит
d46680038e
|
@ -40,12 +40,12 @@ namespace Microsoft.Azure.EventHubs.Amqp
|
||||||
|
|
||||||
FaultTolerantAmqpObject<ReceivingAmqpLink> ReceiveLinkManager { get; }
|
FaultTolerantAmqpObject<ReceivingAmqpLink> ReceiveLinkManager { get; }
|
||||||
|
|
||||||
protected override Task OnCloseAsync()
|
protected async override Task OnCloseAsync()
|
||||||
{
|
{
|
||||||
// Close any ReceiveHandler (this is safe if there is none) and the ReceiveLinkManager in parallel.
|
// Close any ReceiveHandler (this is safe if there is none) and the ReceiveLinkManager in parallel.
|
||||||
this.ReceiveHandlerClose();
|
await this.ReceiveHandlerClose();
|
||||||
this.clientLinkManager.Close();
|
this.clientLinkManager.Close();
|
||||||
return this.ReceiveLinkManager.CloseAsync();
|
await this.ReceiveLinkManager.CloseAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override async Task<IList<EventData>> OnReceiveAsync(int maxMessageCount, TimeSpan waitTime)
|
protected override async Task<IList<EventData>> OnReceiveAsync(int maxMessageCount, TimeSpan waitTime)
|
||||||
|
@ -163,6 +163,7 @@ namespace Microsoft.Azure.EventHubs.Amqp
|
||||||
// newReceiveHandler == null, so this is an unregister call, ensure pump is shut down.
|
// newReceiveHandler == null, so this is an unregister call, ensure pump is shut down.
|
||||||
if (this.receivePumpTask != null)
|
if (this.receivePumpTask != null)
|
||||||
{
|
{
|
||||||
|
// Do not wait as could block and would still match the previous behavior
|
||||||
this.ReceiveHandlerClose();
|
this.ReceiveHandlerClose();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -339,14 +340,17 @@ namespace Microsoft.Azure.EventHubs.Amqp
|
||||||
|
|
||||||
// Encapsulates taking the receivePumpLock, checking this.receiveHandler for null,
|
// Encapsulates taking the receivePumpLock, checking this.receiveHandler for null,
|
||||||
// calls this.receiveHandler.CloseAsync (starting this operation inside the receivePumpLock).
|
// calls this.receiveHandler.CloseAsync (starting this operation inside the receivePumpLock).
|
||||||
void ReceiveHandlerClose()
|
Task ReceiveHandlerClose()
|
||||||
{
|
{
|
||||||
|
Task task = null;
|
||||||
|
|
||||||
lock (this.receivePumpLock)
|
lock (this.receivePumpLock)
|
||||||
{
|
{
|
||||||
if (this.receiveHandler != null)
|
if (this.receiveHandler != null)
|
||||||
{
|
{
|
||||||
if (this.receivePumpTask != null)
|
if (this.receivePumpTask != null)
|
||||||
{
|
{
|
||||||
|
task = this.receivePumpTask;
|
||||||
this.receivePumpCancellationSource.Cancel();
|
this.receivePumpCancellationSource.Cancel();
|
||||||
this.receivePumpCancellationSource.Dispose();
|
this.receivePumpCancellationSource.Dispose();
|
||||||
this.receivePumpCancellationSource = null;
|
this.receivePumpCancellationSource = null;
|
||||||
|
@ -356,6 +360,8 @@ namespace Microsoft.Azure.EventHubs.Amqp
|
||||||
this.receiveHandler = null;
|
this.receiveHandler = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return task ?? Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encapsulates taking the receivePumpLock, checking this.receiveHandler for null,
|
// Encapsulates taking the receivePumpLock, checking this.receiveHandler for null,
|
||||||
|
|
Загрузка…
Ссылка в новой задаче