From 8774bc4afbfbe44f1722ee648cbaca7ae07edede Mon Sep 17 00:00:00 2001 From: Serkant Karaca Date: Mon, 23 Jan 2017 14:33:44 -0800 Subject: [PATCH] Adding EndOfStream receiver support (#93) * Adding EndOfStream receiver support. * Adding EPH EndOfStream unut test * Remove DiscoverEndOfStream() call from InitialOffsetProviderWithOffset test since it is unnecessary. --- .../PartitionReceiver.cs | 6 +++ .../EventProcessorHostTests.cs | 45 ++++++++++++---- .../EventHubClientTests.cs | 53 +++++++++++++++++++ 3 files changed, 95 insertions(+), 9 deletions(-) diff --git a/src/Microsoft.Azure.EventHubs/PartitionReceiver.cs b/src/Microsoft.Azure.EventHubs/PartitionReceiver.cs index 4af7941..7ea828e 100644 --- a/src/Microsoft.Azure.EventHubs/PartitionReceiver.cs +++ b/src/Microsoft.Azure.EventHubs/PartitionReceiver.cs @@ -27,6 +27,12 @@ namespace Microsoft.Azure.EventHubs /// public static readonly string StartOfStream = "-1"; + /// + /// The constant that denotes the end of a stream. This can be used as an offset argument in receiver creation to + /// start receiving from the latest event, instead of a specific point in time/offset value. + /// + public static readonly string EndOfStream = "@latest"; + /// /// The default consumer group name: $Default. /// diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs index 4457efd..f14cfc0 100644 --- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs +++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs @@ -509,10 +509,10 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests async Task InitialOffsetProviderWithDateTime() { // Send and receive single message so we can find out enqueue date-time of the last message. - var lastEvents = await DiscoverEndOfStream(); + var partitions = await DiscoverEndOfStream(); // We will use last enqueued message's enqueue date-time so EPH will pick messages only after that point. - var lastEnqueueDateTime = lastEvents.Max(le => le.Value.Item2); + var lastEnqueueDateTime = partitions.Max(le => le.Value.Item2); Log($"Last message enqueued at {lastEnqueueDateTime}"); // Use a randomly generated container name so that initial offset provider will be respected. @@ -540,11 +540,11 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests async Task InitialOffsetProviderWithOffset() { // Send and receive single message so we can find out offset of the last message. - var lastOffsets = await DiscoverEndOfStream(); + var partitions = await DiscoverEndOfStream(); Log("Discovered last event offsets on each partition as below:"); - foreach (var lastEvent in lastOffsets) + foreach (var p in partitions) { - Log($"Partition {lastEvent.Key}: {lastEvent.Value.Item1}"); + Log($"Partition {p.Key}: {p.Value.Item1}"); } // Use a randomly generated container name so that initial offset provider will be respected. @@ -558,7 +558,31 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests var processorOptions = new EventProcessorOptions { ReceiveTimeout = TimeSpan.FromSeconds(15), - InitialOffsetProvider = partitionId => lastOffsets[partitionId].Item1, + InitialOffsetProvider = partitionId => partitions[partitionId].Item1, + MaxBatchSize = 100 + }; + + var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions); + + // We should have received only 1 event from each partition. + Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + } + + [Fact] + async Task InitialOffsetProviderWithEndOfStream() + { + // Use a randomly generated container name so that initial offset provider will be respected. + var eventProcessorHost = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + this.EventHubConnectionString, + this.StorageConnectionString, + Guid.NewGuid().ToString()); + + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(15), + InitialOffsetProvider = partitionId => PartitionReceiver.EndOfStream, MaxBatchSize = 100 }; @@ -702,15 +726,15 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests async Task>> DiscoverEndOfStream() { var ehClient = EventHubClient.CreateFromConnectionString(this.EventHubConnectionString); - var lastEvents = new Dictionary>(); + var partitions = new Dictionary>(); foreach (var pid in this.PartitionIds) { var pInfo = await ehClient.GetPartitionRuntimeInformationAsync(pid); - lastEvents.Add(pid, Tuple.Create(pInfo.LastEnqueuedOffset, pInfo.LastEnqueuedTimeUtc)); + partitions.Add(pid, Tuple.Create(pInfo.LastEnqueuedOffset, pInfo.LastEnqueuedTimeUtc)); } - return lastEvents.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); + return partitions.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); } async Task>> RunGenericScenario(EventProcessorHost eventProcessorHost, @@ -769,6 +793,9 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo); + // Wait 5 seconds to avoid races in scenarios like EndOfStream. + await Task.Delay(5000); + Log($"Sending {totalNumberOfEventsToSend} event(s) to each partition"); var sendTasks = new List(); foreach (var partitionId in PartitionIds) diff --git a/test/Microsoft.Azure.EventHubs.UnitTests/EventHubClientTests.cs b/test/Microsoft.Azure.EventHubs.UnitTests/EventHubClientTests.cs index ad1764f..21fac62 100644 --- a/test/Microsoft.Azure.EventHubs.UnitTests/EventHubClientTests.cs +++ b/test/Microsoft.Azure.EventHubs.UnitTests/EventHubClientTests.cs @@ -267,6 +267,59 @@ namespace Microsoft.Azure.EventHubs.UnitTests } } + [Fact] + async Task CreateReceiverWithEndOfStream() + { + // Randomly pick one of the available partitons. + var partitionId = this.PartitionIds[new Random().Next(this.PartitionIds.Count())]; + Log($"Randomly picked partition {partitionId}"); + + var partitionSender = this.EventHubClient.CreatePartitionSender(partitionId); + + // Send couple of messages before creating an EndOfStream receiver. + // We are not expecting to receive these messages would be sent before receiver creation. + for (int i = 0; i < 10; i++) + { + var ed = new EventData(new byte[1]); + await partitionSender.SendAsync(ed); + } + + // Create a new receiver which will start reading from the end of the stream. + Log($"Creating a new receiver with offset EndOFStream"); + var receiver = this.EventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, PartitionReceiver.EndOfStream); + + // Attemp to receive the message. This should return only 1 message. + var receiveTask = receiver.ReceiveAsync(100); + + // Send a new message which is expected to go to the end of stream. + // We are expecting to receive only this message. + // Wait 5 seconds before sending to avoid race. + await Task.Delay(5000); + var eventToReceive = new EventData(new byte[1]); + eventToReceive.Properties = new Dictionary(); + eventToReceive.Properties.Add("stamp", Guid.NewGuid().ToString()); + await partitionSender.SendAsync(eventToReceive); + + // Complete asyncy receive task. + var receivedMessages = await receiveTask; + + // We should have received only 1 message from this call. + Assert.True(receivedMessages.Count() == 1, $"Didn't receive 1 message. Received {receivedMessages.Count()} messages(s)."); + + // Check stamp. + Assert.True(receivedMessages.Single().Properties["stamp"].ToString() == eventToReceive.Properties["stamp"].ToString() + , "Stamps didn't match on the message sent and received!"); + + Log("Received correct message as expected."); + + // Next receive on this partition shouldn't return any more messages. + receivedMessages = await receiver.ReceiveAsync(100, TimeSpan.FromSeconds(15)); + Assert.True(receivedMessages == null, $"Received messages at the end."); + + await partitionSender.CloseAsync(); + await receiver.CloseAsync(); + } + [Fact] async Task CreateReceiverWithOffset() {