diff --git a/Public/Src/Cache/DistributedCache.Host/Service/Internal/MultiplexedReadOnlyContentSession.cs b/Public/Src/Cache/DistributedCache.Host/Service/Internal/MultiplexedReadOnlyContentSession.cs index f10fd4d0d..bf03692d4 100644 --- a/Public/Src/Cache/DistributedCache.Host/Service/Internal/MultiplexedReadOnlyContentSession.cs +++ b/Public/Src/Cache/DistributedCache.Host/Service/Internal/MultiplexedReadOnlyContentSession.cs @@ -4,8 +4,8 @@ using System; using System.Collections.Generic; using System.Diagnostics.ContractsLight; -using System.IO; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using BuildXL.Cache.ContentStore.Distributed.Utilities; @@ -120,6 +120,7 @@ namespace BuildXL.Cache.Host.Service.Internal UrgencyHint urgencyHint = UrgencyHint.Nominal) { return PerformAggregateSessionOperationAsync( + context, session => session.PinAsync(context, contentHash, cts, urgencyHint), (r1, r2) => r1.Succeeded ? r1 : r2, shouldBreak: r => r.Succeeded); @@ -133,6 +134,7 @@ namespace BuildXL.Cache.Host.Service.Internal UrgencyHint urgencyHint = UrgencyHint.Nominal) { return PerformAggregateSessionOperationAsync( + context, session => session.OpenStreamAsync(context, contentHash, cts, urgencyHint), (r1, r2) => r1.Succeeded ? r1 : r2, shouldBreak: r => r.Succeeded); @@ -149,11 +151,44 @@ namespace BuildXL.Cache.Host.Service.Internal CancellationToken cts, UrgencyHint urgencyHint = UrgencyHint.Nominal) { + IContentSession hardlinkSession = null; + if (realizationMode == FileRealizationMode.HardLink) + { + var drive = path.GetPathRoot(); + if (SessionsByCacheRoot.TryGetValue(drive, out var session) && session is IContentSession writeableSession) + { + hardlinkSession = writeableSession; + } + else + { + return Task.FromResult(new PlaceFileResult("Requested hardlink but there is no session on the same drive as destination path.")); + } + } + return PerformAggregateSessionOperationAsync( - session => session.PlaceFileAsync(context, contentHash, path, accessMode, replacementMode, realizationMode, cts, urgencyHint), + context, + executeAsync: placeFileCore, (r1, r2) => r1.Succeeded ? r1 : r2, shouldBreak: r => r.Succeeded, pathHint: path); + + async Task placeFileCore(IReadOnlyContentSession session) + { + // If we exclusively want a hardlink, we should make sure that we can copy from other drives to satisfy the request. + if (realizationMode != FileRealizationMode.HardLink || session == hardlinkSession) + { + return await session.PlaceFileAsync(context, contentHash, path, accessMode, replacementMode, realizationMode, cts, urgencyHint); + } + + // See if session has the content. + var streamResult = await session.OpenStreamAsync(context, contentHash, cts, urgencyHint).ThrowIfFailure(); + + // Put it into correct store + var putResult = await hardlinkSession.PutStreamAsync(context, contentHash, streamResult.Stream, cts, urgencyHint).ThrowIfFailure(); + + // Try the hardlink on the correct drive. + return await hardlinkSession.PlaceFileAsync(context, contentHash, path, accessMode, replacementMode, realizationMode, cts, urgencyHint); + } } /// @@ -215,7 +250,7 @@ namespace BuildXL.Cache.Host.Service.Internal /// public IEnumerable EnumeratePinnedContentHashes() { - return PerformAggregateSessionOperationAsync>>( + return PerformAggregateSessionOperationCoreAsync>>( session => { var hashes = session.EnumeratePinnedContentHashes(); @@ -229,6 +264,7 @@ namespace BuildXL.Cache.Host.Service.Internal public Task PinBulkAsync(Context context, IEnumerable contentHashes) { return PerformAggregateSessionOperationAsync( + context, async session => { await session.PinBulkAsync(context, contentHashes); @@ -242,6 +278,7 @@ namespace BuildXL.Cache.Host.Service.Internal public Task ShutdownEvictionAsync(Context context) { return PerformAggregateSessionOperationAsync( + context, session => session.ShutdownEvictionAsync(context), (r1, r2) => r1 & r2, shouldBreak: r => false); @@ -299,12 +336,30 @@ namespace BuildXL.Cache.Host.Service.Internal return result ?? new ErrorResult($"Could not find a content session which implements {typeof(TSession).Name} in {nameof(MultiplexedContentSession)}.").AsResult(); } - private async Task PerformAggregateSessionOperationAsync( + private Task PerformAggregateSessionOperationAsync( + Context context, + Func> executeAsync, + Func aggregate, + Func shouldBreak, + AbsolutePath pathHint = null, + [CallerMemberName] string caller = null) + where TResult : ResultBase + { + var operationContext = context is null ? new OperationContext() : new OperationContext(context); + return operationContext.PerformOperationAsync( + Tracer, + () => PerformAggregateSessionOperationCoreAsync(executeAsync, aggregate, shouldBreak, pathHint), + traceOperationStarted: false, + traceOperationFinished: false, + caller: caller); + } + + private async Task PerformAggregateSessionOperationCoreAsync( Func> executeAsync, Func aggregate, Func shouldBreak, AbsolutePath pathHint = null) - where TResult : class + where TResult : ResultBase { TResult result = null; @@ -312,7 +367,15 @@ namespace BuildXL.Cache.Host.Service.Internal foreach (var session in GetSessionsInOrder(pathHint)) { var priorResult = result; - result = await executeAsync(session); + + try + { + result = await executeAsync(session); + } + catch (Exception e) + { + result = new ErrorResult(e).AsResult(); + } // Aggregate with previous result if (priorResult != null) diff --git a/Public/Src/Cache/DistributedCache.Host/Test/BuildXL.Cache.Host.Test.dsc b/Public/Src/Cache/DistributedCache.Host/Test/BuildXL.Cache.Host.Test.dsc index ad1208ee4..81c108a1e 100644 --- a/Public/Src/Cache/DistributedCache.Host/Test/BuildXL.Cache.Host.Test.dsc +++ b/Public/Src/Cache/DistributedCache.Host/Test/BuildXL.Cache.Host.Test.dsc @@ -16,6 +16,7 @@ namespace Test { ...importFrom("BuildXL.Cache.ContentStore").getSerializationPackages(true), Configuration.dll, Service.dll, + importFrom("BuildXL.Cache.ContentStore").Hashing.dll, importFrom("BuildXL.Cache.ContentStore").Interfaces.dll, importFrom("BuildXL.Cache.ContentStore").Distributed.dll, importFrom("BuildXL.Cache.ContentStore").Interfaces.dll, diff --git a/Public/Src/Cache/DistributedCache.Host/Test/MultiplexedSessionTests.cs b/Public/Src/Cache/DistributedCache.Host/Test/MultiplexedSessionTests.cs new file mode 100644 index 000000000..cd4126ee7 --- /dev/null +++ b/Public/Src/Cache/DistributedCache.Host/Test/MultiplexedSessionTests.cs @@ -0,0 +1,95 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using BuildXL.Cache.ContentStore.FileSystem; +using BuildXL.Cache.ContentStore.Interfaces.FileSystem; +using BuildXL.Cache.ContentStore.Interfaces.Results; +using BuildXL.Cache.ContentStore.Interfaces.Sessions; +using BuildXL.Cache.ContentStore.Interfaces.Stores; +using BuildXL.Cache.ContentStore.Interfaces.Tracing; +using BuildXL.Cache.ContentStore.InterfacesTest.FileSystem; +using BuildXL.Cache.ContentStore.InterfacesTest.Results; +using BuildXL.Cache.ContentStore.InterfacesTest.Time; +using BuildXL.Cache.ContentStore.Stores; +using BuildXL.Cache.Host.Service.Internal; +using ContentStoreTest.Test; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace BuildXL.Cache.Host.Test +{ + public class MultiplexedSessionTests : TestBase + { + public MultiplexedSessionTests(ITestOutputHelper output) + : base(() => new PassThroughFileSystem(TestGlobal.Logger), TestGlobal.Logger, output) + { + } + + [Fact] + public async Task HardlinkTestAsync() + { + var clock = new MemoryClock(); + + var configuration = new ContentStoreConfiguration(); + var configurationModel = new ConfigurationModel(inProcessConfiguration: configuration, ConfigurationSelection.RequireAndUseInProcessConfiguration); + + var root1 = TestRootDirectoryPath / "Store1"; + var store1 = new FileSystemContentStore(FileSystem, clock, root1, configurationModel); + + var fakeDrive = new AbsolutePath(@"X:\"); + var root2 = fakeDrive / "Store2"; + var redirectedFileSystem = new RedirectionFileSystem(FileSystem, fakeDrive, TestRootDirectoryPath); + var store2 = new FileSystemContentStore(redirectedFileSystem, clock, fakeDrive, configurationModel); + + var stores = new Dictionary + { + { root1.GetPathRoot(), store1 }, + { root2.GetPathRoot(), store2 }, + }; + + var multiplexed = new MultiplexedContentStore(stores, preferredCacheDrive: root1.GetPathRoot(), tryAllSessions: true); + + var context = new Context(Logger); + + await multiplexed.StartupAsync(context).ShouldBeSuccess(); + + var sessionResult = multiplexed.CreateSession(context, "Default", ImplicitPin.None).ShouldBeSuccess(); + var session = sessionResult.Session; + + // Put random content which should go to preferred drive + var putResult = await session.PutRandomAsync(context, ContentStore.Hashing.HashType.MD5, provideHash: true, size: 1024, CancellationToken.None) + .ShouldBeSuccess(); + + // Should be able to place it with hardlink in primary drive + var destination1 = TestRootDirectoryPath / "destination1.txt"; + var placeResult1 = await session.PlaceFileAsync( + context, + putResult.ContentHash, + destination1, + FileAccessMode.ReadOnly, + FileReplacementMode.FailIfExists, + FileRealizationMode.HardLink, + CancellationToken.None) + .ShouldBeSuccess(); + placeResult1.Code.Should().Be(PlaceFileResult.ResultCode.PlacedWithHardLink); + + // Should be able to place it with hardlink in secondary drive. + // The cache should copy the contents internally, and then place from the correct drive. + var destination2 = fakeDrive / "destination2.txt"; + var placeResult2 = await session.PlaceFileAsync( + context, + putResult.ContentHash, + destination2, + FileAccessMode.ReadOnly, + FileReplacementMode.FailIfExists, + FileRealizationMode.HardLink, + CancellationToken.None) + .ShouldBeSuccess(); + placeResult2.Code.Should().Be(PlaceFileResult.ResultCode.PlacedWithHardLink); + } + } +}