* draft

* fix handling of batchposition, and tolerate race condition for deleted blob

* address PR feedback

* fix incorrect handling of skipped events

* fix check that removes confirmed events and add detail-level tracing

* fix broken trace statement

* fix handling of blob deleted

* add more tracing to blob download

* do not process any more events if already shutting down

* address PR feedback (add comments)

* fix handling of missing blobs

* Update src/DurableTask.Netherite/TransportLayer/EventHubs/BlobBatchReceiver.cs

Co-authored-by: David Justo <david.justo.1996@gmail.com>

---------

Co-authored-by: David Justo <david.justo.1996@gmail.com>
Sebastian Burckhardt 2024-04-16 11:24:38 -07:00 committed by GitHub
Parent 96b3ca5e97
Commit ab706eb79e
No known key found for this signature
GPG key ID: B5690EEEBB952194
7 changed files with 176 additions and 144 deletions

View file

@@ -759,7 +759,7 @@ namespace DurableTask.Netherite.Faster
// (note that it may not be the very next in the sequence since readonly events are not persisted in the log)
if (partitionUpdateEvent.NextInputQueuePosition > 0 && partitionUpdateEvent.NextInputQueuePositionTuple.CompareTo(this.InputQueuePosition) <= 0)
{
this.partition.ErrorHandler.HandleError(nameof(ProcessUpdate), "Duplicate event detected", null, false, false);
this.partition.ErrorHandler.HandleError(nameof(ProcessUpdate), $"Duplicate event detected: #{partitionUpdateEvent.NextInputQueuePositionTuple}", null, true, false);
return;
}

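The check above orders input-queue positions as (sequence number, batch position) pairs. A minimal standalone sketch of the comparison it relies on, with hypothetical values (C# value tuples compare lexicographically, first component first):

using System;

class PositionOrdering
{
    static void Main()
    {
        // position already persisted by the partition (hypothetical values)
        (long seqNo, int batchPos) inputQueuePosition = (100, 2);

        // position that would result from applying the incoming event
        (long seqNo, int batchPos) nextInputQueuePosition = (100, 1);

        // lexicographic comparison: seqNo first, then batchPos
        if (nextInputQueuePosition.CompareTo(inputQueuePosition) <= 0)
        {
            Console.WriteLine($"duplicate event detected: #{nextInputQueuePosition}");
        }
    }
}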
View file

@@ -215,8 +215,8 @@ namespace DurableTask.Netherite
// ----- general event processing and statistics
[Event(240, Level = EventLevel.Informational, Version = 3)]
public void PartitionEventProcessed(string Account, string TaskHub, int PartitionId, long CommitLogPosition, string Category, string PartitionEventId, string EventInfo, string InstanceId, long NextCommitLogPosition, long NextInputQueuePosition, double QueueElapsedMs, double FetchElapsedMs, double ElapsedMs, bool IsReplaying, string AppName, string ExtensionVersion)
[Event(240, Level = EventLevel.Informational, Version = 4)]
public void PartitionEventProcessed(string Account, string TaskHub, int PartitionId, long CommitLogPosition, string Category, string PartitionEventId, string EventInfo, string InstanceId, long NextCommitLogPosition, string NextInputQueuePosition, double QueueElapsedMs, double FetchElapsedMs, double ElapsedMs, bool IsReplaying, string AppName, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(240, Account, TaskHub, PartitionId, CommitLogPosition, Category, PartitionEventId, EventInfo, InstanceId, NextCommitLogPosition, NextInputQueuePosition, QueueElapsedMs, FetchElapsedMs, ElapsedMs, IsReplaying, AppName, ExtensionVersion);

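The signature change above (NextInputQueuePosition goes from long to string so it can carry the (seqno,batchpos) pair) is why the [Event] Version is bumped from 3 to 4: ETW consumers match payloads by event ID and version, so changing a field's type without bumping the version would break decoding. A minimal sketch of the convention, with a hypothetical source name:

using System.Diagnostics.Tracing;

[EventSource(Name = "Example-Netherite")] // hypothetical source name
sealed class ExampleEventSource : EventSource
{
    // Version 3 declared this payload field as long; after changing it to
    // string, the version must be bumped so consumers re-read the manifest.
    [Event(240, Level = EventLevel.Informational, Version = 4)]
    public void PartitionEventProcessed(string NextInputQueuePosition)
    {
        this.WriteEvent(240, NextInputQueuePosition);
    }
}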
View file

@@ -50,14 +50,15 @@ namespace DurableTask.Netherite
double queueLatencyMs = evt.IssuedTimestamp - evt.ReceivedTimestamp;
double fetchLatencyMs = startedTimestamp - evt.IssuedTimestamp;
double latencyMs = finishedTimestamp - startedTimestamp;
string nextInputQueuePosition = evt.NextInputQueuePosition > 0 ? $"({evt.NextInputQueuePosition},{evt.NextInputQueueBatchPosition})" : string.Empty;
if (this.logger.IsEnabled(LogLevel.Information))
{
var details = string.Format($"{(replaying ? "Replayed" : "Processed")} {(evt.NextInputQueuePosition > 0 ? "external" : "internal")} {category}");
this.logger.LogInformation("Part{partition:D2}.{commitLogPosition:D10} {details} {event} eventId={eventId} instanceId={instanceId} pos=({nextCommitLogPosition},{nextInputQueuePosition}) latency=({queueLatencyMs:F0}, {fetchLatencyMs:F0}, {latencyMs:F0})", this.partitionId, commitLogPosition, details, evt, evt.EventIdString, evt.TracedInstanceId, nextCommitLogPosition, evt.NextInputQueuePosition, queueLatencyMs, fetchLatencyMs, latencyMs);
this.logger.LogInformation("Part{partition:D2}.{commitLogPosition:D10} {details} {event} eventId={eventId} instanceId={instanceId} nextCommitLogPosition={nextCommitLogPosition} nextInputQueuePosition={nextInputQueuePosition} latency=({queueLatencyMs:F0}, {fetchLatencyMs:F0}, {latencyMs:F0})", this.partitionId, commitLogPosition, details, evt, evt.EventIdString, evt.TracedInstanceId, nextCommitLogPosition, evt.NextInputQueuePosition, queueLatencyMs, fetchLatencyMs, latencyMs);
}
this.etw?.PartitionEventProcessed(this.account, this.taskHub, this.partitionId, commitLogPosition, category.ToString(), evt.EventIdString, evt.ToString(), evt.TracedInstanceId ?? string.Empty, nextCommitLogPosition, evt.NextInputQueuePosition, queueLatencyMs, fetchLatencyMs, latencyMs, replaying, TraceUtils.AppName, TraceUtils.ExtensionVersion) ;
this.etw?.PartitionEventProcessed(this.account, this.taskHub, this.partitionId, commitLogPosition, category.ToString(), evt.EventIdString, evt.ToString(), evt.TracedInstanceId ?? string.Empty, nextCommitLogPosition, nextInputQueuePosition, queueLatencyMs, fetchLatencyMs, latencyMs, replaying, TraceUtils.AppName, TraceUtils.ExtensionVersion) ;
}
}

View file

@@ -26,15 +26,12 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly EventHubsTraceHelper traceHelper;
readonly EventHubsTraceHelper lowestTraceLevel;
readonly BlobContainerClient containerClient;
readonly bool keepUntilConfirmed;
readonly bool isClientReceiver;
// Event Hubs discards messages after 24h, so we can throw away batches that are older than that
readonly static TimeSpan expirationTimeSpan = TimeSpan.FromHours(24) + TimeSpan.FromMinutes(1);
BlobDeletions blobDeletions;
public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper, NetheriteOrchestrationServiceSettings settings, bool keepUntilConfirmed)
public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper, NetheriteOrchestrationServiceSettings settings)
{
this.traceContext = traceContext;
this.traceHelper = traceHelper;
@@ -42,16 +39,15 @@ namespace DurableTask.Netherite.EventHubsTransport
var serviceClient = BlobUtilsV12.GetServiceClients(settings.BlobStorageConnection).WithRetries;
string containerName = BlobManager.GetContainerName(settings.HubName);
this.containerClient = serviceClient.GetBlobContainerClient(containerName);
this.keepUntilConfirmed = keepUntilConfirmed;
this.blobDeletions = this.keepUntilConfirmed ? new BlobDeletions(this) : null;
this.isClientReceiver = typeof(TEvent) == typeof(ClientEvent);
}
public async IAsyncEnumerable<(EventData eventData, TEvent[] events, long)> ReceiveEventsAsync(
public async IAsyncEnumerable<(EventData eventData, TEvent[] events, long, BlockBlobClient)> ReceiveEventsAsync(
byte[] guid,
IEnumerable<EventData> hubMessages,
[EnumeratorCancellation] CancellationToken token,
MutableLong nextPacketToReceive = null)
IPartitionErrorHandler errorHandler = null,
Position nextPacketToReceive = null)
{
int ignoredPacketCount = 0;
@@ -61,14 +57,14 @@ namespace DurableTask.Netherite.EventHubsTransport
if (nextPacketToReceive != null)
{
if (seqno < nextPacketToReceive.Value)
if (seqno < nextPacketToReceive.SeqNo)
{
this.lowestTraceLevel?.LogTrace("{context} discarded packet #{seqno} because it is already processed", this.traceContext, seqno);
continue;
}
else if (seqno > nextPacketToReceive.Value)
else if (seqno > nextPacketToReceive.SeqNo)
{
this.traceHelper.LogError("{context} received wrong packet, #{seqno} instead of #{expected} ", this.traceContext, seqno, nextPacketToReceive.Value);
this.traceHelper.LogError("{context} received wrong packet, #{seqno} instead of #{expected} ", this.traceContext, seqno, nextPacketToReceive.SeqNo);
// this should never happen, as EventHubs guarantees in-order delivery of packets
throw new InvalidOperationException("EventHubs Out-Of-Order Packet");
}
@@ -96,7 +92,7 @@ namespace DurableTask.Netherite.EventHubsTransport
}
else
{
yield return (eventData, new TEvent[1] { evt }, seqno);
yield return (eventData, new TEvent[1] { evt }, seqno, null);
}
}
else // we have to read messages from a blob batch
@@ -107,24 +103,33 @@ namespace DurableTask.Netherite.EventHubsTransport
await BlobManager.AsynchronousStorageReadMaxConcurrency.WaitAsync();
byte[] blobContent;
byte[] blobContent = null;
token.ThrowIfCancellationRequested();
bool ignoreBatch = false;
try
{
this.lowestTraceLevel?.LogTrace("{context} downloading blob {blobName}", this.traceContext, blobClient.Name);
this.lowestTraceLevel?.LogTrace("{context} downloading blob {blobName} for #{seqno}", this.traceContext, blobClient.Name, seqno);
Azure.Response<BlobDownloadResult> downloadResult = await blobClient.DownloadContentAsync(token);
blobContent = downloadResult.Value.Content.ToArray();
this.lowestTraceLevel?.LogTrace("{context} downloaded blob {blobName} ({size} bytes, {count} packets)", this.traceContext, blobClient.Name, blobContent.Length, blobReference.PacketOffsets.Count + 1);
this.lowestTraceLevel?.LogTrace("{context} downloaded blob {blobName} for #{seqno} ({size} bytes, {count} packets)", this.traceContext, blobClient.Name, seqno, blobContent.Length, blobReference.PacketOffsets.Count + 1);
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// normal during shutdown
this.lowestTraceLevel?.LogTrace("{context} cancelled downloading blob {blobName} for #{seqno}", this.traceContext, blobClient.Name, seqno);
throw;
}
catch (Azure.RequestFailedException exception) when (BlobUtilsV12.BlobDoesNotExist(exception))
{
// the blob may already have been deleted if (a) the next owner already received it, or (b) EH internally duplicated the message and the original was already delivered.
// in either case, the correct action is to skip the entire batch, i.e. not process any of its contents, since all the events were already delivered.
ignoreBatch = true;
}
catch (Exception exception)
{
this.traceHelper.LogError("{context} failed to read blob {blobName} for #{seqno}: {exception}", this.traceContext, blobClient.Name, seqno, exception);
@@ -135,56 +140,52 @@ namespace DurableTask.Netherite.EventHubsTransport
BlobManager.AsynchronousStorageReadMaxConcurrency.Release();
}
TEvent[] result = new TEvent[blobReference.PacketOffsets.Count + 1];
for (int i = 0; i < result.Length; i++)
if (ignoreBatch)
{
var offset = i == 0 ? 0 : blobReference.PacketOffsets[i - 1];
var nextOffset = i < blobReference.PacketOffsets.Count ? blobReference.PacketOffsets[i] : blobContent.Length;
var length = nextOffset - offset;
using var m = new MemoryStream(blobContent, offset, length, writable: false);
this.traceHelper.LogWarning("{context} blob {blobName} for batch #{seqno} was not found. Ignoring batch since it must have been already delivered.", this.traceContext, blobClient.Name, seqno);
token.ThrowIfCancellationRequested();
try
{
Packet.Deserialize(m, out result[i], out _, null); // no need to check task hub match again
}
catch (Exception)
{
this.traceHelper.LogError("{context} could not deserialize packet from blob {blobName} at #{seqno}.{subSeqNo} offset={offset} length={length}", this.traceContext, blobClient.Name, seqno, i, offset, length);
throw;
}
}
yield return (eventData, result, seqno);
// we issue a deletion task; there is no strong guarantee that deletion will successfully complete
// which means some blobs can be left behind temporarily.
// This is fine because we have a second deletion path, a scan that removes old
// blobs that are past EH's expiration date.
if (!this.keepUntilConfirmed)
{
var bgTask = Task.Run(() => this.DeleteBlobAsync(new BlockBlobClient[1] { blobClient }));
// return an empty batch, no events should be processed
yield return (eventData, new TEvent[0], seqno, null);
}
else
{
// we cannot delete the blob until the partition has persisted an input queue position
// past this blob, so that we know we will not need to read it again
if (this.blobDeletions.TryRegister(result, blobClient))
TEvent[] result = new TEvent[blobReference.PacketOffsets.Count + 1];
for (int i = 0; i < result.Length; i++)
{
this.blobDeletions = new BlobDeletions(this);
}
else
{
// the current batch will be registered along with the next
// batch that successfully registers
}
var offset = i == 0 ? 0 : blobReference.PacketOffsets[i - 1];
var nextOffset = i < blobReference.PacketOffsets.Count ? blobReference.PacketOffsets[i] : blobContent.Length;
var length = nextOffset - offset;
if (nextPacketToReceive?.BatchPos > i)
{
this.lowestTraceLevel?.LogTrace("{context} skipped over event #({seqno},{subSeqNo}) because it is already processed", this.traceContext, seqno, i);
continue;
}
using var m = new MemoryStream(blobContent, offset, length, writable: false);
token.ThrowIfCancellationRequested();
try
{
Packet.Deserialize(m, out result[i], out _, null); // no need to check task hub match again
}
catch (Exception)
{
this.traceHelper.LogError("{context} could not deserialize packet from blob {blobName} at #{seqno}.{subSeqNo} offset={offset} length={length}", this.traceContext, blobClient.Name, seqno, i, offset, length);
throw;
}
}
yield return (eventData, result, seqno, blobClient);
}
}
if (nextPacketToReceive != null)
{
nextPacketToReceive.Value = seqno + 1;
nextPacketToReceive.SeqNo = seqno + 1;
nextPacketToReceive.BatchPos = 0;
}
}
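The BlobDoesNotExist filter used above is what makes the deleted-blob race benign: a not-found response means another receiver (or a duplicate delivery) already consumed and deleted the batch, so skipping it is correct. A sketch of such a filter, assuming the standard Azure Storage error codes (the repo's actual helper lives in BlobUtilsV12):

using Azure;

static class BlobErrorFilters
{
    // returns true when the failure means the blob is simply gone,
    // as opposed to a transient or authorization error
    public static bool BlobDoesNotExist(RequestFailedException e)
        => e.Status == 404 &&
           (e.ErrorCode == "BlobNotFound" || e.ErrorCode == "ContainerNotFound");
}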
@@ -203,7 +204,7 @@ namespace DurableTask.Netherite.EventHubsTransport
}
}
public async Task<int> DeleteBlobAsync(IEnumerable<BlockBlobClient> blobClients)
public async Task<int> DeleteBlobBatchesAsync(IEnumerable<BlockBlobClient> blobClients)
{
int deletedCount = 0;
@@ -215,7 +216,7 @@ namespace DurableTask.Netherite.EventHubsTransport
{
this.lowestTraceLevel?.LogTrace("{context} deleting blob {blobName}", this.traceContext, blobClient.Name);
Azure.Response response = await blobClient.DeleteAsync();
this.lowestTraceLevel?.LogTrace("{context} deleted blob {blobName}", this.traceContext, blobClient.Name);
this.traceHelper.LogDebug("{context} deleted blob {blobName}", this.traceContext, blobClient.Name);
deletedCount++;
}
catch (Azure.RequestFailedException e) when (BlobUtilsV12.BlobDoesNotExist(e))
@@ -235,41 +236,6 @@ namespace DurableTask.Netherite.EventHubsTransport
return deletedCount;
}
class BlobDeletions : TransportAbstraction.IDurabilityListener
{
readonly BlobBatchReceiver<TEvent> blobBatchReceiver;
readonly List<BlockBlobClient> blobClients;
public BlobDeletions(BlobBatchReceiver<TEvent> blobBatchReceiver)
{
this.blobBatchReceiver = blobBatchReceiver;
this.blobClients = new List<BlockBlobClient>();
}
public bool TryRegister(TEvent[] events, BlockBlobClient blobClient)
{
this.blobClients.Add(blobClient);
for(int i = events.Length - 1; i >= 0; i--)
{
if (events[i] is PartitionUpdateEvent e)
{
// we can register a callback
// to be invoked after the event has been persisted in the log
DurabilityListeners.Register(e, this);
return true;
}
}
return false; // only read or query events in this batch, cannot register a callback
}
public void ConfirmDurable(Event evt)
{
Task.Run(() => this.blobBatchReceiver.DeleteBlobAsync(this.blobClients));
}
}
public async Task<int> RemoveGarbageAsync(CancellationToken token)
{
async IAsyncEnumerable<Azure.Page<BlobItem>> GetExpiredBlobs()
@@ -321,7 +287,7 @@ namespace DurableTask.Netherite.EventHubsTransport
completed = true;
}
deletedCount += await this.DeleteBlobAsync(blobs);
deletedCount += await this.DeleteBlobBatchesAsync(blobs);
if (completed)
{
@@ -363,8 +329,9 @@ namespace DurableTask.Netherite.EventHubsTransport
}
}
public class MutableLong
public class Position
{
public long Value;
public long SeqNo;
public int BatchPos;
}
}

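C# iterator methods (those using yield return) cannot declare ref or out parameters, which is why ReceiveEventsAsync reports its progress through this mutable Position object: the caller observes SeqNo and BatchPos advance even if the enumeration stops early. A minimal sketch of the pattern, using the Position class shown above and hypothetical names elsewhere:

using System.Collections.Generic;

static class PositionDemo
{
    // the iterator advances 'next' as a side effect, since it cannot
    // take 'ref long nextSeqNo' the way an ordinary method could
    public static IEnumerable<long> Receive(IEnumerable<long> seqnos, Position next)
    {
        foreach (long seqno in seqnos)
        {
            if (seqno < next.SeqNo)
            {
                continue; // already processed, discard
            }
            yield return seqno;
            next.SeqNo = seqno + 1;
            next.BatchPos = 0;
        }
    }
}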
View file

@@ -11,6 +11,7 @@ namespace DurableTask.Netherite.EventHubsTransport
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs.Specialized;
using DurableTask.Core.Common;
using DurableTask.Netherite.Abstractions;
using Microsoft.Azure.EventHubs;
@@ -47,9 +48,16 @@ namespace DurableTask.Netherite.EventHubsTransport
// since EventProcessorHost does not redeliver packets, we need to keep them around until we are sure
// they are processed durably, so we can redeliver them when recycling/recovering a partition
// we make this a concurrent queue so we can remove confirmed events concurrently with receiving new ones
readonly ConcurrentQueue<(PartitionEvent[] events, long offset, long seqno)> pendingDelivery;
readonly ConcurrentQueue<EventEntry> pendingDelivery;
AsyncLock deliveryLock;
record struct EventEntry(long SeqNo, long BatchPos, PartitionEvent Event, bool LastInBatch, long Offset, BlockBlobClient BatchBlob)
{
public EventEntry(long seqNo, long batchPos, PartitionEvent evt) : this(seqNo, batchPos, evt, false, 0, null) { }
public EventEntry(long seqNo, long batchPos, PartitionEvent evt, long offset, BlockBlobClient blob) : this(seqNo, batchPos, evt, true, offset, blob) { }
}
// this points to the latest incarnation of this partition; it gets
// updated as we recycle partitions (create new incarnations after failures)
volatile Task<PartitionIncarnation> currentIncarnation;
@@ -83,7 +91,7 @@ namespace DurableTask.Netherite.EventHubsTransport
this.host = host;
this.sender = sender;
this.parameters = parameters;
this.pendingDelivery = new ConcurrentQueue<(PartitionEvent[] events, long offset, long seqno)>();
this.pendingDelivery = new();
this.partitionContext = partitionContext;
this.settings = settings;
this.eventHubsTransport = eventHubsTransport;
@@ -94,7 +102,7 @@ namespace DurableTask.Netherite.EventHubsTransport
this.traceHelper = new EventHubsTraceHelper(traceHelper, this.partitionId);
this.shutdownToken = shutdownToken;
string traceContext = $"EventHubsProcessor {this.eventHubName}/{this.eventHubPartition}";
this.blobBatchReceiver = new BlobBatchReceiver<PartitionEvent>(traceContext, this.traceHelper, this.settings, keepUntilConfirmed: true);
this.blobBatchReceiver = new BlobBatchReceiver<PartitionEvent>(traceContext, this.traceHelper, this.settings);
var _ = shutdownToken.Register(
() => { var _ = Task.Run(() => this.IdempotentShutdown("shutdownToken", eventHubsTransport.FatalExceptionObserved)); },
@@ -122,17 +130,44 @@ namespace DurableTask.Netherite.EventHubsTransport
public void ConfirmDurable(Event evt)
{
// this is called after an event has committed (i.e. has been durably persisted in the recovery log).
// so we know we will never need to deliver it again. We remove it from the local buffer, and also checkpoint
// with EventHubs occasionally.
while (this.pendingDelivery.TryPeek(out var front) && front.events[front.events.Length - 1].NextInputQueuePosition <= ((PartitionEvent)evt).NextInputQueuePosition)
List<BlockBlobClient> obsoleteBatches = null;
if (this.traceHelper.IsEnabled(LogLevel.Trace))
{
if (this.pendingDelivery.TryDequeue(out var candidate))
this.traceHelper.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition} confirmed event nextInputQueuePosition={nextInputQueuePosition}", this.eventHubName, this.eventHubPartition, ((PartitionEvent)evt).NextInputQueuePositionTuple);
}
// this is called after an event has committed (i.e. has been durably persisted in the recovery log).
// so we know we will never need to deliver it again. We remove it from the local buffer, update the fields that
// track the last persisted position, and delete the blob batch if this was the last event in the batch.
while (this.pendingDelivery.TryPeek(out var front)
&& (front.Event == evt || front.Event.NextInputQueuePositionTuple.CompareTo(((PartitionEvent) evt).NextInputQueuePositionTuple) < 0))
{
if (this.pendingDelivery.TryDequeue(out var confirmed) && front == confirmed)
{
this.persistedOffset = Math.Max(this.persistedOffset, candidate.offset);
this.persistedSequenceNumber = Math.Max(this.persistedSequenceNumber, candidate.seqno);
if (this.traceHelper.IsEnabled(LogLevel.Trace))
{
this.traceHelper.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition} discarding buffered event nextInputQueuePosition={nextInputQueuePosition} lastInBatch={lastInBatch} seqno={seqNo} offset={offset} batchBlob={batchBlob}", this.eventHubName, this.eventHubPartition, front.Event.NextInputQueuePositionTuple, confirmed.LastInBatch, confirmed.SeqNo, confirmed.Offset, confirmed.BatchBlob);
}
if (confirmed.LastInBatch)
{
// to create EH checkpoints we need to record both the EH sequence number and the EH offset (see SaveEventHubsReceiverCheckpoint)
this.persistedOffset = Math.Max(this.persistedOffset, confirmed.Offset);
this.persistedSequenceNumber = Math.Max(this.persistedSequenceNumber, confirmed.SeqNo);
if (confirmed.BatchBlob != null)
{
(obsoleteBatches ??= new()).Add(confirmed.BatchBlob);
}
}
}
}
if (obsoleteBatches != null)
{
Task backgroundTask = Task.Run(() => this.blobBatchReceiver.DeleteBlobBatchesAsync(obsoleteBatches));
}
}
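The confirmation path above is a single-consumer drain of the pending-delivery queue: entries are dequeued in order once the commit log has persisted past them, the checkpoint fields advance only on batch boundaries, and a batch blob becomes deletable only when its last event is confirmed. A condensed sketch of the same pattern with hypothetical types:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

class ConfirmationDemo
{
    record struct Entry(long SeqNo, bool LastInBatch, long Offset, string BatchBlob);

    readonly ConcurrentQueue<Entry> pendingDelivery = new();
    long persistedOffset;
    long persistedSequenceNumber;

    // called once everything up to and including seqNo is durable
    public void Confirm(long seqNo)
    {
        List<string> obsoleteBatches = null;
        while (this.pendingDelivery.TryPeek(out Entry front) && front.SeqNo <= seqNo)
        {
            if (this.pendingDelivery.TryDequeue(out Entry confirmed) && confirmed.LastInBatch)
            {
                // EH checkpoints need both the offset and the sequence number
                this.persistedOffset = Math.Max(this.persistedOffset, confirmed.Offset);
                this.persistedSequenceNumber = Math.Max(this.persistedSequenceNumber, confirmed.SeqNo);
                if (confirmed.BatchBlob != null)
                {
                    (obsoleteBatches ??= new()).Add(confirmed.BatchBlob);
                }
            }
        }
        if (obsoleteBatches != null)
        {
            // fire-and-forget: a periodic garbage scan deletes anything missed
            _ = Task.Run(() => this.DeleteAsync(obsoleteBatches));
        }
    }

    Task DeleteAsync(List<string> blobs) => Task.CompletedTask; // stub
}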
async Task<PartitionIncarnation> StartPartitionAsync(PartitionIncarnation prior = null)
@@ -225,7 +260,7 @@ namespace DurableTask.Netherite.EventHubsTransport
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} checking for packets requiring redelivery (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, c.Incarnation);
var batch = this.pendingDelivery
.SelectMany(x => x.Item1)
.Select(x => x.Event)
.Where(evt => (evt.NextInputQueuePosition, evt.NextInputQueueBatchPosition).CompareTo(c.NextPacketToReceive) > 0)
.ToList();
if (batch.Count > 0)
@@ -388,6 +423,12 @@ namespace DurableTask.Netherite.EventHubsTransport
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} is receiving events starting with #{seqno}", this.eventHubName, this.eventHubPartition, firstSequenceNumber);
if (this.shutdownToken.IsCancellationRequested)
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} already shut down", this.eventHubName, this.eventHubPartition);
return;
}
PartitionIncarnation current = await this.currentIncarnation;
while (current != null && current.ErrorHandler.IsTerminated)
@@ -428,59 +469,59 @@ namespace DurableTask.Netherite.EventHubsTransport
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) is processing packets", this.eventHubName, this.eventHubPartition, current.Incarnation);
// we need to update the next expected seqno even if the iterator returns nothing, since it may have discarded some packets.
// iterators do not support ref arguments, so we use a simple wrapper class to work around this limitation
MutableLong nextPacketToReceive = new MutableLong() { Value = current.NextPacketToReceive.seqNo };
// we need to update the next expected position tuple (seqno,batchpos) even if the iterator returns nothing, since it may have discarded some packets.
// iterators do not support ref arguments, so we define a Position object with mutable fields to work around this limitation
Position nextPacketToReceive = new Position() { SeqNo = current.NextPacketToReceive.seqNo, BatchPos = current.NextPacketToReceive.batchPos };
await foreach ((EventData eventData, PartitionEvent[] events, long seqNo) in this.blobBatchReceiver.ReceiveEventsAsync(this.taskHubGuid, packets, this.shutdownToken, nextPacketToReceive))
await foreach ((EventData eventData, PartitionEvent[] events, long seqNo, BlockBlobClient blob) in this.blobBatchReceiver.ReceiveEventsAsync(this.taskHubGuid, packets, this.shutdownToken, current.ErrorHandler, nextPacketToReceive))
{
int numSkipped = 0;
for (int i = 0; i < events.Length; i++)
{
PartitionEvent evt = events[i];
if (i < events.Length - 1)
if (evt == null)
{
numSkipped++;
continue; // was skipped over by the batch receiver because it is already processed
}
if (i < events.Length - 1) // this is not the last event in the batch
{
// the next input queue position is the next position within the same batch
evt.NextInputQueuePosition = seqNo;
evt.NextInputQueueBatchPosition = i + 1;
this.pendingDelivery.Enqueue(new EventEntry(seqNo, i, evt));
}
else
else // this is the last event in the batch
{
// the next input queue position is the first entry of the next batch
evt.NextInputQueuePosition = seqNo + 1;
evt.NextInputQueueBatchPosition = 0;
this.pendingDelivery.Enqueue(new EventEntry(seqNo, i, evt, long.Parse(eventData.SystemProperties.Offset), blob));
}
if (this.traceHelper.IsEnabled(LogLevel.Trace))
{
this.traceHelper.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) received packet #{seqno}.{subSeqNo} {event} id={eventId}", this.eventHubName, this.eventHubPartition, current.Incarnation, seqNo, i, evt, evt.EventIdString);
this.traceHelper.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) received packet #({seqno},{subSeqNo}) {event} id={eventId}", this.eventHubName, this.eventHubPartition, current.Incarnation, seqNo, i, evt, evt.EventIdString);
}
if (evt is PartitionUpdateEvent partitionUpdateEvent)
{
DurabilityListeners.Register(evt, this);
}
totalEvents++;
}
// add events to redelivery queue, and attach durability listener to last update event in the batch
this.pendingDelivery.Enqueue((events, long.Parse(eventData.SystemProperties.Offset), eventData.SystemProperties.SequenceNumber));
for (int i = events.Length - 1; i >= 0; i--)
{
if (events[i] is PartitionUpdateEvent partitionUpdateEvent)
{
DurabilityListeners.Register(partitionUpdateEvent, this);
}
}
if (current.NextPacketToReceive.batchPos == 0)
{
current.Partition.SubmitEvents(events);
}
else
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) skipping {batchPos} events in batch #{seqno} because they are already processed", this.eventHubName, this.eventHubPartition, current.Incarnation, current.NextPacketToReceive.batchPos, seqNo);
current.Partition.SubmitEvents(events.Skip(current.NextPacketToReceive.batchPos).ToList());
}
current.Partition.SubmitEvents(numSkipped == 0 ? events : events.Skip(numSkipped).ToList());
}
current.NextPacketToReceive = (nextPacketToReceive.Value, 0);
current.NextPacketToReceive = (nextPacketToReceive.SeqNo, nextPacketToReceive.BatchPos);
}
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) received {totalEvents} events in {latencyMs:F2}ms, starting with #{seqno}, next expected packet is #{nextSeqno}", this.eventHubName, this.eventHubPartition, current.Incarnation, totalEvents, stopwatch.Elapsed.TotalMilliseconds, firstSequenceNumber, current.NextPacketToReceive.seqNo);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) received {totalEvents} events in {latencyMs:F2}ms, starting with #{seqno}, next expected packet is #{nextSeqno}", this.eventHubName, this.eventHubPartition, current.Incarnation, totalEvents, stopwatch.Elapsed.TotalMilliseconds, firstSequenceNumber, current.NextPacketToReceive);
await this.SaveEventHubsReceiverCheckpoint(context, 600000);
}

View file

@@ -18,6 +18,7 @@ namespace DurableTask.Netherite.EventHubsTransport
using System.Threading.Channels;
using DurableTask.Netherite.Abstractions;
using System.Diagnostics;
using Azure.Storage.Blobs.Specialized;
/// <summary>
/// The EventHubs transport implementation.
@@ -321,7 +322,7 @@ namespace DurableTask.Netherite.EventHubsTransport
TimeSpan longPollingInterval = TimeSpan.FromMinutes(1);
var backoffDelay = TimeSpan.Zero;
string context = $"Client{this.shortClientId}.ch{index}";
var blobBatchReceiver = new BlobBatchReceiver<ClientEvent>(context, this.traceHelper, this.settings, keepUntilConfirmed: false);
var blobBatchReceiver = new BlobBatchReceiver<ClientEvent>(context, this.traceHelper, this.settings);
await this.clientConnectionsEstablished[index];
@@ -354,10 +355,11 @@ namespace DurableTask.Netherite.EventHubsTransport
if (packets != null)
{
List<BlockBlobClient> blobBatches = null;
int totalEvents = 0;
var stopwatch = Stopwatch.StartNew();
await foreach ((EventData eventData, ClientEvent[] events, long seqNo) in blobBatchReceiver.ReceiveEventsAsync(clientGuid, packets, this.shutdownSource.Token))
await foreach ((EventData eventData, ClientEvent[] events, long seqNo, BlockBlobClient blob) in blobBatchReceiver.ReceiveEventsAsync(clientGuid, packets, this.shutdownSource.Token))
{
for (int i = 0; i < events.Length; i++)
{
@@ -376,8 +378,18 @@ namespace DurableTask.Netherite.EventHubsTransport
this.traceHelper.LogError("Client.{clientId}.ch{index} received packet #{seqno}.{subSeqNo} for client {otherClient}", this.shortClientId, index, seqNo, i, Client.GetShortId(clientEvent.ClientId));
}
}
if (blob != null)
{
(blobBatches ??= new()).Add(blob);
}
}
this.traceHelper.LogDebug("Client{clientId}.ch{index} received {totalEvents} events in {latencyMs:F2}ms", this.shortClientId, index, totalEvents, stopwatch.Elapsed.TotalMilliseconds);
if (blobBatches != null)
{
Task backgroundTask = Task.Run(() => blobBatchReceiver.DeleteBlobBatchesAsync(blobBatches));
}
}
else
{

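Unlike the partition processor, the client receiver above (and the load monitor below) never redelivers events, so batch blobs can be deleted as soon as the receive loop has handed the events off; deletions are collected during enumeration and issued once, in the background. A sketch of the loop shape, with a hypothetical receiver API:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ClientReceiveLoop
{
    public static async Task RunAsync(
        IAsyncEnumerable<(string[] events, string batchBlob)> batches,
        Action<string[]> process,
        Func<IReadOnlyList<string>, Task<int>> deleteBlobBatchesAsync)
    {
        List<string> blobBatches = null;
        await foreach ((string[] events, string batchBlob) in batches)
        {
            process(events);
            if (batchBlob != null)
            {
                (blobBatches ??= new()).Add(batchBlob); // defer deletion
            }
        }
        if (blobBatches != null)
        {
            // background deletion; failures are cleaned up by the garbage scan
            _ = Task.Run(() => deleteBlobBatchesAsync(blobBatches));
        }
    }
}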
View file

@@ -11,6 +11,7 @@ namespace DurableTask.Netherite.EventHubsTransport
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs.Specialized;
using DurableTask.Core.Common;
using DurableTask.Netherite.Abstractions;
using Microsoft.Azure.EventHubs;
@@ -57,7 +58,7 @@ namespace DurableTask.Netherite.EventHubsTransport
this.partitionId = uint.Parse(this.eventHubPartition);
this.traceHelper = new EventHubsTraceHelper(traceHelper, this.partitionId);
this.shutdownToken = shutdownToken;
this.blobBatchReceiver = new BlobBatchReceiver<LoadMonitorEvent>("LoadMonitor", traceHelper, settings, keepUntilConfirmed: false);
this.blobBatchReceiver = new BlobBatchReceiver<LoadMonitorEvent>("LoadMonitor", traceHelper, settings);
}
Task IEventProcessor.OpenAsync(PartitionContext context)
@@ -104,9 +105,10 @@ namespace DurableTask.Netherite.EventHubsTransport
EventData last = null;
int totalEvents = 0;
List<BlockBlobClient> blobBatches = null;
var stopwatch = Stopwatch.StartNew();
await foreach ((EventData eventData, LoadMonitorEvent[] events, long seqNo) in this.blobBatchReceiver.ReceiveEventsAsync(this.taskHubGuid, packets, this.shutdownToken))
await foreach ((EventData eventData, LoadMonitorEvent[] events, long seqNo, BlockBlobClient blob) in this.blobBatchReceiver.ReceiveEventsAsync(this.taskHubGuid, packets, this.shutdownToken))
{
for (int i = 0; i < events.Length; i++)
{
@@ -117,8 +119,12 @@ namespace DurableTask.Netherite.EventHubsTransport
}
last = eventData;
}
if (blob != null)
{
(blobBatches ??= new()).Add(blob);
}
}
if (last != null)
{
@@ -129,6 +135,11 @@ namespace DurableTask.Netherite.EventHubsTransport
this.traceHelper.LogDebug("LoadMonitor received no new events in {latencyMs:F2}ms", stopwatch.Elapsed.TotalMilliseconds);
}
if (blobBatches != null)
{
Task backgroundTask = Task.Run(() => this.blobBatchReceiver.DeleteBlobBatchesAsync(blobBatches));
}
this.PeriodicGarbageCheck();
}
catch (OperationCanceledException) when (this.shutdownToken.IsCancellationRequested)
@@ -142,7 +153,7 @@ namespace DurableTask.Netherite.EventHubsTransport
}
finally
{
this.traceHelper.LogInformation("LoadMonitor exits receive loop");
this.traceHelper.LogDebug("LoadMonitor exits receive loop");
}
}