From f7ca8af5b9104658b85df92e20d23bdd3f3daf5b Mon Sep 17 00:00:00 2001 From: Vinay Suryanarayana Date: Fri, 28 Oct 2016 10:48:36 -0700 Subject: [PATCH] Brokered Message Methods to Defer/Complete/Abandon --- .../Amqp/AmqpMessageConverter.cs | 2 - .../Amqp/AmqpMessageReceiver.cs | 17 +++-- .../BrokeredMessage.cs | 65 +++++++++++++++++-- .../MessageReceiver.cs | 34 +++++++++- .../MessageSender.cs | 15 ++--- .../Microsoft.Azure.Messaging/QueueClient.cs | 20 +++++- .../QueueClientTests.cs | 62 ++++++++++++++++-- 7 files changed, 182 insertions(+), 33 deletions(-) diff --git a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/Amqp/AmqpMessageConverter.cs b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/Amqp/AmqpMessageConverter.cs index b5653b2..fa15f1a 100644 --- a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/Amqp/AmqpMessageConverter.cs +++ b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/Amqp/AmqpMessageConverter.cs @@ -38,7 +38,6 @@ namespace Microsoft.Azure.Messaging.Amqp if (brokeredMessages.Count() == 1) { BrokeredMessage singleBrokeredMessage = brokeredMessages.Single(); - //TODO: ProcessFaultInjectionInfo(singleBrokeredMessage); amqpMessage = AmqpMessageConverter.ClientGetMessage(singleBrokeredMessage); } else @@ -46,7 +45,6 @@ namespace Microsoft.Azure.Messaging.Amqp var dataList = new List(); foreach (BrokeredMessage brokeredMessage in brokeredMessages) { - //TODO: ProcessFaultInjectionInfo(brokeredMessage); AmqpMessage amqpMessageItem = AmqpMessageConverter.ClientGetMessage(brokeredMessage); ArraySegment[] payload = amqpMessageItem.GetPayload(); BufferListStream buffer = new BufferListStream(payload); diff --git a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/Amqp/AmqpMessageReceiver.cs b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/Amqp/AmqpMessageReceiver.cs index e4f15fb..5a33719 100644 --- a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/Amqp/AmqpMessageReceiver.cs +++ b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/Amqp/AmqpMessageReceiver.cs @@ -12,15 +12,15 @@ namespace Microsoft.Azure.Messaging.Amqp sealed class AmqpMessageReceiver : MessageReceiver { - const int DefaultPrefetchCount = 300; + public static readonly TimeSpan DefaultBatchFlushInterval = TimeSpan.FromMilliseconds(20); public AmqpMessageReceiver(QueueClient queueClient) - : base() + : base(queueClient.Mode) { this.QueueClient = queueClient; - this.Path = this.QueueClient.QueueName; + this.Path = queueClient.QueueName; this.ReceiveLinkManager = new FaultTolerantAmqpObject(this.CreateLinkAsync, this.CloseSession); - this.PrefetchCount = DefaultPrefetchCount; + this.PrefetchCount = queueClient.PrefetchCount; } /// @@ -48,7 +48,7 @@ namespace Microsoft.Azure.Messaging.Amqp ReceivingAmqpLink receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()); IEnumerable amqpMessages = null; bool hasMessages = await Task.Factory.FromAsync( - (c, s) => receiveLink.BeginReceiveMessages(maxMessageCount, timeoutHelper.RemainingTime(), c, s), + (c, s) => receiveLink.BeginReceiveRemoteMessages(maxMessageCount, AmqpMessageReceiver.DefaultBatchFlushInterval, timeoutHelper.RemainingTime(), c, s), (a) => receiveLink.EndReceiveMessages(a, out amqpMessages), this); @@ -69,9 +69,12 @@ namespace Microsoft.Azure.Messaging.Amqp if (this.QueueClient.Mode == ReceiveMode.ReceiveAndDelete) { - receiveLink.DisposeDelivery(amqpMessage, true, AmqpConstants.AcceptedOutcome); + receiveLink.DisposeDelivery(amqpMessage, true, AmqpConstants.AcceptedOutcome); } - brokeredMessages.Add(AmqpMessageConverter.ClientGetMessage(amqpMessage)); + + BrokeredMessage brokeredMessage = AmqpMessageConverter.ClientGetMessage(amqpMessage); + brokeredMessage.Receiver = this; // Associate the Message with this Receiver. + brokeredMessages.Add(brokeredMessage); } return brokeredMessages; diff --git a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/BrokeredMessage.cs b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/BrokeredMessage.cs index 7a3f038..809a94a 100644 --- a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/BrokeredMessage.cs +++ b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/BrokeredMessage.cs @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Messaging using System.IO; using System.Runtime.Serialization; using System.Threading; + using System.Threading.Tasks; using System.Xml.Serialization; using Microsoft.Azure.Messaging.Primitives; @@ -992,13 +993,7 @@ namespace Microsoft.Azure.Messaging } } - /// Creates the empty message. - /// . - /// TODO: - //internal static BrokeredMessage CreateEmptyMessage() - //{ - // return new BrokeredMessage((object)null); - //} + internal MessageReceiver Receiver { get; set; } /// Deserializes the brokered message body into an object of the specified type by using the /// with a binary @@ -1075,6 +1070,47 @@ namespace Microsoft.Azure.Messaging return (T)serializer.ReadObject(this.BodyStream); } + // Summary: + // Asynchronously Abandons the lock on a peek-locked message. + public Task AbandonAsync() + { + this.ThrowIfDisposed(); + this.ThrowIfNotLocked(); + + return this.Receiver.AbandonAsync(new Guid[] { this.LockToken }); + } + + // Summary: + // Asynchronously completes the receive operation of a message and indicates that + // the message should be marked as processed and deleted. + public Task CompleteAsync() + { + this.ThrowIfDisposed(); + this.ThrowIfNotLocked(); + + return this.Receiver.CompleteAsync(new Guid[] {this.LockToken}); + } + + // Summary: + // Asynchronously moves the message to the dead letter queue. + public Task DeadLetterAsync() + { + this.ThrowIfDisposed(); + this.ThrowIfNotLocked(); + + return this.Receiver.DeadLetterAsync(new Guid[] { this.LockToken }); + } + + // Summary: + // Asynchronously indicates that the receiver wants to defer the processing for this message. + public Task DeferAsync() + { + this.ThrowIfDisposed(); + this.ThrowIfNotLocked(); + + return this.Receiver.DeferAsync(new Guid[] { this.LockToken }); + } + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. public void Dispose() { @@ -1394,6 +1430,21 @@ namespace Microsoft.Azure.Messaging } } + /// Throw if not locked. + /// Thrown when as error. + void ThrowIfNotLocked() + { + if (this.Receiver == null) + { + throw Fx.Exception.AsError(new InvalidOperationException("The operation cannot be completed because the receiver is null.")); + } + + if (this.Receiver.ReceiveMode == ReceiveMode.ReceiveAndDelete) + { + throw Fx.Exception.AsError(new InvalidOperationException("The operation is only supported in 'PeekLock' receive mode.")); + } + } + /// Throw if not received. /// Thrown when as error. void ThrowIfNotReceived() diff --git a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/MessageReceiver.cs b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/MessageReceiver.cs index 211f0ce..534d408 100644 --- a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/MessageReceiver.cs +++ b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/MessageReceiver.cs @@ -6,14 +6,18 @@ namespace Microsoft.Azure.Messaging using System; using System.Collections.Generic; using System.Threading.Tasks; + using System.Linq; abstract class MessageReceiver : ClientEntity { - protected MessageReceiver() + protected MessageReceiver(ReceiveMode receiveMode) : base(nameof(MessageReceiver) + StringUtility.GetRandomString()) { + this.ReceiveMode = receiveMode; } + public ReceiveMode ReceiveMode { get; protected set; } + public Task> ReceiveAsync(int maxMessageCount) { return this.OnReceiveAsync(maxMessageCount); @@ -21,21 +25,33 @@ namespace Microsoft.Azure.Messaging public Task CompleteAsync(IEnumerable lockTokens) { + this.ThrowIfNotPeekLockMode(); + MessageReceiver.ValidateLockTokens(lockTokens); + return this.OnCompleteAsync(lockTokens); } public Task AbandonAsync(IEnumerable lockTokens) { + this.ThrowIfNotPeekLockMode(); + MessageReceiver.ValidateLockTokens(lockTokens); + return this.OnAbandonAsync(lockTokens); } public Task DeferAsync(IEnumerable lockTokens) { + this.ThrowIfNotPeekLockMode(); + MessageReceiver.ValidateLockTokens(lockTokens); + return this.OnDeferAsync(lockTokens); } public Task DeadLetterAsync(IEnumerable lockTokens) { + this.ThrowIfNotPeekLockMode(); + MessageReceiver.ValidateLockTokens(lockTokens); + return this.OnDeadLetterAsync(lockTokens); } @@ -48,5 +64,21 @@ namespace Microsoft.Azure.Messaging protected abstract Task OnDeferAsync(IEnumerable lockTokens); protected abstract Task OnDeadLetterAsync(IEnumerable lockTokens); + + void ThrowIfNotPeekLockMode() + { + if (this.ReceiveMode != ReceiveMode.PeekLock) + { + throw Fx.Exception.AsError(new InvalidOperationException("The operation is only supported in 'PeekLock' receive mode.")); + } + } + + static void ValidateLockTokens(IEnumerable lockTokens) + { + if (lockTokens == null || !lockTokens.Any()) + { + throw Fx.Exception.ArgumentNull("lockTokens"); + } + } } } diff --git a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/MessageSender.cs b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/MessageSender.cs index 53af915..e42c85f 100644 --- a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/MessageSender.cs +++ b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/MessageSender.cs @@ -22,22 +22,17 @@ namespace Microsoft.Azure.Messaging protected abstract Task OnSendAsync(IEnumerable brokeredMessages); - internal static int ValidateMessages(IEnumerable brokeredMessages) + static void ValidateMessages(IEnumerable brokeredMessages) { - int count; - if (brokeredMessages == null || (count = brokeredMessages.Count()) == 0) + if (brokeredMessages == null || !brokeredMessages.Any()) { - throw Fx.Exception.Argument(nameof(brokeredMessages), Resources.BrokeredMessageListIsNullOrEmpty); + throw Fx.Exception.ArgumentNull("brokeredMessages"); } - foreach (BrokeredMessage brokeredMessage in brokeredMessages) + if (brokeredMessages.Any(brokeredMessage => brokeredMessage.IsLockTokenSet)) { - if (brokeredMessage.IsLockTokenSet) - { - throw Fx.Exception.Argument(nameof(brokeredMessages), "Cannot Send ReceivedMessages"); - } + throw Fx.Exception.Argument(nameof(brokeredMessages), "Cannot Send ReceivedMessages"); } - return count; } } } diff --git a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/QueueClient.cs b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/QueueClient.cs index e020cda..f8f633e 100644 --- a/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/QueueClient.cs +++ b/Microsoft.Azure.Messaging/src/Microsoft.Azure.Messaging/QueueClient.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Linq; + namespace Microsoft.Azure.Messaging { using System; @@ -16,12 +18,13 @@ namespace Microsoft.Azure.Messaging MessageSender innerSender; MessageReceiver innerReceiver; - internal QueueClient(ServiceBusConnectionSettings connectionSettings, ReceiveMode mode) + + internal QueueClient(ServiceBusConnectionSettings connectionSettings, ReceiveMode receiveMode) : base($"{nameof(QueueClient)}{ClientEntity.GetNextId()}({connectionSettings.EntityPath})") { this.ConnectionSettings = connectionSettings; this.QueueName = connectionSettings.EntityPath; - this.Mode = mode; + this.Mode = receiveMode; } public string QueueName { get; } @@ -30,6 +33,8 @@ namespace Microsoft.Azure.Messaging public ReceiveMode Mode { get; private set; } + public int PrefetchCount { get; set; } + protected object ThisLock { get; } = new object(); MessageSender InnerSender @@ -158,6 +163,17 @@ namespace Microsoft.Azure.Messaging } } + public async Task ReceiveAsync() + { + IList messages = await this.ReceiveAsync(1); + if (messages != null && messages.Count > 0) + { + return messages[0]; + } + + return null; + } + public async Task> ReceiveAsync(int maxMessageCount) { try diff --git a/Microsoft.Azure.Messaging/test/Microsoft.Azure.Messaging.UnitTests/QueueClientTests.cs b/Microsoft.Azure.Messaging/test/Microsoft.Azure.Messaging.UnitTests/QueueClientTests.cs index 9aa7038..0953756 100644 --- a/Microsoft.Azure.Messaging/test/Microsoft.Azure.Messaging.UnitTests/QueueClientTests.cs +++ b/Microsoft.Azure.Messaging/test/Microsoft.Azure.Messaging.UnitTests/QueueClientTests.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Dynamic; + namespace Microsoft.Azure.Messaging.UnitTests { using System; @@ -9,7 +11,6 @@ namespace Microsoft.Azure.Messaging.UnitTests using System.Threading.Tasks; using System.Linq; using Xunit; - using Microsoft.Azure.Messaging; public class QueueClientTests { @@ -18,8 +19,11 @@ namespace Microsoft.Azure.Messaging.UnitTests { ConnectionString = Environment.GetEnvironmentVariable("QUEUECLIENTCONNECTIONSTRING"); - ConnectionString = - "Endpoint=sb://testvinsustandard924.servicebus.windows.net/;EntityPath=testq;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=+nCcyesi2Vdw5eAQeJvR85XMwpj46o2gvxmdizbqXoY="; + //ConnectionString = + // "Endpoint=sb://testvinsustandard924.servicebus.windows.net/;EntityPath=testq;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=+nCcyesi2Vdw5eAQeJvR85XMwpj46o2gvxmdizbqXoY="; + + ConnectionString = + "Endpoint=sb://newvinsu1028.servicebus.windows.net/;EntityPath=testq11028;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=7oMVG2as0pelFCFujgSb2JExro7/tZ6oIGcECpljubc="; if (string.IsNullOrWhiteSpace(ConnectionString)) { throw new InvalidOperationException("QUEUECLIENTCONNECTIONSTRING environment variable was not found!"); @@ -28,6 +32,49 @@ namespace Microsoft.Azure.Messaging.UnitTests String ConnectionString { get; } + [Fact] + async Task BrokeredMessageOperationsTest() + { + //Create QueueClient with ReceiveDelete, + //Send and Receive a message, Try to Complete/Abandon/Defer/DeadLetter should throw InvalidOperationException() + QueueClient queueClient = QueueClient.Create(this.ConnectionString, ReceiveMode.ReceiveAndDelete); + await this.SendMessagesAsync(queueClient, 1); + BrokeredMessage message = await queueClient.ReceiveAsync(); + Assert.NotNull((object)message); + + await Assert.ThrowsAsync(async () => await message.CompleteAsync()); + await Assert.ThrowsAsync(async () => await message.AbandonAsync()); + await Assert.ThrowsAsync(async () => await message.DeferAsync()); + await Assert.ThrowsAsync(async () => await message.DeadLetterAsync()); + + //Create a PeekLock queueClient and do rest of the operations + //Send a Message, Receive/ Abandon and Complete it using BrokeredMessage methods + queueClient = QueueClient.Create(this.ConnectionString); + await this.SendMessagesAsync(queueClient, 1); + message = await queueClient.ReceiveAsync(); + Assert.NotNull((object)message); + await message.AbandonAsync(); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + message = await queueClient.ReceiveAsync(); + await message.CompleteAsync(); + + //Send a Message, Receive/DeadLetter using BrokeredMessage methods + await this.SendMessagesAsync(queueClient, 1); + message = await queueClient.ReceiveAsync(); + await message.DeadLetterAsync(); + string entityPath = EntityNameHelper.FormatDeadLetterPath(queueClient.QueueName); + QueueClient deadLetterQueueClient = QueueClient.Create(this.ConnectionString, entityPath); + message = await deadLetterQueueClient.ReceiveAsync(); + await message.CompleteAsync(); + + //Send a Message, Receive/Defer using BrokeredMessage methods + await this.SendMessagesAsync(queueClient, 1); + message = await queueClient.ReceiveAsync(); + await message.DeferAsync(); + + //TODO: Once ReceivebySequence is implemented, Receive and Complete this message + } + [Fact] async Task QueueClientBasicPeekLockTest() { @@ -162,13 +209,20 @@ namespace Microsoft.Azure.Messaging.UnitTests async Task SendMessagesAsync(QueueClient queueClient, int messageCount) { + if (messageCount == 0) + { + await Task.FromResult(false); + } + + List messagesToSend = new List(); for (int i = 0; i < messageCount; i++) { BrokeredMessage message = new BrokeredMessage("test" + i); message.Label = "test" + i; - await queueClient.SendAsync(message); + messagesToSend.Add(message); } + await queueClient.SendAsync(messagesToSend); WriteLine(string.Format("Sent {0} messages", messageCount)); }