diff --git a/Public/Src/Cache/ContentStore/DistributedTest/Stores/TestDistributedCacheSameMachine.cs b/Public/Src/Cache/ContentStore/DistributedTest/Stores/TestDistributedCacheSameMachine.cs index baeabb94f..6f3c98224 100644 --- a/Public/Src/Cache/ContentStore/DistributedTest/Stores/TestDistributedCacheSameMachine.cs +++ b/Public/Src/Cache/ContentStore/DistributedTest/Stores/TestDistributedCacheSameMachine.cs @@ -166,7 +166,7 @@ namespace ContentStoreTest.Distributed.Stores private async Task CheckGrpcPortIsOpen(Context context, uint grpcPort) { - var client = new GrpcClient(new ServiceClientContentSessionTracer(nameof(CheckGrpcPortIsOpen)), FileSystem, grpcPort, nameof(CheckGrpcPortIsOpen)); + var client = new GrpcContentClient(new ServiceClientContentSessionTracer(nameof(CheckGrpcPortIsOpen)), FileSystem, (int)grpcPort, nameof(CheckGrpcPortIsOpen)); var sw = Stopwatch.StartNew(); while (sw.ElapsedMilliseconds < ReadyWaitMs * 5) diff --git a/Public/Src/Cache/ContentStore/Library/Service/Grpc/GrpcClient.cs b/Public/Src/Cache/ContentStore/Library/Service/Grpc/GrpcClient.cs deleted file mode 100644 index 053d33ead..000000000 --- a/Public/Src/Cache/ContentStore/Library/Service/Grpc/GrpcClient.cs +++ /dev/null @@ -1,747 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using BuildXL.Cache.ContentStore.Extensions; -using BuildXL.Cache.ContentStore.Hashing; -using BuildXL.Cache.ContentStore.Interfaces.Extensions; -using BuildXL.Cache.ContentStore.Interfaces.FileSystem; -using BuildXL.Cache.ContentStore.Interfaces.Results; -using BuildXL.Cache.ContentStore.Interfaces.Sessions; -using BuildXL.Cache.ContentStore.Interfaces.Stores; -using BuildXL.Cache.ContentStore.Interfaces.Tracing; -using BuildXL.Cache.ContentStore.Sessions; -using BuildXL.Cache.ContentStore.Timers; -using BuildXL.Cache.ContentStore.UtilitiesCore; -using ContentStore.Grpc; -using Google.Protobuf; -using Grpc.Core; -// Can't rename ProtoBuf - -namespace BuildXL.Cache.ContentStore.Service.Grpc -{ - /// - /// An implementation of a CAS service client based on GRPC. - /// - public sealed class GrpcClient : IRpcClient - { - private const Capabilities DefaultClientCapabilities = Capabilities.All; - private const string HeartbeatName = "Heartbeat"; - private const int DefaultHeartbeatIntervalMinutes = 1; - private readonly Channel _channel; - private readonly ContentServer.ContentServerClient _client; - private readonly string _scenario; - private readonly ServiceClientContentSessionTracer _tracer; - private readonly IAbsFileSystem _fileSystem; - private readonly uint _grpcPort; - private readonly TimeSpan _heartbeatInterval; - private Capabilities _clientCapabilities; - private Capabilities _serviceCapabilities; - private IntervalTimer _heartbeatTimer; - private SessionState _sessionState; - - private bool _serviceUnavailable; - - /// - /// Initializes a new instance of the class. - /// - public GrpcClient(ServiceClientContentSessionTracer tracer, IAbsFileSystem fileSystem, uint grpcPort, string scenario, TimeSpan? heartbeatInterval = null) - { - _tracer = tracer; - _fileSystem = fileSystem; - _grpcPort = grpcPort; - _scenario = scenario; - GrpcEnvironment.InitializeIfNeeded(); - _channel = new Channel(GrpcEnvironment.Localhost, (int)grpcPort, ChannelCredentials.Insecure); - _client = new ContentServer.ContentServerClient(_channel); - _clientCapabilities = DefaultClientCapabilities; - _heartbeatInterval = heartbeatInterval ?? TimeSpan.FromMinutes(DefaultHeartbeatIntervalMinutes); - } - - /// - public bool ShutdownCompleted { get; private set; } - - /// - public bool ShutdownStarted { get; private set; } - - /// - public async Task ShutdownAsync(Context context) - { - ShutdownStarted = true; - try - { - _heartbeatTimer?.Dispose(); - - if (_sessionState == null) - { - // _sessionState is null if initialization has failed. - return BoolResult.Success; - } - - StructResult sessionResult = await _sessionState.GetIdAsync(); - if (!sessionResult.Succeeded) - { - return new BoolResult(sessionResult); - } - - int sessionId = sessionResult.Data; - - if (_serviceUnavailable) - { - context.Debug("Skipping session shutdown because service is unavailable."); - } - else - { - try - { - await _client.ShutdownSessionAsync(new ShutdownRequest {Header = new RequestHeader(context.Id, sessionId)}); - } - catch (RpcException e) - { - context.Error($"Failed to shut down session with error: {e}"); - } - } - - await _channel.ShutdownAsync(); - ShutdownCompleted = true; - return BoolResult.Success; - } - catch (Exception ex) - { - // Catching all exceptions, even ClientCanRetryExceptions, because the teardown steps aren't idempotent. - // In the worst case, the shutdown call comes while the service is offline, the service never receives it, - // and then the service times out the session 10 minutes later (by default). - ShutdownCompleted = true; - return new BoolResult(ex); - } - } - - private async Task SendHeartbeatAsync(Context context) - { - StructResult sessionResult = await _sessionState.GetIdAsync(); - if (!sessionResult.Succeeded) - { - // We do not even attempt to send a heartbeat if we can't get a session ID. - return; - } - - int sessionId = sessionResult.Data; - - try - { - HeartbeatResponse response = await _client.HeartbeatAsync(new HeartbeatRequest {Header = new RequestHeader(context.Id, sessionId) }); - - // Check for null header here as a workaround to a known service bug, which returns null headers on successful heartbeats. - if (response?.Header != null && !response.Header.Succeeded) - { - _tracer.Warning( - context, - $"Heartbeat failed: ErrorMessage=[{response.Header.ErrorMessage}] Diagnostics=[{response.Header.Diagnostics}]"); - - // Nor do we attempt to reset a session ID based on a failed heartbeat. - } - } - catch (Exception ex) - { - string message = (ex is RpcException rpcEx) && (rpcEx.Status.StatusCode == StatusCode.Unavailable) - ? "Heartbeat failed to detect running service." - : $"Heartbeat failed: [{ex}]"; - _tracer.Debug(context, message); - } - } - - /// - public async Task CreateSessionAsync( - Context context, - string name, - string cacheName, - ImplicitPin implicitPin) - { - var startupResult = await StartupAsync(context, 5000); - if (!startupResult.Succeeded) - { - return startupResult; - } - - CreateSessionResponse response = await CreateSessionAsyncInternalAsync(context, name, cacheName, implicitPin); - if (string.IsNullOrEmpty(response.ErrorMessage)) - { - Task> sessionFactory() => CreateSessionDataAsync(context, name, cacheName, implicitPin); - SessionData data = new SessionData { SessionId = response.SessionId, TemporaryDirectory = new DisposableDirectory(_fileSystem, new AbsolutePath(response.TempDirectory)) }; - _sessionState = new SessionState(sessionFactory, data); - - // Send a heartbeat iff both the service can receive one and the service was told to expect one. - if ((_serviceCapabilities & Capabilities.Heartbeat) != 0 && - (_clientCapabilities & Capabilities.Heartbeat) != 0) - { - _heartbeatTimer = new IntervalTimer(() => SendHeartbeatAsync(context), _heartbeatInterval, message => - { - _tracer.Debug(context, $"[{HeartbeatName}] {message}"); - }); - } - - return new StructResult(response.SessionId); - } - else - { - return new StructResult(response.ErrorMessage); - } - } - - /// - public Task StartupAsync(Context context) - { - return StartupAsync(context, 5000); - } - - /// - public async Task StartupAsync(Context context, int waitMs) - { - try - { - if (!LocalContentServer.EnsureRunning(context, _scenario, waitMs)) - { - throw new ClientCanRetryException(context, $"{nameof(GrpcClient)} failed to detect running service for scenario {_scenario} during startup"); - } - - context.Always($"Starting up GRPC client against service on port {_grpcPort}"); - - HelloResponse helloResponse; - using (var ct = new CancellationTokenSource(waitMs)) - { - helloResponse = await RunClientActionAndThrowIfFailedAsync( - context, - async () => await _client.HelloAsync( - new HelloRequest(), - cancellationToken: ct.Token)); - } - - if (helloResponse.Success) - { - _serviceCapabilities = (Capabilities) helloResponse.Capabilities; - } - else - { - return new BoolResult("Failed to connect to service for unknown reason."); - } - } - catch (Exception ex) when (!(ex is ClientCanRetryException)) - { - // The caller's retry policy needs to see ClientCanRetryExceptions in order to properly retry - return new BoolResult(ex); - } - - return BoolResult.Success; - } - - private async Task> CreateSessionDataAsync( - Context context, - string name, - string cacheName, - ImplicitPin implicitPin) - { - CreateSessionResponse response = await CreateSessionAsyncInternalAsync(context, name, cacheName, implicitPin); - if (string.IsNullOrEmpty(response.ErrorMessage)) - { - SessionData data = new SessionData() { SessionId = response.SessionId, TemporaryDirectory = new DisposableDirectory(_fileSystem, new AbsolutePath(response.TempDirectory))}; - return new ObjectResult(data); - } - else - { - return new ObjectResult(response.ErrorMessage); - } - } - - private async Task CreateSessionAsyncInternalAsync( - Context context, - string name, - string cacheName, - ImplicitPin implicitPin) - { - CreateSessionResponse response = await RunClientActionAndThrowIfFailedAsync(context, async () => await _client.CreateSessionAsync( - new CreateSessionRequest - { - CacheName = cacheName, - SessionName = name, - ImplicitPin = (int)implicitPin, - TraceId = context.Id.ToString(), - Capabilities = (int)_clientCapabilities - })); - return response; - } - - /// - public void Dispose() - { - _heartbeatTimer?.Dispose(); - _sessionState?.Dispose(); - } - - /// - public async Task OpenStreamAsync(Context context, ContentHash contentHash) - { - ObjectResult result = await _sessionState.GetDataAsync(); - if (!result.Succeeded) - { - return new OpenStreamResult(result); - } - - int sessionId = result.Data.SessionId; - AbsolutePath tempPath = result.Data.TemporaryDirectory.CreateRandomFileName(); - - var placeFileResult = await PlaceFileAsync( - context, - contentHash, - tempPath, - FileAccessMode.ReadOnly, - FileReplacementMode.None, - FileRealizationMode.HardLink, - sessionId); - - if (placeFileResult.Succeeded) - { - try - { - Stream stream = await _fileSystem.OpenReadOnlyAsync(tempPath, FileShare.Delete | FileShare.Read); - if (stream == null) - { - throw CreateServiceMayHaveRestarted(context, $"Failed to open temp file {tempPath}."); - } - - return new OpenStreamResult(stream); - } - catch (Exception ex) when (ex is DirectoryNotFoundException || ex is UnauthorizedAccessException) - { - throw new ClientCanRetryException(context, $"Failed to open temp file {tempPath}. The service may be restarting", ex); - } - catch (Exception ex) when (!(ex is ClientCanRetryException)) - { - // The caller's retry policy needs to see ClientCanRetryExceptions in order to properly retry - return new OpenStreamResult(ex); - } - } - else if (placeFileResult.Code == PlaceFileResult.ResultCode.NotPlacedContentNotFound) - { - return new OpenStreamResult(OpenStreamResult.ResultCode.ContentNotFound, placeFileResult.ErrorMessage); - } - else - { - return new OpenStreamResult(placeFileResult); - } - } - - /// - public async Task PinAsync(Context context, ContentHash contentHash) - { - StructResult sessionResult = await _sessionState.GetIdAsync(); - if (!sessionResult.Succeeded) - { - return new PinResult(sessionResult); - } - - int sessionId = sessionResult.Data; - - DateTime startTime = DateTime.UtcNow; - PinResponse response = await RunClientActionAndThrowIfFailedAsync( - context, - async () => await _client.PinAsync( - new PinRequest - { - HashType = (int)contentHash.HashType, - ContentHash = contentHash.ToByteString(), - Header = new RequestHeader(context.Id, sessionId) - })); - - long ticksWaited = response.Header.ServerReceiptTimeUtcTicks - startTime.Ticks; - _tracer.TrackClientWaitForServerTicks(ticksWaited); - - await ResetOnUnknownSessionAsync(context, response.Header, sessionId); - - return UnpackPinResult(response.Header); - } - - /// - public async Task>>> PinAsync( - Context context, - IReadOnlyList contentHashes) - { - if (contentHashes.Count == 0) - { - return new List>>(0); - } - - var pinResults = new List>>(); - - var pinTasks = new List>>>>(); - int i = 0; - const int batchSize = 500; - foreach (var chunk in contentHashes.GetPages(batchSize)) - { - pinTasks.Add(PinBatchAsync(context, i, chunk.ToList())); - i += batchSize; - } - - pinResults.AddRange((await Task.WhenAll(pinTasks)).SelectMany(pins => pins)); - return pinResults; - } - - private async Task>>> PinBatchAsync(Context context, int baseIndex, IReadOnlyList chunk) - { - StructResult sessionResult = await _sessionState.GetIdAsync(); - if (!sessionResult.Succeeded) - { - PinResult pinResult = new PinResult(sessionResult); - return chunk.Select((ContentHash h) => pinResult).AsIndexed().AsTasks(); - } - - int sessionId = sessionResult.Data; - - var pinResults = new List>(); - var bulkPinRequest = new PinBulkRequest {Header = new RequestHeader(context.Id, sessionId)}; - foreach (var contentHash in chunk) - { - bulkPinRequest.Hashes.Add( - new ContentHashAndHashTypeData { HashType = (int)contentHash.HashType, ContentHash = contentHash.ToByteString() }); - } - - DateTime startTime = DateTime.UtcNow; - PinBulkResponse underlyingBulkPinResponse = await RunClientActionAndThrowIfFailedAsync( - context, - async () => await _client.PinBulkAsync(bulkPinRequest)); - long ticksWaited = underlyingBulkPinResponse.Header.Values.First().ServerReceiptTimeUtcTicks - startTime.Ticks; - _tracer.TrackClientWaitForServerTicks(ticksWaited); - - foreach (var response in underlyingBulkPinResponse.Header) - { - await ResetOnUnknownSessionAsync(context, response.Value, sessionId); - pinResults.Add(UnpackPinResult(response.Value).WithIndex(response.Key + baseIndex)); - } - - _tracer.LogPinResults(context, pinResults.Select(r => chunk[r.Index - baseIndex]).ToList(), pinResults.Select(r => r.Item).ToList()); - - return pinResults.AsTasks(); - } - - private PinResult UnpackPinResult(ResponseHeader header) - { - // Workaround: Handle the service returning negative result codes in error cases - var resultCode = header.Result < 0 ? PinResult.ResultCode.Error : (PinResult.ResultCode)header.Result; - string errorMessage = header.ErrorMessage; - return string.IsNullOrEmpty(errorMessage) - ? new PinResult(resultCode) - : new PinResult(resultCode, errorMessage, header.Diagnostics); - } - - /// - public async Task PlaceFileAsync( - Context context, - ContentHash contentHash, - AbsolutePath path, - FileAccessMode accessMode, - FileReplacementMode replacementMode, - FileRealizationMode realizationMode) - { - StructResult sessionResult = await _sessionState.GetIdAsync(); - if (!sessionResult.Succeeded) - { - return new PlaceFileResult(sessionResult); - } - - int sessionId = sessionResult.Data; - - return await PlaceFileAsync(context, contentHash, path, accessMode, replacementMode, realizationMode, sessionId); - } - - private async Task PlaceFileAsync( - Context context, - ContentHash contentHash, - AbsolutePath path, - FileAccessMode accessMode, - FileReplacementMode replacementMode, - FileRealizationMode realizationMode, - int sessionId) - { - var startTime = DateTime.UtcNow; - PlaceFileResponse response = await RunClientActionAndThrowIfFailedAsync(context, async () => await _client.PlaceFileAsync( - new PlaceFileRequest - { - Header = new RequestHeader(context.Id, sessionId), - HashType = (int)contentHash.HashType, - ContentHash = contentHash.ToByteString(), - Path = path.Path, - FileAccessMode = (int)accessMode, - FileRealizationMode = (int)realizationMode, - FileReplacementMode = (int)replacementMode - })); - long ticksWaited = response.Header.ServerReceiptTimeUtcTicks - startTime.Ticks; - _tracer.TrackClientWaitForServerTicks(ticksWaited); - - // Workaround: Handle the service returning negative result codes in error cases - PlaceFileResult.ResultCode resultCode = response.Header.Result < 0 - ? PlaceFileResult.ResultCode.Error - : (PlaceFileResult.ResultCode)response.Header.Result; - if (!response.Header.Succeeded) - { - await ResetOnUnknownSessionAsync(context, response.Header, sessionId); - var message = string.IsNullOrEmpty(response.Header.ErrorMessage) - ? resultCode.ToString() - : response.Header.ErrorMessage; - return new PlaceFileResult(resultCode, message, response.Header.Diagnostics); - } - else - { - return new PlaceFileResult(resultCode, response.ContentSize); - } - } - - /// - public async Task PutFileAsync( - Context context, - ContentHash contentHash, - AbsolutePath path, - FileRealizationMode realizationMode) - { - StructResult sessionResult = await _sessionState.GetIdAsync(); - if (!sessionResult.Succeeded) - { - return new PutResult(sessionResult, contentHash); - } - - int sessionId = sessionResult.Data; - return await PutFileAsync(context, contentHash, path, realizationMode, sessionId); - } - - /// - /// Gets the statistics from the remote. - /// - public async Task GetStatsAsync(Context context) - { - CounterSet counters = new CounterSet(); - - // Get stats iff compatible with service and client - if ((_serviceCapabilities & Capabilities.GetStats) != 0 && - (_clientCapabilities & Capabilities.GetStats) != 0) - { - var response = await RunClientActionAndThrowIfFailedAsync(context, async () => await _client.GetStatsAsync(new GetStatsRequest())); - if (response.Success) - { - foreach (var entry in response.Statistics) - { - counters.Add(entry.Key, entry.Value); - } - } - } - - return new GetStatsResult(counters); - } - - private async Task PutFileAsync( - Context context, - ContentHash contentHash, - AbsolutePath path, - FileRealizationMode realizationMode, - int sessionId) - { - DateTime startTime = DateTime.UtcNow; - PutFileResponse response = await RunClientActionAndThrowIfFailedAsync(context, async () => await _client.PutFileAsync( - new PutFileRequest - { - Header = new RequestHeader(context.Id, sessionId), - ContentHash = contentHash.ToByteString(), - HashType = (int)contentHash.HashType, - FileRealizationMode = (int)realizationMode, - Path = path.Path - })); - long ticksWaited = response.Header.ServerReceiptTimeUtcTicks - startTime.Ticks; - _tracer.TrackClientWaitForServerTicks(ticksWaited); - - if (!response.Header.Succeeded) - { - await ResetOnUnknownSessionAsync(context, response.Header, sessionId); - return new PutResult(contentHash, response.Header.ErrorMessage, response.Header.Diagnostics); - } - else - { - return new PutResult(response.ContentHash.ToContentHash((HashType)response.HashType), response.ContentSize); - } - } - - /// - public async Task PutFileAsync( - Context context, - HashType hashType, - AbsolutePath path, - FileRealizationMode realizationMode) - { - StructResult sessionResult = await _sessionState.GetIdAsync(); - if (!sessionResult.Succeeded) - { - return new PutResult(sessionResult, new ContentHash(hashType)); - } - - int sessionId = sessionResult.Data; - return await PutFileAsync(context, hashType, path, realizationMode, sessionId); - } - - private async Task PutFileAsync( - Context context, - HashType hashType, - AbsolutePath path, - FileRealizationMode realizationMode, - int sessionId) - { - DateTime startTime = DateTime.UtcNow; - PutFileResponse response = await RunClientActionAndThrowIfFailedAsync(context, async () => await _client.PutFileAsync( - new PutFileRequest - { - Header = new RequestHeader(context.Id, sessionId), - ContentHash = ByteString.Empty, - HashType = (int)hashType, - FileRealizationMode = (int)realizationMode, - Path = path.Path - })); - long ticksWaited = response.Header.ServerReceiptTimeUtcTicks - startTime.Ticks; - _tracer.TrackClientWaitForServerTicks(ticksWaited); - - if (!response.Header.Succeeded) - { - await ResetOnUnknownSessionAsync(context, response.Header, sessionId); - return new PutResult(new ContentHash(hashType), response.Header.ErrorMessage, response.Header.Diagnostics); - } - else - { - return new PutResult(response.ContentHash.ToContentHash((HashType)response.HashType), response.ContentSize); - } - } - - /// - public Task PutStreamAsync(Context context, ContentHash contentHash, Stream stream) - { - return PutStreamInternalAsync( - context, - stream, - contentHash, - (sessionId, tempFile) => PutFileAsync(context, contentHash, tempFile, FileRealizationMode.HardLink, sessionId)); - } - - /// - public Task PutStreamAsync(Context context, HashType hashType, Stream stream) - { - return PutStreamInternalAsync( - context, - stream, - new ContentHash(hashType), - (sessionId, tempFile) => PutFileAsync(context, hashType, tempFile, FileRealizationMode.HardLink, sessionId)); - } - - private async Task PutStreamInternalAsync(Context context, Stream stream, ContentHash contentHash, Func> putFileFunc) - { - ObjectResult result = await _sessionState.GetDataAsync(); - if (!result.Succeeded) - { - return new PutResult(result, contentHash); - } - - int sessionId = result.Data.SessionId; - var tempFile = result.Data.TemporaryDirectory.CreateRandomFileName(); - try - { - if (stream.CanSeek) - { - stream.Position = 0; - } - - using (var fileStream = await _fileSystem.OpenAsync(tempFile, FileAccess.Write, FileMode.Create, FileShare.Delete)) - { - if (fileStream == null) - { - throw CreateServiceMayHaveRestarted(context, $"Could not create temp file {tempFile}."); - } - - await stream.CopyToAsync(fileStream); - } - - PutResult putResult = await putFileFunc(sessionId, tempFile); - - if (putResult.Succeeded) - { - return new PutResult(putResult.ContentHash, putResult.ContentSize); - } - else if (!_fileSystem.FileExists(tempFile)) - { - throw CreateServiceMayHaveRestarted(context, $"Temp file {tempFile} not found."); - } - else - { - return new PutResult(putResult, putResult.ContentHash); - } - } - catch (Exception ex) when (ex is DirectoryNotFoundException || ex is UnauthorizedAccessException) - { - throw new ClientCanRetryException(context, "Exception thrown during PutStreamInternal. The service may have shut down", ex); - } - catch (Exception ex) when (!(ex is ClientCanRetryException)) - { - // The caller's retry policy needs to see ClientCanRetryExceptions in order to properly retry - return new PutResult(ex, contentHash); - } - } - - private static ClientCanRetryException CreateServiceMayHaveRestarted(Context context, string baseMessage) - { - // This is a very important logic today: - // The service creates a temp directory for every session and it deletes all of them during shutdown - // and recreates when when it loads the hibernated sessions. - // This case is usually manifested via 'null' returned from FileSystem.OpenAsync because the file or part of the path is gone. - // This is recoverable error and the client of this code should try again later, because when the service is back - // it recreates all the temp directories for all the pending sessions back. - return new ClientCanRetryException(context, $"{baseMessage} The service may have restarted."); - } - - private async Task RunClientActionAndThrowIfFailedAsync(Context context, Func> func) - { - try - { - var result = await func(); - _serviceUnavailable = false; - await Task.Yield(); - return result; - } - catch (RpcException ex) - { - if (ex.Status.StatusCode == StatusCode.Unavailable) - { - // If the service is unavailable we can save time by not shutting down the service gracefully. - _serviceUnavailable = true; - throw new ClientCanRetryException(context, $"{nameof(GrpcClient)} failed to detect running service at port {_grpcPort} while running client action. [{ex}]"); - } - - throw new ClientCanRetryException(context, ex.ToString(), ex); - } - } - - /// - /// Test hook for overriding the client's capabilities - /// - internal void DisableCapabilities(Capabilities capabilities) - { - _clientCapabilities &= ~capabilities; - } - - private async Task ResetOnUnknownSessionAsync(Context context, ResponseHeader header, int sessionId) - { - // At the moment, the error message is the only way to identify that the error was due to - // the session ID being unknown to the server. - if (!header.Succeeded && header.ErrorMessage.Contains("Could not find session")) - { - await _sessionState.ResetAsync(sessionId); - throw new ClientCanRetryException(context, $"Could not find session id {sessionId}"); - } - } - } -} diff --git a/Public/Src/Cache/ContentStore/Library/Tracing/OperationContext.cs b/Public/Src/Cache/ContentStore/Library/Tracing/OperationContext.cs index 6e52cb875..4e160ff4b 100644 --- a/Public/Src/Cache/ContentStore/Library/Tracing/OperationContext.cs +++ b/Public/Src/Cache/ContentStore/Library/Tracing/OperationContext.cs @@ -103,11 +103,12 @@ namespace BuildXL.Cache.ContentStore.Tracing.Internal bool traceOperationStarted = true, bool traceOperationFinished = true, Func messageFactory = null, + string extraStartMessage = null, [CallerMemberName]string caller = null) where T : ResultBase { var self = this; var operationStartedAction = traceOperationStarted - ? () => operationTracer?.OperationStarted(self, caller, enabled: !traceErrorsOnly) + ? () => operationTracer?.OperationStarted(self, caller, enabled: !traceErrorsOnly, additionalInfo: extraStartMessage) : (Action)null; using (counter?.Start()) diff --git a/Public/Src/Cache/ContentStore/Vfs/App.config b/Public/Src/Cache/ContentStore/Vfs/App.config new file mode 100644 index 000000000..38db502be --- /dev/null +++ b/Public/Src/Cache/ContentStore/Vfs/App.config @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + diff --git a/Public/Src/Cache/ContentStore/Vfs/BuildXL.VirtualFileSystem.dsc b/Public/Src/Cache/ContentStore/Vfs/BuildXL.VirtualFileSystem.dsc new file mode 100644 index 000000000..b2eae71df --- /dev/null +++ b/Public/Src/Cache/ContentStore/Vfs/BuildXL.VirtualFileSystem.dsc @@ -0,0 +1,39 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +import * as ManagedSdk from "Sdk.Managed"; +import { NetFx } from "Sdk.BuildXL"; + +namespace VfsApplication { + export declare const qualifier: BuildXLSdk.FullFrameworkQualifier; + + @@public + export const exe = BuildXLSdk.executable({ + assemblyName: "bvfs", + sources: globR(d`.`,"*.cs"), + skipDocumentationGeneration: true, + appConfig: f`App.Config`, + references: [ + UtilitiesCore.dll, + Grpc.dll, + Hashing.dll, + Library.dll, + Interfaces.dll, + importFrom("Microsoft.Windows.ProjFS").pkg, + importFrom("BuildXL.Utilities").dll, + importFrom("BuildXL.Utilities").Branding.dll, + importFrom("BuildXL.Utilities").Collections.dll, + importFrom("BuildXL.Utilities").KeyValueStore.dll, + importFrom("BuildXL.Utilities").Native.dll, + importFrom("BuildXL.Utilities").ToolSupport.dll, + importFrom("Sdk.Selfhost.RocksDbSharp").pkg, + + importFrom("Grpc.Core").pkg, + importFrom("Google.Protobuf").pkg, + + ManagedSdk.Factory.createBinary(importFrom("TransientFaultHandling.Core").Contents.all, r`lib/NET4/Microsoft.Practices.TransientFaultHandling.Core.dll`), + ], + runtimeContent: [ + + ] + }); +} diff --git a/Public/Src/Cache/ContentStore/Vfs/Placeholder.cs b/Public/Src/Cache/ContentStore/Vfs/Placeholder.cs new file mode 100644 index 000000000..8c0994763 --- /dev/null +++ b/Public/Src/Cache/ContentStore/Vfs/Placeholder.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using BuildXL.Cache.ContentStore.Interfaces.FileSystem; + +namespace BuildXL.Cache.ContentStore.Vfs +{ + public class Placeholder + { + public static T Todo(string message = null, T value = default) + { + return value; + } + } +} diff --git a/Public/Src/Cache/ContentStore/Vfs/VfsCasConfiguration.cs b/Public/Src/Cache/ContentStore/Vfs/VfsCasConfiguration.cs new file mode 100644 index 000000000..2fddb577a --- /dev/null +++ b/Public/Src/Cache/ContentStore/Vfs/VfsCasConfiguration.cs @@ -0,0 +1,117 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using BuildXL.Cache.ContentStore.Interfaces.FileSystem; + +namespace BuildXL.Cache.ContentStore.Vfs +{ + public class VfsCasConfiguration + { + /// + /// Grpc port exposed by VFS CAS + /// + public int ServerGrpcPort { get; } + + /// + /// Grpc port of backing CAS service used to materialize files + /// + public int BackingGrpcPort { get; } + + /// + /// The root used by the VFS for materializing files + /// + public AbsolutePath RootPath { get; } + + /// + /// The data root used for state of the VFS + /// + public AbsolutePath DataRootPath { get; } + + /// + /// The cache root used for cache state of the VFS server + /// + public AbsolutePath ServerRootPath { get; } + + /// + /// The root virtualized directory + /// + public AbsolutePath VfsRootPath { get; } + + /// + /// The cache name + /// + public string CacheName { get; } + + /// + /// The scenario of the backing server (defines ready event wait handle name) + /// + public string Scenario { get; } + + /// + /// Specifies folder names under the VFS which will be junctioned to given destination paths + /// Maybe just names (i.e. no sub paths) + /// + public IReadOnlyDictionary VirtualizationMounts { get; } + + private VfsCasConfiguration(Builder builder) + { + ServerGrpcPort = builder.ServerGrpcPort; + BackingGrpcPort = builder.BackingGrpcPort; + RootPath = builder.RootPath; + VirtualizationMounts = builder.VirtualizationMounts.ToDictionary(kvp => kvp.Key, kvp => kvp.Value, StringComparer.OrdinalIgnoreCase); + CacheName = builder.CacheName; + Scenario = builder.Scenario; + + DataRootPath = RootPath / "data"; + VfsRootPath = RootPath / "vfs"; + ServerRootPath = RootPath / "server"; + } + + public class Builder + { + /// + /// Grpc port exposed by VFS CAS + /// + public int ServerGrpcPort { get; set; } + + /// + /// Grpc port of backing CAS service used to materialize files + /// + public int BackingGrpcPort { get; set; } + + /// + /// The root used by the VFS for materializing files + /// + public AbsolutePath RootPath { get; set; } + + /// + /// The cache name + /// + public string CacheName { get; set; } + + /// + /// The scenario of the backing server (defines ready event wait handle name) + /// + public string Scenario { get; set; } + + /// + /// Specifies folder names under the VFS which will be junctioned to given destination paths + /// Maybe just names (i.e. no sub paths) + /// + public Dictionary VirtualizationMounts { get; } = new Dictionary(StringComparer.OrdinalIgnoreCase); + + /// + /// Creates a VfsCasConfiguration + /// + public VfsCasConfiguration Build() + { + return new VfsCasConfiguration(this); + } + } + } +} diff --git a/Public/Src/Cache/ContentStore/Vfs/VfsCasRunner.cs b/Public/Src/Cache/ContentStore/Vfs/VfsCasRunner.cs new file mode 100644 index 000000000..77b842df5 --- /dev/null +++ b/Public/Src/Cache/ContentStore/Vfs/VfsCasRunner.cs @@ -0,0 +1,127 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using BuildXL.Cache.ContentStore.FileSystem; +using BuildXL.Cache.ContentStore.Interfaces.FileSystem; +using BuildXL.Cache.ContentStore.Logging; +using BuildXL.Cache.ContentStore.Service; +using BuildXL.Cache.ContentStore.Sessions; +using BuildXL.Cache.ContentStore.Stores; + +namespace BuildXL.Cache.ContentStore.Vfs +{ + using System.Collections.Generic; + using System.IO; + using System.Threading.Tasks; + using BuildXL.Cache.ContentStore.Interfaces.Extensions; + using BuildXL.Cache.ContentStore.Interfaces.Logging; + using BuildXL.Cache.ContentStore.Interfaces.Results; + using BuildXL.Cache.ContentStore.Interfaces.Stores; + using BuildXL.Cache.ContentStore.Interfaces.Tracing; + using BuildXL.Cache.ContentStore.Tracing; + using BuildXL.Cache.ContentStore.Tracing.Internal; + using BuildXL.Native.IO; + using static Placeholder; + + /// + /// Entry point of the application. + /// + public class VfsCasRunner + { + private Tracer Tracer { get; } = new Tracer(nameof(VfsCasRunner)); + + public async Task RunAsync(VfsCasConfiguration configuration) + { + // Create VFS root + using (var fileLog = new FileLog((configuration.RootPath / "Bvfs.log").Path)) + using (var logger = new Logger(fileLog)) + { + var fileSystem = new PassThroughFileSystem(logger); + var context = new OperationContext(new Context(logger)); + + // Map junctions into VFS root + foreach (var mount in configuration.VirtualizationMounts) + { + CreateJunction(context, source: mount.Value, target: configuration.VfsRootPath / mount.Key); + } + + var clientContentStore = new ServiceClientContentStore( + logger, + fileSystem, + new ServiceClientContentStoreConfiguration( + configuration.CacheName, + new ServiceClientRpcConfiguration( + configuration.BackingGrpcPort), + scenario: configuration.Scenario)); + + // Client is startup/shutdown with wrapping VFS content store + + using (var server = new LocalContentServer( + fileSystem, + logger, + scenario: "bvfs" + configuration.ServerGrpcPort, + path => new VirtualizedContentStore(clientContentStore), + new LocalServerConfiguration( + configuration.DataRootPath, + new Dictionary() + { + { configuration.CacheName, configuration.ServerRootPath } + }, + configuration.ServerGrpcPort))) + { + + await server.StartupAsync(context).ThrowIfFailure(); + + await WaitForTerminationAsync(context); + + await server.ShutdownAsync(context).ThrowIfFailure(); + } + } + } + + private static Task WaitForTerminationAsync(Context context) + { + var termination = new TaskCompletionSource(); + + Console.CancelKeyPress += (sender, args) => + { + context.Debug("Terminating due to cancellation request on console."); + termination.TrySetResult(true); + }; + + Task.Run(() => + { + string line = null; + while ((line = Console.ReadLine()) != null) + { + if (line.Equals("exit", StringComparison.OrdinalIgnoreCase)) + { + context.Debug("Terminating due to exit request on console."); + termination.TrySetResult(true); + break; + } + } + + context.Debug("Terminating due to end of standard input."); + termination.TrySetResult(true); + }).FireAndForget(context); + + return termination.Task; + } + + private void CreateJunction(OperationContext context, AbsolutePath source, AbsolutePath target) + { + context.PerformOperation( + Tracer, + () => + { + Directory.CreateDirectory(target.Path); + Directory.CreateDirectory(source.Path); + FileUtilities.CreateJunction(source.Path, target.Path); + return BoolResult.Success; + }, + extraStartMessage: $"{source}=>{target}").ThrowIfFailure(); + } + } +} diff --git a/Public/Src/Cache/ContentStore/Vfs/VfsProgram.cs b/Public/Src/Cache/ContentStore/Vfs/VfsProgram.cs new file mode 100644 index 000000000..13cd7c9a2 --- /dev/null +++ b/Public/Src/Cache/ContentStore/Vfs/VfsProgram.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using BuildXL.ToolSupport; + +namespace BuildXL.Cache.ContentStore.Vfs +{ + using System.Diagnostics.ContractsLight; + using BuildXL.Cache.ContentStore.Interfaces.FileSystem; + using static CommandLineUtilities; + + public class VfsProgram : ToolProgram + { + public VfsProgram() + : base("Bvfs") + { + } + + public static int Main(string[] arguments) + { + try + { + return new VfsProgram().MainHandler(arguments); + } + catch (InvalidArgumentException e) + { + Console.Error.WriteLine("Execution error: " + (e.InnerException ?? e).Message); + } + + return -1; + } + + public override int Run(VfsCasConfiguration arguments) + { + var runner = new VfsCasRunner(); + runner.RunAsync(arguments).GetAwaiter().GetResult(); + return 0; + } + + public override bool TryParse(string[] rawArgs, out VfsCasConfiguration arguments) + { + var cli = new CommandLineUtilities(rawArgs); + var config = new VfsCasConfiguration.Builder(); + + Dictionary handlers = new OptionHandler[] + { + new OptionHandler(new[] { "serverPort", "sp" }, opt => config.ServerGrpcPort = (int)ParseUInt32Option(opt, 0, ushort.MaxValue)), + new OptionHandler(new[] { "backingPort", "bp" }, opt => config.BackingGrpcPort = (int)ParseUInt32Option(opt, 0, ushort.MaxValue)), + new OptionHandler(new[] { "root" }, opt => config.RootPath = new AbsolutePath(ParsePathOption(opt))), + new OptionHandler(new[] { "cacheName" }, opt => config.CacheName = ParseStringOption(opt)), + new OptionHandler(new[] { "scenario" }, opt => config.Scenario = ParseStringOption(opt), required: false), + new OptionHandler(new[] { "virtualizationMount", "vm" }, opt => + { + var kvp = ParseKeyValuePair(opt); + config.VirtualizationMounts[kvp.Key] = new AbsolutePath(GetFullPath(kvp.Value, opt)); + }), + } + .SelectMany(handler => handler.Names.Select(name => (name, handler))) + .ToDictionary(t => t.name, t => t.handler, StringComparer.OrdinalIgnoreCase); + + foreach (var opt in cli.Options) + { + if (opt.Name == "?" || opt.Name == "help") + { + // TODO: Help text + } + + if (handlers.TryGetValue(opt.Name, out var handler)) + { + handler.Handle(opt); + handler.Occurrences++; + } + else + { + throw new InvalidArgumentException($"Unrecognized option {opt.Name}"); + } + } + + foreach (var handler in handlers.Values.Where(h => h.Occurrences == 0 && h.Required)) + { + throw new InvalidArgumentException($"Option '{handler.Names[0]}' is required."); + } + + arguments = config.Build(); + return true; + } + + private class OptionHandler + { + public string[] Names { get; } + public Action