always deduplicate CreationRequestReceived events

This commit is contained in:
sebastianburckhardt 2024-04-16 16:11:10 -07:00
Родитель a582ca576e
Коммит b7cd8b5fc3
1 изменённых файлов: 16 добавлений и 3 удалений

Просмотреть файл

@ -27,6 +27,9 @@ namespace DurableTask.Netherite
[DataMember] [DataMember]
public List<WaitRequestReceived> Waiters { get; set; } public List<WaitRequestReceived> Waiters { get; set; }
[DataMember]
public string CreationRequestEventId { get; set; }
[IgnoreDataMember] [IgnoreDataMember]
public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Instance, this.InstanceId); public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Instance, this.InstanceId);
@ -39,12 +42,19 @@ namespace DurableTask.Netherite
public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects)
{ {
if (creationRequestReceived.EventIdString == this.CreationRequestEventId)
{
// we have already processed this event - it must be a duplicate delivery. Ignore it.
return;
};
bool exists = this.OrchestrationState != null; bool exists = this.OrchestrationState != null;
bool filterDuplicate = exists
bool previousExecutionWithDedupeStatus = exists
&& creationRequestReceived.DedupeStatuses != null && creationRequestReceived.DedupeStatuses != null
&& creationRequestReceived.DedupeStatuses.Contains(this.OrchestrationState.OrchestrationStatus); && creationRequestReceived.DedupeStatuses.Contains(this.OrchestrationState.OrchestrationStatus);
if (!filterDuplicate) if (! previousExecutionWithDedupeStatus)
{ {
var ee = creationRequestReceived.ExecutionStartedEvent; var ee = creationRequestReceived.ExecutionStartedEvent;
@ -65,6 +75,7 @@ namespace DurableTask.Netherite
ScheduledStartTime = ee.ScheduledStartTime ScheduledStartTime = ee.ScheduledStartTime
}; };
this.OrchestrationStateSize = DurableTask.Netherite.SizeUtils.GetEstimatedSize(this.OrchestrationState); this.OrchestrationStateSize = DurableTask.Netherite.SizeUtils.GetEstimatedSize(this.OrchestrationState);
this.CreationRequestEventId = creationRequestReceived.EventIdString;
// queue the message in the session, or start a timer if delayed // queue the message in the session, or start a timer if delayed
if (!ee.ScheduledStartTime.HasValue) if (!ee.ScheduledStartTime.HasValue)
@ -87,7 +98,7 @@ namespace DurableTask.Netherite
{ {
ClientId = creationRequestReceived.ClientId, ClientId = creationRequestReceived.ClientId,
RequestId = creationRequestReceived.RequestId, RequestId = creationRequestReceived.RequestId,
Succeeded = !filterDuplicate, Succeeded = !previousExecutionWithDedupeStatus,
ExistingInstanceOrchestrationStatus = this.OrchestrationState?.OrchestrationStatus, ExistingInstanceOrchestrationStatus = this.OrchestrationState?.OrchestrationStatus,
}; };
} }
@ -211,6 +222,8 @@ namespace DurableTask.Netherite
effects.AddDeletion(TrackedObjectKey.History(this.InstanceId)); effects.AddDeletion(TrackedObjectKey.History(this.InstanceId));
this.OrchestrationState = null; this.OrchestrationState = null;
this.OrchestrationStateSize = 0;
this.CreationRequestEventId = null;
this.Waiters = null; this.Waiters = null;
} }