Merged PR 817279: Log GrpcTrace when the async request is completed

Log GrpcTrace when the async request is completed. This can let us know if there is a delay in the processing:

![image.png](https://dev.azure.com/mseng/9ed2c125-1cd5-4a17-886b-9d267f3a5fab/_apis/git/repositories/50d331c7-ea65-45eb-833f-0303c6c2387e/pullRequests/817279/attachments/image.png)
This commit is contained in:
Marcelo Lynch 2024-11-27 20:29:03 +00:00
Родитель 70dd4a26bf
Коммит 91feaf4284
5 изменённых файлов: 88 добавлений и 55 удалений

Просмотреть файл

@ -44,14 +44,20 @@ namespace BuildXL.Engine.Distribution.Grpc
/// <inheritdoc/>
public override Task<RpcResponse> ReportPipResults(PipResultsInfo message, ServerCallContext context)
{
m_orchestratorService.ReceivedPipResults(message).Forget(ex => Tracing.Logger.Log.GrpcEventHandlerExceptionOccurred(m_loggingContext, ex.ToStringDemystified()));
var ci = GrpcCallInformation.Extract(context);
m_orchestratorService.ReceivedPipResults(message).Forget(
unobservedExceptionHandler: ex => Tracing.Logger.Log.GrpcEventHandlerExceptionOccurred(m_loggingContext, ex.ToStringDemystified()),
synchronousContinuation: _ => Tracing.Logger.Log.GrpcTrace(m_loggingContext, ci.Sender, $"Done {ci.TraceId} {ci.MethodName}"));
return GrpcUtils.EmptyResponseTask;
}
/// <inheritdoc/>
public override Task<RpcResponse> ReportExecutionLog(ExecutionLogInfo message, ServerCallContext context)
{
m_orchestratorService.ReceivedExecutionLog(message).Forget(ex => Tracing.Logger.Log.GrpcEventHandlerExceptionOccurred(m_loggingContext, ex.ToStringDemystified()));
var ci = GrpcCallInformation.Extract(context);
m_orchestratorService.ReceivedExecutionLog(message).Forget(
unobservedExceptionHandler: ex => Tracing.Logger.Log.GrpcEventHandlerExceptionOccurred(m_loggingContext, ex.ToStringDemystified()),
synchronousContinuation: _ => Tracing.Logger.Log.GrpcTrace(m_loggingContext, ci.Sender, $"Done {ci.TraceId} {ci.MethodName}"));
return GrpcUtils.EmptyResponseTask;
}

Просмотреть файл

@ -8,8 +8,66 @@ using Grpc.Core;
namespace BuildXL.Engine.Distribution.Grpc
{
/// <summary>
/// Information of interest that we extract for every call
/// </summary>
public record struct GrpcCallInformation(string Sender, string MethodName, DistributedInvocationId InvocationId, string TraceId, string Token)
{
/// <summary>
/// Extracts the information from a <see cref="ServerCallContext"/>
/// </summary>
public static GrpcCallInformation Extract(ServerCallContext context)
{
string method = context.Method.Split('/')[2];
var callInformation = new GrpcCallInformation
{
MethodName = method,
Sender = string.Empty
};
string relatedActivityId = string.Empty;
string environment = string.Empty;
string engineVersion = string.Empty;
callInformation.TraceId = string.Empty;
callInformation.Token = string.Empty;
foreach (var kvp in context.RequestHeaders)
{
if (kvp.Key == GrpcMetadata.TraceIdKey)
{
callInformation.TraceId = kvp.Value;
}
else if (kvp.Key == GrpcMetadata.RelatedActivityIdKey)
{
relatedActivityId = kvp.Value;
}
else if (kvp.Key == GrpcMetadata.EnvironmentKey)
{
environment = kvp.Value;
}
else if (kvp.Key == GrpcMetadata.EngineVersionKey)
{
engineVersion = kvp.Value;
}
else if (kvp.Key == GrpcMetadata.SenderKey)
{
callInformation.Sender = kvp.Value;
}
else if (kvp.Key == GrpcMetadata.AuthKey)
{
callInformation.Token = kvp.Value;
}
}
callInformation.InvocationId = new DistributedInvocationId(relatedActivityId, environment, engineVersion);
return callInformation;
}
}
internal static class GrpcSettings
{
/// <summary>
/// How many retry attempts when a grpc message is failed to send in the given <see cref="CallTimeout"/>
/// </summary>
@ -59,46 +117,5 @@ namespace BuildXL.Engine.Distribution.Grpc
/// Default: 75 minutes
/// </remarks>
public static TimeSpan WorkerAttachTimeout => EngineEnvironmentSettings.WorkerAttachTimeout;
public static void ParseHeader(Metadata header, out string sender, out DistributedInvocationId senderInvocationId, out string traceId, out string token)
{
sender = string.Empty;
string relatedActivityId = string.Empty;
string environment = string.Empty;
string engineVersion = string.Empty;
traceId = string.Empty;
token = string.Empty;
foreach (var kvp in header)
{
if (kvp.Key == GrpcMetadata.TraceIdKey)
{
traceId = kvp.Value;
}
else if (kvp.Key == GrpcMetadata.RelatedActivityIdKey)
{
relatedActivityId = kvp.Value;
}
else if (kvp.Key == GrpcMetadata.EnvironmentKey)
{
environment = kvp.Value;
}
else if (kvp.Key == GrpcMetadata.EngineVersionKey)
{
engineVersion = kvp.Value;
}
else if (kvp.Key == GrpcMetadata.SenderKey)
{
sender = kvp.Value;
}
else if (kvp.Key == GrpcMetadata.AuthKey)
{
token = kvp.Value;
}
}
senderInvocationId = new DistributedInvocationId(relatedActivityId, environment, engineVersion);
}
}
}

Просмотреть файл

@ -26,9 +26,8 @@ namespace BuildXL.Engine.Distribution.Grpc
/// <inheritdoc/>
public override Task<RpcResponse> Attach(BuildStartData message, ServerCallContext context)
{
GrpcSettings.ParseHeader(context.RequestHeaders, out string sender, out var _, out var _, out var _);
m_workerService.Attach(message, sender);
var callInformation = GrpcCallInformation.Extract(context);
m_workerService.Attach(message, callInformation.Sender);
return GrpcUtils.EmptyResponseTask;
}

Просмотреть файл

@ -36,13 +36,12 @@ namespace BuildXL.Engine.Distribution
private void InterceptCallContext(ServerCallContext context, out string sender)
{
string method = context.Method.Split('/')[2];
var callInformation = GrpcCallInformation.Extract(context);
sender = callInformation.Sender;
GrpcSettings.ParseHeader(context.RequestHeaders, out sender, out var senderInvocationId, out string traceId, out string token);
if (m_invocationId != senderInvocationId)
if (m_invocationId != callInformation.InvocationId)
{
string failureMessage = $"The receiver and sender distributed invocation ids do not match. Receiver invocation id: {m_invocationId}. Sender invocation id: {senderInvocationId}.";
string failureMessage = $"The receiver and sender distributed invocation ids do not match. Receiver invocation id: {m_invocationId}. Sender invocation id: {callInformation.InvocationId}.";
Logger.Log.GrpcServerTrace(m_loggingContext, failureMessage);
var trailers = new Metadata
@ -58,9 +57,9 @@ namespace BuildXL.Engine.Distribution
trailers);
}
if (m_authenticationEnabled && token != m_token)
if (m_authenticationEnabled && callInformation.Token != m_token)
{
Logger.Log.GrpcTrace(m_loggingContext, sender, $"Authentication tokens do not match:\r\nReceived:{token}\r\nExpected:{m_token}");
Logger.Log.GrpcTrace(m_loggingContext, callInformation.Sender, $"Authentication tokens do not match:\r\nReceived:{callInformation.Token.Substring(0, 10)}\r\nExpected:{m_token.Substring(0,10)}");
var trailers = new Metadata
{
@ -70,7 +69,7 @@ namespace BuildXL.Engine.Distribution
throw new RpcException(new Status(StatusCode.Unauthenticated, "Call could not be authenticated."), trailers);
}
Logger.Log.GrpcTrace(m_loggingContext, sender, $"Recv {traceId} {method}");
Logger.Log.GrpcTrace(m_loggingContext, callInformation.Sender, $"Recv {callInformation.TraceId} {callInformation.MethodName}");
}
public override Task<TResponse> UnaryServerHandler<TRequest, TResponse>(TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation)

Просмотреть файл

@ -367,7 +367,8 @@ namespace BuildXL.Utilities.Core.Tasks
/// </summary>
/// <param name="task">The task whose result is to be ignored.</param>
/// <param name="unobservedExceptionHandler">Optional handler for the task's unobserved exception (if any).</param>
public static void Forget(this Task task, Action<Exception> unobservedExceptionHandler = null)
/// <param name="synchronousContinuation">Optional action to invoke synchronously after the task completes succesfully</param>
public static void Forget(this Task task, Action<Exception> unobservedExceptionHandler = null, Action<Task> synchronousContinuation = null)
{
task.ContinueWith(t =>
{
@ -377,6 +378,17 @@ namespace BuildXL.Utilities.Core.Tasks
var e = (t.Exception as AggregateException)?.InnerException ?? t.Exception;
unobservedExceptionHandler?.Invoke(e);
}
else
{
try
{
synchronousContinuation?.Invoke(t);
}
catch (Exception e)
{
unobservedExceptionHandler?.Invoke(e);
}
}
});
}