ServiceFabricProcessor preview (#262)

This is the code that built and released as preview version 0.5.2 https://www.nuget.org/packages/Microsoft.Azure.EventHubs.ServiceFabricProcessor/0.5.2

At the time it couldn't be merged with dev due to test issues from unrelated work, so we did the release from the SFprocessor branch. Those issues have been resolved, and we expect that future preview releases will come from dev branch.
This commit is contained in:
JamesBirdsall 2019-02-14 10:39:17 -08:00 коммит произвёл GitHub
Родитель 4ce1613cab
Коммит 321f188448
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
17 изменённых файлов: 1951 добавлений и 1 удалений

Просмотреть файл

@ -18,6 +18,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.EventHubs.T
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.EventHubs.Processor", "src\Microsoft.Azure.EventHubs.Processor\Microsoft.Azure.EventHubs.Processor.csproj", "{8C89967A-4E1F-46B0-8458-81B545C822B2}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.EventHubs.ServiceFabricProcessor", "src\Microsoft.Azure.EventHubs.ServiceFabricProcessor\Microsoft.Azure.EventHubs.ServiceFabricProcessor.csproj", "{D96BCC8F-D5EE-464A-9C15-EF59613F9F82}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -36,6 +38,10 @@ Global
{8C89967A-4E1F-46B0-8458-81B545C822B2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8C89967A-4E1F-46B0-8458-81B545C822B2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8C89967A-4E1F-46B0-8458-81B545C822B2}.Release|Any CPU.Build.0 = Release|Any CPU
{D96BCC8F-D5EE-464A-9C15-EF59613F9F82}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D96BCC8F-D5EE-464A-9C15-EF59613F9F82}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D96BCC8F-D5EE-464A-9C15-EF59613F9F82}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D96BCC8F-D5EE-464A-9C15-EF59613F9F82}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -44,5 +50,6 @@ Global
{126D946D-CE0F-4F14-9F13-8FD7098B81D8} = {08E028C1-29E7-42B5-871F-C911DB93E78A}
{154F7B4C-B998-4FA0-933F-F34DB0CA9B88} = {AF49C862-CB78-4110-A275-8111B387805D}
{8C89967A-4E1F-46B0-8458-81B545C822B2} = {08E028C1-29E7-42B5-871F-C911DB93E78A}
{D96BCC8F-D5EE-464A-9C15-EF59613F9F82} = {08E028C1-29E7-42B5-871F-C911DB93E78A}
EndGlobalSection
EndGlobal

Просмотреть файл

@ -0,0 +1,151 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System;
using System.Collections.Generic;
/// <summary>
/// A persistable representation of what events in the stream have been processed.
/// Version 1 checkpoint is just a high-water mark, containing an offset and sequence number. All events at or lower than the given position
/// have been processed. Any events higher than the given position are unprocessed.
/// </summary>
public class Checkpoint
{
/// <summary>
/// Create an uninitialized checkpoint of the given version.
/// </summary>
/// <param name="version"></param>
internal Checkpoint(int version)
{
this.Version = version;
this.Valid = false;
}
/// <summary>
/// Create an initialized version 1 checkpoint.
/// </summary>
/// <param name="offset">Offset of highest-processed position.</param>
/// <param name="sequenceNumber">Sequence number of highest-processed position.</param>
public Checkpoint(string offset, long sequenceNumber)
{
this.Version = 1;
this.Offset = offset;
this.SequenceNumber = sequenceNumber;
this.Valid = true;
}
#region AllVersions
//
// Methods and properties valid for all versions.
//
/// <summary>
/// Version of this checkpoint.
/// </summary>
public int Version { get; protected set; }
/// <summary>
/// True if this checkpoint contains a valid position.
/// </summary>
public bool Valid { get; protected set; }
/// <summary>
/// Serialize this instance to a persistable representation as a name-value dictionary.
/// </summary>
/// <returns>Serialized dictionary representation.</returns>
public Dictionary<string, object> ToDictionary()
{
Dictionary<string, object> converted = new Dictionary<string, object>();
converted.Add(Constants.CheckpointPropertyVersion, this.Version);
converted.Add(Constants.CheckpointPropertyValid, this.Valid);
switch (this.Version)
{
case 1:
converted.Add(Constants.CheckpointPropertyOffsetV1, this.Offset);
converted.Add(Constants.CheckpointPropertySequenceNumberV1, this.SequenceNumber);
break;
default:
throw new NotImplementedException();
}
return converted;
}
/// <summary>
/// Deserialize from a name-value dictionary.
/// </summary>
/// <param name="dictionary">Serialized representation.</param>
/// <returns>Deserialized instance.</returns>
static public Checkpoint CreateFromDictionary(Dictionary<string, object> dictionary)
{
int version = (int)dictionary[Constants.CheckpointPropertyVersion];
bool valid = (bool)dictionary[Constants.CheckpointPropertyValid];
Checkpoint result = new Checkpoint(version);
if (valid)
{
result.Valid = true;
switch (result.Version)
{
case 1:
result.Offset = (string)dictionary[Constants.CheckpointPropertyOffsetV1];
result.SequenceNumber = (long)dictionary[Constants.CheckpointPropertySequenceNumberV1];
break;
default:
throw new NotImplementedException($"Unrecognized checkpoint version {result.Version}");
}
}
return result;
}
#endregion AllVersions
#region Version1
//
// Methods and properties for Version==1
//
/// <summary>
/// Initialize an uninitialized instance as a version 1 checkpoint.
/// </summary>
/// <param name="offset">Offset of highest-processed position.</param>
/// <param name="sequenceNumber">Sequence number of highest-processed position.</param>
public void InitializeV1(string offset, long sequenceNumber)
{
this.Version = 1;
if (string.IsNullOrEmpty(offset))
{
throw new ArgumentException("offset must not be null or empty");
}
if (sequenceNumber < 0)
{
throw new ArgumentException("sequenceNumber must be >= 0");
}
this.Offset = offset;
this.SequenceNumber = sequenceNumber;
this.Valid = true;
}
/// <summary>
/// Offset of highest-processed position. Immutable after construction or initialization.
/// </summary>
public string Offset { get; private set; }
/// <summary>
/// Sequence number of highest-processed position. Immutable after construction or initialization.
/// </summary>
public long SequenceNumber { get; private set; }
#endregion Version1
}
}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
/// <summary>
/// Why the event processor is being shut down.
/// </summary>
public enum CloseReason
{
/// <summary>
/// It was cancelled by Service Fabric.
/// </summary>
Cancelled,
/// <summary>
/// There was an event hubs failure.
/// </summary>
Failure
}
}

Просмотреть файл

@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System;
class Constants
{
internal static readonly int RetryCount = 5;
internal static readonly int FixedReceiverEpoch = 0;
internal static readonly TimeSpan MetricReportingInterval = TimeSpan.FromMinutes(1.0);
internal static readonly string DefaultUserLoadMetricName = "CountOfPartitions";
internal static readonly TimeSpan ReliableDictionaryTimeout = TimeSpan.FromSeconds(10.0); // arbitrary
internal static readonly string CheckpointDictionaryName = "EventProcessorCheckpointDictionary";
internal static readonly string CheckpointPropertyVersion = "version";
internal static readonly string CheckpointPropertyValid = "valid";
internal static readonly string CheckpointPropertyOffsetV1 = "offsetV1";
internal static readonly string CheckpointPropertySequenceNumberV1 = "sequenceNumberV1";
}
}

Просмотреть файл

@ -0,0 +1,209 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Mocks for the underlying event hub client. Using these instead of the regular wrappers allows unit testing without an event hub.
/// By default, EventProcessorService.EventHubClientFactory is a EventHubWrappers.EventHubClientFactory.
/// To use the mocks, change it to a EventHubMocks.EventHubClientFactoryMock.
/// </summary>
public class EventHubMocks
{
/// <summary>
/// Mock of an Event Hub partition receiver.
/// </summary>
public class PartitionReceiverMock : EventHubWrappers.IPartitionReceiver
{
private readonly string partitionId;
private long sequenceNumber;
private IPartitionReceiveHandler outerHandler;
private bool invokeWhenNoEvents;
private readonly CancellationToken token;
/// <summary>
/// Construct the partition receiver mock.
/// </summary>
/// <param name="partitionId"></param>
/// <param name="sequenceNumber"></param>
/// <param name="token"></param>
public PartitionReceiverMock(string partitionId, long sequenceNumber, CancellationToken token)
{
this.partitionId = partitionId;
this.sequenceNumber = sequenceNumber;
this.token = token;
}
/// <summary>
/// Receive mock events.
/// </summary>
/// <param name="maxEventCount"></param>
/// <param name="waitTime"></param>
/// <returns></returns>
public Task<IEnumerable<EventData>> ReceiveAsync(int maxEventCount, TimeSpan waitTime)
{
List<EventData> events = new List<EventData>();
for (int i = 0; i < maxEventCount; i++)
{
this.sequenceNumber++;
byte[] body = new byte[] { 0x4D, 0x4F, 0x43, 0x4B, 0x42, 0x4F, 0x44, 0x59 }; // M O C K B O D Y
EventData e = new EventData(body);
// TODO -- need a way to set the system properties of the EventData
//e.ForceSystemProperties(new EventData.SystemPropertiesCollection(this.sequenceNumber, DateTime.UtcNow, (this.sequenceNumber * 100).ToString(), ""));
e.Properties.Add("userkey", "uservalue");
events.Add(e);
}
Thread.Sleep(5000);
EventProcessorEventSource.Current.Message($"MOCK ReceiveAsync returning {maxEventCount} events for partition {this.partitionId} ending at {this.sequenceNumber}");
return Task.FromResult<IEnumerable<EventData>>(events);
}
/// <summary>
/// Set a mock receive handler.
/// </summary>
/// <param name="receiveHandler"></param>
/// <param name="invokeWhenNoEvents"></param>
public void SetReceiveHandler(IPartitionReceiveHandler receiveHandler, bool invokeWhenNoEvents = false)
{
EventProcessorEventSource.Current.Message("MOCK IPartitionReceiver.SetReceiveHandler");
this.outerHandler = receiveHandler;
this.invokeWhenNoEvents = invokeWhenNoEvents; // TODO mock does not emulate receive timeouts
if (this.outerHandler != null)
{
Task.Run(() => GenerateMessages());
}
}
/// <summary>
/// Close the mock receiver.
/// </summary>
/// <returns></returns>
public Task CloseAsync()
{
EventProcessorEventSource.Current.Message("MOCK IPartitionReceiver.CloseAsync");
return Task.CompletedTask;
}
private async void GenerateMessages()
{
while ((!this.token.IsCancellationRequested) && (this.outerHandler != null))
{
EventProcessorEventSource.Current.Message("MOCK Generating messages and sending to handler");
IEnumerable<EventData> events = ReceiveAsync(10, TimeSpan.FromSeconds(10.0)).Result; // TODO get count from somewhere real
IPartitionReceiveHandler capturedHandler = this.outerHandler;
if (capturedHandler != null)
{
await capturedHandler.ProcessEventsAsync(events);
}
}
EventProcessorEventSource.Current.Message("MOCK Message generation ending");
}
}
/// <summary>
/// Mock of EventHubClient class.
/// </summary>
public class EventHubClientMock : EventHubWrappers.IEventHubClient
{
private readonly int partitionCount;
private readonly EventHubsConnectionStringBuilder csb;
private CancellationToken token = new CancellationToken();
/// <summary>
/// Construct the mock.
/// </summary>
/// <param name="partitionCount"></param>
/// <param name="csb"></param>
public EventHubClientMock(int partitionCount, EventHubsConnectionStringBuilder csb)
{
this.partitionCount = partitionCount;
this.csb = csb;
}
internal void SetCancellationToken(CancellationToken t)
{
this.token = t;
}
/// <summary>
/// Get runtime info of the fake event hub.
/// </summary>
/// <returns></returns>
public Task<EventHubRuntimeInformation> GetRuntimeInformationAsync()
{
EventHubRuntimeInformation ehri = new EventHubRuntimeInformation();
ehri.PartitionCount = this.partitionCount;
ehri.PartitionIds = new string[this.partitionCount];
for (int i = 0; i < this.partitionCount; i++)
{
ehri.PartitionIds[i] = i.ToString();
}
ehri.Path = csb.EntityPath;
EventProcessorEventSource.Current.Message($"MOCK GetRuntimeInformationAsync for {ehri.Path}");
return Task.FromResult<EventHubRuntimeInformation>(ehri);
}
/// <summary>
/// Create a mock receiver on the fake event hub.
/// </summary>
/// <param name="consumerGroupName"></param>
/// <param name="partitionId"></param>
/// <param name="eventPosition"></param>
/// <param name="offset"></param>
/// <param name="epoch"></param>
/// <param name="receiverOptions"></param>
/// <returns></returns>
public EventHubWrappers.IPartitionReceiver CreateEpochReceiver(string consumerGroupName, string partitionId, EventPosition eventPosition, string offset, long epoch, ReceiverOptions receiverOptions)
{
EventProcessorEventSource.Current.Message($"MOCK CreateEpochReceiver(CG {consumerGroupName}, part {partitionId}, offset {offset} epoch {epoch})");
// TODO implement epoch semantics
long startSeq = (offset != null) ? (long.Parse(offset) / 100L) : 0L;
return new PartitionReceiverMock(partitionId, startSeq, this.token);
}
/// <summary>
/// Close the mock EventHubClient.
/// </summary>
/// <returns></returns>
public Task CloseAsync()
{
EventProcessorEventSource.Current.Message("MOCK IEventHubClient.CloseAsync");
return Task.CompletedTask;
}
}
/// <summary>
/// An EventHubClient factory which dispenses mocks.
/// </summary>
public class EventHubClientFactoryMock : EventHubWrappers.IEventHubClientFactory
{
private readonly int partitionCount;
/// <summary>
/// Construct the mock factory.
/// </summary>
/// <param name="partitionCount"></param>
public EventHubClientFactoryMock(int partitionCount)
{
this.partitionCount = partitionCount;
}
/// <summary>
/// Dispense a mock instance operating on a fake event hub with name taken from the connection string.
/// </summary>
/// <param name="connectionString"></param>
/// <returns></returns>
public EventHubWrappers.IEventHubClient CreateFromConnectionString(string connectionString)
{
throw new NotImplementedException("Need a change to EventData before mocks can be supported");
//EventProcessorEventSource.Current.Message($"MOCK Creating IEventHubClient {connectionString} with {this.partitionCount} partitions");
//return new EventHubClientMock(this.partitionCount, new EventHubsConnectionStringBuilder(connectionString));
}
}
}
}

Просмотреть файл

@ -0,0 +1,140 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
/// <summary>
/// Wrappers for the underlying Event Hub client which allow mocking.
/// The interfaces include only the client functionality used by the Service Fabric Processor.
/// </summary>
public class EventHubWrappers
{
/// <summary>
/// Interface for a partition receiver.
/// </summary>
public interface IPartitionReceiver
{
/// <summary>
/// </summary>
/// <param name="maxEventCount"></param>
/// <param name="waitTime"></param>
/// <returns></returns>
Task<IEnumerable<EventData>> ReceiveAsync(int maxEventCount, TimeSpan waitTime);
/// <summary>
/// </summary>
/// <param name="receiveHandler"></param>
/// <param name="invokeWhenNoEvents"></param>
void SetReceiveHandler(IPartitionReceiveHandler receiveHandler, bool invokeWhenNoEvents = false);
/// <summary>
/// </summary>
/// <returns></returns>
Task CloseAsync();
}
/// <summary>
/// Interface representing EventHubClient
/// </summary>
public interface IEventHubClient
{
/// <summary>
/// </summary>
/// <returns></returns>
Task<EventHubRuntimeInformation> GetRuntimeInformationAsync();
/// <summary>
/// </summary>
/// <param name="consumerGroupName"></param>
/// <param name="partitionId"></param>
/// <param name="eventPosition"></param>
/// <param name="offset">Only used by mocks</param>
/// <param name="epoch"></param>
/// <param name="receiverOptions"></param>
/// <returns></returns>
IPartitionReceiver CreateEpochReceiver(string consumerGroupName, string partitionId, EventPosition eventPosition, string offset, long epoch, ReceiverOptions receiverOptions);
/// <summary>
/// </summary>
/// <returns></returns>
Task CloseAsync();
}
/// <summary>
/// Interface for an EventHubClient factory so that we can have factories which dispense different implementations of IEventHubClient.
/// </summary>
public interface IEventHubClientFactory
{
/// <summary>
/// </summary>
/// <param name="connectionString"></param>
/// <returns></returns>
IEventHubClient CreateFromConnectionString(string connectionString);
}
internal class PartitionReceiverWrapper : IPartitionReceiver
{
private readonly PartitionReceiver inner;
internal PartitionReceiverWrapper(PartitionReceiver receiver)
{
this.inner = receiver;
this.MaxBatchSize = 10; // TODO get this from somewhere real
}
public Task<IEnumerable<EventData>> ReceiveAsync(int maxEventCount, TimeSpan waitTime)
{
return this.inner.ReceiveAsync(maxEventCount, waitTime);
}
public void SetReceiveHandler(IPartitionReceiveHandler receiveHandler, bool invokeWhenNoEvents = false)
{
this.inner.SetReceiveHandler(receiveHandler, invokeWhenNoEvents);
}
public Task CloseAsync()
{
return this.inner.CloseAsync();
}
public int MaxBatchSize { get; set; }
}
internal class EventHubClientWrapper : IEventHubClient
{
private readonly EventHubClient inner;
internal EventHubClientWrapper(EventHubClient ehc)
{
this.inner = ehc;
}
public Task<EventHubRuntimeInformation> GetRuntimeInformationAsync()
{
return this.inner.GetRuntimeInformationAsync();
}
public IPartitionReceiver CreateEpochReceiver(string consumerGroupName, string partitionId, EventPosition eventPosition, string offset, long epoch, ReceiverOptions receiverOptions)
{
return new PartitionReceiverWrapper(this.inner.CreateEpochReceiver(consumerGroupName, partitionId, eventPosition, epoch, receiverOptions));
}
public Task CloseAsync()
{
return this.inner.CloseAsync();
}
}
internal class EventHubClientFactory : IEventHubClientFactory
{
public IEventHubClient CreateFromConnectionString(string connectionString)
{
return new EventHubClientWrapper(EventHubClient.CreateFromConnectionString(connectionString));
}
}
}
}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System;
/// <summary>
/// Exception thrown when the configuration of the service has a problem.
/// </summary>
public class EventProcessorConfigurationException : Exception
{
/// <summary>
/// Construct the exception.
/// </summary>
/// <param name="message"></param>
public EventProcessorConfigurationException(string message) : base(message)
{
}
}
}

Просмотреть файл

@ -0,0 +1,124 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System;
using System.Diagnostics.Tracing;
using System.Fabric;
[EventSource(Name = "Microsoft-Azure-EventHubs-ServiceFabricProcessor")]
internal sealed class EventProcessorEventSource : EventSource
{
public static readonly EventProcessorEventSource Current = new EventProcessorEventSource();
// Instance constructor is private to enforce singleton semantics
private EventProcessorEventSource() : base() { }
#region Keywords
// Event keywords can be used to categorize events.
// Each keyword is a bit flag. A single event can be associated with multiple keywords (via EventAttribute.Keywords property).
// Keywords must be defined as a public class named 'Keywords' inside EventSource that uses them.
public static class Keywords
{
public const EventKeywords Requests = (EventKeywords)0x1L;
public const EventKeywords ServiceInitialization = (EventKeywords)0x2L;
}
#endregion
#region Events
// Define an instance method for each event you want to record and apply an [Event] attribute to it.
// The method name is the name of the event.
// Pass any parameters you want to record with the event (only primitive integer types, DateTime, Guid & string are allowed).
// Each event method implementation should check whether the event source is enabled, and if it is, call WriteEvent() method to raise the event.
// The number and types of arguments passed to every event method must exactly match what is passed to WriteEvent().
// Put [NonEvent] attribute on all methods that do not define an event.
// For more information see https://msdn.microsoft.com/en-us/library/system.diagnostics.tracing.eventsource.aspx
[NonEvent]
public void Message(string message, params object[] args)
{
if (this.IsEnabled())
{
string finalMessage = string.Format(message, args);
Message(finalMessage);
}
}
private const int MessageEventId = 1;
[Event(MessageEventId, Level = EventLevel.Informational, Message = "{0}")]
public void Message(string message)
{
if (this.IsEnabled())
{
WriteEvent(MessageEventId, message);
}
}
[NonEvent]
public void ServiceMessage(StatefulServiceContext serviceContext, string message, params object[] args)
{
if (this.IsEnabled())
{
string finalMessage = string.Format(message, args);
ServiceMessage(
serviceContext.ServiceName.ToString(),
serviceContext.ServiceTypeName,
serviceContext.ReplicaId,
serviceContext.PartitionId,
serviceContext.CodePackageActivationContext.ApplicationName,
serviceContext.CodePackageActivationContext.ApplicationTypeName,
serviceContext.NodeContext.NodeName,
finalMessage);
}
}
private const int ServiceMessageEventId = 2;
[Event(ServiceMessageEventId, Level = EventLevel.Informational, Message = "{7}")]
private
void ServiceMessage(
string serviceName,
string serviceTypeName,
long replicaOrInstanceId,
Guid partitionId,
string applicationName,
string applicationTypeName,
string nodeName,
string message)
{
WriteEvent(ServiceMessageEventId, serviceName, serviceTypeName, replicaOrInstanceId, partitionId, applicationName, applicationTypeName, nodeName, message);
}
private const int ServiceTypeRegisteredEventId = 3;
[Event(ServiceTypeRegisteredEventId, Level = EventLevel.Informational, Message = "Service host process {0} registered service type {1}", Keywords = Keywords.ServiceInitialization)]
public void ServiceTypeRegistered(int hostProcessId, string serviceType)
{
WriteEvent(ServiceTypeRegisteredEventId, hostProcessId, serviceType);
}
private const int ServiceHostInitializationFailedEventId = 4;
[Event(ServiceHostInitializationFailedEventId, Level = EventLevel.Error, Message = "Service host initialization failed", Keywords = Keywords.ServiceInitialization)]
public void ServiceHostInitializationFailed(string exception)
{
WriteEvent(ServiceHostInitializationFailedEventId, exception);
}
// A pair of events sharing the same name prefix with a "Start"/"Stop" suffix implicitly marks boundaries of an event tracing activity.
// These activities can be automatically picked up by debugging and profiling tools, which can compute their execution time, child activities,
// and other statistics.
private const int ServiceRequestStartEventId = 5;
[Event(ServiceRequestStartEventId, Level = EventLevel.Informational, Message = "Service request '{0}' started", Keywords = Keywords.Requests)]
public void ServiceRequestStart(string requestTypeName)
{
WriteEvent(ServiceRequestStartEventId, requestTypeName);
}
private const int ServiceRequestStopEventId = 6;
[Event(ServiceRequestStopEventId, Level = EventLevel.Informational, Message = "Service request '{0}' finished", Keywords = Keywords.Requests)]
public void ServiceRequestStop(string requestTypeName, string exception = "")
{
WriteEvent(ServiceRequestStopEventId, requestTypeName, exception);
}
#endregion
}
}

Просмотреть файл

@ -0,0 +1,92 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System;
/// <summary>
/// Type used for OnShutdown property.
/// </summary>
/// <param name="e"></param>
public delegate void ShutdownNotification(Exception e);
/// <summary>
/// Options that govern the functioning of the processor.
/// </summary>
public class EventProcessorOptions
{
/// <summary>
/// Construct with default options.
/// </summary>
public EventProcessorOptions()
{
this.MaxBatchSize = 10;
this.PrefetchCount = 300;
this.ReceiveTimeout = TimeSpan.FromMinutes(1);
this.EnableReceiverRuntimeMetric = false;
this.InvokeProcessorAfterReceiveTimeout = false;
this.InitialPositionProvider = partitionId => EventPosition.FromStart();
this.ClientReceiverOptions = null;
this.OnShutdown = null;
}
/// <summary>
/// The maximum number of events that will be presented to IEventProcessor.OnEventsAsync in one call.
/// Defaults to 10.
/// </summary>
public int MaxBatchSize { get; set; }
/// <summary>
/// The prefetch count for the Event Hubs receiver.
/// Defaults to 300.
/// </summary>
public int PrefetchCount { get; set; }
/// <summary>
/// The timeout for the Event Hubs receiver.
/// Defaults to one minute.
/// </summary>
public TimeSpan ReceiveTimeout { get; set; }
/// <summary>
/// Gets or sets a value indicating whether the runtime metric of a receiver is enabled (true) or disabled (false).
/// Defaults to false.
/// </summary>
public bool EnableReceiverRuntimeMetric { get; set; }
/// <summary>
/// Determines whether IEventProcessor.OnEventsAsync is called when the Event Hubs receiver times out.
/// Set to true to get calls with empty event list.
/// Set to false to not get calls.
/// Defaults to false.
/// </summary>
public bool InvokeProcessorAfterReceiveTimeout { get; set; }
/// <summary>
/// If there is no checkpoint, the user can provide a position for the Event Hubs receiver to start at.
/// Defaults to first event available in the stream.
/// </summary>
public Func<string, EventPosition> InitialPositionProvider { get; set; }
/// <summary>
/// ReceiverOptions used by the underlying Event Hubs client.
/// Defaults to null.
/// </summary>
public ReceiverOptions ClientReceiverOptions { get; set; }
/// <summary>
/// TODO -- is this needed? It's called just before SFP.RunAsync throws out/returns to user code anyway.
/// But user code won't see that until it awaits the Task, so maybe this is useful?
/// </summary>
public ShutdownNotification OnShutdown { get; set; }
internal void NotifyOnShutdown(Exception shutdownException)
{
if (this.OnShutdown != null)
{
this.OnShutdown(shutdownException);
}
}
}
}

Просмотреть файл

@ -0,0 +1,53 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Interface for a checkpoint manager which persists Checkpoints.
/// </summary>
public interface ICheckpointMananger
{
/// <summary>
/// Does the checkpoint store exist?
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns>True if it exists, false if not.</returns>
Task<bool> CheckpointStoreExistsAsync(CancellationToken cancellationToken);
/// <summary>
/// Create the checkpoint store if it doesn't exist.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns>True if it exists or was created OK, false if not.</returns>
Task<bool> CreateCheckpointStoreIfNotExistsAsync(CancellationToken cancellationToken);
/// <summary>
/// Create an uninitialized checkpoint for the given partition.
/// </summary>
/// <param name="partitionId"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId, CancellationToken cancellationToken);
/// <summary>
/// Get the checkpoint for the given partition. Returns null if there is no checkpoint or if it is uninitialized.
/// </summary>
/// <param name="partitionId"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<Checkpoint> GetCheckpointAsync(string partitionId, CancellationToken cancellationToken);
/// <summary>
/// Persist the Checkpoint for the given partition.
/// </summary>
/// <param name="partitionId"></param>
/// <param name="checkpoint"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task UpdateCheckpointAsync(string partitionId, Checkpoint checkpoint, CancellationToken cancellationToken);
}
}

Просмотреть файл

@ -0,0 +1,65 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Interface for processing events.
/// </summary>
public abstract class IEventProcessor
{
/// <summary>
/// Called on startup.
/// </summary>
/// <param name="cancellationToken"></param>
/// <param name="context"></param>
/// <returns></returns>
abstract public Task OpenAsync(CancellationToken cancellationToken, PartitionContext context);
/// <summary>
/// Called on shutdown.
/// </summary>
/// <param name="context"></param>
/// <param name="reason"></param>
/// <returns></returns>
abstract public Task CloseAsync(PartitionContext context, CloseReason reason);
/// <summary>
/// Called when events are available.
/// </summary>
/// <param name="cancellationToken"></param>
/// <param name="context"></param>
/// <param name="events"></param>
/// <returns></returns>
abstract public Task ProcessEventsAsync(CancellationToken cancellationToken, PartitionContext context, IEnumerable<EventData> events);
/// <summary>
/// Called when an error occurs.
/// </summary>
/// <param name="context"></param>
/// <param name="error"></param>
/// <returns></returns>
abstract public Task ProcessErrorAsync(PartitionContext context, Exception error);
/// <summary>
/// Called periodically to get user-supplied load metrics.
/// </summary>
/// <param name="cancellationToken"></param>
/// <param name="context"></param>
/// <returns></returns>
virtual public Dictionary<string, int> GetLoadMetric(CancellationToken cancellationToken, PartitionContext context)
{
// By default all partitions have a metric of named CountOfPartitions with value 1. If Service Fabric is configured to use this metric,
// it will balance primaries across nodes simply by the number of primaries on a node. This can be overridden to return
// more sophisticated metrics like number of events processed or CPU usage.
Dictionary<string, int> defaultMetric = new Dictionary<string, int>();
defaultMetric.Add(Constants.DefaultUserLoadMetricName, 1);
return defaultMetric;
}
}
}

Просмотреть файл

@ -0,0 +1,47 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>This is the next generation Azure Event Hubs .NET Standard Service Fabric Processor library, which integrates Event Hub event consumption with Service Fabric. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/</Description>
<AssemblyTitle>Microsoft.Azure.EventHubs.ServiceFabricProcessor</AssemblyTitle>
<VersionPrefix>0.5.2-PREVIEW</VersionPrefix>
<Authors>Microsoft</Authors>
<TargetFramework>netstandard2.0</TargetFramework>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AssemblyName>Microsoft.Azure.EventHubs.ServiceFabricProcessor</AssemblyName>
<AssemblyOriginatorKeyFile>../../build/keyfile.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
<PackageId>Microsoft.Azure.EventHubs.ServiceFabricProcessor</PackageId>
<PackageTags>Azure;Event Hubs;EventHubs;.NET;AMQP;IoT</PackageTags>
<PackageReleaseNotes>https://github.com/Azure/azure-event-hubs-dotnet/releases</PackageReleaseNotes>
<PackageIconUrl>https://raw.githubusercontent.com/Azure/azure-event-hubs-dotnet/master/event-hubs.png</PackageIconUrl>
<PackageProjectUrl>https://github.com/Azure/azure-event-hubs-dotnet</PackageProjectUrl>
<PackageLicenseUrl>https://raw.githubusercontent.com/Azure/azure-event-hubs-dotnet/master/LICENSE</PackageLicenseUrl>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<DebugType>full</DebugType>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\Microsoft.Azure.EventHubs.Processor.xml</DocumentationFile>
<Version>0.5.2</Version>
<AssemblyVersion>0.5.2.0</AssemblyVersion>
<FileVersion>0.5.2.0</FileVersion>
<DelaySign>true</DelaySign>
<Copyright>© Microsoft Corporation. All rights reserved.</Copyright>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<PlatformTarget>x64</PlatformTarget>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.ServiceFabric" Version="6.4.624" />
<PackageReference Include="Microsoft.ServiceFabric.Data" Version="3.3.624" />
<PackageReference Include="Microsoft.ServiceFabric.Services" Version="3.3.624" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Microsoft.Azure.EventHubs\Microsoft.Azure.EventHubs.csproj" />
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,102 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Passed to an event processor instance to describe the environment.
/// </summary>
public class PartitionContext
{
readonly private ICheckpointMananger checkpointMananger;
/// <summary>
/// Construct an instance.
/// </summary>
/// <param name="cancellationToken">CancellationToken that the event processor should respect. Same as token passed to IEventProcessor methods.</param>
/// <param name="partitionId">Id of the partition for which the event processor is handling events.</param>
/// <param name="eventHubPath">Name of the event hub which is the source of events.</param>
/// <param name="consumerGroupName">Name of the consumer group on the event hub.</param>
/// <param name="checkpointMananger">The checkpoint manager instance to use.</param>
public PartitionContext(CancellationToken cancellationToken, string partitionId, string eventHubPath, string consumerGroupName, ICheckpointMananger checkpointMananger)
{
this.CancellationToken = cancellationToken;
this.PartitionId = partitionId;
this.EventHubPath = eventHubPath;
this.ConsumerGroupName = consumerGroupName;
// TODO: Requires client change to support
this.RuntimeInformation = null; // new ReceiverRuntimeInformation(this.PartitionId);
this.checkpointMananger = checkpointMananger;
}
/// <summary>
/// The event processor implementation should respect this CancellationToken. It is the same as the token passed
/// in to IEventProcessor methods. It is here primarily for compatibility with Event Processor Host.
/// </summary>
public CancellationToken CancellationToken { get; private set; }
/// <summary>
/// Name of the consumer group on the event hub.
/// </summary>
public string ConsumerGroupName { get; private set; }
/// <summary>
/// Name of the event hub.
/// </summary>
public string EventHubPath { get; private set; }
/// <summary>
/// Id of the partition.
/// </summary>
public string PartitionId { get; private set; }
/// <summary>
/// Gets the approximate receiver runtime information for a logical partition of an Event Hub.
/// To enable the setting, refer to <see cref="EventProcessorOptions.EnableReceiverRuntimeMetric"/>
/// </summary>
public ReceiverRuntimeInformation RuntimeInformation
{
get;
// internal set;
}
internal string Offset { get; set; }
internal long SequenceNumber { get; set; }
internal void SetOffsetAndSequenceNumber(EventData eventData)
{
this.Offset = eventData.SystemProperties.Offset;
this.SequenceNumber = eventData.SystemProperties.SequenceNumber;
}
/// <summary>
/// Mark the last event of the current batch and all previous events as processed.
/// </summary>
/// <returns></returns>
public async Task CheckpointAsync()
{
await CheckpointAsync(new Checkpoint(this.Offset, this.SequenceNumber));
}
/// <summary>
/// Mark the given event and all previous events as processed.
/// </summary>
/// <param name="eventData">Highest-processed event.</param>
/// <returns></returns>
public async Task CheckpointAsync(EventData eventData)
{
await CheckpointAsync(new Checkpoint(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber));
}
private async Task CheckpointAsync(Checkpoint checkpoint)
{
await this.checkpointMananger.UpdateCheckpointAsync(this.PartitionId, checkpoint, this.CancellationToken);
}
}
}

Просмотреть файл

@ -0,0 +1,271 @@
# Programmer's Guide to Service Fabric Processor
## Introduction
Service Fabric Processor (SFP) allows a user to easily create a Service
Fabric-based stateful service that processes events from an Event Hub. The
service is required to have exactly as many partitions as the target Event Hub
does, because each Service Fabric partition is permanently associated with a
corresponding Event Hub partition. The user's processing logic for events is
contained in their implementation of the IEventProcessor interface. Each
partition of the service has an instance of that implementation, which is called
by SFP to handle events consumed from the corresponding Event Hub partition.
Only the primary Service Fabric replica for each partition consumes and processes
events. The secondary replicas exist to maintain the reliable dictionary that
Service Fabric Processor uses for checkpointing.
## IEventProcessor
There are four methods that the user is required to implement, and one more
which the user may override if desired.
Most of the methods provide a
CancellationToken as an argument, and it is important for long-running
operations in the user's code to honor that token. Service Fabric cancels it
when a primary replica's state is changing, and that is the code's chance to
clean up and shut down gracefully instead of being forcefully terminated!
All methods provide a PartitionContext as an argument, which contains
various information about the Event Hub and the partition that may be useful
to the user's code. The PartitionContext also has a reference to the
CancellationToken.
### OpenAsync
```csharp
Task OpenAsync(CancellationToken cancellationToken, PartitionContext context)
```
SFP calls this method when a partition replica becomes primary. This is the
opportunity to initialize resources that the event processing logic will
use, such as a database connection, or perform any other startup needed.
### ProcessEventsAsync
```csharp
Task ProcessEventsAsync(CancellationToken cancellationToken, PartitionContext context, IEnumerable<EventData> events)
```
After the Task returned by OpenAsync completes, SFP calls this method repeatedly
as events become available. SFP makes only one call to ProcessEventsAsync at a
time: it waits for the Task returned by the current call to complete before
attempting to consume more events from the Event Hub. By default, this method
is called only when events are available, so if traffic on the associated
Event Hub partition is sparse, it may be some time between calls to this method.
### CloseAsync
```csharp
Task CloseAsync(PartitionContext context, CloseReason reason)
```
SFP calls this method when a primary replica is being shut down, whether due to
an Event Hub failure or because Service Fabric is changing the replica's state.
No cancellation token is provided because the user's code is expected to already
be shutting down as quickly as possible, and in many cases the token will already
be cancelled. The CloseReason indicates whether the shutdown is due to an Event
Hub failure or Service Fabric cancellation.
This method will will not be called until the Task returned by the most recent
ProcessEventsAsync call has completed.
### ProcessErrorAsync
```csharp
Task ProcessErrorAsync(PartitionContext context, Exception error)
```
SFP calls this method when an Event Hubs error has occurred. It is purely
informational. Recovering from the error, if possible, is up to SFP.
### GetLoadMetric
```csharp
Dictionary<string, int> GetLoadMetric(CancellationToken cancellationToken, PartitionContext context)
```
Service Fabric offers sophisticated load balancing of
partition replicas between nodes based on user-provided metrics, and this
method allows SFP users to take advantage of that feature. SFP polls this
method periodically and passes the metrics returned to Service Fabric. The
metrics are represented as a dictionary of string-int pairs, where the string
is the metric name and the int is the metric value. SFP provides a default
implementation of this method, which returns a metric named "CountOfPartitions"
that has a constant value of 1.
It is up to the user to configure Service Fabric to use metrics returned by
this method. Service Fabric ignores any metrics not mentioned in the
configuration, so it is safe to return any metrics that might be interesting
and then decide later which particular ones to use.
## Integrating With Service Fabric
SFP requires a stateful Service Fabric service that is configured to have the
same number of partitions as the Event Hub.
The user's service will have a class derived from the Service Fabric class
StatefulService, which in turn has a RunAsync method that is called on primary
partition replicas. SFP setup and activation occur within that RunAsync method.
### SFP Options
SFP provides a class EventProcessorOptions, which allows setting a wide variety
of options that adjust how SFP operates. To use it, create a new instance, which
is initialized with the default settings, then change the options of interest,
and finally pass the instance to the ServiceFabricProcessor constructor.
```csharp
EventProcessorOptions options = new EventProcessorOptions();
options.MaxBatchSize = 50;
ServiceFabricProcessor processor = new ServiceFabricProcessor(..., options);
```
Available options are:
* int MaxBatchSize: the _maximum_ number of events that will be passed to
ProcessEventsAsync at one time. Default is 10. The actual number of events for
each call is variable and depends on how many events are available in the Event
Hub partition, how fast they can be transferred, how long the previous call to
ProcessEventsAsync took, and other factors. If your system as a whole is
processing events faster than they are generated, then much of the time
ProcessEventsAsync will be called with only one event. This option only
governs the _maximum_ number.
* int PrefetchCount: passed to the underlying Event Hubs client, this option
governs how many events can be prefetched. Default is 300. This generally
comes into play only when there is a backlog of events due to slow processing.
If your system as a whole is processing events faster than they are generated,
then the prefetch buffer will be empty, because every event that becomes
available can be immediately passed to ProcessEventsAsync.
* TimeSpan ReceiveTimeout: passed to the underlying Event Hubs client, this
option governs the timeout duration for the Event Hubs receiver. Default is
60 seconds.
* bool EnableReceiverRuntimeMetric: TODO -- needs client changes before it
can be supported
* bool InvokeProcessorAfterReceiveTimeout: if false, ProcessEventsAsync is
called only when at least one event is available. If true, ProcessEventsAsync
is called with an empty event list when a receive timeout occurs. Default is
false.
* ShutdownNotification OnShutdown: set a delegate which is called just before
the Task returned by ServiceFabricProcessor.RunAsync completes. Depending on
how the code in the service's RunAsync is structured, SFP might shut down due
to an error long before RunAsync awaits the returned Task. This delegate
provides notification of such a shutdown.
* Func<string, EventPosition> InitialPositionProvider: see "Starting Position
and Checkpointing" below
### Instantiating ServiceFabricProcessor
```csharp
IEventProcessor myEventProcessor = new MyEventProcessorClass(...);
ServiceFabricProcessor processor = new ServiceFabricProcessor(
this.Context.ServiceName,
this.Context.PartitionId,
this.StateManager,
this.Partition,
myEventProcessor,
eventHubConnectionString,
eventHubConsumerGroup,
options
);
```
* The first four arguments are Service Fabric artifacts that SFP needs to get
information about Service Fabric partitions and to access Service Fabric reliable
dictionaries. They are all available as members of the StatefulService-derived
class.
* The next argument is an instance of the user's implementation of IEventProcessor.
* The next two arguments are the connection string of the Event Hub to consume
from, and the consumer group. The consumer group is optional: if null or omitted,
the default consumer group "$Default" is used.
* The last argument is an optional instance of EventProcessorOptions. If null
or omitted, all options have their default value.
### Start processing events
```csharp
Task processing = processor.RunAsync(cancellationToken);
// do other stuff here if desired
await processing;
```
Note that ServiceFabricProcessor.RunAsync can be called only once. If the await
throws, you can either allow the exception to propagate out to Service Fabric and
let Service Fabric restart the replica, or you can create a new instance of
ServiceFabricProcessor and call RunAsync on the new instance.
## Starting Position and Checkpointing
When ServiceFabricProcessor.RunAsync is called, one of the first things it does
is to create a receiver on the Event Hub partition so it can consume events.
Because Event Hubs do not have a service-side cursor, a receiver must specify
a starting position when it is created.
One of the features of SFP is checkpointing, which provides a client-side
cursor by persisting the offset of the last event processed successfully.
Checkpointing does not happen automatically, because there are scenarios
which do not need it. To use checkpointing, the user's implementation of
IEventProcessor.ProcessEventsAsync calls the CheckpointAsync methods on
the supplied PartitionContext.
### Finding the Starting Position
SFP follows these steps to determine the starting position when creating a
receiver:
* If there is a checkpoint for the partition, start at the next event after
that position.
* Else, call EventProcessorOptions.InitialPositionProvider, if present: when
setting up options, the user can provide a function which takes an Event Hub
partition id and returns an EventPosition.
* Else, start at the oldest available event.
### Checkpointing
PartitionContext provides two methods for setting a checkpoint:
* With no arguments, the checkpoint contains the position of the last event
in the current batch. If once-per-batch is a reasonable checkpoint strategy
for your application, calling PartitionContext.CheckpointAsync just before
returning from IEventProcessor.ProcessEventsAsync is simple and convenient.
* The other overload of CheckpointAsync takes an EventData as an argument
and sets a checkpoint with the position of the given event. This way, your
application can checkpoint at any interval.
It is important to await CheckpointAsync, to be sure that the checkpoint
was actually persisted.
### Special Considerations
A checkpoint is a representation of a position in the event stream of a
particular Event Hub+consumer group+partition combination. All events up to and
including the checkpointed position are assumed to be processed, and any events
after the position are assumed to be unprocessed. The intention is that a
newly-created receiver will pick up where the previous receiver left off, for
example when Service Fabric moves the primary replica of a partition from one
node to another for load balancing.
For performance reasons, it is not always desirable to checkpoint after processing
each event. Checkpointing at larger intervals (for example, every ten events, or
every ten seconds, etc.) can improve performance, but also means that if the
receiver is recreated, the new receiver may consume events that have already
been processed. It is up to the application owner to evaluate the tradeoff between
performance and reprocessing events.
Even if the application checkpoints after every event, that still
does not completely prevent the possibility of reprocessing an already-consumed
event, because there is always a time window between when an event is processed
and when the checkpoint is fully persisted, during which a node could fail. As a
best practice, an application must be able to cope with event reprocessing in some
way that is reasonable for the impact.

Просмотреть файл

@ -0,0 +1,159 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using Microsoft.ServiceFabric.Data;
using Microsoft.ServiceFabric.Data.Collections;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
class ReliableDictionaryCheckpointMananger : ICheckpointMananger
{
private IReliableStateManager reliableStateManager = null;
private IReliableDictionary<string, Dictionary<string, object>> store = null;
internal ReliableDictionaryCheckpointMananger(IReliableStateManager rsm)
{
this.reliableStateManager = rsm;
}
public async Task<bool> CheckpointStoreExistsAsync(CancellationToken cancellationToken)
{
ConditionalValue<IReliableDictionary<string, Checkpoint>> tryStore = await
this.reliableStateManager.TryGetAsync<IReliableDictionary<string, Checkpoint>>(Constants.CheckpointDictionaryName);
EventProcessorEventSource.Current.Message($"CheckpointStoreExistsAsync = {tryStore.HasValue}");
return tryStore.HasValue;
}
public async Task<bool> CreateCheckpointStoreIfNotExistsAsync(CancellationToken cancellationToken)
{
// Create or get access to the dictionary.
this.store = await reliableStateManager.GetOrAddAsync<IReliableDictionary<string, Dictionary<string, object>>>(Constants.CheckpointDictionaryName);
EventProcessorEventSource.Current.Message("CreateCheckpointStoreIfNotExistsAsync OK");
return true;
}
public async Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId, CancellationToken cancellationToken)
{
Checkpoint existingCheckpoint = await GetWithRetry(partitionId, cancellationToken);
if (existingCheckpoint == null)
{
existingCheckpoint = new Checkpoint(1);
await PutWithRetry(partitionId, existingCheckpoint, cancellationToken);
}
EventProcessorEventSource.Current.Message("CreateCheckpointIfNotExists OK");
return existingCheckpoint;
}
public async Task<Checkpoint> GetCheckpointAsync(string partitionId, CancellationToken cancellationToken)
{
return await GetWithRetry(partitionId, cancellationToken);
}
public async Task UpdateCheckpointAsync(string partitionId, Checkpoint checkpoint, CancellationToken cancellationToken)
{
await PutWithRetry(partitionId, checkpoint, cancellationToken);
}
// Throws on error or if cancelled.
// Returns null if there is no entry for the given partition.
private async Task<Checkpoint> GetWithRetry(string partitionId, CancellationToken cancellationToken)
{
EventProcessorEventSource.Current.Message($"Getting checkpoint for {partitionId}");
Checkpoint result = null;
Exception lastException = null;
for (int i = 0; i < Constants.RetryCount; i++)
{
cancellationToken.ThrowIfCancellationRequested();
lastException = null;
try
{
using (ITransaction tx = this.reliableStateManager.CreateTransaction())
{
ConditionalValue<Dictionary<string, object>> rawCheckpoint = await
this.store.TryGetValueAsync(tx, partitionId, Constants.ReliableDictionaryTimeout, cancellationToken);
await tx.CommitAsync();
// Success! Save the result, if any, and break out of the retry loop.
if (rawCheckpoint.HasValue)
{
result = Checkpoint.CreateFromDictionary(rawCheckpoint.Value);
}
else
{
result = null;
}
break;
}
}
catch (TimeoutException e)
{
lastException = e;
}
}
if (lastException != null)
{
// Ran out of retries, throw.
throw new Exception("Ran out of retries creating checkpoint", lastException);
}
if (result != null)
{
EventProcessorEventSource.Current.Message($"Got checkpoint for {partitionId}: {result.Offset}//{result.SequenceNumber}");
}
else
{
EventProcessorEventSource.Current.Message($"No checkpoint found for {partitionId}: returning null");
}
return result;
}
private async Task PutWithRetry(string partitionId, Checkpoint checkpoint, CancellationToken cancellationToken)
{
EventProcessorEventSource.Current.Message($"Setting checkpoint for {partitionId}: {checkpoint.Offset}//{checkpoint.SequenceNumber}");
Exception lastException = null;
for (int i = 0; i < Constants.RetryCount; i++)
{
cancellationToken.ThrowIfCancellationRequested();
lastException = null;
Dictionary<string, object> putThis = checkpoint.ToDictionary();
try
{
using (ITransaction tx = this.reliableStateManager.CreateTransaction())
{
await this.store.SetAsync(tx, partitionId, putThis, Constants.ReliableDictionaryTimeout, cancellationToken);
await tx.CommitAsync();
// Success! Break out of the retry loop.
break;
}
}
catch (TimeoutException e)
{
lastException = e;
}
}
if (lastException != null)
{
// Ran out of retries, throw.
throw new Exception("Ran out of retries creating checkpoint", lastException);
}
EventProcessorEventSource.Current.Message($"Set checkpoint for {partitionId} OK");
}
}
}

Просмотреть файл

@ -0,0 +1,464 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
{
using System;
using System.Collections.Generic;
using System.Fabric;
using System.Fabric.Description;
using System.Fabric.Query;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Data;
/// <summary>
/// Base class that implements event processor functionality.
/// </summary>
public class ServiceFabricProcessor : IPartitionReceiveHandler
{
// Service Fabric objects initialized in constructor
private readonly IReliableStateManager serviceStateManager;
private readonly Uri serviceFabricServiceName;
private readonly Guid serviceFabricPartitionId;
private readonly IStatefulServicePartition servicePartition;
// ServiceFabricProcessor settings initialized in constructor
private readonly IEventProcessor userEventProcessor;
private readonly EventProcessorOptions options;
private readonly ICheckpointMananger checkpointManager;
// Initialized during RunAsync startup
private int fabricPartitionOrdinal = -1;
private int servicePartitionCount = -1;
private string hubPartitionId;
private PartitionContext partitionContext;
private string initialOffset;
private CancellationTokenSource internalCanceller;
private Exception internalFatalException;
private CancellationToken linkedCancellationToken;
private EventHubsConnectionStringBuilder ehConnectionString;
private string consumerGroupName;
// Value managed by RunAsync
private int running = 0;
/// <summary>
/// Constructor. Arguments break down into three groups: (1) Service Fabric objects so this library can access
/// Service Fabric facilities, (2) Event Hub-related arguments which indicate what event hub to receive from and
/// how to process the events, and (3) advanced, which right now consists only of the ability to replace the default
/// reliable dictionary-based checkpoint manager with a user-provided implementation.
/// </summary>
/// <param name="serviceFabricServiceName">Service Fabric Uri found in StatefulServiceContext</param>
/// <param name="serviceFabricPartitionId">Service Fabric partition id found in StatefulServiceContext</param>
/// <param name="stateManager">Service Fabric-provided state manager, provides access to reliable dictionaries</param>
/// <param name="partition">Service Fabric-provided partition information</param>
/// <param name="userEventProcessor">User's event processor implementation</param>
/// <param name="eventHubConnectionString">Connection string for user's event hub</param>
/// <param name="eventHubConsumerGroup">Name of event hub consumer group to receive from</param>
/// <param name="options">Optional: Options structure for ServiceFabricProcessor library</param>
/// <param name="checkpointManager">Very advanced/optional: user-provided checkpoint manager implementation</param>
public ServiceFabricProcessor(Uri serviceFabricServiceName, Guid serviceFabricPartitionId, IReliableStateManager stateManager, IStatefulServicePartition partition, IEventProcessor userEventProcessor,
string eventHubConnectionString, string eventHubConsumerGroup,
EventProcessorOptions options = null, ICheckpointMananger checkpointManager = null)
{
if (serviceFabricServiceName == null)
{
throw new ArgumentNullException("serviceFabricServiceName is null");
}
if (serviceFabricPartitionId == null)
{
throw new ArgumentNullException("serviceFabricPartitionId is null");
}
if (stateManager == null)
{
throw new ArgumentNullException("stateManager is null");
}
if (partition == null)
{
throw new ArgumentNullException("partition is null");
}
if (userEventProcessor == null)
{
throw new ArgumentNullException("userEventProcessor is null");
}
if (string.IsNullOrEmpty(eventHubConnectionString))
{
throw new ArgumentException("eventHubConnectionString is null or empty");
}
if (string.IsNullOrEmpty(eventHubConsumerGroup))
{
throw new ArgumentException("eventHubConsumerGroup is null or empty");
}
this.serviceFabricServiceName = serviceFabricServiceName;
this.serviceFabricPartitionId = serviceFabricPartitionId;
this.serviceStateManager = stateManager;
this.servicePartition = partition;
this.userEventProcessor = userEventProcessor;
this.ehConnectionString = new EventHubsConnectionStringBuilder(eventHubConnectionString);
this.consumerGroupName = eventHubConsumerGroup;
this.options = options ?? new EventProcessorOptions();
this.checkpointManager = checkpointManager ?? new ReliableDictionaryCheckpointMananger(this.serviceStateManager);
this.EventHubClientFactory = new EventHubWrappers.EventHubClientFactory();
this.TestMode = false;
}
/// <summary>
/// For testing purposes. Do not change after calling RunAsync.
/// </summary>
public EventHubWrappers.IEventHubClientFactory EventHubClientFactory { get; set; }
/// <summary>
/// For testing purposes. Do not change after calling RunAsync.
/// </summary>
public bool TestMode { get; set; }
/// <summary>
/// Starts processing of events.
/// </summary>
/// <param name="fabricCancellationToken">Cancellation token provided by Service Fabric, assumed to indicate instance shutdown when cancelled.</param>
/// <returns>Task that completes when event processing shuts down.</returns>
public async Task RunAsync(CancellationToken fabricCancellationToken)
{
if (Interlocked.Exchange(ref this.running, 1) == 1)
{
EventProcessorEventSource.Current.Message("Already running");
throw new InvalidOperationException("EventProcessorService.RunAsync has already been called.");
}
this.internalCanceller = new CancellationTokenSource();
this.internalFatalException = null;
try
{
using (CancellationTokenSource linkedCanceller = CancellationTokenSource.CreateLinkedTokenSource(fabricCancellationToken, this.internalCanceller.Token))
{
this.linkedCancellationToken = linkedCanceller.Token;
await InnerRunAsync();
this.options.NotifyOnShutdown(null);
}
}
catch (Exception e)
{
// If InnerRunAsync throws, that is intended to be a fatal exception for this instance.
// Catch it here just long enough to log and notify, then rethrow.
EventProcessorEventSource.Current.Message("THROWING OUT: {0}", e);
if (e.InnerException != null)
{
EventProcessorEventSource.Current.Message("THROWING OUT INNER: {0}", e.InnerException);
}
this.options.NotifyOnShutdown(e);
throw e;
}
}
private async Task InnerRunAsync()
{
EventHubWrappers.IEventHubClient ehclient = null;
EventHubWrappers.IPartitionReceiver receiver = null;
try
{
//
// Get Service Fabric partition information.
//
await GetServicePartitionId(this.linkedCancellationToken);
//
// Create EventHubClient and check partition count.
//
Exception lastException = null;
EventProcessorEventSource.Current.Message("Creating event hub client");
lastException = RetryWrapper(() => { ehclient = this.EventHubClientFactory.CreateFromConnectionString(this.ehConnectionString.ToString()); });
if (ehclient == null)
{
EventProcessorEventSource.Current.Message("Out of retries event hub client");
throw new Exception("Out of retries creating EventHubClient", lastException);
}
EventProcessorEventSource.Current.Message("Event hub client OK");
EventProcessorEventSource.Current.Message("Getting event hub info");
EventHubRuntimeInformation ehInfo = null;
// Lambda MUST be synchronous to work with RetryWrapper!
lastException = RetryWrapper(() => { ehInfo = ehclient.GetRuntimeInformationAsync().Result; });
if (ehInfo == null)
{
EventProcessorEventSource.Current.Message("Out of retries getting event hub info");
throw new Exception("Out of retries getting event hub runtime info", lastException);
}
if (this.TestMode)
{
if (this.servicePartitionCount > ehInfo.PartitionCount)
{
EventProcessorEventSource.Current.Message("TestMode requires event hub partition count larger than service partitinon count");
throw new EventProcessorConfigurationException("TestMode requires event hub partition count larger than service partitinon count");
}
else if (this.servicePartitionCount < ehInfo.PartitionCount)
{
EventProcessorEventSource.Current.Message("TestMode: receiving from subset of event hub");
}
}
else if (ehInfo.PartitionCount != this.servicePartitionCount)
{
EventProcessorEventSource.Current.Message($"Service partition count {this.servicePartitionCount} does not match event hub partition count {ehInfo.PartitionCount}");
throw new EventProcessorConfigurationException($"Service partition count {this.servicePartitionCount} does not match event hub partition count {ehInfo.PartitionCount}");
}
this.hubPartitionId = ehInfo.PartitionIds[this.fabricPartitionOrdinal];
//
// Generate a PartitionContext now that the required info is available.
//
this.partitionContext = new PartitionContext(this.linkedCancellationToken, this.hubPartitionId, this.ehConnectionString.EntityPath, this.consumerGroupName, this.checkpointManager);
//
// Start up checkpoint manager.
//
await CheckpointStartup(this.linkedCancellationToken);
//
// If there was a checkpoint, the offset is in this.initialOffset, so convert it to an EventPosition.
// If no checkpoint, get starting point from user-supplied provider.
//
EventPosition initialPosition = null;
if (this.initialOffset != null)
{
EventProcessorEventSource.Current.Message($"Initial position from checkpoint, offset {this.initialOffset}");
initialPosition = EventPosition.FromOffset(this.initialOffset);
}
else
{
initialPosition = this.options.InitialPositionProvider(this.hubPartitionId);
EventProcessorEventSource.Current.Message("Initial position from provider");
}
//
// Create receiver.
//
EventProcessorEventSource.Current.Message("Creating receiver");
lastException = RetryWrapper(() => { receiver = ehclient.CreateEpochReceiver(this.consumerGroupName, this.hubPartitionId, initialPosition, this.initialOffset,
Constants.FixedReceiverEpoch, this.options.ClientReceiverOptions); });
if (receiver == null)
{
EventProcessorEventSource.Current.Message("Out of retries creating receiver");
throw new Exception("Out of retries creating event hub receiver", lastException);
}
//
// Call Open on user's event processor instance.
// If user's Open code fails, treat that as a fatal exception and let it throw out.
//
EventProcessorEventSource.Current.Message("Creating event processor");
await this.userEventProcessor.OpenAsync(this.linkedCancellationToken, this.partitionContext);
EventProcessorEventSource.Current.Message("Event processor created and opened OK");
//
// Start metrics reporting. This runs as a separate background thread.
//
Thread t = new Thread(this.MetricsHandler);
t.Start();
//
// Receive pump.
//
EventProcessorEventSource.Current.Message("RunAsync setting handler and waiting");
this.MaxBatchSize = this.options.MaxBatchSize;
receiver.SetReceiveHandler(this, this.options.InvokeProcessorAfterReceiveTimeout);
this.linkedCancellationToken.WaitHandle.WaitOne();
EventProcessorEventSource.Current.Message("RunAsync continuing, cleanup");
}
finally
{
if (this.partitionContext != null)
{
await this.userEventProcessor.CloseAsync(this.partitionContext, this.linkedCancellationToken.IsCancellationRequested ? CloseReason.Cancelled : CloseReason.Failure);
}
if (receiver != null)
{
receiver.SetReceiveHandler(null);
await receiver.CloseAsync();
}
if (ehclient != null)
{
await ehclient.CloseAsync();
}
if (this.internalFatalException != null)
{
throw this.internalFatalException;
}
}
}
private EventHubsException RetryWrapper(Action action)
{
EventHubsException lastException = null;
for (int i = 0; i < Constants.RetryCount; i++)
{
this.linkedCancellationToken.ThrowIfCancellationRequested();
try
{
action.Invoke();
break;
}
catch (EventHubsException e)
{
if (!e.IsTransient)
{
throw e;
}
lastException = e;
}
}
return lastException;
}
/// <summary>
/// From IPartitionReceiveHandler
/// </summary>
public int MaxBatchSize { get; set; }
async Task IPartitionReceiveHandler.ProcessEventsAsync(IEnumerable<EventData> events)
{
IEnumerable<EventData> effectiveEvents = events ?? new List<EventData>(); // convert to empty list if events is null
if (events != null)
{
// Save position of last event if we got a real list of events
IEnumerator<EventData> scanner = effectiveEvents.GetEnumerator();
EventData last = null;
while (scanner.MoveNext())
{
last = scanner.Current;
}
if (last != null)
{
this.partitionContext.SetOffsetAndSequenceNumber(last);
if (this.options.EnableReceiverRuntimeMetric)
{
// TODO: requires client change to support
// this.partitionContext.RuntimeInformation.Update(last);
}
}
}
await this.userEventProcessor.ProcessEventsAsync(this.linkedCancellationToken, this.partitionContext, effectiveEvents);
foreach (EventData ev in effectiveEvents)
{
ev.Dispose();
}
}
Task IPartitionReceiveHandler.ProcessErrorAsync(Exception error)
{
EventProcessorEventSource.Current.Message($"RECEIVE EXCEPTION on {this.hubPartitionId}: {error}");
this.userEventProcessor.ProcessErrorAsync(this.partitionContext, error);
if (error is EventHubsException)
{
if (!(error as EventHubsException).IsTransient)
{
this.internalFatalException = error;
this.internalCanceller.Cancel();
}
// else don't cancel on transient errors
}
else
{
// All other exceptions are assumed fatal.
this.internalFatalException = error;
this.internalCanceller.Cancel();
}
return Task.CompletedTask;
}
private async Task CheckpointStartup(CancellationToken cancellationToken)
{
// Set up store and get checkpoint, if any.
await this.checkpointManager.CreateCheckpointStoreIfNotExistsAsync(cancellationToken);
Checkpoint checkpoint = await this.checkpointManager.CreateCheckpointIfNotExistsAsync(this.hubPartitionId, cancellationToken);
if (!checkpoint.Valid)
{
// Not actually any existing checkpoint.
this.initialOffset = null;
EventProcessorEventSource.Current.Message("No checkpoint");
}
else if (checkpoint.Version == 1)
{
this.initialOffset = checkpoint.Offset;
EventProcessorEventSource.Current.Message($"Checkpoint provides initial offset {this.initialOffset}");
}
else
{
// It's actually a later-version checkpoint but we don't know the details.
// Access it via the V1 interface and hope it does something sensible.
this.initialOffset = checkpoint.Offset;
EventProcessorEventSource.Current.Message($"Unexpected checkpoint version {checkpoint.Version}, provided initial offset {this.initialOffset}");
}
}
private async Task GetServicePartitionId(CancellationToken cancellationToken)
{
if (this.fabricPartitionOrdinal == -1)
{
using (FabricClient fabricClient = new FabricClient())
{
ServicePartitionList partitionList =
await fabricClient.QueryManager.GetPartitionListAsync(this.serviceFabricServiceName);
// Set the number of partitions
this.servicePartitionCount = partitionList.Count;
// Which partition is this one?
for (int a = 0; a < partitionList.Count; a++)
{
if (partitionList[a].PartitionInformation.Id == this.serviceFabricPartitionId)
{
this.fabricPartitionOrdinal = a;
break;
}
}
EventProcessorEventSource.Current.Message($"Total partitions {this.servicePartitionCount}");
}
}
}
private void MetricsHandler()
{
EventProcessorEventSource.Current.Message("METRIC reporter starting");
while (!this.linkedCancellationToken.IsCancellationRequested)
{
Dictionary<string, int> userMetrics = this.userEventProcessor.GetLoadMetric(this.linkedCancellationToken, this.partitionContext);
try
{
List<LoadMetric> reportableMetrics = new List<LoadMetric>();
foreach (KeyValuePair<string, int> metric in userMetrics)
{
EventProcessorEventSource.Current.Message($"METRIC {metric.Key} for partition {this.partitionContext.PartitionId} is {metric.Value}");
reportableMetrics.Add(new LoadMetric(metric.Key, metric.Value));
}
this.servicePartition.ReportLoad(reportableMetrics);
Task.Delay(Constants.MetricReportingInterval, this.linkedCancellationToken).Wait(); // throws on cancel
}
catch (Exception e)
{
EventProcessorEventSource.Current.Message($"METRIC partition {this.partitionContext.PartitionId} exception {e}");
}
}
EventProcessorEventSource.Current.Message("METRIC reporter exiting");
}
}
}

Просмотреть файл

@ -33,4 +33,4 @@ using System.Runtime.InteropServices;
"dd8a96737e5385b31414369dc3e42f371172127252856a0650793e1f5673a16d5d78e2ac852a10" +
"4bc51e6f018dca44fdd26a219c27cb2b263956a80620223c8e9c2f8913c3c903e1e453e9e4e840" +
"98afdad5f4badb8c1ebe0a7b0a4b57a08454646a65886afe3e290a791ff3260099ce0edf0bdbcc" +
"afadfeb6")]
"afadfeb6")]