Start implementation of BuildXL VFS cas process. (#249)

* Start implementation of BuildXL VFS cas process. Currently just proxies to a backing content server. Next step is to actually add virtualization.

* PR feedback.

* Only VFS build on full framework.

* PR feedback.
This commit is contained in:
Lance Collins 2019-05-07 14:46:04 -07:00 коммит произвёл GitHub
Родитель 3c7e394ca7
Коммит 9b5ff34e1f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 637 добавлений и 753 удалений

Просмотреть файл

@ -166,7 +166,7 @@ namespace ContentStoreTest.Distributed.Stores
private async Task<BoolResult> CheckGrpcPortIsOpen(Context context, uint grpcPort)
{
var client = new GrpcClient(new ServiceClientContentSessionTracer(nameof(CheckGrpcPortIsOpen)), FileSystem, grpcPort, nameof(CheckGrpcPortIsOpen));
var client = new GrpcContentClient(new ServiceClientContentSessionTracer(nameof(CheckGrpcPortIsOpen)), FileSystem, (int)grpcPort, nameof(CheckGrpcPortIsOpen));
var sw = Stopwatch.StartNew();
while (sw.ElapsedMilliseconds < ReadyWaitMs * 5)

Просмотреть файл

@ -1,747 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Extensions;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.Extensions;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Sessions;
using BuildXL.Cache.ContentStore.Interfaces.Stores;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
using BuildXL.Cache.ContentStore.Sessions;
using BuildXL.Cache.ContentStore.Timers;
using BuildXL.Cache.ContentStore.UtilitiesCore;
using ContentStore.Grpc;
using Google.Protobuf;
using Grpc.Core;
// Can't rename ProtoBuf
namespace BuildXL.Cache.ContentStore.Service.Grpc
{
/// <summary>
/// An implementation of a CAS service client based on GRPC.
/// </summary>
public sealed class GrpcClient : IRpcClient
{
private const Capabilities DefaultClientCapabilities = Capabilities.All;
private const string HeartbeatName = "Heartbeat";
private const int DefaultHeartbeatIntervalMinutes = 1;
private readonly Channel _channel;
private readonly ContentServer.ContentServerClient _client;
private readonly string _scenario;
private readonly ServiceClientContentSessionTracer _tracer;
private readonly IAbsFileSystem _fileSystem;
private readonly uint _grpcPort;
private readonly TimeSpan _heartbeatInterval;
private Capabilities _clientCapabilities;
private Capabilities _serviceCapabilities;
private IntervalTimer _heartbeatTimer;
private SessionState _sessionState;
private bool _serviceUnavailable;
/// <summary>
/// Initializes a new instance of the <see cref="GrpcClient" /> class.
/// </summary>
public GrpcClient(ServiceClientContentSessionTracer tracer, IAbsFileSystem fileSystem, uint grpcPort, string scenario, TimeSpan? heartbeatInterval = null)
{
_tracer = tracer;
_fileSystem = fileSystem;
_grpcPort = grpcPort;
_scenario = scenario;
GrpcEnvironment.InitializeIfNeeded();
_channel = new Channel(GrpcEnvironment.Localhost, (int)grpcPort, ChannelCredentials.Insecure);
_client = new ContentServer.ContentServerClient(_channel);
_clientCapabilities = DefaultClientCapabilities;
_heartbeatInterval = heartbeatInterval ?? TimeSpan.FromMinutes(DefaultHeartbeatIntervalMinutes);
}
/// <inheritdoc />
public bool ShutdownCompleted { get; private set; }
/// <inheritdoc />
public bool ShutdownStarted { get; private set; }
/// <inheritdoc />
public async Task<BoolResult> ShutdownAsync(Context context)
{
ShutdownStarted = true;
try
{
_heartbeatTimer?.Dispose();
if (_sessionState == null)
{
// _sessionState is null if initialization has failed.
return BoolResult.Success;
}
StructResult<int> sessionResult = await _sessionState.GetIdAsync();
if (!sessionResult.Succeeded)
{
return new BoolResult(sessionResult);
}
int sessionId = sessionResult.Data;
if (_serviceUnavailable)
{
context.Debug("Skipping session shutdown because service is unavailable.");
}
else
{
try
{
await _client.ShutdownSessionAsync(new ShutdownRequest {Header = new RequestHeader(context.Id, sessionId)});
}
catch (RpcException e)
{
context.Error($"Failed to shut down session with error: {e}");
}
}
await _channel.ShutdownAsync();
ShutdownCompleted = true;
return BoolResult.Success;
}
catch (Exception ex)
{
// Catching all exceptions, even ClientCanRetryExceptions, because the teardown steps aren't idempotent.
// In the worst case, the shutdown call comes while the service is offline, the service never receives it,
// and then the service times out the session 10 minutes later (by default).
ShutdownCompleted = true;
return new BoolResult(ex);
}
}
private async Task SendHeartbeatAsync(Context context)
{
StructResult<int> sessionResult = await _sessionState.GetIdAsync();
if (!sessionResult.Succeeded)
{
// We do not even attempt to send a heartbeat if we can't get a session ID.
return;
}
int sessionId = sessionResult.Data;
try
{
HeartbeatResponse response = await _client.HeartbeatAsync(new HeartbeatRequest {Header = new RequestHeader(context.Id, sessionId) });
// Check for null header here as a workaround to a known service bug, which returns null headers on successful heartbeats.
if (response?.Header != null && !response.Header.Succeeded)
{
_tracer.Warning(
context,
$"Heartbeat failed: ErrorMessage=[{response.Header.ErrorMessage}] Diagnostics=[{response.Header.Diagnostics}]");
// Nor do we attempt to reset a session ID based on a failed heartbeat.
}
}
catch (Exception ex)
{
string message = (ex is RpcException rpcEx) && (rpcEx.Status.StatusCode == StatusCode.Unavailable)
? "Heartbeat failed to detect running service."
: $"Heartbeat failed: [{ex}]";
_tracer.Debug(context, message);
}
}
/// <inheritdoc />
public async Task<BoolResult> CreateSessionAsync(
Context context,
string name,
string cacheName,
ImplicitPin implicitPin)
{
var startupResult = await StartupAsync(context, 5000);
if (!startupResult.Succeeded)
{
return startupResult;
}
CreateSessionResponse response = await CreateSessionAsyncInternalAsync(context, name, cacheName, implicitPin);
if (string.IsNullOrEmpty(response.ErrorMessage))
{
Task<ObjectResult<SessionData>> sessionFactory() => CreateSessionDataAsync(context, name, cacheName, implicitPin);
SessionData data = new SessionData { SessionId = response.SessionId, TemporaryDirectory = new DisposableDirectory(_fileSystem, new AbsolutePath(response.TempDirectory)) };
_sessionState = new SessionState(sessionFactory, data);
// Send a heartbeat iff both the service can receive one and the service was told to expect one.
if ((_serviceCapabilities & Capabilities.Heartbeat) != 0 &&
(_clientCapabilities & Capabilities.Heartbeat) != 0)
{
_heartbeatTimer = new IntervalTimer(() => SendHeartbeatAsync(context), _heartbeatInterval, message =>
{
_tracer.Debug(context, $"[{HeartbeatName}] {message}");
});
}
return new StructResult<int>(response.SessionId);
}
else
{
return new StructResult<int>(response.ErrorMessage);
}
}
/// <nodoc />
public Task<BoolResult> StartupAsync(Context context)
{
return StartupAsync(context, 5000);
}
/// <nodoc />
public async Task<BoolResult> StartupAsync(Context context, int waitMs)
{
try
{
if (!LocalContentServer.EnsureRunning(context, _scenario, waitMs))
{
throw new ClientCanRetryException(context, $"{nameof(GrpcClient)} failed to detect running service for scenario {_scenario} during startup");
}
context.Always($"Starting up GRPC client against service on port {_grpcPort}");
HelloResponse helloResponse;
using (var ct = new CancellationTokenSource(waitMs))
{
helloResponse = await RunClientActionAndThrowIfFailedAsync(
context,
async () => await _client.HelloAsync(
new HelloRequest(),
cancellationToken: ct.Token));
}
if (helloResponse.Success)
{
_serviceCapabilities = (Capabilities) helloResponse.Capabilities;
}
else
{
return new BoolResult("Failed to connect to service for unknown reason.");
}
}
catch (Exception ex) when (!(ex is ClientCanRetryException))
{
// The caller's retry policy needs to see ClientCanRetryExceptions in order to properly retry
return new BoolResult(ex);
}
return BoolResult.Success;
}
private async Task<ObjectResult<SessionData>> CreateSessionDataAsync(
Context context,
string name,
string cacheName,
ImplicitPin implicitPin)
{
CreateSessionResponse response = await CreateSessionAsyncInternalAsync(context, name, cacheName, implicitPin);
if (string.IsNullOrEmpty(response.ErrorMessage))
{
SessionData data = new SessionData() { SessionId = response.SessionId, TemporaryDirectory = new DisposableDirectory(_fileSystem, new AbsolutePath(response.TempDirectory))};
return new ObjectResult<SessionData>(data);
}
else
{
return new ObjectResult<SessionData>(response.ErrorMessage);
}
}
private async Task<CreateSessionResponse> CreateSessionAsyncInternalAsync(
Context context,
string name,
string cacheName,
ImplicitPin implicitPin)
{
CreateSessionResponse response = await RunClientActionAndThrowIfFailedAsync(context, async () => await _client.CreateSessionAsync(
new CreateSessionRequest
{
CacheName = cacheName,
SessionName = name,
ImplicitPin = (int)implicitPin,
TraceId = context.Id.ToString(),
Capabilities = (int)_clientCapabilities
}));
return response;
}
/// <inheritdoc />
public void Dispose()
{
_heartbeatTimer?.Dispose();
_sessionState?.Dispose();
}
/// <inheritdoc />
public async Task<OpenStreamResult> OpenStreamAsync(Context context, ContentHash contentHash)
{
ObjectResult<SessionData> result = await _sessionState.GetDataAsync();
if (!result.Succeeded)
{
return new OpenStreamResult(result);
}
int sessionId = result.Data.SessionId;
AbsolutePath tempPath = result.Data.TemporaryDirectory.CreateRandomFileName();
var placeFileResult = await PlaceFileAsync(
context,
contentHash,
tempPath,
FileAccessMode.ReadOnly,
FileReplacementMode.None,
FileRealizationMode.HardLink,
sessionId);
if (placeFileResult.Succeeded)
{
try
{
Stream stream = await _fileSystem.OpenReadOnlyAsync(tempPath, FileShare.Delete | FileShare.Read);
if (stream == null)
{
throw CreateServiceMayHaveRestarted(context, $"Failed to open temp file {tempPath}.");
}
return new OpenStreamResult(stream);
}
catch (Exception ex) when (ex is DirectoryNotFoundException || ex is UnauthorizedAccessException)
{
throw new ClientCanRetryException(context, $"Failed to open temp file {tempPath}. The service may be restarting", ex);
}
catch (Exception ex) when (!(ex is ClientCanRetryException))
{
// The caller's retry policy needs to see ClientCanRetryExceptions in order to properly retry
return new OpenStreamResult(ex);
}
}
else if (placeFileResult.Code == PlaceFileResult.ResultCode.NotPlacedContentNotFound)
{
return new OpenStreamResult(OpenStreamResult.ResultCode.ContentNotFound, placeFileResult.ErrorMessage);
}
else
{
return new OpenStreamResult(placeFileResult);
}
}
/// <inheritdoc />
public async Task<PinResult> PinAsync(Context context, ContentHash contentHash)
{
StructResult<int> sessionResult = await _sessionState.GetIdAsync();
if (!sessionResult.Succeeded)
{
return new PinResult(sessionResult);
}
int sessionId = sessionResult.Data;
DateTime startTime = DateTime.UtcNow;
PinResponse response = await RunClientActionAndThrowIfFailedAsync(
context,
async () => await _client.PinAsync(
new PinRequest
{
HashType = (int)contentHash.HashType,
ContentHash = contentHash.ToByteString(),
Header = new RequestHeader(context.Id, sessionId)
}));
long ticksWaited = response.Header.ServerReceiptTimeUtcTicks - startTime.Ticks;
_tracer.TrackClientWaitForServerTicks(ticksWaited);
await ResetOnUnknownSessionAsync(context, response.Header, sessionId);
return UnpackPinResult(response.Header);
}
/// <inheritdoc />
public async Task<IEnumerable<Task<Indexed<PinResult>>>> PinAsync(
Context context,
IReadOnlyList<ContentHash> contentHashes)
{
if (contentHashes.Count == 0)
{
return new List<Task<Indexed<PinResult>>>(0);
}
var pinResults = new List<Task<Indexed<PinResult>>>();
var pinTasks = new List<Task<IEnumerable<Task<Indexed<PinResult>>>>>();
int i = 0;
const int batchSize = 500;
foreach (var chunk in contentHashes.GetPages(batchSize))
{
pinTasks.Add(PinBatchAsync(context, i, chunk.ToList()));
i += batchSize;
}
pinResults.AddRange((await Task.WhenAll(pinTasks)).SelectMany(pins => pins));
return pinResults;
}
private async Task<IEnumerable<Task<Indexed<PinResult>>>> PinBatchAsync(Context context, int baseIndex, IReadOnlyList<ContentHash> chunk)
{
StructResult<int> sessionResult = await _sessionState.GetIdAsync();
if (!sessionResult.Succeeded)
{
PinResult pinResult = new PinResult(sessionResult);
return chunk.Select((ContentHash h) => pinResult).AsIndexed().AsTasks();
}
int sessionId = sessionResult.Data;
var pinResults = new List<Indexed<PinResult>>();
var bulkPinRequest = new PinBulkRequest {Header = new RequestHeader(context.Id, sessionId)};
foreach (var contentHash in chunk)
{
bulkPinRequest.Hashes.Add(
new ContentHashAndHashTypeData { HashType = (int)contentHash.HashType, ContentHash = contentHash.ToByteString() });
}
DateTime startTime = DateTime.UtcNow;
PinBulkResponse underlyingBulkPinResponse = await RunClientActionAndThrowIfFailedAsync(
context,
async () => await _client.PinBulkAsync(bulkPinRequest));
long ticksWaited = underlyingBulkPinResponse.Header.Values.First().ServerReceiptTimeUtcTicks - startTime.Ticks;
_tracer.TrackClientWaitForServerTicks(ticksWaited);
foreach (var response in underlyingBulkPinResponse.Header)
{
await ResetOnUnknownSessionAsync(context, response.Value, sessionId);
pinResults.Add(UnpackPinResult(response.Value).WithIndex(response.Key + baseIndex));
}
_tracer.LogPinResults(context, pinResults.Select(r => chunk[r.Index - baseIndex]).ToList(), pinResults.Select(r => r.Item).ToList());
return pinResults.AsTasks();
}
private PinResult UnpackPinResult(ResponseHeader header)
{
// Workaround: Handle the service returning negative result codes in error cases
var resultCode = header.Result < 0 ? PinResult.ResultCode.Error : (PinResult.ResultCode)header.Result;
string errorMessage = header.ErrorMessage;
return string.IsNullOrEmpty(errorMessage)
? new PinResult(resultCode)
: new PinResult(resultCode, errorMessage, header.Diagnostics);
}
/// <inheritdoc />
public async Task<PlaceFileResult> PlaceFileAsync(
Context context,
ContentHash contentHash,
AbsolutePath path,
FileAccessMode accessMode,
FileReplacementMode replacementMode,
FileRealizationMode realizationMode)
{
StructResult<int> sessionResult = await _sessionState.GetIdAsync();
if (!sessionResult.Succeeded)
{
return new PlaceFileResult(sessionResult);
}
int sessionId = sessionResult.Data;
return await PlaceFileAsync(context, contentHash, path, accessMode, replacementMode, realizationMode, sessionId);
}
private async Task<PlaceFileResult> PlaceFileAsync(
Context context,
ContentHash contentHash,
AbsolutePath path,
FileAccessMode accessMode,
FileReplacementMode replacementMode,
FileRealizationMode realizationMode,
int sessionId)
{
var startTime = DateTime.UtcNow;
PlaceFileResponse response = await RunClientActionAndThrowIfFailedAsync(context, async () => await _client.PlaceFileAsync(
new PlaceFileRequest
{
Header = new RequestHeader(context.Id, sessionId),
HashType = (int)contentHash.HashType,
ContentHash = contentHash.ToByteString(),
Path = path.Path,
FileAccessMode = (int)accessMode,
FileRealizationMode = (int)realizationMode,
FileReplacementMode = (int)replacementMode
}));
long ticksWaited = response.Header.ServerReceiptTimeUtcTicks - startTime.Ticks;
_tracer.TrackClientWaitForServerTicks(ticksWaited);
// Workaround: Handle the service returning negative result codes in error cases
PlaceFileResult.ResultCode resultCode = response.Header.Result < 0
? PlaceFileResult.ResultCode.Error
: (PlaceFileResult.ResultCode)response.Header.Result;
if (!response.Header.Succeeded)
{
await ResetOnUnknownSessionAsync(context, response.Header, sessionId);
var message = string.IsNullOrEmpty(response.Header.ErrorMessage)
? resultCode.ToString()
: response.Header.ErrorMessage;
return new PlaceFileResult(resultCode, message, response.Header.Diagnostics);
}
else
{
return new PlaceFileResult(resultCode, response.ContentSize);
}
}
/// <inheritdoc />
public async Task<PutResult> PutFileAsync(
Context context,
ContentHash contentHash,
AbsolutePath path,
FileRealizationMode realizationMode)
{
StructResult<int> sessionResult = await _sessionState.GetIdAsync();
if (!sessionResult.Succeeded)
{
return new PutResult(sessionResult, contentHash);
}
int sessionId = sessionResult.Data;
return await PutFileAsync(context, contentHash, path, realizationMode, sessionId);
}
/// <summary>
/// Gets the statistics from the remote.
/// </summary>
public async Task<GetStatsResult> GetStatsAsync(Context context)
{
CounterSet counters = new CounterSet();
// Get stats iff compatible with service and client
if ((_serviceCapabilities & Capabilities.GetStats) != 0 &&
(_clientCapabilities & Capabilities.GetStats) != 0)
{
var response = await RunClientActionAndThrowIfFailedAsync(context, async () => await _client.GetStatsAsync(new GetStatsRequest()));
if (response.Success)
{
foreach (var entry in response.Statistics)
{
counters.Add(entry.Key, entry.Value);
}
}
}
return new GetStatsResult(counters);
}
private async Task<PutResult> PutFileAsync(
Context context,
ContentHash contentHash,
AbsolutePath path,
FileRealizationMode realizationMode,
int sessionId)
{
DateTime startTime = DateTime.UtcNow;
PutFileResponse response = await RunClientActionAndThrowIfFailedAsync(context, async () => await _client.PutFileAsync(
new PutFileRequest
{
Header = new RequestHeader(context.Id, sessionId),
ContentHash = contentHash.ToByteString(),
HashType = (int)contentHash.HashType,
FileRealizationMode = (int)realizationMode,
Path = path.Path
}));
long ticksWaited = response.Header.ServerReceiptTimeUtcTicks - startTime.Ticks;
_tracer.TrackClientWaitForServerTicks(ticksWaited);
if (!response.Header.Succeeded)
{
await ResetOnUnknownSessionAsync(context, response.Header, sessionId);
return new PutResult(contentHash, response.Header.ErrorMessage, response.Header.Diagnostics);
}
else
{
return new PutResult(response.ContentHash.ToContentHash((HashType)response.HashType), response.ContentSize);
}
}
/// <inheritdoc />
public async Task<PutResult> PutFileAsync(
Context context,
HashType hashType,
AbsolutePath path,
FileRealizationMode realizationMode)
{
StructResult<int> sessionResult = await _sessionState.GetIdAsync();
if (!sessionResult.Succeeded)
{
return new PutResult(sessionResult, new ContentHash(hashType));
}
int sessionId = sessionResult.Data;
return await PutFileAsync(context, hashType, path, realizationMode, sessionId);
}
private async Task<PutResult> PutFileAsync(
Context context,
HashType hashType,
AbsolutePath path,
FileRealizationMode realizationMode,
int sessionId)
{
DateTime startTime = DateTime.UtcNow;
PutFileResponse response = await RunClientActionAndThrowIfFailedAsync(context, async () => await _client.PutFileAsync(
new PutFileRequest
{
Header = new RequestHeader(context.Id, sessionId),
ContentHash = ByteString.Empty,
HashType = (int)hashType,
FileRealizationMode = (int)realizationMode,
Path = path.Path
}));
long ticksWaited = response.Header.ServerReceiptTimeUtcTicks - startTime.Ticks;
_tracer.TrackClientWaitForServerTicks(ticksWaited);
if (!response.Header.Succeeded)
{
await ResetOnUnknownSessionAsync(context, response.Header, sessionId);
return new PutResult(new ContentHash(hashType), response.Header.ErrorMessage, response.Header.Diagnostics);
}
else
{
return new PutResult(response.ContentHash.ToContentHash((HashType)response.HashType), response.ContentSize);
}
}
/// <inheritdoc />
public Task<PutResult> PutStreamAsync(Context context, ContentHash contentHash, Stream stream)
{
return PutStreamInternalAsync(
context,
stream,
contentHash,
(sessionId, tempFile) => PutFileAsync(context, contentHash, tempFile, FileRealizationMode.HardLink, sessionId));
}
/// <inheritdoc />
public Task<PutResult> PutStreamAsync(Context context, HashType hashType, Stream stream)
{
return PutStreamInternalAsync(
context,
stream,
new ContentHash(hashType),
(sessionId, tempFile) => PutFileAsync(context, hashType, tempFile, FileRealizationMode.HardLink, sessionId));
}
private async Task<PutResult> PutStreamInternalAsync(Context context, Stream stream, ContentHash contentHash, Func<int, AbsolutePath, Task<PutResult>> putFileFunc)
{
ObjectResult<SessionData> result = await _sessionState.GetDataAsync();
if (!result.Succeeded)
{
return new PutResult(result, contentHash);
}
int sessionId = result.Data.SessionId;
var tempFile = result.Data.TemporaryDirectory.CreateRandomFileName();
try
{
if (stream.CanSeek)
{
stream.Position = 0;
}
using (var fileStream = await _fileSystem.OpenAsync(tempFile, FileAccess.Write, FileMode.Create, FileShare.Delete))
{
if (fileStream == null)
{
throw CreateServiceMayHaveRestarted(context, $"Could not create temp file {tempFile}.");
}
await stream.CopyToAsync(fileStream);
}
PutResult putResult = await putFileFunc(sessionId, tempFile);
if (putResult.Succeeded)
{
return new PutResult(putResult.ContentHash, putResult.ContentSize);
}
else if (!_fileSystem.FileExists(tempFile))
{
throw CreateServiceMayHaveRestarted(context, $"Temp file {tempFile} not found.");
}
else
{
return new PutResult(putResult, putResult.ContentHash);
}
}
catch (Exception ex) when (ex is DirectoryNotFoundException || ex is UnauthorizedAccessException)
{
throw new ClientCanRetryException(context, "Exception thrown during PutStreamInternal. The service may have shut down", ex);
}
catch (Exception ex) when (!(ex is ClientCanRetryException))
{
// The caller's retry policy needs to see ClientCanRetryExceptions in order to properly retry
return new PutResult(ex, contentHash);
}
}
private static ClientCanRetryException CreateServiceMayHaveRestarted(Context context, string baseMessage)
{
// This is a very important logic today:
// The service creates a temp directory for every session and it deletes all of them during shutdown
// and recreates when when it loads the hibernated sessions.
// This case is usually manifested via 'null' returned from FileSystem.OpenAsync because the file or part of the path is gone.
// This is recoverable error and the client of this code should try again later, because when the service is back
// it recreates all the temp directories for all the pending sessions back.
return new ClientCanRetryException(context, $"{baseMessage} The service may have restarted.");
}
private async Task<T> RunClientActionAndThrowIfFailedAsync<T>(Context context, Func<Task<T>> func)
{
try
{
var result = await func();
_serviceUnavailable = false;
await Task.Yield();
return result;
}
catch (RpcException ex)
{
if (ex.Status.StatusCode == StatusCode.Unavailable)
{
// If the service is unavailable we can save time by not shutting down the service gracefully.
_serviceUnavailable = true;
throw new ClientCanRetryException(context, $"{nameof(GrpcClient)} failed to detect running service at port {_grpcPort} while running client action. [{ex}]");
}
throw new ClientCanRetryException(context, ex.ToString(), ex);
}
}
/// <summary>
/// Test hook for overriding the client's capabilities
/// </summary>
internal void DisableCapabilities(Capabilities capabilities)
{
_clientCapabilities &= ~capabilities;
}
private async Task ResetOnUnknownSessionAsync(Context context, ResponseHeader header, int sessionId)
{
// At the moment, the error message is the only way to identify that the error was due to
// the session ID being unknown to the server.
if (!header.Succeeded && header.ErrorMessage.Contains("Could not find session"))
{
await _sessionState.ResetAsync(sessionId);
throw new ClientCanRetryException(context, $"Could not find session id {sessionId}");
}
}
}
}

Просмотреть файл

@ -103,11 +103,12 @@ namespace BuildXL.Cache.ContentStore.Tracing.Internal
bool traceOperationStarted = true,
bool traceOperationFinished = true,
Func<T, string> messageFactory = null,
string extraStartMessage = null,
[CallerMemberName]string caller = null) where T : ResultBase
{
var self = this;
var operationStartedAction = traceOperationStarted
? () => operationTracer?.OperationStarted(self, caller, enabled: !traceErrorsOnly)
? () => operationTracer?.OperationStarted(self, caller, enabled: !traceErrorsOnly, additionalInfo: extraStartMessage)
: (Action)null;
using (counter?.Start())

Просмотреть файл

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.net>
<connectionManagement>
<!-- http://support.microsoft.com/kb/821268 Set the value of the maxconnection parameter to 12*N (where N is the number of CPUs that you have) -->
<add address = "*" maxconnection = "144" />
</connectionManagement>
</system.net>
<runtime>
<ThrowUnobservedTaskExceptions enabled="true"/>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="System.Interactive.Async" publicKeyToken="94bc3704cddfc263" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-3.0.3000.0" newVersion="3.0.3000.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -0,0 +1,39 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
import * as ManagedSdk from "Sdk.Managed";
import { NetFx } from "Sdk.BuildXL";
namespace VfsApplication {
export declare const qualifier: BuildXLSdk.FullFrameworkQualifier;
@@public
export const exe = BuildXLSdk.executable({
assemblyName: "bvfs",
sources: globR(d`.`,"*.cs"),
skipDocumentationGeneration: true,
appConfig: f`App.Config`,
references: [
UtilitiesCore.dll,
Grpc.dll,
Hashing.dll,
Library.dll,
Interfaces.dll,
importFrom("Microsoft.Windows.ProjFS").pkg,
importFrom("BuildXL.Utilities").dll,
importFrom("BuildXL.Utilities").Branding.dll,
importFrom("BuildXL.Utilities").Collections.dll,
importFrom("BuildXL.Utilities").KeyValueStore.dll,
importFrom("BuildXL.Utilities").Native.dll,
importFrom("BuildXL.Utilities").ToolSupport.dll,
importFrom("Sdk.Selfhost.RocksDbSharp").pkg,
importFrom("Grpc.Core").pkg,
importFrom("Google.Protobuf").pkg,
ManagedSdk.Factory.createBinary(importFrom("TransientFaultHandling.Core").Contents.all, r`lib/NET4/Microsoft.Practices.TransientFaultHandling.Core.dll`),
],
runtimeContent: [
]
});
}

Просмотреть файл

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
namespace BuildXL.Cache.ContentStore.Vfs
{
public class Placeholder
{
public static T Todo<T>(string message = null, T value = default)
{
return value;
}
}
}

Просмотреть файл

@ -0,0 +1,117 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
namespace BuildXL.Cache.ContentStore.Vfs
{
public class VfsCasConfiguration
{
/// <summary>
/// Grpc port exposed by VFS CAS
/// </summary>
public int ServerGrpcPort { get; }
/// <summary>
/// Grpc port of backing CAS service used to materialize files
/// </summary>
public int BackingGrpcPort { get; }
/// <summary>
/// The root used by the VFS for materializing files
/// </summary>
public AbsolutePath RootPath { get; }
/// <summary>
/// The data root used for state of the VFS
/// </summary>
public AbsolutePath DataRootPath { get; }
/// <summary>
/// The cache root used for cache state of the VFS server
/// </summary>
public AbsolutePath ServerRootPath { get; }
/// <summary>
/// The root virtualized directory
/// </summary>
public AbsolutePath VfsRootPath { get; }
/// <summary>
/// The cache name
/// </summary>
public string CacheName { get; }
/// <summary>
/// The scenario of the backing server (defines ready event wait handle name)
/// </summary>
public string Scenario { get; }
/// <summary>
/// Specifies folder names under the VFS which will be junctioned to given destination paths
/// Maybe just names (i.e. no sub paths)
/// </summary>
public IReadOnlyDictionary<string, AbsolutePath> VirtualizationMounts { get; }
private VfsCasConfiguration(Builder builder)
{
ServerGrpcPort = builder.ServerGrpcPort;
BackingGrpcPort = builder.BackingGrpcPort;
RootPath = builder.RootPath;
VirtualizationMounts = builder.VirtualizationMounts.ToDictionary(kvp => kvp.Key, kvp => kvp.Value, StringComparer.OrdinalIgnoreCase);
CacheName = builder.CacheName;
Scenario = builder.Scenario;
DataRootPath = RootPath / "data";
VfsRootPath = RootPath / "vfs";
ServerRootPath = RootPath / "server";
}
public class Builder
{
/// <summary>
/// Grpc port exposed by VFS CAS
/// </summary>
public int ServerGrpcPort { get; set; }
/// <summary>
/// Grpc port of backing CAS service used to materialize files
/// </summary>
public int BackingGrpcPort { get; set; }
/// <summary>
/// The root used by the VFS for materializing files
/// </summary>
public AbsolutePath RootPath { get; set; }
/// <summary>
/// The cache name
/// </summary>
public string CacheName { get; set; }
/// <summary>
/// The scenario of the backing server (defines ready event wait handle name)
/// </summary>
public string Scenario { get; set; }
/// <summary>
/// Specifies folder names under the VFS which will be junctioned to given destination paths
/// Maybe just names (i.e. no sub paths)
/// </summary>
public Dictionary<string, AbsolutePath> VirtualizationMounts { get; } = new Dictionary<string, AbsolutePath>(StringComparer.OrdinalIgnoreCase);
/// <summary>
/// Creates a VfsCasConfiguration
/// </summary>
public VfsCasConfiguration Build()
{
return new VfsCasConfiguration(this);
}
}
}
}

Просмотреть файл

@ -0,0 +1,127 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using BuildXL.Cache.ContentStore.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Logging;
using BuildXL.Cache.ContentStore.Service;
using BuildXL.Cache.ContentStore.Sessions;
using BuildXL.Cache.ContentStore.Stores;
namespace BuildXL.Cache.ContentStore.Vfs
{
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Interfaces.Extensions;
using BuildXL.Cache.ContentStore.Interfaces.Logging;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Stores;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Native.IO;
using static Placeholder;
/// <summary>
/// Entry point of the application.
/// </summary>
public class VfsCasRunner
{
private Tracer Tracer { get; } = new Tracer(nameof(VfsCasRunner));
public async Task RunAsync(VfsCasConfiguration configuration)
{
// Create VFS root
using (var fileLog = new FileLog((configuration.RootPath / "Bvfs.log").Path))
using (var logger = new Logger(fileLog))
{
var fileSystem = new PassThroughFileSystem(logger);
var context = new OperationContext(new Context(logger));
// Map junctions into VFS root
foreach (var mount in configuration.VirtualizationMounts)
{
CreateJunction(context, source: mount.Value, target: configuration.VfsRootPath / mount.Key);
}
var clientContentStore = new ServiceClientContentStore(
logger,
fileSystem,
new ServiceClientContentStoreConfiguration(
configuration.CacheName,
new ServiceClientRpcConfiguration(
configuration.BackingGrpcPort),
scenario: configuration.Scenario));
// Client is startup/shutdown with wrapping VFS content store
using (var server = new LocalContentServer(
fileSystem,
logger,
scenario: "bvfs" + configuration.ServerGrpcPort,
path => new VirtualizedContentStore(clientContentStore),
new LocalServerConfiguration(
configuration.DataRootPath,
new Dictionary<string, AbsolutePath>()
{
{ configuration.CacheName, configuration.ServerRootPath }
},
configuration.ServerGrpcPort)))
{
await server.StartupAsync(context).ThrowIfFailure();
await WaitForTerminationAsync(context);
await server.ShutdownAsync(context).ThrowIfFailure();
}
}
}
private static Task WaitForTerminationAsync(Context context)
{
var termination = new TaskCompletionSource<bool>();
Console.CancelKeyPress += (sender, args) =>
{
context.Debug("Terminating due to cancellation request on console.");
termination.TrySetResult(true);
};
Task.Run(() =>
{
string line = null;
while ((line = Console.ReadLine()) != null)
{
if (line.Equals("exit", StringComparison.OrdinalIgnoreCase))
{
context.Debug("Terminating due to exit request on console.");
termination.TrySetResult(true);
break;
}
}
context.Debug("Terminating due to end of standard input.");
termination.TrySetResult(true);
}).FireAndForget(context);
return termination.Task;
}
private void CreateJunction(OperationContext context, AbsolutePath source, AbsolutePath target)
{
context.PerformOperation(
Tracer,
() =>
{
Directory.CreateDirectory(target.Path);
Directory.CreateDirectory(source.Path);
FileUtilities.CreateJunction(source.Path, target.Path);
return BoolResult.Success;
},
extraStartMessage: $"{source}=>{target}").ThrowIfFailure();
}
}
}

Просмотреть файл

@ -0,0 +1,106 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using BuildXL.ToolSupport;
namespace BuildXL.Cache.ContentStore.Vfs
{
using System.Diagnostics.ContractsLight;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using static CommandLineUtilities;
public class VfsProgram : ToolProgram<VfsCasConfiguration>
{
public VfsProgram()
: base("Bvfs")
{
}
public static int Main(string[] arguments)
{
try
{
return new VfsProgram().MainHandler(arguments);
}
catch (InvalidArgumentException e)
{
Console.Error.WriteLine("Execution error: " + (e.InnerException ?? e).Message);
}
return -1;
}
public override int Run(VfsCasConfiguration arguments)
{
var runner = new VfsCasRunner();
runner.RunAsync(arguments).GetAwaiter().GetResult();
return 0;
}
public override bool TryParse(string[] rawArgs, out VfsCasConfiguration arguments)
{
var cli = new CommandLineUtilities(rawArgs);
var config = new VfsCasConfiguration.Builder();
Dictionary<string, OptionHandler> handlers = new OptionHandler[]
{
new OptionHandler(new[] { "serverPort", "sp" }, opt => config.ServerGrpcPort = (int)ParseUInt32Option(opt, 0, ushort.MaxValue)),
new OptionHandler(new[] { "backingPort", "bp" }, opt => config.BackingGrpcPort = (int)ParseUInt32Option(opt, 0, ushort.MaxValue)),
new OptionHandler(new[] { "root" }, opt => config.RootPath = new AbsolutePath(ParsePathOption(opt))),
new OptionHandler(new[] { "cacheName" }, opt => config.CacheName = ParseStringOption(opt)),
new OptionHandler(new[] { "scenario" }, opt => config.Scenario = ParseStringOption(opt), required: false),
new OptionHandler(new[] { "virtualizationMount", "vm" }, opt =>
{
var kvp = ParseKeyValuePair(opt);
config.VirtualizationMounts[kvp.Key] = new AbsolutePath(GetFullPath(kvp.Value, opt));
}),
}
.SelectMany(handler => handler.Names.Select(name => (name, handler)))
.ToDictionary(t => t.name, t => t.handler, StringComparer.OrdinalIgnoreCase);
foreach (var opt in cli.Options)
{
if (opt.Name == "?" || opt.Name == "help")
{
// TODO: Help text
}
if (handlers.TryGetValue(opt.Name, out var handler))
{
handler.Handle(opt);
handler.Occurrences++;
}
else
{
throw new InvalidArgumentException($"Unrecognized option {opt.Name}");
}
}
foreach (var handler in handlers.Values.Where(h => h.Occurrences == 0 && h.Required))
{
throw new InvalidArgumentException($"Option '{handler.Names[0]}' is required.");
}
arguments = config.Build();
return true;
}
private class OptionHandler
{
public string[] Names { get; }
public Action<Option> Handle { get; }
public bool Required { get; }
public int Occurrences { get; set; }
public OptionHandler(string[] names, Action<Option> handle, bool required = true)
{
Contract.Requires(names.Length != 0);
Names = names;
Handle = handle;
Required = required;
}
}
}
}

Просмотреть файл

@ -0,0 +1,112 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Sessions;
using BuildXL.Cache.ContentStore.Interfaces.Stores;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
using BuildXL.Cache.ContentStore.Sessions;
using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.ContentStore.Utils;
using BuildXL.Utilities.Collections;
using BuildXL.Utilities.Tasks;
using BuildXL.Utilities.Tracing;
namespace BuildXL.Cache.ContentStore.Vfs
{
/// <todoc />
public class VirtualizedContentSession : ContentSessionBase
{
private IContentSession InnerSession { get; }
private VirtualizedContentStore Store { get; }
protected override Tracer Tracer { get; } = new Tracer(nameof(VirtualizedContentSession));
public VirtualizedContentSession(VirtualizedContentStore store, IContentSession session, string name)
: base(name)
{
// TODO: ImplicitPin?
Store = store;
InnerSession = session;
}
/// <inheritdoc />
protected override async Task<BoolResult> StartupCoreAsync(OperationContext context)
{
await InnerSession.StartupAsync(context).ThrowIfFailure();
return await base.StartupCoreAsync(context);
}
/// <inheritdoc />
protected override async Task<BoolResult> ShutdownCoreAsync(OperationContext context)
{
var result = await base.ShutdownCoreAsync(context);
result &= await InnerSession.ShutdownAsync(context);
return result;
}
/// <inheritdoc />
protected override Task<OpenStreamResult> OpenStreamCoreAsync(OperationContext operationContext, ContentHash contentHash, UrgencyHint urgencyHint, Counter retryCounter)
{
return InnerSession.OpenStreamAsync(operationContext, contentHash, operationContext.Token, urgencyHint);
}
/// <inheritdoc />
protected override Task<PinResult> PinCoreAsync(OperationContext operationContext, ContentHash contentHash, UrgencyHint urgencyHint, Counter retryCounter)
{
return InnerSession.PinAsync(operationContext, contentHash, operationContext.Token, urgencyHint);
}
/// <inheritdoc />
protected override Task<IEnumerable<Task<Indexed<PinResult>>>> PinCoreAsync(OperationContext operationContext, IReadOnlyList<ContentHash> contentHashes, UrgencyHint urgencyHint, Counter retryCounter, Counter fileCounter)
{
return InnerSession.PinAsync(operationContext, contentHashes, operationContext.Token, urgencyHint);
}
/// <inheritdoc />
protected override Task<PlaceFileResult> PlaceFileCoreAsync(OperationContext operationContext, ContentHash contentHash, AbsolutePath path, FileAccessMode accessMode, FileReplacementMode replacementMode, FileRealizationMode realizationMode, UrgencyHint urgencyHint, Counter retryCounter)
{
return InnerSession.PlaceFileAsync(operationContext, contentHash, path, accessMode, replacementMode, realizationMode, operationContext.Token, urgencyHint);
}
/// <inheritdoc />
protected override Task<IEnumerable<Task<Indexed<PlaceFileResult>>>> PlaceFileCoreAsync(OperationContext operationContext, IReadOnlyList<ContentHashWithPath> hashesWithPaths, FileAccessMode accessMode, FileReplacementMode replacementMode, FileRealizationMode realizationMode, UrgencyHint urgencyHint, Counter retryCounter)
{
return InnerSession.PlaceFileAsync(operationContext, hashesWithPaths, accessMode, replacementMode, realizationMode, operationContext.Token, urgencyHint);
}
/// <inheritdoc />
protected override Task<PutResult> PutFileCoreAsync(OperationContext operationContext, ContentHash contentHash, AbsolutePath path, FileRealizationMode realizationMode, UrgencyHint urgencyHint, Counter retryCounter)
{
return InnerSession.PutFileAsync(operationContext, contentHash, path, realizationMode, operationContext.Token, urgencyHint);
}
/// <inheritdoc />
protected override Task<PutResult> PutFileCoreAsync(OperationContext operationContext, HashType hashType, AbsolutePath path, FileRealizationMode realizationMode, UrgencyHint urgencyHint, Counter retryCounter)
{
return InnerSession.PutFileAsync(operationContext, hashType, path, realizationMode, operationContext.Token, urgencyHint);
}
/// <inheritdoc />
protected override Task<PutResult> PutStreamCoreAsync(OperationContext operationContext, ContentHash contentHash, Stream stream, UrgencyHint urgencyHint, Counter retryCounter)
{
return InnerSession.PutStreamAsync(operationContext, contentHash, stream, operationContext.Token, urgencyHint);
}
/// <inheritdoc />
protected override Task<PutResult> PutStreamCoreAsync(OperationContext operationContext, HashType hashType, Stream stream, UrgencyHint urgencyHint, Counter retryCounter)
{
return InnerSession.PutStreamAsync(operationContext, hashType, stream, operationContext.Token, urgencyHint);
}
}
}

Просмотреть файл

@ -0,0 +1,82 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Sessions;
using BuildXL.Cache.ContentStore.Interfaces.Stores;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.ContentStore.Utils;
namespace BuildXL.Cache.ContentStore.Vfs
{
/// <summary>
/// A store which virtualizes calls to an underlying content store (i.e. content will
/// be lazily materialized using the projected file system filter driver)
/// </summary>
public class VirtualizedContentStore : StartupShutdownBase, IContentStore
{
private IContentStore InnerStore { get; }
protected override Tracer Tracer { get; } = new Tracer(nameof(VirtualizedContentStore));
/// <nodoc />
public VirtualizedContentStore(IContentStore innerStore)
{
InnerStore = innerStore;
}
/// <inheritdoc />
public CreateSessionResult<IReadOnlyContentSession> CreateReadOnlySession(Context context, string name, ImplicitPin implicitPin)
{
return CreateSession<IReadOnlyContentSession>(context, name, implicitPin);
}
/// <inheritdoc />
public CreateSessionResult<IContentSession> CreateSession(Context context, string name, ImplicitPin implicitPin)
{
return CreateSession<IContentSession>(context, name, implicitPin);
}
private CreateSessionResult<T> CreateSession<T>(Context context, string name, ImplicitPin implicitPin)
where T : class, IName
{
var operationContext = OperationContext(context);
return operationContext.PerformOperation(
Tracer,
() =>
{
var innerSessionResult = InnerStore.CreateSession(context, name, implicitPin).ThrowIfFailure();
var session = new VirtualizedContentSession(this, innerSessionResult.Session, name);
return new CreateSessionResult<T>(session as T);
});
}
/// <inheritdoc />
public Task<GetStatsResult> GetStatsAsync(Context context)
{
return InnerStore.GetStatsAsync(context);
}
/// <inheritdoc />
protected override async Task<BoolResult> StartupCoreAsync(OperationContext context)
{
await InnerStore.StartupAsync(context).ThrowIfFailure();
return await base.StartupCoreAsync(context);
}
/// <inheritdoc />
protected override async Task<BoolResult> ShutdownCoreAsync(OperationContext context)
{
// Close all sessions?
var result = await base.ShutdownCoreAsync(context);
result &= await InnerStore.ShutdownAsync(context);
return result;
}
}
}

Просмотреть файл

@ -17,6 +17,9 @@ namespace BuildXL {
// primary
importFrom("BuildXL.App").deployment,
importFrom("BuildXL.App").serverDeployment,
...(qualifier.targetFramework !== "net472" ? [] : [
importFrom("BuildXL.Cache.ContentStore").VfsApplication.exe,
]),
// analyzers
importFrom("BuildXL.Tools").Execution.Analyzer.exe,

Просмотреть файл

@ -92,10 +92,13 @@ namespace BuildXL.Storage
/// <summary>
/// Clients may switch between hashing algorithms. Must be set at the beginning of the build.
/// </summary>
public static void SetDefaultHashType()
public static void SetDefaultHashType(bool force = false)
{
if (!s_isInitialized || force)
{
SetDefaultHashType(HashType.Vso0);
}
}
/// <summary>
/// Clients may switch between hashing algorithms. Must be set at the beginning of the build.

Просмотреть файл

@ -334,7 +334,10 @@ namespace BuildXL.ToolSupport
/// </summary>
public static IEnumerable<string> ParseRepeatingPathOption(Option opt, string separator) => ParseRepeatingOption(opt, separator, v => GetFullPath(v, opt));
private static string GetFullPath(string path, Option opt)
/// <summary>
/// Gets the full path for an option
/// </summary>
public static string GetFullPath(string path, Option opt)
{
string fullPath = null;

Просмотреть файл

@ -136,6 +136,9 @@ config({
// Cpp Sdk
{ id: "VisualCppTools.Community.VS2017Layout", version: "14.11.25506"},
// ProjFS (virtual file system)
{ id: "Microsoft.Windows.ProjFS", version: "1.0.19079.1" },
// RocksDb
{ id: "RocksDbSharp", version: "5.8.0-b20181023.3", alias: "RocksDbSharpSigned" },
{ id: "RocksDbNative", version: "6.0.1-b20190426.4" },