Merged PR 579760: Perform Async IO when copying files in GrpcCopyClient

This PR:
- Upgrades gRPC and Protobuf to latest versions. This is due to bug fixes/perf improvements and new APIs, respectively.
- Extends usage of unsafe bytestrings to all non-compressed copies.
- Makes non-compressed copies be performed async instead of sync as before. We also read from the input stream and write into output at the same time.
- Enables unsafe bytestring usage for all tests

Related work items: #1773412
This commit is contained in:
Julian Bayardo 2020-10-19 19:09:41 +00:00
Родитель 591e6b6387
Коммит bc4ac3c2d2
32 изменённых файлов: 345 добавлений и 240 удалений

Просмотреть файл

@ -22,6 +22,7 @@ namespace App {
),
...kustoPackages,
...getSerializationPackages(true),
...getGrpcPackages(true),
UtilitiesCore.dll,
Grpc.dll,
Hashing.dll,
@ -33,9 +34,6 @@ namespace App {
importFrom("BuildXL.Cache.DistributedCache.Host").Configuration.dll,
importFrom("BuildXL.Utilities").dll,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
importFrom("Microsoft.IdentityModel.Clients.ActiveDirectory").pkg,
importFrom("Newtonsoft.Json").pkg,

Просмотреть файл

@ -65,6 +65,42 @@ export function getSerializationPackages(includeNetStandard: boolean) {
];
}
@@public
export function getProtobufPackages(includeNetStandard: boolean) {
return [
...(BuildXLSdk.isFullFramework && includeNetStandard ? [
NetFx.System.IO.dll,
...(qualifier.targetFramework === "net462" ? [
// HACK: Net462 doesn't ship with netstandard dlls, so we fetch them from Net472 instead. This
// may not work.
importFrom("Sdk.Managed.Frameworks.Net472").withQualifier({targetFramework: "net472"}).NetFx.Netstandard.dll
] : [
NetFx.Netstandard.dll,
])
] : []
),
BuildXLSdk.isFullFramework || qualifier.targetFramework === "netstandard2.0" ?
importFrom("System.Memory").withQualifier({ targetFramework: "netstandard2.0" }).pkg
: importFrom("System.Memory").pkg,
BuildXLSdk.isFullFramework || qualifier.targetFramework === "netstandard2.0" ?
importFrom("System.Buffers").withQualifier({ targetFramework: "netstandard2.0" }).pkg
: importFrom("System.Buffers").pkg,
importFrom("Google.Protobuf").pkg,
];
}
@@public
export function getGrpcPackages(includeNetStandard: boolean) {
return [
...getProtobufPackages(includeNetStandard),
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
];
}
namespace Default {
export declare const qualifier: BuildXLSdk.DefaultQualifierWithNet472;
@ -165,9 +201,7 @@ export const deploymentForBuildXL: Deployment.Definition = {
contents: [
App.exe,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...getGrpcPackages(true),
...addIf(qualifier.targetRuntime === "win-x64",
importFrom("Grpc.Core").Contents.all.getFile("runtimes/win/native/grpc_csharp_ext.x64.dll"),

Просмотреть файл

@ -49,8 +49,7 @@ namespace DistributedTest {
importFrom("BuildXL.Utilities").Collections.dll,
importFrom("BuildXL.Utilities").KeyValueStore.dll,
importFrom("BuildXL.Utilities").Native.dll,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
...getGrpcPackages(true),
...importFrom("Sdk.Selfhost.RocksDbSharp").pkgs,
...BuildXLSdk.fluentAssertionsWorkaround,

Просмотреть файл

@ -157,6 +157,8 @@ namespace ContentStoreTest.Distributed.Sessions
IsDistributedEvictionEnabled = true,
IsRepairHandlingEnabled = true,
UseUnsafeByteStringConstruction = true,
SafeToLazilyUpdateMachineCountThreshold = SafeToLazilyUpdateMachineCountThreshold,
RestoreCheckpointIntervalMinutes = 1,

Просмотреть файл

@ -12,13 +12,7 @@ namespace Grpc {
...GrpcSdk.generateCSharp({rpc: [f`ContentStore.proto`]}).sources,
],
references: [
...addIf(BuildXLSdk.isFullFramework,
NetFx.System.IO.dll
),
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...getGrpcPackages(true),
...BuildXLSdk.bclAsyncPackages,
],
tools: {

Просмотреть файл

@ -26,9 +26,7 @@ namespace GrpcTest {
UtilitiesCore.dll,
InterfacesTest.dll,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...getGrpcPackages(true),
...BuildXLSdk.fluentAssertionsWorkaround,
],
runtimeContent: [

Просмотреть файл

@ -34,9 +34,7 @@ namespace Library {
importFrom("BuildXL.Utilities").Native.dll,
importFrom("BuildXL.Utilities").Collections.dll,
importFrom("BuildXL.Cache.DistributedCache.Host").Configuration.dll,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...getGrpcPackages(true),
...BuildXLSdk.bclAsyncPackages,
BuildXLSdk.Factory.createBinary(importFrom("TransientFaultHandling.Core").Contents.all, r`lib/NET4/Microsoft.Practices.TransientFaultHandling.Core.dll`),

Просмотреть файл

@ -3,9 +3,15 @@
using System;
using System.Diagnostics.ContractsLight;
using System.IO;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
#nullable enable
namespace BuildXL.Cache.ContentStore.Service.Grpc
{
/// <summary>
@ -13,7 +19,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
/// </summary>
public static class ByteStringExtensions
{
private static readonly Lazy<Func<byte[], ByteString>> UnsafeFromBytesFunc = new Lazy<Func<byte[], ByteString>>(() => CreateUnsafeFromBytesFunc());
private static readonly Lazy<Func<byte[], ByteString>> UnsafeCreateByteStringFromBytes = new Lazy<Func<byte[], ByteString>>(() => UnsafeCreateByteStringFromBytesFunc());
/// <summary>
/// Creates <see cref="ByteString"/> without copying a given <paramref name="buffer"/>.
@ -24,20 +30,55 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
/// </remarks>
public static ByteString UnsafeCreateFromBytes(byte[] buffer)
{
return UnsafeFromBytesFunc.Value(buffer);
return UnsafeCreateByteStringFromBytes.Value(buffer);
}
private static Func<byte[], ByteString> CreateUnsafeFromBytesFunc()
private static Func<byte[], ByteString> UnsafeCreateByteStringFromBytesFunc()
{
var internalType = typeof(ByteString).GetNestedType("Unsafe", BindingFlags.NonPublic);
Contract.Assert(internalType != null, "Can't find ByteString.Unsafe type.");
var method = internalType.GetMethod("FromBytes", BindingFlags.Static | BindingFlags.NonPublic);
if (method == null)
{
throw new InvalidOperationException($"Can't find method Unsafe.FromBytes in ByteString class.");
throw new InvalidOperationException($"Can't find method Unsafe.FromBytes in `{nameof(ByteString)}` class.");
}
return (Func<byte[], ByteString>)Delegate.CreateDelegate(typeof(Func<byte[], ByteString>), method);
}
/// <summary>
/// Extracts the underlying byte[] from the <see cref="ByteString"/>.
/// </summary>
/// <remarks>
/// This method is unsafe, as expectation from the ByteString is that it's the sole owner of the underlying
/// buffer.
/// </remarks>
public static byte[] UnsafeExtractBytes(ByteString byteString)
{
Contract.RequiresNotNull(byteString);
var unsafeByteString = new UnsafeByteString()
{
ByteString = byteString,
};
return unsafeByteString.ByteStringClone!.Bytes!;
}
private class UnsafeByteStringExtractionHelper
{
#pragma warning disable CS0649 // The Bytes field is never null, so no need to warn about it
public readonly byte[]? Bytes;
#pragma warning restore CS0649
}
[StructLayout(LayoutKind.Explicit)]
private struct UnsafeByteString
{
[FieldOffset(0)]
public ByteString ByteString;
[FieldOffset(0)]
public UnsafeByteStringExtractionHelper ByteStringClone;
}
}
}

Просмотреть файл

@ -36,14 +36,6 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
/// <nodoc />
public enum GrpcContentServerCounters
{
/// <nodoc />
[CounterType(CounterType.Stopwatch)]
NormalByteStringConstructionInStreamContent,
/// <nodoc />
[CounterType(CounterType.Stopwatch)]
OptimizedByteStringConstructionInStreamContent,
/// <nodoc />
[CounterType(CounterType.Stopwatch)]
StreamContentReadFromDiskDuration,
@ -109,7 +101,6 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
/// </summary>
private readonly int _ongoingPushCountLimit;
private readonly bool _useUnsafeByteStringConstruction;
private readonly bool _traceGrpcOperations;
/// <nodoc />
@ -127,7 +118,6 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
_bufferSize = localServerConfiguration?.BufferSizeForGrpcCopies ?? ContentStore.Grpc.GrpcConstants.DefaultBufferSizeBytes;
_gzipSizeBarrier = localServerConfiguration?.GzipBarrierSizeForGrpcCopies ?? (_bufferSize * 8);
_ongoingPushCountLimit = localServerConfiguration?.ProactivePushCountLimit ?? LocalServerConfiguration.DefaultProactivePushCountLimit;
_useUnsafeByteStringConstruction = localServerConfiguration?.UseUnsafeByteStringConstruction ?? false;
_traceGrpcOperations = localServerConfiguration?.TraceGrpcOperations ?? false;
_pool = new ByteArrayPool(_bufferSize);
ContentSessionHandler = sessionHandler;
@ -342,7 +332,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
default:
throw new NotImplementedException($"Unknown result.Code '{result.Code}'.");
}
// Send the response headers.
await context.WriteResponseHeadersAsync(headers);
@ -379,7 +369,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
return context.Host;
}
private string GetFileIODuration(Stream? resultStream)
{
if (resultStream is TrackingFileStream tfs)
@ -503,12 +493,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
// From the docs: On the server side, MoveNext() does not throw exceptions.
// In case of a failure, the request stream will appear to be finished (MoveNext will return false)
// and the CancellationToken associated with the call will be cancelled to signal the failure.
while (await requestStream.MoveNext(token))
{
var request = requestStream.Current;
var bytes = request.Content.ToByteArray();
await tempFile.Stream.WriteAsync(bytes, 0, bytes.Length, token);
}
await GrpcExtensions.CopyChunksToStreamAsync(requestStream, tempFile.Stream, request => request.Content, cancellationToken: token);
}
if (token.IsCancellationRequested)
@ -548,72 +533,11 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
private async Task<Result<(long Chunks, long Bytes)>> StreamContentAsync(Stream input, byte[] buffer, byte[] secondaryBuffer, IServerStreamWriter<CopyFileResponse> responseStream, CancellationToken ct)
{
long chunks = 0L;
long bytes = 0L;
byte[] inputBuffer = buffer;
// Pre-fill buffer with the file's first chunk
int chunkSize = await readNextChunk(input, inputBuffer, ct);
while (true)
return await GrpcExtensions.CopyStreamToChunksAsync(input, responseStream, (content, chunks) => new CopyFileResponse()
{
ct.ThrowIfCancellationRequested();
if (chunkSize == 0) { break; }
// Created ByteString can reuse the buffer.
// To avoid double writes we need two buffers:
// * One buffer is used as the output buffer
// * And another buffer is used as the input buffer.
(ByteString content, bool bufferReused) = CreateByteStringForStreamContent(inputBuffer, chunkSize);
CopyFileResponse response = new CopyFileResponse() { Content = content, Index = chunks };
if (bufferReused)
{
// If the buffer is reused we need to swap the input buffer with the secondary buffer.
inputBuffer = inputBuffer == buffer ? secondaryBuffer : buffer;
}
bytes += chunkSize;
chunks++;
// Read the next chunk while waiting for the response
var readNextChunkTask = readNextChunk(input, inputBuffer, ct);
await Task.WhenAll(readNextChunkTask, responseStream.WriteAsync(response));
chunkSize = await readNextChunkTask;
}
return (chunks, bytes);
static async Task<int> readNextChunk(Stream input, byte[] buffer, CancellationToken token)
{
var result = await input.ReadAsync(buffer, 0, buffer.Length, token);
return result;
}
}
private (ByteString result, bool bufferReused) CreateByteStringForStreamContent(byte[] buffer, int chunkSize)
{
// In some cases ByteString construction can be very expensive both in terms CPU and memory.
// For instance, during the content streaming we understand and control the ownership of the buffers
// and in that case we can avoid extra copy done by ByteString.CopyFrom call.
//
// But we can use an unsafe version only when the chunk size is the same as the buffer size.
if (_useUnsafeByteStringConstruction && chunkSize == buffer.Length)
{
// The feature is configured, and we can pass the entire buffer into the unsafely created ByteString
using (Counters[GrpcContentServerCounters.OptimizedByteStringConstructionInStreamContent].Start())
{
return (ByteStringExtensions.UnsafeCreateFromBytes(buffer), bufferReused: true);
}
}
using (Counters[GrpcContentServerCounters.NormalByteStringConstructionInStreamContent].Start())
{
return (ByteString.CopyFrom(buffer, 0, chunkSize), bufferReused: false);
}
Content = content,
Index = chunks,
}, buffer, secondaryBuffer, cancellationToken: ct);
}
private async Task<Result<(long Chunks, long Bytes)>> StreamContentWithCompressionAsync(Stream input, byte[] buffer, byte[] secondaryBuffer, IServerStreamWriter<CopyFileResponse> responseStream, CancellationToken ct)
@ -864,7 +788,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
response.Result = code;
return response;
},
(context, errorMessage) => new DeleteContentResponse() {Header = ResponseHeader.Failure(context.StartTime, errorMessage)},
(context, errorMessage) => new DeleteContentResponse() { Header = ResponseHeader.Failure(context.StartTime, errorMessage) },
token: ct
);
}
@ -889,7 +813,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
CancellationToken token,
bool? traceStartAndStop = null,
bool obtainSession = true,
[CallerMemberName]string? caller = null)
[CallerMemberName] string? caller = null)
{
bool trace = traceStartAndStop ?? _traceGrpcOperations;
@ -949,7 +873,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
CancellationToken token)
{
return RunFuncAsync(
new RequestHeader(traceId, sessionId: -1),
new RequestHeader(traceId, sessionId: -1),
(context, _) => taskFunc(context),
failFunc,
token,

Просмотреть файл

@ -54,6 +54,8 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
private readonly BandwidthChecker _bandwidthChecker;
private readonly ByteArrayPool _pool;
/// <inheritdoc />
protected override Tracer Tracer { get; } = new Tracer(nameof(GrpcCopyClient));
@ -65,7 +67,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
/// <summary>
/// Initializes a new instance of the <see cref="GrpcCopyClient" /> class.
/// </summary>
internal GrpcCopyClient(GrpcCopyClientKey key, GrpcCopyClientConfiguration configuration, IClock? clock = null)
internal GrpcCopyClient(GrpcCopyClientKey key, GrpcCopyClientConfiguration configuration, IClock? clock = null, ByteArrayPool? sharedBufferPool = null)
{
Key = key;
_configuration = configuration;
@ -79,6 +81,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
_client = new ContentServer.ContentServerClient(_channel);
_bandwidthChecker = new BandwidthChecker(configuration.BandwidthCheckerConfiguration);
_pool = sharedBufferPool ?? new ByteArrayPool(_configuration.ClientBufferSizeBytes);
}
/// <inheritdoc />
@ -356,10 +359,10 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
switch (compression)
{
case CopyCompression.None:
await StreamContentAsync(targetStream, response.ResponseStream, options, token);
await StreamContentAsync(response.ResponseStream, targetStream, options, token);
break;
case CopyCompression.Gzip:
await StreamContentWithCompressionAsync(targetStream, response.ResponseStream, options, token);
await StreamContentWithCompressionAsync(response.ResponseStream, targetStream, options, token);
break;
default:
throw new NotSupportedException($"CopyCompression {compression} is not supported.");
@ -497,7 +500,11 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
async Task<PushFileResult> pushFileImplementation(Stream stream, CopyOptions options, long startingPosition, IClientStreamWriter<PushFileRequest> requestStream, IAsyncStreamReader<PushFileResponse> responseStream, Task<bool> responseMoveNext, Task responseCompletedTask, CancellationToken token)
{
await StreamContentAsync(stream, new byte[_configuration.ClientBufferSizeBytes], requestStream, options, token);
using (var primaryBufferHandle = _pool.Get())
using (var secondaryBufferHandle = _pool.Get())
{
await StreamContentAsync(stream, primaryBufferHandle.Value, secondaryBufferHandle.Value, requestStream, options, token);
}
token.ThrowIfCancellationRequested();
@ -533,71 +540,43 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
}
}
private async Task StreamContentAsync(Stream input, byte[] buffer, IClientStreamWriter<PushFileRequest> requestStream, CopyOptions options, CancellationToken ct)
private Task<(long Chunks, long Bytes)> StreamContentAsync(Stream input, byte[] primaryBuffer, byte[] secondaryBuffer, IClientStreamWriter<PushFileRequest> requestStream, CopyOptions options, CancellationToken cancellationToken)
{
Contract.Requires(!(input is null));
Contract.Requires(!(requestStream is null));
int chunkSize = 0;
long totalBytesRead = 0;
// Pre-fill buffer with the file's first chunk
await readNextChunk();
while (true)
{
if (ct.IsCancellationRequested)
{
return;
}
if (chunkSize == 0) { break; }
totalBytesRead += chunkSize;
ByteString content = ByteString.CopyFrom(buffer, 0, chunkSize);
var request = new PushFileRequest() { Content = content };
// Read the next chunk while waiting for the response
await Task.WhenAll(readNextChunk(), requestStream.WriteAsync(request));
options?.UpdateTotalBytesCopied(totalBytesRead);
}
async Task<int> readNextChunk() { chunkSize = await input.ReadAsync(buffer, 0, buffer.Length, ct); return chunkSize; }
return GrpcExtensions.CopyStreamToChunksAsync(
input,
requestStream,
(content, _) => new PushFileRequest() { Content = content },
primaryBuffer,
secondaryBuffer,
progressReport: (totalBytesRead) => options?.UpdateTotalBytesCopied(totalBytesRead),
cancellationToken);
}
private async Task<(long chunks, long bytes)> StreamContentAsync(Stream targetStream, IAsyncStreamReader<CopyFileResponse> replyStream, CopyOptions? options, CancellationToken ct)
private Task<(long Chunks, long Bytes)> StreamContentAsync(IAsyncStreamReader<CopyFileResponse> input, Stream output, CopyOptions? options, CancellationToken cancellationToken)
{
long chunks = 0L;
long bytes = 0L;
while (await replyStream.MoveNext(ct))
{
chunks++;
CopyFileResponse reply = replyStream.Current;
bytes += reply.Content.Length;
reply.Content.WriteTo(targetStream);
options?.UpdateTotalBytesCopied(bytes);
}
return (chunks, bytes);
return GrpcExtensions.CopyChunksToStreamAsync(
input,
output,
response => response.Content,
totalBytes => options?.UpdateTotalBytesCopied(totalBytes),
cancellationToken);
}
private async Task<(long chunks, long bytes)> StreamContentWithCompressionAsync(Stream targetStream, IAsyncStreamReader<CopyFileResponse> replyStream, CopyOptions? options, CancellationToken ct)
private async Task<(long chunks, long bytes)> StreamContentWithCompressionAsync(IAsyncStreamReader<CopyFileResponse> input, Stream output, CopyOptions? options, CancellationToken cancellationToken)
{
long chunks = 0L;
long bytes = 0L;
using (var grpcStream = new BufferedReadStream(async () =>
{
if (await replyStream.MoveNext(ct))
if (await input.MoveNext(cancellationToken))
{
chunks++;
bytes += replyStream.Current.Content.Length;
bytes += input.Current.Content.Length;
options?.UpdateTotalBytesCopied(bytes);
return replyStream.Current.Content;
return input.Current.Content;
}
else
{
@ -607,7 +586,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
{
using (Stream decompressedStream = new GZipStream(grpcStream, CompressionMode.Decompress, true))
{
await decompressedStream.CopyToAsync(targetStream, _configuration.ClientBufferSizeBytes, ct);
await decompressedStream.CopyToAsync(output, _configuration.ClientBufferSizeBytes, cancellationToken);
}
}

Просмотреть файл

@ -5,6 +5,7 @@ using System;
using System.Diagnostics.ContractsLight;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Stores;
using BuildXL.Cache.ContentStore.Interfaces.Time;
@ -87,6 +88,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
public sealed class GrpcCopyClientCache : IDisposable
{
private readonly GrpcCopyClientCacheConfiguration _configuration;
private readonly ByteArrayPool _grpcCopyClientBufferPool;
// NOTE: Unifying interfaces here is pain for no gain. Just dispatch manually and remove when obsolete later.
private readonly ResourcePool<GrpcCopyClientKey, GrpcCopyClient>? _resourcePool;
@ -100,6 +102,8 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
configuration ??= new GrpcCopyClientCacheConfiguration();
_configuration = configuration;
_grpcCopyClientBufferPool = new ByteArrayPool(configuration.GrpcCopyClientConfiguration.ClientBufferSizeBytes);
switch (_configuration.ResourcePoolVersion)
{
case GrpcCopyClientCacheConfiguration.PoolVersion.Disabled:
@ -108,14 +112,14 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
_resourcePool = new ResourcePool<GrpcCopyClientKey, GrpcCopyClient>(
context,
_configuration.ResourcePoolConfiguration,
(key) => new GrpcCopyClient(key, _configuration.GrpcCopyClientConfiguration),
(key) => new GrpcCopyClient(key, _configuration.GrpcCopyClientConfiguration, sharedBufferPool: _grpcCopyClientBufferPool),
clock);
break;
case GrpcCopyClientCacheConfiguration.PoolVersion.V2:
_resourcePoolV2 = new ResourcePoolV2<GrpcCopyClientKey, GrpcCopyClient>(
context,
_configuration.ResourcePoolConfiguration,
(key) => new GrpcCopyClient(key, _configuration.GrpcCopyClientConfiguration),
(key) => new GrpcCopyClient(key, _configuration.GrpcCopyClientConfiguration, sharedBufferPool: _grpcCopyClientBufferPool),
clock);
break;
}
@ -131,7 +135,7 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
{
case GrpcCopyClientCacheConfiguration.PoolVersion.Disabled:
{
var client = new GrpcCopyClient(key, _configuration.GrpcCopyClientConfiguration);
var client = new GrpcCopyClient(key, _configuration.GrpcCopyClientConfiguration, sharedBufferPool: _grpcCopyClientBufferPool);
await client.StartupAsync(context).ThrowIfFailure();
var result = await operation(context, new DefaultResourceWrapperAdapter<GrpcCopyClient>(client));

Просмотреть файл

@ -1,9 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Hashing;
using ContentStore.Grpc;
using Google.Protobuf;
using Grpc.Core;
using System.Diagnostics.ContractsLight;
namespace BuildXL.Cache.ContentStore.Service.Grpc
{
@ -12,6 +18,12 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
/// </summary>
public static class GrpcExtensions
{
/// <summary>
/// Whether to use unsafe bytestring optimizations or not. These can potentially break memory safety of CASaaS,
/// but they also allow us to perform operations faster.
/// </summary>
public static bool UnsafeByteStringOptimizations { get; set; } = false;
/// <summary>
/// Converts a bytestring to a contenthash
/// </summary>
@ -29,10 +41,146 @@ namespace BuildXL.Cache.ContentStore.Service.Grpc
/// <summary>
/// Converts a bytestring to a contenthash
/// </summary>
public static ByteString ToByteString(in this ContentHash byteString)
public static ByteString ToByteString(in this ContentHash contentHash)
{
byte[] hashByteArray = byteString.ToHashByteArray();
byte[] hashByteArray = contentHash.ToHashByteArray();
return ByteString.CopyFrom(hashByteArray, 0, hashByteArray.Length);
}
/// <nodoc />
public static async Task<(long Chunks, long Bytes)> CopyChunksToStreamAsync<T>(
IAsyncStreamReader<T> input,
Stream output,
Func<T, ByteString> transform,
Action<long>? progressReport = null,
CancellationToken cancellationToken = default)
{
Contract.Requires(input != null);
Contract.Requires(output != null);
Contract.Requires(transform != null);
long totalChunksRead = 0L;
long totalBytesRead = 0L;
Task<bool> hasCurrentElement = input.MoveNext(cancellationToken);
while (await hasCurrentElement)
{
totalChunksRead++;
ByteString chunk = transform(input.Current);
totalBytesRead += chunk.Length;
progressReport?.Invoke(totalBytesRead);
var fetchNextTask = input.MoveNext(cancellationToken);
await Task.WhenAll(fetchNextTask, output.WriteByteStringAsync(chunk, cancellationToken));
hasCurrentElement = fetchNextTask;
}
return (totalChunksRead, totalBytesRead);
}
/// <nodoc />
public static async Task<(long Chunks, long Bytes)> CopyStreamToChunksAsync<T>(
Stream input,
IAsyncStreamWriter<T> output,
Func<ByteString, long, T> transform,
byte[] primaryBuffer,
byte[] secondaryBuffer,
Action<long>? progressReport = null,
CancellationToken cancellationToken = default)
{
Contract.Requires(input != null);
Contract.Requires(output != null);
Contract.Requires(transform != null);
Contract.Requires(primaryBuffer != null);
Contract.Requires(secondaryBuffer != null);
long totalChunksRead = 0L;
long totalBytesRead = 0L;
byte[] buffer = primaryBuffer;
// Pre-fill buffer with the file's first chunk
int chunkSize = await readNextChunk(input, buffer, cancellationToken);
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
if (chunkSize == 0) { break; }
// Created ByteString can reuse the buffer.
// To avoid double writes we need two buffers:
// * One buffer is used as the output buffer
// * And another buffer is used as the input buffer.
(ByteString content, bool bufferReused) = CreateByteStringForStreamContent(buffer, chunkSize);
T response = transform(content, totalChunksRead);
if (bufferReused)
{
// If the buffer is reused we need to swap the input buffer with the secondary buffer.
buffer = buffer == primaryBuffer ? secondaryBuffer : primaryBuffer;
}
totalBytesRead += chunkSize;
totalChunksRead++;
// Read the next chunk while waiting for the response
var readNextChunkTask = readNextChunk(input, buffer, cancellationToken);
await Task.WhenAll(readNextChunkTask, output.WriteAsync(response));
progressReport?.Invoke(totalBytesRead);
chunkSize = await readNextChunkTask;
}
return (totalChunksRead, totalBytesRead);
static async Task<int> readNextChunk(Stream input, byte[] buffer, CancellationToken token)
{
var bytesRead = await input.ReadAsync(buffer, 0, buffer.Length, token);
return bytesRead;
}
}
private static (ByteString content, bool bufferReused) CreateByteStringForStreamContent(byte[] buffer, int chunkSize)
{
// In some cases ByteString construction can be very expensive both in terms CPU and memory.
// For instance, during the content streaming we understand and control the ownership of the buffers
// and in that case we can avoid extra copy done by ByteString.CopyFrom call.
//
// But we can use an unsafe version only when the chunk size is the same as the buffer size.
if (UnsafeByteStringOptimizations && chunkSize == buffer.Length)
{
return (ByteStringExtensions.UnsafeCreateFromBytes(buffer), bufferReused: true);
}
return (ByteString.CopyFrom(buffer, 0, chunkSize), bufferReused: false);
}
#pragma warning disable AsyncFixer01 // Unnecessary async/await usage. This is a false positive that happens only in .NET Core / NetStandard 2.1.
/// <nodoc />
public static async Task WriteByteStringAsync(this Stream stream, ByteString byteString, CancellationToken cancellationToken = default)
{
Contract.Requires(stream != null);
Contract.Requires(byteString != null);
// Support for using Span in Stream's WriteAsync started in .NET Core 3.0 and .NET Standard 2.1. Since we
// may run in older runtimes, we fallback into using the unsafe bytes extraction technique, whereby we
// fetch the inner byte[] inside of the ByteString and write using that directly.
#if NETCOREAPP3_0 || NETCOREAPP3_1 || NETSTANDARD2_1
await stream.WriteAsync(byteString.Memory, cancellationToken);
#else
if (UnsafeByteStringOptimizations)
{
var buffer = ByteStringExtensions.UnsafeExtractBytes(byteString);
await stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken);
}
else
{
byteString.WriteTo(stream);
}
#endif
}
#pragma warning restore AsyncFixer01 // Unnecessary async/await usage
}
}

Просмотреть файл

@ -154,11 +154,6 @@ namespace BuildXL.Cache.ContentStore.Service
/// <nodoc />
public int? BufferSizeForGrpcCopies { get; private set; }
/// <summary>
/// If true, then the unsafe version of ByteString construction is used that avoids extra copy of the byte[].
/// </summary>
public bool UseUnsafeByteStringConstruction { get; set; }
/// <nodoc />
public const int DefaultProactivePushCountLimit = 128;

Просмотреть файл

@ -44,8 +44,7 @@ namespace Test {
],
runtimeContent: [
Library.dll,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
...getGrpcPackages(true),
],
});
}

Просмотреть файл

@ -18,7 +18,7 @@ namespace ContentStoreTest.Extensions
[Fact]
public void UnsafeCreateFromBytesCreatesTheInstance()
{
byte[] input = new byte[]{1,2,3};
byte[] input = new byte[] { 1, 2, 3 };
var byteString = ByteStringExtensions.UnsafeCreateFromBytes(input);
var byteString2 = ByteString.CopyFrom(input);
@ -27,6 +27,18 @@ namespace ContentStoreTest.Extensions
// when they point to different byte arrays.
// This test mostely checks that the hacky solution works.
Assert.Equal(byteString, byteString2);
var buffer = ByteStringExtensions.UnsafeExtractBytes(byteString);
Assert.True(ReferenceEquals(input, buffer));
}
[Fact]
public void ExtractBytesHasTheSameBuffer()
{
byte[] input = new byte[] { 1, 2, 3 };
var byteString = ByteStringExtensions.UnsafeCreateFromBytes(input);
var buffer = ByteStringExtensions.UnsafeExtractBytes(byteString);
Assert.True(ReferenceEquals(buffer, input));
}
}
}

Просмотреть файл

@ -24,9 +24,7 @@ namespace VfsLibrary {
importFrom("BuildXL.Utilities").ToolSupport.dll,
...importFrom("Sdk.Selfhost.RocksDbSharp").pkgs,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...getGrpcPackages(true),
ManagedSdk.Factory.createBinary(importFrom("TransientFaultHandling.Core").Contents.all, r`lib/NET4/Microsoft.Practices.TransientFaultHandling.Core.dll`),
],

Просмотреть файл

@ -260,12 +260,17 @@ namespace BuildXL.Cache.Host.Service.Internal
serviceConfiguration.ProactivePushCountLimit = localCasServiceSettings.MaxProactivePushRequestHandlers;
var localContentServerConfiguration = new LocalServerConfiguration(serviceConfiguration);
ApplyIfNotNull(localCasServiceSettings.UnusedSessionTimeoutMinutes, value => localContentServerConfiguration.UnusedSessionTimeout = TimeSpan.FromMinutes(value));
ApplyIfNotNull(localCasServiceSettings.UnusedSessionHeartbeatTimeoutMinutes, value => localContentServerConfiguration.UnusedSessionHeartbeatTimeout = TimeSpan.FromMinutes(value));
ApplyIfNotNull(localCasServiceSettings.GrpcCoreServerOptions, value => localContentServerConfiguration.GrpcCoreServerOptions = value);
ApplyIfNotNull(localCasServiceSettings.GrpcEnvironmentOptions, value => localContentServerConfiguration.GrpcEnvironmentOptions = value);
ApplyIfNotNull(distributedSettings?.UseUnsafeByteStringConstruction, value => localContentServerConfiguration.UseUnsafeByteStringConstruction = value);
ApplyIfNotNull(distributedSettings?.UseUnsafeByteStringConstruction, value =>
{
GrpcExtensions.UnsafeByteStringOptimizations = value;
});
ApplyIfNotNull(distributedSettings?.ShutdownEvictionBeforeHibernation, value => localContentServerConfiguration.ShutdownEvictionBeforeHibernation = value);
return localContentServerConfiguration;

Просмотреть файл

@ -22,9 +22,7 @@ namespace Library {
...BuildXLSdk.bclAsyncPackages,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...importFrom("BuildXL.Cache.ContentStore").getGrpcPackages(true),
BuildXLSdk.Factory.createBinary(importFrom("TransientFaultHandling.Core").pkg.contents, r`lib/NET4/Microsoft.Practices.TransientFaultHandling.Core.dll`),
],
allowUnsafeBlocks: true,

Просмотреть файл

@ -22,12 +22,8 @@ namespace Client {
// Needed to implement gRPC service
Common.dll,
Grpc.dll,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...addIf(BuildXLSdk.isFullFramework || qualifier.targetFramework === "netstandard2.0",
importFrom("System.Memory").withQualifier({targetFramework: "netstandard2.0"}).pkg
),
...importFrom("BuildXL.Cache.ContentStore").getGrpcPackages(true),
...addIfLazy(BuildXLSdk.isFullFramework, () => [
NetFx.System.Xml.dll,
NetFx.System.Xml.Linq.dll,

Просмотреть файл

@ -20,9 +20,7 @@ namespace Common {
// Needed to implement gRPC service
Grpc.dll,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...importFrom("BuildXL.Cache.ContentStore").getGrpcPackages(true),
// Needed for serialization/deserialization.
// TODO: figure out a way to remove this?

Просмотреть файл

@ -13,14 +13,7 @@ namespace Grpc {
],
references: [
importFrom("RuntimeContracts").pkg,
...addIf(BuildXLSdk.isFullFramework,
NetFx.System.IO.dll
),
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...importFrom("BuildXL.Cache.ContentStore").getGrpcPackages(true),
...BuildXLSdk.bclAsyncPackages,
],
internalsVisibleTo: [

Просмотреть файл

@ -24,12 +24,7 @@ namespace Server {
// Needed to implement gRPC service
Common.dll,
Grpc.dll,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...addIf(BuildXLSdk.isFullFramework,
importFrom("System.Memory").withQualifier({targetFramework: "netstandard2.0"}).pkg
),
...importFrom("BuildXL.Cache.ContentStore").getGrpcPackages(true),
...addIf(BuildXLSdk.isFullFramework,
NetFx.System.Xml.dll,
NetFx.System.Xml.Linq.dll,

Просмотреть файл

@ -259,8 +259,7 @@ namespace NugetPackages {
importFrom("Microsoft.Azure.Amqp").withQualifier(net472PackageQualifer).pkg,
importFrom("System.Threading.Tasks.Dataflow").withQualifier(net472PackageQualifer).pkg,
...BuildXLSdk.withQualifier(net472PackageQualifer).bclAsyncPackages,
importFrom("Grpc.Core").withQualifier(net472PackageQualifer).pkg,
importFrom("Google.Protobuf").withQualifier(net472PackageQualifer).pkg,
...importFrom("BuildXL.Cache.ContentStore").withQualifier(net472PackageQualifer).getGrpcPackages(false),
...importFrom("BuildXL.Cache.ContentStore").withQualifier(net472PackageQualifer).redisPackages,
...importFrom("BuildXL.Cache.ContentStore").withQualifier(net472PackageQualifer).getSerializationPackages(false),
importFrom("Microsoft.VisualStudio.Services.ArtifactServices.Shared").withQualifier(net472PackageQualifer).pkg,

Просмотреть файл

@ -15,9 +15,7 @@ namespace Distribution.Grpc {
NetFx.System.IO.dll
),
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...importFrom("BuildXL.Cache.ContentStore").getGrpcPackages(true),
],
tools: {
csc: {

Просмотреть файл

@ -47,9 +47,7 @@ namespace Engine {
importFrom("BuildXL.Utilities").Storage.dll,
importFrom("BuildXL.Utilities").Script.Constants.dll,
importFrom("BuildXL.FrontEnd").Sdk.dll,
importFrom("Google.Protobuf").pkg,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
...importFrom("BuildXL.Cache.ContentStore").getGrpcPackages(true),
importFrom("Newtonsoft.Json").pkg,
],
internalsVisibleTo: [

Просмотреть файл

@ -14,7 +14,7 @@ namespace Xldb {
references: [
importFrom("BuildXL.Utilities").dll,
importFrom("BuildXL.Utilities").KeyValueStore.dll,
importFrom("Google.Protobuf").pkg,
...importFrom("BuildXL.Cache.ContentStore").getProtobufPackages(true),
Xldb.Proto.dll,
],
});

Просмотреть файл

@ -15,11 +15,10 @@ namespace PluginGrpc {
],
references: [
...addIf(BuildXLSdk.isFullFramework,
NetFx.System.IO.dll
NetFx.System.IO.dll,
NetFx.Netstandard.dll
),
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("Google.Protobuf").pkg,
...importFrom("BuildXL.Cache.ContentStore").getGrpcPackages(false),
],
tools: {
csc: {

Просмотреть файл

@ -10,9 +10,7 @@ namespace Plugin {
generateLogs: true,
sources: globR(d`.`, "*.cs"),
references: [
importFrom("Google.Protobuf").pkg,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
...importFrom("BuildXL.Cache.ContentStore").getGrpcPackages(false),
$.dll,
$.Ipc.dll,
$.PluginGrpc.dll,

Просмотреть файл

@ -12,13 +12,16 @@ using BuildXL.Utilities.Instrumentation.Common;
using Test.BuildXL.TestUtilities.Xunit;
using Xunit;
using ILogger = Grpc.Core.Logging.ILogger;
using System.Diagnostics;
using Test.BuildXL.TestUtilities;
using Xunit.Abstractions;
namespace Test.BuildXL.Plugin
{
/// <summary>
/// Tests for <see cref="PluginManager" />
/// </summary>
public class PluginManagerTests: IAsyncLifetime
public class PluginManagerTests : TemporaryStorageTestBase, IAsyncLifetime
{
private PluginManager m_pluginManager;
private LoggingContext m_loggingContext = new LoggingContext("UnitTest");
@ -55,7 +58,7 @@ namespace Test.BuildXL.Plugin
private readonly ILogger m_logger = new MockLogger();
private readonly int m_port = TcpIpConnectivity.ParsePortNumber(m_pluginPort3);
public PluginManagerTests()
public PluginManagerTests(ITestOutputHelper output) : base(output)
{
m_pluginManager = new PluginManager(m_loggingContext, "empty", new[] { "empty" });
}
@ -299,7 +302,7 @@ namespace Test.BuildXL.Plugin
});
var res = await m_pluginManager.GetOrCreateAsync(args);
Assert.True(res.Succeeded);
XAssert.PossiblySucceeded(res);
Assert.Equal(m_pluginManager.PluginLoadedSuccessfulCount, 1);
Assert.Equal(m_pluginManager.PluginsCount, 1);
Assert.True(m_pluginManager.CanHandleMessage(PluginMessageType.ParseLogMessage));

Просмотреть файл

@ -1,22 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
import * as XUnit from "Sdk.Managed.Testing.XUnit";
namespace Plugin {
@@public
export const dll = BuildXLSdk.test({
assemblyName: "Test.BuildXL.Plugin",
appConfig: f`App.Config`,
sources: globR(d`.`, "*.cs"),
// This disables using QTest for this test. For an unknown reason, QTest breaks the test.
testFramework: XUnit.framework,
references: [
importFrom("BuildXL.Utilities").dll,
importFrom("BuildXL.Utilities").Ipc.dll,
importFrom("BuildXL.Utilities").PluginGrpc.dll,
importFrom("BuildXL.Utilities").Plugin.dll,
importFrom("Grpc.Core").pkg,
importFrom("Grpc.Core.Api").pkg,
importFrom("System.Runtime.CompilerServices.Unsafe").withQualifier({ targetFramework: "netstandard2.0" }).pkg,
importFrom("System.Memory").pkg,
importFrom("System.Buffers").pkg,
...importFrom("BuildXL.Cache.ContentStore").getGrpcPackages(false),
],
runtimeContent: [
importFrom("Sdk.Protocols.Grpc").runtimeContent,

Просмотреть файл

@ -285,7 +285,7 @@
"Type": "NuGet",
"NuGet": {
"Name": "Google.Protobuf",
"Version": "3.11.2"
"Version": "3.13.0"
}
}
},
@ -294,7 +294,7 @@
"Type": "NuGet",
"NuGet": {
"Name": "Google.Protobuf.Tools",
"Version": "3.11.2"
"Version": "3.13.0"
}
}
},
@ -303,7 +303,7 @@
"Type": "NuGet",
"NuGet": {
"Name": "Grpc.Core",
"Version": "2.26.0"
"Version": "2.32.0"
}
}
},
@ -312,7 +312,7 @@
"Type": "NuGet",
"NuGet": {
"Name": "Grpc.Core.Api",
"Version": "2.26.0"
"Version": "2.32.0"
}
}
},
@ -321,7 +321,7 @@
"Type": "NuGet",
"NuGet": {
"Name": "Grpc.Net.Client",
"Version": "2.26.0"
"Version": "2.32.0"
}
}
},
@ -330,7 +330,7 @@
"Type": "NuGet",
"NuGet": {
"Name": "Grpc.Net.Common",
"Version": "2.26.0"
"Version": "2.32.0"
}
}
},
@ -339,7 +339,7 @@
"Type": "NuGet",
"NuGet": {
"Name": "Grpc.Tools",
"Version": "2.26.0"
"Version": "2.32.0"
}
}
},

Просмотреть файл

@ -1,8 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
const protoVersion = "3.11.2";
const grpcVersion = "2.26.0";
const protoVersion = "3.13.0";
const grpcVersion = "2.32.0";
export const pkgs = [
// grpc