Microsoft-Message-Bridge/Messaging/MessageBus.cs

285 строки
11 KiB
C#
Исходник Постоянная ссылка Обычный вид История

// --------------------------------------------------------------------------------------------------------------------
// <copyright file="MessageBus.cs" company="Microsoft Corporation">
// Copyright 2015 Microsoft Corporation. All rights reserved.
// </copyright>
// --------------------------------------------------------------------------------------------------------------------
namespace Microsoft.MessageBridge.Messaging
{
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.MessageBridge.Monitoring;
/// <summary>
/// <para>
/// A message bus implementation for sending and receiving messages.
/// </para>
/// <para>
/// A message bus is a facility that provides entities for publishing and subscribing to messages. A message bus
/// entity directs an homogenous set of message data. It is not required that all messages contain the identical
/// properties, but they represent the same object. This means that property A if present has the same meaning as
/// property A in all other messages on the entity. A message bus entity is referred to by name. It is equivalent
/// to a queue, topic, or hub in most systems.
/// </para>
/// <para>
/// Messages include a JSON-encoded message body along with an optional set of properties, a partition key, and
/// a message instance key.
/// </para>
/// </summary>
public class MessageBus : IMessageBus
{
#region Constructors and Destructors
/// <summary>
/// Initializes a new instance of the <see cref="MessageBus"/> class.
/// </summary>
/// <param name="description">
/// The message bus description.
/// </param>
public MessageBus(MessageBusDescription description)
{
this.Description = description;
this.Publishers = new ConcurrentDictionary<string, IPublisher>();
this.Subscribers = new ConcurrentDictionary<string, ISubscriber>();
}
#endregion
#region Properties
/// <summary>
/// Gets or sets the message bus description. This is used for connecting to the bus.
/// </summary>
private MessageBusDescription Description { get; set; }
/// <summary>
/// Gets or sets the publishers. There will be only one publisher per entity per bus instance.
/// </summary>
private ConcurrentDictionary<string, IPublisher> Publishers { get; set; }
/// <summary>
/// Gets or sets the subscribers. There will be only one subscription per entity per bus instance.
/// </summary>
private ConcurrentDictionary<string, ISubscriber> Subscribers { get; set; }
#endregion
#region Public Methods and Operators
/// <summary>
/// Closes the message bus instance asynchronously.
/// </summary>
/// <returns>
/// A <see cref="Task" /> representing the close operation.
/// </returns>
/// <exception cref="MessageBusException">Occurs when either a publisher or subscriber fails to close properly.</exception>
public async Task CloseAsync()
{
using (ActivityMonitor.Instance.MessageBusClose(this))
{
try
{
await Task.WhenAll(this.CloseSubscribersAsync(), this.ClosePublishersAsync());
}
catch (Exception e)
{
ActivityMonitor.Instance.ReportMessageBusException(this, e, false);
throw new MessageBusException(e.Message, e);
}
}
}
/// <summary>
/// Registers the message bus subscription handler asynchronously.
/// </summary>
/// <param name="entity">
/// The entity.
/// </param>
/// <param name="name">
/// The name.
/// </param>
/// <param name="handler">
/// The handler.
/// </param>
/// <exception cref="System.ArgumentNullException">
/// Occurs when the handler method is null.
/// </exception>
/// <exception cref="System.ArgumentException">
/// Occurs when the message bus entity name is null or whitespace.
/// </exception>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
public async Task RegisterHandlerAsync(string entity, string name, Func<IMessage, Task> handler)
{
using (ActivityMonitor.Instance.MessageBusRegisterHandler(this, entity, name))
{
if (string.IsNullOrWhiteSpace(entity))
{
var exception = new ArgumentException("The entity name cannot be null or empty.", entity);
ActivityMonitor.Instance.ReportMessageBusException(this, exception, false);
throw exception;
}
if (string.IsNullOrWhiteSpace(name))
{
var exception = new ArgumentException("The subscription name cannot be null or empty.", name);
ActivityMonitor.Instance.ReportMessageBusException(this, exception, false);
throw exception;
}
if (handler == null)
{
var exception = new ArgumentNullException("handler");
ActivityMonitor.Instance.ReportMessageBusException(this, exception, false);
throw exception;
}
var subscriber = this.Description.Factory.CreateSubscriber();
if (this.Subscribers.TryAdd(entity, subscriber))
{
var cs = this.Description.ConnectionString;
try
{
await
subscriber.InitializeAsync(
new SubscriberDescription
{
ConnectionString = cs,
Entity = entity,
Name = name,
StorageConnectionString =
this.Description.StorageConnectionString
},
handler);
}
catch (Exception e)
{
ActivityMonitor.Instance.ReportMessageBusException(this, e, false);
throw new MessageBusException(e.Message, e);
}
}
}
}
/// <summary>
/// Sends the message to the message bus entity asynchronously.
/// </summary>
/// <param name="entity">
/// The message bus entity.
/// </param>
/// <param name="message">
/// The message.
/// </param>
/// <returns>
/// The <see cref="Task"/> representing the send operation.
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// Occurs when the message argument is null.
/// </exception>
/// <exception cref="System.ArgumentException">
/// Occurs when the message bus entity name is null or whitespace.
/// </exception>
/// <exception cref="MessageBusException">
/// Occurs when either the publisher cannot be initialized or the publisher fails to
/// send the message.
/// </exception>
public async Task SendAsync(string entity, IMessage message)
{
using (ActivityMonitor.Instance.MessageBusSend(this, message, entity))
{
if (string.IsNullOrWhiteSpace(entity))
{
var exception = new ArgumentException("The entity name cannot be null or empty.", entity);
ActivityMonitor.Instance.ReportMessageBusException(this, exception, false);
throw exception;
}
if (message == null)
{
var exception = new ArgumentNullException("message");
ActivityMonitor.Instance.ReportMessageBusException(this, exception, false);
throw exception;
}
try
{
var publisher = this.GetPublisher(entity);
await publisher.SendAsync(message);
}
catch (Exception e)
{
ActivityMonitor.Instance.ReportMessageBusException(this, e, false);
throw new MessageBusException(e.Message, e);
}
}
}
#endregion
#region Methods
/// <summary>
/// Closes the publishers asynchronously.
/// </summary>
/// <returns>A <see cref="Task" /> representing the close operation.</returns>
private async Task ClosePublishersAsync()
{
foreach (var publishers in this.Publishers.Values)
{
await publishers.CloseAsync();
}
this.Publishers.Clear();
}
/// <summary>
/// Closes the subscribers asynchronously.
/// </summary>
/// <returns>A <see cref="Task" /> representing the close operation.</returns>
private async Task CloseSubscribersAsync()
{
foreach (var subscriber in this.Subscribers.Values)
{
await subscriber.CloseAsync();
}
this.Subscribers.Clear();
}
/// <summary>
/// Gets the publisher for the message bus entity asynchronously.
/// </summary>
/// <param name="entity">
/// The entity.
/// </param>
/// <returns>
/// The <see cref="IPublisher"/>
/// </returns>
private IPublisher GetPublisher(string entity)
{
return this.Publishers.GetOrAdd(
entity,
s =>
{
var publisher = this.Description.Factory.CreatePublisher();
var cs = this.Description.ConnectionString;
var cert = this.Description.Certificate;
var description = new PublisherDescription
{
ConnectionString = cs,
Certificate = cert,
Entity = entity
};
publisher.InitializeAsync(description).Wait();
return publisher;
});
}
#endregion
}
}