Brokered Message Methods to Defer/Complete/Abandon

This commit is contained in:
Vinay Suryanarayana 2016-10-28 10:48:36 -07:00
Родитель c8b382bdc3
Коммит f7ca8af5b9
7 изменённых файлов: 182 добавлений и 33 удалений

Просмотреть файл

@ -38,7 +38,6 @@ namespace Microsoft.Azure.Messaging.Amqp
if (brokeredMessages.Count() == 1)
{
BrokeredMessage singleBrokeredMessage = brokeredMessages.Single();
//TODO: ProcessFaultInjectionInfo(singleBrokeredMessage);
amqpMessage = AmqpMessageConverter.ClientGetMessage(singleBrokeredMessage);
}
else
@ -46,7 +45,6 @@ namespace Microsoft.Azure.Messaging.Amqp
var dataList = new List<Data>();
foreach (BrokeredMessage brokeredMessage in brokeredMessages)
{
//TODO: ProcessFaultInjectionInfo(brokeredMessage);
AmqpMessage amqpMessageItem = AmqpMessageConverter.ClientGetMessage(brokeredMessage);
ArraySegment<byte>[] payload = amqpMessageItem.GetPayload();
BufferListStream buffer = new BufferListStream(payload);

Просмотреть файл

@ -12,15 +12,15 @@ namespace Microsoft.Azure.Messaging.Amqp
sealed class AmqpMessageReceiver : MessageReceiver
{
const int DefaultPrefetchCount = 300;
public static readonly TimeSpan DefaultBatchFlushInterval = TimeSpan.FromMilliseconds(20);
public AmqpMessageReceiver(QueueClient queueClient)
: base()
: base(queueClient.Mode)
{
this.QueueClient = queueClient;
this.Path = this.QueueClient.QueueName;
this.Path = queueClient.QueueName;
this.ReceiveLinkManager = new FaultTolerantAmqpObject<ReceivingAmqpLink>(this.CreateLinkAsync, this.CloseSession);
this.PrefetchCount = DefaultPrefetchCount;
this.PrefetchCount = queueClient.PrefetchCount;
}
/// <summary>
@ -48,7 +48,7 @@ namespace Microsoft.Azure.Messaging.Amqp
ReceivingAmqpLink receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime());
IEnumerable<AmqpMessage> amqpMessages = null;
bool hasMessages = await Task.Factory.FromAsync(
(c, s) => receiveLink.BeginReceiveMessages(maxMessageCount, timeoutHelper.RemainingTime(), c, s),
(c, s) => receiveLink.BeginReceiveRemoteMessages(maxMessageCount, AmqpMessageReceiver.DefaultBatchFlushInterval, timeoutHelper.RemainingTime(), c, s),
(a) => receiveLink.EndReceiveMessages(a, out amqpMessages),
this);
@ -69,9 +69,12 @@ namespace Microsoft.Azure.Messaging.Amqp
if (this.QueueClient.Mode == ReceiveMode.ReceiveAndDelete)
{
receiveLink.DisposeDelivery(amqpMessage, true, AmqpConstants.AcceptedOutcome);
receiveLink.DisposeDelivery(amqpMessage, true, AmqpConstants.AcceptedOutcome);
}
brokeredMessages.Add(AmqpMessageConverter.ClientGetMessage(amqpMessage));
BrokeredMessage brokeredMessage = AmqpMessageConverter.ClientGetMessage(amqpMessage);
brokeredMessage.Receiver = this; // Associate the Message with this Receiver.
brokeredMessages.Add(brokeredMessage);
}
return brokeredMessages;

Просмотреть файл

@ -9,6 +9,7 @@ namespace Microsoft.Azure.Messaging
using System.IO;
using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;
using Microsoft.Azure.Messaging.Primitives;
@ -992,13 +993,7 @@ namespace Microsoft.Azure.Messaging
}
}
/// <summary> Creates the empty message. </summary>
/// <returns> . </returns>
/// TODO:
//internal static BrokeredMessage CreateEmptyMessage()
//{
// return new BrokeredMessage((object)null);
//}
internal MessageReceiver Receiver { get; set; }
/// <summary>Deserializes the brokered message body into an object of the specified type by using the
/// <see cref="System.Runtime.Serialization.DataContractSerializer" /> with a binary
@ -1075,6 +1070,47 @@ namespace Microsoft.Azure.Messaging
return (T)serializer.ReadObject(this.BodyStream);
}
// Summary:
// Asynchronously Abandons the lock on a peek-locked message.
public Task AbandonAsync()
{
this.ThrowIfDisposed();
this.ThrowIfNotLocked();
return this.Receiver.AbandonAsync(new Guid[] { this.LockToken });
}
// Summary:
// Asynchronously completes the receive operation of a message and indicates that
// the message should be marked as processed and deleted.
public Task CompleteAsync()
{
this.ThrowIfDisposed();
this.ThrowIfNotLocked();
return this.Receiver.CompleteAsync(new Guid[] {this.LockToken});
}
// Summary:
// Asynchronously moves the message to the dead letter queue.
public Task DeadLetterAsync()
{
this.ThrowIfDisposed();
this.ThrowIfNotLocked();
return this.Receiver.DeadLetterAsync(new Guid[] { this.LockToken });
}
// Summary:
// Asynchronously indicates that the receiver wants to defer the processing for this message.
public Task DeferAsync()
{
this.ThrowIfDisposed();
this.ThrowIfNotLocked();
return this.Receiver.DeferAsync(new Guid[] { this.LockToken });
}
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
@ -1394,6 +1430,21 @@ namespace Microsoft.Azure.Messaging
}
}
/// <summary> Throw if not locked. </summary>
/// <exception cref="FxTrace.Exception"> Thrown when as error. </exception>
void ThrowIfNotLocked()
{
if (this.Receiver == null)
{
throw Fx.Exception.AsError(new InvalidOperationException("The operation cannot be completed because the receiver is null."));
}
if (this.Receiver.ReceiveMode == ReceiveMode.ReceiveAndDelete)
{
throw Fx.Exception.AsError(new InvalidOperationException("The operation is only supported in 'PeekLock' receive mode."));
}
}
/// <summary> Throw if not received. </summary>
/// <exception cref="Fx.Exception"> Thrown when as error. </exception>
void ThrowIfNotReceived()

Просмотреть файл

@ -6,14 +6,18 @@ namespace Microsoft.Azure.Messaging
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
abstract class MessageReceiver : ClientEntity
{
protected MessageReceiver()
protected MessageReceiver(ReceiveMode receiveMode)
: base(nameof(MessageReceiver) + StringUtility.GetRandomString())
{
this.ReceiveMode = receiveMode;
}
public ReceiveMode ReceiveMode { get; protected set; }
public Task<IList<BrokeredMessage>> ReceiveAsync(int maxMessageCount)
{
return this.OnReceiveAsync(maxMessageCount);
@ -21,21 +25,33 @@ namespace Microsoft.Azure.Messaging
public Task CompleteAsync(IEnumerable<Guid> lockTokens)
{
this.ThrowIfNotPeekLockMode();
MessageReceiver.ValidateLockTokens(lockTokens);
return this.OnCompleteAsync(lockTokens);
}
public Task AbandonAsync(IEnumerable<Guid> lockTokens)
{
this.ThrowIfNotPeekLockMode();
MessageReceiver.ValidateLockTokens(lockTokens);
return this.OnAbandonAsync(lockTokens);
}
public Task DeferAsync(IEnumerable<Guid> lockTokens)
{
this.ThrowIfNotPeekLockMode();
MessageReceiver.ValidateLockTokens(lockTokens);
return this.OnDeferAsync(lockTokens);
}
public Task DeadLetterAsync(IEnumerable<Guid> lockTokens)
{
this.ThrowIfNotPeekLockMode();
MessageReceiver.ValidateLockTokens(lockTokens);
return this.OnDeadLetterAsync(lockTokens);
}
@ -48,5 +64,21 @@ namespace Microsoft.Azure.Messaging
protected abstract Task OnDeferAsync(IEnumerable<Guid> lockTokens);
protected abstract Task OnDeadLetterAsync(IEnumerable<Guid> lockTokens);
void ThrowIfNotPeekLockMode()
{
if (this.ReceiveMode != ReceiveMode.PeekLock)
{
throw Fx.Exception.AsError(new InvalidOperationException("The operation is only supported in 'PeekLock' receive mode."));
}
}
static void ValidateLockTokens(IEnumerable<Guid> lockTokens)
{
if (lockTokens == null || !lockTokens.Any())
{
throw Fx.Exception.ArgumentNull("lockTokens");
}
}
}
}

Просмотреть файл

@ -22,22 +22,17 @@ namespace Microsoft.Azure.Messaging
protected abstract Task OnSendAsync(IEnumerable<BrokeredMessage> brokeredMessages);
internal static int ValidateMessages(IEnumerable<BrokeredMessage> brokeredMessages)
static void ValidateMessages(IEnumerable<BrokeredMessage> brokeredMessages)
{
int count;
if (brokeredMessages == null || (count = brokeredMessages.Count()) == 0)
if (brokeredMessages == null || !brokeredMessages.Any())
{
throw Fx.Exception.Argument(nameof(brokeredMessages), Resources.BrokeredMessageListIsNullOrEmpty);
throw Fx.Exception.ArgumentNull("brokeredMessages");
}
foreach (BrokeredMessage brokeredMessage in brokeredMessages)
if (brokeredMessages.Any(brokeredMessage => brokeredMessage.IsLockTokenSet))
{
if (brokeredMessage.IsLockTokenSet)
{
throw Fx.Exception.Argument(nameof(brokeredMessages), "Cannot Send ReceivedMessages");
}
throw Fx.Exception.Argument(nameof(brokeredMessages), "Cannot Send ReceivedMessages");
}
return count;
}
}
}

Просмотреть файл

@ -1,6 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Linq;
namespace Microsoft.Azure.Messaging
{
using System;
@ -16,12 +18,13 @@ namespace Microsoft.Azure.Messaging
MessageSender innerSender;
MessageReceiver innerReceiver;
internal QueueClient(ServiceBusConnectionSettings connectionSettings, ReceiveMode mode)
internal QueueClient(ServiceBusConnectionSettings connectionSettings, ReceiveMode receiveMode)
: base($"{nameof(QueueClient)}{ClientEntity.GetNextId()}({connectionSettings.EntityPath})")
{
this.ConnectionSettings = connectionSettings;
this.QueueName = connectionSettings.EntityPath;
this.Mode = mode;
this.Mode = receiveMode;
}
public string QueueName { get; }
@ -30,6 +33,8 @@ namespace Microsoft.Azure.Messaging
public ReceiveMode Mode { get; private set; }
public int PrefetchCount { get; set; }
protected object ThisLock { get; } = new object();
MessageSender InnerSender
@ -158,6 +163,17 @@ namespace Microsoft.Azure.Messaging
}
}
public async Task<BrokeredMessage> ReceiveAsync()
{
IList<BrokeredMessage> messages = await this.ReceiveAsync(1);
if (messages != null && messages.Count > 0)
{
return messages[0];
}
return null;
}
public async Task<IList<BrokeredMessage>> ReceiveAsync(int maxMessageCount)
{
try

Просмотреть файл

@ -1,6 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Dynamic;
namespace Microsoft.Azure.Messaging.UnitTests
{
using System;
@ -9,7 +11,6 @@ namespace Microsoft.Azure.Messaging.UnitTests
using System.Threading.Tasks;
using System.Linq;
using Xunit;
using Microsoft.Azure.Messaging;
public class QueueClientTests
{
@ -18,8 +19,11 @@ namespace Microsoft.Azure.Messaging.UnitTests
{
ConnectionString = Environment.GetEnvironmentVariable("QUEUECLIENTCONNECTIONSTRING");
ConnectionString =
"Endpoint=sb://testvinsustandard924.servicebus.windows.net/;EntityPath=testq;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=+nCcyesi2Vdw5eAQeJvR85XMwpj46o2gvxmdizbqXoY=";
//ConnectionString =
// "Endpoint=sb://testvinsustandard924.servicebus.windows.net/;EntityPath=testq;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=+nCcyesi2Vdw5eAQeJvR85XMwpj46o2gvxmdizbqXoY=";
ConnectionString =
"Endpoint=sb://newvinsu1028.servicebus.windows.net/;EntityPath=testq11028;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=7oMVG2as0pelFCFujgSb2JExro7/tZ6oIGcECpljubc=";
if (string.IsNullOrWhiteSpace(ConnectionString))
{
throw new InvalidOperationException("QUEUECLIENTCONNECTIONSTRING environment variable was not found!");
@ -28,6 +32,49 @@ namespace Microsoft.Azure.Messaging.UnitTests
String ConnectionString { get; }
[Fact]
async Task BrokeredMessageOperationsTest()
{
//Create QueueClient with ReceiveDelete,
//Send and Receive a message, Try to Complete/Abandon/Defer/DeadLetter should throw InvalidOperationException()
QueueClient queueClient = QueueClient.Create(this.ConnectionString, ReceiveMode.ReceiveAndDelete);
await this.SendMessagesAsync(queueClient, 1);
BrokeredMessage message = await queueClient.ReceiveAsync();
Assert.NotNull((object)message);
await Assert.ThrowsAsync<InvalidOperationException>(async () => await message.CompleteAsync());
await Assert.ThrowsAsync<InvalidOperationException>(async () => await message.AbandonAsync());
await Assert.ThrowsAsync<InvalidOperationException>(async () => await message.DeferAsync());
await Assert.ThrowsAsync<InvalidOperationException>(async () => await message.DeadLetterAsync());
//Create a PeekLock queueClient and do rest of the operations
//Send a Message, Receive/ Abandon and Complete it using BrokeredMessage methods
queueClient = QueueClient.Create(this.ConnectionString);
await this.SendMessagesAsync(queueClient, 1);
message = await queueClient.ReceiveAsync();
Assert.NotNull((object)message);
await message.AbandonAsync();
await Task.Delay(TimeSpan.FromMilliseconds(100));
message = await queueClient.ReceiveAsync();
await message.CompleteAsync();
//Send a Message, Receive/DeadLetter using BrokeredMessage methods
await this.SendMessagesAsync(queueClient, 1);
message = await queueClient.ReceiveAsync();
await message.DeadLetterAsync();
string entityPath = EntityNameHelper.FormatDeadLetterPath(queueClient.QueueName);
QueueClient deadLetterQueueClient = QueueClient.Create(this.ConnectionString, entityPath);
message = await deadLetterQueueClient.ReceiveAsync();
await message.CompleteAsync();
//Send a Message, Receive/Defer using BrokeredMessage methods
await this.SendMessagesAsync(queueClient, 1);
message = await queueClient.ReceiveAsync();
await message.DeferAsync();
//TODO: Once ReceivebySequence is implemented, Receive and Complete this message
}
[Fact]
async Task QueueClientBasicPeekLockTest()
{
@ -162,13 +209,20 @@ namespace Microsoft.Azure.Messaging.UnitTests
async Task SendMessagesAsync(QueueClient queueClient, int messageCount)
{
if (messageCount == 0)
{
await Task.FromResult(false);
}
List<BrokeredMessage> messagesToSend = new List<BrokeredMessage>();
for (int i = 0; i < messageCount; i++)
{
BrokeredMessage message = new BrokeredMessage("test" + i);
message.Label = "test" + i;
await queueClient.SendAsync(message);
messagesToSend.Add(message);
}
await queueClient.SendAsync(messagesToSend);
WriteLine(string.Format("Sent {0} messages", messageCount));
}