Merged PR 705776: Remove grpc.core support

This commit is contained in:
Semih Okur 2023-03-08 20:49:45 +00:00
Родитель 90e4d7d8ab
Коммит eacce37978
4 изменённых файлов: 123 добавлений и 236 удалений

Просмотреть файл

@ -1,5 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma warning disable 0414
using System;
using System.Collections.Generic;
@ -37,7 +38,7 @@ namespace BuildXL.Engine.Distribution.Grpc
{
/// <nodoc />
public ConnectionFailureType Type { get; init; }
/// <nodoc />
public string Details { get; init; }
@ -80,33 +81,6 @@ namespace BuildXL.Engine.Distribution.Grpc
public static readonly IEnumerable<ChannelOption> ServerChannelOptions = GetServerChannelOptions();
// Verbose logging meant for debugging only.
// Stores the log path base and the gRPC.NET log level in static fields; the client setup code
// in this file only attaches a file logger when s_debugLogPathBase is non-null.
// NOTE: the method's closing brace lives inside the conditional-compilation branches below:
// with NET6_0_OR_GREATER the body assigns both fields and the fields themselves are declared
// right after the method; otherwise the method compiles to an empty body.
internal static void EnableVerboseLogging(string path, GrpcEnvironmentOptions.GrpcVerbosity verbosity)
{
#if NET6_0_OR_GREATER
s_debugLogPathBase = path;
// Adapt from GrpcEnvironmentOptions.GrpcVerbosity.
// We are slightly more 'verbose' here (i.e. Debug => Trace and Error => Warning)
// to account for the finer granularity and considering that the gRPC.NET client logging
// is not as verbose as the Grpc.Core one.
s_debugLogVerbosity = verbosity switch
{
GrpcEnvironmentOptions.GrpcVerbosity.Disabled => LogLevel.None,
GrpcEnvironmentOptions.GrpcVerbosity.Debug => LogLevel.Trace,
GrpcEnvironmentOptions.GrpcVerbosity.Info => LogLevel.Debug,
GrpcEnvironmentOptions.GrpcVerbosity.Error => LogLevel.Warning,
// Any verbosity value not listed above maps to Error.
_ => LogLevel.Error
};
}
// Base path for the verbose client log file; stays null unless EnableVerboseLogging is called.
private static string s_debugLogPathBase;
// Minimum log level used for the verbose client log.
private static LogLevel s_debugLogVerbosity;
#else
// Closes the (empty) method body when NET6_0_OR_GREATER is not defined.
}
#endif
internal readonly ChannelBase Channel;
private readonly LoggingContext m_loggingContext;
private readonly DistributedInvocationId m_invocationId;
private readonly Task m_monitorConnectionTask;
@ -115,22 +89,19 @@ namespace BuildXL.Engine.Distribution.Grpc
private volatile bool m_isExitCalledForServer;
private readonly CancellationTokenSource m_exitTokenSource = new CancellationTokenSource();
private volatile bool m_attached;
private readonly bool m_dotNetClientEnabled;
private readonly string m_ipAddress;
private int m_numReconnectAttempts;
private readonly CounterCollection<DistributionCounter> m_counters;
private readonly CancellableTimedAction m_heartbeatAction;
private readonly Func<CallOptions, Task> m_heartbeatCall;
private int m_numConsecutiveHeartbeatFails;
private static string s_debugLogPathBase;
/// <summary>
/// Channel State
/// </summary>
#if NET6_0_OR_GREATER
private ChannelState State => m_dotNetClientEnabled ? (ChannelState)(int)(((GrpcChannel)Channel).State) : ((Channel)Channel).State;
#else
private ChannelState State => ((Channel)Channel).State;
internal readonly GrpcChannel Channel;
private ConnectivityState State => Channel.State;
private int m_numReconnectAttempts;
private static LogLevel s_debugLogVerbosity;
#endif
private string GenerateLog(string traceId, string status, uint numTry, string description)
@ -142,39 +113,23 @@ namespace BuildXL.Engine.Distribution.Grpc
return $"{status}{tryText} {traceId} {description}";
}
/// <summary>
/// Formats the log line for a failed attempt by delegating to GenerateLog with the "Fail" status.
/// </summary>
/// <param name="traceId">Trace identifier of the call being logged.</param>
/// <param name="numTry">Attempt number, forwarded unchanged to GenerateLog.</param>
/// <param name="failure">Failure description used as the log description.</param>
/// <returns>The formatted log line produced by GenerateLog.</returns>
private string GenerateFailLog(string traceId, uint numTry, string failure)
{
    // traceId is already a string: calling ToString() on it was redundant and threw a
    // NullReferenceException when the id was null, hiding the failure we are trying to log.
    return GenerateLog(traceId, "Fail", numTry, failure);
}
public ClientConnectionManager(LoggingContext loggingContext, string ipAddress, int port, DistributedInvocationId invocationId, CounterCollection<DistributionCounter> counters, Func<CallOptions, Task> heartbeatCall)
{
m_invocationId = invocationId;
m_loggingContext = loggingContext;
m_dotNetClientEnabled = EngineEnvironmentSettings.GrpcDotNetClientEnabled;
m_ipAddress = ipAddress;
m_counters = counters;
if (m_dotNetClientEnabled)
{
#if NET6_0_OR_GREATER
Channel = SetupGrpcNetClient(ipAddress, port);
if (EngineEnvironmentSettings.GrpcDotNetMonitorConnectionsEnabled)
{
m_monitorConnectionTask = MonitorConnectionAsync();
}
#endif
}
else
{
// Grpc.Core package will be deprecated in late 2022.
Channel = SetupGrpcCoreClient(ipAddress, port);
m_monitorConnectionTask = MonitorConnectionAsync();
}
Channel = SetupGrpcNetClient(ipAddress, port);
m_monitorConnectionTask = MonitorConnectionAsync();
Contract.Assert(Channel != null, "Channel must be initialized");
m_heartbeatCall = heartbeatCall;
#else
m_monitorConnectionTask = null;
#endif
m_heartbeatCall = heartbeatCall;
m_heartbeatAction = new CancellableTimedAction(SendHeartbeat, GrpcSettings.HeartbeatIntervalMs);
}
@ -198,65 +153,8 @@ namespace BuildXL.Engine.Distribution.Grpc
}
}
/// <summary>
/// Creates the Grpc.Core channel for the given address and port.
/// </summary>
/// <remarks>
/// The channel starts from the default channel options, gains keep-alive ping options when
/// EngineEnvironmentSettings.GrpcKeepAliveEnabled is set, and uses SSL credentials instead of
/// insecure ones when GrpcSettings.EncryptionEnabled is set and the certificate material is available.
/// </remarks>
private ChannelBase SetupGrpcCoreClient(string ipAddress, int port)
{
    ChannelCredentials credentials = ChannelCredentials.Insecure;
    var options = new List<ChannelOption>(s_defaultChannelOptions);

    if (EngineEnvironmentSettings.GrpcKeepAliveEnabled)
    {
        options.AddRange(new[]
        {
            new ChannelOption(ExtendedChannelOptions.KeepAlivePermitWithoutCalls, 1), // enable sending pings
            new ChannelOption(ExtendedChannelOptions.KeepAliveTimeMs, 300000), // 5m-frequent pings
            new ChannelOption(ExtendedChannelOptions.KeepAliveTimeoutMs, 60000), // wait for 1m to receive ack for the ping before closing connection.
            new ChannelOption(ExtendedChannelOptions.MaxPingsWithoutData, 0), // no limit for pings with no header/data
            new ChannelOption(ExtendedChannelOptions.MinSentPingIntervalWithoutDataMs, 300000), // 5m-frequent pings with no header/data
        });
    }

    if (GrpcSettings.EncryptionEnabled)
    {
        string certSubjectName = EngineEnvironmentSettings.CBBuildUserCertificateName;
        bool hasKeyMaterial =
            GrpcEncryptionUtils.TryGetPublicAndPrivateKeys(certSubjectName, out string publicCertificate, out string privateKey, out string hostName, out string errorMessage)
            && publicCertificate != null
            && privateKey != null
            && hostName != null;

        if (!hasKeyMaterial)
        {
            // Fall back to the insecure credentials chosen above.
            Logger.Log.GrpcTraceWarning(m_loggingContext, ipAddress, $"Could not extract public certificate and private key from '{certSubjectName}'. Server will be started without ssl. Error message: '{errorMessage}'");
        }
        else
        {
            ChannelCredentials sslCredentials = new SslCredentials(
                publicCertificate,
                new KeyCertificatePair(publicCertificate, privateKey));

            // Attach the per-call token credentials when they are available.
            var tokenCredentials = GetCallCredentialsWithToken();
            credentials = tokenCredentials != null
                ? ChannelCredentials.Create(sslCredentials, tokenCredentials)
                : sslCredentials;

            // This is needed to make SSL hostname verification successful.
            // Otherwise we see this sort of error:
            // GrpcCore: 0 T:\src\github\grpc\workspace_csharp_ext_windows_x64\src\core\ext\filters\client_channel\subchannel.cc:1073:
            // Connect failed: {"created":"@1628096484.767000000","description":"Peer name MW1SCH103352403 is not in peer certificate",
            // "file":"T:\src\github\grpc\workspace_csharp_ext_windows_x64\src\core\lib\security\security_connector\ssl\ssl_security_connector.cc","file_line":59}
            // Even though this is advertised as 'test environment' only, this is a common practice for distributed services running in a closed network.
            options.Add(new ChannelOption(ChannelOptions.SslTargetNameOverride, hostName));

            Logger.Log.GrpcTrace(m_loggingContext, ipAddress, "Grpc.Core auth is enabled");
        }
    }

    return new Channel(ipAddress, port, credentials, options);
}
#if NET6_0_OR_GREATER
private ChannelBase SetupGrpcNetClient(string ipAddress, int port)
private GrpcChannel SetupGrpcNetClient(string ipAddress, int port)
{
var handler = new SocketsHttpHandler
{
@ -284,13 +182,13 @@ namespace BuildXL.Engine.Distribution.Grpc
if (s_debugLogPathBase != null)
{
// Enable logging from the client
channelOptions.LoggerFactory = LoggerFactory.Create(l =>
channelOptions.LoggerFactory = LoggerFactory.Create(l =>
{
l.AddProvider(new GrpcFileLoggerAdapter(s_debugLogPathBase + $".client.{ipAddress}_{port}.grpc"));
l.SetMinimumLevel(s_debugLogVerbosity);
l.AddProvider(new GrpcFileLoggerAdapter(s_debugLogPathBase + $".client.{ipAddress}_{port}.grpc"));
l.SetMinimumLevel(s_debugLogVerbosity);
});
}
if (EngineEnvironmentSettings.GrpcDotNetServiceConfigEnabled)
{
var defaultMethodConfig = new MethodConfig
@ -302,14 +200,15 @@ namespace BuildXL.Engine.Distribution.Grpc
InitialBackoff = TimeSpan.FromSeconds(2),
MaxBackoff = TimeSpan.FromSeconds(10),
BackoffMultiplier = 1.5,
RetryableStatusCodes = {
RetryableStatusCodes = {
StatusCode.Unavailable,
StatusCode.Internal,
StatusCode.Unknown }
}
};
channelOptions.ServiceConfig = new ServiceConfig {
channelOptions.ServiceConfig = new ServiceConfig
{
MethodConfigs = { defaultMethodConfig },
LoadBalancingConfigs = { new PickFirstConfig() },
};
@ -380,6 +279,27 @@ namespace BuildXL.Engine.Distribution.Grpc
}
#endif
/// <summary>
/// Turns on verbose client logging; meant for debugging only.
/// </summary>
/// <param name="path">Base path stored in s_debugLogPathBase for the verbose log file.</param>
/// <param name="verbosity">Requested verbosity, translated to a LogLevel when NET6_0_OR_GREATER is defined.</param>
internal static void EnableVerboseLogging(string path, GrpcEnvironmentOptions.GrpcVerbosity verbosity)
{
    s_debugLogPathBase = path;

#if NET6_0_OR_GREATER
    // Adapt from GrpcEnvironmentOptions.GrpcVerbosity.
    // We are slightly more 'verbose' here (i.e. Debug => Trace and Error => Warning)
    // to account for the finer granularity and considering that the gRPC.NET client logging
    // is not as verbose as the Grpc.Core one.
    switch (verbosity)
    {
        case GrpcEnvironmentOptions.GrpcVerbosity.Disabled:
            s_debugLogVerbosity = LogLevel.None;
            break;
        case GrpcEnvironmentOptions.GrpcVerbosity.Debug:
            s_debugLogVerbosity = LogLevel.Trace;
            break;
        case GrpcEnvironmentOptions.GrpcVerbosity.Info:
            s_debugLogVerbosity = LogLevel.Debug;
            break;
        case GrpcEnvironmentOptions.GrpcVerbosity.Error:
            s_debugLogVerbosity = LogLevel.Warning;
            break;
        default:
            // Every other verbosity value falls back to Error.
            s_debugLogVerbosity = LogLevel.Error;
            break;
    }
#endif
}
private CallCredentials GetCallCredentialsWithToken()
{
string buildIdentityTokenLocation = EngineEnvironmentSettings.CBBuildIdentityTokenPath;
@ -412,22 +332,26 @@ namespace BuildXL.Engine.Distribution.Grpc
// Pings are sent from client to server, and we do not want server to send pings to client due to the overhead concerns.
// We just need to make server accept the pings.
channelOptions.Add(new ChannelOption(ExtendedChannelOptions.KeepAlivePermitWithoutCalls, 1)); // enable receiving pings with no data
channelOptions.Add(new ChannelOption(ExtendedChannelOptions.MinRecvPingIntervalWithoutDataMs, 60000)); // expecting 5m-frequent pings with no header/data. As this is minimum allowed, we state 1m pings.
channelOptions.Add(new ChannelOption(ExtendedChannelOptions.KeepAliveTimeMs, 300000)); // 5m-frequent pings
channelOptions.Add(new ChannelOption(ExtendedChannelOptions.KeepAliveTimeoutMs, 60000)); // wait for 1m to receive ack for the ping before closing connection.
channelOptions.Add(new ChannelOption(ExtendedChannelOptions.MaxPingsWithoutData, 0)); // no limit for pings with no header/data
channelOptions.Add(new ChannelOption(ExtendedChannelOptions.MinSentPingIntervalWithoutDataMs, 300000)); // expecting 5m-frequent pings with no header/data.
}
return channelOptions;
}
#if NET6_0_OR_GREATER
private async Task MonitorConnectionAsync()
{
await Task.Yield();
ChannelState state = ChannelState.Idle;
ChannelState lastState = state;
ConnectivityState state = ConnectivityState.Idle;
ConnectivityState lastState = state;
var connectingStateTimer = new Stopwatch();
while (state != ChannelState.Shutdown)
while (state != ConnectivityState.Shutdown)
{
// We start monitoring for disconnections only after observing some 'connected' (i.e., Ready, Idle)
// state to avoid abandoning workers that may become available some time after the build
@ -438,17 +362,7 @@ namespace BuildXL.Engine.Distribution.Grpc
try
{
lastState = state;
if (m_dotNetClientEnabled)
{
#if NET6_0_OR_GREATER
await ((GrpcChannel)Channel).WaitForStateChangedAsync((ConnectivityState)(int)state, m_exitTokenSource.Token);
#endif
}
else
{
await ((Channel)Channel).TryWaitForStateChangedAsync(state);
}
await Channel.WaitForStateChangedAsync(state, m_exitTokenSource.Token);
state = State; // Pick up the new state as soon as possible as it may change
}
catch (ObjectDisposedException)
@ -467,7 +381,7 @@ namespace BuildXL.Engine.Distribution.Grpc
Logger.Log.GrpcTrace(m_loggingContext, m_ipAddress, $"{lastState} -> {state}");
if (state == ChannelState.Ready && GrpcSettings.HeartbeatEnabled)
if (state == ConnectivityState.Ready && GrpcSettings.HeartbeatEnabled)
{
// When connected to the server, start heartbeat messages.
m_heartbeatAction.Start();
@ -475,7 +389,7 @@ namespace BuildXL.Engine.Distribution.Grpc
// Check if we're stuck in reconnection attempts after losing connection
// In this situation, the state will alternate between "Connecting" and "TransientFailure"
if (state == ChannelState.Connecting || state == ChannelState.TransientFailure)
if (state == ConnectivityState.Connecting || state == ConnectivityState.TransientFailure)
{
if (monitorConnectingState && !connectingStateTimer.IsRunning)
{
@ -496,7 +410,7 @@ namespace BuildXL.Engine.Distribution.Grpc
// If we requested 'exit' for the server, the channel can go to 'Idle' state.
// We should not reconnect to the channel again in that case.
if (state == ChannelState.Idle && !m_isExitCalledForServer)
if (state == ConnectivityState.Idle && !m_isExitCalledForServer)
{
m_counters.IncrementCounter(DistributionCounter.ConnectionManagerIdle);
@ -509,16 +423,6 @@ namespace BuildXL.Engine.Distribution.Grpc
}
}
/// <summary>
/// Records that exit has been requested for the server.
/// </summary>
/// <remarks>
/// An exit operation makes the server on the other machine exit.
/// The flag set here is how the rest of the class knows not to reconnect to that server.
/// </remarks>
public void ReadyForExit() => m_isExitCalledForServer = true;
private async Task<bool> TryReconnectAsync()
{
if (m_numReconnectAttempts > 3)
@ -555,6 +459,55 @@ namespace BuildXL.Engine.Distribution.Grpc
return false;
}
/// <summary>
/// Returns true for the Idle and Shutdown connectivity states and false for every other state.
/// </summary>
private static bool IsNonRecoverableState(ConnectivityState state)
    => state is ConnectivityState.Idle or ConnectivityState.Shutdown;
/// <summary>
/// Tries to connect the channel, giving up once <paramref name="timeout"/> elapses.
/// </summary>
/// <param name="timeout">Time after which the connect attempt is cancelled.</param>
/// <param name="operation">Name of the operation that triggered the connect; only used in the trace message.</param>
/// <param name="watch">Optional stopwatch used to report elapsed time; a new one is started when omitted.</param>
/// <returns>True when the connect call completed; false when it threw (including cancellation by the timeout).</returns>
private async Task<bool> TryConnectChannelAsync(TimeSpan timeout, string operation, StopwatchSlim? watch = null)
{
    watch ??= StopwatchSlim.Start();

    try
    {
        Logger.Log.GrpcTrace(m_loggingContext, m_ipAddress, $"Connecting by {operation}");

        // The source owns a timer for the timeout; dispose it as soon as the attempt finishes
        // so that repeated (re)connect attempts do not accumulate undisposed timers.
        using (var source = new CancellationTokenSource(timeout))
        {
            await Channel.ConnectAsync(source.Token);
        }

        Logger.Log.GrpcTrace(m_loggingContext, m_ipAddress, $"Connected in {(long)watch.Value.Elapsed.TotalMilliseconds}ms");
    }
    catch (Exception e)
    {
#pragma warning disable EPC12 // Suspicious exception handling: only Message property is observed in exception block.
        Logger.Log.GrpcTrace(m_loggingContext, m_ipAddress, $"{State}. Failed to connect in {(long)watch.Value.Elapsed.TotalMilliseconds}ms. Failure {e.Message}");
#pragma warning restore EPC12 // Suspicious exception handling: only Message property is observed in exception block.

        return false;
    }

    return true;
}
#else
// Stub placed in the #else branch of the conditional compilation: it never attempts a
// connection and always reports failure.
private Task<bool> TryConnectChannelAsync(TimeSpan timeout, string operation, StopwatchSlim? watch = null)
{
    return Task.FromResult(false);
}
#endif
/// <summary>
/// Marks that exit has been requested for the server.
/// </summary>
public void ReadyForExit()
{
    // An exit operation makes the server on the other machine exit.
    // Remember that here, because we do not want to reconnect to the server afterwards.
    m_isExitCalledForServer = true;
}
public async Task CloseAsync()
{
if (!m_isShutdownInitiated)
@ -565,20 +518,13 @@ namespace BuildXL.Engine.Distribution.Grpc
m_heartbeatAction.Cancel();
m_heartbeatAction.Join();
if (m_dotNetClientEnabled)
{
#if NET6_0_OR_GREATER
((GrpcChannel)Channel).Dispose();
// WaitForStateChangedAsync hangs when you dispose/shutdown the channel when it is 'idle'.
// That's why, we pass a cancellation token to WaitForStateChangedAsync and cancel
m_exitTokenSource.Cancel();
Channel.Dispose();
#endif
}
else
{
await ((Channel)Channel).ShutdownAsync();
}
// WaitForStateChangedAsync hangs when you dispose/shutdown the channel when it is 'idle'.
// That's why, we pass a cancellation token to WaitForStateChangedAsync and cancel
m_exitTokenSource.Cancel();
}
if (m_monitorConnectionTask != null)
@ -587,10 +533,7 @@ namespace BuildXL.Engine.Distribution.Grpc
}
}
/// <summary>
/// Records that attachment has completed by setting the m_attached flag.
/// </summary>
public void OnAttachmentCompleted() => m_attached = true;
/// <summary>
/// Called once attachment has completed; flips the m_attached flag to true.
/// </summary>
public void OnAttachmentCompleted()
{
    m_attached = true;
}
public async Task<RpcCallResult<Unit>> CallAsync(
Func<CallOptions, Task> func,
@ -659,12 +602,12 @@ namespace BuildXL.Engine.Distribution.Grpc
{
OnConnectionFailureAsync?.Invoke(this, new ConnectionFailureEventArgs(ConnectionFailureType.UnrecoverableFailure, e.Status.Detail));
state = RpcCallResultState.Failed;
// Unrecoverable failure - do not retry
break;
}
if (e.Status.StatusCode == StatusCode.InvalidArgument)
if (e.Status.StatusCode == StatusCode.InvalidArgument)
{
if (e.Trailers.Get(GrpcMetadata.InvocationIdMismatch)?.Value == GrpcMetadata.True)
{
@ -723,9 +666,7 @@ namespace BuildXL.Engine.Distribution.Grpc
lastFailure: failure);
}
public Task<RpcCallResult<Unit>> FinalizeStreamAsync<TRequest>(AsyncClientStreamingCall<TRequest, RpcResponse> stream)
{
return CallAsync(
public Task<RpcCallResult<Unit>> FinalizeStreamAsync<TRequest>(AsyncClientStreamingCall<TRequest, RpcResponse> stream) => CallAsync(
async (_) =>
{
await stream.RequestStream.CompleteAsync();
@ -734,51 +675,5 @@ namespace BuildXL.Engine.Distribution.Grpc
},
nameof(FinalizeStreamAsync),
doNotRetry: true);
}
/// <summary>
/// Tries to connect the channel within <paramref name="timeout"/>, using whichever client
/// (gRPC.NET or Grpc.Core) this connection manager was configured with.
/// </summary>
/// <param name="timeout">Time budget for the connect attempt.</param>
/// <param name="operation">Name of the operation that triggered the connect; only used in the trace message.</param>
/// <param name="watch">Optional stopwatch used to report elapsed time; a new one is started when omitted.</param>
/// <returns>True when the connect call completed; false when it threw.</returns>
private async Task<bool> TryConnectChannelAsync(TimeSpan timeout, string operation, StopwatchSlim? watch = null)
{
    watch = watch ?? StopwatchSlim.Start();

    try
    {
        Logger.Log.GrpcTrace(m_loggingContext, m_ipAddress, $"Connecting by {operation}");

        if (m_dotNetClientEnabled)
        {
#if NET6_0_OR_GREATER
            // The source owns a timer for the timeout; dispose it as soon as the attempt
            // finishes so that repeated attempts do not accumulate undisposed timers.
            using (var source = new CancellationTokenSource(timeout))
            {
                await ((GrpcChannel)Channel).ConnectAsync(source.Token);
            }
#endif
        }
        else
        {
            // Grpc.Core takes an absolute deadline rather than a cancellation token.
            await ((Channel)Channel).ConnectAsync(DateTime.UtcNow.Add(timeout));
        }

        Logger.Log.GrpcTrace(m_loggingContext, m_ipAddress, $"Connected in {(long)watch.Value.Elapsed.TotalMilliseconds}ms");
    }
    catch (Exception e)
    {
#pragma warning disable EPC12 // Suspicious exception handling: only Message property is observed in exception block.
        Logger.Log.GrpcTrace(m_loggingContext, m_ipAddress, $"{State}. Failed to connect in {(long)watch.Value.Elapsed.TotalMilliseconds}ms. Failure {e.Message}");
#pragma warning restore EPC12 // Suspicious exception handling: only Message property is observed in exception block.

        return false;
    }

    return true;
}
/// <summary>
/// Returns true for the Idle and Shutdown channel states and false for every other state.
/// </summary>
private static bool IsNonRecoverableState(ChannelState state)
{
    return state == ChannelState.Idle || state == ChannelState.Shutdown;
}
}
}

Просмотреть файл

@ -57,7 +57,11 @@ namespace BuildXL.Engine.Distribution.Grpc
m_counters,
async (callOptions) => await m_client.HeartbeatAsync(GrpcUtils.EmptyResponse, callOptions));
m_connectionManager.OnConnectionFailureAsync += onConnectionFailureAsync;
#if NET6_0_OR_GREATER
m_client = new Orchestrator.OrchestratorClient(m_connectionManager.Channel);
#else
m_client = null;
#endif
m_initialized = true;
}

Просмотреть файл

@ -50,7 +50,11 @@ namespace BuildXL.Engine.Distribution.Grpc
m_counters,
async (callOptions) => await m_client.HeartbeatAsync(GrpcUtils.EmptyResponse, callOptions));
m_connectionManager.OnConnectionFailureAsync += m_onConnectionFailureAsync;
#if NET6_0_OR_GREATER
m_client = new Worker.WorkerClient(m_connectionManager.Channel);
#else
m_client = null;
#endif
}
public Task CloseAsync()

Просмотреть файл

@ -291,22 +291,6 @@ namespace BuildXL.Utilities.Configuration
/// </remarks>
public static readonly Setting<int> GrpcMaxAttempts = CreateSetting("BuildXLGrpcMaxAttempts", value => ParseInt32(value) ?? 2);
/// <summary>
/// Whether the new .NET client is enabled for gRPC
/// </summary>
/// <remarks>
/// Enabled by default: the parser yields true when the raw value is null, empty or whitespace,
/// or when it is exactly "1"; any other value yields false.
/// NOTE(review): the raw value presumably comes from the 'BuildXLGrpcDotNetClientEnabled'
/// environment variable via CreateSetting - confirm against CreateSetting.
/// </remarks>
public static readonly Setting<bool> GrpcDotNetClientEnabled = CreateSetting("BuildXLGrpcDotNetClientEnabled", value => string.IsNullOrWhiteSpace(value) || value == "1");
/// <summary>
/// Whether connection monitoring is enabled for gRPC.NET
/// </summary>
/// <remarks>
/// Enabled by default: the parser yields true when the raw value is null, empty or whitespace,
/// or when it is exactly "1"; any other value yields false.
/// NOTE(review): the raw value presumably comes from the 'BuildXLGrpcDotNetMonitorConnectionsEnabled'
/// environment variable via CreateSetting - confirm against CreateSetting.
/// </remarks>
public static readonly Setting<bool> GrpcDotNetMonitorConnectionsEnabled = CreateSetting("BuildXLGrpcDotNetMonitorConnectionsEnabled", value => string.IsNullOrWhiteSpace(value) || value == "1");
/// <summary>
/// Whether streaming is enabled for grpc calls
/// </summary>