From 321f18844876acc0c66cf6a98d3d35fb9a203efc Mon Sep 17 00:00:00 2001 From: JamesBirdsall Date: Thu, 14 Feb 2019 10:39:17 -0800 Subject: [PATCH] ServiceFabricProcessor preview (#262) This is the code that was built and released as preview version 0.5.2 https://www.nuget.org/packages/Microsoft.Azure.EventHubs.ServiceFabricProcessor/0.5.2 At the time it couldn't be merged with dev due to test issues from unrelated work, so we did the release from the SFprocessor branch. Those issues have been resolved, and we expect that future preview releases will come from the dev branch. --- Microsoft.Azure.EventHubs.sln | 7 + .../Checkpoint.cs | 151 ++++++ .../CloseReason.cs | 21 + .../Constants.cs | 24 + .../EventHubMocks.cs | 209 ++++++++ .../EventHubWrappers.cs | 140 ++++++ .../EventProcessorConfigurationException.cs | 21 + .../EventProcessorEventSource.cs | 124 +++++ .../EventProcessorOptions.cs | 92 ++++ .../ICheckpointMananger.cs | 53 ++ .../IEventProcessor.cs | 65 +++ ...re.EventHubs.ServiceFabricProcessor.csproj | 47 ++ .../PartitionContext.cs | 102 ++++ .../ProgrammersGuide.md | 271 ++++++++++ .../ReliableDictionaryCheckpointMananger.cs | 159 ++++++ .../ServiceFabricProcessor.cs | 464 ++++++++++++++++++ .../Properties/AssemblyInfo.cs | 2 +- 17 files changed, 1951 insertions(+), 1 deletion(-) create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Checkpoint.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/CloseReason.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Constants.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventHubMocks.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventHubWrappers.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorConfigurationException.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorEventSource.cs create mode 100644 
src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorOptions.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ICheckpointMananger.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/IEventProcessor.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Microsoft.Azure.EventHubs.ServiceFabricProcessor.csproj create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/PartitionContext.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ProgrammersGuide.md create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ReliableDictionaryCheckpointMananger.cs create mode 100644 src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ServiceFabricProcessor.cs diff --git a/Microsoft.Azure.EventHubs.sln b/Microsoft.Azure.EventHubs.sln index 3edf9f2..3f83517 100644 --- a/Microsoft.Azure.EventHubs.sln +++ b/Microsoft.Azure.EventHubs.sln @@ -18,6 +18,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.EventHubs.T EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.EventHubs.Processor", "src\Microsoft.Azure.EventHubs.Processor\Microsoft.Azure.EventHubs.Processor.csproj", "{8C89967A-4E1F-46B0-8458-81B545C822B2}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.EventHubs.ServiceFabricProcessor", "src\Microsoft.Azure.EventHubs.ServiceFabricProcessor\Microsoft.Azure.EventHubs.ServiceFabricProcessor.csproj", "{D96BCC8F-D5EE-464A-9C15-EF59613F9F82}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -36,6 +38,10 @@ Global {8C89967A-4E1F-46B0-8458-81B545C822B2}.Debug|Any CPU.Build.0 = Debug|Any CPU {8C89967A-4E1F-46B0-8458-81B545C822B2}.Release|Any CPU.ActiveCfg = Release|Any CPU {8C89967A-4E1F-46B0-8458-81B545C822B2}.Release|Any CPU.Build.0 = Release|Any CPU + {D96BCC8F-D5EE-464A-9C15-EF59613F9F82}.Debug|Any 
CPU.ActiveCfg = Debug|Any CPU + {D96BCC8F-D5EE-464A-9C15-EF59613F9F82}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D96BCC8F-D5EE-464A-9C15-EF59613F9F82}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D96BCC8F-D5EE-464A-9C15-EF59613F9F82}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -44,5 +50,6 @@ Global {126D946D-CE0F-4F14-9F13-8FD7098B81D8} = {08E028C1-29E7-42B5-871F-C911DB93E78A} {154F7B4C-B998-4FA0-933F-F34DB0CA9B88} = {AF49C862-CB78-4110-A275-8111B387805D} {8C89967A-4E1F-46B0-8458-81B545C822B2} = {08E028C1-29E7-42B5-871F-C911DB93E78A} + {D96BCC8F-D5EE-464A-9C15-EF59613F9F82} = {08E028C1-29E7-42B5-871F-C911DB93E78A} EndGlobalSection EndGlobal diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Checkpoint.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Checkpoint.cs new file mode 100644 index 0000000..15456c3 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Checkpoint.cs @@ -0,0 +1,151 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System; + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System; + using System.Collections.Generic; + + /// + /// A persistable representation of what events in the stream have been processed. + /// Version 1 checkpoint is just a high-water mark, containing an offset and sequence number. All events at or lower than the given position + /// have been processed. Any events higher than the given position are unprocessed. + /// + public class Checkpoint + { + /// + /// Create an uninitialized checkpoint of the given version. + /// + /// + internal Checkpoint(int version) + { + this.Version = version; + this.Valid = false; + } + + /// + /// Create an initialized version 1 checkpoint. + /// + /// Offset of highest-processed position. 
+ /// Sequence number of highest-processed position. + public Checkpoint(string offset, long sequenceNumber) + { + this.Version = 1; + this.Offset = offset; + this.SequenceNumber = sequenceNumber; + this.Valid = true; + } + + #region AllVersions + // + // Methods and properties valid for all versions. + // + + /// + /// Version of this checkpoint. + /// + public int Version { get; protected set; } + + /// + /// True if this checkpoint contains a valid position. + /// + public bool Valid { get; protected set; } + + /// + /// Serialize this instance to a persistable representation as a name-value dictionary. + /// + /// Serialized dictionary representation. + public Dictionary ToDictionary() + { + Dictionary converted = new Dictionary(); + + converted.Add(Constants.CheckpointPropertyVersion, this.Version); + converted.Add(Constants.CheckpointPropertyValid, this.Valid); + + switch (this.Version) + { + case 1: + converted.Add(Constants.CheckpointPropertyOffsetV1, this.Offset); + converted.Add(Constants.CheckpointPropertySequenceNumberV1, this.SequenceNumber); + break; + + default: + throw new NotImplementedException(); + } + + return converted; + } + + /// + /// Deserialize from a name-value dictionary. + /// + /// Serialized representation. + /// Deserialized instance. 
+ static public Checkpoint CreateFromDictionary(Dictionary dictionary) + { + int version = (int)dictionary[Constants.CheckpointPropertyVersion]; + bool valid = (bool)dictionary[Constants.CheckpointPropertyValid]; + + Checkpoint result = new Checkpoint(version); + + if (valid) + { + result.Valid = true; + + switch (result.Version) + { + case 1: + result.Offset = (string)dictionary[Constants.CheckpointPropertyOffsetV1]; + result.SequenceNumber = (long)dictionary[Constants.CheckpointPropertySequenceNumberV1]; + break; + + default: + throw new NotImplementedException($"Unrecognized checkpoint version {result.Version}"); + } + } + + return result; + } + #endregion AllVersions + + #region Version1 + // + // Methods and properties for Version==1 + // + + /// + /// Initialize an uninitialized instance as a version 1 checkpoint. + /// + /// Offset of highest-processed position. + /// Sequence number of highest-processed position. + public void InitializeV1(string offset, long sequenceNumber) + { + this.Version = 1; + + if (string.IsNullOrEmpty(offset)) + { + throw new ArgumentException("offset must not be null or empty"); + } + if (sequenceNumber < 0) + { + throw new ArgumentException("sequenceNumber must be >= 0"); + } + + this.Offset = offset; + this.SequenceNumber = sequenceNumber; + + this.Valid = true; + } + + /// + /// Offset of highest-processed position. Immutable after construction or initialization. + /// + public string Offset { get; private set; } + + /// + /// Sequence number of highest-processed position. Immutable after construction or initialization. + /// + public long SequenceNumber { get; private set; } + #endregion Version1 + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/CloseReason.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/CloseReason.cs new file mode 100644 index 0000000..b0755f7 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/CloseReason.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft. 
All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System; + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + /// + /// Why the event processor is being shut down. + /// + public enum CloseReason + { + /// + /// It was cancelled by Service Fabric. + /// + Cancelled, + + /// + /// There was an event hubs failure. + /// + Failure + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Constants.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Constants.cs new file mode 100644 index 0000000..2c80296 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Constants.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System; + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System; + + class Constants + { + internal static readonly int RetryCount = 5; + + internal static readonly int FixedReceiverEpoch = 0; + + internal static readonly TimeSpan MetricReportingInterval = TimeSpan.FromMinutes(1.0); + internal static readonly string DefaultUserLoadMetricName = "CountOfPartitions"; + + internal static readonly TimeSpan ReliableDictionaryTimeout = TimeSpan.FromSeconds(10.0); // arbitrary + internal static readonly string CheckpointDictionaryName = "EventProcessorCheckpointDictionary"; + internal static readonly string CheckpointPropertyVersion = "version"; + internal static readonly string CheckpointPropertyValid = "valid"; + internal static readonly string CheckpointPropertyOffsetV1 = "offsetV1"; + internal static readonly string CheckpointPropertySequenceNumberV1 = "sequenceNumberV1"; + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventHubMocks.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventHubMocks.cs new file mode 100644 index 0000000..ed164a6 --- 
/dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventHubMocks.cs @@ -0,0 +1,209 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + + /// + /// Mocks for the underlying event hub client. Using these instead of the regular wrappers allows unit testing without an event hub. + /// By default, EventProcessorService.EventHubClientFactory is an EventHubWrappers.EventHubClientFactory. + /// To use the mocks, change it to an EventHubMocks.EventHubClientFactoryMock. + /// + public class EventHubMocks + { + /// + /// Mock of an Event Hub partition receiver. + /// + public class PartitionReceiverMock : EventHubWrappers.IPartitionReceiver + { + private readonly string partitionId; + private long sequenceNumber; + private IPartitionReceiveHandler outerHandler; + private bool invokeWhenNoEvents; + private readonly CancellationToken token; + + /// + /// Construct the partition receiver mock. + /// + /// + /// + /// + public PartitionReceiverMock(string partitionId, long sequenceNumber, CancellationToken token) + { + this.partitionId = partitionId; + this.sequenceNumber = sequenceNumber; + this.token = token; + } + + /// + /// Receive mock events. 
+ /// + /// + /// + /// + public Task> ReceiveAsync(int maxEventCount, TimeSpan waitTime) + { + List events = new List(); + for (int i = 0; i < maxEventCount; i++) + { + this.sequenceNumber++; + byte[] body = new byte[] { 0x4D, 0x4F, 0x43, 0x4B, 0x42, 0x4F, 0x44, 0x59 }; // M O C K B O D Y + EventData e = new EventData(body); + // TODO -- need a way to set the system properties of the EventData + //e.ForceSystemProperties(new EventData.SystemPropertiesCollection(this.sequenceNumber, DateTime.UtcNow, (this.sequenceNumber * 100).ToString(), "")); + e.Properties.Add("userkey", "uservalue"); + events.Add(e); + } + Thread.Sleep(5000); + EventProcessorEventSource.Current.Message($"MOCK ReceiveAsync returning {maxEventCount} events for partition {this.partitionId} ending at {this.sequenceNumber}"); + return Task.FromResult>(events); + } + + /// + /// Set a mock receive handler. + /// + /// + /// + public void SetReceiveHandler(IPartitionReceiveHandler receiveHandler, bool invokeWhenNoEvents = false) + { + EventProcessorEventSource.Current.Message("MOCK IPartitionReceiver.SetReceiveHandler"); + this.outerHandler = receiveHandler; + this.invokeWhenNoEvents = invokeWhenNoEvents; // TODO mock does not emulate receive timeouts + if (this.outerHandler != null) + { + Task.Run(() => GenerateMessages()); + } + } + + /// + /// Close the mock receiver. 
+ /// + /// + public Task CloseAsync() + { + EventProcessorEventSource.Current.Message("MOCK IPartitionReceiver.CloseAsync"); + return Task.CompletedTask; + } + + private async void GenerateMessages() + { + while ((!this.token.IsCancellationRequested) && (this.outerHandler != null)) + { + EventProcessorEventSource.Current.Message("MOCK Generating messages and sending to handler"); + IEnumerable events = ReceiveAsync(10, TimeSpan.FromSeconds(10.0)).Result; // TODO get count from somewhere real + IPartitionReceiveHandler capturedHandler = this.outerHandler; + if (capturedHandler != null) + { + await capturedHandler.ProcessEventsAsync(events); + } + } + EventProcessorEventSource.Current.Message("MOCK Message generation ending"); + } + } + + /// + /// Mock of EventHubClient class. + /// + public class EventHubClientMock : EventHubWrappers.IEventHubClient + { + private readonly int partitionCount; + private readonly EventHubsConnectionStringBuilder csb; + private CancellationToken token = new CancellationToken(); + + /// + /// Construct the mock. + /// + /// + /// + public EventHubClientMock(int partitionCount, EventHubsConnectionStringBuilder csb) + { + this.partitionCount = partitionCount; + this.csb = csb; + } + + internal void SetCancellationToken(CancellationToken t) + { + this.token = t; + } + + /// + /// Get runtime info of the fake event hub. + /// + /// + public Task GetRuntimeInformationAsync() + { + EventHubRuntimeInformation ehri = new EventHubRuntimeInformation(); + ehri.PartitionCount = this.partitionCount; + ehri.PartitionIds = new string[this.partitionCount]; + for (int i = 0; i < this.partitionCount; i++) + { + ehri.PartitionIds[i] = i.ToString(); + } + ehri.Path = csb.EntityPath; + EventProcessorEventSource.Current.Message($"MOCK GetRuntimeInformationAsync for {ehri.Path}"); + return Task.FromResult(ehri); + } + + /// + /// Create a mock receiver on the fake event hub. 
+ /// + /// + /// + /// + /// + /// + /// + /// + public EventHubWrappers.IPartitionReceiver CreateEpochReceiver(string consumerGroupName, string partitionId, EventPosition eventPosition, string offset, long epoch, ReceiverOptions receiverOptions) + { + EventProcessorEventSource.Current.Message($"MOCK CreateEpochReceiver(CG {consumerGroupName}, part {partitionId}, offset {offset} epoch {epoch})"); + // TODO implement epoch semantics + long startSeq = (offset != null) ? (long.Parse(offset) / 100L) : 0L; + return new PartitionReceiverMock(partitionId, startSeq, this.token); + } + + /// + /// Close the mock EventHubClient. + /// + /// + public Task CloseAsync() + { + EventProcessorEventSource.Current.Message("MOCK IEventHubClient.CloseAsync"); + return Task.CompletedTask; + } + } + + /// + /// An EventHubClient factory which dispenses mocks. + /// + public class EventHubClientFactoryMock : EventHubWrappers.IEventHubClientFactory + { + private readonly int partitionCount; + + /// + /// Construct the mock factory. + /// + /// + public EventHubClientFactoryMock(int partitionCount) + { + this.partitionCount = partitionCount; + } + + /// + /// Dispense a mock instance operating on a fake event hub with name taken from the connection string. 
+ /// + /// + /// + public EventHubWrappers.IEventHubClient CreateFromConnectionString(string connectionString) + { + throw new NotImplementedException("Need a change to EventData before mocks can be supported"); + //EventProcessorEventSource.Current.Message($"MOCK Creating IEventHubClient {connectionString} with {this.partitionCount} partitions"); + //return new EventHubClientMock(this.partitionCount, new EventHubsConnectionStringBuilder(connectionString)); + } + } + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventHubWrappers.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventHubWrappers.cs new file mode 100644 index 0000000..d67ad29 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventHubWrappers.cs @@ -0,0 +1,140 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System; + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + + /// + /// Wrappers for the underlying Event Hub client which allow mocking. + /// The interfaces include only the client functionality used by the Service Fabric Processor. + /// + public class EventHubWrappers + { + /// + /// Interface for a partition receiver. 
+ /// + public interface IPartitionReceiver + { + /// + /// + /// + /// + /// + Task> ReceiveAsync(int maxEventCount, TimeSpan waitTime); + + /// + /// + /// + /// + void SetReceiveHandler(IPartitionReceiveHandler receiveHandler, bool invokeWhenNoEvents = false); + + /// + /// + /// + Task CloseAsync(); + } + + /// + /// Interface representing EventHubClient + /// + public interface IEventHubClient + { + /// + /// + /// + Task GetRuntimeInformationAsync(); + + /// + /// + /// + /// + /// + /// Only used by mocks + /// + /// + /// + IPartitionReceiver CreateEpochReceiver(string consumerGroupName, string partitionId, EventPosition eventPosition, string offset, long epoch, ReceiverOptions receiverOptions); + + /// + /// + /// + Task CloseAsync(); + } + + /// + /// Interface for an EventHubClient factory so that we can have factories which dispense different implementations of IEventHubClient. + /// + public interface IEventHubClientFactory + { + /// + /// + /// + /// + IEventHubClient CreateFromConnectionString(string connectionString); + } + + internal class PartitionReceiverWrapper : IPartitionReceiver + { + private readonly PartitionReceiver inner; + + internal PartitionReceiverWrapper(PartitionReceiver receiver) + { + this.inner = receiver; + this.MaxBatchSize = 10; // TODO get this from somewhere real + } + + public Task> ReceiveAsync(int maxEventCount, TimeSpan waitTime) + { + return this.inner.ReceiveAsync(maxEventCount, waitTime); + } + + public void SetReceiveHandler(IPartitionReceiveHandler receiveHandler, bool invokeWhenNoEvents = false) + { + this.inner.SetReceiveHandler(receiveHandler, invokeWhenNoEvents); + } + + public Task CloseAsync() + { + return this.inner.CloseAsync(); + } + + public int MaxBatchSize { get; set; } + } + + internal class EventHubClientWrapper : IEventHubClient + { + private readonly EventHubClient inner; + + internal EventHubClientWrapper(EventHubClient ehc) + { + this.inner = ehc; + } + + public Task GetRuntimeInformationAsync() + 
{ + return this.inner.GetRuntimeInformationAsync(); + } + + public IPartitionReceiver CreateEpochReceiver(string consumerGroupName, string partitionId, EventPosition eventPosition, string offset, long epoch, ReceiverOptions receiverOptions) + { + return new PartitionReceiverWrapper(this.inner.CreateEpochReceiver(consumerGroupName, partitionId, eventPosition, epoch, receiverOptions)); + } + + public Task CloseAsync() + { + return this.inner.CloseAsync(); + } + } + + internal class EventHubClientFactory : IEventHubClientFactory + { + public IEventHubClient CreateFromConnectionString(string connectionString) + { + return new EventHubClientWrapper(EventHubClient.CreateFromConnectionString(connectionString)); + } + } + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorConfigurationException.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorConfigurationException.cs new file mode 100644 index 0000000..7505b54 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorConfigurationException.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System; + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System; + + /// + /// Exception thrown when the configuration of the service has a problem. + /// + public class EventProcessorConfigurationException : Exception + { + /// + /// Construct the exception. 
+ /// + /// + public EventProcessorConfigurationException(string message) : base(message) + { + } + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorEventSource.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorEventSource.cs new file mode 100644 index 0000000..c29b021 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorEventSource.cs @@ -0,0 +1,124 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System; + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System; + using System.Diagnostics.Tracing; + using System.Fabric; + + [EventSource(Name = "Microsoft-Azure-EventHubs-ServiceFabricProcessor")] + internal sealed class EventProcessorEventSource : EventSource + { + public static readonly EventProcessorEventSource Current = new EventProcessorEventSource(); + + // Instance constructor is private to enforce singleton semantics + private EventProcessorEventSource() : base() { } + + #region Keywords + // Event keywords can be used to categorize events. + // Each keyword is a bit flag. A single event can be associated with multiple keywords (via EventAttribute.Keywords property). + // Keywords must be defined as a public class named 'Keywords' inside EventSource that uses them. + public static class Keywords + { + public const EventKeywords Requests = (EventKeywords)0x1L; + public const EventKeywords ServiceInitialization = (EventKeywords)0x2L; + } + #endregion + + #region Events + // Define an instance method for each event you want to record and apply an [Event] attribute to it. + // The method name is the name of the event. + // Pass any parameters you want to record with the event (only primitive integer types, DateTime, Guid & string are allowed). 
+ // Each event method implementation should check whether the event source is enabled, and if it is, call WriteEvent() method to raise the event. + // The number and types of arguments passed to every event method must exactly match what is passed to WriteEvent(). + // Put [NonEvent] attribute on all methods that do not define an event. + // For more information see https://msdn.microsoft.com/en-us/library/system.diagnostics.tracing.eventsource.aspx + + [NonEvent] + public void Message(string message, params object[] args) + { + if (this.IsEnabled()) + { + string finalMessage = string.Format(message, args); + Message(finalMessage); + } + } + + private const int MessageEventId = 1; + [Event(MessageEventId, Level = EventLevel.Informational, Message = "{0}")] + public void Message(string message) + { + if (this.IsEnabled()) + { + WriteEvent(MessageEventId, message); + } + } + + [NonEvent] + public void ServiceMessage(StatefulServiceContext serviceContext, string message, params object[] args) + { + if (this.IsEnabled()) + { + string finalMessage = string.Format(message, args); + ServiceMessage( + serviceContext.ServiceName.ToString(), + serviceContext.ServiceTypeName, + serviceContext.ReplicaId, + serviceContext.PartitionId, + serviceContext.CodePackageActivationContext.ApplicationName, + serviceContext.CodePackageActivationContext.ApplicationTypeName, + serviceContext.NodeContext.NodeName, + finalMessage); + } + } + + private const int ServiceMessageEventId = 2; + [Event(ServiceMessageEventId, Level = EventLevel.Informational, Message = "{7}")] + private + void ServiceMessage( + string serviceName, + string serviceTypeName, + long replicaOrInstanceId, + Guid partitionId, + string applicationName, + string applicationTypeName, + string nodeName, + string message) + { + WriteEvent(ServiceMessageEventId, serviceName, serviceTypeName, replicaOrInstanceId, partitionId, applicationName, applicationTypeName, nodeName, message); + } + + private const int 
ServiceTypeRegisteredEventId = 3; + [Event(ServiceTypeRegisteredEventId, Level = EventLevel.Informational, Message = "Service host process {0} registered service type {1}", Keywords = Keywords.ServiceInitialization)] + public void ServiceTypeRegistered(int hostProcessId, string serviceType) + { + WriteEvent(ServiceTypeRegisteredEventId, hostProcessId, serviceType); + } + + private const int ServiceHostInitializationFailedEventId = 4; + [Event(ServiceHostInitializationFailedEventId, Level = EventLevel.Error, Message = "Service host initialization failed", Keywords = Keywords.ServiceInitialization)] + public void ServiceHostInitializationFailed(string exception) + { + WriteEvent(ServiceHostInitializationFailedEventId, exception); + } + + // A pair of events sharing the same name prefix with a "Start"/"Stop" suffix implicitly marks boundaries of an event tracing activity. + // These activities can be automatically picked up by debugging and profiling tools, which can compute their execution time, child activities, + // and other statistics. 
+ private const int ServiceRequestStartEventId = 5; + [Event(ServiceRequestStartEventId, Level = EventLevel.Informational, Message = "Service request '{0}' started", Keywords = Keywords.Requests)] + public void ServiceRequestStart(string requestTypeName) + { + WriteEvent(ServiceRequestStartEventId, requestTypeName); + } + + private const int ServiceRequestStopEventId = 6; + [Event(ServiceRequestStopEventId, Level = EventLevel.Informational, Message = "Service request '{0}' finished", Keywords = Keywords.Requests)] + public void ServiceRequestStop(string requestTypeName, string exception = "") + { + WriteEvent(ServiceRequestStopEventId, requestTypeName, exception); + } + #endregion + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorOptions.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorOptions.cs new file mode 100644 index 0000000..17d54df --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/EventProcessorOptions.cs @@ -0,0 +1,92 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System; + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System; + + /// + /// Type used for OnShutdown property. + /// + /// + public delegate void ShutdownNotification(Exception e); + + /// + /// Options that govern the functioning of the processor. + /// + public class EventProcessorOptions + { + /// + /// Construct with default options. 
+ /// + public EventProcessorOptions() + { + this.MaxBatchSize = 10; + this.PrefetchCount = 300; + this.ReceiveTimeout = TimeSpan.FromMinutes(1); + this.EnableReceiverRuntimeMetric = false; + this.InvokeProcessorAfterReceiveTimeout = false; + this.InitialPositionProvider = partitionId => EventPosition.FromStart(); + this.ClientReceiverOptions = null; + this.OnShutdown = null; + } + + /// + /// The maximum number of events that will be presented to IEventProcessor.OnEventsAsync in one call. + /// Defaults to 10. + /// + public int MaxBatchSize { get; set; } + + /// + /// The prefetch count for the Event Hubs receiver. + /// Defaults to 300. + /// + public int PrefetchCount { get; set; } + + /// + /// The timeout for the Event Hubs receiver. + /// Defaults to one minute. + /// + public TimeSpan ReceiveTimeout { get; set; } + + /// + /// Gets or sets a value indicating whether the runtime metric of a receiver is enabled (true) or disabled (false). + /// Defaults to false. + /// + public bool EnableReceiverRuntimeMetric { get; set; } + + /// + /// Determines whether IEventProcessor.OnEventsAsync is called when the Event Hubs receiver times out. + /// Set to true to get calls with empty event list. + /// Set to false to not get calls. + /// Defaults to false. + /// + public bool InvokeProcessorAfterReceiveTimeout { get; set; } + + /// + /// If there is no checkpoint, the user can provide a position for the Event Hubs receiver to start at. + /// Defaults to first event available in the stream. + /// + public Func InitialPositionProvider { get; set; } + + /// + /// ReceiverOptions used by the underlying Event Hubs client. + /// Defaults to null. + /// + public ReceiverOptions ClientReceiverOptions { get; set; } + + /// + /// TODO -- is this needed? It's called just before SFP.RunAsync throws out/returns to user code anyway. + /// But user code won't see that until it awaits the Task, so maybe this is useful? 
+ /// + public ShutdownNotification OnShutdown { get; set; } + + internal void NotifyOnShutdown(Exception shutdownException) + { + if (this.OnShutdown != null) + { + this.OnShutdown(shutdownException); + } + } + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ICheckpointMananger.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ICheckpointMananger.cs new file mode 100644 index 0000000..5597d64 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ICheckpointMananger.cs @@ -0,0 +1,53 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System; + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System.Threading; + using System.Threading.Tasks; + + /// + /// Interface for a checkpoint manager which persists Checkpoints. + /// + public interface ICheckpointMananger + { + /// + /// Does the checkpoint store exist? + /// + /// + /// True if it exists, false if not. + Task CheckpointStoreExistsAsync(CancellationToken cancellationToken); + + /// + /// Create the checkpoint store if it doesn't exist. + /// + /// + /// True if it exists or was created OK, false if not. + Task CreateCheckpointStoreIfNotExistsAsync(CancellationToken cancellationToken); + + /// + /// Create an uninitialized checkpoint for the given partition. + /// + /// + /// + /// + Task CreateCheckpointIfNotExistsAsync(string partitionId, CancellationToken cancellationToken); + + /// + /// Get the checkpoint for the given partition. Returns null if there is no checkpoint or if it is uninitialized. + /// + /// + /// + /// + Task GetCheckpointAsync(string partitionId, CancellationToken cancellationToken); + + /// + /// Persist the Checkpoint for the given partition. 
+ /// + /// + /// + /// + /// + Task UpdateCheckpointAsync(string partitionId, Checkpoint checkpoint, CancellationToken cancellationToken); + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/IEventProcessor.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/IEventProcessor.cs new file mode 100644 index 0000000..9af4bd9 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/IEventProcessor.cs @@ -0,0 +1,65 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + + /// + /// Abstract base class that user code derives from to process events. + /// + public abstract class IEventProcessor + { + /// + /// Called on startup. + /// + /// + /// + /// + abstract public Task OpenAsync(CancellationToken cancellationToken, PartitionContext context); + + /// + /// Called on shutdown. + /// + /// + /// + /// + abstract public Task CloseAsync(PartitionContext context, CloseReason reason); + + /// + /// Called when events are available. + /// + /// + /// + /// + /// + abstract public Task ProcessEventsAsync(CancellationToken cancellationToken, PartitionContext context, IEnumerable events); + + /// + /// Called when an error occurs. + /// + /// + /// + /// + abstract public Task ProcessErrorAsync(PartitionContext context, Exception error); + + /// + /// Called periodically to get user-supplied load metrics. + /// + /// + /// + /// + virtual public Dictionary GetLoadMetric(CancellationToken cancellationToken, PartitionContext context) + { + // By default all partitions have a metric named CountOfPartitions with value 1. If Service Fabric is configured to use this metric, + // it will balance primaries across nodes simply by the number of primaries on a node.
This can be overridden to return + // more sophisticated metrics like number of events processed or CPU usage. + Dictionary defaultMetric = new Dictionary(); + defaultMetric.Add(Constants.DefaultUserLoadMetricName, 1); + return defaultMetric; + } + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Microsoft.Azure.EventHubs.ServiceFabricProcessor.csproj b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Microsoft.Azure.EventHubs.ServiceFabricProcessor.csproj new file mode 100644 index 0000000..a126013 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/Microsoft.Azure.EventHubs.ServiceFabricProcessor.csproj @@ -0,0 +1,47 @@ + + + + This is the next generation Azure Event Hubs .NET Standard Service Fabric Processor library, which integrates Event Hub event consumption with Service Fabric. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/ + Microsoft.Azure.EventHubs.ServiceFabricProcessor + 0.5.2-PREVIEW + Microsoft + netstandard2.0 + true + Microsoft.Azure.EventHubs.ServiceFabricProcessor + ../../build/keyfile.snk + true + true + Microsoft.Azure.EventHubs.ServiceFabricProcessor + Azure;Event Hubs;EventHubs;.NET;AMQP;IoT + https://github.com/Azure/azure-event-hubs-dotnet/releases + https://raw.githubusercontent.com/Azure/azure-event-hubs-dotnet/master/event-hubs.png + https://github.com/Azure/azure-event-hubs-dotnet + https://raw.githubusercontent.com/Azure/azure-event-hubs-dotnet/master/LICENSE + true + false + false + false + full + bin\$(Configuration)\$(TargetFramework)\Microsoft.Azure.EventHubs.Processor.xml + 0.5.2 + 0.5.2.0 + 0.5.2.0 + true + © Microsoft Corporation. All rights reserved.
+ + + + x64 + + + + + + + + + + + + + diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/PartitionContext.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/PartitionContext.cs new file mode 100644 index 0000000..83c2660 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/PartitionContext.cs @@ -0,0 +1,102 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System.Threading; + using System.Threading.Tasks; + + /// + /// Passed to an event processor instance to describe the environment. + /// + public class PartitionContext + { + readonly private ICheckpointMananger checkpointMananger; + + /// + /// Construct an instance. + /// + /// CancellationToken that the event processor should respect. Same as token passed to IEventProcessor methods. + /// Id of the partition for which the event processor is handling events. + /// Name of the event hub which is the source of events. + /// Name of the consumer group on the event hub. + /// The checkpoint manager instance to use. + public PartitionContext(CancellationToken cancellationToken, string partitionId, string eventHubPath, string consumerGroupName, ICheckpointMananger checkpointMananger) + { + this.CancellationToken = cancellationToken; + this.PartitionId = partitionId; + this.EventHubPath = eventHubPath; + this.ConsumerGroupName = consumerGroupName; + + // TODO: Requires client change to support + this.RuntimeInformation = null; // new ReceiverRuntimeInformation(this.PartitionId); + + this.checkpointMananger = checkpointMananger; + } + + /// + /// The event processor implementation should respect this CancellationToken. It is the same as the token passed + /// in to IEventProcessor methods. It is here primarily for compatibility with Event Processor Host.
+ /// + public CancellationToken CancellationToken { get; private set; } + + /// + /// Name of the consumer group on the event hub. + /// + public string ConsumerGroupName { get; private set; } + + /// + /// Name of the event hub. + /// + public string EventHubPath { get; private set; } + + /// + /// Id of the partition. + /// + public string PartitionId { get; private set; } + + /// + /// Gets the approximate receiver runtime information for a logical partition of an Event Hub. + /// Currently always null: the constructor does not populate it yet (see the TODO there); the enabling option is presumably EventProcessorOptions.EnableReceiverRuntimeMetric -- confirm. + /// + public ReceiverRuntimeInformation RuntimeInformation + { + get; + // internal set; + } + + internal string Offset { get; set; } + + internal long SequenceNumber { get; set; } + + internal void SetOffsetAndSequenceNumber(EventData eventData) + { + this.Offset = eventData.SystemProperties.Offset; + this.SequenceNumber = eventData.SystemProperties.SequenceNumber; + } + + /// + /// Mark the last event of the current batch and all previous events as processed. Builds the checkpoint from this context's current Offset and SequenceNumber. + /// + /// + public async Task CheckpointAsync() + { + await CheckpointAsync(new Checkpoint(this.Offset, this.SequenceNumber)); + } + + /// + /// Mark the given event and all previous events as processed. + /// + /// Highest-processed event.
+ /// + public async Task CheckpointAsync(EventData eventData) + { + await CheckpointAsync(new Checkpoint(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber)); + } + + private async Task CheckpointAsync(Checkpoint checkpoint) + { + await this.checkpointMananger.UpdateCheckpointAsync(this.PartitionId, checkpoint, this.CancellationToken); + } + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ProgrammersGuide.md b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ProgrammersGuide.md new file mode 100644 index 0000000..3ef9fe4 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ProgrammersGuide.md @@ -0,0 +1,271 @@ +# Programmer's Guide to Service Fabric Processor + +## Introduction + +Service Fabric Processor (SFP) allows a user to easily create a Service +Fabric-based stateful service that processes events from an Event Hub. The +service is required to have exactly as many partitions as the target Event Hub +does, because each Service Fabric partition is permanently associated with a +corresponding Event Hub partition. The user's processing logic for events is +contained in their implementation of the IEventProcessor interface. Each +partition of the service has an instance of that implementation, which is called +by SFP to handle events consumed from the corresponding Event Hub partition. +Only the primary Service Fabric replica for each partition consumes and processes +events. The secondary replicas exist to maintain the reliable dictionary that +Service Fabric Processor uses for checkpointing. + +## IEventProcessor + +There are four methods that the user is required to implement, and one more +which the user may override if desired. + +Most of the methods provide a +CancellationToken as an argument, and it is important for long-running +operations in the user's code to honor that token.
Service Fabric cancels it +when a primary replica's state is changing, and that is the code's chance to +clean up and shut down gracefully instead of being forcefully terminated! + +All methods provide a PartitionContext as an argument, which contains +various information about the Event Hub and the partition that may be useful +to the user's code. The PartitionContext also has a reference to the +CancellationToken. + +### OpenAsync + +```csharp +Task OpenAsync(CancellationToken cancellationToken, PartitionContext context) +``` + +SFP calls this method when a partition replica becomes primary. This is the +opportunity to initialize resources that the event processing logic will +use, such as a database connection, or perform any other startup needed. + +### ProcessEventsAsync + +```csharp +Task ProcessEventsAsync(CancellationToken cancellationToken, PartitionContext context, IEnumerable<EventData> events) +``` + +After the Task returned by OpenAsync completes, SFP calls this method repeatedly +as events become available. SFP makes only one call to ProcessEventsAsync at a +time: it waits for the Task returned by the current call to complete before +attempting to consume more events from the Event Hub. By default, this method +is called only when events are available, so if traffic on the associated +Event Hub partition is sparse, it may be some time between calls to this method. + +### CloseAsync + +```csharp +Task CloseAsync(PartitionContext context, CloseReason reason) +``` + +SFP calls this method when a primary replica is being shut down, whether due to +an Event Hub failure or because Service Fabric is changing the replica's state. +No cancellation token is provided because the user's code is expected to already +be shutting down as quickly as possible, and in many cases the token will already +be cancelled. The CloseReason indicates whether the shutdown is due to an Event +Hub failure or Service Fabric cancellation.
+ +This method will not be called until the Task returned by the most recent +ProcessEventsAsync call has completed. + +### ProcessErrorAsync + +```csharp +Task ProcessErrorAsync(PartitionContext context, Exception error) +``` + +SFP calls this method when an Event Hubs error has occurred. It is purely +informational. Recovering from the error, if possible, is up to SFP. + +### GetLoadMetric + +```csharp +Dictionary<string, int> GetLoadMetric(CancellationToken cancellationToken, PartitionContext context) +``` + +Service Fabric offers sophisticated load balancing of +partition replicas between nodes based on user-provided metrics, and this +method allows SFP users to take advantage of that feature. SFP polls this +method periodically and passes the metrics returned to Service Fabric. The +metrics are represented as a dictionary of string-int pairs, where the string +is the metric name and the int is the metric value. SFP provides a default +implementation of this method, which returns a metric named "CountOfPartitions" +that has a constant value of 1. + +It is up to the user to configure Service Fabric to use metrics returned by +this method. Service Fabric ignores any metrics not mentioned in the +configuration, so it is safe to return any metrics that might be interesting +and then decide later which particular ones to use. + +## Integrating With Service Fabric + +SFP requires a stateful Service Fabric service that is configured to have the +same number of partitions as the Event Hub. + +The user's service will have a class derived from the Service Fabric class +StatefulService, which in turn has a RunAsync method that is called on primary +partition replicas. SFP setup and activation occur within that RunAsync method. + +### SFP Options + +SFP provides a class EventProcessorOptions, which allows setting a wide variety +of options that adjust how SFP operates.
To use it, create a new instance, which +is initialized with the default settings, then change the options of interest, +and finally pass the instance to the ServiceFabricProcessor constructor. + +```csharp +EventProcessorOptions options = new EventProcessorOptions(); +options.MaxBatchSize = 50; +ServiceFabricProcessor processor = new ServiceFabricProcessor(..., options); +``` + +Available options are: + +* int MaxBatchSize: the _maximum_ number of events that will be passed to +ProcessEventsAsync at one time. Default is 10. The actual number of events for +each call is variable and depends on how many events are available in the Event +Hub partition, how fast they can be transferred, how long the previous call to +ProcessEventsAsync took, and other factors. If your system as a whole is +processing events faster than they are generated, then much of the time +ProcessEventsAsync will be called with only one event. This option only +governs the _maximum_ number. + +* int PrefetchCount: passed to the underlying Event Hubs client, this option +governs how many events can be prefetched. Default is 300. This generally +comes into play only when there is a backlog of events due to slow processing. +If your system as a whole is processing events faster than they are generated, +then the prefetch buffer will be empty, because every event that becomes +available can be immediately passed to ProcessEventsAsync. + +* TimeSpan ReceiveTimeout: passed to the underlying Event Hubs client, this +option governs the timeout duration for the Event Hubs receiver. Default is +60 seconds. + +* bool EnableReceiverRuntimeMetric: TODO -- needs client changes before it +can be supported + +* bool InvokeProcessorAfterReceiveTimeout: if false, ProcessEventsAsync is +called only when at least one event is available. If true, ProcessEventsAsync +is called with an empty event list when a receive timeout occurs. Default is +false. 
+ +* ShutdownNotification OnShutdown: set a delegate which is called just before +the Task returned by ServiceFabricProcessor.RunAsync completes. Depending on +how the code in the service's RunAsync is structured, SFP might shut down due +to an error long before RunAsync awaits the returned Task. This delegate +provides notification of such a shutdown. + +* Func<string, EventPosition> InitialPositionProvider: see "Starting Position +and Checkpointing" below + +### Instantiating ServiceFabricProcessor + +```csharp +IEventProcessor myEventProcessor = new MyEventProcessorClass(...); +ServiceFabricProcessor processor = new ServiceFabricProcessor( + this.Context.ServiceName, + this.Context.PartitionId, + this.StateManager, + this.Partition, + myEventProcessor, + eventHubConnectionString, + eventHubConsumerGroup, + options +); +``` + +* The first four arguments are Service Fabric artifacts that SFP needs to get +information about Service Fabric partitions and to access Service Fabric reliable +dictionaries. They are all available as members of the StatefulService-derived +class. + +* The next argument is an instance of the user's implementation of IEventProcessor. + +* The next two arguments are the connection string of the Event Hub to consume +from, and the consumer group. The consumer group is required: the constructor throws +ArgumentException if it is null or empty. Pass "$Default" to use the default consumer group. + +* The last argument is an optional instance of EventProcessorOptions. If null +or omitted, all options have their default value. + +### Start processing events + +```csharp +Task processing = processor.RunAsync(cancellationToken); +// do other stuff here if desired +await processing; +``` + +Note that ServiceFabricProcessor.RunAsync can be called only once. If the await +throws, you can either allow the exception to propagate out to Service Fabric and +let Service Fabric restart the replica, or you can create a new instance of +ServiceFabricProcessor and call RunAsync on the new instance.
+ +## Starting Position and Checkpointing + +When ServiceFabricProcessor.RunAsync is called, one of the first things it does +is to create a receiver on the Event Hub partition so it can consume events. +Because Event Hubs do not have a service-side cursor, a receiver must specify +a starting position when it is created. + +One of the features of SFP is checkpointing, which provides a client-side +cursor by persisting the offset of the last event processed successfully. +Checkpointing does not happen automatically, because there are scenarios +which do not need it. To use checkpointing, the user's implementation of +IEventProcessor.ProcessEventsAsync calls the CheckpointAsync methods on +the supplied PartitionContext. + +### Finding the Starting Position + +SFP follows these steps to determine the starting position when creating a +receiver: + +* If there is a checkpoint for the partition, start at the next event after +that position. + +* Else, call EventProcessorOptions.InitialPositionProvider, if present: when +setting up options, the user can provide a function which takes an Event Hub +partition id and returns an EventPosition. + +* Else, start at the oldest available event. + +### Checkpointing + +PartitionContext provides two methods for setting a checkpoint: + +* With no arguments, the checkpoint contains the position of the last event +in the current batch. If once-per-batch is a reasonable checkpoint strategy +for your application, calling PartitionContext.CheckpointAsync just before +returning from IEventProcessor.ProcessEventsAsync is simple and convenient. + +* The other overload of CheckpointAsync takes an EventData as an argument +and sets a checkpoint with the position of the given event. This way, your +application can checkpoint at any interval. + +It is important to await CheckpointAsync, to be sure that the checkpoint +was actually persisted. 
+ +### Special Considerations + +A checkpoint is a representation of a position in the event stream of a +particular Event Hub+consumer group+partition combination. All events up to and +including the checkpointed position are assumed to be processed, and any events +after the position are assumed to be unprocessed. The intention is that a +newly-created receiver will pick up where the previous receiver left off, for +example when Service Fabric moves the primary replica of a partition from one +node to another for load balancing. + +For performance reasons, it is not always desirable to checkpoint after processing +each event. Checkpointing at larger intervals (for example, every ten events, or +every ten seconds, etc.) can improve performance, but also means that if the +receiver is recreated, the new receiver may consume events that have already +been processed. It is up to the application owner to evaluate the tradeoff between +performance and reprocessing events. + +Even if the application checkpoints after every event, that still +does not completely prevent the possibility of reprocessing an already-consumed +event, because there is always a time window between when an event is processed +and when the checkpoint is fully persisted, during which a node could fail. As a +best practice, an application must be able to cope with event reprocessing in some +way that is reasonable for the impact. diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ReliableDictionaryCheckpointMananger.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ReliableDictionaryCheckpointMananger.cs new file mode 100644 index 0000000..57e91c4 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ReliableDictionaryCheckpointMananger.cs @@ -0,0 +1,159 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. 
See LICENSE file in the project root for full license information. + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using Microsoft.ServiceFabric.Data; + using Microsoft.ServiceFabric.Data.Collections; + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + + class ReliableDictionaryCheckpointMananger : ICheckpointMananger + { + private IReliableStateManager reliableStateManager = null; + private IReliableDictionary> store = null; + + internal ReliableDictionaryCheckpointMananger(IReliableStateManager rsm) + { + this.reliableStateManager = rsm; + } + + public async Task CheckpointStoreExistsAsync(CancellationToken cancellationToken) + { + ConditionalValue> tryStore = await + this.reliableStateManager.TryGetAsync>(Constants.CheckpointDictionaryName); + EventProcessorEventSource.Current.Message($"CheckpointStoreExistsAsync = {tryStore.HasValue}"); + return tryStore.HasValue; + } + + public async Task CreateCheckpointStoreIfNotExistsAsync(CancellationToken cancellationToken) + { + // Create or get access to the dictionary.
+ this.store = await reliableStateManager.GetOrAddAsync>>(Constants.CheckpointDictionaryName); + EventProcessorEventSource.Current.Message("CreateCheckpointStoreIfNotExistsAsync OK"); + return true; + } + + public async Task CreateCheckpointIfNotExistsAsync(string partitionId, CancellationToken cancellationToken) + { + Checkpoint existingCheckpoint = await GetWithRetry(partitionId, cancellationToken); + + if (existingCheckpoint == null) + { + existingCheckpoint = new Checkpoint(1); + await PutWithRetry(partitionId, existingCheckpoint, cancellationToken); + } + EventProcessorEventSource.Current.Message("CreateCheckpointIfNotExists OK"); + + return existingCheckpoint; + } + + public async Task GetCheckpointAsync(string partitionId, CancellationToken cancellationToken) + { + return await GetWithRetry(partitionId, cancellationToken); + } + + public async Task UpdateCheckpointAsync(string partitionId, Checkpoint checkpoint, CancellationToken cancellationToken) + { + await PutWithRetry(partitionId, checkpoint, cancellationToken); + } + + // Throws on error or if cancelled. + // Returns null if there is no entry for the given partition. + private async Task GetWithRetry(string partitionId, CancellationToken cancellationToken) + { + EventProcessorEventSource.Current.Message($"Getting checkpoint for {partitionId}"); + + Checkpoint result = null; + Exception lastException = null; + for (int i = 0; i < Constants.RetryCount; i++) + { + cancellationToken.ThrowIfCancellationRequested(); + lastException = null; + + try + { + using (ITransaction tx = this.reliableStateManager.CreateTransaction()) + { + ConditionalValue> rawCheckpoint = await + this.store.TryGetValueAsync(tx, partitionId, Constants.ReliableDictionaryTimeout, cancellationToken); + + await tx.CommitAsync(); + + // Success! Save the result, if any, and break out of the retry loop.
+ if (rawCheckpoint.HasValue) + { + result = Checkpoint.CreateFromDictionary(rawCheckpoint.Value); + } + else + { + result = null; + } + break; + } + } + catch (TimeoutException e) + { + lastException = e; + } + } + + if (lastException != null) + { + // Ran out of retries, throw. The message names the read so it is not confused with a failed write. + throw new Exception("Ran out of retries getting checkpoint", lastException); + } + + if (result != null) + { + EventProcessorEventSource.Current.Message($"Got checkpoint for {partitionId}: {result.Offset}//{result.SequenceNumber}"); + } + else + { + EventProcessorEventSource.Current.Message($"No checkpoint found for {partitionId}: returning null"); + } + + return result; + } + + private async Task PutWithRetry(string partitionId, Checkpoint checkpoint, CancellationToken cancellationToken) + { + EventProcessorEventSource.Current.Message($"Setting checkpoint for {partitionId}: {checkpoint.Offset}//{checkpoint.SequenceNumber}"); + + Exception lastException = null; + for (int i = 0; i < Constants.RetryCount; i++) + { + cancellationToken.ThrowIfCancellationRequested(); + lastException = null; + + Dictionary putThis = checkpoint.ToDictionary(); + + try + { + using (ITransaction tx = this.reliableStateManager.CreateTransaction()) + { + await this.store.SetAsync(tx, partitionId, putThis, Constants.ReliableDictionaryTimeout, cancellationToken); + await tx.CommitAsync(); + + // Success! Break out of the retry loop. + break; + } + } + catch (TimeoutException e) + { + lastException = e; + } + } + + if (lastException != null) + { + // Ran out of retries, throw.
+ throw new Exception("Ran out of retries setting checkpoint", lastException); + } + + EventProcessorEventSource.Current.Message($"Set checkpoint for {partitionId} OK"); + } + } +} diff --git a/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ServiceFabricProcessor.cs b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ServiceFabricProcessor.cs new file mode 100644 index 0000000..dd87c11 --- /dev/null +++ b/src/Microsoft.Azure.EventHubs.ServiceFabricProcessor/ServiceFabricProcessor.cs @@ -0,0 +1,464 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor +{ + using System; + using System.Collections.Generic; + using System.Fabric; + using System.Fabric.Description; + using System.Fabric.Query; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.ServiceFabric.Data; + + /// + /// Base class that implements event processor functionality.
+ /// + public class ServiceFabricProcessor : IPartitionReceiveHandler + { + // Service Fabric objects initialized in constructor + private readonly IReliableStateManager serviceStateManager; + private readonly Uri serviceFabricServiceName; + private readonly Guid serviceFabricPartitionId; + private readonly IStatefulServicePartition servicePartition; + + // ServiceFabricProcessor settings initialized in constructor + private readonly IEventProcessor userEventProcessor; + private readonly EventProcessorOptions options; + private readonly ICheckpointMananger checkpointManager; + + // Initialized during RunAsync startup + private int fabricPartitionOrdinal = -1; + private int servicePartitionCount = -1; + private string hubPartitionId; + private PartitionContext partitionContext; + private string initialOffset; + private CancellationTokenSource internalCanceller; + private Exception internalFatalException; + private CancellationToken linkedCancellationToken; + private EventHubsConnectionStringBuilder ehConnectionString; + private string consumerGroupName; + + // Value managed by RunAsync + private int running = 0; + + + /// + /// Constructor. Arguments break down into three groups: (1) Service Fabric objects so this library can access + /// Service Fabric facilities, (2) Event Hub-related arguments which indicate what event hub to receive from and + /// how to process the events, and (3) advanced, which right now consists only of the ability to replace the default + /// reliable dictionary-based checkpoint manager with a user-provided implementation.
+ /// + /// Service Fabric Uri found in StatefulServiceContext + /// Service Fabric partition id found in StatefulServiceContext + /// Service Fabric-provided state manager, provides access to reliable dictionaries + /// Service Fabric-provided partition information + /// User's event processor implementation + /// Connection string for user's event hub + /// Name of event hub consumer group to receive from + /// Optional: Options structure for ServiceFabricProcessor library + /// Very advanced/optional: user-provided checkpoint manager implementation + public ServiceFabricProcessor(Uri serviceFabricServiceName, Guid serviceFabricPartitionId, IReliableStateManager stateManager, IStatefulServicePartition partition, IEventProcessor userEventProcessor, + string eventHubConnectionString, string eventHubConsumerGroup, + EventProcessorOptions options = null, ICheckpointMananger checkpointManager = null) + { + if (serviceFabricServiceName == null) + { + throw new ArgumentNullException(nameof(serviceFabricServiceName)); + } + if (serviceFabricPartitionId == null) + { + throw new ArgumentNullException(nameof(serviceFabricPartitionId)); + } + if (stateManager == null) + { + throw new ArgumentNullException(nameof(stateManager)); + } + if (partition == null) + { + throw new ArgumentNullException(nameof(partition)); + } + if (userEventProcessor == null) + { + throw new ArgumentNullException(nameof(userEventProcessor)); + } + if (string.IsNullOrEmpty(eventHubConnectionString)) + { + throw new ArgumentException("eventHubConnectionString is null or empty"); + } + if (string.IsNullOrEmpty(eventHubConsumerGroup)) + { + throw new ArgumentException("eventHubConsumerGroup is null or empty"); + } + + this.serviceFabricServiceName = serviceFabricServiceName; + this.serviceFabricPartitionId = serviceFabricPartitionId; + this.serviceStateManager = stateManager; + this.servicePartition = partition; + + this.userEventProcessor = userEventProcessor; + + this.ehConnectionString = new
EventHubsConnectionStringBuilder(eventHubConnectionString); + this.consumerGroupName = eventHubConsumerGroup; + + this.options = options ?? new EventProcessorOptions(); + this.checkpointManager = checkpointManager ?? new ReliableDictionaryCheckpointMananger(this.serviceStateManager); + + this.EventHubClientFactory = new EventHubWrappers.EventHubClientFactory(); + this.TestMode = false; + } + + /// + /// For testing purposes. Do not change after calling RunAsync. + /// + public EventHubWrappers.IEventHubClientFactory EventHubClientFactory { get; set; } + + /// + /// For testing purposes. Do not change after calling RunAsync. + /// + public bool TestMode { get; set; } + + /// + /// Starts processing of events. + /// + /// Cancellation token provided by Service Fabric, assumed to indicate instance shutdown when cancelled. + /// Task that completes when event processing shuts down. + public async Task RunAsync(CancellationToken fabricCancellationToken) + { + if (Interlocked.Exchange(ref this.running, 1) == 1) + { + EventProcessorEventSource.Current.Message("Already running"); + throw new InvalidOperationException("EventProcessorService.RunAsync has already been called."); + } + + this.internalCanceller = new CancellationTokenSource(); + this.internalFatalException = null; + + try + { + using (CancellationTokenSource linkedCanceller = CancellationTokenSource.CreateLinkedTokenSource(fabricCancellationToken, this.internalCanceller.Token)) + { + this.linkedCancellationToken = linkedCanceller.Token; + + await InnerRunAsync(); + + this.options.NotifyOnShutdown(null); + } + } + catch (Exception e) + { + // If InnerRunAsync throws, that is intended to be a fatal exception for this instance. + // Catch it here just long enough to log and notify, then rethrow with a bare throw so the original stack trace is preserved.
+ + EventProcessorEventSource.Current.Message("THROWING OUT: {0}", e); + if (e.InnerException != null) + { + EventProcessorEventSource.Current.Message("THROWING OUT INNER: {0}", e.InnerException); + } + this.options.NotifyOnShutdown(e); + throw; + } + } + + private async Task InnerRunAsync() + { + EventHubWrappers.IEventHubClient ehclient = null; + EventHubWrappers.IPartitionReceiver receiver = null; + + try + { + // + // Get Service Fabric partition information. + // + await GetServicePartitionId(this.linkedCancellationToken); + + // + // Create EventHubClient and check partition count. + // + Exception lastException = null; + EventProcessorEventSource.Current.Message("Creating event hub client"); + lastException = RetryWrapper(() => { ehclient = this.EventHubClientFactory.CreateFromConnectionString(this.ehConnectionString.ToString()); }); + if (ehclient == null) + { + EventProcessorEventSource.Current.Message("Out of retries event hub client"); + throw new Exception("Out of retries creating EventHubClient", lastException); + } + EventProcessorEventSource.Current.Message("Event hub client OK"); + EventProcessorEventSource.Current.Message("Getting event hub info"); + EventHubRuntimeInformation ehInfo = null; + // Lambda MUST be synchronous to work with RetryWrapper!
+ lastException = RetryWrapper(() => { ehInfo = ehclient.GetRuntimeInformationAsync().Result; }); + if (ehInfo == null) + { + EventProcessorEventSource.Current.Message("Out of retries getting event hub info"); + throw new Exception("Out of retries getting event hub runtime info", lastException); + } + if (this.TestMode) + { + if (this.servicePartitionCount > ehInfo.PartitionCount) + { + EventProcessorEventSource.Current.Message("TestMode requires event hub partition count larger than service partitinon count"); + throw new EventProcessorConfigurationException("TestMode requires event hub partition count larger than service partitinon count"); + } + else if (this.servicePartitionCount < ehInfo.PartitionCount) + { + EventProcessorEventSource.Current.Message("TestMode: receiving from subset of event hub"); + } + } + else if (ehInfo.PartitionCount != this.servicePartitionCount) + { + EventProcessorEventSource.Current.Message($"Service partition count {this.servicePartitionCount} does not match event hub partition count {ehInfo.PartitionCount}"); + throw new EventProcessorConfigurationException($"Service partition count {this.servicePartitionCount} does not match event hub partition count {ehInfo.PartitionCount}"); + } + this.hubPartitionId = ehInfo.PartitionIds[this.fabricPartitionOrdinal]; + + // + // Generate a PartitionContext now that the required info is available. + // + this.partitionContext = new PartitionContext(this.linkedCancellationToken, this.hubPartitionId, this.ehConnectionString.EntityPath, this.consumerGroupName, this.checkpointManager); + + // + // Start up checkpoint manager. + // + await CheckpointStartup(this.linkedCancellationToken); + + // + // If there was a checkpoint, the offset is in this.initialOffset, so convert it to an EventPosition. + // If no checkpoint, get starting point from user-supplied provider. 
+ // + EventPosition initialPosition = null; + if (this.initialOffset != null) + { + EventProcessorEventSource.Current.Message($"Initial position from checkpoint, offset {this.initialOffset}"); + initialPosition = EventPosition.FromOffset(this.initialOffset); + } + else + { + initialPosition = this.options.InitialPositionProvider(this.hubPartitionId); + EventProcessorEventSource.Current.Message("Initial position from provider"); + } + + // + // Create receiver. + // + EventProcessorEventSource.Current.Message("Creating receiver"); + lastException = RetryWrapper(() => { receiver = ehclient.CreateEpochReceiver(this.consumerGroupName, this.hubPartitionId, initialPosition, this.initialOffset, + Constants.FixedReceiverEpoch, this.options.ClientReceiverOptions); }); + if (receiver == null) + { + EventProcessorEventSource.Current.Message("Out of retries creating receiver"); + throw new Exception("Out of retries creating event hub receiver", lastException); + } + + // + // Call Open on user's event processor instance. + // If user's Open code fails, treat that as a fatal exception and let it throw out. + // + EventProcessorEventSource.Current.Message("Creating event processor"); + await this.userEventProcessor.OpenAsync(this.linkedCancellationToken, this.partitionContext); + EventProcessorEventSource.Current.Message("Event processor created and opened OK"); + + // + // Start metrics reporting. This runs as a separate background thread. + // + Thread t = new Thread(this.MetricsHandler); + t.Start(); + + // + // Receive pump. 
+ // + EventProcessorEventSource.Current.Message("RunAsync setting handler and waiting"); + this.MaxBatchSize = this.options.MaxBatchSize; + receiver.SetReceiveHandler(this, this.options.InvokeProcessorAfterReceiveTimeout); + this.linkedCancellationToken.WaitHandle.WaitOne(); + + EventProcessorEventSource.Current.Message("RunAsync continuing, cleanup"); + } + finally + { + if (this.partitionContext != null) + { + await this.userEventProcessor.CloseAsync(this.partitionContext, this.linkedCancellationToken.IsCancellationRequested ? CloseReason.Cancelled : CloseReason.Failure); + } + if (receiver != null) + { + receiver.SetReceiveHandler(null); + await receiver.CloseAsync(); + } + if (ehclient != null) + { + await ehclient.CloseAsync(); + } + if (this.internalFatalException != null) + { + throw this.internalFatalException; + } + } + } + + private EventHubsException RetryWrapper(Action action) + { + EventHubsException lastException = null; + + for (int i = 0; i < Constants.RetryCount; i++) + { + this.linkedCancellationToken.ThrowIfCancellationRequested(); + try + { + action.Invoke(); + break; + } + catch (EventHubsException e) + { + if (!e.IsTransient) + { + throw e; + } + lastException = e; + } + } + + return lastException; + } + + /// + /// From IPartitionReceiveHandler + /// + public int MaxBatchSize { get; set; } + + async Task IPartitionReceiveHandler.ProcessEventsAsync(IEnumerable events) + { + IEnumerable effectiveEvents = events ?? 
new List(); // convert to empty list if events is null + + if (events != null) + { + // Save position of last event if we got a real list of events + IEnumerator scanner = effectiveEvents.GetEnumerator(); + EventData last = null; + while (scanner.MoveNext()) + { + last = scanner.Current; + } + if (last != null) + { + this.partitionContext.SetOffsetAndSequenceNumber(last); + if (this.options.EnableReceiverRuntimeMetric) + { + // TODO: requires client change to support + // this.partitionContext.RuntimeInformation.Update(last); + } + } + } + + await this.userEventProcessor.ProcessEventsAsync(this.linkedCancellationToken, this.partitionContext, effectiveEvents); + + foreach (EventData ev in effectiveEvents) + { + ev.Dispose(); + } + } + + Task IPartitionReceiveHandler.ProcessErrorAsync(Exception error) + { + EventProcessorEventSource.Current.Message($"RECEIVE EXCEPTION on {this.hubPartitionId}: {error}"); + this.userEventProcessor.ProcessErrorAsync(this.partitionContext, error); + if (error is EventHubsException) + { + if (!(error as EventHubsException).IsTransient) + { + this.internalFatalException = error; + this.internalCanceller.Cancel(); + } + // else don't cancel on transient errors + } + else + { + // All other exceptions are assumed fatal. + this.internalFatalException = error; + this.internalCanceller.Cancel(); + } + return Task.CompletedTask; + } + + private async Task CheckpointStartup(CancellationToken cancellationToken) + { + // Set up store and get checkpoint, if any. + await this.checkpointManager.CreateCheckpointStoreIfNotExistsAsync(cancellationToken); + Checkpoint checkpoint = await this.checkpointManager.CreateCheckpointIfNotExistsAsync(this.hubPartitionId, cancellationToken); + if (!checkpoint.Valid) + { + // Not actually any existing checkpoint. 
+ this.initialOffset = null; + EventProcessorEventSource.Current.Message("No checkpoint"); + } + else if (checkpoint.Version == 1) + { + this.initialOffset = checkpoint.Offset; + EventProcessorEventSource.Current.Message($"Checkpoint provides initial offset {this.initialOffset}"); + } + else + { + // It's actually a later-version checkpoint but we don't know the details. + // Access it via the V1 interface and hope it does something sensible. + this.initialOffset = checkpoint.Offset; + EventProcessorEventSource.Current.Message($"Unexpected checkpoint version {checkpoint.Version}, provided initial offset {this.initialOffset}"); + } + } + + private async Task GetServicePartitionId(CancellationToken cancellationToken) + { + if (this.fabricPartitionOrdinal == -1) + { + using (FabricClient fabricClient = new FabricClient()) + { + ServicePartitionList partitionList = + await fabricClient.QueryManager.GetPartitionListAsync(this.serviceFabricServiceName); + + // Set the number of partitions + this.servicePartitionCount = partitionList.Count; + + // Which partition is this one? 
+ for (int a = 0; a < partitionList.Count; a++) + { + if (partitionList[a].PartitionInformation.Id == this.serviceFabricPartitionId) + { + this.fabricPartitionOrdinal = a; + break; + } + } + + EventProcessorEventSource.Current.Message($"Total partitions {this.servicePartitionCount}"); + } + } + } + + private void MetricsHandler() + { + EventProcessorEventSource.Current.Message("METRIC reporter starting"); + + while (!this.linkedCancellationToken.IsCancellationRequested) + { + Dictionary userMetrics = this.userEventProcessor.GetLoadMetric(this.linkedCancellationToken, this.partitionContext); + + try + { + List reportableMetrics = new List(); + foreach (KeyValuePair metric in userMetrics) + { + EventProcessorEventSource.Current.Message($"METRIC {metric.Key} for partition {this.partitionContext.PartitionId} is {metric.Value}"); + reportableMetrics.Add(new LoadMetric(metric.Key, metric.Value)); + } + this.servicePartition.ReportLoad(reportableMetrics); + Task.Delay(Constants.MetricReportingInterval, this.linkedCancellationToken).Wait(); // throws on cancel + } + catch (Exception e) + { + EventProcessorEventSource.Current.Message($"METRIC partition {this.partitionContext.PartitionId} exception {e}"); + } + } + + EventProcessorEventSource.Current.Message("METRIC reporter exiting"); + } + } +} diff --git a/src/Microsoft.Azure.EventHubs/Properties/AssemblyInfo.cs b/src/Microsoft.Azure.EventHubs/Properties/AssemblyInfo.cs index 5aaee42..568ea55 100644 --- a/src/Microsoft.Azure.EventHubs/Properties/AssemblyInfo.cs +++ b/src/Microsoft.Azure.EventHubs/Properties/AssemblyInfo.cs @@ -33,4 +33,4 @@ using System.Runtime.InteropServices; "dd8a96737e5385b31414369dc3e42f371172127252856a0650793e1f5673a16d5d78e2ac852a10" + "4bc51e6f018dca44fdd26a219c27cb2b263956a80620223c8e9c2f8913c3c903e1e453e9e4e840" + "98afdad5f4badb8c1ebe0a7b0a4b57a08454646a65886afe3e290a791ff3260099ce0edf0bdbcc" + -"afadfeb6")] \ No newline at end of file +"afadfeb6")]