Fix fragmentation/reassembly logic (#233)
* Fix fragmentation logic.
* Address PR feedback.
This commit is contained in:
Родитель
3275990ebf
Коммит
4b9a83d345
|
@ -3,6 +3,7 @@
|
|||
|
||||
namespace DurableTask.Netherite
|
||||
{
|
||||
using System;
|
||||
using System.Runtime.Serialization;
|
||||
|
||||
[DataContract]
|
||||
|
@ -11,6 +12,9 @@ namespace DurableTask.Netherite
|
|||
[DataMember]
|
||||
public EventId OriginalEventId { get; set; }
|
||||
|
||||
[DataMember]
|
||||
public Guid? GroupId { get; set; } // we now use a group id for tracking fragments, to fix issue #231
|
||||
|
||||
[DataMember]
|
||||
public byte[] Bytes { get; set; }
|
||||
|
||||
|
|
|
@ -15,6 +15,9 @@ namespace DurableTask.Netherite
|
|||
[DataMember]
|
||||
public EventId OriginalEventId { get; set; }
|
||||
|
||||
[DataMember]
|
||||
public Guid? GroupId { get; set; } // we now use a group id for tracking fragments, to fix issue #231
|
||||
|
||||
[DataMember]
|
||||
public byte[] Bytes { get; set; }
|
||||
|
||||
|
@ -38,6 +41,11 @@ namespace DurableTask.Netherite
|
|||
protected override void ExtraTraceInformation(StringBuilder s)
|
||||
{
|
||||
s.Append(' ');
|
||||
if (this.GroupId.HasValue)
|
||||
{
|
||||
s.Append(this.GroupId.Value.ToString("N"));
|
||||
s.Append(' ');
|
||||
}
|
||||
s.Append(this.Bytes.Length);
|
||||
if (this.IsLast)
|
||||
{
|
||||
|
|
|
@ -39,7 +39,7 @@ namespace DurableTask.Netherite
|
|||
|
||||
readonly BatchTimer<PendingRequest> ResponseTimeouts;
|
||||
readonly ConcurrentDictionary<long, PendingRequest> ResponseWaiters;
|
||||
readonly Dictionary<(string,int), (MemoryStream, int)> Fragments;
|
||||
readonly Dictionary<string, (MemoryStream, int)> Fragments;
|
||||
readonly Dictionary<long, QueryResponseReceived> QueryResponses;
|
||||
|
||||
public static string GetShortId(Guid clientId) => clientId.ToString("N").Substring(0, 7);
|
||||
|
@ -62,7 +62,7 @@ namespace DurableTask.Netherite
|
|||
this.shutdownToken = shutdownToken;
|
||||
this.ResponseTimeouts = new BatchTimer<PendingRequest>(this.shutdownToken, this.Timeout, this.traceHelper.TraceTimerProgress);
|
||||
this.ResponseWaiters = new ConcurrentDictionary<long, PendingRequest>();
|
||||
this.Fragments = new Dictionary<(string,int), (MemoryStream, int)>();
|
||||
this.Fragments = new Dictionary<string, (MemoryStream, int)>();
|
||||
this.QueryResponses = new Dictionary<long, QueryResponseReceived>();
|
||||
this.ResponseTimeouts.Start("ClientTimer");
|
||||
this.workItemStopwatch = new Stopwatch();
|
||||
|
@ -87,7 +87,9 @@ namespace DurableTask.Netherite
|
|||
if (clientEvent is ClientEventFragment fragment)
|
||||
{
|
||||
var originalEventString = fragment.OriginalEventId.ToString();
|
||||
var index = (originalEventString, clientEvent.ReceiveChannel);
|
||||
var group = fragment.GroupId.HasValue
|
||||
? fragment.GroupId.Value.ToString() // groups are now the way we track fragments
|
||||
: $"{originalEventString}-{fragment.ReceiveChannel}"; // prior to introducing groups, we used event id and channel, which is not always good enough
|
||||
|
||||
if (this.traceHelper.LogLevelLimit == Microsoft.Extensions.Logging.LogLevel.Trace)
|
||||
{
|
||||
|
@ -96,7 +98,7 @@ namespace DurableTask.Netherite
|
|||
|
||||
if (fragment.IsLast)
|
||||
{
|
||||
(MemoryStream stream, int last) = this.Fragments[index];
|
||||
(MemoryStream stream, int last) = this.Fragments[group];
|
||||
|
||||
if (last != fragment.Fragment)
|
||||
{
|
||||
|
@ -105,6 +107,8 @@ namespace DurableTask.Netherite
|
|||
|
||||
var reassembledEvent = FragmentationAndReassembly.Reassemble<ClientEvent>(stream, fragment);
|
||||
|
||||
this.Fragments.Remove(group);
|
||||
|
||||
this.Process(reassembledEvent);
|
||||
}
|
||||
else
|
||||
|
@ -113,11 +117,11 @@ namespace DurableTask.Netherite
|
|||
|
||||
if (fragment.Fragment == 0)
|
||||
{
|
||||
this.Fragments[index] = streamAndPosition = (new MemoryStream(), 0);
|
||||
this.Fragments[group] = streamAndPosition = (new MemoryStream(), 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
streamAndPosition = this.Fragments[index];
|
||||
streamAndPosition = this.Fragments[group];
|
||||
}
|
||||
|
||||
if (streamAndPosition.Item2 != fragment.Fragment)
|
||||
|
@ -128,13 +132,11 @@ namespace DurableTask.Netherite
|
|||
streamAndPosition.Item1.Write(fragment.Bytes, 0, fragment.Bytes.Length);
|
||||
streamAndPosition.Item2++;
|
||||
|
||||
this.Fragments[index] = streamAndPosition;
|
||||
this.Fragments[group] = streamAndPosition;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
this.Fragments.Remove((clientEvent.EventIdString, clientEvent.ReceiveChannel));
|
||||
|
||||
if (clientEvent is QueryResponseReceived queryResponseReceived)
|
||||
{
|
||||
queryResponseReceived.DeserializeOrchestrationStates();
|
||||
|
|
|
@ -58,15 +58,17 @@ namespace DurableTask.Netherite
|
|||
public override void Process(PartitionEventFragment evt, EffectTracker effects)
|
||||
{
|
||||
// stores fragments until the last one is received
|
||||
var originalEventString = evt.OriginalEventId.ToString();
|
||||
var group = evt.GroupId.HasValue
|
||||
? evt.GroupId.Value.ToString() // groups are now the way we track fragments
|
||||
: evt.OriginalEventId.ToString(); // prior to introducing groups, we used just the event id, which is not correct under interleavings
|
||||
|
||||
if (evt.IsLast)
|
||||
{
|
||||
evt.ReassembledEvent = FragmentationAndReassembly.Reassemble<PartitionEvent>(this.Fragments[originalEventString], evt, effects.Partition);
|
||||
evt.ReassembledEvent = FragmentationAndReassembly.Reassemble<PartitionEvent>(this.Fragments[group], evt, effects.Partition);
|
||||
|
||||
effects.EventDetailTracer?.TraceEventProcessingDetail($"Reassembled {evt.ReassembledEvent}");
|
||||
|
||||
this.Fragments.Remove(originalEventString);
|
||||
this.Fragments.Remove(group);
|
||||
|
||||
switch (evt.ReassembledEvent)
|
||||
{
|
||||
|
@ -96,11 +98,11 @@ namespace DurableTask.Netherite
|
|||
|
||||
if (evt.Fragment == 0)
|
||||
{
|
||||
this.Fragments[originalEventString] = list = new List<PartitionEventFragment>();
|
||||
this.Fragments[group] = list = new List<PartitionEventFragment>();
|
||||
}
|
||||
else
|
||||
{
|
||||
list = this.Fragments[originalEventString];
|
||||
list = this.Fragments[group];
|
||||
}
|
||||
|
||||
list.Add(evt);
|
||||
|
|
|
@ -98,8 +98,9 @@ namespace DurableTask.Netherite.EventHubsTransport
|
|||
if (tooBig)
|
||||
{
|
||||
// the message is too big. Break it into fragments, and send each individually.
|
||||
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} fragmenting large event ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
|
||||
var fragments = FragmentationAndReassembly.Fragment(arraySegment, evt, this.maxFragmentSize);
|
||||
Guid groupId = Guid.NewGuid();
|
||||
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} fragmenting large event ({size} bytes) id={eventId} groupId={group:N}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString, groupId);
|
||||
var fragments = FragmentationAndReassembly.Fragment(arraySegment, evt, groupId, this.maxFragmentSize);
|
||||
maybeSent = i;
|
||||
for (int k = 0; k < fragments.Count; k++)
|
||||
{
|
||||
|
|
|
@ -24,7 +24,7 @@ namespace DurableTask.Netherite
|
|||
int Fragment { get; }
|
||||
}
|
||||
|
||||
public static List<IEventFragment> Fragment(ArraySegment<byte> segment, Event original, int maxFragmentSize)
|
||||
public static List<IEventFragment> Fragment(ArraySegment<byte> segment, Event original, Guid groupId, int maxFragmentSize)
|
||||
{
|
||||
if (segment.Count <= maxFragmentSize)
|
||||
throw new ArgumentException(nameof(segment), "segment must be larger than max fragment size");
|
||||
|
@ -40,6 +40,7 @@ namespace DurableTask.Netherite
|
|||
{
|
||||
list.Add(new ClientEventFragment()
|
||||
{
|
||||
GroupId = groupId,
|
||||
ClientId = clientEvent.ClientId,
|
||||
RequestId = clientEvent.RequestId,
|
||||
OriginalEventId = original.EventId,
|
||||
|
@ -52,6 +53,7 @@ namespace DurableTask.Netherite
|
|||
{
|
||||
list.Add(new PartitionEventFragment()
|
||||
{
|
||||
GroupId = groupId,
|
||||
PartitionId = partitionEvent.PartitionId,
|
||||
OriginalEventId = original.EventId,
|
||||
Timeout = (partitionEvent as ClientRequestEvent)?.TimeoutUtc,
|
||||
|
|
Загрузка…
Ссылка в новой задаче