Some random cleanups (#302)
* remove redundant local variable assignments in OnValueSize * remove redundant local variable assignments in OnValueSize * clean up some whitespace * remove redundant ClientId parameter in ActiveSendReceiveClientLink * use pattern matching * redundant braces * redundant method * remove redundant doco * simplify doc file generation * remove settings that are inferred from the project file name * remove redundant test attributes * remove redundant async state machines
This commit is contained in:
Родитель
e650ac4c4e
Коммит
51f8d77f04
|
@ -73,7 +73,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
ActiveClientLinkManager thisPtr = (ActiveClientLinkManager)state;
|
||||
await thisPtr.RenewCBSTokenAsync(thisPtr.activeRequestResponseClientLink).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
|
||||
async Task RenewCBSTokenAsync(ActiveClientLinkObject activeClientLinkObject)
|
||||
{
|
||||
try
|
||||
|
|
|
@ -8,7 +8,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
|
||||
sealed class ActiveSendReceiveClientLink : ActiveClientLinkObject
|
||||
{
|
||||
public ActiveSendReceiveClientLink(AmqpLink link, Uri endpointUri, string audience, string[] requiredClaims, DateTime authorizationValidUntilUtc, string clientId)
|
||||
public ActiveSendReceiveClientLink(AmqpLink link, Uri endpointUri, string audience, string[] requiredClaims, DateTime authorizationValidUntilUtc)
|
||||
: base(link, endpointUri, audience, requiredClaims, authorizationValidUntilUtc)
|
||||
{
|
||||
this.Link = link;
|
||||
|
|
|
@ -15,7 +15,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
|
||||
static class AmqpExceptionHelper
|
||||
{
|
||||
static readonly Dictionary<string, AmqpResponseStatusCode> ConditionToStatusMap = new Dictionary<string, AmqpResponseStatusCode>()
|
||||
static readonly Dictionary<string, AmqpResponseStatusCode> ConditionToStatusMap = new Dictionary<string, AmqpResponseStatusCode>
|
||||
{
|
||||
{ AmqpClientConstants.TimeoutError.Value, AmqpResponseStatusCode.RequestTimeout },
|
||||
{ AmqpErrorCode.NotFound.Value, AmqpResponseStatusCode.NotFound },
|
||||
|
@ -179,8 +179,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
return new ServiceBusCommunicationException(message, aggregateException);
|
||||
|
||||
case IOException _:
|
||||
var socketException = exception.InnerException as SocketException;
|
||||
if (socketException != null)
|
||||
if (exception.InnerException is SocketException socketException)
|
||||
{
|
||||
message = builder.AppendFormat(CultureInfo.InvariantCulture, $" ErrorCode: {socketException.SocketErrorCode}").ToString();
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
|
||||
// Authenticate over CBS
|
||||
AmqpCbsLink cbsLink = connection.Extensions.Find<AmqpCbsLink>();
|
||||
|
||||
|
||||
string resource = this.endpointAddress.AbsoluteUri;
|
||||
MessagingEventSource.Log.AmqpSendAuthenticanTokenStart(this.endpointAddress, resource, resource, this.requiredClaims);
|
||||
DateTime cbsTokenExpiresAtUtc = await cbsLink.SendTokenAsync(this.cbsTokenProvider, this.endpointAddress, resource, resource, this.requiredClaims, timeoutHelper.RemainingTime()).ConfigureAwait(false);
|
||||
|
|
|
@ -57,7 +57,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
|
||||
if (dataList == null)
|
||||
{
|
||||
dataList = new List<Data>() { ToData(firstAmqpMessage) };
|
||||
dataList = new List<Data> { ToData(firstAmqpMessage) };
|
||||
}
|
||||
|
||||
dataList.Add(ToData(amqpMessage));
|
||||
|
@ -93,7 +93,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
|
||||
public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage)
|
||||
{
|
||||
var amqpMessage = sbMessage.Body == null ? AmqpMessage.Create() : AmqpMessage.Create(new Data () { Value = new ArraySegment<byte>(sbMessage.Body) });
|
||||
var amqpMessage = sbMessage.Body == null ? AmqpMessage.Create() : AmqpMessage.Create(new Data { Value = new ArraySegment<byte>(sbMessage.Body) });
|
||||
|
||||
amqpMessage.Properties.MessageId = sbMessage.MessageId;
|
||||
amqpMessage.Properties.CorrelationId = sbMessage.CorrelationId;
|
||||
|
@ -123,7 +123,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
{
|
||||
amqpMessage.MessageAnnotations.Map.Add(ScheduledEnqueueTimeUtcName, sbMessage.ScheduledEnqueueTimeUtc);
|
||||
}
|
||||
|
||||
|
||||
if (sbMessage.PartitionKey != null)
|
||||
{
|
||||
amqpMessage.MessageAnnotations.Map.Add(PartitionKeyName, sbMessage.PartitionKey);
|
||||
|
@ -161,7 +161,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
}
|
||||
|
||||
SBMessage sbMessage;
|
||||
|
||||
|
||||
if ((amqpMessage.BodyType & SectionFlag.AmqpValue) != 0
|
||||
&& amqpMessage.ValueBody.Value != null)
|
||||
{
|
||||
|
@ -170,7 +170,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
object dotNetObject = null;
|
||||
if (TryGetNetObjectFromAmqpObject(amqpMessage.ValueBody.Value, MappingType.MessageBody, out dotNetObject))
|
||||
{
|
||||
sbMessage.SystemProperties.BodyObject = dotNetObject;
|
||||
sbMessage.SystemProperties.BodyObject = dotNetObject;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -398,7 +398,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
|
||||
case AmqpCorrelationFilterCodec.Code:
|
||||
var amqpCorrelationFilter = (AmqpCorrelationFilterCodec)amqpFilter;
|
||||
var correlationFilter = new CorrelationFilter()
|
||||
var correlationFilter = new CorrelationFilter
|
||||
{
|
||||
CorrelationId = amqpCorrelationFilter.CorrelationId,
|
||||
MessageId = amqpCorrelationFilter.MessageId,
|
||||
|
@ -672,7 +672,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
|
||||
return correlationFilterMap;
|
||||
}
|
||||
|
||||
|
||||
static AmqpMap GetRuleActionMap(SqlRuleAction sqlRuleAction)
|
||||
{
|
||||
AmqpMap ruleActionMap = null;
|
||||
|
|
|
@ -15,7 +15,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
|
|||
AmqpRequestMessage(string operation, TimeSpan timeout, string trackingId)
|
||||
{
|
||||
this.Map = new AmqpMap();
|
||||
this.requestMessage = AmqpMessage.Create(new AmqpValue() { Value = this.Map });
|
||||
this.requestMessage = AmqpMessage.Create(new AmqpValue { Value = this.Map });
|
||||
this.requestMessage.ApplicationProperties.Map[ManagementConstants.Request.Operation] = operation;
|
||||
this.requestMessage.ApplicationProperties.Map[ManagementConstants.Properties.ServerTimeout] = (uint)timeout.TotalMilliseconds;
|
||||
this.requestMessage.ApplicationProperties.Map[ManagementConstants.Properties.TrackingId] = trackingId ?? Guid.NewGuid().ToString();
|
||||
|
|
|
@ -112,17 +112,15 @@ namespace Microsoft.Azure.ServiceBus.Amqp.Framing
|
|||
|
||||
protected override int OnValueSize()
|
||||
{
|
||||
int valueSize = AmqpCodec.GetStringEncodeSize(this.CorrelationId);
|
||||
valueSize += AmqpCodec.GetStringEncodeSize(this.MessageId);
|
||||
valueSize += AmqpCodec.GetStringEncodeSize(this.To);
|
||||
valueSize += AmqpCodec.GetStringEncodeSize(this.ReplyTo);
|
||||
valueSize += AmqpCodec.GetStringEncodeSize(this.Label);
|
||||
valueSize += AmqpCodec.GetStringEncodeSize(this.SessionId);
|
||||
valueSize += AmqpCodec.GetStringEncodeSize(this.ReplyToSessionId);
|
||||
valueSize += AmqpCodec.GetStringEncodeSize(this.ContentType);
|
||||
valueSize += AmqpCodec.GetMapEncodeSize(this.properties);
|
||||
|
||||
return valueSize;
|
||||
return AmqpCodec.GetStringEncodeSize(this.CorrelationId) +
|
||||
AmqpCodec.GetStringEncodeSize(this.MessageId) +
|
||||
AmqpCodec.GetStringEncodeSize(this.To) +
|
||||
AmqpCodec.GetStringEncodeSize(this.ReplyTo) +
|
||||
AmqpCodec.GetStringEncodeSize(this.Label) +
|
||||
AmqpCodec.GetStringEncodeSize(this.SessionId) +
|
||||
AmqpCodec.GetStringEncodeSize(this.ReplyToSessionId) +
|
||||
AmqpCodec.GetStringEncodeSize(this.ContentType) +
|
||||
AmqpCodec.GetMapEncodeSize(this.properties);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -70,12 +70,10 @@ namespace Microsoft.Azure.ServiceBus.Amqp.Framing
|
|||
|
||||
protected override int OnValueSize()
|
||||
{
|
||||
var valueSize = AmqpCodec.GetSerializableEncodeSize(this.Filter);
|
||||
valueSize += AmqpCodec.GetSerializableEncodeSize(this.Action);
|
||||
valueSize += AmqpCodec.GetStringEncodeSize(this.RuleName);
|
||||
valueSize += AmqpCodec.GetTimeStampEncodeSize(this.CreatedAt);
|
||||
|
||||
return valueSize;
|
||||
return AmqpCodec.GetSerializableEncodeSize(this.Filter) +
|
||||
AmqpCodec.GetSerializableEncodeSize(this.Action) +
|
||||
AmqpCodec.GetStringEncodeSize(this.RuleName) +
|
||||
AmqpCodec.GetTimeStampEncodeSize(this.CreatedAt);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -51,10 +51,8 @@ namespace Microsoft.Azure.ServiceBus.Amqp.Framing
|
|||
|
||||
protected override int OnValueSize()
|
||||
{
|
||||
var valueSize = AmqpCodec.GetStringEncodeSize(this.Expression);
|
||||
valueSize += AmqpCodec.GetIntEncodeSize(this.CompatibilityLevel);
|
||||
|
||||
return valueSize;
|
||||
return AmqpCodec.GetStringEncodeSize(this.Expression) +
|
||||
AmqpCodec.GetIntEncodeSize(this.CompatibilityLevel);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -59,10 +59,8 @@ namespace Microsoft.Azure.ServiceBus.Amqp.Framing
|
|||
|
||||
protected override int OnValueSize()
|
||||
{
|
||||
var valueSize = AmqpCodec.GetStringEncodeSize(this.SqlExpression);
|
||||
valueSize += AmqpCodec.GetIntEncodeSize(this.CompatibilityLevel);
|
||||
|
||||
return valueSize;
|
||||
return AmqpCodec.GetStringEncodeSize(this.SqlExpression) +
|
||||
AmqpCodec.GetIntEncodeSize(this.CompatibilityLevel);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,10 +19,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
readonly string clientTypeName;
|
||||
readonly object syncLock;
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="clientTypeName"></param>
|
||||
/// <param name="postfix"></param>
|
||||
/// <param name="retryPolicy"></param>
|
||||
protected ClientEntity(string clientTypeName, string postfix, RetryPolicy retryPolicy)
|
||||
{
|
||||
this.clientTypeName = clientTypeName;
|
||||
|
@ -59,7 +55,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <summary>
|
||||
/// Closes the Client. Closes the connections opened by it.
|
||||
/// </summary>
|
||||
/// <returns>The asynchronous operation</returns>
|
||||
public async Task CloseAsync()
|
||||
{
|
||||
bool callClose = false;
|
||||
|
@ -95,12 +90,8 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <param name="serviceBusPluginName">The name <see cref="ServiceBusPlugin.Name"/> to be unregistered</param>
|
||||
public abstract void UnregisterPlugin(string serviceBusPluginName);
|
||||
|
||||
/// <summary></summary>
|
||||
/// <returns></returns>
|
||||
protected abstract Task OnClosingAsync();
|
||||
|
||||
/// <summary></summary>
|
||||
/// <returns></returns>
|
||||
protected static long GetNextId()
|
||||
{
|
||||
return Interlocked.Increment(ref nextId);
|
||||
|
@ -126,11 +117,10 @@ namespace Microsoft.Azure.ServiceBus
|
|||
throw new ObjectDisposedException($"{this.clientTypeName} with Id '{this.ClientId}' has already been closed. Please create a new {this.clientTypeName}.");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Updates the client id.
|
||||
/// </summary>
|
||||
/// <param name="newClientId"></param>
|
||||
internal void UpdateClientId(string newClientId)
|
||||
{
|
||||
MessagingEventSource.Log.UpdateClientId(this.ClientId, newClientId);
|
||||
|
|
|
@ -32,7 +32,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
public static readonly TimeSpan MaximumRenewBufferDuration = TimeSpan.FromSeconds(10);
|
||||
|
||||
public static readonly TimeSpan DefaultRetryDeltaBackoff = TimeSpan.FromSeconds(3);
|
||||
|
||||
|
||||
public static readonly TimeSpan NoMessageBackoffTimeSpan = TimeSpan.FromSeconds(5);
|
||||
}
|
||||
}
|
|
@ -18,7 +18,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// EntityNameHelper.FormatSubscriptionPath(topicName, subscriptionName),
|
||||
/// ReceiveMode.PeekLock);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Receive a message from the Subscription.
|
||||
/// <code>
|
||||
/// var message = await messageReceiver.ReceiveAsync();
|
||||
|
@ -26,8 +26,8 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// </code>
|
||||
/// </example>
|
||||
/// <remarks>
|
||||
/// The MessageReceiver provides advanced functionality that is not found in the
|
||||
/// <see cref="QueueClient" /> or <see cref="SubscriptionClient" />. For instance,
|
||||
/// The MessageReceiver provides advanced functionality that is not found in the
|
||||
/// <see cref="QueueClient" /> or <see cref="SubscriptionClient" />. For instance,
|
||||
/// <see cref="ReceiveAsync()"/>, which allows you to receive messages on demand, but also requires
|
||||
/// you to manually renew locks using <see cref="RenewLockAsync(Message)"/>.
|
||||
/// </remarks>
|
||||
|
@ -75,7 +75,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Receives a specific deferred message identified by <paramref name="sequenceNumber"/>.
|
||||
/// </summary>
|
||||
/// <param name="sequenceNumber">The sequence number of the message that will be received.</param>
|
||||
/// <returns>Message identified by sequence number <paramref name="sequenceNumber"/>. Returns null if no such message is found.
|
||||
/// <returns>Message identified by sequence number <paramref name="sequenceNumber"/>. Returns null if no such message is found.
|
||||
/// Throws if the message has not been deferred.</returns>
|
||||
/// <seealso cref="DeferAsync"/>
|
||||
Task<Message> ReceiveDeferredMessageAsync(long sequenceNumber);
|
||||
|
@ -93,25 +93,23 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Completes a series of <see cref="Message"/> using a list of lock tokens. This will delete the message from the service.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// </remarks>
|
||||
/// <param name="lockTokens">An <see cref="IEnumerable{T}"/> containing the lock tokens of the corresponding messages to complete.</param>
|
||||
/// This operation can only be performed on messages that were received by this receiver.
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
Task CompleteAsync(IEnumerable<string> lockTokens);
|
||||
|
||||
/// <summary>Indicates that the receiver wants to defer the processing for the message.</summary>
|
||||
/// <param name="lockToken">The lock token of the <see cref="Message" />.</param>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// In order to receive this message again in the future, you will need to save the <see cref="Message.SystemPropertiesCollection.SequenceNumber"/>
|
||||
/// and receive it using <see cref="ReceiveDeferredMessageAsync(long)"/>.
|
||||
/// Deferring messages does not impact message's expiration, meaning that deferred messages can still expire.
|
||||
/// This operation can only be performed on messages that were received by this receiver.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
Task DeferAsync(string lockToken);
|
||||
|
||||
/// <summary>
|
||||
|
@ -119,19 +117,18 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// </summary>
|
||||
/// <param name="message"> <see cref="Message" />.</param>
|
||||
/// <remarks>
|
||||
/// When a message is received in <see cref="ServiceBus.ReceiveMode.PeekLock"/> mode, the message is locked on the server for this
|
||||
/// When a message is received in <see cref="ServiceBus.ReceiveMode.PeekLock"/> mode, the message is locked on the server for this
|
||||
/// receiver instance for a duration as specified during the Queue/Subscription creation (LockDuration).
|
||||
/// If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is renewed by
|
||||
/// the entity's LockDuration.
|
||||
/// If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is renewed by
|
||||
/// the entity's LockDuration.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
Task RenewLockAsync(Message message);
|
||||
|
||||
/// <summary>
|
||||
/// Fetches the next active message without changing the state of the receiver or the message source.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The first call to <see cref="PeekAsync()"/> fetches the first active message for this receiver. Each subsequent call
|
||||
/// The first call to <see cref="PeekAsync()"/> fetches the first active message for this receiver. Each subsequent call
|
||||
/// fetches the subsequent message in the entity.
|
||||
/// Unlike a received messaged, peeked message will not have lock token associated with it, and hence it cannot be Completed/Abandoned/Defered/Deadlettered/Renewed.
|
||||
/// Also, unlike <see cref="ReceiveAsync()"/>, this method will fetch even Deferred messages (but not Deadlettered message)
|
||||
|
@ -143,7 +140,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Fetches the next batch of active messages without changing the state of the receiver or the message source.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The first call to <see cref="PeekAsync()"/> fetches the first active message for this receiver. Each subsequent call
|
||||
/// The first call to <see cref="PeekAsync()"/> fetches the first active message for this receiver. Each subsequent call
|
||||
/// fetches the subsequent message in the entity.
|
||||
/// Unlike a received message, peeked message will not have lock token associated with it, and hence it cannot be Completed/Abandoned/Defered/Deadlettered/Renewed.
|
||||
/// Also, unlike <see cref="ReceiveAsync()"/>, this method will fetch even Deferred messages (but not Deadlettered message)
|
||||
|
|
|
@ -13,7 +13,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// namespaceConnectionString,
|
||||
/// queueName)
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Send message
|
||||
/// <code>
|
||||
/// byte[] data = GetData();
|
||||
|
|
|
@ -22,16 +22,16 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Setting the value to zero turns prefetch off.
|
||||
/// Defaults to 0.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// When Prefetch is enabled, the receiver will quietly acquire more messages, up to the PrefetchCount limit, than what the application
|
||||
/// immediately asks for. A single initial Receive/ReceiveAsync call will therefore acquire a message for immediate consumption
|
||||
/// that will be returned as soon as available, and the client will proceed to acquire further messages to fill the prefetch buffer in the background.
|
||||
/// When Prefetch is enabled, the receiver will quietly acquire more messages, up to the PrefetchCount limit, than what the application
|
||||
/// immediately asks for. A single initial Receive/ReceiveAsync call will therefore acquire a message for immediate consumption
|
||||
/// that will be returned as soon as available, and the client will proceed to acquire further messages to fill the prefetch buffer in the background.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately satisfied from the buffer, and the buffer is
|
||||
/// replenished in the background as space becomes available.If there are no messages available for delivery, the receive operation will drain the
|
||||
/// buffer and then wait or block as expected.
|
||||
/// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately satisfied from the buffer, and the buffer is
|
||||
/// replenished in the background as space becomes available.If there are no messages available for delivery, the receive operation will drain the
|
||||
/// buffer and then wait or block as expected.
|
||||
/// </para>
|
||||
/// <para>Updates to this value take effect on the next receive call to the service.</para>
|
||||
/// </remarks>
|
||||
|
@ -54,7 +54,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param>
|
||||
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
|
||||
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
|
||||
/// <remarks>Enable prefetch to speed up the receive rate.
|
||||
/// <remarks>Enable prefetch to speed up the receive rate.
|
||||
/// Use <see cref="RegisterMessageHandler(Func{Message,CancellationToken,Task}, MessageHandlerOptions)"/> to configure the settings of the pump.</remarks>
|
||||
void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler);
|
||||
|
||||
|
@ -72,23 +72,21 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to complete.</param>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// This operation can only be performed on messages that were received by this receiver.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
Task CompleteAsync(string lockToken);
|
||||
|
||||
/// <summary>
|
||||
/// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processing.
|
||||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
|
||||
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// Abandoning a message will increase the delivery count on the message.
|
||||
/// This operation can only be performed on messages that were received by this receiver.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
Task AbandonAsync(string lockToken);
|
||||
|
||||
/// <summary>
|
||||
|
@ -96,13 +94,12 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, with the corresponding path.
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, with the corresponding path.
|
||||
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
|
||||
/// This operation can only be performed on messages that were received by this receiver.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
Task DeadLetterAsync(string lockToken);
|
||||
}
|
||||
}
|
|
@ -25,7 +25,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// EntityNameHelper.FormatSubscriptionPath(topicName, subscriptionName),
|
||||
/// ReceiveMode.PeekLock);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Receive a message from the Subscription.
|
||||
/// <code>
|
||||
/// var message = await messageReceiver.ReceiveAsync();
|
||||
|
@ -33,8 +33,8 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// </code>
|
||||
/// </example>
|
||||
/// <remarks>
|
||||
/// The MessageReceiver provides advanced functionality that is not found in the
|
||||
/// <see cref="QueueClient" /> or <see cref="SubscriptionClient" />. For instance,
|
||||
/// The MessageReceiver provides advanced functionality that is not found in the
|
||||
/// <see cref="QueueClient" /> or <see cref="SubscriptionClient" />. For instance,
|
||||
/// <see cref="ReceiveAsync()"/>, which allows you to receive messages on demand, but also requires
|
||||
/// you to manually renew locks using <see cref="RenewLockAsync(Message)"/>.
|
||||
/// It uses AMQP protocol to communicate with service.
|
||||
|
@ -60,7 +60,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// <param name="connectionStringBuilder">The <see cref="ServiceBusConnectionStringBuilder"/> having entity level connection details.</param>
|
||||
/// <param name="receiveMode">The <see cref="ServiceBus.ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.</param>
|
||||
/// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bus. Defaults to <see cref="RetryPolicy.Default"/>.</param>
|
||||
/// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages this receiver
|
||||
/// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages this receiver
|
||||
/// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
|
||||
/// <remarks>Creates a new connection to the entity, which is opened during the first operation.</remarks>
|
||||
public MessageReceiver(
|
||||
|
@ -76,11 +76,11 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Creates a new MessageReceiver from a specified connection string and entity path.
|
||||
/// </summary>
|
||||
/// <param name="connectionString">Namespace connection string used to communicate with Service Bus. Must not contain Entity details.</param>
|
||||
/// <param name="entityPath">The path of the entity for this receiver. For Queues this will be the name, but for Subscriptions this will be the path.
|
||||
/// <param name="entityPath">The path of the entity for this receiver. For Queues this will be the name, but for Subscriptions this will be the path.
|
||||
/// You can use <see cref="EntityNameHelper.FormatSubscriptionPath(string, string)"/>, to help create this path.</param>
|
||||
/// <param name="receiveMode">The <see cref="ServiceBus.ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.</param>
|
||||
/// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bus. Defaults to <see cref="RetryPolicy.Default"/></param>
|
||||
/// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages this receiver
|
||||
/// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages this receiver
|
||||
/// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
|
||||
/// <remarks>Creates a new connection to the entity, which is opened during the first operation.</remarks>
|
||||
public MessageReceiver(
|
||||
|
@ -123,7 +123,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
this.ReceiveMode = receiveMode;
|
||||
this.OperationTimeout = serviceBusConnection.OperationTimeout;
|
||||
this.Path = entityPath;
|
||||
this.EntityType = entityType;
|
||||
this.EntityType = entityType;
|
||||
this.CbsTokenProvider = cbsTokenProvider;
|
||||
this.SessionIdInternal = sessionId;
|
||||
this.isSessionReceiver = isSessionReceiver;
|
||||
|
@ -153,16 +153,16 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Setting the value to zero turns prefetch off.
|
||||
/// Defaults to 0.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// When Prefetch is enabled, the receiver will quietly acquire more messages, up to the PrefetchCount limit, than what the application
|
||||
/// immediately asks for. A single initial Receive/ReceiveAsync call will therefore acquire a message for immediate consumption
|
||||
/// that will be returned as soon as available, and the client will proceed to acquire further messages to fill the prefetch buffer in the background.
|
||||
/// When Prefetch is enabled, the receiver will quietly acquire more messages, up to the PrefetchCount limit, than what the application
|
||||
/// immediately asks for. A single initial Receive/ReceiveAsync call will therefore acquire a message for immediate consumption
|
||||
/// that will be returned as soon as available, and the client will proceed to acquire further messages to fill the prefetch buffer in the background.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately satisfied from the buffer, and the buffer is
|
||||
/// replenished in the background as space becomes available.If there are no messages available for delivery, the receive operation will drain the
|
||||
/// buffer and then wait or block as expected.
|
||||
/// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately satisfied from the buffer, and the buffer is
|
||||
/// replenished in the background as space becomes available.If there are no messages available for delivery, the receive operation will drain the
|
||||
/// buffer and then wait or block as expected.
|
||||
/// </para>
|
||||
/// <para>Prefetch also works equivalently with the <see cref="RegisterMessageHandler(Func{Message,CancellationToken,Task}, Func{ExceptionReceivedEventArgs, Task})"/> APIs.</para>
|
||||
/// <para>Updates to this value take effect on the next receive call to the service.</para>
|
||||
|
@ -320,7 +320,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Receives a specific deferred message identified by <paramref name="sequenceNumber"/>.
|
||||
/// </summary>
|
||||
/// <param name="sequenceNumber">The sequence number of the message that will be received.</param>
|
||||
/// <returns>Message identified by sequence number <paramref name="sequenceNumber"/>. Returns null if no such message is found.
|
||||
/// <returns>Message identified by sequence number <paramref name="sequenceNumber"/>. Returns null if no such message is found.
|
||||
/// Throws if the message has not been deferred.</returns>
|
||||
/// <seealso cref="DeferAsync"/>
|
||||
public async Task<Message> ReceiveDeferredMessageAsync(long sequenceNumber)
|
||||
|
@ -345,7 +345,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
{
|
||||
this.ThrowIfClosed();
|
||||
this.ThrowIfNotPeekLockMode();
|
||||
|
||||
|
||||
int count = MessageReceiver.ValidateSequenceNumbers(sequenceNumbers);
|
||||
|
||||
MessagingEventSource.Log.MessageReceiveDeferredMessageStart(this.ClientId, count, sequenceNumbers);
|
||||
|
@ -376,11 +376,10 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to complete.</param>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// This operation can only be performed on messages that were received by this receiver.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public Task CompleteAsync(string lockToken)
|
||||
{
|
||||
return this.CompleteAsync(new[] { lockToken });
|
||||
|
@ -390,12 +389,11 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Completes a series of <see cref="Message"/> using a list of lock tokens. This will delete the message from the service.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// This operation can only be performed on messages that were received by this receiver.
|
||||
/// </remarks>
|
||||
/// <param name="lockTokens">An <see cref="IEnumerable{T}"/> containing the lock tokens of the corresponding messages to complete.</param>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public async Task CompleteAsync(IEnumerable<string> lockTokens)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -426,12 +424,11 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processing.
|
||||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
|
||||
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// Abandoning a message will increase the delivery count on the message.
|
||||
/// This operation can only be performed on messages that were received by this receiver.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public async Task AbandonAsync(string lockToken)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -459,14 +456,13 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// <summary>Indicates that the receiver wants to defer the processing for the message.</summary>
|
||||
/// <param name="lockToken">The lock token of the <see cref="Message" />.</param>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// In order to receive this message again in the future, you will need to save the <see cref="Message.SystemPropertiesCollection.SequenceNumber"/>
|
||||
/// and receive it using <see cref="ReceiveDeferredMessageAsync(long)"/>.
|
||||
/// Deferring messages does not impact message's expiration, meaning that deferred messages can still expire.
|
||||
/// This operation can only be performed on messages that were received by this receiver.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public async Task DeferAsync(string lockToken)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -497,13 +493,12 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, with the corresponding path.
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, with the corresponding path.
|
||||
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
|
||||
/// This operation can only be performed on messages that were received by this receiver.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public async Task DeadLetterAsync(string lockToken)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -534,12 +529,11 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// </summary>
|
||||
/// <param name="message"> <see cref="Message" />.</param>
|
||||
/// <remarks>
|
||||
/// When a message is received in <see cref="ServiceBus.ReceiveMode.PeekLock"/> mode, the message is locked on the server for this
|
||||
/// When a message is received in <see cref="ServiceBus.ReceiveMode.PeekLock"/> mode, the message is locked on the server for this
|
||||
/// receiver instance for a duration as specified during the Queue/Subscription creation (LockDuration).
|
||||
/// If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is renewed by
|
||||
/// the entity's LockDuration.
|
||||
/// If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is renewed by
|
||||
/// the entity's LockDuration.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public async Task RenewLockAsync(Message message)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -569,7 +563,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Fetches the next active message without changing the state of the receiver or the message source.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The first call to <see cref="PeekAsync()"/> fetches the first active message for this receiver. Each subsequent call
|
||||
/// The first call to <see cref="PeekAsync()"/> fetches the first active message for this receiver. Each subsequent call
|
||||
/// fetches the subsequent message in the entity.
|
||||
/// Unlike a received message, peeked message will not have lock token associated with it, and hence it cannot be Completed/Abandoned/Defered/Deadlettered/Renewed.
|
||||
/// Also, unlike <see cref="ReceiveAsync()"/>, this method will fetch even Deferred messages (but not Deadlettered message)
|
||||
|
@ -584,7 +578,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// Fetches the next batch of active messages without changing the state of the receiver or the message source.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The first call to <see cref="PeekAsync()"/> fetches the first active message for this receiver. Each subsequent call
|
||||
/// The first call to <see cref="PeekAsync()"/> fetches the first active message for this receiver. Each subsequent call
|
||||
/// fetches the subsequent message in the entity.
|
||||
/// Unlike a received message, peeked message will not have lock token associated with it, and hence it cannot be Completed/Abandoned/Defered/Deadlettered/Renewed.
|
||||
/// Also, unlike <see cref="ReceiveAsync()"/>, this method will fetch even Deferred messages (but not Deadlettered message)
|
||||
|
@ -748,8 +742,6 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
return responseMessage;
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
protected override async Task OnClosingAsync()
|
||||
{
|
||||
this.clientLinkManager.Close();
|
||||
|
@ -771,10 +763,6 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="maxMessageCount"></param>
|
||||
/// <param name="serverWaitTime"></param>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
protected virtual async Task<IList<Message>> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime)
|
||||
{
|
||||
ReceivingAmqpLink receiveLink = null;
|
||||
|
@ -786,7 +774,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
|
||||
try
|
||||
{
|
||||
TimeoutHelper timeoutHelper = new TimeoutHelper(serverWaitTime, true);
|
||||
TimeoutHelper timeoutHelper = new TimeoutHelper(serverWaitTime, true);
|
||||
if(!this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
|
||||
{
|
||||
MessagingEventSource.Log.CreatingNewLink(this.ClientId, this.isSessionReceiver, this.SessionIdInternal, false, this.LinkException);
|
||||
|
@ -836,7 +824,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
brokeredMessages.Add(message);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if(brokeredMessages != null)
|
||||
{
|
||||
break;
|
||||
|
@ -852,10 +840,6 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="fromSequenceNumber"></param>
|
||||
/// <param name="messageCount"></param>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
protected virtual async Task<IList<Message>> OnPeekAsync(long fromSequenceNumber, int messageCount = 1)
|
||||
{
|
||||
try
|
||||
|
@ -912,9 +896,6 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="sequenceNumbers"></param>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
protected virtual async Task<IList<Message>> OnReceiveDeferredMessageAsync(IEnumerable<long> sequenceNumbers)
|
||||
{
|
||||
List<Message> messages = new List<Message>();
|
||||
|
@ -957,9 +938,6 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
return messages;
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="lockTokens"></param>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
protected virtual async Task OnCompleteAsync(IEnumerable<string> lockTokens)
|
||||
{
|
||||
var lockTokenGuids = lockTokens.Select(lt => new Guid(lt));
|
||||
|
@ -973,9 +951,6 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="lockToken"></param>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
protected virtual async Task OnAbandonAsync(string lockToken)
|
||||
{
|
||||
IEnumerable<Guid> lockTokens = new[] { new Guid(lockToken) };
|
||||
|
@ -989,9 +964,6 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="lockToken"></param>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
protected virtual async Task OnDeferAsync(string lockToken)
|
||||
{
|
||||
IEnumerable<Guid> lockTokens = new[] { new Guid(lockToken) };
|
||||
|
@ -1001,13 +973,10 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
}
|
||||
else
|
||||
{
|
||||
await this.DisposeMessagesAsync(lockTokens, new Modified() { UndeliverableHere = true }).ConfigureAwait(false);
|
||||
await this.DisposeMessagesAsync(lockTokens, new Modified { UndeliverableHere = true }).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="lockToken"></param>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
protected virtual async Task OnDeadLetterAsync(string lockToken)
|
||||
{
|
||||
IEnumerable<Guid> lockTokens = new[] { new Guid(lockToken) };
|
||||
|
@ -1021,9 +990,6 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="lockToken"></param>
|
||||
/// <returns>The asynchronour operation.</returns>
|
||||
protected virtual async Task<DateTime> OnRenewLockAsync(string lockToken)
|
||||
{
|
||||
DateTime lockedUntilUtc = DateTime.MinValue;
|
||||
|
@ -1182,7 +1148,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
if (!this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
|
||||
{
|
||||
MessagingEventSource.Log.CreatingNewLink(this.ClientId, this.isSessionReceiver, this.SessionIdInternal, false, this.LinkException);
|
||||
receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
|
||||
receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
Task<Outcome>[] disposeMessageTasks = new Task<Outcome>[deliveryTags.Count];
|
||||
|
@ -1294,8 +1260,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
endPointAddress,
|
||||
endPointAddress.AbsoluteUri,
|
||||
claims,
|
||||
linkDetails.Item2,
|
||||
this.ClientId);
|
||||
linkDetails.Item2);
|
||||
|
||||
this.clientLinkManager.SetActiveSendReceiveLink(activeSendReceiveClientLink);
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// namespaceConnectionString,
|
||||
/// queueName)
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Send message
|
||||
/// <code>
|
||||
/// byte[] data = GetData();
|
||||
|
@ -59,8 +59,8 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
/// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bus. Defaults to <see cref="RetryPolicy.Default"/></param>
|
||||
/// <remarks>Creates a new connection to the entity, which is opened during the first operation.</remarks>
|
||||
public MessageSender(
|
||||
string connectionString,
|
||||
string entityPath,
|
||||
string connectionString,
|
||||
string entityPath,
|
||||
RetryPolicy retryPolicy = null)
|
||||
: this(entityPath, null, new ServiceBusNamespaceConnection(connectionString), null, retryPolicy)
|
||||
{
|
||||
|
@ -405,7 +405,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
{
|
||||
if (!this.SendLinkManager.TryGetOpenedObject(out amqpLink))
|
||||
{
|
||||
amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
|
||||
amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
|
||||
}
|
||||
if (amqpLink.Settings.MaxMessageSize.HasValue)
|
||||
{
|
||||
|
@ -525,8 +525,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
endPointAddress,
|
||||
endPointAddress.AbsoluteUri,
|
||||
claims,
|
||||
linkDetails.Item2,
|
||||
this.ClientId);
|
||||
linkDetails.Item2);
|
||||
|
||||
this.clientLinkManager.SetActiveSendReceiveLink(activeSendReceiveClientLink);
|
||||
|
||||
|
@ -551,7 +550,7 @@ namespace Microsoft.Azure.ServiceBus.Core
|
|||
linkSettings,
|
||||
this.ClientId);
|
||||
|
||||
Tuple<AmqpObject, DateTime> linkDetails =
|
||||
Tuple<AmqpObject, DateTime> linkDetails =
|
||||
await requestResponseLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);
|
||||
|
||||
var requestResponseAmqpLink = (RequestResponseAmqpLink) linkDetails.Item1;
|
||||
|
|
|
@ -42,7 +42,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// </summary>
|
||||
/// <param name="topicPath">The name of the topic, including slashes.</param>
|
||||
/// <param name="subscriptionName">The name of the subscription.</param>
|
||||
/// <returns></returns>
|
||||
public static string FormatSubscriptionPath(string topicPath, string subscriptionName)
|
||||
{
|
||||
return string.Concat(topicPath, PathDelimiter, Subscriptions, PathDelimiter, subscriptionName);
|
||||
|
|
|
@ -8,13 +8,13 @@ namespace Microsoft.Azure.ServiceBus
|
|||
{
|
||||
/// <summary>Message completion operation</summary>
|
||||
public const string Complete = "Complete";
|
||||
|
||||
|
||||
/// <summary>Message abandon operation</summary>
|
||||
public const string Abandon = "Abandon";
|
||||
|
||||
|
||||
/// <summary>User message handler invocation</summary>
|
||||
public const string UserCallback = "UserCallback";
|
||||
|
||||
|
||||
/// <summary>Message receive operation</summary>
|
||||
public const string Receive = "Receive";
|
||||
|
||||
|
|
|
@ -10,8 +10,8 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
|
||||
/// <summary>
|
||||
/// This class describes a serializer class used to serialize and deserialize an Object.
|
||||
/// This class is almost identical to DataContractSerializer; only difference is that
|
||||
/// ReadObject(Stream) and WriteObject(Stream, object) pick Binary Xml Reader/Writer
|
||||
/// This class is almost identical to DataContractSerializer; only difference is that
|
||||
/// ReadObject(Stream) and WriteObject(Stream, object) pick Binary Xml Reader/Writer
|
||||
/// instead of text.
|
||||
/// </summary>
|
||||
sealed class DataContractBinarySerializer : XmlObjectSerializer
|
||||
|
@ -21,7 +21,6 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <summary>
|
||||
/// Initializes a new DataContractBinarySerializer instance
|
||||
/// </summary>
|
||||
/// <param name="type"></param>
|
||||
public DataContractBinarySerializer(Type type)
|
||||
{
|
||||
this.dataContractSerializer = new DataContractSerializer(type);
|
||||
|
@ -30,9 +29,8 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <summary>
|
||||
/// Converts from stream to the corresponding object
|
||||
/// </summary>
|
||||
/// <param name="stream"></param>
|
||||
/// <returns>Object corresponding to the stream</returns>
|
||||
// <remarks>Override the default (Text) and use Binary Xml Reader instead</remarks>
|
||||
/// <remarks>Override the default (Text) and use Binary Xml Reader instead</remarks>
|
||||
public override object ReadObject(Stream stream)
|
||||
{
|
||||
return this.ReadObject(XmlDictionaryReader.CreateBinaryReader(stream, XmlDictionaryReaderQuotas.Max));
|
||||
|
@ -41,8 +39,6 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <summary>
|
||||
/// Serializes the object into the stream
|
||||
/// </summary>
|
||||
/// <param name="stream"></param>
|
||||
/// <param name="graph"></param>
|
||||
/// <remarks>Override the default (Text) and use Binary Xml Reader instead</remarks>
|
||||
public override void WriteObject(Stream stream, object graph)
|
||||
{
|
||||
|
@ -59,8 +55,6 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <summary>
|
||||
/// Serializes the object into the stream using the XmlDictionaryWriter
|
||||
/// </summary>
|
||||
/// <param name="writer"></param>
|
||||
/// <param name="graph"></param>
|
||||
public override void WriteObject(XmlDictionaryWriter writer, object graph)
|
||||
{
|
||||
if (writer == null)
|
||||
|
@ -74,8 +68,6 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <summary>
|
||||
/// This method simply delegates to the DataContractSerializer implementation
|
||||
/// </summary>
|
||||
/// <param name="reader"></param>
|
||||
/// <returns></returns>
|
||||
public override bool IsStartObject(XmlDictionaryReader reader)
|
||||
{
|
||||
return this.dataContractSerializer.IsStartObject(reader);
|
||||
|
@ -84,9 +76,6 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <summary>
|
||||
/// This method simply delegates to the DataContractSerializer implementation
|
||||
/// </summary>
|
||||
/// <param name="reader"></param>
|
||||
/// <param name="verifyObjectName"></param>
|
||||
/// <returns></returns>
|
||||
public override object ReadObject(XmlDictionaryReader reader, bool verifyObjectName)
|
||||
{
|
||||
return this.dataContractSerializer.ReadObject(reader, verifyObjectName);
|
||||
|
@ -95,7 +84,6 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <summary>
|
||||
/// This method simply delegates to the DataContractSerializer implementation
|
||||
/// </summary>
|
||||
/// <param name="writer"></param>
|
||||
public override void WriteEndObject(XmlDictionaryWriter writer)
|
||||
{
|
||||
this.dataContractSerializer.WriteEndObject(writer);
|
||||
|
@ -104,8 +92,6 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <summary>
|
||||
/// This method simply delegates to the DataContractSerializer implementation
|
||||
/// </summary>
|
||||
/// <param name="writer"></param>
|
||||
/// <param name="graph"></param>
|
||||
public override void WriteObjectContent(XmlDictionaryWriter writer, object graph)
|
||||
{
|
||||
this.dataContractSerializer.WriteObjectContent(writer, graph);
|
||||
|
@ -114,8 +100,6 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <summary>
|
||||
/// This method simply delegates to the DataContractSerializer implementation
|
||||
/// </summary>
|
||||
/// <param name="writer"></param>
|
||||
/// <param name="graph"></param>
|
||||
public override void WriteStartObject(XmlDictionaryWriter writer, object graph)
|
||||
{
|
||||
this.dataContractSerializer.WriteStartObject(writer, graph);
|
||||
|
@ -125,7 +109,6 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <summary>
|
||||
/// Returns a static <see cref="DataContractBinarySerializer"/> instance of type T
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
public static class DataContractBinarySerializer<T>
|
||||
{
|
||||
/// <summary>
|
||||
|
|
|
@ -8,10 +8,10 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
using System.Runtime.Serialization;
|
||||
|
||||
/// <summary>
|
||||
/// A Message Extension Class that provides extension methods to deserialize
|
||||
/// A Message Extension Class that provides extension methods to deserialize
|
||||
/// the body of a message that was serialized and sent to ServiceBus Queue/Topic
|
||||
/// using the WindowsAzure.Messaging client library. The WindowsAzure.Messaging
|
||||
/// client libary serializes objects using the
|
||||
/// client libary serializes objects using the
|
||||
/// <see cref="DataContractBinarySerializer"/> (default serializer) or <see cref="DataContractSerializer"/>
|
||||
/// when sending message. This class provides extension methods to deserialize
|
||||
/// and retrieve the body of such messages.
|
||||
|
@ -19,14 +19,14 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// <remarks>
|
||||
/// 1. If a message is only being sent and received using this Microsoft.Azure.ServiceBus
|
||||
/// client library, then the below extension methods are not relevant and should not be used.
|
||||
///
|
||||
/// 2. If this client library will be used to receive messages that were sent using both
|
||||
///
|
||||
/// 2. If this client library will be used to receive messages that were sent using both
|
||||
/// WindowsAzure.Messaging client library and this (Microsoft.Azure.ServiceBus) library,
|
||||
/// then the Users need to add a User property <see cref="Message.UserProperties"/>
|
||||
/// while sending the message. On receiving the message, this property can be examined to
|
||||
/// determine if the message was from WindowsAzure.Messaging client library and if so
|
||||
/// use the message.GetBody() extension method to get the actual body associated with the message.
|
||||
///
|
||||
///
|
||||
/// ----------------------------------------------
|
||||
/// Scenarios to use the GetBody Extension method:
|
||||
/// ----------------------------------------------
|
||||
|
@ -35,38 +35,38 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
/// var message1 = new BrokeredMessage("contoso"); // Sending a plain string
|
||||
/// var message2 = new BrokeredMessage(sampleObject); // Sending an actual customer object
|
||||
/// var message3 = new BrokeredMessage(Encoding.UTF8.GetBytes("contoso")); // Sending a UTF8 encoded byte array object
|
||||
///
|
||||
///
|
||||
/// await messageSender.SendAsync(message1);
|
||||
/// await messageSender.SendAsync(message2);
|
||||
/// await messageSender.SendAsync(message3);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Then retreive the original objects using this client library as follows:
|
||||
/// (By default <see cref="DataContractBinarySerializer"/> will be used to deserialize and retrieve the body.
|
||||
/// If a serializer other than that was used, pass in the serializer explicitly.)
|
||||
/// <code>
|
||||
/// var message1 = await messageReceiver.ReceiveAsync();
|
||||
/// var returnedData1 = message1.GetBody<string>();
|
||||
///
|
||||
///
|
||||
/// var message2 = await messageReceiver.ReceiveAsync();
|
||||
/// var returnedData2 = message1.GetBody<SampleObject>();
|
||||
///
|
||||
///
|
||||
/// var message3 = await messageReceiver.ReceiveAsync();
|
||||
/// var returnedData3Bytes = message1.GetBody<byte[]>();
|
||||
/// Console.WriteLine($"Message3 String: {Encoding.UTF8.GetString(returnedData3Bytes)}");
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// -------------------------------------------------
|
||||
/// Scenarios to NOT use the GetBody Extension method:
|
||||
/// -------------------------------------------------
|
||||
/// If message was sent using the WindowsAzure.Messaging client library as follows:
|
||||
/// var message4 = new BrokeredMessage(new MemoryStream(Encoding.UTF8.GetBytes("contoso")));
|
||||
/// await messageSender.SendAsync(message4);
|
||||
///
|
||||
///
|
||||
/// Then retreive the original objects using this client library as follows:
|
||||
/// var message4 = await messageReceiver.ReceiveAsync();
|
||||
/// string returned = Encoding.UTF8.GetString(message4.Body); // Since message was sent as Stream, no deserialization required here.
|
||||
///
|
||||
///
|
||||
/// </remarks>
|
||||
public static class MessageInteropExtensions
|
||||
{
|
||||
|
@ -100,7 +100,7 @@ namespace Microsoft.Azure.ServiceBus.InteropExtensions
|
|||
stream.Write(message.Body, 0, message.Body.Length);
|
||||
stream.Flush();
|
||||
stream.Position = 0;
|
||||
return (T)serializer.ReadObject(stream);
|
||||
return (T)serializer.ReadObject(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,11 +13,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <remarks>
|
||||
/// <para>
|
||||
/// A CorrelationFilter holds a set of conditions that are matched against one of more of an arriving message's user and system properties.
|
||||
/// A common use is a match against the <see cref="Message.CorrelationId"/> property, but the application can also choose to match against
|
||||
/// <see cref="Message.ContentType"/>, <see cref="Message.Label"/>, <see cref="Message.MessageId"/>, <see cref="Message.ReplyTo"/>,
|
||||
/// <see cref="Message.ReplyToSessionId"/>, <see cref="Message.SessionId"/>, <see cref="Message.To"/>, and any user-defined properties.
|
||||
/// A match exists when an arriving message's value for a property is equal to the value specified in the correlation filter. For string expressions,
|
||||
/// the comparison is case-sensitive. When specifying multiple match properties, the filter combines them as a logical AND condition,
|
||||
/// A common use is a match against the <see cref="Message.CorrelationId"/> property, but the application can also choose to match against
|
||||
/// <see cref="Message.ContentType"/>, <see cref="Message.Label"/>, <see cref="Message.MessageId"/>, <see cref="Message.ReplyTo"/>,
|
||||
/// <see cref="Message.ReplyToSessionId"/>, <see cref="Message.SessionId"/>, <see cref="Message.To"/>, and any user-defined properties.
|
||||
/// A match exists when an arriving message's value for a property is equal to the value specified in the correlation filter. For string expressions,
|
||||
/// the comparison is case-sensitive. When specifying multiple match properties, the filter combines them as a logical AND condition,
|
||||
/// meaning all conditions must match for the filter to match.
|
||||
/// </para>
|
||||
/// <para>
|
||||
|
@ -143,8 +143,8 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <value>The application specific properties of the message.</value>
|
||||
/// <remarks>
|
||||
/// Only following value types are supported:
|
||||
/// byte, sbyte, char, short, ushort, int, uint, long, ulong, float, double, decimal,
|
||||
/// bool, Guid, string, Uri, DateTime, DateTimeOffset, TimeSpan, Stream, byte[],
|
||||
/// byte, sbyte, char, short, ushort, int, uint, long, ulong, float, double, decimal,
|
||||
/// bool, Guid, string, Uri, DateTime, DateTimeOffset, TimeSpan, Stream, byte[],
|
||||
/// and IList / IDictionary of supported types
|
||||
/// </remarks>
|
||||
public IDictionary<string, object> Properties => this.properties ?? (this.properties = new PropertyDictionary());
|
||||
|
|
|
@ -14,7 +14,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Gets the name of the default rule on the subscription.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Whenever a new subscription is created, a default rule is always added.
|
||||
/// Whenever a new subscription is created, a default rule is always added.
|
||||
/// The default rule is a <see cref="TrueFilter"/> which will enable all messages in the topic to reach subscription.
|
||||
/// </remarks>
|
||||
public const string DefaultRuleName = "$Default";
|
||||
|
|
|
@ -11,10 +11,10 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Represents a filter which is a composition of an expression and an action that is executed in the pub/sub pipeline.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// A <see cref="SqlFilter"/> holds a SQL-like condition expression that is evaluated in the broker against the arriving messages'
|
||||
/// user-defined properties and system properties. All system properties (which are all properties explicitly listed
|
||||
/// on the <see cref="Message"/> class) must be prefixed with <code>sys.</code> in the condition expression. The SQL subset implements
|
||||
/// testing for existence of properties (EXISTS), testing for null-values (IS NULL), logical NOT/AND/OR, relational operators,
|
||||
/// A <see cref="SqlFilter"/> holds a SQL-like condition expression that is evaluated in the broker against the arriving messages'
|
||||
/// user-defined properties and system properties. All system properties (which are all properties explicitly listed
|
||||
/// on the <see cref="Message"/> class) must be prefixed with <code>sys.</code> in the condition expression. The SQL subset implements
|
||||
/// testing for existence of properties (EXISTS), testing for null-values (IS NULL), logical NOT/AND/OR, relational operators,
|
||||
/// numeric arithmetic, and simple text pattern matching with LIKE.
|
||||
/// </remarks>
|
||||
public class SqlFilter : Filter
|
||||
|
|
|
@ -32,7 +32,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <summary>
|
||||
/// Closes the Client. Closes the connections opened by it.
|
||||
/// </summary>
|
||||
/// <returns>The asynchronous operation</returns>
|
||||
Task CloseAsync();
|
||||
|
||||
/// <summary>
|
||||
|
|
|
@ -16,11 +16,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Service Bus Sessions, also called 'Groups' in the AMQP 1.0 protocol, are unbounded sequences of related messages. ServiceBus guarantees ordering of messages in a session.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Any sender can create a session when submitting messages into a Topic or Queue by setting the <see cref="Message.SessionId"/> property on Message to some
|
||||
/// Any sender can create a session when submitting messages into a Topic or Queue by setting the <see cref="Message.SessionId"/> property on Message to some
|
||||
/// application defined unique identifier. At the AMQP 1.0 protocol level, this value maps to the group-id property.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Sessions come into existence when there is at least one message with the session's SessionId in the Queue or Topic subscription.
|
||||
/// Sessions come into existence when there is at least one message with the session's SessionId in the Queue or Topic subscription.
|
||||
/// Once a Session exists, there is no defined moment or gesture for when the session expires or disappears.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
|
@ -46,7 +46,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Set a custom state on the session which can be later retrieved using <see cref="GetStateAsync"/>
|
||||
/// </summary>
|
||||
/// <param name="sessionState">A byte array of session state</param>
|
||||
/// <returns>The asynchronous operation</returns>
|
||||
/// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
|
||||
Task SetStateAsync(byte[] sessionState);
|
||||
|
||||
|
@ -56,14 +55,13 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <remarks>
|
||||
/// <para>
|
||||
/// When you accept a session, the session is locked for this client instance by the service for a duration as specified during the Queue/Subscription creation.
|
||||
/// If processing of the session requires longer than this duration, the session-lock needs to be renewed. For each renewal, the session-lock is renewed by
|
||||
/// the entity's LockDuration.
|
||||
/// If processing of the session requires longer than this duration, the session-lock needs to be renewed. For each renewal, the session-lock is renewed by
|
||||
/// the entity's LockDuration.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Renewal of session renews all the messages in the session as well. Each individual message need not be renewed.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
Task RenewSessionLockAsync();
|
||||
}
|
||||
}
|
|
@ -20,13 +20,13 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// ReceiveMode.PeekLock,
|
||||
/// RetryExponential);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Send a message to the queue:
|
||||
/// <code>
|
||||
/// byte[] data = GetData();
|
||||
/// await queueClient.SendAsync(data);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Register a message handler which will be invoked every time a message is received.
|
||||
/// <code>
|
||||
/// queueClient.RegisterMessageHandler(
|
||||
|
@ -60,11 +60,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Receive session messages continously from the queue. Registers a message handler and begins a new thread to receive session-messages.
|
||||
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the queue client.
|
||||
/// </summary>
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon/Deadletter or other such operations on the <see cref="Message"/></param>
|
||||
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
|
||||
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
|
||||
/// <remarks> Enable prefetch to speeden up the receive rate.
|
||||
/// <remarks> Enable prefetch to speeden up the receive rate.
|
||||
/// Use <see cref="RegisterSessionHandler(Func{IMessageSession,Message,CancellationToken,Task}, SessionHandlerOptions)"/> to configure the settings of the pump.</remarks>
|
||||
void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler);
|
||||
|
||||
|
@ -72,7 +72,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Receive session messages continously from the queue. Registers a message handler and begins a new thread to receive session-messages.
|
||||
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the queue client.
|
||||
/// </summary>
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon/Deadletter or other such operations on the <see cref="Message"/></param>
|
||||
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
|
||||
/// <remarks> Enable prefetch to speeden up the receive rate. </remarks>
|
||||
|
|
|
@ -7,11 +7,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.ServiceBus.Core;
|
||||
|
||||
/// <summary>
|
||||
/// Describes a Session client. A session client can be used to accept session objects which can be used to interact with all messages with the same sessionId.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// You can accept any session or a given session (identified by <see cref="IMessageSession.SessionId"/> using a session client.
|
||||
/// <summary>
|
||||
/// Describes a Session client. A session client can be used to accept session objects which can be used to interact with all messages with the same sessionId.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// You can accept any session or a given session (identified by <see cref="IMessageSession.SessionId"/> using a session client.
|
||||
/// Once you accept a session, you can use it as a <see cref="MessageReceiver"/> which receives only messages having the same session id.
|
||||
/// See <see cref="IMessageSession"/> for usage of session object.
|
||||
/// <example>
|
||||
|
@ -22,18 +22,18 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// queueName,
|
||||
/// ReceiveMode.PeekLock);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// To receive a session object for a given sessionId
|
||||
/// <code>
|
||||
/// IMessageSession session = await sessionClient.AcceptMessageSessionAsync(sessionId);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// To receive any session
|
||||
/// <code>
|
||||
/// IMessageSession session = await sessionClient.AcceptMessageSessionAsync();
|
||||
/// </code>
|
||||
/// </example>
|
||||
/// </remarks>
|
||||
/// </remarks>
|
||||
/// <seealso cref="IMessageSession"/>
|
||||
/// <seealso cref="SessionClient"/>
|
||||
public interface ISessionClient : IClientEntity
|
||||
|
|
|
@ -22,7 +22,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// ReceiveMode.PeekLock,
|
||||
/// RetryExponential);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Register a message handler which will be invoked every time a message is received.
|
||||
/// <code>
|
||||
/// subscriptionClient.RegisterMessageHandler(
|
||||
|
@ -100,11 +100,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Receive session messages continously from the subscription. Registers a message handler and begins a new thread to receive session-messages.
|
||||
/// This handler(<see cref="Func{T1,T2,T3,TResult}"/>) is awaited on every time a new message is received by the subscription client.
|
||||
/// </summary>
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon/Deadletter or other such operations on the <see cref="Message"/></param>
|
||||
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
|
||||
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
|
||||
/// <remarks> Enable prefetch to speeden up the receive rate.
|
||||
/// <remarks> Enable prefetch to speeden up the receive rate.
|
||||
/// Use <see cref="RegisterSessionHandler(Func{IMessageSession,Message,CancellationToken,Task}, SessionHandlerOptions)"/> to configure the settings of the pump.</remarks>
|
||||
void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler);
|
||||
|
||||
|
@ -112,7 +112,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Receive session messages continously from the subscription. Registers a message handler and begins a new thread to receive session-messages.
|
||||
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the subscription client.
|
||||
/// </summary>
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon/Deadletter or other such operations on the <see cref="Message"/></param>
|
||||
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
|
||||
/// <remarks> Enable prefetch to speeden up the receive rate. </remarks>
|
||||
|
|
|
@ -16,7 +16,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// topicName,
|
||||
/// RetryExponential);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Send a message to the topic:
|
||||
/// <code>
|
||||
/// byte[] data = GetData();
|
||||
|
|
|
@ -141,12 +141,12 @@ namespace Microsoft.Azure.ServiceBus
|
|||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the message’s time to live value. This is the duration after which the message expires, starting from when the message is sent to the Service Bus.
|
||||
/// Messages older than their TimeToLive value will expire and no longer be retained in the message store. Expired messages cannot be received.
|
||||
/// TimeToLive is the maximum lifetime that a message can be received, but its value cannot exceed the entity specified value on the destination queue or subscription.
|
||||
/// If a lower TimeToLive value is specified, it will be applied to the individual message. However, a larger value specified on the message will be overridden by the
|
||||
/// Gets or sets the message’s time to live value. This is the duration after which the message expires, starting from when the message is sent to the Service Bus.
|
||||
/// Messages older than their TimeToLive value will expire and no longer be retained in the message store. Expired messages cannot be received.
|
||||
/// TimeToLive is the maximum lifetime that a message can be received, but its value cannot exceed the entity specified value on the destination queue or subscription.
|
||||
/// If a lower TimeToLive value is specified, it will be applied to the individual message. However, a larger value specified on the message will be overridden by the
|
||||
/// entity’s DefaultMessageTimeToLive value.
|
||||
/// </summary>
|
||||
/// </summary>
|
||||
/// <value>The message’s time to live value.</value>
|
||||
/// <remarks>If the TTL set on a message by the sender exceeds the destination's TTL, then the message's TTL will be overwritten by the later one.</remarks>
|
||||
public TimeSpan TimeToLive
|
||||
|
@ -182,18 +182,18 @@ namespace Microsoft.Azure.ServiceBus
|
|||
public string To { get; set; }
|
||||
|
||||
/// <summary>Gets or sets the type of the content.</summary>
|
||||
/// <value>The type of the content of the message body. This is a
|
||||
/// content type identifier utilized by the sender and receiver for application specific logic.</value>
|
||||
/// <value>The type of the content of the message body. This is a
|
||||
/// content type identifier utilized by the sender and receiver for application specific logic.</value>
|
||||
public string ContentType { get; set; }
|
||||
|
||||
/// <summary>Gets or sets the address of the queue to reply to.</summary>
|
||||
/// <value>The reply to queue address.</value>
|
||||
public string ReplyTo { get; set; }
|
||||
|
||||
/// <summary>Gets or sets the date and time in UTC at which the message will be enqueued. This
|
||||
/// property returns the time in UTC; when setting the property, the supplied DateTime value must also be in UTC.</summary>
|
||||
/// <value>The scheduled enqueue time in UTC. This value is for delayed message sending.
|
||||
/// It is utilized to delay messages sending to a specific time in the future.</value>
|
||||
/// <summary>Gets or sets the date and time in UTC at which the message will be enqueued. This
|
||||
/// property returns the time in UTC; when setting the property, the supplied DateTime value must also be in UTC.</summary>
|
||||
/// <value>The scheduled enqueue time in UTC. This value is for delayed message sending.
|
||||
/// It is utilized to delay messages sending to a specific time in the future.</value>
|
||||
/// <remarks> Message enquing time does not mean that the message will be sent at the same time. It will get enqueued, but the actual sending time
|
||||
/// depends on the queue's workload and its state.</remarks>
|
||||
public DateTime ScheduledEnqueueTimeUtc { get; set; }
|
||||
|
@ -213,8 +213,8 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Only following value types are supported:
|
||||
/// byte, sbyte, char, short, ushort, int, uint, long, ulong, float, double, decimal,
|
||||
/// bool, Guid, string, Uri, DateTime, DateTimeOffset, TimeSpan, Stream, byte[],
|
||||
/// byte, sbyte, char, short, ushort, int, uint, long, ulong, float, double, decimal,
|
||||
/// bool, Guid, string, Uri, DateTime, DateTimeOffset, TimeSpan, Stream, byte[],
|
||||
/// and IList / IDictionary of supported types
|
||||
/// </remarks>
|
||||
public IDictionary<string, object> UserProperties { get; internal set; }
|
||||
|
@ -386,7 +386,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
|
||||
/// <summary>Gets or sets the enqueued sequence number of the message.</summary>
|
||||
/// <value>The enqueued sequence number of the message.</value>
|
||||
/// <remarks>In scenarios of Topic-Subscription or ForwardTo, the message is initially enqueued on a different entity as compared to the
|
||||
/// <remarks>In scenarios of Topic-Subscription or ForwardTo, the message is initially enqueued on a different entity as compared to the
|
||||
/// entity from where the message is received. This returns the sequence number of the message in the initial entity.</remarks>
|
||||
public long EnqueuedSequenceNumber
|
||||
{
|
||||
|
|
|
@ -75,7 +75,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
}
|
||||
}
|
||||
|
||||
internal bool AutoRenewLock => this.MaxAutoRenewDuration > TimeSpan.Zero;
|
||||
internal bool AutoRenewLock => this.MaxAutoRenewDuration > TimeSpan.Zero;
|
||||
|
||||
internal TimeSpan ReceiveTimeOut { get; }
|
||||
|
||||
|
|
|
@ -18,10 +18,10 @@ namespace Microsoft.Azure.ServiceBus
|
|||
readonly CancellationToken pumpCancellationToken;
|
||||
readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
|
||||
|
||||
public MessageReceivePump(IMessageReceiver messageReceiver,
|
||||
MessageHandlerOptions registerHandlerOptions,
|
||||
Func<Message, CancellationToken, Task> callback,
|
||||
string endpoint,
|
||||
public MessageReceivePump(IMessageReceiver messageReceiver,
|
||||
MessageHandlerOptions registerHandlerOptions,
|
||||
Func<Message, CancellationToken, Task> callback,
|
||||
string endpoint,
|
||||
CancellationToken pumpCancellationToken)
|
||||
{
|
||||
this.messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver));
|
||||
|
@ -58,7 +58,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
try
|
||||
{
|
||||
await this.maxConcurrentCallsSemaphoreSlim.WaitAsync(this.pumpCancellationToken).ConfigureAwait(false);
|
||||
message = await this.messageReceiver.ReceiveAsync(this.registerHandlerOptions.ReceiveTimeOut).ConfigureAwait(false);
|
||||
message = await this.messageReceiver.ReceiveAsync(this.registerHandlerOptions.ReceiveTimeOut).ConfigureAwait(false);
|
||||
|
||||
if (message != null)
|
||||
{
|
||||
|
@ -109,11 +109,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
{
|
||||
MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, message, exception);
|
||||
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
|
||||
|
||||
|
||||
// Nothing much to do if UserCallback throws, Abandon message and Release semaphore.
|
||||
if (!(exception is MessageLockLostException))
|
||||
{
|
||||
await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
|
||||
await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
// AbandonMessageIfNeededAsync should take care of not throwing exception
|
||||
|
|
|
@ -351,7 +351,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
}
|
||||
|
||||
// Unused - 31;32;33
|
||||
|
||||
|
||||
[NonEvent]
|
||||
public void AmqpSendLinkCreateStart(string clientId, MessagingEntityType? entityType, string entityPath)
|
||||
{
|
||||
|
@ -711,7 +711,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
{
|
||||
if (this.IsEnabled())
|
||||
{
|
||||
this.WriteEvent(67, clientId, currentSemaphoreCount);
|
||||
this.WriteEvent(67, clientId, currentSemaphoreCount);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1011,7 +1011,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
{
|
||||
if (this.IsEnabled())
|
||||
{
|
||||
this.WriteEvent(89, clientId, entityPath, sessionId);
|
||||
this.WriteEvent(89, clientId, entityPath, sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1050,7 +1050,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
{
|
||||
if (this.IsEnabled())
|
||||
{
|
||||
this.AmqpConnectionCreated(hostName, connection.ToString(), connection.State.ToString());
|
||||
this.AmqpConnectionCreated(hostName, connection.ToString(), connection.State.ToString());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1095,7 +1095,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
{
|
||||
if (this.IsEnabled())
|
||||
{
|
||||
this.WriteEvent(95, pluginName, messageId);
|
||||
this.WriteEvent(95, pluginName, messageId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1104,7 +1104,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
{
|
||||
if (this.IsEnabled())
|
||||
{
|
||||
this.WriteEvent(96, pluginName, messageId);
|
||||
this.WriteEvent(96, pluginName, messageId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1120,7 +1120,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
[Event(97, Level = EventLevel.Error, Message = "Exception during {0} plugin execution. MessageId: {1}, Exception {2}")]
|
||||
void PluginCallFailed(string pluginName, string messageId, string exception)
|
||||
{
|
||||
this.WriteEvent(97, pluginName, messageId, exception);
|
||||
this.WriteEvent(97, pluginName, messageId, exception);
|
||||
}
|
||||
|
||||
[NonEvent]
|
||||
|
@ -1173,7 +1173,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
{
|
||||
if (this.IsEnabled())
|
||||
{
|
||||
WriteEvent(101, oldClientId, newClientId);
|
||||
WriteEvent(101, oldClientId, newClientId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,16 +2,13 @@
|
|||
|
||||
<PropertyGroup>
|
||||
<Description>This is the next generation Azure Service Bus .NET Standard client library that focuses on queues & topics. For more information about Service Bus, see https://azure.microsoft.com/en-us/services/service-bus/</Description>
|
||||
<AssemblyTitle>Microsoft.Azure.ServiceBus</AssemblyTitle>
|
||||
<VersionPrefix>1.0.0</VersionPrefix>
|
||||
<Authors>Microsoft</Authors>
|
||||
<TargetFrameworks>net451;netstandard1.3;uap10.0</TargetFrameworks>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
<AssemblyName>Microsoft.Azure.ServiceBus</AssemblyName>
|
||||
<AssemblyOriginatorKeyFile>../../build/keyfile.snk</AssemblyOriginatorKeyFile>
|
||||
<SignAssembly>true</SignAssembly>
|
||||
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
|
||||
<PackageId>Microsoft.Azure.ServiceBus</PackageId>
|
||||
<PackageTags>Azure;Service Bus;ServiceBus;.NET;AMQP;IoT;Queue;Topic</PackageTags>
|
||||
<PackageReleaseNotes>https://github.com/Azure/azure-service-bus-dotnet/releases</PackageReleaseNotes>
|
||||
<PackageIconUrl>https://raw.githubusercontent.com/Azure/azure-service-bus-dotnet/master/service-bus.png</PackageIconUrl>
|
||||
|
@ -25,8 +22,9 @@
|
|||
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
|
||||
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
|
||||
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
|
||||
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\Microsoft.Azure.ServiceBus.xml</DocumentationFile>
|
||||
<GenerateDocumentationFile>true</GenerateDocumentationFile>
|
||||
<LangVersion>7</LangVersion>
|
||||
<NoWarn>CS1591</NoWarn>
|
||||
</PropertyGroup>
|
||||
|
||||
<Target Name="IncludePDBsInPackage">
|
||||
|
|
|
@ -17,7 +17,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <param name="remainingTime">The remaining time before the timeout expires.</param>
|
||||
/// <param name="currentRetryCount">The number of attempts that have been processed.</param>
|
||||
/// <param name="retryInterval">The amount of time to delay before retry.</param>
|
||||
/// <returns></returns>
|
||||
protected override bool OnShouldRetry(
|
||||
TimeSpan remainingTime,
|
||||
int currentRetryCount,
|
||||
|
|
|
@ -40,8 +40,7 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
|
||||
static string GetAssemblyAttributeValue<T>(Assembly assembly, Func<T, string> getter) where T : Attribute
|
||||
{
|
||||
var attribute = assembly.GetCustomAttribute(typeof(T)) as T;
|
||||
return attribute == null ? null : getter(attribute);
|
||||
return !(assembly.GetCustomAttribute(typeof(T)) is T attribute) ? null : getter(attribute);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,7 +109,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
get { return this.token; }
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
protected virtual string ExpiresOnFieldName
|
||||
{
|
||||
get
|
||||
|
@ -118,7 +117,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
protected virtual string AudienceFieldName
|
||||
{
|
||||
get
|
||||
|
@ -127,7 +125,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
protected virtual string KeyValueSeparator
|
||||
{
|
||||
get
|
||||
|
@ -136,7 +133,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
protected virtual string PairSeparator
|
||||
{
|
||||
get
|
||||
|
|
|
@ -43,12 +43,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
{
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="keyName"></param>
|
||||
/// <param name="sharedAccessKey"></param>
|
||||
/// <param name="customKeyEncoder"></param>
|
||||
/// <param name="tokenTimeToLive"></param>
|
||||
/// <param name="tokenScope"></param>
|
||||
protected SharedAccessSignatureTokenProvider(string keyName, string sharedAccessKey, Func<string, byte[]> customKeyEncoder, TimeSpan tokenTimeToLive, TokenScope tokenScope)
|
||||
: base(tokenScope)
|
||||
{
|
||||
|
@ -83,11 +77,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
TokenProvider.MessagingTokenProviderKeyEncoder(sharedAccessKey);
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="appliesTo"></param>
|
||||
/// <param name="action"></param>
|
||||
/// <param name="timeout"></param>
|
||||
/// <returns></returns>
|
||||
protected override Task<SecurityToken> OnGetTokenAsync(string appliesTo, string action, TimeSpan timeout)
|
||||
{
|
||||
string tokenString = this.BuildSignature(appliesTo);
|
||||
|
@ -95,9 +84,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
return Task.FromResult<SecurityToken>(securityToken);
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="targetUri"></param>
|
||||
/// <returns></returns>
|
||||
protected virtual string BuildSignature(string targetUri)
|
||||
{
|
||||
return string.IsNullOrWhiteSpace(this.sharedAccessSignature)
|
||||
|
|
|
@ -10,10 +10,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
|
||||
static class StringUtility
|
||||
{
|
||||
public static string GetRandomString()
|
||||
{
|
||||
return Guid.NewGuid().ToString().Substring(0, 6);
|
||||
}
|
||||
|
||||
public static string GetFormattedLockTokens(IEnumerable<string> lockTokens)
|
||||
{
|
||||
|
|
|
@ -16,14 +16,11 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
internal static readonly Func<string, byte[]> MessagingTokenProviderKeyEncoder = Encoding.UTF8.GetBytes;
|
||||
const TokenScope DefaultTokenScope = TokenScope.Entity;
|
||||
|
||||
/// <summary></summary>
|
||||
protected TokenProvider()
|
||||
: this(TokenProvider.DefaultTokenScope)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="tokenScope"></param>
|
||||
protected TokenProvider(TokenScope tokenScope)
|
||||
{
|
||||
this.TokenScope = tokenScope;
|
||||
|
@ -35,7 +32,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
/// </summary>
|
||||
public TokenScope TokenScope { get; }
|
||||
|
||||
/// <summary></summary>
|
||||
protected object ThisLock { get; }
|
||||
|
||||
/// <summary>
|
||||
|
@ -107,7 +103,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
/// <param name="appliesTo">The URI which the access token applies to</param>
|
||||
/// <param name="action">The request action</param>
|
||||
/// <param name="timeout">The time span that specifies the timeout value for the message that gets the security token</param>
|
||||
/// <returns></returns>
|
||||
public Task<SecurityToken> GetTokenAsync(string appliesTo, string action, TimeSpan timeout)
|
||||
{
|
||||
TimeoutHelper.ThrowIfNegativeArgument(timeout);
|
||||
|
@ -115,16 +110,8 @@ namespace Microsoft.Azure.ServiceBus.Primitives
|
|||
return this.OnGetTokenAsync(appliesTo, action, timeout);
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="appliesTo"></param>
|
||||
/// <param name="action"></param>
|
||||
/// <param name="timeout"></param>
|
||||
/// <returns></returns>
|
||||
protected abstract Task<SecurityToken> OnGetTokenAsync(string appliesTo, string action, TimeSpan timeout);
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="appliesTo"></param>
|
||||
/// <returns></returns>
|
||||
protected virtual string NormalizeAppliesTo(string appliesTo)
|
||||
{
|
||||
return ServiceBusUriHelper.NormalizeUri(appliesTo, "http", true, stripPath: this.TokenScope == TokenScope.Namespace, ensureTrailingSlash: true);
|
||||
|
|
|
@ -23,13 +23,13 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// ReceiveMode.PeekLock,
|
||||
/// RetryExponential);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Send a message to the queue:
|
||||
/// <code>
|
||||
/// byte[] data = GetData();
|
||||
/// await queueClient.SendAsync(data);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Register a message handler which will be invoked every time a message is received.
|
||||
/// <code>
|
||||
/// queueClient.RegisterMessageHandler(
|
||||
|
@ -145,16 +145,16 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Setting the value to zero turns prefetch off.
|
||||
/// Defaults to 0.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// When Prefetch is enabled, the client will quietly acquire more messages, up to the PrefetchCount limit, than what the application
|
||||
/// immediately asks for. The message pump will therefore acquire a message for immediate consumption
|
||||
/// that will be returned as soon as available, and the client will proceed to acquire further messages to fill the prefetch buffer in the background.
|
||||
/// When Prefetch is enabled, the client will quietly acquire more messages, up to the PrefetchCount limit, than what the application
|
||||
/// immediately asks for. The message pump will therefore acquire a message for immediate consumption
|
||||
/// that will be returned as soon as available, and the client will proceed to acquire further messages to fill the prefetch buffer in the background.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately satisfied from the buffer, and the buffer is
|
||||
/// replenished in the background as space becomes available.If there are no messages available for delivery, the receive operation will drain the
|
||||
/// buffer and then wait or block as expected.
|
||||
/// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately satisfied from the buffer, and the buffer is
|
||||
/// replenished in the background as space becomes available.If there are no messages available for delivery, the receive operation will drain the
|
||||
/// buffer and then wait or block as expected.
|
||||
/// </para>
|
||||
/// <para>Updates to this value take effect on the next receive call to the service.</para>
|
||||
/// </remarks>
|
||||
|
@ -319,11 +319,10 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to complete.</param>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// This operation can only be performed on messages that were received by this client.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public Task CompleteAsync(string lockToken)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -334,13 +333,12 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processing.
|
||||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
|
||||
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// Abandoning a message will increase the delivery count on the message.
|
||||
/// This operation can only be performed on messages that were received by this client.
|
||||
/// </remarks>
|
||||
/// This operation can only be performed on messages that were received by this client.
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public Task AbandonAsync(string lockToken)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -352,13 +350,12 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// In order to receive a message from the deadletter sub-queue, you will need a new <see cref="IMessageReceiver"/> or <see cref="IQueueClient"/>, with the corresponding path.
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// In order to receive a message from the deadletter sub-queue, you will need a new <see cref="IMessageReceiver"/> or <see cref="IQueueClient"/>, with the corresponding path.
|
||||
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
|
||||
/// This operation can only be performed on messages that were received by this client.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public Task DeadLetterAsync(string lockToken)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -372,7 +369,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param>
|
||||
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
|
||||
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
|
||||
/// <remarks>Enable prefetch to speed up the receive rate.
|
||||
/// <remarks>Enable prefetch to speed up the receive rate.
|
||||
/// Use <see cref="RegisterMessageHandler(Func{Message,CancellationToken,Task}, MessageHandlerOptions)"/> to configure the settings of the pump.</remarks>
|
||||
public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
|
||||
{
|
||||
|
@ -396,11 +393,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages.
|
||||
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the queue client.
|
||||
/// </summary>
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon/Deadletter or other such operations on the <see cref="Message"/></param>
|
||||
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
|
||||
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
|
||||
/// <remarks> Enable prefetch to speeden up the receive rate.
|
||||
/// <remarks> Enable prefetch to speeden up the receive rate.
|
||||
/// Use <see cref="RegisterSessionHandler(Func{IMessageSession,Message,CancellationToken,Task}, SessionHandlerOptions)"/> to configure the settings of the pump.</remarks>
|
||||
public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
|
||||
{
|
||||
|
@ -412,7 +409,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Receive session messages continously from the queue. Registers a message handler and begins a new thread to receive session-messages.
|
||||
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the queue client.
|
||||
/// </summary>
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon/Deadletter or other such operations on the <see cref="Message"/></param>
|
||||
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
|
||||
/// <remarks> Enable prefetch to speed up the receive rate. </remarks>
|
||||
|
@ -467,8 +464,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
this.InnerReceiver.UnregisterPlugin(serviceBusPluginName);
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <returns></returns>
|
||||
protected override async Task OnClosingAsync()
|
||||
{
|
||||
if (this.innerSender != null)
|
||||
|
|
|
@ -77,7 +77,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <param name="remainingTime">The remaining time before the timeout expires.</param>
|
||||
/// <param name="currentRetryCount">The number of attempts that have been processed.</param>
|
||||
/// <param name="retryInterval">The amount of time to delay before retry.</param>
|
||||
/// <returns></returns>
|
||||
protected override bool OnShouldRetry(TimeSpan remainingTime, int currentRetryCount, out TimeSpan retryInterval)
|
||||
{
|
||||
if (currentRetryCount > this.MaxRetryCount)
|
||||
|
|
|
@ -10,7 +10,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
using Primitives;
|
||||
|
||||
/// <summary>
|
||||
/// Represents an abstraction for retrying messaging operations. Users should not
|
||||
/// Represents an abstraction for retrying messaging operations. Users should not
|
||||
/// implement this class, and instead should use one of the provided implementations.
|
||||
/// </summary>
|
||||
public abstract class RetryPolicy
|
||||
|
@ -27,7 +27,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
// This is a volatile copy of IsServerBusy. IsServerBusy is synchronized with a lock, whereas encounteredServerBusy is kept volatile for performance reasons.
|
||||
volatile bool encounteredServerBusy;
|
||||
|
||||
/// <summary></summary>
|
||||
protected RetryPolicy()
|
||||
{
|
||||
this.serverBusyResetTimer = new Timer(OnTimerCallback, this, TimeSpan.FromMilliseconds(-1), TimeSpan.FromMilliseconds(-1));
|
||||
|
@ -111,7 +110,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <summary>
|
||||
/// Determines whether or not the exception can be retried.
|
||||
/// </summary>
|
||||
/// <param name="exception"></param>
|
||||
/// <returns>A bool indicating whether or not the operation can be retried.</returns>
|
||||
public virtual bool IsRetryableException(Exception exception)
|
||||
{
|
||||
|
@ -187,11 +185,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="remainingTime"></param>
|
||||
/// <param name="currentRetryCount"></param>
|
||||
/// <param name="retryInterval"></param>
|
||||
/// <returns></returns>
|
||||
protected abstract bool OnShouldRetry(TimeSpan remainingTime, int currentRetryCount, out TimeSpan retryInterval);
|
||||
|
||||
static void OnTimerCallback(object state)
|
||||
|
|
|
@ -10,16 +10,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// </summary>
|
||||
public class ServiceBusCommunicationException : ServiceBusException
|
||||
{
|
||||
/// <summary></summary>
|
||||
/// <param name="message"></param>
|
||||
protected internal ServiceBusCommunicationException(string message)
|
||||
: this(message, null)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <param name="message"></param>
|
||||
/// <param name="innerException"></param>
|
||||
protected internal ServiceBusCommunicationException(string message, Exception innerException)
|
||||
: base(true, message, innerException)
|
||||
{
|
||||
|
|
|
@ -12,11 +12,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
using Core;
|
||||
using Primitives;
|
||||
|
||||
/// <summary>
|
||||
/// A session client can be used to accept session objects which can be used to interact with all messages with the same sessionId.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// You can accept any session or a given session (identified by <see cref="IMessageSession.SessionId"/> using a session client.
|
||||
/// <summary>
|
||||
/// A session client can be used to accept session objects which can be used to interact with all messages with the same sessionId.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// You can accept any session or a given session (identified by <see cref="IMessageSession.SessionId"/> using a session client.
|
||||
/// Once you accept a session, you can use it as a <see cref="MessageReceiver"/> which receives only messages having the same session id.
|
||||
/// See <see cref="IMessageSession"/> for usage of session object.
|
||||
/// This uses AMQP protocol to communicate with the service.
|
||||
|
@ -29,12 +29,12 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// queueName,
|
||||
/// ReceiveMode.PeekLock);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// To receive a session object for a given sessionId
|
||||
/// <code>
|
||||
/// IMessageSession session = await sessionClient.AcceptMessageSessionAsync(sessionId);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// To receive any session
|
||||
/// <code>
|
||||
/// IMessageSession session = await sessionClient.AcceptMessageSessionAsync();
|
||||
|
@ -61,7 +61,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
RetryPolicy retryPolicy = null,
|
||||
int prefetchCount = DefaultPrefetchCount)
|
||||
: this(connectionStringBuilder?.GetNamespaceConnectionString(), connectionStringBuilder?.EntityPath, receiveMode, retryPolicy, prefetchCount)
|
||||
{
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -130,7 +130,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
foreach (var serviceBusPlugin in registeredPlugins)
|
||||
{
|
||||
this.RegisterPlugin(serviceBusPlugin);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -247,7 +247,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
await session.CloseAsync().ConfigureAwait(false);
|
||||
throw AmqpExceptionHelper.GetClientException(exception);
|
||||
}
|
||||
|
||||
|
||||
MessagingEventSource.Log.AmqpSessionClientAcceptMessageSessionStop(
|
||||
this.ClientId,
|
||||
this.EntityPath,
|
||||
|
@ -259,7 +259,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
{
|
||||
session.RegisterPlugin(serviceBusPlugin);
|
||||
}
|
||||
|
||||
|
||||
return session;
|
||||
}
|
||||
|
||||
|
@ -305,8 +305,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
protected override async Task OnClosingAsync()
|
||||
{
|
||||
if (this.ownsConnection)
|
||||
|
|
|
@ -20,12 +20,12 @@ namespace Microsoft.Azure.ServiceBus
|
|||
readonly SemaphoreSlim maxConcurrentSessionsSemaphoreSlim;
|
||||
readonly SemaphoreSlim maxPendingAcceptSessionsSemaphoreSlim;
|
||||
|
||||
public SessionReceivePump(string clientId,
|
||||
ISessionClient client,
|
||||
ReceiveMode receiveMode,
|
||||
SessionHandlerOptions sessionHandlerOptions,
|
||||
Func<IMessageSession, Message, CancellationToken, Task> callback,
|
||||
string endpoint,
|
||||
public SessionReceivePump(string clientId,
|
||||
ISessionClient client,
|
||||
ReceiveMode receiveMode,
|
||||
SessionHandlerOptions sessionHandlerOptions,
|
||||
Func<IMessageSession, Message, CancellationToken, Task> callback,
|
||||
string endpoint,
|
||||
CancellationToken token)
|
||||
{
|
||||
this.client = client ?? throw new ArgumentException(nameof(client));
|
||||
|
@ -89,7 +89,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.Complete).ConfigureAwait(false);
|
||||
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.Complete).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -221,7 +221,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
callbackExceptionOccured = true;
|
||||
if (!(exception is MessageLockLostException || exception is SessionLockLostException))
|
||||
{
|
||||
await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
|
||||
await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
|
|
@ -25,7 +25,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// ReceiveMode.PeekLock,
|
||||
/// RetryExponential);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Register a message handler which will be invoked every time a message is received.
|
||||
/// <code>
|
||||
/// subscriptionClient.RegisterMessageHandler(
|
||||
|
@ -153,16 +153,16 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Setting the value to zero turns prefetch off.
|
||||
/// Defaults to 0.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// When Prefetch is enabled, the client will quietly acquire more messages, up to the PrefetchCount limit, than what the application
|
||||
/// immediately asks for. The message pump will therefore acquire a message for immediate consumption
|
||||
/// that will be returned as soon as available, and the client will proceed to acquire further messages to fill the prefetch buffer in the background.
|
||||
/// When Prefetch is enabled, the client will quietly acquire more messages, up to the PrefetchCount limit, than what the application
|
||||
/// immediately asks for. The message pump will therefore acquire a message for immediate consumption
|
||||
/// that will be returned as soon as available, and the client will proceed to acquire further messages to fill the prefetch buffer in the background.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately satisfied from the buffer, and the buffer is
|
||||
/// replenished in the background as space becomes available.If there are no messages available for delivery, the receive operation will drain the
|
||||
/// buffer and then wait or block as expected.
|
||||
/// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately satisfied from the buffer, and the buffer is
|
||||
/// replenished in the background as space becomes available.If there are no messages available for delivery, the receive operation will drain the
|
||||
/// buffer and then wait or block as expected.
|
||||
/// </para>
|
||||
/// <para>Updates to this value take effect on the next receive call to the service.</para>
|
||||
/// </remarks>
|
||||
|
@ -271,11 +271,10 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to complete.</param>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// This operation can only be performed on messages that were received by this client.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public Task CompleteAsync(string lockToken)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -286,12 +285,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processing.
|
||||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
|
||||
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// Abandoning a message will increase the delivery count on the message.
|
||||
/// This operation can only be performed on messages that were received by this client.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public Task AbandonAsync(string lockToken)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -303,13 +301,12 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// </summary>
|
||||
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
|
||||
/// <remarks>
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// In order to receive a message from the deadletter sub-queue, you will need a new <see cref="IMessageReceiver"/> or <see cref="IQueueClient"/>, with the corresponding path.
|
||||
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
|
||||
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
|
||||
/// In order to receive a message from the deadletter sub-queue, you will need a new <see cref="IMessageReceiver"/> or <see cref="IQueueClient"/>, with the corresponding path.
|
||||
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
|
||||
/// This operation can only be performed on messages that were received by this client.
|
||||
/// </remarks>
|
||||
/// <returns>The asynchronous operation.</returns>
|
||||
public Task DeadLetterAsync(string lockToken)
|
||||
{
|
||||
this.ThrowIfClosed();
|
||||
|
@ -323,7 +320,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param>
|
||||
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
|
||||
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
|
||||
/// <remarks>Enable prefetch to speed up the receive rate.
|
||||
/// <remarks>Enable prefetch to speed up the receive rate.
|
||||
/// Use <see cref="RegisterMessageHandler(Func{Message,CancellationToken,Task}, MessageHandlerOptions)"/> to configure the settings of the pump.</remarks>
|
||||
public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
|
||||
{
|
||||
|
@ -348,11 +345,11 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages.
|
||||
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the subscription client.
|
||||
/// </summary>
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon/Deadletter or other such operations on the <see cref="Message"/></param>
|
||||
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
|
||||
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
|
||||
/// <remarks> Enable prefetch to speed up the receive rate.
|
||||
/// <remarks> Enable prefetch to speed up the receive rate.
|
||||
/// Use <see cref="RegisterSessionHandler(Func{IMessageSession,Message,CancellationToken,Task}, SessionHandlerOptions)"/> to configure the settings of the pump.</remarks>
|
||||
public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
|
||||
{
|
||||
|
@ -364,7 +361,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// Receive session messages continously from the queue. Registers a message handler and begins a new thread to receive session-messages.
|
||||
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the subscription client.
|
||||
/// </summary>
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes messages.
|
||||
/// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon/Deadletter or other such operations on the <see cref="Message"/></param>
|
||||
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
|
||||
/// <remarks> Enable prefetch to speeden up the receive rate. </remarks>
|
||||
|
@ -383,7 +380,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// <remarks>
|
||||
/// You can add rules to the subscription that decides which messages from the topic should reach the subscription.
|
||||
/// A default <see cref="TrueFilter"/> rule named <see cref="RuleDescription.DefaultRuleName"/> is always added while creation of the Subscription.
|
||||
/// You can add multiple rules with distinct names to the same subscription.
|
||||
/// You can add multiple rules with distinct names to the same subscription.
|
||||
/// Multiple filters combine with each other using logical OR condition. i.e., If any filter succeeds, the message is passed on to the subscription.
|
||||
/// Max allowed length of rule name is 50 chars.
|
||||
/// </remarks>
|
||||
|
@ -509,8 +506,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
this.InnerSubscriptionClient.InnerReceiver.UnregisterPlugin(serviceBusPluginName);
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <returns></returns>
|
||||
protected override async Task OnClosingAsync()
|
||||
{
|
||||
if (this.innerSubscriptionClient != null)
|
||||
|
|
|
@ -21,7 +21,7 @@ namespace Microsoft.Azure.ServiceBus
|
|||
/// topicName,
|
||||
/// RetryExponential);
|
||||
/// </code>
|
||||
///
|
||||
///
|
||||
/// Send a message to the topic:
|
||||
/// <code>
|
||||
/// byte[] data = GetData();
|
||||
|
@ -203,8 +203,6 @@ namespace Microsoft.Azure.ServiceBus
|
|||
this.InnerSender.UnregisterPlugin(serviceBusPluginName);
|
||||
}
|
||||
|
||||
/// <summary></summary>
|
||||
/// <returns></returns>
|
||||
protected override async Task OnClosingAsync()
|
||||
{
|
||||
if (this.innerSender != null)
|
||||
|
|
|
@ -82,7 +82,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
{
|
||||
var messageId = "test-message1";
|
||||
var sessionId = Guid.NewGuid().ToString();
|
||||
await sender.SendAsync(new Message() { MessageId = messageId, SessionId = sessionId });
|
||||
await sender.SendAsync(new Message { MessageId = messageId, SessionId = sessionId });
|
||||
TestUtility.Log($"Sent Message: {messageId} to Session: {sessionId}");
|
||||
|
||||
var sessionReceiver = await sessionClient.AcceptMessageSessionAsync(sessionId);
|
||||
|
@ -147,7 +147,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
{
|
||||
var messageId = "test-message1";
|
||||
var sessionId = Guid.NewGuid().ToString();
|
||||
await sender.SendAsync(new Message() { MessageId = messageId, SessionId = sessionId });
|
||||
await sender.SendAsync(new Message { MessageId = messageId, SessionId = sessionId });
|
||||
TestUtility.Log($"Sent Message: {messageId} to Session: {sessionId}");
|
||||
|
||||
sessionReceiver = await sessionClient.AcceptMessageSessionAsync(sessionId);
|
||||
|
|
|
@ -38,7 +38,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.MessageInterop
|
|||
{
|
||||
// Send Plain string
|
||||
string message1Body = "contosoString";
|
||||
var message1 = new BrokeredMessage(message1Body);
|
||||
var message1 = new BrokeredMessage(message1Body);
|
||||
await fullFrameWorkClientSender.SendAsync(message1);
|
||||
|
||||
// Receive Plain string
|
||||
|
|
|
@ -1,20 +1,14 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<AssemblyTitle>Microsoft.Azure.ServiceBus.UnitTests</AssemblyTitle>
|
||||
<TargetFrameworks>netcoreapp1.0;net46</TargetFrameworks>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
<AssemblyName>Microsoft.Azure.ServiceBus.UnitTests</AssemblyName>
|
||||
<AssemblyOriginatorKeyFile>../../build/keyfile.snk</AssemblyOriginatorKeyFile>
|
||||
<SignAssembly>true</SignAssembly>
|
||||
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
|
||||
<PackageId>Microsoft.Azure.ServiceBus.UnitTests</PackageId>
|
||||
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
|
||||
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50;portable-net46+win8</PackageTargetFallback>
|
||||
<RuntimeFrameworkVersion>1.0.4</RuntimeFrameworkVersion>
|
||||
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
|
||||
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
|
||||
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
@ -60,4 +54,4 @@
|
|||
</PackageReference>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
</Project>
|
|
@ -22,25 +22,25 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
[Theory]
|
||||
[MemberData(nameof(TestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
async Task OnMessagePeekLockWithAutoCompleteTrue(string queueName, int maxConcurrentCalls)
|
||||
Task OnMessagePeekLockWithAutoCompleteTrue(string queueName, int maxConcurrentCalls)
|
||||
{
|
||||
await this.OnMessageTestAsync(queueName, maxConcurrentCalls, ReceiveMode.PeekLock, true);
|
||||
return this.OnMessageTestAsync(queueName, maxConcurrentCalls, ReceiveMode.PeekLock, true);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(TestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
async Task OnMessagePeekLockWithAutoCompleteFalse(string queueName, int maxConcurrentCalls)
|
||||
Task OnMessagePeekLockWithAutoCompleteFalse(string queueName, int maxConcurrentCalls)
|
||||
{
|
||||
await this.OnMessageTestAsync(queueName, maxConcurrentCalls, ReceiveMode.PeekLock, false);
|
||||
return this.OnMessageTestAsync(queueName, maxConcurrentCalls, ReceiveMode.PeekLock, false);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(TestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
async Task OnMessageReceiveDelete(string queueName, int maxConcurrentCalls)
|
||||
Task OnMessageReceiveDelete(string queueName, int maxConcurrentCalls)
|
||||
{
|
||||
await this.OnMessageTestAsync(queueName, maxConcurrentCalls, ReceiveMode.ReceiveAndDelete, false);
|
||||
return this.OnMessageTestAsync(queueName, maxConcurrentCalls, ReceiveMode.ReceiveAndDelete, false);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
|
@ -99,7 +99,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
finally
|
||||
{
|
||||
await queueClient.CloseAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async Task OnMessageTestAsync(string queueName, int maxConcurrentCalls, ReceiveMode mode, bool autoComplete)
|
||||
|
|
|
@ -20,17 +20,17 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
[Theory]
|
||||
[MemberData(nameof(TestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
async Task OnMessagePeekLockWithAutoCompleteTrue(string topicName, int maxConcurrentCalls)
|
||||
Task OnMessagePeekLockWithAutoCompleteTrue(string topicName, int maxConcurrentCalls)
|
||||
{
|
||||
await this.OnMessageTestAsync(topicName, maxConcurrentCalls, ReceiveMode.PeekLock, true);
|
||||
return this.OnMessageTestAsync(topicName, maxConcurrentCalls, ReceiveMode.PeekLock, true);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(TestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
async Task OnMessageReceiveDelete(string topicName, int maxConcurrentCalls)
|
||||
Task OnMessageReceiveDelete(string topicName, int maxConcurrentCalls)
|
||||
{
|
||||
await this.OnMessageTestAsync(topicName, maxConcurrentCalls, ReceiveMode.ReceiveAndDelete, false);
|
||||
return this.OnMessageTestAsync(topicName, maxConcurrentCalls, ReceiveMode.ReceiveAndDelete, false);
|
||||
}
|
||||
|
||||
async Task OnMessageTestAsync(string topicName, int maxConcurrentCalls, ReceiveMode mode, bool autoComplete)
|
||||
|
|
|
@ -28,25 +28,25 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
[Theory]
|
||||
[MemberData(nameof(TestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
async Task OnSessionPeekLockWithAutoCompleteTrue(string queueName, int maxConcurrentCalls)
|
||||
Task OnSessionPeekLockWithAutoCompleteTrue(string queueName, int maxConcurrentCalls)
|
||||
{
|
||||
await this.OnSessionTestAsync(queueName, maxConcurrentCalls, ReceiveMode.PeekLock, true);
|
||||
return this.OnSessionTestAsync(queueName, maxConcurrentCalls, ReceiveMode.PeekLock, true);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(TestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
async Task OnSessionPeekLockWithAutoCompleteFalse(string queueName, int maxConcurrentCalls)
|
||||
Task OnSessionPeekLockWithAutoCompleteFalse(string queueName, int maxConcurrentCalls)
|
||||
{
|
||||
await this.OnSessionTestAsync(queueName, maxConcurrentCalls, ReceiveMode.PeekLock, false);
|
||||
return this.OnSessionTestAsync(queueName, maxConcurrentCalls, ReceiveMode.PeekLock, false);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(PartitionedNonPartitionedTestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
async Task OnSessionReceiveDelete(string queueName, int maxConcurrentCalls)
|
||||
Task OnSessionReceiveDelete(string queueName, int maxConcurrentCalls)
|
||||
{
|
||||
await this.OnSessionTestAsync(queueName, maxConcurrentCalls, ReceiveMode.ReceiveAndDelete, false);
|
||||
return this.OnSessionTestAsync(queueName, maxConcurrentCalls, ReceiveMode.ReceiveAndDelete, false);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
|
|
@ -24,17 +24,17 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
[Theory]
|
||||
[MemberData(nameof(TestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
async Task OnSessionPeekLockWithAutoCompleteTrue(string topicName, int maxConcurrentCalls)
|
||||
Task OnSessionPeekLockWithAutoCompleteTrue(string topicName, int maxConcurrentCalls)
|
||||
{
|
||||
await this.OnSessionTestAsync(topicName, maxConcurrentCalls, ReceiveMode.PeekLock, true);
|
||||
return this.OnSessionTestAsync(topicName, maxConcurrentCalls, ReceiveMode.PeekLock, true);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(TestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
async Task OnSessionPeekLockWithAutoCompleteFalse(string topicName, int maxConcurrentCalls)
|
||||
Task OnSessionPeekLockWithAutoCompleteFalse(string topicName, int maxConcurrentCalls)
|
||||
{
|
||||
await this.OnSessionTestAsync(topicName, maxConcurrentCalls, ReceiveMode.PeekLock, false);
|
||||
return this.OnSessionTestAsync(topicName, maxConcurrentCalls, ReceiveMode.PeekLock, false);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
|
|
@ -15,7 +15,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
{
|
||||
[Fact]
|
||||
[DisplayTestMethodName]
|
||||
async Task Registering_plugin_multiple_times_should_throw()
|
||||
Task Registering_plugin_multiple_times_should_throw()
|
||||
{
|
||||
var messageReceiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.NonPartitionedQueueName, ReceiveMode.ReceiveAndDelete);
|
||||
var firstPlugin = new FirstSendPlugin();
|
||||
|
@ -23,19 +23,19 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
|
||||
messageReceiver.RegisterPlugin(firstPlugin);
|
||||
Assert.Throws<ArgumentException>(() => messageReceiver.RegisterPlugin(secondPlugin));
|
||||
await messageReceiver.CloseAsync();
|
||||
return messageReceiver.CloseAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[DisplayTestMethodName]
|
||||
async Task Unregistering_plugin_should_complete_with_plugin_set()
|
||||
Task Unregistering_plugin_should_complete_with_plugin_set()
|
||||
{
|
||||
var messageReceiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.NonPartitionedQueueName, ReceiveMode.ReceiveAndDelete);
|
||||
var firstPlugin = new FirstSendPlugin();
|
||||
|
||||
messageReceiver.RegisterPlugin(firstPlugin);
|
||||
messageReceiver.UnregisterPlugin(firstPlugin.Name);
|
||||
await messageReceiver.CloseAsync();
|
||||
return messageReceiver.CloseAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
@ -121,7 +121,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
try
|
||||
{
|
||||
var plugin = new ExceptionPlugin();
|
||||
|
||||
|
||||
messageSender.RegisterPlugin(plugin);
|
||||
|
||||
var sendMessage = new Message(Encoding.UTF8.GetBytes("Test message"));
|
||||
|
|
|
@ -1,24 +0,0 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
using System.Reflection;
|
||||
using System.Runtime.InteropServices;
|
||||
using Xunit;
|
||||
|
||||
// General Information about an assembly is controlled through the following
|
||||
// set of attributes. Change these attribute values to modify the information
|
||||
// associated with an assembly.
|
||||
[assembly: AssemblyConfiguration("")]
|
||||
[assembly: AssemblyCompany("Microsoft")]
|
||||
[assembly: AssemblyProduct("Microsoft.Azure.ServiceBus.UnitTests")]
|
||||
[assembly: AssemblyTrademark("")]
|
||||
|
||||
// Setting ComVisible to false makes the types in this assembly not visible
|
||||
// to COM components. If you need to access a type in this assembly from
|
||||
// COM, set the ComVisible attribute to true on that type.
|
||||
[assembly: ComVisible(false)]
|
||||
|
||||
// The following GUID is for the ID of the typelib if this project is exposed to COM
|
||||
[assembly: Guid("bda98e20-3a26-4a5a-9371-cb02b73e3434")]
|
||||
|
||||
[assembly: CollectionBehavior(DisableTestParallelization = true)]
|
|
@ -30,12 +30,12 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
{
|
||||
var messageId1 = "test-message1";
|
||||
var sessionId1 = "sessionId1";
|
||||
await sender.SendAsync(new Message() { MessageId = messageId1, SessionId = sessionId1 }).ConfigureAwait(false);
|
||||
await sender.SendAsync(new Message { MessageId = messageId1, SessionId = sessionId1 }).ConfigureAwait(false);
|
||||
TestUtility.Log($"Sent Message: {messageId1} to Session: {sessionId1}");
|
||||
|
||||
var messageId2 = "test-message2";
|
||||
var sessionId2 = "sessionId2";
|
||||
await sender.SendAsync(new Message() { MessageId = messageId2, SessionId = sessionId2 }).ConfigureAwait(false);
|
||||
await sender.SendAsync(new Message { MessageId = messageId2, SessionId = sessionId2 }).ConfigureAwait(false);
|
||||
TestUtility.Log($"Sent Message: {messageId2} to Session: {sessionId2}");
|
||||
|
||||
// Receive Message, Complete and Close with SessionId - sessionId 1
|
||||
|
@ -47,7 +47,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
// Receive Message, Complete and Close - With Null SessionId specified
|
||||
var messageId3 = "test-message3";
|
||||
var sessionId3 = "sessionId3";
|
||||
await sender.SendAsync(new Message() { MessageId = messageId3, SessionId = sessionId3 }).ConfigureAwait(false);
|
||||
await sender.SendAsync(new Message { MessageId = messageId3, SessionId = sessionId3 }).ConfigureAwait(false);
|
||||
|
||||
await this.AcceptAndCompleteSessionsAsync(sessionClient, null, messageId3).ConfigureAwait(false);
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(TestPermutations))]
|
||||
[DisplayTestMethodName]
|
||||
|
@ -71,7 +71,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
{
|
||||
var messageId = "test-message1";
|
||||
var sessionId = Guid.NewGuid().ToString();
|
||||
await sender.SendAsync(new Message()
|
||||
await sender.SendAsync(new Message
|
||||
{
|
||||
MessageId = messageId,
|
||||
SessionId = sessionId
|
||||
|
@ -129,7 +129,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
{
|
||||
var messageId = "test-message1";
|
||||
var sessionId = Guid.NewGuid().ToString();
|
||||
await sender.SendAsync(new Message() { MessageId = messageId, SessionId = sessionId });
|
||||
await sender.SendAsync(new Message { MessageId = messageId, SessionId = sessionId });
|
||||
TestUtility.Log($"Sent Message: {messageId} to Session: {sessionId}");
|
||||
|
||||
var sessionReceiver = await sessionClient.AcceptMessageSessionAsync(sessionId);
|
||||
|
@ -142,7 +142,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
TestUtility.Log("Sleeping 10 seconds...");
|
||||
await Task.Delay(TimeSpan.FromSeconds(10));
|
||||
|
||||
// For session it looks like when the session is received, sometimes the session LockedUntil UTC
|
||||
// For session it looks like when the session is received, sometimes the session LockedUntil UTC
|
||||
// is turning out slightly more than the Default Lock Duration(lock is for 1 minute, but the session was locked
|
||||
// for 1 min and 2 seconds. We will need to look at if this is an issue on service or some kind of time SKU.
|
||||
// Temporarily changing this test to look at the renew request time instead.
|
||||
|
@ -182,12 +182,12 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
{
|
||||
var messageId1 = "test-message1";
|
||||
var sessionId1 = "sessionId1";
|
||||
await sender.SendAsync(new Message() { MessageId = messageId1, SessionId = sessionId1 });
|
||||
await sender.SendAsync(new Message { MessageId = messageId1, SessionId = sessionId1 });
|
||||
TestUtility.Log($"Sent Message: {messageId1} to Session: {sessionId1}");
|
||||
|
||||
var messageId2 = "test-message2";
|
||||
var sessionId2 = "sessionId2";
|
||||
await sender.SendAsync(new Message() { MessageId = messageId2, SessionId = sessionId2 });
|
||||
await sender.SendAsync(new Message { MessageId = messageId2, SessionId = sessionId2 });
|
||||
TestUtility.Log($"Sent Message: {messageId2} to Session: {sessionId2}");
|
||||
|
||||
// Peek Message, Receive and Delete with SessionId - sessionId 1
|
||||
|
|
|
@ -127,7 +127,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
TestUtility.Log("Sleeping 10 seconds...");
|
||||
await Task.Delay(TimeSpan.FromSeconds(10));
|
||||
|
||||
|
||||
|
||||
await messageReceiver.RenewLockAsync(message);
|
||||
TestUtility.Log($"After First Renewal: {message.SystemProperties.LockedUntilUtc}");
|
||||
Assert.True(message.SystemProperties.LockedUntilUtc >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(10));
|
||||
|
@ -284,7 +284,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
{
|
||||
int count = 0;
|
||||
messageReceiver.RegisterMessageHandler(
|
||||
async (message, token) =>
|
||||
async (message, token) =>
|
||||
{
|
||||
TestUtility.Log($"Received message: SequenceNumber: {message.SystemProperties.SequenceNumber}");
|
||||
Interlocked.Increment(ref count);
|
||||
|
|
|
@ -128,7 +128,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
var messages = await subscriptionClient.InnerSubscriptionClient.InnerReceiver.ReceiveAsync(maxMessageCount: 2);
|
||||
Assert.NotNull(messages);
|
||||
Assert.True(messages.Count == 1);
|
||||
Assert.True(messageId2.Equals(messages.First().MessageId));
|
||||
Assert.True(messageId2.Equals(messages.First().MessageId));
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
@ -230,9 +230,9 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
|
|||
Assert.Equal(RuleDescription.DefaultRuleName, firstRule.Name);
|
||||
Assert.IsType<SqlFilter>(firstRule.Filter);
|
||||
Assert.Null(firstRule.Action);
|
||||
|
||||
|
||||
await subscriptionClient.AddRuleAsync(sqlRuleName, new SqlFilter("price > 10"));
|
||||
|
||||
|
||||
RuleDescription ruleDescription = new RuleDescription(correlationRuleName)
|
||||
{
|
||||
Filter = new CorrelationFilter
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
[assembly: Xunit.CollectionBehavior(DisableTestParallelization = true)]
|
Загрузка…
Ссылка в новой задаче