This commit is contained in:
clemensv 2017-10-09 09:05:17 +02:00
Родитель 963e1740fe 468d02fd16
Коммит 22347ec7bf
13 изменённых файлов: 393 добавлений и 123 удалений

Просмотреть файл

@ -444,7 +444,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
return action;
}
static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType mappingType, out object amqpObject)
internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType mappingType, out object amqpObject)
{
amqpObject = null;
if (netObject == null)

Просмотреть файл

@ -56,6 +56,9 @@ namespace Microsoft.Azure.ServiceBus.Amqp
public static readonly MapKey Message = new MapKey("message");
public static readonly MapKey Messages = new MapKey("messages");
public static readonly MapKey DispositionStatus = new MapKey("disposition-status");
public static readonly MapKey PropertiesToModify = new MapKey("properties-to-modify");
public static readonly MapKey DeadLetterReason = new MapKey("deadletter-reason");
public static readonly MapKey DeadLetterDescription = new MapKey("deadletter-description");
public static readonly MapKey FromSequenceNumber = new MapKey("from-sequence-number");
public static readonly MapKey MessageCount = new MapKey("message-count");

Просмотреть файл

@ -23,6 +23,8 @@ namespace Microsoft.Azure.ServiceBus
public const int DefaultClientPrefetchCount = 0;
public const int MaxDeadLetterReasonLength = 4096;
public static readonly long DefaultLastPeekedSequenceNumber = 0;
public static readonly TimeSpan DefaultOperationTimeout = TimeSpan.FromMinutes(1);

Просмотреть файл

@ -111,6 +111,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// <summary>Indicates that the receiver wants to defer the processing for the message.</summary>
/// <param name="lockToken">The lock token of the <see cref="Message" />.</param>
/// <param name="propertiesToModify">The properties of the message to modify while deferring the message.</param>
/// <remarks>
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
@ -119,7 +120,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// Deferring messages does not impact message's expiration, meaning that deferred messages can still expire.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
Task DeferAsync(string lockToken);
Task DeferAsync(string lockToken, IDictionary<string, object> propertiesToModify = null);
/// <summary>
/// Renews the lock on the message. The lock will be renewed based on the setting specified on the queue.

Просмотреть файл

@ -4,6 +4,7 @@
namespace Microsoft.Azure.ServiceBus.Core
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@ -82,17 +83,19 @@ namespace Microsoft.Azure.ServiceBus.Core
/// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processing.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
/// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</param>
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// Abandoning a message will increase the delivery count on the message.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
Task AbandonAsync(string lockToken);
Task AbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null);
/// <summary>
/// Moves a message to the deadletter sub-queue.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
/// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
/// <remarks>
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
@ -100,6 +103,21 @@ namespace Microsoft.Azure.ServiceBus.Core
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
Task DeadLetterAsync(string lockToken);
Task DeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null);
/// <summary>
/// Moves a message to the deadletter sub-queue.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
/// <param name="deadLetterReason">The reason for deadlettering the message.</param>
/// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param>
/// <remarks>
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, with the corresponding path.
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null);
}
}

Просмотреть файл

@ -121,7 +121,6 @@ namespace Microsoft.Azure.ServiceBus.Core
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.ReceiveMode = receiveMode;
this.OperationTimeout = serviceBusConnection.OperationTimeout;
this.Path = entityPath;
this.EntityType = entityType;
this.CbsTokenProvider = cbsTokenProvider;
@ -445,12 +444,13 @@ namespace Microsoft.Azure.ServiceBus.Core
/// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processing.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
/// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</param>
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// Abandoning a message will increase the delivery count on the message.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
public async Task AbandonAsync(string lockToken)
public async Task AbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
{
this.ThrowIfClosed();
this.ThrowIfNotPeekLockMode();
@ -458,7 +458,7 @@ namespace Microsoft.Azure.ServiceBus.Core
MessagingEventSource.Log.MessageAbandonStart(this.ClientId, 1, lockToken);
try
{
await this.RetryPolicy.RunOperation(() => this.OnAbandonAsync(lockToken), this.OperationTimeout)
await this.RetryPolicy.RunOperation(() => this.OnAbandonAsync(lockToken, propertiesToModify), this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
@ -472,6 +472,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// <summary>Indicates that the receiver wants to defer the processing for the message.</summary>
/// <param name="lockToken">The lock token of the <see cref="Message" />.</param>
/// <param name="propertiesToModify">The properties of the message to modify while deferring the message.</param>
/// <remarks>
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
@ -480,7 +481,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// Deferring messages does not impact message's expiration, meaning that deferred messages can still expire.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
public async Task DeferAsync(string lockToken)
public async Task DeferAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
{
this.ThrowIfClosed();
this.ThrowIfNotPeekLockMode();
@ -489,7 +490,7 @@ namespace Microsoft.Azure.ServiceBus.Core
try
{
await this.RetryPolicy.RunOperation(() => this.OnDeferAsync(lockToken), this.OperationTimeout)
await this.RetryPolicy.RunOperation(() => this.OnDeferAsync(lockToken, propertiesToModify), this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
@ -505,6 +506,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// Moves a message to the deadletter sub-queue.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
/// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
/// <remarks>
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
@ -512,7 +514,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
public async Task DeadLetterAsync(string lockToken)
public async Task DeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
{
this.ThrowIfClosed();
this.ThrowIfNotPeekLockMode();
@ -521,7 +523,41 @@ namespace Microsoft.Azure.ServiceBus.Core
try
{
await this.RetryPolicy.RunOperation(() => this.OnDeadLetterAsync(lockToken), this.OperationTimeout)
await this.RetryPolicy.RunOperation(() => this.OnDeadLetterAsync(lockToken, propertiesToModify), this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
MessagingEventSource.Log.MessageDeadLetterException(this.ClientId, exception);
throw;
}
MessagingEventSource.Log.MessageDeadLetterStop(this.ClientId);
}
/// <summary>
/// Moves a message to the deadletter sub-queue.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
/// <param name="deadLetterReason">The reason for deadlettering the message.</param>
/// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param>
/// <remarks>
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, with the corresponding path.
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
public async Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null)
{
this.ThrowIfClosed();
this.ThrowIfNotPeekLockMode();
MessagingEventSource.Log.MessageDeadLetterStart(this.ClientId, 1, lockToken);
try
{
await this.RetryPolicy.RunOperation(() => this.OnDeadLetterAsync(lockToken, null, deadLetterReason, deadLetterErrorDescription), this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
@ -966,39 +1002,50 @@ namespace Microsoft.Azure.ServiceBus.Core
return this.DisposeMessagesAsync(lockTokenGuids, AmqpConstants.AcceptedOutcome);
}
protected virtual Task OnAbandonAsync(string lockToken)
protected virtual Task OnAbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
{
var lockTokens = new[] { new Guid(lockToken) };
if (lockTokens.Any(lt => this.requestResponseLockedMessages.Contains(lt)))
{
return this.DisposeMessageRequestResponseAsync(lockTokens, DispositionStatus.Abandoned);
return this.DisposeMessageRequestResponseAsync(lockTokens, DispositionStatus.Abandoned, propertiesToModify);
}
return this.DisposeMessagesAsync(lockTokens, new Modified());
return this.DisposeMessagesAsync(lockTokens, GetAbandonOutcome(propertiesToModify));
}
protected virtual Task OnDeferAsync(string lockToken)
protected virtual Task OnDeferAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
{
var lockTokens = new[] { new Guid(lockToken) };
if (lockTokens.Any(lt => this.requestResponseLockedMessages.Contains(lt)))
{
return this.DisposeMessageRequestResponseAsync(lockTokens, DispositionStatus.Defered);
return this.DisposeMessageRequestResponseAsync(lockTokens, DispositionStatus.Defered, propertiesToModify);
}
return this.DisposeMessagesAsync(lockTokens, new Modified { UndeliverableHere = true });
return this.DisposeMessagesAsync(lockTokens, GetDeferOutcome(propertiesToModify));
}
protected virtual Task OnDeadLetterAsync(string lockToken)
protected virtual Task OnDeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null, string deadLetterReason = null, string deadLetterErrorDescription = null)
{
if (deadLetterReason != null && deadLetterReason.Length > Constants.MaxDeadLetterReasonLength)
{
throw new ArgumentOutOfRangeException(nameof(deadLetterReason), $"Maximum permitted length is {Constants.MaxDeadLetterReasonLength}");
}
if (deadLetterErrorDescription != null && deadLetterErrorDescription.Length > Constants.MaxDeadLetterReasonLength)
{
throw new ArgumentOutOfRangeException(nameof(deadLetterErrorDescription), $"Maximum permitted length is {Constants.MaxDeadLetterReasonLength}");
}
var lockTokens = new[] { new Guid(lockToken) };
if (lockTokens.Any(lt => this.requestResponseLockedMessages.Contains(lt)))
{
return this.DisposeMessageRequestResponseAsync(lockTokens, DispositionStatus.Suspended);
return this.DisposeMessageRequestResponseAsync(lockTokens, DispositionStatus.Suspended, propertiesToModify, deadLetterReason, deadLetterErrorDescription);
}
return this.DisposeMessagesAsync(lockTokens, AmqpConstants.RejectedOutcome);
return this.DisposeMessagesAsync(lockTokens, GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription));
}
protected virtual async Task<DateTime> OnRenewLockAsync(string lockToken)
{
var lockedUntilUtc = DateTime.MinValue;
DateTime lockedUntilUtc;
try
{
// Create an AmqpRequest Message to renew lock
@ -1185,7 +1232,7 @@ namespace Microsoft.Azure.ServiceBus.Core
}
}
async Task DisposeMessageRequestResponseAsync(Guid[] lockTokens, DispositionStatus dispositionStatus)
async Task DisposeMessageRequestResponseAsync(Guid[] lockTokens, DispositionStatus dispositionStatus, IDictionary<string, object> propertiesToModify = null, string deadLetterReason = null, string deadLetterDescription = null)
{
try
{
@ -1194,6 +1241,43 @@ namespace Microsoft.Azure.ServiceBus.Core
amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = lockTokens;
amqpRequestMessage.Map[ManagementConstants.Properties.DispositionStatus] = dispositionStatus.ToString().ToLowerInvariant();
if (deadLetterReason != null)
{
amqpRequestMessage.Map[ManagementConstants.Properties.DeadLetterReason] = deadLetterReason;
}
if (deadLetterDescription != null)
{
amqpRequestMessage.Map[ManagementConstants.Properties.DeadLetterDescription] = deadLetterDescription;
}
if (propertiesToModify != null)
{
var amqpPropertiesToModify = new AmqpMap();
foreach (var pair in propertiesToModify)
{
object amqpObject;
if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, out amqpObject))
{
amqpPropertiesToModify[new MapKey(pair.Key)] = amqpObject;
}
else
{
throw new NotSupportedException(Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.GetType()));
}
}
if (amqpPropertiesToModify.Count > 0)
{
amqpRequestMessage.Map[ManagementConstants.Properties.PropertiesToModify] = amqpPropertiesToModify;
}
}
if (!string.IsNullOrWhiteSpace(this.SessionIdInternal))
{
amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
}
var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(false);
if (amqpResponseMessage.StatusCode != AmqpResponseStatusCode.OK)
{
@ -1316,5 +1400,79 @@ namespace Microsoft.Azure.ServiceBus.Core
throw this.LinkException;
}
}
Outcome GetAbandonOutcome(IDictionary<string, object> propertiesToModify)
{
return this.GetModifiedOutcome(propertiesToModify, false);
}
Outcome GetDeferOutcome(IDictionary<string, object> propertiesToModify)
{
return this.GetModifiedOutcome(propertiesToModify, true);
}
Outcome GetModifiedOutcome(IDictionary<string, object> propertiesToModify, bool undeliverableHere)
{
Modified modified = new Modified();
if (undeliverableHere)
{
modified.UndeliverableHere = true;
}
if (propertiesToModify != null)
{
modified.MessageAnnotations = new Fields();
foreach (var pair in propertiesToModify)
{
object amqpObject;
if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, out amqpObject))
{
modified.MessageAnnotations.Add(pair.Key, amqpObject);
}
else
{
throw new NotSupportedException(Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.GetType()));
}
}
}
return modified;
}
Rejected GetRejectedOutcome(IDictionary<string, object> propertiesToModify, string deadLetterReason, string deadLetterErrorDescription)
{
var rejected = AmqpConstants.RejectedOutcome;
if (deadLetterReason != null || deadLetterErrorDescription != null || propertiesToModify != null)
{
rejected = new Rejected { Error = new Error { Condition = AmqpClientConstants.DeadLetterName, Info = new Fields() } };
if (deadLetterReason != null)
{
rejected.Error.Info.Add(Message.DeadLetterReasonHeader, deadLetterReason);
}
if (deadLetterErrorDescription != null)
{
rejected.Error.Info.Add(Message.DeadLetterErrorDescriptionHeader, deadLetterErrorDescription);
}
if (propertiesToModify != null)
{
foreach (var pair in propertiesToModify)
{
object amqpObject;
if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, out amqpObject))
{
rejected.Error.Info.Add(pair.Key, amqpObject);
}
else
{
throw new NotSupportedException(Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.GetType()));
}
}
}
}
return rejected;
}
}
}

Просмотреть файл

@ -13,6 +13,16 @@ namespace Microsoft.Azure.ServiceBus
/// </summary>
public class Message
{
/// <summary>
/// User property key representing deadletter reason, when a message is received from a deadletter subqueue of an entity.
/// </summary>
public static string DeadLetterReasonHeader = "DeadLetterReason";
/// <summary>
/// User property key representing detailed error description, when a message is received from a deadletter subqueue of an entity.
/// </summary>
public static string DeadLetterErrorDescriptionHeader = "DeadLetterErrorDescription";
private string messageId;
private string sessionId;
private string replyToSessionId;

Просмотреть файл

@ -5,12 +5,10 @@ namespace Microsoft.Azure.ServiceBus.Primitives
{
using System;
using System.Diagnostics;
using System.Threading;
[DebuggerStepThrough]
struct TimeoutHelper
{
public static readonly TimeSpan MaxWait = TimeSpan.FromMilliseconds(int.MaxValue);
DateTime deadline;
bool deadlineSet;
TimeSpan originalTimeout;
@ -34,63 +32,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
}
}
public TimeSpan OriginalTimeout => this.originalTimeout;
public static bool IsTooLarge(TimeSpan timeout)
{
return (timeout > TimeoutHelper.MaxWait) && (timeout != TimeSpan.MaxValue);
}
public static TimeSpan FromMilliseconds(int milliseconds)
{
if (milliseconds == Timeout.Infinite)
{
return TimeSpan.MaxValue;
}
return TimeSpan.FromMilliseconds(milliseconds);
}
public static int ToMilliseconds(TimeSpan timeout)
{
if (timeout == TimeSpan.MaxValue)
{
return Timeout.Infinite;
}
var ticks = Ticks.FromTimeSpan(timeout);
if (ticks / TimeSpan.TicksPerMillisecond > int.MaxValue)
{
return int.MaxValue;
}
return Ticks.ToMilliseconds(ticks);
}
public static TimeSpan Min(TimeSpan val1, TimeSpan val2)
{
if (val1 > val2)
{
return val2;
}
return val1;
}
public static DateTime Min(DateTime val1, DateTime val2)
{
if (val1 > val2)
{
return val2;
}
return val1;
}
public static TimeSpan Add(TimeSpan timeout1, TimeSpan timeout2)
{
return Ticks.ToTimeSpan(Ticks.Add(Ticks.FromTimeSpan(timeout1), Ticks.FromTimeSpan(timeout2)));
}
public static DateTime Add(DateTime time, TimeSpan timeout)
{
if (timeout >= TimeSpan.Zero && DateTime.MaxValue - time <= timeout)
@ -145,18 +86,6 @@ namespace Microsoft.Azure.ServiceBus.Primitives
}
}
public static bool WaitOne(WaitHandle waitHandle, TimeSpan timeout)
{
ThrowIfNegativeArgument(timeout);
if (timeout == TimeSpan.MaxValue)
{
waitHandle.WaitOne();
return true;
}
return waitHandle.WaitOne(timeout);
}
public TimeSpan RemainingTime()
{
if (!this.deadlineSet)

Просмотреть файл

@ -327,33 +327,54 @@ namespace Microsoft.Azure.ServiceBus
/// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processing.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
/// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</param>
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// Abandoning a message will increase the delivery count on the message.
/// This operation can only be performed on messages that were received by this client.
/// </remarks>
/// This operation can only be performed on messages that were received by this client.
public Task AbandonAsync(string lockToken)
public Task AbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
{
this.ThrowIfClosed();
return this.InnerReceiver.AbandonAsync(lockToken);
return this.InnerReceiver.AbandonAsync(lockToken, propertiesToModify);
}
/// <summary>
/// Moves a message to the deadletter sub-queue.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
/// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
/// <remarks>
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// In order to receive a message from the deadletter sub-queue, you will need a new <see cref="IMessageReceiver"/> or <see cref="IQueueClient"/>, with the corresponding path.
/// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, with the corresponding path.
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
/// This operation can only be performed on messages that were received by this client.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
public Task DeadLetterAsync(string lockToken)
public Task DeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
{
this.ThrowIfClosed();
return this.InnerReceiver.DeadLetterAsync(lockToken);
return this.InnerReceiver.DeadLetterAsync(lockToken, propertiesToModify);
}
/// <summary>
/// Moves a message to the deadletter sub-queue.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
/// <param name="deadLetterReason">The reason for deadlettering the message.</param>
/// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param>
/// <remarks>
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, with the corresponding path.
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
public Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null)
{
this.ThrowIfClosed();
return this.InnerReceiver.DeadLetterAsync(lockToken, deadLetterReason, deadLetterErrorDescription);
}
/// <summary>

Просмотреть файл

@ -65,7 +65,7 @@ namespace Microsoft.Azure.ServiceBus
return baseMessage;
}
return StringUtility.FormatInvariant("{0}, ({1})", this.ServiceBusNamespace);
return "{0}, ({1})".FormatInvariant(baseMessage, this.ServiceBusNamespace);
}
}

Просмотреть файл

@ -280,15 +280,16 @@ namespace Microsoft.Azure.ServiceBus
/// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processing.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
/// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</param>
/// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// Abandoning a message will increase the delivery count on the message.
/// This operation can only be performed on messages that were received by this client.
/// </remarks>
public Task AbandonAsync(string lockToken)
public Task AbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
{
this.ThrowIfClosed();
return this.InnerSubscriptionClient.InnerReceiver.AbandonAsync(lockToken);
return this.InnerSubscriptionClient.InnerReceiver.AbandonAsync(lockToken, propertiesToModify);
}
/// <summary>
@ -308,6 +309,43 @@ namespace Microsoft.Azure.ServiceBus
return this.InnerSubscriptionClient.InnerReceiver.DeadLetterAsync(lockToken);
}
/// <summary>
/// Moves a message to the deadletter sub-queue.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
/// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
/// <remarks>
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, with the corresponding path.
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
public Task DeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify)
{
this.ThrowIfClosed();
return this.InnerSubscriptionClient.InnerReceiver.DeadLetterAsync(lockToken, propertiesToModify);
}
/// <summary>
/// Moves a message to the deadletter sub-queue.
/// </summary>
/// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
/// <param name="deadLetterReason">The reason for deadlettering the message.</param>
/// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param>
/// <remarks>
/// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
/// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, with the corresponding path.
/// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
/// This operation can only be performed on messages that were received by this receiver.
/// </remarks>
public Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null)
{
this.ThrowIfClosed();
return this.InnerSubscriptionClient.InnerReceiver.DeadLetterAsync(lockToken, deadLetterReason, deadLetterErrorDescription);
}
/// <summary>
/// Receive messages continuously from the entity. Registers a message handler and begins a new thread to receive messages.
/// This handler(<see cref="Func{Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the receiver.

Просмотреть файл

@ -123,6 +123,8 @@ namespace Microsoft.Azure.ServiceBus
}
public class Message
{
public static string DeadLetterErrorDescriptionHeader;
public static string DeadLetterReasonHeader;
public Message() { }
public Message(byte[] body) { }
public byte[] Body { get; set; }
@ -184,10 +186,11 @@ namespace Microsoft.Azure.ServiceBus
public string QueueName { get; }
public Microsoft.Azure.ServiceBus.ReceiveMode ReceiveMode { get; }
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public System.Threading.Tasks.Task AbandonAsync(string lockToken) { }
public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { }
public System.Threading.Tasks.Task CompleteAsync(string lockToken) { }
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken) { }
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) { }
protected override System.Threading.Tasks.Task OnClosingAsync() { }
public void RegisterMessageHandler(System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public void RegisterMessageHandler(System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { }
@ -321,11 +324,13 @@ namespace Microsoft.Azure.ServiceBus
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public string SubscriptionName { get; }
public string TopicPath { get; }
public System.Threading.Tasks.Task AbandonAsync(string lockToken) { }
public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
public System.Threading.Tasks.Task AddRuleAsync(string ruleName, Microsoft.Azure.ServiceBus.Filter filter) { }
public System.Threading.Tasks.Task AddRuleAsync(Microsoft.Azure.ServiceBus.RuleDescription description) { }
public System.Threading.Tasks.Task CompleteAsync(string lockToken) { }
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken) { }
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify) { }
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) { }
public System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Microsoft.Azure.ServiceBus.RuleDescription>> GetRulesAsync() { }
protected override System.Threading.Tasks.Task OnClosingAsync() { }
public void RegisterMessageHandler(System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
@ -370,7 +375,7 @@ namespace Microsoft.Azure.ServiceBus.Core
{
long LastPeekedSequenceNumber { get; }
System.Threading.Tasks.Task CompleteAsync(System.Collections.Generic.IEnumerable<string> lockTokens);
System.Threading.Tasks.Task DeferAsync(string lockToken);
System.Threading.Tasks.Task DeferAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null);
System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.Message> PeekAsync();
System.Threading.Tasks.Task<System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Message>> PeekAsync(int maxMessageCount);
System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.Message> PeekBySequenceNumberAsync(long fromSequenceNumber);
@ -390,9 +395,10 @@ namespace Microsoft.Azure.ServiceBus.Core
string Path { get; }
int PrefetchCount { get; set; }
Microsoft.Azure.ServiceBus.ReceiveMode ReceiveMode { get; }
System.Threading.Tasks.Task AbandonAsync(string lockToken);
System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null);
System.Threading.Tasks.Task CompleteAsync(string lockToken);
System.Threading.Tasks.Task DeadLetterAsync(string lockToken);
System.Threading.Tasks.Task DeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null);
System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null);
void RegisterMessageHandler(System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler);
void RegisterMessageHandler(System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions);
}
@ -413,16 +419,17 @@ namespace Microsoft.Azure.ServiceBus.Core
public int PrefetchCount { get; set; }
public Microsoft.Azure.ServiceBus.ReceiveMode ReceiveMode { get; set; }
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public System.Threading.Tasks.Task AbandonAsync(string lockToken) { }
public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
public System.Threading.Tasks.Task CompleteAsync(string lockToken) { }
public System.Threading.Tasks.Task CompleteAsync(System.Collections.Generic.IEnumerable<string> lockTokens) { }
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken) { }
public System.Threading.Tasks.Task DeferAsync(string lockToken) { }
protected virtual System.Threading.Tasks.Task OnAbandonAsync(string lockToken) { }
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) { }
public System.Threading.Tasks.Task DeferAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
protected virtual System.Threading.Tasks.Task OnAbandonAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
protected override System.Threading.Tasks.Task OnClosingAsync() { }
protected virtual System.Threading.Tasks.Task OnCompleteAsync(System.Collections.Generic.IEnumerable<string> lockTokens) { }
protected virtual System.Threading.Tasks.Task OnDeadLetterAsync(string lockToken) { }
protected virtual System.Threading.Tasks.Task OnDeferAsync(string lockToken) { }
protected virtual System.Threading.Tasks.Task OnDeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, string deadLetterReason = null, string deadLetterErrorDescription = null) { }
protected virtual System.Threading.Tasks.Task OnDeferAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
protected virtual void OnMessageHandler(Microsoft.Azure.ServiceBus.MessageHandlerOptions registerHandlerOptions, System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> callback) { }
protected virtual System.Threading.Tasks.Task<System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Message>> OnPeekAsync(long fromSequenceNumber, int messageCount = 1) { }
protected virtual System.Threading.Tasks.Task<System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Message>> OnReceiveAsync(int maxMessageCount, System.TimeSpan serverWaitTime) { }

Просмотреть файл

@ -7,7 +7,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
using System.Text;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Core;
using Core;
using Xunit;
public class SenderReceiverTests : SenderReceiverClientTestBase
@ -192,15 +192,16 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
TestUtility.Log("Begin to receive from an empty queue.");
Task throwingTask;
bool exceptionReceived = false;
object syncLock = new object();
var exceptionReceived = false;
var syncLock = new object();
try
{
throwingTask = new Task(async () =>
throwingTask = Task.Run(async () =>
{
try
{
await receiver.ReceiveAsync(TimeSpan.FromSeconds(40));
var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(40));
throw new Exception($"Received unexpected message: {Encoding.ASCII.GetString(message.Body)}");
}
catch (ObjectDisposedException)
{
@ -209,12 +210,11 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
exceptionReceived = true;
}
}
catch(Exception e)
catch (Exception e)
{
TestUtility.Log("Unexpected exception: " + e);
}
});
throwingTask.Start();
await Task.Delay(1000);
TestUtility.Log("Waited for 1 Sec");
}
@ -224,13 +224,96 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
TestUtility.Log("Closed Receiver");
}
TestUtility.Log("Waiting for 4 Secs");
await Task.Delay(4000);
TestUtility.Log("Waiting for maximum 10 Secs");
var waitingTask = Task.Delay(10000);
await Task.WhenAny(throwingTask, waitingTask);
Assert.True(throwingTask.IsCompleted, "ReceiveAsync did not return immediately after closing connection");
lock (syncLock)
{
Assert.True(exceptionReceived, "Did not receive ObjectDisposedException");
}
}
[Fact]
[DisplayTestMethodName]
public async Task DeadLetterReasonShouldPropagateToTheReceivedMessage()
{
var queueName = TestConstants.NonPartitionedQueueName;
var sender = new MessageSender(TestUtility.NamespaceConnectionString, queueName);
var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, queueName);
var dlqReceiver = new MessageReceiver(TestUtility.NamespaceConnectionString, EntityNameHelper.FormatDeadLetterPath(queueName), ReceiveMode.ReceiveAndDelete);
try
{
await sender.SendAsync(new Message(Encoding.UTF8.GetBytes("deadLetterTest2")));
var message = await receiver.ReceiveAsync();
Assert.NotNull(message);
await receiver.DeadLetterAsync(
message.SystemProperties.LockToken,
"deadLetterReason",
"deadLetterDescription");
var dlqMessage = await dlqReceiver.ReceiveAsync();
Assert.NotNull(dlqMessage);
Assert.True(dlqMessage.UserProperties.ContainsKey(Message.DeadLetterReasonHeader));
Assert.True(dlqMessage.UserProperties.ContainsKey(Message.DeadLetterErrorDescriptionHeader));
Assert.Equal(dlqMessage.UserProperties[Message.DeadLetterReasonHeader], "deadLetterReason");
Assert.Equal(dlqMessage.UserProperties[Message.DeadLetterErrorDescriptionHeader], "deadLetterDescription");
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
await dlqReceiver.CloseAsync();
}
}
[Fact]
[DisplayTestMethodName]
public async Task DispositionWithUpdatedPropertiesShouldPropagateToReceivedMessage()
{
var queueName = TestConstants.NonPartitionedQueueName;
var sender = new MessageSender(TestUtility.NamespaceConnectionString, queueName);
var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, queueName);
try
{
await sender.SendAsync(new Message(Encoding.UTF8.GetBytes("propertiesToUpdate")));
var message = await receiver.ReceiveAsync();
Assert.NotNull(message);
await receiver.AbandonAsync(message.SystemProperties.LockToken, new Dictionary<string, object>
{
{"key", "value1"}
});
message = await receiver.ReceiveAsync();
Assert.NotNull(message);
Assert.True(message.UserProperties.ContainsKey("key"));
Assert.Equal(message.UserProperties["key"], "value1");
long sequenceNumber = message.SystemProperties.SequenceNumber;
await receiver.DeferAsync(message.SystemProperties.LockToken, new Dictionary<string, object>
{
{"key", "value2"}
});
message = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
Assert.NotNull(message);
Assert.True(message.UserProperties.ContainsKey("key"));
Assert.Equal(message.UserProperties["key"], "value2");
await receiver.CompleteAsync(message.SystemProperties.LockToken);
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
}
}
}
}