Extending the plugins to messagesessions (#212)

Ensuring the registered plugins gets passed on to the sessions that were accepted.
This commit is contained in:
Neeraj Makam 2017-07-11 20:08:23 -07:00 коммит произвёл vinaysurya
Родитель c64800bcc1
Коммит 1479fa3c69
8 изменённых файлов: 129 добавлений и 14 удалений

Просмотреть файл

@ -35,7 +35,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// </remarks>
/// <seealso cref="IMessageSession"/>
/// <seealso cref="SessionClient"/>
public interface ISessionClient
public interface ISessionClient : IClientEntity
{
/// <summary>
/// Gets the path of the entity. This is either the name of the queue, or the full path of the subscription.
@ -45,6 +45,8 @@ namespace Microsoft.Azure.ServiceBus.Core
/// <summary>
/// Gets a session object of any <see cref="IMessageSession.SessionId"/> that can be used to receive messages for that sessionId.
/// </summary>
/// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSession"/> that is accepted.
/// Individual sessions can further register additional plugins.</remarks>
/// <returns>A session object.</returns>
Task<IMessageSession> AcceptMessageSessionAsync();
@ -52,6 +54,8 @@ namespace Microsoft.Azure.ServiceBus.Core
/// Gets a session object of any <see cref="IMessageSession.SessionId"/> that can be used to receive messages for that sessionId.
/// </summary>
/// <param name="serverWaitTime">Amount of time for which the call should wait for to fetch the next session.</param>
/// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSession"/> that is accepted.
/// Individual sessions can further register additional plugins.</remarks>
/// <returns>A session object.</returns>
Task<IMessageSession> AcceptMessageSessionAsync(TimeSpan serverWaitTime);
@ -59,6 +63,8 @@ namespace Microsoft.Azure.ServiceBus.Core
/// Gets a particular session object identified by <paramref name="sessionId"/> that can be used to receive messages for that sessionId.
/// </summary>
/// <param name="sessionId">The sessionId present in all its messages.</param>
/// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSession"/> that is accepted.
/// Individual sessions can further register additional plugins.</remarks>
/// <returns>A session object.</returns>
Task<IMessageSession> AcceptMessageSessionAsync(string sessionId);
@ -67,6 +73,8 @@ namespace Microsoft.Azure.ServiceBus.Core
/// </summary>
/// <param name="sessionId">The sessionId present in all its messages.</param>
/// <param name="serverWaitTime">Amount of time for which the call should wait for to fetch the next session.</param>
/// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSession"/> that is accepted.
/// Individual sessions can further register additional plugins.</remarks>
/// <returns>A session object.</returns>
Task<IMessageSession> AcceptMessageSessionAsync(string sessionId, TimeSpan serverWaitTime);
}

Просмотреть файл

@ -103,6 +103,7 @@ namespace Microsoft.Azure.ServiceBus
MessagingEventSource.Log.QueueClientCreateStart(serviceBusConnection?.Endpoint.Authority, entityPath, receiveMode.ToString());
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.OperationTimeout = this.ServiceBusConnection.OperationTimeout;
this.syncLock = new object();
this.QueueName = entityPath;
this.ReceiveMode = receiveMode;
@ -236,7 +237,10 @@ namespace Microsoft.Azure.ServiceBus
this.PrefetchCount,
this.ServiceBusConnection,
this.CbsTokenProvider,
this.RetryPolicy);
this.RetryPolicy,
this.RegisteredPlugins);
}
}
}

Просмотреть файл

@ -5,6 +5,7 @@ namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Amqp;
using Azure.Amqp;
@ -86,7 +87,8 @@ namespace Microsoft.Azure.ServiceBus
prefetchCount,
new ServiceBusNamespaceConnection(connectionString),
null,
retryPolicy)
retryPolicy,
null)
{
if (string.IsNullOrWhiteSpace(connectionString))
{
@ -110,15 +112,26 @@ namespace Microsoft.Azure.ServiceBus
int prefetchCount,
ServiceBusConnection serviceBusConnection,
ICbsTokenProvider cbsTokenProvider,
RetryPolicy retryPolicy)
RetryPolicy retryPolicy,
IList<ServiceBusPlugin> registeredPlugins)
: base(clientId, retryPolicy ?? RetryPolicy.Default)
{
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.OperationTimeout = this.ServiceBusConnection.OperationTimeout;
this.EntityPath = entityPath;
this.EntityType = entityType;
this.ReceiveMode = receiveMode;
this.PrefetchCount = prefetchCount;
this.CbsTokenProvider = cbsTokenProvider;
// Register plugins on the message session.
if (registeredPlugins != null)
{
foreach (var serviceBusPlugin in registeredPlugins)
{
this.RegisterPlugin(serviceBusPlugin);
}
}
}
ReceiveMode ReceiveMode { get; }
@ -139,7 +152,7 @@ namespace Microsoft.Azure.ServiceBus
/// <summary>
/// Gets a list of currently registered plugins.
/// </summary>
public override IList<ServiceBusPlugin> RegisteredPlugins => throw new NotImplementedException();
public override IList<ServiceBusPlugin> RegisteredPlugins { get; } = new List<ServiceBusPlugin>();
/// <summary></summary>
/// <returns>The asynchronous operation.</returns>
@ -154,6 +167,8 @@ namespace Microsoft.Azure.ServiceBus
/// <summary>
/// Gets a session object of any <see cref="IMessageSession.SessionId"/> that can be used to receive messages for that sessionId.
/// </summary>
/// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSession"/> that is accepted.
/// Individual sessions can further register additional plugins.</remarks>
/// <returns>A session object.</returns>
public Task<IMessageSession> AcceptMessageSessionAsync()
{
@ -164,6 +179,8 @@ namespace Microsoft.Azure.ServiceBus
/// Gets a session object of any <see cref="IMessageSession.SessionId"/> that can be used to receive messages for that sessionId.
/// </summary>
/// <param name="serverWaitTime">Amount of time for which the call should wait to fetch the next session.</param>
/// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSession"/> that is accepted.
/// Individual sessions can further register additional plugins.</remarks>
/// <returns>A session object.</returns>
public Task<IMessageSession> AcceptMessageSessionAsync(TimeSpan serverWaitTime)
{
@ -174,6 +191,8 @@ namespace Microsoft.Azure.ServiceBus
/// Gets a particular session object identified by <paramref name="sessionId"/> that can be used to receive messages for that sessionId.
/// </summary>
/// <param name="sessionId">The sessionId present in all its messages.</param>
/// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSession"/> that is accepted.
/// Individual sessions can further register additional plugins.</remarks>
/// <returns>A session object.</returns>
public Task<IMessageSession> AcceptMessageSessionAsync(string sessionId)
{
@ -185,6 +204,8 @@ namespace Microsoft.Azure.ServiceBus
/// </summary>
/// <param name="sessionId">The sessionId present in all its messages.</param>
/// <param name="serverWaitTime">Amount of time for which the call should wait to fetch the next session.</param>
/// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSession"/> that is accepted.
/// Individual sessions can further register additional plugins.</remarks>
/// <returns>A session object.</returns>
public async Task<IMessageSession> AcceptMessageSessionAsync(string sessionId, TimeSpan serverWaitTime)
{
@ -232,6 +253,12 @@ namespace Microsoft.Azure.ServiceBus
session.SessionIdInternal);
session.UpdateClientId(ClientEntity.GenerateClientId(nameof(MessageSession), $"{this.EntityPath}_{session.SessionId}"));
// Register plugins on the message session.
foreach (var serviceBusPlugin in this.RegisteredPlugins)
{
session.RegisterPlugin(serviceBusPlugin);
}
return session;
}
@ -241,7 +268,15 @@ namespace Microsoft.Azure.ServiceBus
/// <param name="serviceBusPlugin">The <see cref="ServiceBusPlugin"/> to register.</param>
public override void RegisterPlugin(ServiceBusPlugin serviceBusPlugin)
{
throw new NotImplementedException();
if (serviceBusPlugin == null)
{
throw new ArgumentNullException(nameof(serviceBusPlugin), Resources.ArgumentNullOrWhiteSpace.FormatForUser(nameof(serviceBusPlugin)));
}
else if (this.RegisteredPlugins.Any(p => p.Name == serviceBusPlugin.Name))
{
throw new ArgumentException(nameof(serviceBusPlugin), Resources.PluginAlreadyRegistered.FormatForUser(nameof(serviceBusPlugin)));
}
this.RegisteredPlugins.Add(serviceBusPlugin);
}
/// <summary>
@ -250,7 +285,19 @@ namespace Microsoft.Azure.ServiceBus
/// <param name="serviceBusPluginName">The <see cref="ServiceBusPlugin.Name"/> of the plugin to be unregistered.</param>
public override void UnregisterPlugin(string serviceBusPluginName)
{
throw new NotImplementedException();
if (this.RegisteredPlugins == null)
{
return;
}
if (serviceBusPluginName == null)
{
throw new ArgumentNullException(nameof(serviceBusPluginName), Resources.ArgumentNullOrWhiteSpace.FormatForUser(nameof(serviceBusPluginName)));
}
if (this.RegisteredPlugins.Any(p => p.Name == serviceBusPluginName))
{
var plugin = this.RegisteredPlugins.First(p => p.Name == serviceBusPluginName);
this.RegisteredPlugins.Remove(plugin);
}
}
}
}

Просмотреть файл

@ -104,6 +104,7 @@ namespace Microsoft.Azure.ServiceBus
MessagingEventSource.Log.SubscriptionClientCreateStart(serviceBusConnection?.Endpoint.Authority, topicPath, subscriptionName, receiveMode.ToString());
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.OperationTimeout = this.ServiceBusConnection.OperationTimeout;
this.syncLock = new object();
this.TopicPath = topicPath;
this.SubscriptionName = subscriptionName;
@ -212,7 +213,8 @@ namespace Microsoft.Azure.ServiceBus
this.PrefetchCount,
this.ServiceBusConnection,
this.CbsTokenProvider,
this.RetryPolicy);
this.RetryPolicy,
this.RegisteredPlugins);
}
}
}

Просмотреть файл

@ -74,6 +74,7 @@ namespace Microsoft.Azure.ServiceBus
MessagingEventSource.Log.TopicClientCreateStart(serviceBusConnection?.Endpoint.Authority, entityPath);
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.OperationTimeout = this.ServiceBusConnection.OperationTimeout;
this.syncLock = new object();
this.TopicName = entityPath;
this.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(

Просмотреть файл

@ -220,7 +220,7 @@ namespace Microsoft.Azure.ServiceBus
public override string Message { get; }
public string ServiceBusNamespace { get; }
}
public sealed class SessionClient : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.ISessionClient
public sealed class SessionClient : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.ISessionClient, Microsoft.Azure.ServiceBus.IClientEntity
{
public SessionClient(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
public SessionClient(string connectionString, string entityPath, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
@ -350,7 +350,7 @@ namespace Microsoft.Azure.ServiceBus.Core
System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message);
System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Message> messageList);
}
public interface ISessionClient
public interface ISessionClient : Microsoft.Azure.ServiceBus.IClientEntity
{
string EntityPath { get; }
System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.IMessageSession> AcceptMessageSessionAsync();

Просмотреть файл

@ -83,7 +83,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
try
{
Stopwatch stopwatch = Stopwatch.StartNew();
while (stopwatch.Elapsed.TotalSeconds <= 5)
while (stopwatch.Elapsed.TotalSeconds <= 10)
{
if (exceptionReceivedHandlerCalled)
{

Просмотреть файл

@ -98,9 +98,12 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
};
await messageSender.SendAsync(sendMessage);
var receivedMessage = await messageReceiver.ReceiveAsync(1, TimeSpan.FromMinutes(1));
// Ensure the plugin is called.
Assert.True(sendReceivePlugin.MessageBodies.ContainsKey(sendMessage.MessageId));
Assert.Equal(sendMessage.Body, receivedMessage.First().Body);
var receivedMessage = await messageReceiver.ReceiveAsync(TimeSpan.FromMinutes(1));
Assert.Equal(sendMessage.Body, receivedMessage.Body);
}
finally
@ -152,6 +155,56 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
await messageReceiver.CloseAsync();
}
}
[Fact]
[DisplayTestMethodName]
async Task QueueClientShouldPassPluginsToMessageSession()
{
var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, TestConstants.SessionNonPartitionedQueueName);
try
{
bool messageReceived = false;
var sendReceivePlugin = new SendReceivePlugin();
queueClient.RegisterPlugin(sendReceivePlugin);
var sendMessage = new Message(Encoding.UTF8.GetBytes("Test message"))
{
MessageId = Guid.NewGuid().ToString(),
SessionId = Guid.NewGuid().ToString()
};
await queueClient.SendAsync(sendMessage);
// Ensure the plugin is called.
Assert.True(sendReceivePlugin.MessageBodies.ContainsKey(sendMessage.MessageId));
queueClient.RegisterSessionHandler(
(session, message, cancellationToken) =>
{
Assert.Equal(sendMessage.SessionId, session.SessionId);
Assert.True(session.RegisteredPlugins.Contains(sendReceivePlugin));
Assert.Equal(sendMessage.Body, message.Body);
messageReceived = true;
return Task.CompletedTask;
},
exceptionArgs => Task.CompletedTask);
for (int i = 0; i < 20; i++)
{
if (messageReceived)
{
break;
}
await Task.Delay(TimeSpan.FromSeconds(2));
}
Assert.True(messageReceived);
}
finally
{
await queueClient.CloseAsync();
}
}
}
internal class FirstSendPlugin : ServiceBusPlugin
@ -181,7 +234,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
internal class SendReceivePlugin : ServiceBusPlugin
{
// Null the body on send, and replace it when received.
Dictionary<string, byte[]> MessageBodies = new Dictionary<string,byte[]>();
public Dictionary<string, byte[]> MessageBodies = new Dictionary<string,byte[]>();
public override string Name => nameof(SendReceivePlugin);