Fix client responses to wait for persistence (#98)

* fixes the response for create, delete, purge, and wait request so it does not send the response until the state is persisted

* fix logic mistake in OutboxState
This commit is contained in:
Sebastian Burckhardt 2021-12-22 12:29:02 -08:00 коммит произвёл GitHub
Родитель 195cf34c66
Коммит 3989fb03d6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 155 добавлений и 91 удалений

Просмотреть файл

@ -39,6 +39,9 @@ namespace DurableTask.Netherite
[IgnoreDataMember]
public override TrackedObjectKey Target => TrackedObjectKey.Instance(this.InstanceId);
[IgnoreDataMember]
public CreationResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState
public override bool OnReadComplete(TrackedObject target, Partition partition)
{
// Use this moment of time as the creation timestamp, replacing the original timestamp taken on the client.

Просмотреть файл

@ -25,6 +25,9 @@ namespace DurableTask.Netherite
[IgnoreDataMember]
public override string TracedInstanceId => this.InstanceId;
[IgnoreDataMember]
public DeletionResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState
public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);

Просмотреть файл

@ -31,7 +31,7 @@ namespace DurableTask.Netherite
History = historyState?.History?.ToList(),
};
partition.Send(response);
partition.Send(response); //TODO wait for persistence
}
}
}

Просмотреть файл

@ -41,6 +41,9 @@ namespace DurableTask.Netherite
PurgeBatchIssued batch = makeNewBatchObject();
// TODO : while the request itself is reliable, the client response is not.
// We should probably fix that by using the ClientState to track progress.
async Task ExecuteBatch()
{
await partition.State.Prefetch(batch.KeysToPrefetch);

Просмотреть файл

@ -42,7 +42,7 @@ namespace DurableTask.Netherite
OrchestrationState = editedState,
};
partition.Send(response);
partition.Send(response); //TODO wait for persistence
}
}
}

Просмотреть файл

@ -22,6 +22,9 @@ namespace DurableTask.Netherite
[IgnoreDataMember]
public override string TracedInstanceId => this.InstanceId;
[IgnoreDataMember]
public WaitResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState
public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);

Просмотреть файл

@ -66,6 +66,9 @@ namespace DurableTask.Netherite
[IgnoreDataMember]
public override string TracedInstanceId => this.InstanceId;
[IgnoreDataMember]
public List<WaitResponseReceived> ResponsesToSend { get; set; } // used to communicate responses to ClientState
IEnumerable<TrackedObjectKey> IRequiresPrefetch.KeysToPrefetch
{
get

Просмотреть файл

@ -9,7 +9,7 @@ namespace DurableTask.Netherite
using System.Threading.Tasks;
[DataContract]
class PurgeBatchIssued : PartitionUpdateEvent, IRequiresPrefetch
class PurgeBatchIssued : PartitionUpdateEvent, IRequiresPrefetch, TransportAbstraction.IDurabilityListener
{
[DataMember]
public string QueryEventId { get; set; }
@ -71,5 +71,11 @@ namespace DurableTask.Netherite
effects.Add(TrackedObjectKey.Instance(instanceId));
}
}
public void ConfirmDurable(Event evt)
{
// lets the client know this batch has been persisted
this.WhenProcessed.TrySetResult(null);
}
}
}

Просмотреть файл

@ -70,17 +70,14 @@ namespace DurableTask.Netherite
}
}
if (!effects.IsReplaying)
effects.Add(TrackedObjectKey.Outbox);
creationRequestReceived.ResponseToSend = new CreationResponseReceived()
{
// send response to client
effects.Partition.Send(new CreationResponseReceived()
{
ClientId = creationRequestReceived.ClientId,
RequestId = creationRequestReceived.RequestId,
Succeeded = !filterDuplicate,
ExistingInstanceOrchestrationStatus = this.OrchestrationState?.OrchestrationStatus,
});
}
ClientId = creationRequestReceived.ClientId,
RequestId = creationRequestReceived.RequestId,
Succeeded = !filterDuplicate,
ExistingInstanceOrchestrationStatus = this.OrchestrationState?.OrchestrationStatus,
};
}
@ -92,13 +89,8 @@ namespace DurableTask.Netherite
// if the orchestration is complete, notify clients that are waiting for it
if (this.Waiters != null && WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState))
{
if (!effects.IsReplaying)
{
foreach (var request in this.Waiters)
{
this.Partition.Send(request.CreateResponse(this.OrchestrationState));
}
}
// we do not need effects.Add(TrackedObjectKey.Outbox) because it has already been added by SessionsState
evt.ResponsesToSend = this.Waiters.Select(request => request.CreateResponse(this.OrchestrationState)).ToList();
this.Waiters = null;
}
@ -108,10 +100,8 @@ namespace DurableTask.Netherite
{
if (WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState))
{
if (!effects.IsReplaying)
{
this.Partition.Send(evt.CreateResponse(this.OrchestrationState));
}
effects.Add(TrackedObjectKey.Outbox);
evt.ResponseToSend = evt.CreateResponse(this.OrchestrationState);
}
else
{
@ -135,7 +125,7 @@ namespace DurableTask.Netherite
{
int numberInstancesDeleted = 0;
if (this.OrchestrationState != null
if (this.OrchestrationState != null
&& (!deletionRequest.CreatedTime.HasValue || deletionRequest.CreatedTime.Value == this.OrchestrationState.CreatedTime))
{
numberInstancesDeleted++;
@ -148,15 +138,13 @@ namespace DurableTask.Netherite
effects.Add(TrackedObjectKey.Sessions);
}
if (!effects.IsReplaying)
effects.Add(TrackedObjectKey.Outbox);
deletionRequest.ResponseToSend = new DeletionResponseReceived()
{
this.Partition.Send(new DeletionResponseReceived()
{
ClientId = deletionRequest.ClientId,
RequestId = deletionRequest.RequestId,
NumberInstancesDeleted = numberInstancesDeleted,
});
}
ClientId = deletionRequest.ClientId,
RequestId = deletionRequest.RequestId,
NumberInstancesDeleted = numberInstancesDeleted,
};
}
public override void Process(PurgeBatchIssued purgeBatchIssued, EffectTracker effects)

Просмотреть файл

@ -94,6 +94,11 @@ namespace DurableTask.Netherite
outmessage.OriginPosition = batch.Position;
this.Partition.Send(outmessage);
}
foreach (var outresponse in batch.OutgoingResponses)
{
DurabilityListeners.Register(outresponse, batch);
this.Partition.Send(outresponse);
}
}
[DataContract]
@ -102,6 +107,9 @@ namespace DurableTask.Netherite
[DataMember]
public List<PartitionMessageEvent> OutgoingMessages { get; set; } = new List<PartitionMessageEvent>();
[DataMember]
public List<ClientEvent> OutgoingResponses { get; set; } = new List<ClientEvent>();
[IgnoreDataMember]
public long Position { get; set; }
@ -119,21 +127,22 @@ namespace DurableTask.Netherite
public void ConfirmDurable(Event evt)
{
var partitionMessageEvent = (PartitionMessageEvent)evt;
var workItemTraceHelper = this.Partition.WorkItemTraceHelper;
if (workItemTraceHelper.TraceTaskMessages)
if (evt is PartitionMessageEvent partitionMessageEvent)
{
double? persistenceDelayMs = this.ProcessedTimestamp.HasValue ? (this.ReadyToSendTimestamp - this.ProcessedTimestamp.Value) : null;
double sendDelayMs = this.Partition.CurrentTimeMs - this.ReadyToSendTimestamp;
foreach (var entry in partitionMessageEvent.TracedTaskMessages)
var workItemTraceHelper = this.Partition.WorkItemTraceHelper;
if (workItemTraceHelper.TraceTaskMessages)
{
workItemTraceHelper.TraceTaskMessageSent(this.Partition.PartitionId, entry.message, entry.workItemId, persistenceDelayMs, sendDelayMs);
double? persistenceDelayMs = this.ProcessedTimestamp.HasValue ? (this.ReadyToSendTimestamp - this.ProcessedTimestamp.Value) : null;
double sendDelayMs = this.Partition.CurrentTimeMs - this.ReadyToSendTimestamp;
foreach (var entry in partitionMessageEvent.TracedTaskMessages)
{
workItemTraceHelper.TraceTaskMessageSent(this.Partition.PartitionId, entry.message, entry.workItemId, persistenceDelayMs, sendDelayMs);
}
}
}
if (++this.numAcks == this.OutgoingMessages.Count)
if (++this.numAcks == this.OutgoingMessages.Count + this.OutgoingResponses.Count)
{
this.Partition.SubmitEvent(new SendConfirmed()
{
@ -171,73 +180,93 @@ namespace DurableTask.Netherite
var batch = new Batch();
int subPosition = 0;
IEnumerable<(uint,TaskMessage)> Messages()
bool sendResponses = evt.ResponsesToSend != null;
bool sendMessages = evt.RemoteMessages?.Count > 0;
if (! (sendResponses || sendMessages))
{
foreach (var message in evt.RemoteMessages)
return;
}
if (sendResponses)
{
foreach(var r in evt.ResponsesToSend)
{
var instanceId = message.OrchestrationInstance.InstanceId;
var destination = this.Partition.PartitionFunction(instanceId);
yield return (destination, message);
batch.OutgoingResponses.Add(r);
}
}
void AddMessage(TaskMessagesReceived outmessage, TaskMessage message)
if (sendMessages)
{
if (Entities.IsDelayedEntityMessage(message, out _))
IEnumerable<(uint, TaskMessage)> Messages()
{
(outmessage.DelayedTaskMessages ??= new List<TaskMessage>()).Add(message);
foreach (var message in evt.RemoteMessages)
{
var instanceId = message.OrchestrationInstance.InstanceId;
var destination = this.Partition.PartitionFunction(instanceId);
yield return (destination, message);
}
}
else if (message.Event is ExecutionStartedEvent executionStartedEvent && executionStartedEvent.ScheduledStartTime.HasValue)
void AddMessage(TaskMessagesReceived outmessage, TaskMessage message)
{
(outmessage.DelayedTaskMessages ??= new List<TaskMessage>()).Add(message);
if (Entities.IsDelayedEntityMessage(message, out _))
{
(outmessage.DelayedTaskMessages ??= new List<TaskMessage>()).Add(message);
}
else if (message.Event is ExecutionStartedEvent executionStartedEvent && executionStartedEvent.ScheduledStartTime.HasValue)
{
(outmessage.DelayedTaskMessages ??= new List<TaskMessage>()).Add(message);
}
else
{
(outmessage.TaskMessages ??= new List<TaskMessage>()).Add(message);
}
outmessage.SubPosition = ++subPosition;
}
if (evt.PackPartitionTaskMessages > 1)
{
// pack multiple TaskMessages for the same destination into a single TaskMessagesReceived event
var sorted = new Dictionary<uint, TaskMessagesReceived>();
foreach ((uint destination, TaskMessage message) in Messages())
{
if (!sorted.TryGetValue(destination, out var outmessage))
{
sorted[destination] = outmessage = new TaskMessagesReceived()
{
PartitionId = destination,
WorkItemId = evt.WorkItemId,
};
}
AddMessage(outmessage, message);
// send the message if we have reached the pack limit
if (outmessage.NumberMessages >= evt.PackPartitionTaskMessages)
{
batch.OutgoingMessages.Add(outmessage);
sorted.Remove(destination);
}
}
batch.OutgoingMessages.AddRange(sorted.Values);
}
else
{
(outmessage.TaskMessages ??= new List<TaskMessage>()).Add(message);
}
outmessage.SubPosition = ++subPosition;
}
if (evt.PackPartitionTaskMessages > 1)
{
// pack multiple TaskMessages for the same destination into a single TaskMessagesReceived event
var sorted = new Dictionary<uint, TaskMessagesReceived>();
foreach ((uint destination, TaskMessage message) in Messages())
{
if (!sorted.TryGetValue(destination, out var outmessage))
// send each TaskMessage as a separate TaskMessagesReceived event
foreach ((uint destination, TaskMessage message) in Messages())
{
sorted[destination] = outmessage = new TaskMessagesReceived()
var outmessage = new TaskMessagesReceived()
{
PartitionId = destination,
WorkItemId = evt.WorkItemId,
};
}
AddMessage(outmessage, message);
// send the message if we have reached the pack limit
if (outmessage.NumberMessages >= evt.PackPartitionTaskMessages)
{
AddMessage(outmessage, message);
batch.OutgoingMessages.Add(outmessage);
sorted.Remove(destination);
}
}
batch.OutgoingMessages.AddRange(sorted.Values);
}
else
{
// send each TaskMessage as a separate TaskMessagesReceived event
foreach ((uint destination, TaskMessage message) in Messages())
{
var outmessage = new TaskMessagesReceived()
{
PartitionId = destination,
WorkItemId = evt.WorkItemId,
};
AddMessage(outmessage, message);
batch.OutgoingMessages.Add(outmessage);
}
}
this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}
@ -270,5 +299,29 @@ namespace DurableTask.Netherite
this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}
public override void Process(WaitRequestReceived evt, EffectTracker effects)
{
this.Partition.Assert(evt.ResponseToSend != null);
var batch = new Batch();
batch.OutgoingResponses.Add(evt.ResponseToSend);
this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}
public override void Process(CreationRequestReceived evt, EffectTracker effects)
{
this.Partition.Assert(evt.ResponseToSend != null);
var batch = new Batch();
batch.OutgoingResponses.Add(evt.ResponseToSend);
this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}
public override void Process(DeletionRequestReceived evt, EffectTracker effects)
{
this.Partition.Assert(evt.ResponseToSend != null);
var batch = new Batch();
batch.OutgoingResponses.Add(evt.ResponseToSend);
this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}
}
}

Просмотреть файл

@ -74,7 +74,7 @@ namespace DurableTask.Netherite
if (!effects.IsReplaying)
{
// lets the query that is currently in progress know that this batch is done
purgeBatchIssued.WhenProcessed.TrySetResult(null);
DurabilityListeners.Register(purgeBatchIssued, purgeBatchIssued);
}
}

Просмотреть файл

@ -324,7 +324,7 @@ namespace DurableTask.Netherite
effects.Add(TrackedObjectKey.Timers);
}
if (evt.RemoteMessages?.Count > 0)
if (evt.RemoteMessages?.Count > 0 || WaitRequestReceived.SatisfiesWaitCondition(evt.State))
{
effects.Add(TrackedObjectKey.Outbox);
}

Просмотреть файл

@ -91,6 +91,8 @@ namespace DurableTask.Netherite.Emulated
var serialized = this.Serialize(evt);
DurabilityListeners.ConfirmDurable(evt);
this.Submit(serialized);
}
}