Merged PR 586754: Allow MultiplexedStore to copy files between drives when PlaceFile with Hardlink-only is specified

Currently, we will fail if the destination file is not in the same drive as the one where we actually have the content.

Example:
1. Cache has ContentA on drive X.
2. `PlaceFile` with Hardlink and destination path=[D:\...]
3. FileSystem[D] fails with ContentNotFound
4. FileSystem[X] fails with Hardlink failed because they are in different drives.
5. Distributed store decides to do a `RemoteCopy` (?!) to put the content on drive D.
6. Reattempts the place file, at which point we create a hardlink from the cache in drive D

This enables us to copy from cache in `X:\` to the cache in `D:\` so that it can perform a hardlink.

I realize that this does not cover the bulk implementation of `PlaceFileAsync`, but this is only happening with QuickBuild and they don't use the bulk version, so I don't think it's worthwhile to do this change for the bulk version, since it would be significantly more difficult to do so and we won't see any benefits.

Testing: added a test that previously failed and now passes.

Related work items: #1795932
This commit is contained in:
Juan Carlos Guzman Islas 2020-12-01 00:00:51 +00:00
Родитель 4a0d0ce771
Коммит d7a5ab55fe
3 изменённых файлов: 165 добавлений и 6 удалений

Просмотреть файл

@ -4,8 +4,8 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics.ContractsLight; using System.Diagnostics.ContractsLight;
using System.IO;
using System.Linq; using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed.Utilities; using BuildXL.Cache.ContentStore.Distributed.Utilities;
@ -120,6 +120,7 @@ namespace BuildXL.Cache.Host.Service.Internal
UrgencyHint urgencyHint = UrgencyHint.Nominal) UrgencyHint urgencyHint = UrgencyHint.Nominal)
{ {
return PerformAggregateSessionOperationAsync<IReadOnlyContentSession, PinResult>( return PerformAggregateSessionOperationAsync<IReadOnlyContentSession, PinResult>(
context,
session => session.PinAsync(context, contentHash, cts, urgencyHint), session => session.PinAsync(context, contentHash, cts, urgencyHint),
(r1, r2) => r1.Succeeded ? r1 : r2, (r1, r2) => r1.Succeeded ? r1 : r2,
shouldBreak: r => r.Succeeded); shouldBreak: r => r.Succeeded);
@ -133,6 +134,7 @@ namespace BuildXL.Cache.Host.Service.Internal
UrgencyHint urgencyHint = UrgencyHint.Nominal) UrgencyHint urgencyHint = UrgencyHint.Nominal)
{ {
return PerformAggregateSessionOperationAsync<IReadOnlyContentSession, OpenStreamResult>( return PerformAggregateSessionOperationAsync<IReadOnlyContentSession, OpenStreamResult>(
context,
session => session.OpenStreamAsync(context, contentHash, cts, urgencyHint), session => session.OpenStreamAsync(context, contentHash, cts, urgencyHint),
(r1, r2) => r1.Succeeded ? r1 : r2, (r1, r2) => r1.Succeeded ? r1 : r2,
shouldBreak: r => r.Succeeded); shouldBreak: r => r.Succeeded);
@ -149,11 +151,44 @@ namespace BuildXL.Cache.Host.Service.Internal
CancellationToken cts, CancellationToken cts,
UrgencyHint urgencyHint = UrgencyHint.Nominal) UrgencyHint urgencyHint = UrgencyHint.Nominal)
{ {
IContentSession hardlinkSession = null;
if (realizationMode == FileRealizationMode.HardLink)
{
var drive = path.GetPathRoot();
if (SessionsByCacheRoot.TryGetValue(drive, out var session) && session is IContentSession writeableSession)
{
hardlinkSession = writeableSession;
}
else
{
return Task.FromResult(new PlaceFileResult("Requested hardlink but there is no session on the same drive as destination path."));
}
}
return PerformAggregateSessionOperationAsync<IReadOnlyContentSession, PlaceFileResult>( return PerformAggregateSessionOperationAsync<IReadOnlyContentSession, PlaceFileResult>(
session => session.PlaceFileAsync(context, contentHash, path, accessMode, replacementMode, realizationMode, cts, urgencyHint), context,
executeAsync: placeFileCore,
(r1, r2) => r1.Succeeded ? r1 : r2, (r1, r2) => r1.Succeeded ? r1 : r2,
shouldBreak: r => r.Succeeded, shouldBreak: r => r.Succeeded,
pathHint: path); pathHint: path);
async Task<PlaceFileResult> placeFileCore(IReadOnlyContentSession session)
{
// If we exclusively want a hardlink, we should make sure that we can copy from other drives to satisfy the request.
if (realizationMode != FileRealizationMode.HardLink || session == hardlinkSession)
{
return await session.PlaceFileAsync(context, contentHash, path, accessMode, replacementMode, realizationMode, cts, urgencyHint);
}
// See if session has the content.
var streamResult = await session.OpenStreamAsync(context, contentHash, cts, urgencyHint).ThrowIfFailure();
// Put it into correct store
var putResult = await hardlinkSession.PutStreamAsync(context, contentHash, streamResult.Stream, cts, urgencyHint).ThrowIfFailure();
// Try the hardlink on the correct drive.
return await hardlinkSession.PlaceFileAsync(context, contentHash, path, accessMode, replacementMode, realizationMode, cts, urgencyHint);
}
} }
/// <inheritdoc /> /// <inheritdoc />
@ -215,7 +250,7 @@ namespace BuildXL.Cache.Host.Service.Internal
/// <inheritdoc /> /// <inheritdoc />
public IEnumerable<ContentHash> EnumeratePinnedContentHashes() public IEnumerable<ContentHash> EnumeratePinnedContentHashes()
{ {
return PerformAggregateSessionOperationAsync<IHibernateContentSession, Result<IEnumerable<ContentHash>>>( return PerformAggregateSessionOperationCoreAsync<IHibernateContentSession, Result<IEnumerable<ContentHash>>>(
session => session =>
{ {
var hashes = session.EnumeratePinnedContentHashes(); var hashes = session.EnumeratePinnedContentHashes();
@ -229,6 +264,7 @@ namespace BuildXL.Cache.Host.Service.Internal
public Task PinBulkAsync(Context context, IEnumerable<ContentHash> contentHashes) public Task PinBulkAsync(Context context, IEnumerable<ContentHash> contentHashes)
{ {
return PerformAggregateSessionOperationAsync<IHibernateContentSession, BoolResult>( return PerformAggregateSessionOperationAsync<IHibernateContentSession, BoolResult>(
context,
async session => async session =>
{ {
await session.PinBulkAsync(context, contentHashes); await session.PinBulkAsync(context, contentHashes);
@ -242,6 +278,7 @@ namespace BuildXL.Cache.Host.Service.Internal
public Task<BoolResult> ShutdownEvictionAsync(Context context) public Task<BoolResult> ShutdownEvictionAsync(Context context)
{ {
return PerformAggregateSessionOperationAsync<IHibernateContentSession, BoolResult>( return PerformAggregateSessionOperationAsync<IHibernateContentSession, BoolResult>(
context,
session => session.ShutdownEvictionAsync(context), session => session.ShutdownEvictionAsync(context),
(r1, r2) => r1 & r2, (r1, r2) => r1 & r2,
shouldBreak: r => false); shouldBreak: r => false);
@ -299,12 +336,30 @@ namespace BuildXL.Cache.Host.Service.Internal
return result ?? new ErrorResult($"Could not find a content session which implements {typeof(TSession).Name} in {nameof(MultiplexedContentSession)}.").AsResult<TResult>(); return result ?? new ErrorResult($"Could not find a content session which implements {typeof(TSession).Name} in {nameof(MultiplexedContentSession)}.").AsResult<TResult>();
} }
private async Task<TResult> PerformAggregateSessionOperationAsync<TSession, TResult>( private Task<TResult> PerformAggregateSessionOperationAsync<TSession, TResult>(
Context context,
Func<TSession, Task<TResult>> executeAsync,
Func<TResult, TResult, TResult> aggregate,
Func<TResult, bool> shouldBreak,
AbsolutePath pathHint = null,
[CallerMemberName] string caller = null)
where TResult : ResultBase
{
var operationContext = context is null ? new OperationContext() : new OperationContext(context);
return operationContext.PerformOperationAsync(
Tracer,
() => PerformAggregateSessionOperationCoreAsync(executeAsync, aggregate, shouldBreak, pathHint),
traceOperationStarted: false,
traceOperationFinished: false,
caller: caller);
}
private async Task<TResult> PerformAggregateSessionOperationCoreAsync<TSession, TResult>(
Func<TSession, Task<TResult>> executeAsync, Func<TSession, Task<TResult>> executeAsync,
Func<TResult, TResult, TResult> aggregate, Func<TResult, TResult, TResult> aggregate,
Func<TResult, bool> shouldBreak, Func<TResult, bool> shouldBreak,
AbsolutePath pathHint = null) AbsolutePath pathHint = null)
where TResult : class where TResult : ResultBase
{ {
TResult result = null; TResult result = null;
@ -312,7 +367,15 @@ namespace BuildXL.Cache.Host.Service.Internal
foreach (var session in GetSessionsInOrder<TSession>(pathHint)) foreach (var session in GetSessionsInOrder<TSession>(pathHint))
{ {
var priorResult = result; var priorResult = result;
result = await executeAsync(session);
try
{
result = await executeAsync(session);
}
catch (Exception e)
{
result = new ErrorResult(e).AsResult<TResult>();
}
// Aggregate with previous result // Aggregate with previous result
if (priorResult != null) if (priorResult != null)

Просмотреть файл

@ -16,6 +16,7 @@ namespace Test {
...importFrom("BuildXL.Cache.ContentStore").getSerializationPackages(true), ...importFrom("BuildXL.Cache.ContentStore").getSerializationPackages(true),
Configuration.dll, Configuration.dll,
Service.dll, Service.dll,
importFrom("BuildXL.Cache.ContentStore").Hashing.dll,
importFrom("BuildXL.Cache.ContentStore").Interfaces.dll, importFrom("BuildXL.Cache.ContentStore").Interfaces.dll,
importFrom("BuildXL.Cache.ContentStore").Distributed.dll, importFrom("BuildXL.Cache.ContentStore").Distributed.dll,
importFrom("BuildXL.Cache.ContentStore").Interfaces.dll, importFrom("BuildXL.Cache.ContentStore").Interfaces.dll,

Просмотреть файл

@ -0,0 +1,95 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Sessions;
using BuildXL.Cache.ContentStore.Interfaces.Stores;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
using BuildXL.Cache.ContentStore.InterfacesTest.FileSystem;
using BuildXL.Cache.ContentStore.InterfacesTest.Results;
using BuildXL.Cache.ContentStore.InterfacesTest.Time;
using BuildXL.Cache.ContentStore.Stores;
using BuildXL.Cache.Host.Service.Internal;
using ContentStoreTest.Test;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
namespace BuildXL.Cache.Host.Test
{
public class MultiplexedSessionTests : TestBase
{
public MultiplexedSessionTests(ITestOutputHelper output)
: base(() => new PassThroughFileSystem(TestGlobal.Logger), TestGlobal.Logger, output)
{
}
[Fact]
public async Task HardlinkTestAsync()
{
var clock = new MemoryClock();
var configuration = new ContentStoreConfiguration();
var configurationModel = new ConfigurationModel(inProcessConfiguration: configuration, ConfigurationSelection.RequireAndUseInProcessConfiguration);
var root1 = TestRootDirectoryPath / "Store1";
var store1 = new FileSystemContentStore(FileSystem, clock, root1, configurationModel);
var fakeDrive = new AbsolutePath(@"X:\");
var root2 = fakeDrive / "Store2";
var redirectedFileSystem = new RedirectionFileSystem(FileSystem, fakeDrive, TestRootDirectoryPath);
var store2 = new FileSystemContentStore(redirectedFileSystem, clock, fakeDrive, configurationModel);
var stores = new Dictionary<string, IContentStore>
{
{ root1.GetPathRoot(), store1 },
{ root2.GetPathRoot(), store2 },
};
var multiplexed = new MultiplexedContentStore(stores, preferredCacheDrive: root1.GetPathRoot(), tryAllSessions: true);
var context = new Context(Logger);
await multiplexed.StartupAsync(context).ShouldBeSuccess();
var sessionResult = multiplexed.CreateSession(context, "Default", ImplicitPin.None).ShouldBeSuccess();
var session = sessionResult.Session;
// Put random content which should go to preferred drive
var putResult = await session.PutRandomAsync(context, ContentStore.Hashing.HashType.MD5, provideHash: true, size: 1024, CancellationToken.None)
.ShouldBeSuccess();
// Should be able to place it with hardlink in primary drive
var destination1 = TestRootDirectoryPath / "destination1.txt";
var placeResult1 = await session.PlaceFileAsync(
context,
putResult.ContentHash,
destination1,
FileAccessMode.ReadOnly,
FileReplacementMode.FailIfExists,
FileRealizationMode.HardLink,
CancellationToken.None)
.ShouldBeSuccess();
placeResult1.Code.Should().Be(PlaceFileResult.ResultCode.PlacedWithHardLink);
// Should be able to place it with hardlink in secondary drive.
// The cache should copy the contents internally, and then place from the correct drive.
var destination2 = fakeDrive / "destination2.txt";
var placeResult2 = await session.PlaceFileAsync(
context,
putResult.ContentHash,
destination2,
FileAccessMode.ReadOnly,
FileReplacementMode.FailIfExists,
FileRealizationMode.HardLink,
CancellationToken.None)
.ShouldBeSuccess();
placeResult2.Code.Should().Be(PlaceFileResult.ResultCode.PlacedWithHardLink);
}
}
}