Merge pull request #271 from Azure/dev

Merge from dev to master for 1.0.0 release
This commit is contained in:
vinaysurya 2017-08-10 21:57:51 -07:00 коммит произвёл GitHub
Родитель 1ac0f2f16e 31342b0080
Коммит 4f72af709f
26 изменённых файлов: 410 добавлений и 90 удалений

Просмотреть файл

@ -4,7 +4,6 @@
# Microsoft Azure Service Bus Client for .NET
**Please be aware that this library is currently in active development, and is not intended for production**
|Build/Package|Status|
|------|-------------|

Просмотреть файл

@ -0,0 +1,132 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.Amqp
{
using Microsoft.Azure.Amqp;
using System;
using System.Threading;
using System.Threading.Tasks;
sealed class ActiveClientLinkManager
{
static readonly TimeSpan SendTokenTimeout = TimeSpan.FromMinutes(1);
static readonly TimeSpan TokenRefreshBuffer = TimeSpan.FromSeconds(10);
readonly string clientId;
readonly ICbsTokenProvider cbsTokenProvider;
readonly Timer sendReceiveLinkCBSTokenRenewalTimer;
readonly Timer requestResponseLinkCBSTokenRenewalTimer;
ActiveSendReceiveClientLink activeSendReceiveClientLink;
ActiveRequestResponseLink activeRequestResponseClientLink;
public ActiveClientLinkManager(string clientId, ICbsTokenProvider tokenProvider)
{
this.clientId = clientId;
this.cbsTokenProvider = tokenProvider;
this.sendReceiveLinkCBSTokenRenewalTimer = new Timer(OnRenewSendReceiveCBSToken, this, Timeout.Infinite, Timeout.Infinite);
this.requestResponseLinkCBSTokenRenewalTimer = new Timer(OnRenewRequestResponseCBSToken, this, Timeout.Infinite, Timeout.Infinite);
}
public void Close()
{
this.ChangeRenewTimer(this.activeSendReceiveClientLink, Timeout.InfiniteTimeSpan);
this.ChangeRenewTimer(this.activeRequestResponseClientLink, Timeout.InfiniteTimeSpan);
}
public void SetActiveSendReceiveLink(ActiveSendReceiveClientLink sendReceiveClientLink)
{
this.activeSendReceiveClientLink = sendReceiveClientLink;
this.activeSendReceiveClientLink.Link.Closed += this.OnSendReceiveLinkClosed;
if (this.activeSendReceiveClientLink.Link.State == AmqpObjectState.Opened)
{
this.SetRenewCBSTokenTimer(sendReceiveClientLink);
}
}
void OnSendReceiveLinkClosed(object sender, EventArgs e)
{
this.ChangeRenewTimer(this.activeSendReceiveClientLink, Timeout.InfiniteTimeSpan);
}
public void SetActiveRequestResponseLink(ActiveRequestResponseLink requestResponseLink)
{
this.activeRequestResponseClientLink = requestResponseLink;
this.activeRequestResponseClientLink.Link.Closed += this.OnRequestResponseLinkClosed;
if (this.activeRequestResponseClientLink.Link.State == AmqpObjectState.Opened)
{
this.SetRenewCBSTokenTimer(requestResponseLink);
}
}
void OnRequestResponseLinkClosed(object sender, EventArgs e)
{
this.ChangeRenewTimer(this.activeRequestResponseClientLink, Timeout.InfiniteTimeSpan);
}
static async void OnRenewSendReceiveCBSToken(object state)
{
ActiveClientLinkManager thisPtr = (ActiveClientLinkManager)state;
await thisPtr.RenewCBSTokenAsync(thisPtr.activeSendReceiveClientLink).ConfigureAwait(false);
}
static async void OnRenewRequestResponseCBSToken(object state)
{
ActiveClientLinkManager thisPtr = (ActiveClientLinkManager)state;
await thisPtr.RenewCBSTokenAsync(thisPtr.activeRequestResponseClientLink).ConfigureAwait(false);
}
void SetRenewCBSTokenTimer(ActiveClientLinkObject activeClientLinkObject)
{
if (activeClientLinkObject.AuthorizationValidUntilUtc < DateTime.UtcNow)
{
return;
}
TimeSpan interval = activeClientLinkObject.AuthorizationValidUntilUtc.Subtract(DateTime.UtcNow) - ActiveClientLinkManager.TokenRefreshBuffer;
this.ChangeRenewTimer(activeClientLinkObject, interval);
}
void ChangeRenewTimer(ActiveClientLinkObject activeClientLinkObject, TimeSpan dueTime)
{
if (activeClientLinkObject is ActiveSendReceiveClientLink)
{
this.sendReceiveLinkCBSTokenRenewalTimer.Change(dueTime, Timeout.InfiniteTimeSpan);
}
else
{
this.requestResponseLinkCBSTokenRenewalTimer.Change(dueTime, Timeout.InfiniteTimeSpan);
}
}
async Task RenewCBSTokenAsync(ActiveClientLinkObject activeClientLinkObject)
{
try
{
AmqpCbsLink cbsLink = activeClientLinkObject.Connection.Extensions.Find<AmqpCbsLink>() ?? new AmqpCbsLink(activeClientLinkObject.Connection);
MessagingEventSource.Log.AmqpSendAuthenticanTokenStart(activeClientLinkObject.EndpointUri, activeClientLinkObject.Audience, activeClientLinkObject.Audience, activeClientLinkObject.RequiredClaims);
activeClientLinkObject.AuthorizationValidUntilUtc = await cbsLink.SendTokenAsync(
this.cbsTokenProvider,
activeClientLinkObject.EndpointUri,
activeClientLinkObject.Audience,
activeClientLinkObject.Audience,
activeClientLinkObject.RequiredClaims,
ActiveClientLinkManager.SendTokenTimeout).ConfigureAwait(false);
this.SetRenewCBSTokenTimer(activeClientLinkObject);
MessagingEventSource.Log.AmqpSendAuthenticanTokenStop();
}
catch (Exception e)
{
// failed to refresh token, no need to do anything since the server will shut the link itself
MessagingEventSource.Log.AmqpSendAuthenticanTokenException(this.clientId, e);
this.ChangeRenewTimer(activeClientLinkObject, Timeout.InfiniteTimeSpan);
}
}
}
}

Просмотреть файл

@ -0,0 +1,34 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.Amqp
{
using Microsoft.Azure.Amqp;
using System;
abstract class ActiveClientLinkObject
{
readonly string[] requiredClaims;
protected ActiveClientLinkObject(AmqpObject amqpLinkObject, Uri endpointUri, string audience, string[] requiredClaims, DateTime authorizationValidUntilUtc)
{
this.LinkObject = amqpLinkObject;
this.EndpointUri = endpointUri;
this.Audience = audience;
this.requiredClaims = requiredClaims;
this.AuthorizationValidUntilUtc = authorizationValidUntilUtc;
}
public AmqpObject LinkObject { get; }
public string Audience { get; }
public Uri EndpointUri { get; }
public string[] RequiredClaims => (string[])this.requiredClaims.Clone();
public DateTime AuthorizationValidUntilUtc { get; set; }
public abstract AmqpConnection Connection { get; }
}
}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.Amqp
{
using Microsoft.Azure.Amqp;
using System;
sealed class ActiveRequestResponseLink : ActiveClientLinkObject
{
public ActiveRequestResponseLink(RequestResponseAmqpLink link, Uri endpointUri, string audience, string[] requiredClaims, DateTime authorizationValidUntilUtc)
: base(link, endpointUri, audience, requiredClaims, authorizationValidUntilUtc)
{
this.Link = link;
}
public RequestResponseAmqpLink Link { get; }
public override AmqpConnection Connection => this.Link.Session.Connection;
}
}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.Amqp
{
using Microsoft.Azure.Amqp;
using System;
sealed class ActiveSendReceiveClientLink : ActiveClientLinkObject
{
public ActiveSendReceiveClientLink(AmqpLink link, Uri endpointUri, string audience, string[] requiredClaims, DateTime authorizationValidUntilUtc, string clientId)
: base(link, endpointUri, audience, requiredClaims, authorizationValidUntilUtc)
{
this.Link = link;
}
public AmqpLink Link { get; }
public override AmqpConnection Connection => this.Link.Session.Connection;
}
}

Просмотреть файл

@ -7,6 +7,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Sasl;
using Microsoft.Azure.Amqp.Transport;
using Microsoft.Azure.ServiceBus.Primitives;
internal class AmqpConnectionHelper
{
@ -100,12 +101,17 @@ namespace Microsoft.Azure.ServiceBus.Amqp
public static AmqpConnectionSettings CreateAmqpConnectionSettings(uint maxFrameSize, string containerId, string hostName)
{
AmqpConnectionSettings connectionSettings = new AmqpConnectionSettings
var connectionSettings = new AmqpConnectionSettings
{
MaxFrameSize = maxFrameSize,
ContainerId = containerId,
HostName = hostName
};
connectionSettings.AddProperty("product", ClientInfo.Product);
connectionSettings.AddProperty("version", ClientInfo.Version);
connectionSettings.AddProperty("framework", ClientInfo.Framework);
connectionSettings.AddProperty("platform", ClientInfo.Platform);
return connectionSettings;
}
}

Просмотреть файл

@ -160,7 +160,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
return new ServiceBusException(true, message);
}
public static Exception GetClientException(Exception exception, string referenceId = null, Exception innerException = null)
public static Exception GetClientException(Exception exception, string referenceId = null, Exception innerException = null, bool connectionError = false)
{
StringBuilder builder = new StringBuilder();
builder.AppendFormat(CultureInfo.InvariantCulture, exception.Message);
@ -187,10 +187,10 @@ namespace Microsoft.Azure.ServiceBus.Amqp
return new ServiceBusCommunicationException(message, aggregateException);
case AmqpException amqpException:
return amqpException.Error.ToMessagingContractException();
return amqpException.Error.ToMessagingContractException(connectionError);
case OperationCanceledException operationCanceledException when operationCanceledException.InnerException is AmqpException amqpException:
return amqpException.Error.ToMessagingContractException();
return amqpException.Error.ToMessagingContractException(connectionError);
case OperationCanceledException _:
return new ServiceBusException(true, message, aggregateException);

Просмотреть файл

@ -13,14 +13,16 @@ namespace Microsoft.Azure.ServiceBus.Amqp
{
readonly string entityPath;
readonly ServiceBusConnection serviceBusConnection;
readonly Uri endpointAddress;
readonly string[] requiredClaims;
readonly ICbsTokenProvider cbsTokenProvider;
readonly AmqpLinkSettings amqpLinkSettings;
protected AmqpLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings amqpLinkSettings, string clientId)
protected AmqpLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, Uri endpointAddress, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings amqpLinkSettings, string clientId)
{
this.entityPath = entityPath;
this.serviceBusConnection = serviceBusConnection;
this.endpointAddress = endpointAddress;
this.requiredClaims = requiredClaims;
this.cbsTokenProvider = cbsTokenProvider;
this.amqpLinkSettings = amqpLinkSettings;
@ -29,7 +31,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
protected string ClientId { get; }
public async Task<AmqpObject> CreateAndOpenAmqpLinkAsync()
public async Task<Tuple<AmqpObject, DateTime>> CreateAndOpenAmqpLinkAsync()
{
TimeoutHelper timeoutHelper = new TimeoutHelper(this.serviceBusConnection.OperationTimeout);
@ -39,12 +41,10 @@ namespace Microsoft.Azure.ServiceBus.Amqp
// Authenticate over CBS
AmqpCbsLink cbsLink = connection.Extensions.Find<AmqpCbsLink>();
Uri address = new Uri(this.serviceBusConnection.Endpoint, this.entityPath);
string audience = address.AbsoluteUri;
string resource = address.AbsoluteUri;
MessagingEventSource.Log.AmqpSendAuthenticanTokenStart(address, audience, resource, this.requiredClaims);
await cbsLink.SendTokenAsync(this.cbsTokenProvider, address, audience, resource, this.requiredClaims, timeoutHelper.RemainingTime()).ConfigureAwait(false);
string resource = this.endpointAddress.AbsoluteUri;
MessagingEventSource.Log.AmqpSendAuthenticanTokenStart(this.endpointAddress, resource, resource, this.requiredClaims);
DateTime cbsTokenExpiresAtUtc = await cbsLink.SendTokenAsync(this.cbsTokenProvider, this.endpointAddress, resource, resource, this.requiredClaims, timeoutHelper.RemainingTime()).ConfigureAwait(false);
MessagingEventSource.Log.AmqpSendAuthenticanTokenStop();
AmqpSession session = null;
@ -68,7 +68,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
// Create Link
link = this.OnCreateAmqpLink(connection, this.amqpLinkSettings, session);
await link.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
return link;
return new Tuple<AmqpObject, DateTime>(link, cbsTokenExpiresAtUtc);
}
catch (Exception exception)
{
@ -78,7 +78,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
connection,
exception);
throw AmqpExceptionHelper.GetClientException(exception, null, link?.GetInnerException());
throw AmqpExceptionHelper.GetClientException(exception, null, link?.GetInnerException(), session.IsClosing());
}
}

Просмотреть файл

@ -3,6 +3,7 @@
namespace Microsoft.Azure.ServiceBus.Amqp
{
using System;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.ServiceBus.Primitives;
@ -10,8 +11,8 @@ namespace Microsoft.Azure.ServiceBus.Amqp
{
readonly string entityPath;
public AmqpRequestResponseLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings linkSettings, string clientId)
: base(entityPath, serviceBusConnection, requiredClaims, cbsTokenProvider, linkSettings, clientId)
public AmqpRequestResponseLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, Uri endpointAddress, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings linkSettings, string clientId)
: base(entityPath, serviceBusConnection, endpointAddress, requiredClaims, cbsTokenProvider, linkSettings, clientId)
{
this.entityPath = entityPath;
}

Просмотреть файл

@ -3,13 +3,14 @@
namespace Microsoft.Azure.ServiceBus.Amqp
{
using System;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.ServiceBus.Primitives;
internal class AmqpSendReceiveLinkCreator : AmqpLinkCreator
{
public AmqpSendReceiveLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings linkSettings, string clientId)
: base(entityPath, serviceBusConnection, requiredClaims, cbsTokenProvider, linkSettings, clientId)
public AmqpSendReceiveLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, Uri endpointAddress, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings linkSettings, string clientId)
: base(entityPath, serviceBusConnection, endpointAddress, requiredClaims, cbsTokenProvider, linkSettings, clientId)
{
}

Просмотреть файл

@ -47,6 +47,7 @@ namespace Microsoft.Azure.ServiceBus.Core
readonly bool isSessionReceiver;
readonly object messageReceivePumpSyncLock;
readonly bool ownsConnection;
readonly ActiveClientLinkManager clientLinkManager;
int prefetchCount;
long lastPeekedSequenceNumber;
@ -131,6 +132,7 @@ namespace Microsoft.Azure.ServiceBus.Core
this.requestResponseLockedMessages = new ConcurrentExpiringSet<Guid>();
this.PrefetchCount = prefetchCount;
this.messageReceivePumpSyncLock = new object();
this.clientLinkManager = new ActiveClientLinkManager(this.ClientId, this.CbsTokenProvider);
MessagingEventSource.Log.MessageReceiverCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.ClientId);
}
@ -678,6 +680,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// <returns>The asynchronous operation.</returns>
protected override async Task OnClosingAsync()
{
this.clientLinkManager.Close();
lock (this.messageReceivePumpSyncLock)
{
if (this.receivePump != null)
@ -838,7 +841,7 @@ namespace Microsoft.Azure.ServiceBus.Core
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception, receiveLink?.GetTrackingId());
throw AmqpExceptionHelper.GetClientException(exception, receiveLink?.GetTrackingId(), null, receiveLink?.Session.IsClosing() ?? false);
}
}
@ -1168,8 +1171,21 @@ namespace Microsoft.Azure.ServiceBus.Core
linkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeout.TotalMilliseconds);
AmqpSendReceiveLinkCreator sendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.Path, this.ServiceBusConnection, new[] { ClaimConstants.Listen }, this.CbsTokenProvider, linkSettings, this.ClientId);
ReceivingAmqpLink receivingAmqpLink = (ReceivingAmqpLink)await sendReceiveLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);
Uri endPointAddress = new Uri(this.ServiceBusConnection.Endpoint, this.Path);
string[] claims = new[] { ClaimConstants.Listen };
AmqpSendReceiveLinkCreator sendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.Path, this.ServiceBusConnection, endPointAddress, claims, this.CbsTokenProvider, linkSettings, this.ClientId);
Tuple<AmqpObject, DateTime> linkDetails = await sendReceiveLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);
var receivingAmqpLink = (ReceivingAmqpLink) linkDetails.Item1;
var activeSendReceiveClientLink = new ActiveSendReceiveClientLink(
receivingAmqpLink,
endPointAddress,
endPointAddress.AbsoluteUri,
claims,
linkDetails.Item2,
this.ClientId);
this.clientLinkManager.SetActiveSendReceiveLink(activeSendReceiveClientLink);
MessagingEventSource.Log.AmqpReceiveLinkCreateStop(this.ClientId);
@ -1185,8 +1201,19 @@ namespace Microsoft.Azure.ServiceBus.Core
AmqpLinkSettings linkSettings = new AmqpLinkSettings();
linkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement);
AmqpRequestResponseLinkCreator requestResponseLinkCreator = new AmqpRequestResponseLinkCreator(entityPath, this.ServiceBusConnection, new[] { ClaimConstants.Manage, ClaimConstants.Listen }, this.CbsTokenProvider, linkSettings, this.ClientId);
RequestResponseAmqpLink requestResponseAmqpLink = (RequestResponseAmqpLink)await requestResponseLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);
Uri endPointAddress = new Uri(this.ServiceBusConnection.Endpoint, entityPath);
string[] claims = new[] { ClaimConstants.Manage, ClaimConstants.Listen };
AmqpRequestResponseLinkCreator requestResponseLinkCreator = new AmqpRequestResponseLinkCreator(entityPath, this.ServiceBusConnection, endPointAddress, claims, this.CbsTokenProvider, linkSettings, this.ClientId);
Tuple<AmqpObject, DateTime> linkDetails = await requestResponseLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);
var requestResponseAmqpLink = (RequestResponseAmqpLink)linkDetails.Item1;
var activeRequestResponseClientLink = new ActiveRequestResponseLink(
requestResponseAmqpLink,
endPointAddress,
endPointAddress.AbsoluteUri,
claims,
linkDetails.Item2);
this.clientLinkManager.SetActiveRequestResponseLink(activeRequestResponseClientLink);
MessagingEventSource.Log.AmqpReceiveLinkCreateStop(this.ClientId);
return requestResponseAmqpLink;

Просмотреть файл

@ -36,6 +36,7 @@ namespace Microsoft.Azure.ServiceBus.Core
{
int deliveryCount;
readonly bool ownsConnection;
readonly ActiveClientLinkManager clientLinkManager;
/// <summary>
/// Creates a new AMQP MessageSender.
@ -96,6 +97,7 @@ namespace Microsoft.Azure.ServiceBus.Core
this.CbsTokenProvider = cbsTokenProvider;
this.SendLinkManager = new FaultTolerantAmqpObject<SendingAmqpLink>(this.CreateLinkAsync, this.CloseSession);
this.RequestResponseLinkManager = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(this.CreateRequestResponseLinkAsync, this.CloseRequestResponseSession);
this.clientLinkManager = new ActiveClientLinkManager(this.ClientId, this.CbsTokenProvider);
MessagingEventSource.Log.MessageSenderCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.ClientId);
}
@ -133,6 +135,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// <summary>Closes the connection.</summary>
protected override async Task OnClosingAsync()
{
this.clientLinkManager.Close();
await this.SendLinkManager.CloseAsync().ConfigureAwait(false);
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);
@ -142,7 +145,7 @@ namespace Microsoft.Azure.ServiceBus.Core
}
}
private async Task<Message> ProcessMessage(Message message)
async Task<Message> ProcessMessage(Message message)
{
var processedMessage = message;
foreach (var plugin in this.RegisteredPlugins)
@ -165,7 +168,7 @@ namespace Microsoft.Azure.ServiceBus.Core
return processedMessage;
}
private async Task<IList<Message>> ProcessMessages(IList<Message> messageList)
async Task<IList<Message>> ProcessMessages(IList<Message> messageList)
{
if (this.RegisteredPlugins.Count < 1)
{
@ -349,7 +352,7 @@ namespace Microsoft.Azure.ServiceBus.Core
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId());
throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.Session.IsClosing() ?? false);
}
}
}
@ -441,8 +444,21 @@ namespace Microsoft.Azure.ServiceBus.Core
linkSettings.AddProperty(AmqpClientConstants.EntityTypeName, (int)this.EntityType);
}
AmqpSendReceiveLinkCreator sendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.Path, this.ServiceBusConnection, new[] { ClaimConstants.Send }, this.CbsTokenProvider, linkSettings, this.ClientId);
SendingAmqpLink sendingAmqpLink = (SendingAmqpLink)await sendReceiveLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);
Uri endPointAddress = new Uri(this.ServiceBusConnection.Endpoint, this.Path);
string[] claims = new[] {ClaimConstants.Send};
AmqpSendReceiveLinkCreator sendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.Path, this.ServiceBusConnection, endPointAddress, claims, this.CbsTokenProvider, linkSettings, this.ClientId);
Tuple<AmqpObject, DateTime> linkDetails = await sendReceiveLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);
var sendingAmqpLink = (SendingAmqpLink) linkDetails.Item1;
var activeSendReceiveClientLink = new ActiveSendReceiveClientLink(
sendingAmqpLink,
endPointAddress,
endPointAddress.AbsoluteUri,
claims,
linkDetails.Item2,
this.ClientId);
this.clientLinkManager.SetActiveSendReceiveLink(activeSendReceiveClientLink);
MessagingEventSource.Log.AmqpSendLinkCreateStop(this.ClientId);
return sendingAmqpLink;
@ -454,17 +470,29 @@ namespace Microsoft.Azure.ServiceBus.Core
AmqpLinkSettings linkSettings = new AmqpLinkSettings();
linkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement);
Uri endPointAddress = new Uri(this.ServiceBusConnection.Endpoint, entityPath);
string[] claims = new[] { ClaimConstants.Manage, ClaimConstants.Send };
AmqpRequestResponseLinkCreator requestResponseLinkCreator = new AmqpRequestResponseLinkCreator(
entityPath,
this.ServiceBusConnection,
new[] { ClaimConstants.Manage, ClaimConstants.Send },
endPointAddress,
claims,
this.CbsTokenProvider,
linkSettings,
this.ClientId);
RequestResponseAmqpLink requestResponseAmqpLink =
(RequestResponseAmqpLink)await requestResponseLinkCreator.CreateAndOpenAmqpLinkAsync()
.ConfigureAwait(false);
Tuple<AmqpObject, DateTime> linkDetails =
await requestResponseLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);
var requestResponseAmqpLink = (RequestResponseAmqpLink) linkDetails.Item1;
var activeRequestResponseClientLink = new ActiveRequestResponseLink(
requestResponseAmqpLink,
endPointAddress,
endPointAddress.AbsoluteUri,
claims,
linkDetails.Item2);
this.clientLinkManager.SetActiveRequestResponseLink(activeRequestResponseClientLink);
return requestResponseAmqpLink;
}

Просмотреть файл

@ -131,6 +131,6 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
/// <summary>
/// Initializes a DataContractBinarySerializer instance of type T
/// </summary>
public static XmlObjectSerializer Instance = new DataContractBinarySerializer(typeof(T));
public static readonly XmlObjectSerializer Instance = new DataContractBinarySerializer(typeof(T));
}
}

Просмотреть файл

@ -39,16 +39,16 @@ namespace Microsoft.Azure.ServiceBus
/// <summary>
/// Gets the session state.
/// </summary>
/// <returns>The session state stream.</returns>
Task<Stream> GetStateAsync();
/// <returns>The session state as byte array.</returns>
Task<byte[]> GetStateAsync();
/// <summary>
/// Set a custom state on the session which can be later retrieved using <see cref="GetStateAsync"/>
/// </summary>
/// <param name="sessionState">A <see cref="Stream"/> of session state</param>
/// <param name="sessionState">A byte array of session state</param>
/// <returns>The asynchronous operation</returns>
/// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
Task SetStateAsync(Stream sessionState);
Task SetStateAsync(byte[] sessionState);
/// <summary>
/// Renews the lock on the session specified by the <see cref="SessionId"/>. The lock will be renewed based on the setting specified on the entity.

Просмотреть файл

@ -109,9 +109,14 @@ namespace Microsoft.Azure.ServiceBus
{
MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, message, exception);
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
// Nothing much to do if UserCallback throws, Abandon message and Release semaphore.
await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
if (!(exception is MessageLockLostException))
{
await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
}
// AbandonMessageIfNeededAsync should take care of not throwing exception
this.maxConcurrentCallsSemaphoreSlim.Release();
return;
}

Просмотреть файл

@ -42,12 +42,12 @@ namespace Microsoft.Azure.ServiceBus
/// </summary>
public string SessionId => this.SessionIdInternal;
public Task<Stream> GetStateAsync()
public Task<byte[]> GetStateAsync()
{
return this.OnGetStateAsync();
}
public Task SetStateAsync(Stream sessionState)
public Task SetStateAsync(byte[] sessionState)
{
return this.OnSetStateAsync(sessionState);
}
@ -67,7 +67,7 @@ namespace Microsoft.Azure.ServiceBus
throw new InvalidOperationException($"{nameof(RenewLockAsync)} is not supported for Session. Use {nameof(RenewSessionLockAsync)} to renew sessions instead");
}
protected async Task<Stream> OnGetStateAsync()
protected async Task<byte[]> OnGetStateAsync()
{
try
{
@ -76,12 +76,12 @@ namespace Microsoft.Azure.ServiceBus
AmqpResponseMessage amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(false);
Stream sessionState = null;
byte[] sessionState = null;
if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
{
if (amqpResponseMessage.Map[ManagementConstants.Properties.SessionState] != null)
{
sessionState = new BufferListStream(new[] { amqpResponseMessage.GetValue<ArraySegment<byte>>(ManagementConstants.Properties.SessionState) });
sessionState = amqpResponseMessage.GetValue<ArraySegment<byte>>(ManagementConstants.Properties.SessionState).Array;
}
}
else
@ -97,22 +97,16 @@ namespace Microsoft.Azure.ServiceBus
}
}
protected async Task OnSetStateAsync(Stream sessionState)
protected async Task OnSetStateAsync(byte[] sessionState)
{
try
{
if (sessionState != null && sessionState.CanSeek && sessionState.Position != 0)
{
throw new InvalidOperationException(Resources.CannotSerializeSessionStateWithPartiallyConsumedStream);
}
AmqpRequestMessage amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.SetSessionStateOperation, this.OperationTimeout, null);
amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
if (sessionState != null)
{
BufferListStream buffer = BufferListStream.Create(sessionState, AmqpConstants.SegmentSize);
ArraySegment<byte> value = buffer.ReadBytes((int)buffer.Length);
var value = new ArraySegment<byte>(sessionState);
amqpRequestMessage.Map[ManagementConstants.Properties.SessionState] = value;
}
else

Просмотреть файл

@ -1239,5 +1239,20 @@ namespace Microsoft.Azure.ServiceBus
{
WriteEvent(106, clientId, sessionId, linkException);
}
[NonEvent]
public void AmqpSendAuthenticanTokenException(string clientId, Exception exception)
{
if (this.IsEnabled())
{
this.AmqpSendAuthenticanTokenException(clientId, exception.ToString());
}
}
[Event(107, Level = EventLevel.Error, Message = "{0}: AmqpSendAuthenticanTokenException Exception: {1}.")]
void AmqpSendAuthenticanTokenException(string clientId, string exception)
{
this.WriteEvent(107, clientId, exception);
}
}
}

Просмотреть файл

@ -3,7 +3,7 @@
<PropertyGroup>
<Description>This is the next generation Azure Service Bus .NET Standard client library that focuses on queues &amp; topics. For more information about Service Bus, see https://azure.microsoft.com/en-us/services/service-bus/</Description>
<AssemblyTitle>Microsoft.Azure.ServiceBus</AssemblyTitle>
<VersionPrefix>1.0.0-RC1</VersionPrefix>
<VersionPrefix>1.0.0</VersionPrefix>
<Authors>Microsoft</Authors>
<TargetFrameworks>net451;netstandard1.3;uap10.0</TargetFrameworks>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>

Просмотреть файл

@ -0,0 +1,47 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.ServiceBus.Primitives
{
using System;
using System.Reflection;
using System.Runtime.Versioning;
internal static class ClientInfo
{
internal static readonly string Product;
internal static readonly string Version;
internal static readonly string Framework;
internal static readonly string Platform;
static ClientInfo()
{
try
{
Assembly assembly = typeof(ClientInfo).GetTypeInfo().Assembly;
Product = GetAssemblyAttributeValue<AssemblyProductAttribute>(assembly, p => p.Product);
Version = GetAssemblyAttributeValue<AssemblyFileVersionAttribute>(assembly, v => v.Version);
Framework = GetAssemblyAttributeValue<TargetFrameworkAttribute>(assembly, f => f.FrameworkName);
#if NETSTANDARD1_3
Platform = System.Runtime.InteropServices.RuntimeInformation.OSDescription;
#elif UAP10_0
Platform = "UAP";
#elif NET451
Platform = Environment.OSVersion.VersionString;
#else
Platform = "Unknown";
#endif
}
catch
{
// ignored
}
}
static string GetAssemblyAttributeValue<T>(Assembly assembly, Func<T, string> getter) where T : Attribute
{
var attribute = assembly.GetCustomAttribute(typeof(T)) as T;
return attribute == null ? null : getter(attribute);
}
}
}

11
src/Microsoft.Azure.ServiceBus/Resources.Designer.cs сгенерированный
Просмотреть файл

@ -121,16 +121,7 @@ namespace Microsoft.Azure.ServiceBus {
return ResourceManager.GetString("CannotSendAnEmptyMessage", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Failed to serialize the session state because its state stream has been partially consumed..
/// </summary>
public static string CannotSerializeSessionStateWithPartiallyConsumedStream {
get {
return ResourceManager.GetString("CannotSerializeSessionStateWithPartiallyConsumedStream", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to &apos;{0}&apos; contains character &apos;{1}&apos; which is not allowed because it is reserved in the Uri scheme..
/// </summary>

Просмотреть файл

@ -135,9 +135,6 @@
<data name="CannotSendAnEmptyMessage" xml:space="preserve">
<value>Sending empty {0} is not a valid operation.</value>
</data>
<data name="CannotSerializeSessionStateWithPartiallyConsumedStream" xml:space="preserve">
<value>Failed to serialize the session state because its state stream has been partially consumed.</value>
</data>
<data name="FailedToSerializeUnsupportedType" xml:space="preserve">
<value>Serialization operation failed due to unsupported type {0}.</value>
</data>

Просмотреть файл

@ -219,7 +219,10 @@ namespace Microsoft.Azure.ServiceBus
MessagingEventSource.Log.MessageReceivePumpTaskException(this.clientId, session.SessionId, exception);
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
callbackExceptionOccured = true;
await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
if (!(exception is MessageLockLostException || exception is SessionLockLostException))
{
await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
}
}
finally
{

Просмотреть файл

@ -87,9 +87,9 @@ namespace Microsoft.Azure.ServiceBus
{
System.DateTime LockedUntilUtc { get; }
string SessionId { get; }
System.Threading.Tasks.Task<System.IO.Stream> GetStateAsync();
System.Threading.Tasks.Task<byte[]> GetStateAsync();
System.Threading.Tasks.Task RenewSessionLockAsync();
System.Threading.Tasks.Task SetStateAsync(System.IO.Stream sessionState);
System.Threading.Tasks.Task SetStateAsync(byte[] sessionState);
}
public interface IQueueClient : Microsoft.Azure.ServiceBus.Core.IReceiverClient, Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity
{
@ -462,7 +462,7 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
public class static DataContractBinarySerializer<T>
{
public static System.Runtime.Serialization.XmlObjectSerializer Instance;
public static readonly System.Runtime.Serialization.XmlObjectSerializer Instance;
}
public class static MessageInteropExtensions
{

Просмотреть файл

@ -5,7 +5,6 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Core;
@ -72,7 +71,11 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
{
var messageId = "test-message1";
var sessionId = Guid.NewGuid().ToString();
await sender.SendAsync(new Message() { MessageId = messageId, SessionId = sessionId });
await sender.SendAsync(new Message()
{
MessageId = messageId,
SessionId = sessionId
});
TestUtility.Log($"Sent Message: {messageId} to Session: {sessionId}");
var sessionReceiver = await sessionClient.AcceptMessageSessionAsync(sessionId);
@ -82,34 +85,28 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
Assert.True(message.MessageId == messageId);
var sessionStateString = "Received Message From Session!";
var sessionState = new MemoryStream(Encoding.UTF8.GetBytes(sessionStateString));
var sessionState = Encoding.UTF8.GetBytes(sessionStateString);
await sessionReceiver.SetStateAsync(sessionState);
TestUtility.Log($"Set Session State: {sessionStateString} for Session: {sessionReceiver.SessionId}");
var returnedSessionState = await sessionReceiver.GetStateAsync();
using (var reader = new StreamReader(returnedSessionState, Encoding.UTF8))
{
var returnedSessionStateString = await reader.ReadToEndAsync();
TestUtility.Log($"Get Session State Returned: {returnedSessionStateString} for Session: {sessionReceiver.SessionId}");
Assert.Equal(sessionStateString, returnedSessionStateString);
}
var returnedSessionStateString = Encoding.UTF8.GetString(returnedSessionState);
TestUtility.Log($"Get Session State Returned: {returnedSessionStateString} for Session: {sessionReceiver.SessionId}");
Assert.Equal(sessionStateString, returnedSessionStateString);
// Complete message using Session Receiver
await sessionReceiver.CompleteAsync(message.SystemProperties.LockToken);
TestUtility.Log($"Completed Message: {message.MessageId} for Session: {sessionReceiver.SessionId}");
sessionStateString = "Completed Message On Session!";
sessionState = new MemoryStream(Encoding.UTF8.GetBytes(sessionStateString));
sessionState = Encoding.UTF8.GetBytes(sessionStateString);
await sessionReceiver.SetStateAsync(sessionState);
TestUtility.Log($"Set Session State: {sessionStateString} for Session: {sessionReceiver.SessionId}");
returnedSessionState = await sessionReceiver.GetStateAsync();
using (var reader = new StreamReader(returnedSessionState, Encoding.UTF8))
{
var returnedSessionStateString = await reader.ReadToEndAsync();
TestUtility.Log($"Get Session State Returned: {returnedSessionStateString} for Session: {sessionReceiver.SessionId}");
Assert.Equal(sessionStateString, returnedSessionStateString);
}
returnedSessionStateString = Encoding.UTF8.GetString(returnedSessionState);
TestUtility.Log($"Get Session State Returned: {returnedSessionStateString} for Session: {sessionReceiver.SessionId}");
Assert.Equal(sessionStateString, returnedSessionStateString);
await sessionReceiver.CloseAsync();
}

Просмотреть файл

@ -8,6 +8,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Core;
using Xunit;
@ -245,7 +246,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
async (message, token) =>
{
TestUtility.Log($"Received message: SequenceNumber: {message.SystemProperties.SequenceNumber}");
count++;
Interlocked.Increment(ref count);
if (messageReceiver.ReceiveMode == ReceiveMode.PeekLock && !autoComplete)
{
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
@ -281,7 +282,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
async (message, token) =>
{
TestUtility.Log($"Received message: SequenceNumber: {message.SystemProperties.SequenceNumber}");
count++;
Interlocked.Increment(ref count);
await Task.CompletedTask;
},
new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = maxConcurrentCalls, AutoComplete = autoComplete });

Просмотреть файл

@ -52,7 +52,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
Assert.NotNull(session);
Assert.NotNull(message);
this.totalMessageCount++;
Interlocked.Increment(ref this.totalMessageCount);
TestUtility.Log($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber}");
if (this.receiveMode == ReceiveMode.PeekLock && !this.sessionHandlerOptions.AutoComplete)