Adding EndOfStream receiver support (#93)

* Adding EndOfStream receiver support.

* Adding EPH EndOfStream unut test

* Remove DiscoverEndOfStream() call from InitialOffsetProviderWithOffset test since it is unnecessary.
This commit is contained in:
Serkant Karaca 2017-01-23 14:33:44 -08:00 коммит произвёл GitHub
Родитель ceda3fec37
Коммит 8774bc4afb
3 изменённых файлов: 95 добавлений и 9 удалений

Просмотреть файл

@ -27,6 +27,12 @@ namespace Microsoft.Azure.EventHubs
/// </summary> /// </summary>
public static readonly string StartOfStream = "-1"; public static readonly string StartOfStream = "-1";
/// <summary>
/// The constant that denotes the end of a stream. This can be used as an offset argument in receiver creation to
/// start receiving from the latest event, instead of a specific point in time/offset value.
/// </summary>
public static readonly string EndOfStream = "@latest";
/// <summary> /// <summary>
/// The default consumer group name: $Default. /// The default consumer group name: $Default.
/// </summary> /// </summary>

Просмотреть файл

@ -509,10 +509,10 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests
async Task InitialOffsetProviderWithDateTime() async Task InitialOffsetProviderWithDateTime()
{ {
// Send and receive single message so we can find out enqueue date-time of the last message. // Send and receive single message so we can find out enqueue date-time of the last message.
var lastEvents = await DiscoverEndOfStream(); var partitions = await DiscoverEndOfStream();
// We will use last enqueued message's enqueue date-time so EPH will pick messages only after that point. // We will use last enqueued message's enqueue date-time so EPH will pick messages only after that point.
var lastEnqueueDateTime = lastEvents.Max(le => le.Value.Item2); var lastEnqueueDateTime = partitions.Max(le => le.Value.Item2);
Log($"Last message enqueued at {lastEnqueueDateTime}"); Log($"Last message enqueued at {lastEnqueueDateTime}");
// Use a randomly generated container name so that initial offset provider will be respected. // Use a randomly generated container name so that initial offset provider will be respected.
@ -540,11 +540,11 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests
async Task InitialOffsetProviderWithOffset() async Task InitialOffsetProviderWithOffset()
{ {
// Send and receive single message so we can find out offset of the last message. // Send and receive single message so we can find out offset of the last message.
var lastOffsets = await DiscoverEndOfStream(); var partitions = await DiscoverEndOfStream();
Log("Discovered last event offsets on each partition as below:"); Log("Discovered last event offsets on each partition as below:");
foreach (var lastEvent in lastOffsets) foreach (var p in partitions)
{ {
Log($"Partition {lastEvent.Key}: {lastEvent.Value.Item1}"); Log($"Partition {p.Key}: {p.Value.Item1}");
} }
// Use a randomly generated container name so that initial offset provider will be respected. // Use a randomly generated container name so that initial offset provider will be respected.
@ -558,7 +558,31 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests
var processorOptions = new EventProcessorOptions var processorOptions = new EventProcessorOptions
{ {
ReceiveTimeout = TimeSpan.FromSeconds(15), ReceiveTimeout = TimeSpan.FromSeconds(15),
InitialOffsetProvider = partitionId => lastOffsets[partitionId].Item1, InitialOffsetProvider = partitionId => partitions[partitionId].Item1,
MaxBatchSize = 100
};
var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions);
// We should have received only 1 event from each partition.
Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
}
[Fact]
async Task InitialOffsetProviderWithEndOfStream()
{
// Use a randomly generated container name so that initial offset provider will be respected.
var eventProcessorHost = new EventProcessorHost(
string.Empty,
PartitionReceiver.DefaultConsumerGroupName,
this.EventHubConnectionString,
this.StorageConnectionString,
Guid.NewGuid().ToString());
var processorOptions = new EventProcessorOptions
{
ReceiveTimeout = TimeSpan.FromSeconds(15),
InitialOffsetProvider = partitionId => PartitionReceiver.EndOfStream,
MaxBatchSize = 100 MaxBatchSize = 100
}; };
@ -702,15 +726,15 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests
async Task<Dictionary<string, Tuple<string, DateTime>>> DiscoverEndOfStream() async Task<Dictionary<string, Tuple<string, DateTime>>> DiscoverEndOfStream()
{ {
var ehClient = EventHubClient.CreateFromConnectionString(this.EventHubConnectionString); var ehClient = EventHubClient.CreateFromConnectionString(this.EventHubConnectionString);
var lastEvents = new Dictionary<string, Tuple<string, DateTime>>(); var partitions = new Dictionary<string, Tuple<string, DateTime>>();
foreach (var pid in this.PartitionIds) foreach (var pid in this.PartitionIds)
{ {
var pInfo = await ehClient.GetPartitionRuntimeInformationAsync(pid); var pInfo = await ehClient.GetPartitionRuntimeInformationAsync(pid);
lastEvents.Add(pid, Tuple.Create(pInfo.LastEnqueuedOffset, pInfo.LastEnqueuedTimeUtc)); partitions.Add(pid, Tuple.Create(pInfo.LastEnqueuedOffset, pInfo.LastEnqueuedTimeUtc));
} }
return lastEvents.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); return partitions.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
} }
async Task<Dictionary<string, List<EventData>>> RunGenericScenario(EventProcessorHost eventProcessorHost, async Task<Dictionary<string, List<EventData>>> RunGenericScenario(EventProcessorHost eventProcessorHost,
@ -769,6 +793,9 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests
await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo); await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo);
// Wait 5 seconds to avoid races in scenarios like EndOfStream.
await Task.Delay(5000);
Log($"Sending {totalNumberOfEventsToSend} event(s) to each partition"); Log($"Sending {totalNumberOfEventsToSend} event(s) to each partition");
var sendTasks = new List<Task>(); var sendTasks = new List<Task>();
foreach (var partitionId in PartitionIds) foreach (var partitionId in PartitionIds)

Просмотреть файл

@ -267,6 +267,59 @@ namespace Microsoft.Azure.EventHubs.UnitTests
} }
} }
[Fact]
async Task CreateReceiverWithEndOfStream()
{
// Randomly pick one of the available partitons.
var partitionId = this.PartitionIds[new Random().Next(this.PartitionIds.Count())];
Log($"Randomly picked partition {partitionId}");
var partitionSender = this.EventHubClient.CreatePartitionSender(partitionId);
// Send couple of messages before creating an EndOfStream receiver.
// We are not expecting to receive these messages would be sent before receiver creation.
for (int i = 0; i < 10; i++)
{
var ed = new EventData(new byte[1]);
await partitionSender.SendAsync(ed);
}
// Create a new receiver which will start reading from the end of the stream.
Log($"Creating a new receiver with offset EndOFStream");
var receiver = this.EventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, PartitionReceiver.EndOfStream);
// Attemp to receive the message. This should return only 1 message.
var receiveTask = receiver.ReceiveAsync(100);
// Send a new message which is expected to go to the end of stream.
// We are expecting to receive only this message.
// Wait 5 seconds before sending to avoid race.
await Task.Delay(5000);
var eventToReceive = new EventData(new byte[1]);
eventToReceive.Properties = new Dictionary<string, object>();
eventToReceive.Properties.Add("stamp", Guid.NewGuid().ToString());
await partitionSender.SendAsync(eventToReceive);
// Complete asyncy receive task.
var receivedMessages = await receiveTask;
// We should have received only 1 message from this call.
Assert.True(receivedMessages.Count() == 1, $"Didn't receive 1 message. Received {receivedMessages.Count()} messages(s).");
// Check stamp.
Assert.True(receivedMessages.Single().Properties["stamp"].ToString() == eventToReceive.Properties["stamp"].ToString()
, "Stamps didn't match on the message sent and received!");
Log("Received correct message as expected.");
// Next receive on this partition shouldn't return any more messages.
receivedMessages = await receiver.ReceiveAsync(100, TimeSpan.FromSeconds(15));
Assert.True(receivedMessages == null, $"Received messages at the end.");
await partitionSender.CloseAsync();
await receiver.CloseAsync();
}
[Fact] [Fact]
async Task CreateReceiverWithOffset() async Task CreateReceiverWithOffset()
{ {