Do not retry when there's an ambient transaction (#621)
* Do not retry when there's an ambient transaction Fixes #615
This commit is contained in:
Родитель
cdff4112a9
Коммит
a1e637d1c7
|
@ -8,7 +8,8 @@ namespace Microsoft.Azure.ServiceBus
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// RetryPolicy implementation where the delay between retries will grow in a staggered exponential manner.
|
/// RetryPolicy implementation where the delay between retries will grow in a staggered exponential manner.
|
||||||
/// RetryIntervals will be computed using a retryFactor which is a function of deltaBackOff (MaximumBackoff - MinimumBackoff) and MaximumRetryCount
|
/// RetryIntervals will be computed using a retryFactor which is a function of deltaBackOff (MaximumBackoff - MinimumBackoff) and MaximumRetryCount.
|
||||||
|
/// <remarks>RetryPolicy will not be applied when an ambient transaction is found.</remarks>
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class RetryExponential : RetryPolicy
|
public sealed class RetryExponential : RetryPolicy
|
||||||
{
|
{
|
||||||
|
|
|
@ -6,11 +6,13 @@ namespace Microsoft.Azure.ServiceBus
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
using System.Transactions;
|
||||||
using Primitives;
|
using Primitives;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Represents an abstraction for retrying messaging operations. Users should not
|
/// Represents an abstraction for retrying messaging operations. Users should not
|
||||||
/// implement this class, and instead should use one of the provided implementations.
|
/// implement this class, and instead should use one of the provided implementations.
|
||||||
|
/// <remarks>RetryPolicy will not be applied when an ambient transaction is found.</remarks>
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public abstract class RetryPolicy
|
public abstract class RetryPolicy
|
||||||
{
|
{
|
||||||
|
@ -124,9 +126,9 @@ namespace Microsoft.Azure.ServiceBus
|
||||||
|
|
||||||
internal bool ShouldRetry(TimeSpan remainingTime, int currentRetryCount, Exception lastException, out TimeSpan retryInterval)
|
internal bool ShouldRetry(TimeSpan remainingTime, int currentRetryCount, Exception lastException, out TimeSpan retryInterval)
|
||||||
{
|
{
|
||||||
if (lastException == null)
|
// There is no exception information or there's there's an ambient transaction - should not retry
|
||||||
|
if (lastException == null || Transaction.Current != null)
|
||||||
{
|
{
|
||||||
// there are no exceptions.
|
|
||||||
retryInterval = TimeSpan.Zero;
|
retryInterval = TimeSpan.Zero;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
namespace Microsoft.Azure.ServiceBus.UnitTests
|
||||||
|
{
|
||||||
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using System.Transactions;
|
||||||
|
using Xunit;
|
||||||
|
|
||||||
|
public class RetryPolicyTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
[DisplayTestMethodName]
|
||||||
|
public async Task Should_retry_when_throttled_and_no_ambient_transaction_is_detected()
|
||||||
|
{
|
||||||
|
var retryPolicy = RetryPolicy.Default;
|
||||||
|
|
||||||
|
var numberOfExecutions = 0;
|
||||||
|
|
||||||
|
await retryPolicy.RunOperation(() =>
|
||||||
|
{
|
||||||
|
if (numberOfExecutions > 1)
|
||||||
|
{
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
numberOfExecutions++;
|
||||||
|
|
||||||
|
throw new ServerBusyException("Rico KABOOM!");
|
||||||
|
}, TimeSpan.FromSeconds(30));
|
||||||
|
|
||||||
|
Assert.Equal(2, numberOfExecutions);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
[DisplayTestMethodName]
|
||||||
|
public async Task Should_not_retry_when_throttled_and_ambient_transaction_is_detected()
|
||||||
|
{
|
||||||
|
var retryPolicy = RetryPolicy.Default;
|
||||||
|
var numberOfExecutions = 0;
|
||||||
|
|
||||||
|
using (var tx = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled))
|
||||||
|
{
|
||||||
|
await Assert.ThrowsAsync<ServerBusyException>(() =>
|
||||||
|
retryPolicy.RunOperation(() =>
|
||||||
|
{
|
||||||
|
if (numberOfExecutions > 1)
|
||||||
|
{
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
numberOfExecutions++;
|
||||||
|
|
||||||
|
throw new ServerBusyException("Rico KABOOM!");
|
||||||
|
}, TimeSpan.FromSeconds(30)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.Equal(1, numberOfExecutions);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,8 +25,8 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
||||||
internal async Task ReceiveDeleteTestCase(IMessageSender messageSender, IMessageReceiver messageReceiver, int messageCount)
|
internal async Task ReceiveDeleteTestCase(IMessageSender messageSender, IMessageReceiver messageReceiver, int messageCount)
|
||||||
{
|
{
|
||||||
await TestUtility.SendMessagesAsync(messageSender, messageCount);
|
await TestUtility.SendMessagesAsync(messageSender, messageCount);
|
||||||
var receivedMessages = await TestUtility.ReceiveMessagesAsync(messageReceiver, messageCount);
|
var receivedMessages = await TestUtility.ReceiveMessagesAsync(messageReceiver, messageCount, TimeSpan.FromSeconds(10));
|
||||||
Assert.True(messageCount == receivedMessages.Count);
|
Assert.Equal(receivedMessages.Count, messageCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
internal async Task PeekLockWithAbandonTestCase(IMessageSender messageSender, IMessageReceiver messageReceiver, int messageCount)
|
internal async Task PeekLockWithAbandonTestCase(IMessageSender messageSender, IMessageReceiver messageReceiver, int messageCount)
|
||||||
|
|
|
@ -3,9 +3,12 @@
|
||||||
|
|
||||||
namespace Microsoft.Azure.ServiceBus.UnitTests
|
namespace Microsoft.Azure.ServiceBus.UnitTests
|
||||||
{
|
{
|
||||||
|
using System;
|
||||||
|
|
||||||
static class TestConstants
|
static class TestConstants
|
||||||
{
|
{
|
||||||
internal const int MaxAttemptsCount = 5;
|
internal const int MaxAttemptsCount = 5;
|
||||||
|
internal readonly static TimeSpan WaitTimeBetweenAttempts = TimeSpan.FromSeconds(1);
|
||||||
|
|
||||||
internal const string ConnectionStringEnvironmentVariable = "azure-service-bus-dotnet/connectionstring";
|
internal const string ConnectionStringEnvironmentVariable = "azure-service-bus-dotnet/connectionstring";
|
||||||
|
|
||||||
|
|
|
@ -67,18 +67,27 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
||||||
Log($"Sent {messageCount} messages");
|
Log($"Sent {messageCount} messages");
|
||||||
}
|
}
|
||||||
|
|
||||||
internal static async Task<IList<Message>> ReceiveMessagesAsync(IMessageReceiver messageReceiver, int messageCount)
|
internal static async Task<IList<Message>> ReceiveMessagesAsync(IMessageReceiver messageReceiver, int messageCount, TimeSpan timeout = default)
|
||||||
{
|
{
|
||||||
var receiveAttempts = 0;
|
var receiveAttempts = 0;
|
||||||
var messagesToReturn = new List<Message>();
|
var messagesToReturn = new List<Message>();
|
||||||
|
var stopwatch = Stopwatch.StartNew();
|
||||||
|
|
||||||
while (receiveAttempts++ < TestConstants.MaxAttemptsCount && messagesToReturn.Count < messageCount)
|
if (timeout == default)
|
||||||
|
{
|
||||||
|
timeout = TimeSpan.Zero;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (messagesToReturn.Count < messageCount && (receiveAttempts++ < TestConstants.MaxAttemptsCount || stopwatch.Elapsed < timeout))
|
||||||
{
|
{
|
||||||
var messages = await messageReceiver.ReceiveAsync(messageCount - messagesToReturn.Count);
|
var messages = await messageReceiver.ReceiveAsync(messageCount - messagesToReturn.Count);
|
||||||
if (messages != null)
|
if (messages == null)
|
||||||
{
|
{
|
||||||
messagesToReturn.AddRange(messages);
|
await Task.Delay(TestConstants.WaitTimeBetweenAttempts);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
messagesToReturn.AddRange(messages);
|
||||||
}
|
}
|
||||||
|
|
||||||
VerifyUniqueMessages(messagesToReturn);
|
VerifyUniqueMessages(messagesToReturn);
|
||||||
|
@ -98,7 +107,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
||||||
var msg = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber);
|
var msg = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber);
|
||||||
if (msg != null)
|
if (msg != null)
|
||||||
{
|
{
|
||||||
messagesToReturn.Add(msg);
|
messagesToReturn.Add(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Загрузка…
Ссылка в новой задаче