Implementing Plugins for EventHub (#324)

* Implement Plugin to Process each event when client is sending telemetry

*  Microsoft copyright header

* Fix Typo

* Changes for https://github.com/Azure/azure-event-hubs-dotnet/pull/324#pullrequestreview-162989181

* Implement AfterEventsReceive for EventHubsPlugin

* Implement Plugin Tests

* Sort usings

* changes for https://github.com/Azure/azure-event-hubs-dotnet/pull/324#issuecomment-428688265

* Fix Resources

* Changes for https://github.com/Azure/azure-event-hubs-dotnet/pull/324#pullrequestreview-163616505

* Change for https://github.com/Azure/azure-event-hubs-dotnet/pull/324#issuecomment-429417199

* Move Using to Namespace block
This commit is contained in:
David Revoledo 2018-10-15 21:00:38 +00:00 коммит произвёл Serkant Karaca
Родитель 61bdc55a5b
Коммит 9c6da772dd
9 изменённых файлов: 352 добавлений и 5 удалений

Просмотреть файл

@ -93,7 +93,7 @@ namespace Microsoft.Azure.EventHubs.Processor
receiverOptions);
this.partitionReceiver.PrefetchCount = this.Host.EventProcessorOptions.PrefetchCount;
ProcessorEventSource.Log.PartitionPumpCreateClientsStop(this.Host.HostName, this.PartitionContext.PartitionId);
}

Просмотреть файл

@ -3,6 +3,7 @@
namespace Microsoft.Azure.EventHubs
{
using System;
using System.Diagnostics.Tracing;
/// <summary>

Просмотреть файл

@ -0,0 +1,35 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.EventHubs.Core
{
using System.Collections.Generic;
using System.Threading.Tasks;
/// <summary>
/// This class provides methods that can be overridden to manipulate messages for custom plugin functionality.
/// </summary>
public abstract class EventHubsPlugin
{
/// <summary>
/// Gets the name of the <see cref="EventHubsPlugin" />.
/// </summary>
/// <remarks>This name is used to identify the plugin, and prevent a plugin from being registered multiple times.</remarks>
public abstract string Name { get; }
/// <summary>
/// Determines whether or an exception in the plugin should prevent a send or receive operation.
/// </summary>
public virtual bool ShouldContinueOnException => false;
/// <summary>
/// This operation is called before an event is sent.
/// </summary>
/// <param name="eventData">The <see cref="EventData" /> to be modified by the plugin</param>
/// <returns>The modified event <see cref="EventData" /></returns>
public virtual Task<EventData> BeforeEventSend(EventData eventData)
{
return Task.FromResult(eventData);
}
}
}

Просмотреть файл

@ -3,6 +3,7 @@
namespace Microsoft.Azure.EventHubs
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
@ -21,9 +22,12 @@ namespace Microsoft.Azure.EventHubs
protected string PartitionId { get; }
public Task SendAsync(IEnumerable<EventData> eventDatas, string partitionKey)
public async Task SendAsync(IEnumerable<EventData> eventDatas, string partitionKey)
{
return this.OnSendAsync(eventDatas, partitionKey);
var processedEvents = await this.ProcessEvents(eventDatas).ConfigureAwait(false);
await this.OnSendAsync(processedEvents, partitionKey)
.ConfigureAwait(false);
}
protected abstract Task OnSendAsync(IEnumerable<EventData> eventDatas, string partitionKey);
@ -40,6 +44,51 @@ namespace Microsoft.Azure.EventHubs
return count;
}
async Task<EventData> ProcessEvent(EventData eventData)
{
if (this.RegisteredPlugins == null || this.RegisteredPlugins.Count == 0)
return eventData;
var processedEvent = eventData;
foreach (var plugin in this.RegisteredPlugins.Values)
{
try
{
EventHubsEventSource.Log.PluginCallStarted(plugin.Name, ClientId);
processedEvent = await plugin.BeforeEventSend(eventData).ConfigureAwait(false);
EventHubsEventSource.Log.PluginCallCompleted(plugin.Name, ClientId);
}
catch (Exception ex)
{
EventHubsEventSource.Log.PluginCallFailed(plugin.Name, ClientId, ex);
if (!plugin.ShouldContinueOnException)
{
throw;
}
}
}
return processedEvent;
}
async Task<IEnumerable<EventData>> ProcessEvents(IEnumerable<EventData> eventDatas)
{
if (this.RegisteredPlugins.Count < 1)
{
return eventDatas;
}
var processedEventList = new List<EventData>();
foreach (var eventData in eventDatas)
{
var processedMessage = await this.ProcessEvent(eventData)
.ConfigureAwait(false);
processedEventList.Add(processedMessage);
}
return processedEventList;
}
internal long MaxMessageSize
{
get;

Просмотреть файл

@ -3,6 +3,7 @@
namespace Microsoft.Azure.EventHubs
{
using System;
using System.Diagnostics.Tracing;
/// <summary>
@ -209,6 +210,36 @@ namespace Microsoft.Azure.EventHubs
}
}
//
// 100-120 reserved for Plugins traces
//
[Event(100, Level = EventLevel.Verbose, Message = "User plugin {0} called on client {1}")]
public void PluginCallStarted(string pluginName, string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(100, pluginName, clientId);
}
}
[Event(101, Level = EventLevel.Verbose, Message = "User plugin {0} completed on client {1}")]
public void PluginCallCompleted(string pluginName, string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(101, pluginName, clientId);
}
}
[Event(102, Level = EventLevel.Error, Message = "Exception during {0} plugin execution. clientId: {1}, Exception {2}")]
public void PluginCallFailed(string pluginName, string clientId, Exception exception)
{
if (this.IsEnabled())
{
this.WriteEvent(102, pluginName, clientId, exception);
}
}
// TODO: Add Keywords if desired.
//public class Keywords // This is a bitvector
//{

Просмотреть файл

@ -1,11 +1,15 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.EventHubs
{
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs.Core;
/// <summary>
/// Contract for all client entities with Open-Close/Abort state m/c
@ -14,7 +18,6 @@ namespace Microsoft.Azure.EventHubs
public abstract class ClientEntity
{
static int nextId;
RetryPolicy retryPolicy;
/// <summary></summary>
@ -33,7 +36,13 @@ namespace Microsoft.Azure.EventHubs
}
/// <summary>
/// Gets the <see cref="RetryPolicy.RetryPolicy"/> for the ClientEntity.
/// Gets a list of currently registered plugins for this Client.
/// </summary>
public virtual ConcurrentDictionary<string, EventHubsPlugin> RegisteredPlugins { get; }
= new ConcurrentDictionary<string, EventHubsPlugin>();
/// <summary>
/// Gets the <see cref="EventHubs.RetryPolicy"/> for the ClientEntity.
/// </summary>
public RetryPolicy RetryPolicy
{
@ -55,6 +64,43 @@ namespace Microsoft.Azure.EventHubs
/// <returns>The asynchronous operation</returns>
public abstract Task CloseAsync();
/// <summary>
/// Registers a <see cref="EventHubsPlugin"/> to be used with this client.
/// </summary>
public virtual void RegisterPlugin(EventHubsPlugin eventHubsPlugin)
{
if (eventHubsPlugin == null)
{
throw new ArgumentNullException(nameof(eventHubsPlugin), Resources.ArgumentNullOrWhiteSpace.FormatForUser(nameof(eventHubsPlugin)));
}
if (this.RegisteredPlugins.Any(p => p.Value.Name == eventHubsPlugin.Name))
{
throw new ArgumentException(eventHubsPlugin.Name, Resources.PluginAlreadyRegistered.FormatForUser(eventHubsPlugin.Name));
}
if (!this.RegisteredPlugins.TryAdd(eventHubsPlugin.Name, eventHubsPlugin))
{
throw new ArgumentException(eventHubsPlugin.Name, Resources.PluginRegistrationFailed.FormatForUser(eventHubsPlugin.Name));
}
}
/// <summary>
/// Unregisters a <see cref="EventHubsPlugin"/>.
/// </summary>
/// <param name="pluginName">The <see cref="EventHubsPlugin.Name"/> of the plugin to be unregistered.</param>
public virtual void UnregisterPlugin(string pluginName)
{
if (this.RegisteredPlugins == null)
{
return;
}
if (string.IsNullOrWhiteSpace(pluginName))
{
throw new ArgumentNullException(nameof(pluginName), Resources.ArgumentNullOrWhiteSpace.FormatForUser(nameof(pluginName)));
}
this.RegisteredPlugins.TryRemove(pluginName, out EventHubsPlugin plugin);
}
/// <summary>
/// Closes the ClientEntity.
/// </summary>

19
src/Microsoft.Azure.EventHubs/Resources.Designer.cs сгенерированный
Просмотреть файл

@ -12,6 +12,7 @@ namespace Microsoft.Azure.EventHubs {
using System;
using System.Reflection;
/// <summary>
/// A strongly-typed resource class, for looking up localized strings, etc.
/// </summary>
@ -150,6 +151,24 @@ namespace Microsoft.Azure.EventHubs {
}
}
/// <summary>
/// Looks up a localized string similar to The {0} plugin has already been registered..
/// </summary>
internal static string PluginAlreadyRegistered {
get {
return ResourceManager.GetString("PluginAlreadyRegistered", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to There was an error trying to register the {0} plugin..
/// </summary>
internal static string PluginRegistrationFailed {
get {
return ResourceManager.GetString("PluginRegistrationFailed", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to The &apos;identifier&apos; parameter exceeds the maximum allowed size of {0} characters..
/// </summary>

Просмотреть файл

@ -168,4 +168,10 @@
<data name="ValueOutOfRange" xml:space="preserve">
<value>The value supplied must be between {0} and {1}.</value>
</data>
<data name="PluginAlreadyRegistered" xml:space="preserve">
<value>The {0} plugin has already been registered.</value>
</data>
<data name="PluginRegistrationFailed" xml:space="preserve">
<value>There was an error trying to register the {0} plugin.</value>
</data>
</root>

Просмотреть файл

@ -0,0 +1,160 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.EventHubs.Tests.Client
{
using System;
using System.Threading.Tasks;
using System.Text;
using Microsoft.Azure.EventHubs.Core;
using Xunit;
public class PluginTests
{
protected EventHubClient EventHubClient;
[Fact]
[DisplayTestMethodName]
Task Registering_plugin_multiple_times_should_throw()
{
this.EventHubClient = EventHubClient.CreateFromConnectionString(TestUtility.EventHubsConnectionString);
var firstPlugin = new FirstSendPlugin();
var secondPlugin = new FirstSendPlugin();
this.EventHubClient.RegisterPlugin(firstPlugin);
Assert.Throws<ArgumentException>(() => EventHubClient.RegisterPlugin(secondPlugin));
return EventHubClient.CloseAsync();
}
[Fact]
[DisplayTestMethodName]
Task Unregistering_plugin_should_complete_with_plugin_set()
{
this.EventHubClient = EventHubClient.CreateFromConnectionString(TestUtility.EventHubsConnectionString);
var firstPlugin = new FirstSendPlugin();
this.EventHubClient.RegisterPlugin(firstPlugin);
this.EventHubClient.UnregisterPlugin(firstPlugin.Name);
return this.EventHubClient.CloseAsync();
}
[Fact]
[DisplayTestMethodName]
Task Unregistering_plugin_should_complete_without_plugin_set()
{
this.EventHubClient = EventHubClient.CreateFromConnectionString(TestUtility.EventHubsConnectionString);
this.EventHubClient.UnregisterPlugin("Non-existant plugin");
return this.EventHubClient.CloseAsync();
}
[Fact]
[DisplayTestMethodName]
async Task Multiple_plugins_should_run_in_order()
{
this.EventHubClient = EventHubClient.CreateFromConnectionString(TestUtility.EventHubsConnectionString);
try
{
var firstPlugin = new FirstSendPlugin();
var secondPlugin = new SecondSendPlugin();
this.EventHubClient.RegisterPlugin(firstPlugin);
this.EventHubClient.RegisterPlugin(secondPlugin);
var testEvent = new EventData(Encoding.UTF8.GetBytes("Test message"));
await this.EventHubClient.SendAsync(testEvent);
// BeforeEventSend for Plugin2 should break is 1 was not called
}
finally
{
await this.EventHubClient.CloseAsync();
}
}
[Fact]
[DisplayTestMethodName]
async Task Plugin_without_ShouldContinueOnException_should_throw()
{
this.EventHubClient = EventHubClient.CreateFromConnectionString(TestUtility.EventHubsConnectionString);
try
{
var plugin = new ExceptionPlugin();
this.EventHubClient.RegisterPlugin(plugin);
var testEvent = new EventData(Encoding.UTF8.GetBytes("Test message"));
await this.EventHubClient.SendAsync(testEvent);
await Assert.ThrowsAsync<NotImplementedException>(() => this.EventHubClient.SendAsync(testEvent));
}
finally
{
await this.EventHubClient.CloseAsync();
}
}
[Fact]
[DisplayTestMethodName]
async Task Plugin_with_ShouldContinueOnException_should_continue()
{
this.EventHubClient = EventHubClient.CreateFromConnectionString(TestUtility.EventHubsConnectionString);
try
{
var plugin = new ShouldCompleteAnywayExceptionPlugin();
this.EventHubClient.RegisterPlugin(plugin);
var testEvent = new EventData(Encoding.UTF8.GetBytes("Test message"));
await this.EventHubClient.SendAsync(testEvent);
}
finally
{
await this.EventHubClient.CloseAsync();
}
}
}
internal class FirstSendPlugin : EventHubsPlugin
{
public override string Name => nameof(FirstSendPlugin);
public override Task<EventData> BeforeEventSend(EventData eventData)
{
eventData.Properties.Add("FirstSendPlugin", true);
return Task.FromResult(eventData);
}
}
internal class SecondSendPlugin : EventHubsPlugin
{
public override string Name => nameof(SecondSendPlugin);
public override Task<EventData> BeforeEventSend(EventData eventData)
{
Assert.True((bool)eventData.Properties["FirstSendPlugin"]);
eventData.Properties.Add("SecondSendPlugin", true);
return Task.FromResult(eventData);
}
}
internal class ExceptionPlugin : EventHubsPlugin
{
public override string Name => nameof(ExceptionPlugin);
public override Task<EventData> BeforeEventSend(EventData eventData)
{
throw new NotImplementedException();
}
}
internal class ShouldCompleteAnywayExceptionPlugin : EventHubsPlugin
{
public override bool ShouldContinueOnException => true;
public override string Name => nameof(ShouldCompleteAnywayExceptionPlugin);
public override Task<EventData> BeforeEventSend(EventData eventData)
{
throw new NotImplementedException();
}
}
}