update work item tracing to include latency and discarding of client work items
This commit is contained in:
Родитель
c1d30075d0
Коммит
e54b492a6d
|
@ -6,6 +6,7 @@ namespace DurableTask.Netherite
|
|||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
|
@ -21,6 +22,7 @@ namespace DurableTask.Netherite
|
|||
readonly string account;
|
||||
readonly Guid taskHubGuid;
|
||||
readonly WorkItemTraceHelper workItemTraceHelper;
|
||||
readonly Stopwatch workItemStopwatch;
|
||||
|
||||
static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(5);
|
||||
|
||||
|
@ -55,6 +57,8 @@ namespace DurableTask.Netherite
|
|||
this.ResponseWaiters = new ConcurrentDictionary<long, PendingRequest>();
|
||||
this.Fragments = new Dictionary<string, MemoryStream>();
|
||||
this.ResponseTimeouts.Start("ClientTimer");
|
||||
this.workItemStopwatch = new Stopwatch();
|
||||
this.workItemStopwatch.Start();
|
||||
|
||||
this.traceHelper.TraceProgress("Started");
|
||||
}
|
||||
|
@ -130,10 +134,7 @@ namespace DurableTask.Netherite
|
|||
this.ResponseWaiters.TryAdd(request.RequestId, pendingRequest);
|
||||
this.ResponseTimeouts.Schedule(request.TimeoutUtc, pendingRequest, timeoutId);
|
||||
|
||||
if (doneWhenSent)
|
||||
{
|
||||
DurabilityListeners.Register((Event)request, pendingRequest);
|
||||
}
|
||||
DurabilityListeners.Register((Event)request, pendingRequest);
|
||||
|
||||
this.Send(partitionEvent);
|
||||
|
||||
|
@ -148,6 +149,7 @@ namespace DurableTask.Netherite
|
|||
readonly Client client;
|
||||
readonly (DateTime due, int id) timeoutKey;
|
||||
readonly TaskCompletionSource<ClientEvent> continuation;
|
||||
readonly double startTime;
|
||||
|
||||
static readonly TimeoutException timeoutException = new TimeoutException("Client request timed out.");
|
||||
|
||||
|
@ -164,6 +166,7 @@ namespace DurableTask.Netherite
|
|||
this.client = client;
|
||||
this.timeoutKey = (due, timeoutId);
|
||||
this.continuation = new TaskCompletionSource<ClientEvent>();
|
||||
this.startTime = this.client.workItemStopwatch.Elapsed.TotalMilliseconds;
|
||||
}
|
||||
|
||||
public void Respond(ClientEvent evt)
|
||||
|
@ -174,20 +177,72 @@ namespace DurableTask.Netherite
|
|||
|
||||
void TransportAbstraction.IDurabilityListener.ConfirmDurable(Event evt)
|
||||
{
|
||||
if (this.client.ResponseWaiters.TryRemove(this.requestId, out var _))
|
||||
if (evt is ClientTaskMessagesReceived request)
|
||||
{
|
||||
this.client.ResponseTimeouts.TryCancel(this.timeoutKey);
|
||||
this.continuation.TrySetResult(null); // task finishes when the send has been confirmed, no result is returned
|
||||
// we create a separate trace message for each destination partition
|
||||
foreach (var group in request.TaskMessages.GroupBy((message) => message.OrchestrationInstance.InstanceId))
|
||||
{
|
||||
this.client.workItemTraceHelper.TraceWorkItemCompleted(
|
||||
request.PartitionId,
|
||||
WorkItemTraceHelper.WorkItemType.Client,
|
||||
request.WorkItemId,
|
||||
group.Key,
|
||||
WorkItemTraceHelper.ClientStatus.Send,
|
||||
this.client.workItemStopwatch.Elapsed.TotalMilliseconds - this.startTime,
|
||||
WorkItemTraceHelper.FormatMessageIdList(group.Select((message) => (message, request.WorkItemId))));
|
||||
}
|
||||
|
||||
// this request is considered completed at the time of durability
|
||||
// so we generate the response now
|
||||
if (this.client.ResponseWaiters.TryRemove(this.requestId, out var _))
|
||||
{
|
||||
this.client.ResponseTimeouts.TryCancel(this.timeoutKey);
|
||||
this.continuation.TrySetResult(null); // task finishes when the send has been confirmed, no result is returned
|
||||
}
|
||||
}
|
||||
else if (evt is CreationRequestReceived creationRequestReceived)
|
||||
{
|
||||
this.client.workItemTraceHelper.TraceWorkItemCompleted(
|
||||
creationRequestReceived.PartitionId,
|
||||
WorkItemTraceHelper.WorkItemType.Client,
|
||||
creationRequestReceived.WorkItemId,
|
||||
creationRequestReceived.InstanceId,
|
||||
WorkItemTraceHelper.ClientStatus.Create,
|
||||
this.client.workItemStopwatch.Elapsed.TotalMilliseconds - this.startTime,
|
||||
WorkItemTraceHelper.FormatMessageIdList(creationRequestReceived.TracedTaskMessages));
|
||||
}
|
||||
}
|
||||
|
||||
void TransportAbstraction.IDurabilityOrExceptionListener.ReportException(Event evt, Exception e)
|
||||
{
|
||||
if (this.client.ResponseWaiters.TryRemove(this.requestId, out var _))
|
||||
if (evt is ClientTaskMessagesReceived request)
|
||||
{
|
||||
this.client.ResponseTimeouts.TryCancel(this.timeoutKey);
|
||||
this.continuation.TrySetException(e); // task finishes with exception
|
||||
// we create a separate trace message for each destination partition
|
||||
foreach (var group in request.TaskMessages.GroupBy((message) => message.OrchestrationInstance.InstanceId))
|
||||
{
|
||||
this.client.workItemTraceHelper.TraceWorkItemDiscarded(
|
||||
request.PartitionId,
|
||||
WorkItemTraceHelper.WorkItemType.Client,
|
||||
request.WorkItemId,
|
||||
group.Key,
|
||||
"");
|
||||
}
|
||||
|
||||
if (this.client.ResponseWaiters.TryRemove(this.requestId, out var _))
|
||||
{
|
||||
this.client.ResponseTimeouts.TryCancel(this.timeoutKey);
|
||||
this.continuation.TrySetException(e); // task finishes with exception
|
||||
}
|
||||
}
|
||||
else if (evt is CreationRequestReceived creationRequestReceived)
|
||||
{
|
||||
this.client.workItemTraceHelper.TraceWorkItemDiscarded(
|
||||
creationRequestReceived.PartitionId,
|
||||
WorkItemTraceHelper.WorkItemType.Client,
|
||||
creationRequestReceived.WorkItemId,
|
||||
creationRequestReceived.InstanceId,
|
||||
"");
|
||||
}
|
||||
}
|
||||
|
||||
public void TryTimeout()
|
||||
|
@ -224,14 +279,13 @@ namespace DurableTask.Netherite
|
|||
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
|
||||
};
|
||||
|
||||
this.workItemTraceHelper.TraceWorkItemCompleted(
|
||||
this.workItemTraceHelper.TraceWorkItemStarted(
|
||||
partitionId,
|
||||
WorkItemTraceHelper.WorkItemType.Client,
|
||||
request.WorkItemId,
|
||||
creationMessage.OrchestrationInstance.InstanceId,
|
||||
WorkItemTraceHelper.ClientStatus.Create,
|
||||
0,
|
||||
WorkItemTraceHelper.FormatMessageIdList(request.TracedTaskMessages));
|
||||
"CreateOrchestration",
|
||||
WorkItemTraceHelper.FormatEmptyMessageIdList());
|
||||
|
||||
var response = await this.PerformRequestWithTimeoutAndCancellation(this.shutdownToken, request, false).ConfigureAwait(false);
|
||||
var creationResponseReceived = (CreationResponseReceived)response;
|
||||
|
@ -264,16 +318,15 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
|
||||
// we create a separate trace message for each destination partition
|
||||
foreach (var group in messages.GroupBy((message) => message.OrchestrationInstance.InstanceId))
|
||||
foreach (var group in request.TaskMessages.GroupBy((message) => message.OrchestrationInstance.InstanceId))
|
||||
{
|
||||
this.workItemTraceHelper.TraceWorkItemCompleted(
|
||||
this.workItemTraceHelper.TraceWorkItemStarted(
|
||||
partitionId,
|
||||
WorkItemTraceHelper.WorkItemType.Client,
|
||||
request.WorkItemId,
|
||||
group.Key,
|
||||
WorkItemTraceHelper.ClientStatus.Send,
|
||||
0,
|
||||
WorkItemTraceHelper.FormatMessageIdList(group.Select((message) => (message, request.WorkItemId))));
|
||||
"SendMessages",
|
||||
WorkItemTraceHelper.FormatEmptyMessageIdList());
|
||||
}
|
||||
|
||||
return this.PerformRequestWithTimeoutAndCancellation(this.shutdownToken, request, true);
|
||||
|
|
|
@ -26,6 +26,8 @@ namespace DurableTask.Netherite
|
|||
public static string FormatMessageIdList(IEnumerable<(TaskMessage message, string workItem)> messages)
|
||||
=> string.Join(",", messages.Select(entry => FormatMessageId(entry.message, entry.workItem)));
|
||||
|
||||
public static string FormatEmptyMessageIdList() => string.Empty;
|
||||
|
||||
public static string FormatClientWorkItemId(Guid clientId, long requestId)
|
||||
=> $"{Client.GetShortId(clientId)}R{requestId}";
|
||||
|
||||
|
@ -111,7 +113,7 @@ namespace DurableTask.Netherite
|
|||
{
|
||||
if (this.logger.IsEnabled(LogLevel.Information))
|
||||
{
|
||||
this.logger.LogInformation("Part{partition:D2} completed {workItemType}WorkItem {workItemId} instanceId={instanceId} status={status} latencyMs={latencyMs} producedMessageIds={producedMessageIds}",
|
||||
this.logger.LogInformation("Part{partition:D2} completed {workItemType}WorkItem {workItemId} instanceId={instanceId} status={status} latencyMs={latencyMs:F2} producedMessageIds={producedMessageIds}",
|
||||
partitionId, workItemType, workItemId, instanceId, status, latencyMs, producedMessageIds);
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче