Add latency to work item tracing.
This commit is contained in:
Родитель
7d793c4164
Коммит
bf793347f5
|
@ -19,9 +19,10 @@ namespace DurableTask.Netherite
|
|||
// a partition-local identifier for this activity (is a sequence number generated by ActivitiesState)
|
||||
public long ActivityId { get; set; }
|
||||
|
||||
|
||||
public string OriginWorkItem { get; set; }
|
||||
|
||||
public double StartedAt { get; set; }
|
||||
|
||||
public ActivityWorkItem(Partition partition, long activityId, TaskMessage message, string originWorkItem)
|
||||
{
|
||||
this.Partition = partition;
|
||||
|
|
|
@ -230,6 +230,7 @@ namespace DurableTask.Netherite
|
|||
request.WorkItemId,
|
||||
creationMessage.OrchestrationInstance.InstanceId,
|
||||
WorkItemTraceHelper.ClientStatus.Create,
|
||||
0,
|
||||
WorkItemTraceHelper.FormatMessageIdList(request.TracedTaskMessages));
|
||||
|
||||
var response = await this.PerformRequestWithTimeoutAndCancellation(this.shutdownToken, request, false).ConfigureAwait(false);
|
||||
|
@ -271,6 +272,7 @@ namespace DurableTask.Netherite
|
|||
request.WorkItemId,
|
||||
group.Key,
|
||||
WorkItemTraceHelper.ClientStatus.Send,
|
||||
0,
|
||||
WorkItemTraceHelper.FormatMessageIdList(group.Select((message) => (message, request.WorkItemId))));
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ namespace DurableTask.Netherite
|
|||
using Newtonsoft.Json;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
@ -33,6 +34,8 @@ namespace DurableTask.Netherite
|
|||
|
||||
readonly WorkItemTraceHelper workItemTraceHelper;
|
||||
|
||||
readonly Stopwatch workItemStopwatch = new Stopwatch();
|
||||
|
||||
/// <summary>
|
||||
/// The logger category prefix used for all ILoggers in this backend.
|
||||
/// </summary>
|
||||
|
@ -116,6 +119,8 @@ namespace DurableTask.Netherite
|
|||
if (this.configuredTransport != TransportConnectionString.TransportChoices.Memory)
|
||||
this.LoadMonitorService = new AzureLoadMonitorTable(settings.StorageConnectionString, settings.LoadInformationAzureTableName, settings.HubName);
|
||||
|
||||
this.workItemStopwatch.Start();
|
||||
|
||||
this.Logger.LogInformation(
|
||||
"trace generation limits: general={general} , transport={transport}, storage={storage}, events={events}; workitems={workitems}; etwEnabled={etwEnabled}; core.IsTraceEnabled={core}",
|
||||
settings.LogLevelLimit,
|
||||
|
@ -523,8 +528,10 @@ namespace DurableTask.Netherite
|
|||
nextOrchestrationWorkItem.MessageBatch.InstanceId,
|
||||
nextOrchestrationWorkItem.Type.ToString(),
|
||||
WorkItemTraceHelper.FormatMessageIdList(nextOrchestrationWorkItem.MessageBatch.TracedMessages));
|
||||
}
|
||||
|
||||
|
||||
nextOrchestrationWorkItem.StartedAt = this.workItemStopwatch.Elapsed.TotalMilliseconds;
|
||||
}
|
||||
|
||||
return nextOrchestrationWorkItem;
|
||||
}
|
||||
|
||||
|
@ -540,6 +547,7 @@ namespace DurableTask.Netherite
|
|||
var orchestrationWorkItem = (OrchestrationWorkItem)workItem;
|
||||
var messageBatch = orchestrationWorkItem.MessageBatch;
|
||||
var partition = orchestrationWorkItem.Partition;
|
||||
var latencyMs = this.workItemStopwatch.Elapsed.TotalMilliseconds - orchestrationWorkItem.StartedAt;
|
||||
|
||||
List<TaskMessage> localMessages = null;
|
||||
List<TaskMessage> remoteMessages = null;
|
||||
|
@ -626,6 +634,7 @@ namespace DurableTask.Netherite
|
|||
messageBatch.WorkItemId,
|
||||
workItem.InstanceId,
|
||||
batchProcessedEvent.State.OrchestrationStatus,
|
||||
latencyMs,
|
||||
WorkItemTraceHelper.FormatMessageIdList(batchProcessedEvent.TracedTaskMessages));
|
||||
|
||||
try
|
||||
|
@ -713,6 +722,8 @@ namespace DurableTask.Netherite
|
|||
nextActivityWorkItem.TaskMessage.OrchestrationInstance.InstanceId,
|
||||
nextActivityWorkItem.ExecutionType,
|
||||
WorkItemTraceHelper.FormatMessageId(nextActivityWorkItem.TaskMessage, nextActivityWorkItem.OriginWorkItem));
|
||||
|
||||
nextActivityWorkItem.StartedAt = this.workItemStopwatch.Elapsed.TotalMilliseconds;
|
||||
}
|
||||
|
||||
return nextActivityWorkItem;
|
||||
|
@ -729,6 +740,7 @@ namespace DurableTask.Netherite
|
|||
{
|
||||
var activityWorkItem = (ActivityWorkItem)workItem;
|
||||
var partition = activityWorkItem.Partition;
|
||||
var latencyMs = this.workItemStopwatch.Elapsed.TotalMilliseconds - activityWorkItem.StartedAt;
|
||||
|
||||
var activityCompletedEvent = new ActivityCompleted()
|
||||
{
|
||||
|
@ -746,6 +758,7 @@ namespace DurableTask.Netherite
|
|||
activityWorkItem.WorkItemId,
|
||||
activityWorkItem.TaskMessage.OrchestrationInstance.InstanceId,
|
||||
WorkItemTraceHelper.ActivityStatus.Completed,
|
||||
latencyMs,
|
||||
WorkItemTraceHelper.FormatMessageId(responseMessage, activityWorkItem.WorkItemId));
|
||||
|
||||
try
|
||||
|
|
|
@ -24,6 +24,8 @@ namespace DurableTask.Netherite
|
|||
|
||||
public List<string> NewMessagesOrigin { get; set; }
|
||||
|
||||
public double StartedAt { get; set; }
|
||||
|
||||
public OrchestrationWorkItem(Partition partition, OrchestrationMessageBatch messageBatch, List<HistoryEvent> previousHistory = null)
|
||||
{
|
||||
this.Partition = partition;
|
||||
|
|
|
@ -149,10 +149,10 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
|
||||
[Event(225, Level = EventLevel.Informational, Version = 1)]
|
||||
public void WorkItemCompleted(string Account, string TaskHub, int PartitionId, string WorkItemType, string WorkItemId, string InstanceId, string Status, string ProducedMessageIds, string ExtensionVersion)
|
||||
public void WorkItemCompleted(string Account, string TaskHub, int PartitionId, string WorkItemType, string WorkItemId, string InstanceId, string Status, double LatencyMs, string ProducedMessageIds, string ExtensionVersion)
|
||||
{
|
||||
SetCurrentThreadActivityId(serviceInstanceId);
|
||||
this.WriteEvent(225, Account, TaskHub, PartitionId, WorkItemType, WorkItemId, InstanceId, Status, ProducedMessageIds, ExtensionVersion);
|
||||
this.WriteEvent(225, Account, TaskHub, PartitionId, WorkItemType, WorkItemId, InstanceId, Status, LatencyMs, ProducedMessageIds, ExtensionVersion);
|
||||
}
|
||||
|
||||
[Event(226, Level = EventLevel.Warning, Version = 1)]
|
||||
|
|
|
@ -105,17 +105,17 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
}
|
||||
|
||||
public void TraceWorkItemCompleted(uint partitionId, WorkItemType workItemType, string workItemId, string instanceId, object status, string producedMessageIds)
|
||||
public void TraceWorkItemCompleted(uint partitionId, WorkItemType workItemType, string workItemId, string instanceId, object status, double latencyMs, string producedMessageIds)
|
||||
{
|
||||
if (this.logLevelLimit <= LogLevel.Information)
|
||||
{
|
||||
if (this.logger.IsEnabled(LogLevel.Information))
|
||||
{
|
||||
this.logger.LogInformation("Part{partition:D2} completed {workItemType}WorkItem {workItemId} instanceId={instanceId} status={status} producedMessageIds={producedMessageIds}",
|
||||
partitionId, workItemType, workItemId, instanceId, status, producedMessageIds);
|
||||
this.logger.LogInformation("Part{partition:D2} completed {workItemType}WorkItem {workItemId} instanceId={instanceId} status={status} latencyMs={latencyMs} producedMessageIds={producedMessageIds}",
|
||||
partitionId, workItemType, workItemId, instanceId, status, latencyMs, producedMessageIds);
|
||||
}
|
||||
|
||||
this.etw?.WorkItemCompleted(this.account, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, status.ToString(), producedMessageIds, TraceUtils.ExtensionVersion);
|
||||
this.etw?.WorkItemCompleted(this.account, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, status.ToString(), latencyMs, producedMessageIds, TraceUtils.ExtensionVersion);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче