allow configuration of how many taskmessages to pack into a partition-to-partition event; defaults to 100 (#29)

This commit is contained in:
Sebastian Burckhardt 2021-03-30 19:02:31 -07:00 коммит произвёл GitHub
Родитель 4c7fb58edc
Коммит 5e8f9cc544
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 87 добавлений и 27 удалений

Просмотреть файл

@ -14,6 +14,9 @@ namespace DurableTask.Netherite
[DataMember]
public long OriginPosition { get; set; }
[IgnoreDataMember]
public virtual (long, int) DedupPosition => (this.OriginPosition, 0); // overridden if a subposition is needed
public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Dedup);

Просмотреть файл

@ -17,12 +17,21 @@ namespace DurableTask.Netherite
[DataMember]
public List<TaskMessage> DelayedTaskMessages { get; set; }
[DataMember]
public int SubPosition { get; set; }
[DataMember]
public string WorkItemId { get; set; }
[IgnoreDataMember]
public override EventId EventId => EventId.MakePartitionToPartitionEventId(this.WorkItemId, this.PartitionId);
[IgnoreDataMember]
public override (long, int) DedupPosition => (this.OriginPosition, this.SubPosition);
[IgnoreDataMember]
public int NumberMessages => (this.TaskMessages?.Count ?? 0) + (this.DelayedTaskMessages?.Count ?? 0);
protected override void ExtraTraceInformation(StringBuilder s)
{
var tCount = this.TaskMessages?.Count ?? 0;

Просмотреть файл

@ -148,6 +148,11 @@ namespace DurableTask.Netherite
/// </summary>
public bool PersistStepsFirst { get; set; } = false;
/// <summary>
/// Pack TaskMessages generated by a single work item for the same destination into a single event.
/// </summary>
public int PackPartitionTaskMessages { get; set; } = 100;
/// <summary>
/// Gets or sets the name used for resolving the premium Azure storage connection string, if used.
/// </summary>

Просмотреть файл

@ -16,7 +16,7 @@ namespace DurableTask.Netherite
class DedupState : TrackedObject
{
[DataMember]
public Dictionary<uint, long> LastProcessed { get; set; } = new Dictionary<uint, long>();
public Dictionary<uint, (long Position, int SubPosition)> LastProcessed { get; set; } = new Dictionary<uint, (long,int)>();
[DataMember]
public (long, long) Positions; // used by FasterAlt to persist positions
@ -26,11 +26,12 @@ namespace DurableTask.Netherite
bool IsNotDuplicate(PartitionMessageEvent evt)
{
// detect duplicates of incoming partition-to-partition events by comparing commit log position of this event against last processed event from same partition
this.LastProcessed.TryGetValue(evt.OriginPartition, out long lastProcessed);
if (evt.OriginPosition > lastProcessed)
// detect duplicates of incoming partition-to-partition events by comparing commit log position /subposition of this event against last processed event from same partition
this.LastProcessed.TryGetValue(evt.OriginPartition, out (long,int) lastProcessed);
if (evt.DedupPosition.CompareTo(lastProcessed) > 0)
{
this.LastProcessed[evt.OriginPartition] = evt.OriginPosition;
this.LastProcessed[evt.OriginPartition] = evt.DedupPosition;
return true;
}
else

Просмотреть файл

@ -114,7 +114,7 @@ namespace DurableTask.Netherite
public Partition Partition { get; set; }
[IgnoreDataMember]
int numAcks = 0;
int numAcks = 0;
public void ConfirmDurable(Event evt)
{
@ -154,34 +154,76 @@ int numAcks = 0;
public void Process(BatchProcessed evt, EffectTracker effects)
{
var sorted = new Dictionary<uint, TaskMessagesReceived>();
foreach (var message in evt.RemoteMessages)
{
var instanceId = message.OrchestrationInstance.InstanceId;
var destination = this.Partition.PartitionFunction(instanceId);
if (!sorted.TryGetValue(destination, out var outmessage))
var batch = new Batch();
int subPosition = 0;
IEnumerable<(uint,TaskMessage)> Messages()
{
foreach (var message in evt.RemoteMessages)
{
sorted[destination] = outmessage = new TaskMessagesReceived()
var instanceId = message.OrchestrationInstance.InstanceId;
var destination = this.Partition.PartitionFunction(instanceId);
yield return (destination, message);
}
}
void AddMessage(TaskMessagesReceived outmessage, TaskMessage message)
{
if (Entities.IsDelayedEntityMessage(message, out _))
{
(outmessage.DelayedTaskMessages ??= new List<TaskMessage>()).Add(message);
}
else if (message.Event is ExecutionStartedEvent executionStartedEvent && executionStartedEvent.ScheduledStartTime.HasValue)
{
(outmessage.DelayedTaskMessages ??= new List<TaskMessage>()).Add(message);
}
else
{
(outmessage.TaskMessages ??= new List<TaskMessage>()).Add(message);
}
outmessage.SubPosition = ++subPosition;
}
if (effects.Partition.Settings.PackPartitionTaskMessages > 1)
{
// pack multiple TaskMessages for the same destination into a single TaskMessagesReceived event
var sorted = new Dictionary<uint, TaskMessagesReceived>();
foreach ((uint destination, TaskMessage message) in Messages())
{
if (!sorted.TryGetValue(destination, out var outmessage))
{
sorted[destination] = outmessage = new TaskMessagesReceived()
{
PartitionId = destination,
WorkItemId = evt.WorkItemId,
};
}
AddMessage(outmessage, message);
// send the message if we have reached the pack limit
if (outmessage.NumberMessages >= effects.Partition.Settings.PackPartitionTaskMessages)
{
batch.OutgoingMessages.Add(outmessage);
sorted.Remove(destination);
}
}
batch.OutgoingMessages.AddRange(sorted.Values);
}
else
{
// send each TaskMessage as a separate TaskMessagesReceived event
foreach ((uint destination, TaskMessage message) in Messages())
{
var outmessage = new TaskMessagesReceived()
{
PartitionId = destination,
WorkItemId = evt.WorkItemId,
};
}
if (Entities.IsDelayedEntityMessage(message, out _))
{
(outmessage.DelayedTaskMessages ?? (outmessage.DelayedTaskMessages = new List<TaskMessage>())).Add(message);
}
else if (message.Event is ExecutionStartedEvent executionStartedEvent && executionStartedEvent.ScheduledStartTime.HasValue)
{
(outmessage.DelayedTaskMessages ?? (outmessage.DelayedTaskMessages = new List<TaskMessage>())).Add(message);
}
else
{
(outmessage.TaskMessages ?? (outmessage.TaskMessages = new List<TaskMessage>())).Add(message);
AddMessage(outmessage, message);
batch.OutgoingMessages.Add(outmessage);
}
}
var batch = new Batch();
batch.OutgoingMessages.AddRange(sorted.Values);
this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}