Fix synchronization of semaphore in retry delegating handler (#3135)

* Decouple client open-close semaphore from callback subscription semaphore

* Cancel pending operations when CloseAsync() is invoked
This commit is contained in:
Abhipsa Misra 2023-03-07 15:51:33 -08:00 коммит произвёл GitHub
Родитель 480a27ba6a
Коммит d2971800f4
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 320 добавлений и 140 удалений

Просмотреть файл

@ -217,7 +217,7 @@ Function BuildPackage($path, $message)
SignDotNetBinary $filesToSign
}
& dotnet pack --verbosity $verbosity --configuration $configuration --no-build --include-symbols --include-source --output $localPackages
& dotnet pack --verbosity $verbosity --configuration $configuration --no-build --include-symbols --include-source --property:PackageOutputPath=$localPackages
if ($LASTEXITCODE -ne 0)
{

Просмотреть файл

@ -114,6 +114,7 @@ namespace Microsoft.Azure.Devices.E2ETests
[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[TestCategory("LongRunning")]
[TestCategory("Proxy")]
public async Task FileUpload_SmallFile_Http_GranularSteps_Proxy()
{
string filename = await GetTestFileNameAsync(FileSizeSmall).ConfigureAwait(false);

Просмотреть файл

@ -328,6 +328,7 @@ namespace Microsoft.Azure.Devices.E2ETests.IotHub.Service
[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[TestCategory("Proxy")]
public async Task RegistryManager_AddDeviceWithProxy()
{
string deviceId = _idPrefix + Guid.NewGuid();

Просмотреть файл

@ -11,8 +11,9 @@ namespace Microsoft.Azure.Devices.Client.Transport
{
internal abstract class DefaultDelegatingHandler : IDelegatingHandler
{
private volatile IDelegatingHandler _innerHandler;
protected const string ClientDisposedMessage = "The client has been disposed and is no longer usable.";
protected volatile bool _isDisposed;
private volatile IDelegatingHandler _innerHandler;
protected DefaultDelegatingHandler(PipelineContext context, IDelegatingHandler innerHandler)
{
@ -209,7 +210,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
{
if (_isDisposed)
{
throw new ObjectDisposedException("IoT hub client");
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}
}

Просмотреть файл

@ -21,17 +21,23 @@ namespace Microsoft.Azure.Devices.Client.Transport
private RetryPolicy _internalRetryPolicy;
private SemaphoreSlim _handlerSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _clientOpenSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _cloudToDeviceMessageSubscriptionSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _cloudToDeviceEventSubscriptionSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _directMethodSubscriptionSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _twinEventsSubscriptionSemaphore = new SemaphoreSlim(1, 1);
private bool _openCalled;
private bool _opened;
private bool _methodsEnabled;
private bool _twinEnabled;
private bool _eventsEnabled;
private bool _deviceReceiveMessageEnabled;
private bool _isDisposing;
private bool _isAnEdgeModule = true;
private long _isOpened; // store the opened status in an int which can be accessed via Interlocked class. opened = 1, closed = 0.
private Task _transportClosedTask;
private readonly CancellationTokenSource _handleDisconnectCts = new CancellationTokenSource();
private readonly CancellationTokenSource _cancelPendingOperationsCts = new CancellationTokenSource();
private readonly ConnectionStatusChangesHandler _onConnectionStatusChanged;
@ -76,20 +82,22 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, message, cancellationToken, nameof(SendEventAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
if (message.IsBodyCalled)
{
message.ResetBody();
}
await base.SendEventAsync(message, cancellationToken).ConfigureAwait(false);
await base.SendEventAsync(message, operationCts.Token).ConfigureAwait(false);
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -106,11 +114,13 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, messages, cancellationToken, nameof(SendEventAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
foreach (Message m in messages)
{
@ -120,9 +130,9 @@ namespace Microsoft.Azure.Devices.Client.Transport
}
}
await base.SendEventAsync(messages, cancellationToken).ConfigureAwait(false);
await base.SendEventAsync(messages, operationCts.Token).ConfigureAwait(false);
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -139,14 +149,16 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, method, cancellationToken, nameof(SendMethodResponseAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await base.SendMethodResponseAsync(method, cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await base.SendMethodResponseAsync(method, operationCts.Token).ConfigureAwait(false);
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -163,14 +175,16 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(ReceiveAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
return await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
return await base.ReceiveAsync(cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
return await base.ReceiveAsync(operationCts.Token).ConfigureAwait(false);
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -188,6 +202,8 @@ namespace Microsoft.Azure.Devices.Client.Transport
Logging.Enter(this, timeoutHelper, nameof(ReceiveAsync));
using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime());
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token);
return await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
@ -195,7 +211,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
await EnsureOpenedAsync(false, timeoutHelper).ConfigureAwait(false);
return await base.ReceiveAsync(timeoutHelper).ConfigureAwait(false);
},
cts.Token)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -212,28 +228,38 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(EnableReceiveMessageAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
// Ensure that the connection has been opened, before enabling the callback for receiving messages.
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
// Wait to acquire the _handlerSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner.
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
// Wait to acquire the _cloudToDeviceSubscriptionSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner.
await _cloudToDeviceMessageSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
// The telemetry downlink needs to be enabled only for the first time that the callback is set.
Debug.Assert(!_deviceReceiveMessageEnabled);
await base.EnableReceiveMessageAsync(cancellationToken).ConfigureAwait(false);
await base.EnableReceiveMessageAsync(operationCts.Token).ConfigureAwait(false);
_deviceReceiveMessageEnabled = true;
}
finally
{
_handlerSemaphore?.Release();
try
{
_cloudToDeviceMessageSubscriptionSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing cloud-to-device message subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -252,28 +278,38 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(EnsurePendingMessagesAreDeliveredAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
// Ensure that the connection has been opened before returning pending messages to the callback.
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
// Wait to acquire the _handlerSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner.
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
// Wait to acquire the _cloudToDeviceMessageSubscriptionSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner.
await _cloudToDeviceMessageSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
// Ensure that a callback for receiving messages has been previously set.
Debug.Assert(_deviceReceiveMessageEnabled);
await base.EnsurePendingMessagesAreDeliveredAsync(cancellationToken).ConfigureAwait(false);
await base.EnsurePendingMessagesAreDeliveredAsync(operationCts.Token).ConfigureAwait(false);
}
finally
{
_handlerSemaphore?.Release();
try
{
_cloudToDeviceMessageSubscriptionSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing cloud-to-device message subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -290,28 +326,38 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(DisableReceiveMessageAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
// Ensure that the connection has been opened, before disabling the callback for receiving messages.
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
// Wait to acquire the _handlerSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner.
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
// Wait to acquire the _cloudToDeviceMessageSubscriptionSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner.
await _cloudToDeviceMessageSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
// Ensure that a callback for receiving messages has been previously set.
Debug.Assert(_deviceReceiveMessageEnabled);
await base.DisableReceiveMessageAsync(cancellationToken).ConfigureAwait(false);
await base.DisableReceiveMessageAsync(operationCts.Token).ConfigureAwait(false);
_deviceReceiveMessageEnabled = false;
}
finally
{
_handlerSemaphore?.Release();
try
{
_cloudToDeviceMessageSubscriptionSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing cloud-to-device message subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -328,25 +374,35 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(EnableMethodsAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await _directMethodSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
Debug.Assert(!_methodsEnabled);
await base.EnableMethodsAsync(cancellationToken).ConfigureAwait(false);
await base.EnableMethodsAsync(operationCts.Token).ConfigureAwait(false);
_methodsEnabled = true;
}
finally
{
_handlerSemaphore?.Release();
try
{
_directMethodSubscriptionSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing direct method subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -363,24 +419,35 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(DisableMethodsAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await _directMethodSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
Debug.Assert(_methodsEnabled);
await base.DisableMethodsAsync(cancellationToken).ConfigureAwait(false);
await base.DisableMethodsAsync(operationCts.Token).ConfigureAwait(false);
_methodsEnabled = false;
}
finally
{
_handlerSemaphore?.Release();
try
{
_directMethodSubscriptionSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing direct method subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -394,28 +461,39 @@ namespace Microsoft.Azure.Devices.Client.Transport
{
try
{
_isAnEdgeModule = isAnEdgeModule;
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(EnableEventReceiveAsync));
_isAnEdgeModule = isAnEdgeModule;
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await _cloudToDeviceEventSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
await base.EnableEventReceiveAsync(isAnEdgeModule, cancellationToken).ConfigureAwait(false);
Debug.Assert(!_eventsEnabled);
await base.EnableEventReceiveAsync(isAnEdgeModule, operationCts.Token).ConfigureAwait(false);
_eventsEnabled = true;
}
finally
{
_handlerSemaphore?.Release();
try
{
_cloudToDeviceEventSubscriptionSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing cloud-to-device event subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -429,28 +507,39 @@ namespace Microsoft.Azure.Devices.Client.Transport
{
try
{
_isAnEdgeModule = isAnEdgeModule;
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(DisableEventReceiveAsync));
_isAnEdgeModule = isAnEdgeModule;
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await _cloudToDeviceEventSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
Debug.Assert(_eventsEnabled);
await base.DisableEventReceiveAsync(isAnEdgeModule, cancellationToken).ConfigureAwait(false);
await base.DisableEventReceiveAsync(isAnEdgeModule, operationCts.Token).ConfigureAwait(false);
_eventsEnabled = false;
}
finally
{
_handlerSemaphore?.Release();
try
{
_cloudToDeviceEventSubscriptionSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing cloud-to-device event subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -467,24 +556,35 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(EnableTwinPatchAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await _twinEventsSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
Debug.Assert(!_twinEnabled);
await base.EnableTwinPatchAsync(cancellationToken).ConfigureAwait(false);
await base.EnableTwinPatchAsync(operationCts.Token).ConfigureAwait(false);
_twinEnabled = true;
}
finally
{
_handlerSemaphore?.Release();
try
{
_twinEventsSubscriptionSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -501,24 +601,35 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(DisableTwinPatchAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await _twinEventsSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
Debug.Assert(_twinEnabled);
await base.DisableTwinPatchAsync(cancellationToken).ConfigureAwait(false);
await base.DisableTwinPatchAsync(operationCts.Token).ConfigureAwait(false);
_twinEnabled = false;
}
finally
{
_handlerSemaphore?.Release();
try
{
_twinEventsSubscriptionSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -535,14 +646,16 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(SendTwinGetAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
return await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
return await base.SendTwinGetAsync(cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
return await base.SendTwinGetAsync(operationCts.Token).ConfigureAwait(false);
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -559,14 +672,16 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, reportedProperties, cancellationToken, nameof(SendTwinPatchAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await base.SendTwinPatchAsync(reportedProperties, cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await base.SendTwinPatchAsync(reportedProperties, operationCts.Token).ConfigureAwait(false);
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -583,14 +698,16 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, lockToken, cancellationToken, nameof(CompleteAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await base.CompleteAsync(lockToken, cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await base.CompleteAsync(lockToken, operationCts.Token).ConfigureAwait(false);
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -607,14 +724,16 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, lockToken, cancellationToken, nameof(AbandonAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await base.AbandonAsync(lockToken, cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await base.AbandonAsync(lockToken, operationCts.Token).ConfigureAwait(false);
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -631,14 +750,16 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Enter(this, lockToken, cancellationToken, nameof(RejectAsync));
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
await _internalRetryPolicy
.RunWithRetryAsync(
async () =>
{
await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false);
await base.RejectAsync(lockToken, cancellationToken).ConfigureAwait(false);
await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false);
await base.RejectAsync(lockToken, operationCts.Token).ConfigureAwait(false);
},
cancellationToken)
operationCts.Token)
.ConfigureAwait(false);
}
finally
@ -655,7 +776,6 @@ namespace Microsoft.Azure.Devices.Client.Transport
public override async Task CloseAsync(CancellationToken cancellationToken)
{
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (!_openCalled)
@ -667,15 +787,15 @@ namespace Microsoft.Azure.Devices.Client.Transport
Logging.Enter(this, cancellationToken, nameof(CloseAsync));
_handleDisconnectCts.Cancel();
_cancelPendingOperationsCts.Cancel();
await base.CloseAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
Dispose(true);
if (Logging.IsEnabled)
Logging.Exit(this, cancellationToken, nameof(CloseAsync));
_handlerSemaphore?.Release();
Dispose(true);
}
}
@ -684,23 +804,25 @@ namespace Microsoft.Azure.Devices.Client.Transport
/// </summary>
private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellationToken)
{
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
// If this object has already been disposed, we will throw an exception indicating that.
// This is the entry point for interacting with the client and this safety check should be done here.
// The current behavior does not support open->close->open
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(RetryDelegatingHandler));
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}
if (Volatile.Read(ref _opened))
if (Interlocked.Read(ref _isOpened) == 1)
{
return;
}
await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
if (!_opened)
if (Interlocked.Read(ref _isOpened) == 0)
{
if (Logging.IsEnabled)
Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync));
@ -709,7 +831,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
// we are returning the corresponding connection status change event => disconnected: retry_expired.
try
{
await OpenInternalAsync(withRetry, cancellationToken).ConfigureAwait(false);
await OpenInternalAsync(withRetry, operationCts.Token).ConfigureAwait(false);
}
catch (Exception ex) when (!ex.IsFatal())
{
@ -719,7 +841,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (!_isDisposed)
{
_opened = true;
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
_openCalled = true;
// Send the request for transport close notification.
@ -736,26 +858,42 @@ namespace Microsoft.Azure.Devices.Client.Transport
}
finally
{
_handlerSemaphore?.Release();
try
{
_clientOpenSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
}
private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper)
{
if (Volatile.Read(ref _opened))
using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime());
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token);
// If this object has already been disposed, we will throw an exception indicating that.
// This is the entry point for interacting with the client and this safety check should be done here.
// The current behavior does not support open->close->open
if (_isDisposed)
{
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}
if (Interlocked.Read(ref _isOpened) == 1)
{
return;
}
bool gain = await _handlerSemaphore.WaitAsync(timeoutHelper.GetRemainingTime()).ConfigureAwait(false);
if (!gain)
{
throw new TimeoutException("Timed out to acquire handler lock.");
}
await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
if (!_opened)
if (Interlocked.Read(ref _isOpened) == 0)
{
if (Logging.IsEnabled)
Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync));
@ -774,7 +912,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (!_isDisposed)
{
_opened = true;
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
_openCalled = true;
// Send the request for transport close notification.
@ -791,12 +929,23 @@ namespace Microsoft.Azure.Devices.Client.Transport
}
finally
{
_handlerSemaphore?.Release();
try
{
_clientOpenSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
}
private async Task OpenInternalAsync(bool withRetry, CancellationToken cancellationToken)
{
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);
if (withRetry)
{
await _internalRetryPolicy
@ -809,7 +958,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
Logging.Enter(this, cancellationToken, nameof(OpenAsync));
// Will throw on error.
await base.OpenAsync(cancellationToken).ConfigureAwait(false);
await base.OpenAsync(operationCts.Token).ConfigureAwait(false);
_onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
}
catch (Exception ex) when (!ex.IsFatal())
@ -823,7 +972,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
Logging.Exit(this, cancellationToken, nameof(OpenAsync));
}
},
cancellationToken).ConfigureAwait(false);
operationCts.Token).ConfigureAwait(false);
}
else
{
@ -833,7 +982,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
Logging.Enter(this, cancellationToken, nameof(OpenAsync));
// Will throw on error.
await base.OpenAsync(cancellationToken).ConfigureAwait(false);
await base.OpenAsync(operationCts.Token).ConfigureAwait(false);
_onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
}
catch (Exception ex) when (!ex.IsFatal())
@ -852,6 +1001,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
private async Task OpenInternalAsync(bool withRetry, TimeoutHelper timeoutHelper)
{
using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime());
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token);
if (withRetry)
{
@ -879,7 +1029,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
Logging.Exit(this, timeoutHelper, nameof(OpenAsync));
}
},
cts.Token)
operationCts.Token)
.ConfigureAwait(false);
}
else
@ -904,8 +1054,6 @@ namespace Microsoft.Azure.Devices.Client.Transport
Logging.Exit(this, timeoutHelper, nameof(OpenAsync));
}
}
}
// Triggered from connection loss event
@ -937,8 +1085,8 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (Logging.IsEnabled)
Logging.Info(this, "Transport disconnected: unexpected.", nameof(HandleDisconnectAsync));
await _handlerSemaphore.WaitAsync().ConfigureAwait(false);
_opened = false;
await _clientOpenSemaphore.WaitAsync().ConfigureAwait(false);
_ = Interlocked.Exchange(ref _isOpened, 0); // set the state to "closed"
try
{
@ -1009,7 +1157,7 @@ namespace Microsoft.Azure.Devices.Client.Transport
// Send the request for transport close notification.
_transportClosedTask = HandleDisconnectAsync();
_opened = true;
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
_onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
if (Logging.IsEnabled)
@ -1026,7 +1174,16 @@ namespace Microsoft.Azure.Devices.Client.Transport
}
finally
{
_handlerSemaphore?.Release();
try
{
_clientOpenSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
}
@ -1085,20 +1242,41 @@ namespace Microsoft.Azure.Devices.Client.Transport
if (!_isDisposed)
{
_isDisposing = true;
base.Dispose(disposing);
if (disposing)
{
_handleDisconnectCts?.Cancel();
_handleDisconnectCts?.Dispose();
if (_handlerSemaphore != null && _handlerSemaphore.CurrentCount == 0)
var disposables = new List<IDisposable>
{
_handlerSemaphore.Release();
_handleDisconnectCts,
_cancelPendingOperationsCts,
_clientOpenSemaphore,
_cloudToDeviceMessageSubscriptionSemaphore,
_cloudToDeviceEventSubscriptionSemaphore,
_directMethodSubscriptionSemaphore,
_twinEventsSubscriptionSemaphore,
};
_handleDisconnectCts?.Cancel();
_cancelPendingOperationsCts?.Cancel();
foreach (IDisposable disposable in disposables)
{
try
{
disposable?.Dispose();
}
catch (ObjectDisposedException)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Tried disposing the IDisposable {disposable} but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
_handlerSemaphore?.Dispose();
_handlerSemaphore = null;
}
// the _disposed flag is inherited from the base class DefaultDelegatingHandler and is finally set to null there.
// the _disposed flag is inherited from the base class DefaultDelegatingHandler and is finally set to true there.
}
}
finally

Просмотреть файл

@ -27,13 +27,12 @@ namespace Microsoft.Azure.Devices.Client.Test
// arrange
int callCounter = 0;
var ct = CancellationToken.None;
PipelineContext contextMock = Substitute.For<PipelineContext>();
contextMock.ConnectionStatusChangesHandler = new ConnectionStatusChangesHandler(delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { });
IDelegatingHandler innerHandlerMock = Substitute.For<IDelegatingHandler>();
innerHandlerMock
.OpenAsync(ct)
.OpenAsync(Arg.Any<CancellationToken>())
.Returns(t =>
{
return ++callCounter == 1
@ -45,7 +44,7 @@ namespace Microsoft.Azure.Devices.Client.Test
var retryDelegatingHandler = new RetryDelegatingHandler(contextMock, innerHandlerMock);
// act
await retryDelegatingHandler.OpenAsync(ct).ConfigureAwait(false);
await retryDelegatingHandler.OpenAsync(CancellationToken.None).ConfigureAwait(false);
// assert
callCounter.Should().Be(2);
@ -255,12 +254,12 @@ namespace Microsoft.Azure.Devices.Client.Test
public async Task RetryTransientErrorThrownAfterNumberOfRetriesThrows()
{
// arrange
using var cts = new CancellationTokenSource(100);
using var cts = new CancellationTokenSource(1000);
var contextMock = Substitute.For<PipelineContext>();
contextMock.ConnectionStatusChangesHandler = new ConnectionStatusChangesHandler(delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { });
var innerHandlerMock = Substitute.For<IDelegatingHandler>();
innerHandlerMock
.OpenAsync(cts.Token)
.OpenAsync(Arg.Any<CancellationToken>())
.Returns(t => throw new IotHubException(TestExceptionMessage, isTransient: true));
var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);
@ -352,12 +351,12 @@ namespace Microsoft.Azure.Devices.Client.Test
delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { });
var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);
var retryPolicy = new TestRetryPolicy();
var retryPolicy = new TestRetryPolicyRetryTwice();
sut.SetRetryPolicy(retryPolicy);
int innerHandlerCallCounter = 0;
innerHandlerMock.OpenAsync(CancellationToken.None).Returns(t =>
innerHandlerMock.OpenAsync(Arg.Any<CancellationToken>()).Returns(t =>
{
innerHandlerCallCounter++;
throw new IotHubCommunicationException();
@ -397,7 +396,7 @@ namespace Microsoft.Azure.Devices.Client.Test
{
// arrange
var innerHandlerMock = Substitute.For<IDelegatingHandler>();
innerHandlerMock.AbandonAsync(null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask);
innerHandlerMock.AbandonAsync(null, Arg.Any<CancellationToken>()).ReturnsForAnyArgs(TaskHelpers.CompletedTask);
var contextMock = Substitute.For<PipelineContext>();
var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);
@ -416,7 +415,7 @@ namespace Microsoft.Azure.Devices.Client.Test
{
// arrange
var innerHandlerMock = Substitute.For<IDelegatingHandler>();
innerHandlerMock.RejectAsync(null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask);
innerHandlerMock.RejectAsync(null, Arg.Any<CancellationToken>()).ReturnsForAnyArgs(TaskHelpers.CompletedTask);
var contextMock = Substitute.For<PipelineContext>();
var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);
@ -427,7 +426,7 @@ namespace Microsoft.Azure.Devices.Client.Test
await sut.RejectAsync(Arg.Any<string>(), cts.Token).ExpectedAsync<TaskCanceledException>().ConfigureAwait(false);
}
private class TestRetryPolicy : IRetryPolicy
private class TestRetryPolicyRetryTwice : IRetryPolicy
{
public int Counter { get; private set; }

Просмотреть файл

@ -1,4 +1,7 @@
using System;
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -13,6 +16,7 @@ using Moq;
namespace Microsoft.Azure.Devices.Client.Tests.Amqp
{
[TestClass]
[TestCategory("Unit")]
public class AmqpConnectionPoolTests
{
internal class AmqpConnectionPoolTest : AmqpConnectionPool

Просмотреть файл

@ -548,11 +548,7 @@ jobs:
pool:
vmImage: windows-2022
steps:
- script: |
rem Run dotnet first experience.
dotnet new
rem Start build
build.cmd -clean -build -configuration Debug -package
- powershell: .\build.ps1 -clean -build -configutaion Debug -package
displayName: Build Package
- task: ComponentGovernanceComponentDetection@0