Introduces transaction support.
Operations can now be executed inside a TransactionScope providing atomicity.
Supported operations for transactions:
`SendAsync`, `CompleteAsync`, `DeferAsync`, `DeadLetterAsync`, `AbandonAsync`

Transaction cannot work across connections. Hence, to be able to Send and Receive in a single transaction, `ServiceBusConnection` object has been exposed. Each of the client entities now accepts an already created connection.

Sample usage:
```csharp
var connection = new ServiceBusConnection(ConnectionString);
var sender = new MessageSender(connection, QueueName);
var receiver = new MessageReceiver(connection, QueueName);
var receivedMessage = await receiver.ReceiveAsync();

using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
	await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
	await sender.SendAsync(message).ConfigureAwait(false);
	ts.Complete();
        // Or, ts.Dispose();
}

await sender.CloseAsync();
await receiver.CloseAsync();
await connection.CloseAsync();
```
This commit is contained in:
Neeraj Makam 2018-03-29 19:38:05 -07:00 коммит произвёл GitHub
Родитель 61948293c7
Коммит e693fa6ce3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
41 изменённых файлов: 1384 добавлений и 414 удалений

Просмотреть файл

@ -5,7 +5,7 @@ $configuration = if ($CONFIGURATION -ne $null) { $CONFIGURATION } else { 'Debug
$platform = if ($PLATFORM -ne $null) { $PLATFORM } else { 'Any CPU' }
$projectFolder = if ($ENV:APPVEYOR_BUILD_FOLDER -ne $null) { "$ENV:APPVEYOR_BUILD_FOLDER" } else { $(Get-Location).path }
$buildFolder = $projectFolder + '\build\'
$runtime = if ($ENV:DotNetRunTime -ne $null) { $ENV:DotNetRunTime } else { 'netcoreapp1.0' }
$runtime = if ($ENV:DotNetRunTime -ne $null) { $ENV:DotNetRunTime } else { 'netcoreapp2.0' }
$artifactsFolder = $buildFolder + 'artifacts\'
$appProject = $projectFolder + '\src\Microsoft.Azure.ServiceBus\Microsoft.Azure.ServiceBus.csproj'
$testProject = $projectFolder + '\test\Microsoft.Azure.ServiceBus.UnitTests\Microsoft.Azure.ServiceBus.UnitTests.csproj'
@ -157,10 +157,22 @@ function Run-UnitTests
}
if ([bool]$codeCovSecret)
{
$ENV:PATH = 'C:\\Python34;C:\\Python34\\Scripts;' + $ENV:PATH
python -m pip install --upgrade pip
pip install git+git://github.com/codecov/codecov-python.git
codecov -f $coverageFile -t $codeCovSecret -X gcov
try
{
$ENV:PATH = 'C:\\Python34;C:\\Python34\\Scripts;' + $ENV:PATH
python -m pip install --upgrade pip
pip install git+git://github.com/codecov/codecov-python.git
codecov -f $coverageFile -t $codeCovSecret -X gcov
#choco install codecov
#codecov.exe -f $coverageFile -t $codeCovSecret -X gcov
}
catch
{
$error | Format-List *
$_ |select -expandproperty invocationinfo
Write-Host -ForegroundColor Red "Codecov failed"
}
}
else
{
@ -200,13 +212,35 @@ function Delete-AzureResources
Write-Host "Completed deleting Azure resources"
}
function Cleanup-EnvironmentVariables
{
Write-Host "Cleaning Environment variables"
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/CodeCovSecret', ' ', "Machine")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/ClientSecret', ' ', "Machine")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/TenantId', ' ', "Machine")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/AppId', ' ', "Machine")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/CodeCovSecret', ' ', "User")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/ClientSecret', ' ', "User")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/TenantId', ' ', "User")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/AppId', ' ', "User")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/CodeCovSecret', ' ', "Process")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/ClientSecret', ' ', "Process")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/TenantId', ' ', "Process")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/AppId', ' ', "Process")
}
Cleanup-EnvironmentVariables
Build-Solution
if (-Not $canDeploy -and -Not [bool][Environment]::GetEnvironmentVariable($connectionStringVariableName)) {
Write-Host "Build exiting. CanDeploy: " + $canDeploy
return
}
try {
if ($canDeploy -and -not [bool][Environment]::GetEnvironmentVariable($connectionStringVariableName)) {
Deploy-AzureResources
}
if ([bool][Environment]::GetEnvironmentVariable($connectionStringVariableName)) {
Run-UnitTests
@ -220,4 +254,4 @@ finally {
if ($canDeploy -and $resourceGroupName) {
Delete-AzureResources
}
}
}

Просмотреть файл

@ -61,7 +61,6 @@ namespace Microsoft.Azure.ServiceBus.Amqp
MessagingEntityType.Subscriber,
this.ReceiveMode,
this.ServiceBusConnection,
null,
this.CbsTokenProvider,
this.RetryPolicy,
this.PrefetchCount);

Просмотреть файл

@ -0,0 +1,112 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.Amqp
{
using System;
using System.Threading.Tasks;
using System.Transactions;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.ServiceBus.Primitives;
class AmqpTransactionEnlistment : Singleton<AmqpTransactionEnlistment>, IPromotableSinglePhaseNotification
{
readonly string transactionId;
readonly AmqpTransactionManager transactionManager;
readonly ServiceBusConnection serviceBusConnection;
public AmqpTransactionEnlistment(
Transaction transaction,
AmqpTransactionManager transactionManager,
ServiceBusConnection serviceBusConnection)
{
this.transactionId = transaction.TransactionInformation.LocalIdentifier;
this.transactionManager = transactionManager;
this.serviceBusConnection = serviceBusConnection;
}
public ArraySegment<byte> AmqpTransactionId { get; private set; }
protected override async Task<AmqpTransactionEnlistment> OnCreateAsync(TimeSpan timeout)
{
try
{
var faultTolerantController = this.serviceBusConnection.TransactionController;
var controller = await faultTolerantController.GetOrCreateAsync(this.serviceBusConnection.OperationTimeout).ConfigureAwait(false);
this.AmqpTransactionId = await controller.DeclareAsync().ConfigureAwait(false);
MessagingEventSource.Log.AmqpTransactionDeclared(this.transactionId, this.AmqpTransactionId);
return this;
}
catch (Exception exception)
{
MessagingEventSource.Log.AmqpTransactionInitializeException(this.transactionId, exception);
this.transactionManager.RemoveEnlistment(this.transactionId);
throw;
}
}
protected override void OnSafeClose(AmqpTransactionEnlistment value)
{
}
void IPromotableSinglePhaseNotification.Initialize()
{
}
void IPromotableSinglePhaseNotification.SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
{
this.transactionManager.RemoveEnlistment(this.transactionId);
TaskExtensionHelper.Schedule(() => this.SinglePhaseCommitAsync(singlePhaseEnlistment));
}
async Task SinglePhaseCommitAsync(SinglePhaseEnlistment singlePhaseEnlistment)
{
try
{
var faultTolerantController = this.serviceBusConnection.TransactionController;
var controller = await faultTolerantController.GetOrCreateAsync(this.serviceBusConnection.OperationTimeout).ConfigureAwait(false);
await controller.DischargeAsync(this.AmqpTransactionId, fail: false).ConfigureAwait(false);
singlePhaseEnlistment.Committed();
MessagingEventSource.Log.AmqpTransactionDischarged(this.transactionId, this.AmqpTransactionId, false);
await this.CloseAsync().ConfigureAwait(false);
}
catch (Exception e)
{
Exception exception = AmqpExceptionHelper.GetClientException(e, null);
MessagingEventSource.Log.AmqpTransactionDischargeException(this.transactionId, this.AmqpTransactionId, exception);
singlePhaseEnlistment.InDoubt(exception);
}
}
void IPromotableSinglePhaseNotification.Rollback(SinglePhaseEnlistment singlePhaseEnlistment)
{
this.transactionManager.RemoveEnlistment(this.transactionId);
TaskExtensionHelper.Schedule(() => this.RollbackAsync(singlePhaseEnlistment));
}
async Task RollbackAsync(SinglePhaseEnlistment singlePhaseEnlistment)
{
try
{
var faultTolerantController = this.serviceBusConnection.TransactionController;
var controller = await faultTolerantController.GetOrCreateAsync(this.serviceBusConnection.OperationTimeout).ConfigureAwait(false);
await controller.DischargeAsync(this.AmqpTransactionId, fail: true).ConfigureAwait(false);
singlePhaseEnlistment.Aborted();
MessagingEventSource.Log.AmqpTransactionDischarged(this.transactionId, this.AmqpTransactionId, true);
}
catch (Exception e)
{
Exception exception = AmqpExceptionHelper.GetClientException(e, null);
MessagingEventSource.Log.AmqpTransactionDischargeException(this.transactionId, this.AmqpTransactionId, exception);
singlePhaseEnlistment.Aborted(exception);
}
}
byte[] ITransactionPromoter.Promote()
{
throw new TransactionPromotionException("Local transactions are not supported with other resource managers/DTC.");
}
}
}

Просмотреть файл

@ -0,0 +1,57 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.Amqp
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;
internal class AmqpTransactionManager
{
readonly object syncRoot = new object();
readonly Dictionary<string, AmqpTransactionEnlistment> enlistmentMap = new Dictionary<string, AmqpTransactionEnlistment>(StringComparer.Ordinal);
public static AmqpTransactionManager Instance { get; } = new AmqpTransactionManager();
public async Task<ArraySegment<byte>> EnlistAsync(
Transaction transaction,
ServiceBusConnection serviceBusConnection)
{
if (transaction.IsolationLevel != IsolationLevel.Serializable)
{
throw new InvalidOperationException($"The only supported IsolationLevel is {nameof(IsolationLevel.Serializable)}");
}
string transactionId = transaction.TransactionInformation.LocalIdentifier;
AmqpTransactionEnlistment transactionEnlistment;
lock (this.syncRoot)
{
if (!this.enlistmentMap.TryGetValue(transactionId, out transactionEnlistment))
{
transactionEnlistment = new AmqpTransactionEnlistment(transaction, this, serviceBusConnection);
this.enlistmentMap.Add(transactionId, transactionEnlistment);
if (!transaction.EnlistPromotableSinglePhase(transactionEnlistment))
{
this.enlistmentMap.Remove(transactionId);
throw new InvalidOperationException("Local transactions are not supported with other resource managers/DTC.");
}
}
}
transactionEnlistment = await transactionEnlistment.GetOrCreateAsync(serviceBusConnection.OperationTimeout).ConfigureAwait(false);
return transactionEnlistment.AmqpTransactionId;
}
public void RemoveEnlistment(string transactionId)
{
lock (this.syncRoot)
{
this.enlistmentMap.Remove(transactionId);
}
}
}
}

Просмотреть файл

@ -49,6 +49,16 @@ namespace Microsoft.Azure.ServiceBus
}
}
/// <summary>
/// Connection object to the service bus namespace.
/// </summary>
public abstract ServiceBusConnection ServiceBusConnection { get; }
/// <summary>
/// Gets the name of the entity.
/// </summary>
public abstract string Path { get; }
/// <summary>
/// Duration after which individual operations will timeout.
/// </summary>

Просмотреть файл

@ -41,14 +41,14 @@ namespace Microsoft.Azure.ServiceBus.Core
long LastPeekedSequenceNumber { get; }
/// <summary>
/// Receive a message from the entity defined by <see cref="IReceiverClient.Path"/> using <see cref="ReceiveMode"/> mode.
/// Receive a message from the entity defined by <see cref="IClientEntity.Path"/> using <see cref="ReceiveMode"/> mode.
/// </summary>
/// <returns>The message received. Returns null if no message is found.</returns>
/// <remarks>Operation will time out after duration of <see cref="ClientEntity.OperationTimeout"/></remarks>
Task<Message> ReceiveAsync();
/// <summary>
/// Receive a message from the entity defined by <see cref="IReceiverClient.Path"/> using <see cref="ReceiveMode"/> mode.
/// Receive a message from the entity defined by <see cref="IClientEntity.Path"/> using <see cref="ReceiveMode"/> mode.
/// </summary>
/// <param name="operationTimeout">The time span the client waits for receiving a message before it times out.</param>
/// <returns>The message received. Returns null if no message is found.</returns>
@ -60,7 +60,7 @@ namespace Microsoft.Azure.ServiceBus.Core
Task<Message> ReceiveAsync(TimeSpan operationTimeout);
/// <summary>
/// Receives a maximum of <paramref name="maxMessageCount"/> messages from the entity defined by <see cref="IReceiverClient.Path"/> using <see cref="ReceiveMode"/> mode.
/// Receives a maximum of <paramref name="maxMessageCount"/> messages from the entity defined by <see cref="IClientEntity.Path"/> using <see cref="ReceiveMode"/> mode.
/// </summary>
/// <param name="maxMessageCount">The maximum number of messages that will be received.</param>
/// <returns>List of messages received. Returns null if no message is found.</returns>
@ -68,7 +68,7 @@ namespace Microsoft.Azure.ServiceBus.Core
Task<IList<Message>> ReceiveAsync(int maxMessageCount);
/// <summary>
/// Receives a maximum of <paramref name="maxMessageCount"/> messages from the entity defined by <see cref="IReceiverClient.Path"/> using <see cref="ReceiveMode"/> mode.
/// Receives a maximum of <paramref name="maxMessageCount"/> messages from the entity defined by <see cref="IClientEntity.Path"/> using <see cref="ReceiveMode"/> mode.
/// </summary>
/// <param name="maxMessageCount">The maximum number of messages that will be received.</param>
/// <param name="operationTimeout">The time span the client waits for receiving a message before it times out.</param>

Просмотреть файл

@ -38,11 +38,6 @@ namespace Microsoft.Azure.ServiceBus.Core
/// </remarks>
int PrefetchCount { get; set; }
/// <summary>
/// Gets the path of the <see cref="IReceiverClient"/>. This is either the name of the queue, or the full path of the subscription.
/// </summary>
string Path { get; }
/// <summary>
/// Gets the <see cref="ServiceBus.ReceiveMode"/> of the current receiver.
/// </summary>

Просмотреть файл

@ -9,6 +9,7 @@ namespace Microsoft.Azure.ServiceBus.Core
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;
@ -91,17 +92,13 @@ namespace Microsoft.Azure.ServiceBus.Core
ReceiveMode receiveMode = ReceiveMode.PeekLock,
RetryPolicy retryPolicy = null,
int prefetchCount = Constants.DefaultClientPrefetchCount)
: this(entityPath, null, receiveMode, new ServiceBusNamespaceConnection(connectionString), null, null, retryPolicy, prefetchCount)
: this(entityPath, null, receiveMode, new ServiceBusConnection(connectionString), null, retryPolicy, prefetchCount)
{
if (string.IsNullOrWhiteSpace(connectionString))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ownsConnection = true;
}
@ -125,26 +122,37 @@ namespace Microsoft.Azure.ServiceBus.Core
ReceiveMode receiveMode = ReceiveMode.PeekLock,
RetryPolicy retryPolicy = null,
int prefetchCount = Constants.DefaultClientPrefetchCount)
: this(entityPath, null, receiveMode, new ServiceBusNamespaceConnection(endpoint, transportType, retryPolicy), tokenProvider, null, retryPolicy, prefetchCount)
: this(entityPath, null, receiveMode, new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, null, retryPolicy, prefetchCount)
{
if (tokenProvider == null)
{
throw Fx.Exception.ArgumentNull(nameof(tokenProvider));
}
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ownsConnection = true;
}
/// <summary>
/// Creates a new AMQP MessageReceiver on a given <see cref="ServiceBusConnection"/>
/// </summary>
/// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
/// <param name="entityPath">The path of the entity for this receiver. For Queues this will be the name, but for Subscriptions this will be the path.
/// You can use <see cref="EntityNameHelper.FormatSubscriptionPath(string, string)"/>, to help create this path.</param>
/// <param name="receiveMode">The <see cref="ServiceBus.ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.</param>
/// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bus. Defaults to <see cref="RetryPolicy.Default"/></param>
/// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages this receiver
/// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
public MessageReceiver(
ServiceBusConnection serviceBusConnection,
string entityPath,
ReceiveMode receiveMode = ReceiveMode.PeekLock,
RetryPolicy retryPolicy = null,
int prefetchCount = Constants.DefaultClientPrefetchCount)
: this(entityPath, null, receiveMode, serviceBusConnection, null, retryPolicy, prefetchCount)
{
this.ownsConnection = false;
}
internal MessageReceiver(
string entityPath,
MessagingEntityType? entityType,
ReceiveMode receiveMode,
ServiceBusConnection serviceBusConnection,
ITokenProvider tokenProvider,
ICbsTokenProvider cbsTokenProvider,
RetryPolicy retryPolicy,
int prefetchCount = Constants.DefaultClientPrefetchCount,
@ -154,12 +162,29 @@ namespace Microsoft.Azure.ServiceBus.Core
{
MessagingEventSource.Log.MessageReceiverCreateStart(serviceBusConnection?.Endpoint.Authority, entityPath, receiveMode.ToString());
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.ReceiveMode = receiveMode;
this.Path = entityPath;
this.EntityType = entityType;
tokenProvider = tokenProvider ?? this.ServiceBusConnection.CreateTokenProvider();
this.CbsTokenProvider = cbsTokenProvider ?? new TokenProviderAdapter(tokenProvider, this.ServiceBusConnection.OperationTimeout);
if (cbsTokenProvider != null)
{
this.CbsTokenProvider = cbsTokenProvider;
}
else if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
}
else
{
throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
}
this.SessionIdInternal = sessionId;
this.isSessionReceiver = isSessionReceiver;
this.ReceiveLinkManager = new FaultTolerantAmqpObject<ReceivingAmqpLink>(this.CreateLinkAsync, CloseSession);
@ -238,7 +263,7 @@ namespace Microsoft.Azure.ServiceBus.Core
}
/// <summary>The path of the entity for this receiver. For Queues this will be the name, but for Subscriptions this will be the path.</summary>
public virtual string Path { get; }
public override string Path { get; }
/// <summary>
/// Duration after which individual operations will timeout.
@ -248,6 +273,11 @@ namespace Microsoft.Azure.ServiceBus.Core
set => this.ServiceBusConnection.OperationTimeout = value;
}
/// <summary>
/// Connection object to the service bus namespace.
/// </summary>
public override ServiceBusConnection ServiceBusConnection { get; }
/// <summary>
/// Gets the DateTime that the current receiver is locked until. This is only applicable when Sessions are used.
/// </summary>
@ -262,8 +292,6 @@ namespace Microsoft.Azure.ServiceBus.Core
Exception LinkException { get; set; }
ServiceBusConnection ServiceBusConnection { get; }
ICbsTokenProvider CbsTokenProvider { get; }
internal FaultTolerantAmqpObject<ReceivingAmqpLink> ReceiveLinkManager { get; }
@ -944,6 +972,14 @@ namespace Microsoft.Azure.ServiceBus.Core
}
var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
var ambientTransaction = Transaction.Current;
if (ambientTransaction != null)
{
transactionId = await AmqpTransactionManager.Instance.EnlistAsync(ambientTransaction, this.ServiceBusConnection).ConfigureAwait(false);
}
if (!this.RequestResponseLinkManager.TryGetOpenedObject(out var requestResponseAmqpLink))
{
MessagingEventSource.Log.CreatingNewLink(this.ClientId, this.isSessionReceiver, this.SessionIdInternal, true, this.LinkException);
@ -951,7 +987,7 @@ namespace Microsoft.Azure.ServiceBus.Core
}
var responseAmqpMessage = await Task.Factory.FromAsync(
(c, s) => requestResponseAmqpLink.BeginRequest(amqpMessage, timeoutHelper.RemainingTime(), c, s),
(c, s) => requestResponseAmqpLink.BeginRequest(amqpMessage, transactionId, timeoutHelper.RemainingTime(), c, s),
(a) => requestResponseAmqpLink.EndRequest(a),
this).ConfigureAwait(false);
@ -1333,6 +1369,13 @@ namespace Microsoft.Azure.ServiceBus.Core
ReceivingAmqpLink receiveLink = null;
try
{
ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
var ambientTransaction = Transaction.Current;
if (ambientTransaction != null)
{
transactionId = await AmqpTransactionManager.Instance.EnlistAsync(ambientTransaction, this.ServiceBusConnection).ConfigureAwait(false);
}
if (!this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
{
MessagingEventSource.Log.CreatingNewLink(this.ClientId, this.isSessionReceiver, this.SessionIdInternal, false, this.LinkException);
@ -1344,7 +1387,7 @@ namespace Microsoft.Azure.ServiceBus.Core
foreach (ArraySegment<byte> deliveryTag in deliveryTags)
{
disposeMessageTasks[i++] = Task.Factory.FromAsync(
(c, s) => receiveLink.BeginDisposeMessage(deliveryTag, outcome, true, timeoutHelper.RemainingTime(), c, s),
(c, s) => receiveLink.BeginDisposeMessage(deliveryTag, transactionId, outcome, true, timeoutHelper.RemainingTime(), c, s),
a => receiveLink.EndDisposeMessage(a),
this);
}

Просмотреть файл

@ -9,6 +9,7 @@ namespace Microsoft.Azure.ServiceBus.Core
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;
@ -64,16 +65,12 @@ namespace Microsoft.Azure.ServiceBus.Core
string connectionString,
string entityPath,
RetryPolicy retryPolicy = null)
: this(entityPath, null, new ServiceBusNamespaceConnection(connectionString), null, null, retryPolicy)
: this(entityPath, null, new ServiceBusConnection(connectionString), null, retryPolicy)
{
if (string.IsNullOrWhiteSpace(connectionString))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ownsConnection = true;
}
@ -93,36 +90,58 @@ namespace Microsoft.Azure.ServiceBus.Core
ITokenProvider tokenProvider,
TransportType transportType = TransportType.Amqp,
RetryPolicy retryPolicy = null)
: this(entityPath, null, new ServiceBusNamespaceConnection(endpoint, transportType, retryPolicy), tokenProvider, null, retryPolicy)
: this(entityPath, null, new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, null, retryPolicy)
{
if (tokenProvider == null)
{
throw Fx.Exception.ArgumentNull(nameof(tokenProvider));
}
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ownsConnection = true;
}
/// <summary>
/// Creates a new AMQP MessageSender on a given <see cref="ServiceBusConnection"/>
/// </summary>
/// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
/// <param name="entityPath">The path of the entity this sender should connect to.</param>
/// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bus. Defaults to <see cref="RetryPolicy.Default"/></param>
public MessageSender(
ServiceBusConnection serviceBusConnection,
string entityPath,
RetryPolicy retryPolicy = null)
: this(entityPath, null, serviceBusConnection, null, retryPolicy)
{
this.ownsConnection = false;
}
internal MessageSender(
string entityPath,
MessagingEntityType? entityType,
ServiceBusConnection serviceBusConnection,
ITokenProvider tokenProvider,
ICbsTokenProvider cbsTokenProvider,
RetryPolicy retryPolicy)
: base(nameof(MessageSender), entityPath, retryPolicy ?? RetryPolicy.Default)
{
MessagingEventSource.Log.MessageSenderCreateStart(serviceBusConnection?.Endpoint.Authority, entityPath);
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.Path = entityPath;
this.EntityType = entityType;
tokenProvider = tokenProvider ?? this.ServiceBusConnection.CreateTokenProvider();
this.CbsTokenProvider = cbsTokenProvider ?? new TokenProviderAdapter(tokenProvider, this.ServiceBusConnection.OperationTimeout);
if (cbsTokenProvider != null)
{
this.CbsTokenProvider = cbsTokenProvider;
}
else if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
}
else
{
throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
}
this.SendLinkManager = new FaultTolerantAmqpObject<SendingAmqpLink>(this.CreateLinkAsync, CloseSession);
this.RequestResponseLinkManager = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(this.CreateRequestResponseLinkAsync, CloseRequestResponseSession);
this.clientLinkManager = new ActiveClientLinkManager(this, this.CbsTokenProvider);
@ -140,7 +159,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// <summary>
/// Gets the entity path of the MessageSender.
/// </summary>
public virtual string Path { get; }
public override string Path { get; }
/// <summary>
/// Duration after which individual operations will timeout.
@ -151,9 +170,12 @@ namespace Microsoft.Azure.ServiceBus.Core
set => this.ServiceBusConnection.OperationTimeout = value;
}
internal MessagingEntityType? EntityType { get; }
/// <summary>
/// Connection object to the service bus namespace.
/// </summary>
public override ServiceBusConnection ServiceBusConnection { get; }
ServiceBusConnection ServiceBusConnection { get; }
internal MessagingEntityType? EntityType { get; }
ICbsTokenProvider CbsTokenProvider { get; }
@ -347,13 +369,20 @@ namespace Microsoft.Azure.ServiceBus.Core
var amqpMessage = amqpRequestMessage.AmqpMessage;
var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
var ambientTransaction = Transaction.Current;
if (ambientTransaction != null)
{
transactionId = await AmqpTransactionManager.Instance.EnlistAsync(ambientTransaction, this.ServiceBusConnection).ConfigureAwait(false);
}
if (!this.RequestResponseLinkManager.TryGetOpenedObject(out var requestResponseAmqpLink))
{
requestResponseAmqpLink = await this.RequestResponseLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
}
var responseAmqpMessage = await Task.Factory.FromAsync(
(c, s) => requestResponseAmqpLink.BeginRequest(amqpMessage, timeoutHelper.RemainingTime(), c, s),
(c, s) => requestResponseAmqpLink.BeginRequest(amqpMessage, transactionId, timeoutHelper.RemainingTime(), c, s),
a => requestResponseAmqpLink.EndRequest(a),
this).ConfigureAwait(false);
@ -457,6 +486,13 @@ namespace Microsoft.Azure.ServiceBus.Core
SendingAmqpLink amqpLink = null;
try
{
ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
var ambientTransaction = Transaction.Current;
if (ambientTransaction != null)
{
transactionId = await AmqpTransactionManager.Instance.EnlistAsync(ambientTransaction, this.ServiceBusConnection).ConfigureAwait(false);
}
if (!this.SendLinkManager.TryGetOpenedObject(out amqpLink))
{
amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
@ -470,7 +506,7 @@ namespace Microsoft.Azure.ServiceBus.Core
}
}
var outcome = await amqpLink.SendMessageAsync(amqpMessage, this.GetNextDeliveryTag(), AmqpConstants.NullBinary, timeoutHelper.RemainingTime()).ConfigureAwait(false);
var outcome = await amqpLink.SendMessageAsync(amqpMessage, this.GetNextDeliveryTag(), transactionId, timeoutHelper.RemainingTime()).ConfigureAwait(false);
if (outcome.DescriptorCode != Accepted.Code)
{
@ -487,7 +523,6 @@ namespace Microsoft.Azure.ServiceBus.Core
async Task<long> OnScheduleMessageAsync(Message message)
{
// TODO: Ensure System.Transactions.Transaction.Current is null. Transactions are not supported by 1.0.0 version of dotnet core.
using (var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message))
{
var request = AmqpRequestMessage.CreateRequest(
@ -540,7 +575,6 @@ namespace Microsoft.Azure.ServiceBus.Core
async Task OnCancelScheduledMessageAsync(long sequenceNumber)
{
// TODO: Ensure System.Transactions.Transaction.Current is null. Transactions are not supported by 1.0.0 version of dotnet core.
var request =
AmqpRequestMessage.CreateRequest(
ManagementConstants.Operations.CancelScheduledMessageOperation,

Просмотреть файл

@ -24,6 +24,11 @@ namespace Microsoft.Azure.ServiceBus
/// </summary>
bool IsClosedOrClosing { get; }
/// <summary>
/// Gets the entity path.
/// </summary>
string Path { get; }
/// <summary>
/// Duration after which individual operations will timeout.
/// </summary>
@ -34,6 +39,11 @@ namespace Microsoft.Azure.ServiceBus
/// </summary>
Task CloseAsync();
/// <summary>
/// Connection object to the service bus namespace.
/// </summary>
ServiceBusConnection ServiceBusConnection { get; }
/// <summary>
/// Gets a list of currently registered plugins for this client.
/// </summary>

Просмотреть файл

@ -26,7 +26,7 @@ namespace Microsoft.Azure.ServiceBus
int prefetchCount = Constants.DefaultClientPrefetchCount,
string sessionId = null,
bool isSessionReceiver = false)
: base(entityPath, entityType, receiveMode, serviceBusConnection, null, cbsTokenProvider, retryPolicy, prefetchCount, sessionId, isSessionReceiver)
: base(entityPath, entityType, receiveMode, serviceBusConnection, cbsTokenProvider, retryPolicy, prefetchCount, sessionId, isSessionReceiver)
{
this.diagnosticSource = new ServiceBusDiagnosticSource(entityPath, serviceBusConnection.Endpoint);
}

Просмотреть файл

@ -7,6 +7,7 @@ namespace Microsoft.Azure.ServiceBus
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.ServiceBus.Amqp;
@ -1253,5 +1254,88 @@ namespace Microsoft.Azure.ServiceBus
{
this.WriteEvent(107, clientId, exception);
}
[NonEvent]
public void AmqpTransactionInitializeException(string transactionId, Exception exception)
{
if (this.IsEnabled())
{
this.AmqpTransactionInitializeException(transactionId, exception.ToString());
}
}
[Event(108, Level = EventLevel.Error, Message = "AmqpTransactionInitializeException for TransactionId: {0} Exception: {1}.")]
void AmqpTransactionInitializeException(string transactionId, string exception)
{
this.WriteEvent(108, transactionId, exception);
}
[NonEvent]
public void AmqpTransactionDeclared(string localTransactionId, ArraySegment<byte> amqpTransactionId)
{
if (this.IsEnabled())
{
this.AmqpTransactionDeclared(localTransactionId, amqpTransactionId.GetAsciiString());
}
}
[Event(109, Level = EventLevel.Informational, Message = "AmqpTransactionDeclared for LocalTransactionId: {0} AmqpTransactionId: {1}.")]
void AmqpTransactionDeclared(string transactionId, string amqpTransactionId)
{
this.WriteEvent(109, transactionId, amqpTransactionId);
}
[NonEvent]
public void AmqpTransactionDischarged(string localTransactionId, ArraySegment<byte> amqpTransactionId, bool rollback)
{
if (this.IsEnabled())
{
this.AmqpTransactionDischarged(localTransactionId, amqpTransactionId.GetAsciiString(), rollback);
}
}
[Event(110, Level = EventLevel.Informational, Message = "AmqpTransactionDischarged for LocalTransactionId: {0} AmqpTransactionId: {1} Rollback: {2}.")]
void AmqpTransactionDischarged(string transactionId, string amqpTransactionId, bool rollback)
{
this.WriteEvent(110, transactionId, amqpTransactionId, rollback);
}
[NonEvent]
public void AmqpTransactionDischargeException(string transactionId, ArraySegment<byte> amqpTransactionId, Exception exception)
{
if (this.IsEnabled())
{
this.AmqpTransactionDischargeException(transactionId, amqpTransactionId.GetAsciiString(), exception.ToString());
}
}
[Event(111, Level = EventLevel.Error, Message = "AmqpTransactionDischargeException for TransactionId: {0} AmqpTransactionId: {1} Exception: {2}.")]
void AmqpTransactionDischargeException(string transactionId, string amqpTransactionId, string exception)
{
this.WriteEvent(111, transactionId, amqpTransactionId, exception);
}
[NonEvent]
public void AmqpCreateControllerException(string connectionManager, Exception exception)
{
if (this.IsEnabled())
{
this.AmqpCreateControllerException(connectionManager, exception.ToString());
}
}
[Event(112, Level = EventLevel.Error, Message = "AmqpCreateControllerException for ConnectionManager: {0} Exception: {1}.")]
void AmqpCreateControllerException(string connectionManager, string exception)
{
this.WriteEvent(112, connectionManager, exception);
}
}
internal static class TraceHelper
{
public static string GetAsciiString(this ArraySegment<byte> arraySegment)
{
return arraySegment.Array == null ? string.Empty : Encoding.ASCII.GetString(arraySegment.Array, arraySegment.Offset, arraySegment.Count);
}
}
}

Просмотреть файл

@ -30,7 +30,7 @@
</Target>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Amqp" Version="[2.1.3, 3.0.0)" />
<PackageReference Include="Microsoft.Azure.Amqp" Version="[2.2.0, 3.0.0)" />
<PackageReference Include="Microsoft.Azure.Services.AppAuthentication" Version="1.1.0-preview" />
<PackageReference Include="Microsoft.IdentityModel.Clients.ActiveDirectory" Version="[3.17.2, 4.0.0)" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.4.1" />
@ -43,6 +43,10 @@
<DotNetCliToolReference Include="dotnet-sourcelink-git" Version="2.1.2" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net461'">
<Reference Include="System.Transactions" />
</ItemGroup>
<ItemGroup>
<Compile Update="Resources.Designer.cs">
<DesignTime>True</DesignTime>

Просмотреть файл

@ -1,145 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.Primitives
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Transport;
using Microsoft.Azure.ServiceBus.Amqp;
internal abstract class ServiceBusConnection
{
static readonly Version AmqpVersion = new Version(1, 0, 0, 0);
protected ServiceBusConnection(TimeSpan operationTimeout, RetryPolicy retryPolicy)
{
this.OperationTimeout = operationTimeout;
this.RetryPolicy = retryPolicy;
}
public Uri Endpoint { get; set; }
/// <summary>
/// OperationTimeout is applied in erroneous situations to notify the caller about the relevant <see cref="ServiceBusException"/>
/// </summary>
public TimeSpan OperationTimeout { get; set; }
/// <summary>
/// Get the retry policy instance that was created as part of this builder's creation.
/// </summary>
public RetryPolicy RetryPolicy { get; set; }
/// <summary>
/// Get the shared access policy key value from the connection string
/// </summary>
/// <value>Shared Access Signature key</value>
public string SasKey { get; set; }
/// <summary>
/// Get the shared access policy name from the connection string
/// </summary>
public string SasKeyName { get; set; }
/// <summary>
/// Get the shared access signature token from the connection string
/// </summary>
public string SasToken { get; set; }
/// <summary>
/// Get the transport type from the connection string.
/// <remarks>Amqp and AmqpWebSockets are available.</remarks>
/// </summary>
public TransportType TransportType { get; set; }
internal FaultTolerantAmqpObject<AmqpConnection> ConnectionManager { get; set; }
public Task CloseAsync()
{
return this.ConnectionManager.CloseAsync();
}
protected void InitializeConnection(ServiceBusConnectionStringBuilder builder)
{
this.Endpoint = new Uri(builder.Endpoint);
this.SasKeyName = builder.SasKeyName;
this.SasKey = builder.SasKey;
this.SasToken = builder.SasToken;
this.TransportType = builder.TransportType;
this.ConnectionManager = new FaultTolerantAmqpObject<AmqpConnection>(this.CreateConnectionAsync, CloseConnection);
}
static void CloseConnection(AmqpConnection connection)
{
MessagingEventSource.Log.AmqpConnectionClosed(connection);
connection.SafeClose();
}
async Task<AmqpConnection> CreateConnectionAsync(TimeSpan timeout)
{
var hostName = this.Endpoint.Host;
var timeoutHelper = new TimeoutHelper(timeout);
var amqpSettings = AmqpConnectionHelper.CreateAmqpSettings(
amqpVersion: AmqpVersion,
useSslStreamSecurity: true,
hasTokenProvider: true,
useWebSockets: TransportType == TransportType.AmqpWebSockets);
var transportSettings = CreateTransportSettings();
var amqpTransportInitiator = new AmqpTransportInitiator(amqpSettings, transportSettings);
var transport = await amqpTransportInitiator.ConnectTaskAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
var containerId = Guid.NewGuid().ToString();
var amqpConnectionSettings = AmqpConnectionHelper.CreateAmqpConnectionSettings(AmqpConstants.DefaultMaxFrameSize, containerId, hostName);
var connection = new AmqpConnection(transport, amqpSettings, amqpConnectionSettings);
await connection.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
// Always create the CBS Link + Session
var cbsLink = new AmqpCbsLink(connection);
if (connection.Extensions.Find<AmqpCbsLink>() == null)
{
connection.Extensions.Add(cbsLink);
}
MessagingEventSource.Log.AmqpConnectionCreated(hostName, connection);
return connection;
}
private TransportSettings CreateTransportSettings()
{
var hostName = this.Endpoint.Host;
var networkHost = this.Endpoint.Host;
var port = this.Endpoint.Port;
if (TransportType == TransportType.AmqpWebSockets)
{
return AmqpConnectionHelper.CreateWebSocketTransportSettings(
networkHost: networkHost,
hostName: hostName,
port: port);
}
return AmqpConnectionHelper.CreateTcpTransportSettings(
networkHost: networkHost,
hostName: hostName,
port: port,
useSslStreamSecurity: true);
}
internal TokenProvider CreateTokenProvider()
{
if (SasToken != null)
{
return TokenProvider.CreateSharedAccessSignatureTokenProvider(SasToken);
}
else
{
return TokenProvider.CreateSharedAccessSignatureTokenProvider(SasKeyName, SasKey);
}
}
}
}

Просмотреть файл

@ -1,49 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.Primitives
{
using System;
internal class ServiceBusNamespaceConnection : ServiceBusConnection
{
public ServiceBusNamespaceConnection(string namespaceConnectionString)
: this(namespaceConnectionString, Constants.DefaultOperationTimeout, RetryPolicy.Default)
{
}
public ServiceBusNamespaceConnection(string namespaceConnectionString, TimeSpan operationTimeout, RetryPolicy retryPolicy)
: base(operationTimeout, retryPolicy)
{
if (string.IsNullOrWhiteSpace(namespaceConnectionString))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(namespaceConnectionString));
}
var serviceBusConnectionStringBuilder = new ServiceBusConnectionStringBuilder(namespaceConnectionString);
if (!string.IsNullOrWhiteSpace(serviceBusConnectionStringBuilder.EntityPath))
{
throw Fx.Exception.Argument(nameof(namespaceConnectionString), "NamespaceConnectionString should not contain EntityPath.");
}
this.InitializeConnection(serviceBusConnectionStringBuilder);
}
public ServiceBusNamespaceConnection(string endpoint, TransportType transportType, RetryPolicy retryPolicy)
: base(Constants.DefaultOperationTimeout, retryPolicy)
{
if (string.IsNullOrWhiteSpace(endpoint))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(endpoint));
}
var serviceBusConnectionStringBuilder = new ServiceBusConnectionStringBuilder()
{
Endpoint = endpoint,
TransportType = transportType
};
this.InitializeConnection(serviceBusConnectionStringBuilder);
}
}
}

Просмотреть файл

@ -84,19 +84,13 @@ namespace Microsoft.Azure.ServiceBus
/// <param name="retryPolicy">Retry policy for queue operations. Defaults to <see cref="RetryPolicy.Default"/></param>
/// <remarks>Creates a new connection to the queue, which is opened during the first send/receive operation.</remarks>
public QueueClient(string connectionString, string entityPath, ReceiveMode receiveMode = ReceiveMode.PeekLock, RetryPolicy retryPolicy = null)
: this(new ServiceBusNamespaceConnection(connectionString), entityPath, receiveMode, retryPolicy ?? RetryPolicy.Default)
: this(new ServiceBusConnection(connectionString), entityPath, receiveMode, retryPolicy ?? RetryPolicy.Default)
{
if (string.IsNullOrWhiteSpace(connectionString))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
var tokenProvider = this.ServiceBusConnection.CreateTokenProvider();
this.CbsTokenProvider = new TokenProviderAdapter(tokenProvider, this.ServiceBusConnection.OperationTimeout);
this.ownsConnection = true;
}
@ -117,26 +111,41 @@ namespace Microsoft.Azure.ServiceBus
TransportType transportType = TransportType.Amqp,
ReceiveMode receiveMode = ReceiveMode.PeekLock,
RetryPolicy retryPolicy = null)
: this(new ServiceBusNamespaceConnection(endpoint, transportType, retryPolicy), entityPath, receiveMode, retryPolicy)
: this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, entityPath, receiveMode, retryPolicy)
{
if (tokenProvider == null)
{
throw Fx.Exception.ArgumentNull(nameof(tokenProvider));
}
this.CbsTokenProvider = new TokenProviderAdapter(tokenProvider, this.ServiceBusConnection.OperationTimeout);
this.ownsConnection = true;
}
QueueClient(ServiceBusNamespaceConnection serviceBusConnection, string entityPath, ReceiveMode receiveMode, RetryPolicy retryPolicy)
/// <summary>
/// Creates a new instance of the Queue client on a given <see cref="ServiceBusConnection"/>
/// </summary>
/// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
/// <param name="entityPath">Queue path.</param>
/// <param name="receiveMode">Mode of receive of messages. Default to <see cref="ReceiveMode"/>.PeekLock.</param>
/// <param name="retryPolicy">Retry policy for queue operations. Defaults to <see cref="RetryPolicy.Default"/></param>
public QueueClient(ServiceBusConnection serviceBusConnection, string entityPath, ReceiveMode receiveMode, RetryPolicy retryPolicy)
: base(nameof(QueueClient), entityPath, retryPolicy)
{
MessagingEventSource.Log.QueueClientCreateStart(serviceBusConnection?.Endpoint.Authority, entityPath, receiveMode.ToString());
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.syncLock = new object();
this.QueueName = entityPath;
this.ReceiveMode = receiveMode;
this.ownsConnection = false;
if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
}
else
{
throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
}
MessagingEventSource.Log.QueueClientCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.ClientId);
}
@ -163,7 +172,7 @@ namespace Microsoft.Azure.ServiceBus
/// <summary>
/// Gets the name of the queue.
/// </summary>
public string Path => this.QueueName;
public override string Path => this.QueueName;
/// <summary>
/// Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when and before the application asks for one using Receive.
@ -210,6 +219,11 @@ namespace Microsoft.Azure.ServiceBus
/// </summary>
public override IList<ServiceBusPlugin> RegisteredPlugins => this.InnerSender.RegisteredPlugins;
/// <summary>
/// Connection object to the service bus namespace.
/// </summary>
public override ServiceBusConnection ServiceBusConnection { get; }
internal MessageSender InnerSender
{
get
@ -224,7 +238,6 @@ namespace Microsoft.Azure.ServiceBus
this.QueueName,
MessagingEntityType.Queue,
this.ServiceBusConnection,
null,
this.CbsTokenProvider,
this.RetryPolicy);
}
@ -250,7 +263,6 @@ namespace Microsoft.Azure.ServiceBus
MessagingEntityType.Queue,
this.ReceiveMode,
this.ServiceBusConnection,
null,
this.CbsTokenProvider,
this.RetryPolicy,
this.PrefetchCount);
@ -279,7 +291,6 @@ namespace Microsoft.Azure.ServiceBus
this.ReceiveMode,
this.PrefetchCount,
this.ServiceBusConnection,
null,
this.CbsTokenProvider,
this.RetryPolicy,
this.RegisteredPlugins);
@ -315,9 +326,7 @@ namespace Microsoft.Azure.ServiceBus
return this.sessionPumpHost;
}
}
internal ServiceBusConnection ServiceBusConnection { get; }
ICbsTokenProvider CbsTokenProvider { get; }
/// <summary>

Просмотреть файл

@ -0,0 +1,248 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.Amqp.Transaction;
using Microsoft.Azure.Amqp.Transport;
using Microsoft.Azure.ServiceBus.Amqp;
using Microsoft.Azure.ServiceBus.Primitives;
/// <summary>
/// Connection object to service bus namespace
/// </summary>
public class ServiceBusConnection
{
static readonly Version AmqpVersion = new Version(1, 0, 0, 0);
/// <summary>
/// Creates a new connection to service bus.
/// </summary>
/// <param name="connectionStringBuilder"><see cref="ServiceBusConnectionStringBuilder"/> having namespace information.</param>
/// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsync"/></remarks>
public ServiceBusConnection(ServiceBusConnectionStringBuilder connectionStringBuilder)
: this(connectionStringBuilder?.GetNamespaceConnectionString())
{
}
/// <summary>
/// Creates a new connection to service bus.
/// </summary>
/// <param name="namespaceConnectionString">Namespace connection string</param>
/// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsync"/></remarks>
public ServiceBusConnection(string namespaceConnectionString)
: this(namespaceConnectionString, Constants.DefaultOperationTimeout, RetryPolicy.Default)
{
}
/// <summary>
/// Creates a new connection to service bus.
/// </summary>
/// <param name="namespaceConnectionString">Namespace connection string.</param>
/// <param name="operationTimeout">Duration after which individual operations will timeout.</param>
/// <param name="retryPolicy">Retry policy for operations. Defaults to <see cref="RetryPolicy.Default"/></param>
/// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsync"/></remarks>
public ServiceBusConnection(string namespaceConnectionString, TimeSpan operationTimeout, RetryPolicy retryPolicy = null)
: this(operationTimeout, retryPolicy)
{
if (string.IsNullOrWhiteSpace(namespaceConnectionString))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(namespaceConnectionString));
}
var serviceBusConnectionStringBuilder = new ServiceBusConnectionStringBuilder(namespaceConnectionString);
if (!string.IsNullOrWhiteSpace(serviceBusConnectionStringBuilder.EntityPath))
{
throw Fx.Exception.Argument(nameof(namespaceConnectionString), "NamespaceConnectionString should not contain EntityPath.");
}
this.InitializeConnection(serviceBusConnectionStringBuilder);
}
/// <summary>
/// Creates a new connection to service bus.
/// </summary>
/// <param name="endpoint">Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.windows.net</param>
/// <param name="transportType">Transport type.</param>
/// <param name="retryPolicy">Retry policy for operations. Defaults to <see cref="RetryPolicy.Default"/></param>
public ServiceBusConnection(string endpoint, TransportType transportType, RetryPolicy retryPolicy = null)
: this(Constants.DefaultOperationTimeout, retryPolicy)
{
if (string.IsNullOrWhiteSpace(endpoint))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(endpoint));
}
var serviceBusConnectionStringBuilder = new ServiceBusConnectionStringBuilder()
{
Endpoint = endpoint,
TransportType = transportType
};
this.InitializeConnection(serviceBusConnectionStringBuilder);
}
internal ServiceBusConnection(TimeSpan operationTimeout, RetryPolicy retryPolicy = null)
{
this.OperationTimeout = operationTimeout;
this.RetryPolicy = retryPolicy ?? RetryPolicy.Default;
}
/// <summary>
/// Fully qualified domain name for Service Bus.
/// </summary>
public Uri Endpoint { get; set; }
/// <summary>
/// OperationTimeout is applied in erroneous situations to notify the caller about the relevant <see cref="ServiceBusException"/>
/// </summary>
/// <remarks>Defaults to 1 minute.</remarks>
public TimeSpan OperationTimeout { get; set; }
/// <summary>
/// Retry policy for operations performed on the connection.
/// </summary>
/// <remarks>Defaults to <see cref="RetryPolicy.Default"/></remarks>
public RetryPolicy RetryPolicy { get; set; }
/// <summary>
/// Get the transport type from the connection string.
/// <remarks>Available options: Amqp and AmqpWebSockets.</remarks>
/// </summary>
public TransportType TransportType { get; set; }
/// <summary>
/// Token provider for authentication. <see cref="TokenProvider"/>
/// </summary>
public ITokenProvider TokenProvider { get; set; }
internal FaultTolerantAmqpObject<AmqpConnection> ConnectionManager { get; set; }
internal FaultTolerantAmqpObject<Controller> TransactionController { get; set; }
/// <summary>
/// Closes the connection.
/// </summary>
public Task CloseAsync()
{
return this.ConnectionManager.CloseAsync();
}
void InitializeConnection(ServiceBusConnectionStringBuilder builder)
{
this.Endpoint = new Uri(builder.Endpoint);
if (builder.SasToken != null)
{
this.TokenProvider = new SharedAccessSignatureTokenProvider(builder.SasToken);
}
else if (builder.SasKeyName != null || builder.SasKey != null)
{
this.TokenProvider = new SharedAccessSignatureTokenProvider(builder.SasKeyName, builder.SasKey);
}
this.TransportType = builder.TransportType;
this.ConnectionManager = new FaultTolerantAmqpObject<AmqpConnection>(this.CreateConnectionAsync, CloseConnection);
this.TransactionController = new FaultTolerantAmqpObject<Controller>(this.CreateControllerAsync, CloseController);
}
static void CloseConnection(AmqpConnection connection)
{
MessagingEventSource.Log.AmqpConnectionClosed(connection);
connection.SafeClose();
}
static void CloseController(Controller controller)
{
controller.Close();
}
async Task<AmqpConnection> CreateConnectionAsync(TimeSpan timeout)
{
var hostName = this.Endpoint.Host;
var timeoutHelper = new TimeoutHelper(timeout);
var amqpSettings = AmqpConnectionHelper.CreateAmqpSettings(
amqpVersion: AmqpVersion,
useSslStreamSecurity: true,
hasTokenProvider: true,
useWebSockets: TransportType == TransportType.AmqpWebSockets);
var transportSettings = CreateTransportSettings();
var amqpTransportInitiator = new AmqpTransportInitiator(amqpSettings, transportSettings);
var transport = await amqpTransportInitiator.ConnectTaskAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
var containerId = Guid.NewGuid().ToString();
var amqpConnectionSettings = AmqpConnectionHelper.CreateAmqpConnectionSettings(AmqpConstants.DefaultMaxFrameSize, containerId, hostName);
var connection = new AmqpConnection(transport, amqpSettings, amqpConnectionSettings);
await connection.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
// Always create the CBS Link + Session
var cbsLink = new AmqpCbsLink(connection);
if (connection.Extensions.Find<AmqpCbsLink>() == null)
{
connection.Extensions.Add(cbsLink);
}
MessagingEventSource.Log.AmqpConnectionCreated(hostName, connection);
return connection;
}
async Task<Controller> CreateControllerAsync(TimeSpan timeout)
{
var timeoutHelper = new TimeoutHelper(timeout);
var connection = await this.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
AmqpSession amqpSession = null;
Controller controller;
try
{
amqpSession = connection.CreateSession(sessionSettings);
await amqpSession.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
controller = new Controller(amqpSession, timeoutHelper.RemainingTime());
await controller.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
}
catch (Exception exception)
{
if (amqpSession != null)
{
await amqpSession.CloseAsync(timeout).ConfigureAwait(false);
}
MessagingEventSource.Log.AmqpCreateControllerException(this.ConnectionManager.ToString(), exception);
throw;
}
return controller;
}
TransportSettings CreateTransportSettings()
{
var hostName = this.Endpoint.Host;
var networkHost = this.Endpoint.Host;
var port = this.Endpoint.Port;
if (TransportType == TransportType.AmqpWebSockets)
{
return AmqpConnectionHelper.CreateWebSocketTransportSettings(
networkHost: networkHost,
hostName: hostName,
port: port);
}
return AmqpConnectionHelper.CreateTcpTransportSettings(
networkHost: networkHost,
hostName: hostName,
port: port,
useSslStreamSecurity: true);
}
}
}

Просмотреть файл

@ -87,8 +87,7 @@ namespace Microsoft.Azure.ServiceBus
null,
receiveMode,
prefetchCount,
new ServiceBusNamespaceConnection(connectionString),
null,
new ServiceBusConnection(connectionString),
null,
retryPolicy,
null)
@ -97,10 +96,6 @@ namespace Microsoft.Azure.ServiceBus
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ownsConnection = true;
}
@ -130,24 +125,42 @@ namespace Microsoft.Azure.ServiceBus
null,
receiveMode,
prefetchCount,
new ServiceBusNamespaceConnection(endpoint, transportType, retryPolicy),
tokenProvider,
new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider},
null,
retryPolicy,
null)
{
if (tokenProvider == null)
{
throw Fx.Exception.ArgumentNull(nameof(tokenProvider));
}
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ownsConnection = true;
}
/// <summary>
/// Creates a new SessionClient on a given <see cref="ServiceBusConnection"/>
/// </summary>
/// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
/// <param name="entityPath">The path of the entity for this receiver. For Queues this will be the name, but for Subscriptions this will be the full path.</param>
/// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.</param>
/// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with ServiceBus. Defaults to <see cref="RetryPolicy.Default"/></param>
/// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages the session object
/// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
public SessionClient(
ServiceBusConnection serviceBusConnection,
string entityPath,
ReceiveMode receiveMode,
RetryPolicy retryPolicy = null,
int prefetchCount = DefaultPrefetchCount)
: this(nameof(SessionClient),
entityPath,
null,
receiveMode,
prefetchCount,
serviceBusConnection,
null,
retryPolicy,
null)
{
this.ownsConnection = false;
}
internal SessionClient(
string clientTypeName,
string entityPath,
@ -155,19 +168,35 @@ namespace Microsoft.Azure.ServiceBus
ReceiveMode receiveMode,
int prefetchCount,
ServiceBusConnection serviceBusConnection,
ITokenProvider tokenProvider,
ICbsTokenProvider cbsTokenProvider,
RetryPolicy retryPolicy,
IList<ServiceBusPlugin> registeredPlugins)
: base(clientTypeName, entityPath, retryPolicy ?? RetryPolicy.Default)
{
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.EntityPath = entityPath;
this.EntityType = entityType;
this.ReceiveMode = receiveMode;
this.PrefetchCount = prefetchCount;
tokenProvider = tokenProvider ?? this.ServiceBusConnection.CreateTokenProvider();
this.CbsTokenProvider = cbsTokenProvider ?? new TokenProviderAdapter(tokenProvider, this.ServiceBusConnection.OperationTimeout);
if (cbsTokenProvider != null)
{
this.CbsTokenProvider = cbsTokenProvider;
}
else if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
}
else
{
throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
}
this.diagnosticSource = new ServiceBusDiagnosticSource(entityPath, serviceBusConnection.Endpoint);
// Register plugins on the message session.
@ -187,6 +216,11 @@ namespace Microsoft.Azure.ServiceBus
/// </summary>
public string EntityPath { get; }
/// <summary>
/// Gets the path of the entity. This is either the name of the queue, or the full path of the subscription.
/// </summary>
public override string Path => this.EntityPath;
/// <summary>
/// Duration after which individual operations will timeout.
/// </summary>
@ -196,12 +230,15 @@ namespace Microsoft.Azure.ServiceBus
set => this.ServiceBusConnection.OperationTimeout = value;
}
/// <summary>
/// Connection object to the service bus namespace.
/// </summary>
public override ServiceBusConnection ServiceBusConnection { get; }
MessagingEntityType? EntityType { get; }
internal int PrefetchCount { get; set; }
ServiceBusConnection ServiceBusConnection { get; }
ICbsTokenProvider CbsTokenProvider { get; }
/// <summary>

Просмотреть файл

@ -79,23 +79,13 @@ namespace Microsoft.Azure.ServiceBus
/// <param name="retryPolicy">Retry policy for subscription operations. Defaults to <see cref="RetryPolicy.Default"/></param>
/// <remarks>Creates a new connection to the subscription, which is opened during the first receive operation.</remarks>
public SubscriptionClient(string connectionString, string topicPath, string subscriptionName, ReceiveMode receiveMode = ReceiveMode.PeekLock, RetryPolicy retryPolicy = null)
: this(new ServiceBusNamespaceConnection(connectionString), topicPath, subscriptionName, receiveMode, retryPolicy ?? RetryPolicy.Default)
: this(new ServiceBusConnection(connectionString), topicPath, subscriptionName, receiveMode, retryPolicy ?? RetryPolicy.Default)
{
if (string.IsNullOrWhiteSpace(connectionString))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}
if (string.IsNullOrWhiteSpace(topicPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(topicPath);
}
if (string.IsNullOrWhiteSpace(subscriptionName))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(subscriptionName);
}
var tokenProvider = this.ServiceBusConnection.CreateTokenProvider();
this.CbsTokenProvider = new TokenProviderAdapter(tokenProvider, this.ServiceBusConnection.OperationTimeout);
this.ownsConnection = true;
}
@ -118,20 +108,31 @@ namespace Microsoft.Azure.ServiceBus
TransportType transportType = TransportType.Amqp,
ReceiveMode receiveMode = ReceiveMode.PeekLock,
RetryPolicy retryPolicy = null)
: this(new ServiceBusNamespaceConnection(endpoint, transportType, retryPolicy), topicPath, subscriptionName, receiveMode, retryPolicy)
: this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, topicPath, subscriptionName, receiveMode, retryPolicy)
{
if (tokenProvider == null)
{
throw Fx.Exception.ArgumentNull(nameof(tokenProvider));
}
this.CbsTokenProvider = new TokenProviderAdapter(tokenProvider, this.ServiceBusConnection.OperationTimeout);
this.ownsConnection = true;
}
SubscriptionClient(ServiceBusNamespaceConnection serviceBusConnection, string topicPath, string subscriptionName, ReceiveMode receiveMode, RetryPolicy retryPolicy)
/// <summary>
/// Creates a new instance of the Subscription client on a given <see cref="ServiceBusConnection"/>
/// </summary>
/// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
/// <param name="topicPath">Topic path.</param>
/// <param name="subscriptionName">Subscription name.</param>
/// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</param>
/// <param name="retryPolicy">Retry policy for subscription operations. Defaults to <see cref="RetryPolicy.Default"/></param>
public SubscriptionClient(ServiceBusConnection serviceBusConnection, string topicPath, string subscriptionName, ReceiveMode receiveMode, RetryPolicy retryPolicy)
: base(nameof(SubscriptionClient), $"{topicPath}/{subscriptionName}", retryPolicy)
{
if (string.IsNullOrWhiteSpace(topicPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(topicPath);
}
if (string.IsNullOrWhiteSpace(subscriptionName))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(subscriptionName);
}
MessagingEventSource.Log.SubscriptionClientCreateStart(serviceBusConnection?.Endpoint.Authority, topicPath, subscriptionName, receiveMode.ToString());
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
@ -141,6 +142,15 @@ namespace Microsoft.Azure.ServiceBus
this.Path = EntityNameHelper.FormatSubscriptionPath(this.TopicPath, this.SubscriptionName);
this.ReceiveMode = receiveMode;
this.diagnosticSource = new ServiceBusDiagnosticSource(this.Path, serviceBusConnection.Endpoint);
this.ownsConnection = false;
if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
}
else
{
throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
}
MessagingEventSource.Log.SubscriptionClientCreateStop(serviceBusConnection.Endpoint.Authority, topicPath, subscriptionName, this.ClientId);
}
@ -154,7 +164,7 @@ namespace Microsoft.Azure.ServiceBus
/// Gets the formatted path of the subscription client.
/// </summary>
/// <seealso cref="EntityNameHelper.FormatSubscriptionPath(string, string)"/>
public string Path { get; }
public override string Path { get; }
/// <summary>
/// Gets the name of the subscription.
@ -215,6 +225,11 @@ namespace Microsoft.Azure.ServiceBus
}
}
/// <summary>
/// Connection object to the service bus namespace.
/// </summary>
public override ServiceBusConnection ServiceBusConnection { get; }
internal IInnerSubscriptionClient InnerSubscriptionClient
{
get
@ -254,7 +269,6 @@ namespace Microsoft.Azure.ServiceBus
this.ReceiveMode,
this.PrefetchCount,
this.ServiceBusConnection,
null,
this.CbsTokenProvider,
this.RetryPolicy,
this.RegisteredPlugins);
@ -289,8 +303,6 @@ namespace Microsoft.Azure.ServiceBus
}
}
internal ServiceBusNamespaceConnection ServiceBusConnection { get; }
ICbsTokenProvider CbsTokenProvider { get; }
/// <summary>

Просмотреть файл

@ -54,19 +54,13 @@ namespace Microsoft.Azure.ServiceBus
/// <param name="retryPolicy">Retry policy for topic operations. Defaults to <see cref="RetryPolicy.Default"/></param>
/// <remarks>Creates a new connection to the topic, which is opened during the first send operation.</remarks>
public TopicClient(string connectionString, string entityPath, RetryPolicy retryPolicy = null)
: this(new ServiceBusNamespaceConnection(connectionString), entityPath, retryPolicy ?? RetryPolicy.Default)
: this(new ServiceBusConnection(connectionString), entityPath, retryPolicy ?? RetryPolicy.Default)
{
if (string.IsNullOrWhiteSpace(connectionString))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
var tokenProvider = this.ServiceBusConnection.CreateTokenProvider();
this.CbsTokenProvider = new TokenProviderAdapter(tokenProvider, this.ServiceBusConnection.OperationTimeout);
this.ownsConnection = true;
}
@ -85,25 +79,38 @@ namespace Microsoft.Azure.ServiceBus
ITokenProvider tokenProvider,
TransportType transportType = TransportType.Amqp,
RetryPolicy retryPolicy = null)
: this(new ServiceBusNamespaceConnection(endpoint, transportType, retryPolicy), entityPath, retryPolicy)
: this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, entityPath, retryPolicy)
{
if (tokenProvider == null)
{
throw Fx.Exception.ArgumentNull(nameof(tokenProvider));
}
this.CbsTokenProvider = new TokenProviderAdapter(tokenProvider, this.ServiceBusConnection.OperationTimeout);
this.ownsConnection = true;
}
TopicClient(ServiceBusNamespaceConnection serviceBusConnection, string entityPath, RetryPolicy retryPolicy)
/// <summary>
/// Creates a new instance of the Topic client on a given <see cref="ServiceBusConnection"/>
/// </summary>
/// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
/// <param name="entityPath">Topic path.</param>
/// <param name="retryPolicy">Retry policy for topic operations. Defaults to <see cref="RetryPolicy.Default"/></param>
public TopicClient(ServiceBusConnection serviceBusConnection, string entityPath, RetryPolicy retryPolicy)
: base(nameof(TopicClient), entityPath, retryPolicy)
{
MessagingEventSource.Log.TopicClientCreateStart(serviceBusConnection?.Endpoint.Authority, entityPath);
if (string.IsNullOrWhiteSpace(entityPath))
{
throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
}
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.syncLock = new object();
this.TopicName = entityPath;
this.ownsConnection = false;
if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
}
else
{
throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
}
MessagingEventSource.Log.TopicClientCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.ClientId);
}
@ -125,7 +132,12 @@ namespace Microsoft.Azure.ServiceBus
/// <summary>
/// Gets the name of the topic.
/// </summary>
public string Path => this.TopicName;
public override string Path => this.TopicName;
/// <summary>
/// Connection object to the service bus namespace.
/// </summary>
public override ServiceBusConnection ServiceBusConnection { get; }
internal MessageSender InnerSender
{
@ -141,7 +153,6 @@ namespace Microsoft.Azure.ServiceBus
this.TopicName,
MessagingEntityType.Topic,
this.ServiceBusConnection,
null,
this.CbsTokenProvider,
this.RetryPolicy);
}
@ -151,9 +162,7 @@ namespace Microsoft.Azure.ServiceBus
return this.innerSender;
}
}
internal ServiceBusNamespaceConnection ServiceBusConnection { get; }
ICbsTokenProvider CbsTokenProvider { get; }
/// <summary>

Просмотреть файл

@ -10,8 +10,10 @@ namespace Microsoft.Azure.ServiceBus
public string ClientId { get; }
public bool IsClosedOrClosing { get; }
public abstract System.TimeSpan OperationTimeout { get; set; }
public abstract string Path { get; }
public abstract System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public Microsoft.Azure.ServiceBus.RetryPolicy RetryPolicy { get; }
public abstract Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
public System.Threading.Tasks.Task CloseAsync() { }
protected static string GenerateClientId(string clientTypeName, string postfix = "") { }
protected static long GetNextId() { }
@ -77,7 +79,9 @@ namespace Microsoft.Azure.ServiceBus
string ClientId { get; }
bool IsClosedOrClosing { get; }
System.TimeSpan OperationTimeout { get; set; }
string Path { get; }
System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
System.Threading.Tasks.Task CloseAsync();
void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin);
void UnregisterPlugin(string serviceBusPluginName);
@ -196,12 +200,14 @@ namespace Microsoft.Azure.ServiceBus
public QueueClient(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public QueueClient(string connectionString, string entityPath, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public QueueClient(string endpoint, string entityPath, Microsoft.Azure.ServiceBus.Primitives.ITokenProvider tokenProvider, Microsoft.Azure.ServiceBus.TransportType transportType = 0, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public QueueClient(Microsoft.Azure.ServiceBus.ServiceBusConnection serviceBusConnection, string entityPath, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy) { }
public override System.TimeSpan OperationTimeout { get; set; }
public string Path { get; }
public override string Path { get; }
public int PrefetchCount { get; set; }
public string QueueName { get; }
public Microsoft.Azure.ServiceBus.ReceiveMode ReceiveMode { get; }
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { }
public System.Threading.Tasks.Task CompleteAsync(string lockToken) { }
@ -269,6 +275,19 @@ namespace Microsoft.Azure.ServiceBus
public ServiceBusCommunicationException(string message) { }
public ServiceBusCommunicationException(string message, System.Exception innerException) { }
}
public class ServiceBusConnection
{
public ServiceBusConnection(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder) { }
public ServiceBusConnection(string namespaceConnectionString) { }
public ServiceBusConnection(string namespaceConnectionString, System.TimeSpan operationTimeout, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public ServiceBusConnection(string endpoint, Microsoft.Azure.ServiceBus.TransportType transportType, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public System.Uri Endpoint { get; set; }
public System.TimeSpan OperationTimeout { get; set; }
public Microsoft.Azure.ServiceBus.RetryPolicy RetryPolicy { get; set; }
public Microsoft.Azure.ServiceBus.Primitives.ITokenProvider TokenProvider { get; set; }
public Microsoft.Azure.ServiceBus.TransportType TransportType { get; set; }
public System.Threading.Tasks.Task CloseAsync() { }
}
public class ServiceBusConnectionStringBuilder
{
public ServiceBusConnectionStringBuilder() { }
@ -307,9 +326,12 @@ namespace Microsoft.Azure.ServiceBus
public SessionClient(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
public SessionClient(string connectionString, string entityPath, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
public SessionClient(string endpoint, string entityPath, Microsoft.Azure.ServiceBus.Primitives.ITokenProvider tokenProvider, Microsoft.Azure.ServiceBus.TransportType transportType = 0, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
public SessionClient(Microsoft.Azure.ServiceBus.ServiceBusConnection serviceBusConnection, string entityPath, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
public string EntityPath { get; }
public override System.TimeSpan OperationTimeout { get; set; }
public override string Path { get; }
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
public System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.IMessageSession> AcceptMessageSessionAsync() { }
public System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.IMessageSession> AcceptMessageSessionAsync(System.TimeSpan serverWaitTime) { }
public System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.IMessageSession> AcceptMessageSessionAsync(string sessionId) { }
@ -351,11 +373,13 @@ namespace Microsoft.Azure.ServiceBus
public SubscriptionClient(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder, string subscriptionName, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public SubscriptionClient(string connectionString, string topicPath, string subscriptionName, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public SubscriptionClient(string endpoint, string topicPath, string subscriptionName, Microsoft.Azure.ServiceBus.Primitives.ITokenProvider tokenProvider, Microsoft.Azure.ServiceBus.TransportType transportType = 0, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public SubscriptionClient(Microsoft.Azure.ServiceBus.ServiceBusConnection serviceBusConnection, string topicPath, string subscriptionName, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy) { }
public override System.TimeSpan OperationTimeout { get; set; }
public string Path { get; }
public override string Path { get; }
public int PrefetchCount { get; set; }
public Microsoft.Azure.ServiceBus.ReceiveMode ReceiveMode { get; }
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
public string SubscriptionName { get; }
public string TopicPath { get; }
public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
@ -380,9 +404,11 @@ namespace Microsoft.Azure.ServiceBus
public TopicClient(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public TopicClient(string connectionString, string entityPath, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public TopicClient(string endpoint, string entityPath, Microsoft.Azure.ServiceBus.Primitives.ITokenProvider tokenProvider, Microsoft.Azure.ServiceBus.TransportType transportType = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public TopicClient(Microsoft.Azure.ServiceBus.ServiceBusConnection serviceBusConnection, string entityPath, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy) { }
public override System.TimeSpan OperationTimeout { get; set; }
public string Path { get; }
public override string Path { get; }
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
public string TopicName { get; }
public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { }
protected override System.Threading.Tasks.Task OnClosingAsync() { }
@ -427,7 +453,6 @@ namespace Microsoft.Azure.ServiceBus.Core
public interface IMessageSender : Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity { }
public interface IReceiverClient : Microsoft.Azure.ServiceBus.IClientEntity
{
string Path { get; }
int PrefetchCount { get; set; }
Microsoft.Azure.ServiceBus.ReceiveMode ReceiveMode { get; }
System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null);
@ -449,12 +474,14 @@ namespace Microsoft.Azure.ServiceBus.Core
public MessageReceiver(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
public MessageReceiver(string connectionString, string entityPath, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
public MessageReceiver(string endpoint, string entityPath, Microsoft.Azure.ServiceBus.Primitives.ITokenProvider tokenProvider, Microsoft.Azure.ServiceBus.TransportType transportType = 0, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
public MessageReceiver(Microsoft.Azure.ServiceBus.ServiceBusConnection serviceBusConnection, string entityPath, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
public long LastPeekedSequenceNumber { get; }
public override System.TimeSpan OperationTimeout { get; set; }
public virtual string Path { get; }
public override string Path { get; }
public int PrefetchCount { get; set; }
public Microsoft.Azure.ServiceBus.ReceiveMode ReceiveMode { get; set; }
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
public System.Threading.Tasks.Task CompleteAsync(string lockToken) { }
public System.Threading.Tasks.Task CompleteAsync(System.Collections.Generic.IEnumerable<string> lockTokens) { }
@ -493,9 +520,11 @@ namespace Microsoft.Azure.ServiceBus.Core
public MessageSender(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public MessageSender(string connectionString, string entityPath, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public MessageSender(string endpoint, string entityPath, Microsoft.Azure.ServiceBus.Primitives.ITokenProvider tokenProvider, Microsoft.Azure.ServiceBus.TransportType transportType = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public MessageSender(Microsoft.Azure.ServiceBus.ServiceBusConnection serviceBusConnection, string entityPath, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null) { }
public override System.TimeSpan OperationTimeout { get; set; }
public virtual string Path { get; }
public override string Path { get; }
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { }
protected override System.Threading.Tasks.Task OnClosingAsync() { }
public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { }

Просмотреть файл

@ -25,7 +25,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Diagnostics
Assert.Null(activity.Id);
var baggage = activity.Baggage.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
Assert.Equal(1, baggage.Count);
Assert.Single(baggage);
Assert.Contains("k1", baggage.Keys);
Assert.Equal("v1", baggage["k1"]);
}
@ -81,7 +81,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Diagnostics
Assert.Null(activity.Id);
var baggage = activity.Baggage.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
Assert.Equal(0, baggage.Count);
Assert.Empty(baggage);
}
@ -104,7 +104,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Diagnostics
Assert.Null(activity.Id);
var baggage = activity.Baggage.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
Assert.Equal(0, baggage.Count);
Assert.Empty(baggage);
}
[Theory]
@ -125,7 +125,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Diagnostics
var baggage = activity.Baggage.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
// baggage is ignored in absence of Id
Assert.Equal(0, baggage.Count);
Assert.Empty(baggage);
}
}
}

Просмотреть файл

@ -11,7 +11,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.MessageInterop
public class MessageInteropTests
{
public static IEnumerable<object> TestSerializerPermutations => new object[]
public static IEnumerable<object[]> TestSerializerPermutations => new object[][]
{
new object[] { new DataContractBinarySerializer(typeof(TestBook)) },
new object[] { new DataContractSerializer(typeof(TestBook)) }

Просмотреть файл

@ -14,7 +14,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.MessageInterop
public class MessageInteropEnd2EndTests
{
public static IEnumerable<object> TestEnd2EndEntityPermutations => new object[]
public static IEnumerable<object[]> TestEnd2EndEntityPermutations => new object[][]
{
new object[] { TransportType.NetMessaging, MessageInteropEnd2EndTests.GetSbConnectionString(TransportType.NetMessaging) },
new object[] { TransportType.Amqp, MessageInteropEnd2EndTests.GetSbConnectionString(TransportType.Amqp) }
@ -28,7 +28,11 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.MessageInterop
var queueName = TestConstants.NonPartitionedQueueName;
// Create a full framework MessageSender
var messagingFactory = MessagingFactory.CreateFromConnectionString(sbConnectionString);
var csb = new Microsoft.ServiceBus.ServiceBusConnectionStringBuilder(sbConnectionString)
{
TransportType = transportType
};
var messagingFactory = MessagingFactory.CreateFromConnectionString(csb.ToString());
var fullFrameWorkClientSender = messagingFactory.CreateMessageSender(queueName);
// Create a .NetStandard MessageReceiver
@ -46,7 +50,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.MessageInterop
TestUtility.Log($"Message1 SequenceNumber: {returnedMessage.SystemProperties.SequenceNumber}");
var returnedBody1 = returnedMessage.GetBody<string>();
TestUtility.Log($"Message1: {returnedBody1}");
Assert.True(string.Equals(message1Body, returnedBody1));
Assert.Equal(message1Body, returnedBody1);
// Send Custom object
var book = new TestBook("contosoBook", 1, 5);
@ -70,7 +74,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.MessageInterop
TestUtility.Log($"Message3 SequenceNumber: {returnedMessage.SystemProperties.SequenceNumber}");
var returnedBody3 = Encoding.UTF8.GetString(returnedMessage.GetBody<byte[]>());
TestUtility.Log($"Message1: {returnedBody3}");
Assert.True(string.Equals(message3Body, returnedBody3));
Assert.Equal(message3Body, returnedBody3);
// Send Stream Object
var message4Body = "contosoStreamObject";
@ -82,7 +86,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.MessageInterop
TestUtility.Log($"Message3 SequenceNumber: {returnedMessage.SystemProperties.SequenceNumber}");
var returnedBody4 = Encoding.UTF8.GetString(returnedMessage.Body);
TestUtility.Log($"Message4: {returnedBody4}");
Assert.True(string.Equals(message4Body, returnedBody4));
Assert.Equal(message4Body, returnedBody4);
}
finally
{

Просмотреть файл

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netcoreapp2.0;net461</TargetFrameworks>
<TargetFrameworks>net461;netcoreapp2.0</TargetFrameworks>
<AssemblyOriginatorKeyFile>../../build/keyfile.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
@ -22,6 +22,8 @@
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\OnMessageQueueTests.cs" />
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\OnSessionQueueTests.cs" />
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\RetryTests.cs" />
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\TransactionTests.cs" />
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\SubscriptionClientTests.cs" />
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\Diagnostics\ExtractActivityTests.cs" />
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\Diagnostics\QueueClientDiagnosticsTests.cs" />
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\Diagnostics\SubscriptionClientDiagnosticsTests.cs" />
@ -30,21 +32,26 @@
<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp2.0'">
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\OnMessageTopicSubscriptionTests.cs" />
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\TopicClientTests.cs" />
<compile Remove="..\..\test\Microsoft.Azure.ServiceBus.UnitTests\OnSessionTopicSubscriptionTests.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.6.2" />
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.2" />
<PackageReference Include="System.ValueTuple" Version="4.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="PublicApiGenerator" Version="6.5.0" />
<PackageReference Include="ApprovalTests" Version="3.0.13" NoWarn="NU1701" />
<PackageReference Include="ApprovalUtilities" Version="3.0.13" NoWarn="NU1701" />
<PackageReference Include="Microsoft.Extensions.PlatformAbstractions" Version="1.1.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net461'">
<Reference Include="System.Transactions" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>

Просмотреть файл

@ -11,7 +11,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
public class OnMessageQueueTests : SenderReceiverClientTestBase
{
public static IEnumerable<object> TestPermutations => new object[]
public static IEnumerable<object[]> TestPermutations => new object[][]
{
new object[] { TestConstants.NonPartitionedQueueName, 1 },
new object[] { TestConstants.NonPartitionedQueueName, 10 },

Просмотреть файл

@ -9,7 +9,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
public class OnMessageTopicSubscriptionTests : SenderReceiverClientTestBase
{
public static IEnumerable<object> TestPermutations => new object[]
public static IEnumerable<object[]> TestPermutations => new object[][]
{
new object[] { TestConstants.NonPartitionedTopicName, 5 },
new object[] { TestConstants.PartitionedTopicName, 5 },

Просмотреть файл

@ -11,7 +11,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
public class OnSessionQueueTests
{
public static IEnumerable<object> TestPermutations => new object[]
public static IEnumerable<object[]> TestPermutations => new object[][]
{
new object[] { TestConstants.SessionNonPartitionedQueueName, 1 },
new object[] { TestConstants.SessionNonPartitionedQueueName, 5 },
@ -19,7 +19,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
new object[] { TestConstants.SessionPartitionedQueueName, 5 },
};
public static IEnumerable<object> PartitionedNonPartitionedTestPermutations => new object[]
public static IEnumerable<object[]> PartitionedNonPartitionedTestPermutations => new object[][]
{
new object[] { TestConstants.SessionNonPartitionedQueueName, 5 },
new object[] { TestConstants.SessionPartitionedQueueName, 5 },

Просмотреть файл

@ -11,7 +11,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
public class OnSessionTopicSubscriptionTests
{
public static IEnumerable<object> TestPermutations => new object[]
public static IEnumerable<object[]> TestPermutations => new object[][]
{
new object[] { TestConstants.NonPartitionedSessionTopicName, 1 },
new object[] { TestConstants.NonPartitionedSessionTopicName, 5 },

Просмотреть файл

@ -18,35 +18,28 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives
[Fact]
public void Returns_endpoint_with_proper_uri_scheme()
{
var namespaceConnection = new ServiceBusNamespaceConnection(NamespaceConnectionString);
var namespaceConnection = new ServiceBusConnection(NamespaceConnectionString);
Assert.Equal(Endpoint, namespaceConnection.Endpoint.Authority);
}
[Fact]
public void Returns_shared_access_key_name()
{
var namespaceConnection = new ServiceBusNamespaceConnection(NamespaceConnectionString);
Assert.Equal(SasKeyName, namespaceConnection.SasKeyName);
}
[Fact]
public void Returns_shared_access_key()
{
var namespaceConnection = new ServiceBusNamespaceConnection(NamespaceConnectionString);
Assert.Equal(SasKey, namespaceConnection.SasKey);
var namespaceConnection = new ServiceBusConnection(NamespaceConnectionString);
Assert.IsType<SharedAccessSignatureTokenProvider>(namespaceConnection.TokenProvider);
}
[Fact]
public void Returns_default_transport_type()
{
var namespaceConnection = new ServiceBusNamespaceConnection(NamespaceConnectionString);
var namespaceConnection = new ServiceBusConnection(NamespaceConnectionString);
Assert.Equal(TransportType.Amqp, namespaceConnection.TransportType);
}
[Fact]
public void Returns_transport_type_websockets()
{
var namespaceConnection = new ServiceBusNamespaceConnection(WebSocketsNamespaceConnectionString);
var namespaceConnection = new ServiceBusConnection(WebSocketsNamespaceConnectionString);
Assert.Equal(TransportType.AmqpWebSockets, namespaceConnection.TransportType);
}
}

Просмотреть файл

@ -9,7 +9,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
public sealed class QueueClientTests : SenderReceiverClientTestBase
{
public static IEnumerable<object> TestPermutations => new object[]
public static IEnumerable<object[]> TestPermutations => new object[][]
{
new object[] { TestConstants.NonPartitionedQueueName },
new object[] { TestConstants.PartitionedQueueName }

Просмотреть файл

@ -12,7 +12,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
public sealed class QueueSessionTests
{
public static IEnumerable<object> TestPermutations => new object[]
public static IEnumerable<object[]> TestPermutations => new object[][]
{
new object[] { TestConstants.SessionNonPartitionedQueueName },
new object[] { TestConstants.SessionPartitionedQueueName },
@ -173,7 +173,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
async Task PeekSessionAsyncTest(string queueName, int messageCount = 10)
async Task PeekSessionAsyncTest(string queueName)
{
var sender = new MessageSender(TestUtility.NamespaceConnectionString, queueName);
var sessionClient = new SessionClient(TestUtility.NamespaceConnectionString, queueName, ReceiveMode.ReceiveAndDelete);

Просмотреть файл

@ -13,7 +13,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
public class RetryTests
{
// ExceptionType, CurrentRetryCount, ShouldRetry
public static IEnumerable<object> ListOfExceptions => new object[]
public static IEnumerable<object[]> ListOfExceptions => new object[][]
{
// Retry-able exceptions
new object[] { new ServiceBusCommunicationException(string.Empty), 0, true },

Просмотреть файл

@ -99,12 +99,12 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
Assert.True(receivedMessages.Count == messageCount - deferMessagesCount);
// Receive / Abandon deferred messages
receivedMessages = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumbers);
receivedMessages = await TestUtility.ReceiveDeferredMessagesAsync(messageReceiver, sequenceNumbers);
Assert.True(receivedMessages.Count == 5);
await TestUtility.DeferMessagesAsync(messageReceiver, receivedMessages);
// Receive Again and Check delivery count
receivedMessages = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumbers);
receivedMessages = await TestUtility.ReceiveDeferredMessagesAsync(messageReceiver, sequenceNumbers);
var count = receivedMessages.Count(message => message.SystemProperties.DeliveryCount == 3);
Assert.True(count == receivedMessages.Count);

Просмотреть файл

@ -15,7 +15,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
{
private static TimeSpan TwoSeconds = TimeSpan.FromSeconds(2);
public static IEnumerable<object> TestPermutations => new object[]
public static IEnumerable<object[]> TestPermutations => new object[][]
{
new object[] { TestConstants.NonPartitionedQueueName },
new object[] { TestConstants.PartitionedQueueName }
@ -101,7 +101,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
async Task ReceiveShouldThrowForServerTimeoutZero(string queueName)
async Task ReceiveShouldThrowForServerTimeoutZeroTest(string queueName)
{
var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, queueName, receiveMode: ReceiveMode.ReceiveAndDelete);
@ -261,8 +261,8 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
Assert.NotNull(dlqMessage);
Assert.True(dlqMessage.UserProperties.ContainsKey(Message.DeadLetterReasonHeader));
Assert.True(dlqMessage.UserProperties.ContainsKey(Message.DeadLetterErrorDescriptionHeader));
Assert.Equal(dlqMessage.UserProperties[Message.DeadLetterReasonHeader], "deadLetterReason");
Assert.Equal(dlqMessage.UserProperties[Message.DeadLetterErrorDescriptionHeader], "deadLetterDescription");
Assert.Equal("deadLetterReason", dlqMessage.UserProperties[Message.DeadLetterReasonHeader]);
Assert.Equal("deadLetterDescription", dlqMessage.UserProperties[Message.DeadLetterErrorDescriptionHeader]);
}
finally
{
@ -295,7 +295,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
message = await receiver.ReceiveAsync();
Assert.NotNull(message);
Assert.True(message.UserProperties.ContainsKey("key"));
Assert.Equal(message.UserProperties["key"], "value1");
Assert.Equal("value1", message.UserProperties["key"]);
long sequenceNumber = message.SystemProperties.SequenceNumber;
await receiver.DeferAsync(message.SystemProperties.LockToken, new Dictionary<string, object>
@ -306,7 +306,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
message = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
Assert.NotNull(message);
Assert.True(message.UserProperties.ContainsKey("key"));
Assert.Equal(message.UserProperties["key"], "value2");
Assert.Equal("value2", message.UserProperties["key"]);
await receiver.CompleteAsync(message.SystemProperties.LockToken);
}

Просмотреть файл

@ -90,7 +90,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
var csBuilder = new ServiceBusConnectionStringBuilder(connectionString);
Assert.Equal("amqps://hello.servicebus.windows.net", csBuilder.Endpoint);
Assert.Equal("myQ", csBuilder.EntityPath);
Assert.Equal(1, csBuilder.ConnectionStringProperties.Count);
Assert.Single(csBuilder.ConnectionStringProperties);
Assert.True(csBuilder.ConnectionStringProperties.ContainsKey("secretmessage"));
Assert.Equal("h=llo", csBuilder.ConnectionStringProperties["secretmessage"]);
}

Просмотреть файл

@ -11,7 +11,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
public sealed class SubscriptionClientTests : SenderReceiverClientTestBase
{
public static IEnumerable<object> TestPermutations => new object[]
public static IEnumerable<object[]> TestPermutations => new object[][]
{
new object[] { TestConstants.NonPartitionedTopicName },
new object[] { TestConstants.PartitionedTopicName }
@ -22,7 +22,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
async Task CorrelationFilterTestCase(string topicName, int messageCount = 10)
async Task CorrelationFilterTestCase(string topicName)
{
var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName);
var subscriptionClient = new SubscriptionClient(
@ -59,7 +59,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
var messages = await subscriptionClient.InnerSubscriptionClient.InnerReceiver.ReceiveAsync(maxMessageCount: 2);
Assert.NotNull(messages);
Assert.True(messages.Count == 1);
Assert.True(messageId2.Equals(messages.First().MessageId));
Assert.Equal(messageId2, messages.First().MessageId);
}
finally
{
@ -81,7 +81,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
async Task SqlFilterTestCase(string topicName, int messageCount = 10)
async Task SqlFilterTestCase(string topicName)
{
var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName);
var subscriptionClient = new SubscriptionClient(
@ -128,7 +128,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
var messages = await subscriptionClient.InnerSubscriptionClient.InnerReceiver.ReceiveAsync(maxMessageCount: 2);
Assert.NotNull(messages);
Assert.True(messages.Count == 1);
Assert.True(messageId2.Equals(messages.First().MessageId));
Assert.Equal(messageId2, messages.First().MessageId);
}
finally
{
@ -150,7 +150,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
async Task SqlActionTestCase(string topicName, int messageCount = 10)
async Task SqlActionTestCase(string topicName)
{
var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName);
var subscriptionClient = new SubscriptionClient(
@ -198,7 +198,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
var messages = await subscriptionClient.InnerSubscriptionClient.InnerReceiver.ReceiveAsync(maxMessageCount: 2);
Assert.NotNull(messages);
Assert.True(messages.Count == 1);
Assert.True(messageId2.Equals(messages.First().MessageId));
Assert.Equal(messageId2, messages.First().MessageId);
Assert.True(messages.First().UserProperties["color"].Equals("RedSqlActionProcessed"));
}
finally
@ -225,7 +225,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
try
{
var rules = (await subscriptionClient.GetRulesAsync()).ToList();
Assert.Equal(1, rules.Count);
Assert.Single(rules);
var firstRule = rules[0];
Assert.Equal(RuleDescription.DefaultRuleName, firstRule.Name);
Assert.IsType<SqlFilter>(firstRule.Filter);

Просмотреть файл

@ -23,6 +23,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
readonly SessionHandlerOptions sessionHandlerOptions;
ConcurrentDictionary<string, int> sessionMessageMap;
int totalMessageCount;
object syncLock = new object();
public TestSessionHandler(
ReceiveMode receiveMode,
@ -60,13 +61,16 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
await session.CompleteAsync(message.SystemProperties.LockToken);
}
if (!this.sessionMessageMap.ContainsKey(session.SessionId))
lock (syncLock)
{
this.sessionMessageMap[session.SessionId] = 1;
}
else
{
this.sessionMessageMap[session.SessionId]++;
if (!this.sessionMessageMap.ContainsKey(session.SessionId))
{
this.sessionMessageMap[session.SessionId] = 1;
}
else
{
this.sessionMessageMap[session.SessionId]++;
}
}
}

Просмотреть файл

@ -86,6 +86,25 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
return messagesToReturn;
}
/// <summary>
/// This utility method is required since for a partitioned entity, the messages could have been received from different partitions,
/// and we cannot receive all the deferred messages from different partitions in a single call.
/// </summary>
internal static async Task<IList<Message>> ReceiveDeferredMessagesAsync(IMessageReceiver messageReceiver, IEnumerable<long> sequenceNumbers)
{
var messagesToReturn = new List<Message>();
foreach(var sequenceNumber in sequenceNumbers)
{
var msg = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber);
if (msg != null)
{
messagesToReturn.Add(msg);
}
}
return messagesToReturn;
}
internal static async Task<Message> PeekMessageAsync(IMessageReceiver messageReceiver)
{
var message = await messageReceiver.PeekAsync();
@ -190,5 +209,15 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
}
}
}
internal static string GetString(this byte[] bytes)
{
return Encoding.ASCII.GetString(bytes);
}
internal static byte[] GetBytes(this string str)
{
return Encoding.ASCII.GetBytes(str);
}
}
}

Просмотреть файл

@ -9,7 +9,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
public sealed class TopicClientTests : SenderReceiverClientTestBase
{
public static IEnumerable<object> TestPermutations => new object[]
public static IEnumerable<object[]> TestPermutations => new object[][]
{
new object[] { TestConstants.NonPartitionedTopicName },
new object[] { TestConstants.PartitionedTopicName }

Просмотреть файл

@ -0,0 +1,401 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.UnitTests
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;
using Core;
using Xunit;
public class TransactionTests
{
const string QueueName = TestConstants.NonPartitionedQueueName;
static readonly string ConnectionString = TestUtility.NamespaceConnectionString;
static readonly TimeSpan ReceiveTimeout = TimeSpan.FromSeconds(5);
public static IEnumerable<object[]> TestPermutations => new object[][]
{
new object[] { TestConstants.NonPartitionedQueueName },
new object[] { TestConstants.PartitionedQueueName }
};
public static IEnumerable<object[]> SessionTestPermutations => new object[][]
{
new object[] { TestConstants.SessionNonPartitionedQueueName },
new object[] { TestConstants.SessionPartitionedQueueName }
};
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
public async Task TransactionalSendCommitTest(string queueName)
{
var sender = new MessageSender(ConnectionString, queueName);
var receiver = new MessageReceiver(ConnectionString, queueName);
try
{
string body = Guid.NewGuid().ToString("N");
var message = new Message(body.GetBytes()) {PartitionKey = "pk"};
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await sender.SendAsync(message).ConfigureAwait(false);
ts.Complete();
}
var receivedMessage = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.NotNull(receivedMessage);
Assert.Equal(body, receivedMessage.Body.GetString());
await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
}
}
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
public async Task TransactionalSendRollbackTest(string queueName)
{
var sender = new MessageSender(ConnectionString, queueName);
var receiver = new MessageReceiver(ConnectionString, queueName);
try
{
string body = Guid.NewGuid().ToString("N");
var message = new Message(body.GetBytes()) { PartitionKey = "pk" };
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await sender.SendAsync(message).ConfigureAwait(false);
ts.Dispose();
}
var receivedMessage = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.Null(receivedMessage);
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
}
}
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
public async Task TransactionalCompleteCommitTest(string queueName)
{
var sender = new MessageSender(ConnectionString, queueName);
var receiver = new MessageReceiver(ConnectionString, queueName);
try
{
string body = Guid.NewGuid().ToString("N");
var message = new Message(body.GetBytes());
await sender.SendAsync(message).ConfigureAwait(false);
var receivedMessage = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.NotNull(receivedMessage);
Assert.Equal(body, receivedMessage.Body.GetString());
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
ts.Complete();
}
await Assert.ThrowsAsync<MessageLockLostException>(async () => await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken));
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
}
}
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
public async Task TransactionalCompleteRollbackTest(string queueName)
{
var sender = new MessageSender(ConnectionString, queueName);
var receiver = new MessageReceiver(ConnectionString, queueName);
try
{
string body = Guid.NewGuid().ToString("N");
var message = new Message(body.GetBytes());
await sender.SendAsync(message).ConfigureAwait(false);
var receivedMessage = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.NotNull(receivedMessage);
Assert.Equal(body, receivedMessage.Body.GetString());
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
ts.Dispose();
}
await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
}
}
[Theory]
[MemberData(nameof(SessionTestPermutations))]
[DisplayTestMethodName]
public async Task TransactionalSessionDispositionTest(string queueName)
{
var sender = new MessageSender(ConnectionString, queueName);
var sessionClient = new SessionClient(ConnectionString, queueName);
IMessageSession receiver = null;
try
{
string body = Guid.NewGuid().ToString("N");
var message = new Message(body.GetBytes())
{
SessionId = body
};
await sender.SendAsync(message).ConfigureAwait(false);
receiver = await sessionClient.AcceptMessageSessionAsync(body);
var receivedMessage = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.NotNull(receivedMessage);
Assert.Equal(body, receivedMessage.Body.GetString());
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
ts.Dispose();
}
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
ts.Complete();
}
await Assert.ThrowsAsync<SessionLockLostException>(async () => await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken));
}
finally
{
await sender.CloseAsync();
await sessionClient.CloseAsync();
if (receiver != null)
{
await receiver.CloseAsync();
}
}
}
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
public async Task TransactionalRequestResponseDispositionTest(string queueName)
{
var sender = new MessageSender(ConnectionString, queueName);
var receiver = new MessageReceiver(ConnectionString, queueName);
try
{
string body = Guid.NewGuid().ToString("N");
var message = new Message(body.GetBytes());
await sender.SendAsync(message).ConfigureAwait(false);
var receivedMessage = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.NotNull(receivedMessage);
Assert.Equal(body, receivedMessage.Body.GetString());
var sequenceNumber = receivedMessage.SystemProperties.SequenceNumber;
await receiver.DeferAsync(receivedMessage.SystemProperties.LockToken);
var deferredMessage = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiver.CompleteAsync(deferredMessage.SystemProperties.LockToken);
ts.Dispose();
}
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiver.CompleteAsync(deferredMessage.SystemProperties.LockToken);
ts.Complete();
}
await Assert.ThrowsAsync<MessageLockLostException>(async () => await receiver.CompleteAsync(deferredMessage.SystemProperties.LockToken));
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
}
}
[Fact]
[DisplayTestMethodName]
public async Task TransactionThrowsWhenOperationsOfDifferentPartitionsAreInSameTransaction()
{
var queueName = TestConstants.PartitionedQueueName;
var sender = new MessageSender(ConnectionString, queueName);
var receiver = new MessageReceiver(ConnectionString, queueName);
try
{
string body = Guid.NewGuid().ToString("N");
var message1 = new Message((body + "1").GetBytes())
{
PartitionKey = "1"
};
var message2 = new Message((body + "2").GetBytes())
{
PartitionKey = "2"
};
// Two send operations to different partitions.
var transaction = new CommittableTransaction();
using (TransactionScope ts = new TransactionScope(transaction, TransactionScopeAsyncFlowOption.Enabled))
{
await sender.SendAsync(message1);
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await sender.SendAsync(message2));
ts.Complete();
}
transaction.Rollback();
// Two complete operations to different partitions.
await sender.SendAsync(message1);
await sender.SendAsync(message2);
var receivedMessage1 = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.NotNull(receivedMessage1);
var receivedMessage2 = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.NotNull(receivedMessage2);
transaction = new CommittableTransaction();
using (TransactionScope ts = new TransactionScope(transaction, TransactionScopeAsyncFlowOption.Enabled))
{
await receiver.CompleteAsync(receivedMessage1.SystemProperties.LockToken);
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await receiver.CompleteAsync(receivedMessage2.SystemProperties.LockToken));
ts.Complete();
}
transaction.Rollback();
await receiver.CompleteAsync(receivedMessage1.SystemProperties.LockToken);
await receiver.CompleteAsync(receivedMessage2.SystemProperties.LockToken);
}
catch (Exception e)
{
Console.WriteLine(e);
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
}
}
[Fact]
[DisplayTestMethodName]
public async Task TransactionCommitWorksAcrossClientsUsingSameConnectionToSameEntity()
{
var connection = new ServiceBusConnection(ConnectionString);
var sender = new MessageSender(connection, QueueName);
var receiver = new MessageReceiver(connection, QueueName);
try
{
string body1 = Guid.NewGuid().ToString("N");
string body2 = Guid.NewGuid().ToString("N");
var message = new Message(body1.GetBytes());
var message2 = new Message(body2.GetBytes());
await sender.SendAsync(message).ConfigureAwait(false);
var receivedMessage = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.NotNull(receivedMessage);
Assert.Equal(body1, receivedMessage.Body.GetString());
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
await sender.SendAsync(message2).ConfigureAwait(false);
ts.Complete();
}
// Assert that complete did succeed
await Assert.ThrowsAsync<MessageLockLostException>(async () => await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken));
// Assert that send did succeed
receivedMessage = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.NotNull(receivedMessage);
Assert.Equal(body2, receivedMessage.Body.GetString());
await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
await connection.CloseAsync();
}
}
[Fact]
[DisplayTestMethodName]
public async Task TransactionRollbackWorksAcrossClientsUsingSameConnectionToSameEntity()
{
var connection = new ServiceBusConnection(ConnectionString);
var sender = new MessageSender(connection, QueueName);
var receiver = new MessageReceiver(connection, QueueName);
try
{
string body1 = Guid.NewGuid().ToString("N");
string body2 = Guid.NewGuid().ToString("N");
var message = new Message(body1.GetBytes());
var message2 = new Message(body2.GetBytes());
await sender.SendAsync(message).ConfigureAwait(false);
var receivedMessage = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.NotNull(receivedMessage);
Assert.Equal(body1, receivedMessage.Body.GetString());
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
await sender.SendAsync(message2).ConfigureAwait(false);
ts.Dispose();
}
// Following should succeed without exceptions
await receiver.CompleteAsync(receivedMessage.SystemProperties.LockToken);
// Assert that send failed
receivedMessage = await receiver.ReceiveAsync(ReceiveTimeout);
Assert.Null(receivedMessage);
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
await connection.CloseAsync();
}
}
}
}