diff --git a/Public/Src/Cache/ContentStore/Distributed/Blob/AzureBlobStorageContentStore.cs b/Public/Src/Cache/ContentStore/Distributed/Blob/AzureBlobStorageContentStore.cs
index 37dbacfbe..2d52fd2cd 100644
--- a/Public/Src/Cache/ContentStore/Distributed/Blob/AzureBlobStorageContentStore.cs
+++ b/Public/Src/Cache/ContentStore/Distributed/Blob/AzureBlobStorageContentStore.cs
@@ -107,20 +107,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.Blobs
timeout: _configuration.StorageInteractionTimeout);
}
- ///
- public CreateSessionResult CreateReadOnlySession(Context context, string name, ImplicitPin implicitPin)
- {
- using var guard = TrackShutdown(context, default);
- var operationContext = guard.Context;
-
- return operationContext.PerformOperation(Tracer, () =>
- {
- return new CreateSessionResult(CreateSessionCore(name, implicitPin));
- },
- traceOperationStarted: false,
- messageFactory: _ => $"Name=[{name}] ImplicitPin=[{implicitPin}]");
- }
-
///
public CreateSessionResult CreateSession(Context context, string name, ImplicitPin implicitPin)
{
diff --git a/Public/Src/Cache/ContentStore/Distributed/Sessions/DistributedContentSession.cs b/Public/Src/Cache/ContentStore/Distributed/Sessions/DistributedContentSession.cs
index 00ef4d39c..64a04a87d 100644
--- a/Public/Src/Cache/ContentStore/Distributed/Sessions/DistributedContentSession.cs
+++ b/Public/Src/Cache/ContentStore/Distributed/Sessions/DistributedContentSession.cs
@@ -2,9 +2,16 @@
// Licensed under the MIT License.
using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Diagnostics.ContractsLight;
using System.IO;
+using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed.Stores;
+using BuildXL.Cache.ContentStore.Distributed.Utilities;
+using BuildXL.Cache.ContentStore.Extensions;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.Distributed;
using BuildXL.Cache.ContentStore.Interfaces.Extensions;
@@ -12,41 +19,1743 @@ using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Logging;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Sessions;
+using BuildXL.Cache.ContentStore.Interfaces.Stores;
+using BuildXL.Cache.ContentStore.Interfaces.Time;
+using BuildXL.Cache.ContentStore.Interfaces.Tracing;
+using BuildXL.Cache.ContentStore.Interfaces.Utils;
+using BuildXL.Cache.ContentStore.Service.Grpc;
+using BuildXL.Cache.ContentStore.Sessions;
using BuildXL.Cache.ContentStore.Sessions.Internal;
-using BuildXL.Cache.ContentStore.Stores;
+using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.ContentStore.UtilitiesCore;
+using BuildXL.Cache.ContentStore.Utils;
+using BuildXL.Utilities.Collections;
+using BuildXL.Utilities.Core.Tasks;
+using BuildXL.Utilities.ParallelAlgorithms;
using BuildXL.Utilities.Tracing;
+using ContentStore.Grpc;
+using ResultsExtensions = BuildXL.Cache.ContentStore.Interfaces.Results.ResultsExtensions;
namespace BuildXL.Cache.ContentStore.Distributed.Sessions
{
- ///
- /// A content location based content session with an inner content session for storage.
- ///
- public class DistributedContentSession : ReadOnlyDistributedContentSession, IContentSession
+ public class DistributedContentSession : ContentSessionBase, IContentSession, IHibernateContentSession, IConfigurablePin
{
+ internal enum Counters // Names of the session-level counters (presumably the keys of SessionCounters; confirm).
+ {
+ GetLocationsSatisfiedFromLocal,
+ GetLocationsSatisfiedFromRemote,
+ PinUnverifiedCountSatisfied,
+ StartCopyForPinWhenUnverifiedCountSatisfied,
+ ProactiveCopiesSkipped,
+ ProactiveCopy_OutsideRingFromPreferredLocations,
+ ProactiveCopy_OutsideRingCopies,
+ ProactiveCopyRetries,
+ ProactiveCopyInsideRingRetries,
+ ProactiveCopyOutsideRingRetries,
+ ProactiveCopy_InsideRingCopies,
+ ProactiveCopy_InsideRingFullyReplicated,
+ }
+
+ internal CounterCollection SessionCounters { get; } = new CounterCollection(); // Counter storage owned by this session instance.
+
+ private string _buildId = null; // Build id extracted from the session name by TryRegisterMachineWithBuildId; null until then.
+ private ContentHash? _buildIdHash = null; // Assigned only after the build id content was put and registered; checked at shutdown.
+ private readonly ExpiringValue _buildRingMachinesCache; // Created in the constructor with Settings.ProactiveCopyInRingMachineLocationsExpiryCache.
+
+ private MachineLocation[] BuildRingMachines => _buildRingMachinesCache.GetValueOrDefault() ?? Array.Empty(); // Cached value, or an empty array when the cache yields null.
+
+ private readonly ConcurrentBigSet _pendingProactivePuts = new ConcurrentBigSet();
+ private ResultNagleQueue _proactiveCopyGetBulkNagleQueue; // Created and started in StartupCoreAsync, disposed in ShutdownCoreAsync.
+
+ // The method used for remote pins depends on which pin configuration is enabled.
+ private readonly RemotePinAsync _remotePinner;
+
+ /// <summary>
+ /// The store that persists content locations to a persistent store.
+ /// </summary>
+ internal readonly IContentLocationStore ContentLocationStore;
+
+ private readonly ColdStorage _coldStorage; // May be null; callers in this class check for null before using it.
+
+ /// <summary>
+ /// The machine location for the current cache.
+ /// </summary>
+ protected readonly MachineLocation LocalCacheRootMachineLocation;
+
+ /// <summary>
+ /// The content session that actually stores content.
+ /// </summary>
+ public IContentSession Inner { get; }
+
+ /// <summary>Tracer used by this session; named after the class.</summary>
+ protected override Tracer Tracer { get; } = new Tracer(nameof(DistributedContentSession));
+
+ /// <summary>The content copier supplied to the constructor.</summary>
+ protected readonly DistributedContentCopier DistributedCopier;
+
+ /// <summary>
+ /// Settings for the session.
+ /// </summary>
+ protected readonly DistributedContentStoreSettings Settings;
+
+ /// <summary>
+ /// Trace only stops and errors to reduce the Kusto traffic.
+ /// </summary>
+ protected override bool TraceOperationStarted => false;
+
+ /// <summary>
+ /// Semaphore that limits the maximum number of concurrent put and place operations
+ /// </summary>
+ protected readonly SemaphoreSlim PutAndPlaceFileGate;
+
+ private readonly DistributedContentStore _contentStore; // Used at shutdown to delete the build-ring entry.
+
///
public DistributedContentSession(
string name,
IContentSession inner,
IContentLocationStore contentLocationStore,
DistributedContentCopier contentCopier,
- DistributedContentStore distributedStore,
+ DistributedContentStore distributedContentStore,
MachineLocation localMachineLocation,
ColdStorage coldStorage,
DistributedContentStoreSettings settings = default)
- : base(
- name,
- inner,
- contentLocationStore,
- contentCopier,
- distributedStore,
- localMachineLocation,
- coldStorage,
- settings)
+ : base(name)
{
+ Contract.Requires(name != null);
+ Contract.Requires(inner != null);
+ Contract.Requires(contentLocationStore != null);
+ Contract.Requires(contentLocationStore is StartupShutdownSlimBase, "The store must derive from StartupShutdownSlimBase");
+ Contract.Requires(localMachineLocation.IsValid);
+
+ Inner = inner;
+ ContentLocationStore = contentLocationStore;
+ LocalCacheRootMachineLocation = localMachineLocation;
+ Settings = settings ?? DistributedContentStoreSettings.DefaultSettings; // Fall back to the default settings when none are supplied.
+ _contentStore = distributedContentStore;
+ _remotePinner = PinFromMultiLevelContentLocationStore; // Assigned unconditionally in this constructor.
+ DistributedCopier = contentCopier;
+ PutAndPlaceFileGate = new SemaphoreSlim(Settings.MaximumConcurrentPutAndPlaceFileOperations); // Initial count is the configured concurrency limit.
+
+ _coldStorage = coldStorage;
+
+ _buildRingMachinesCache = new ExpiringValue(
+ Settings.ProactiveCopyInRingMachineLocationsExpiryCache,
+ SystemClock.Instance,
+ originalValue: Array.Empty()); // The cache starts out with an empty array.
}
+ /// <summary>Starts the inner session, creates and starts the nagle queue used for proactive-copy location lookups, and kicks off the build id registration.</summary>
+ protected override async Task StartupCoreAsync(OperationContext context)
+ {
+ var canHibernate = Inner is IHibernateContentSession ? "can" : "cannot";
+ Tracer.Debug(context, $"Session {Name} {canHibernate} hibernate");
+ await Inner.StartupAsync(context).ThrowIfFailure();
+
+ _proactiveCopyGetBulkNagleQueue = new ResultNagleQueue(
+ execute: hashes => GetLocationsForProactiveCopyAsync(context.CreateNested(Tracer.Name), hashes),
+ maxDegreeOfParallelism: 1,
+ interval: Settings.ProactiveCopyGetBulkInterval,
+ batchSize: Settings.ProactiveCopyGetBulkBatchSize);
+ _proactiveCopyGetBulkNagleQueue.Start();
+
+ TryRegisterMachineWithBuildId(context); // Not awaited: the registration itself runs fire-and-forget.
+
+ return BoolResult.Success;
+ }
+ /// <summary>If the session name carries a build id that parses as a GUID, stores the GUID bytes as content and registers them with the location store in the background.</summary>
+ private void TryRegisterMachineWithBuildId(OperationContext context)
+ {
+ if (Constants.TryExtractBuildId(Name, out _buildId) && Guid.TryParse(_buildId, out var buildIdGuid))
+ {
+ // Generate a fake hash for the build and register a content entry in the location store to represent
+ // machines in the build ring
+ var buildIdContent = buildIdGuid.ToByteArray();
+
+ var buildIdHash = GetBuildIdHash(buildIdContent);
+
+ var arguments = $"Build={_buildId}, BuildIdHash={buildIdHash.ToShortString()}";
+
+ context.PerformOperationAsync(
+ Tracer,
+ async () =>
+ {
+ // Storing the build id in the cache to prevent this data from being removed during reconciliation.
+ using var stream = new MemoryStream(buildIdContent);
+ var result = await Inner.PutStreamAsync(context, buildIdHash, stream, CancellationToken.None, UrgencyHint.Nominal);
+ result.ThrowIfFailure();
+
+ // Even if 'StoreBuildIdInCache' is true we need to register the content manually,
+ // because Inner.PutStreamAsync will just add the content to the cache but won't send a registration message.
+ await ContentLocationStore.RegisterLocalLocationAsync(
+ context,
+ new[] { new ContentHashWithSize(buildIdHash, buildIdContent.Length) },
+ context.Token,
+ UrgencyHint.Nominal).ThrowIfFailure();
+
+ // Updating the build id field only if both the put and the registration succeeded.
+ _buildIdHash = buildIdHash;
+
+ return BoolResult.Success;
+ },
+ extraStartMessage: arguments,
+ extraEndMessage: r => arguments).FireAndForget(context); // The caller does not wait for the registration to finish.
+ }
+ }
+ /// <summary>Computes the MD5 content hash of the build id bytes.</summary>
+ private static ContentHash GetBuildIdHash(byte[] buildId) => buildId.CalculateHash(HashType.MD5);
+
+ /// <summary>Deletes the build-ring entry (when one was registered), shuts down the inner session, disposes the proactive-copy nagle queue and traces the session counters. Throws if the inner shutdown failed.</summary>
+ protected override async Task ShutdownCoreAsync(OperationContext context)
+ {
+ var counterSet = new CounterSet();
+ counterSet.Merge(GetCounters(), $"{Tracer.Name}.");
+
+ // Unregister from build machine location set
+ if (_buildIdHash.HasValue)
+ {
+ // Reuse the hash stored at registration time instead of re-parsing the build id and hashing it again.
+ var buildIdHash = _buildIdHash.Value;
+ Tracer.Debug(context, $"Deleting in-ring mapping from cache: Build={_buildId}, BuildIdHash={buildIdHash.ToShortString()}");
+
+ // DeleteAsync will unregister the content as well. No need for calling 'TrimBulkAsync'.
+ await TaskUtilities.IgnoreErrorsAndReturnCompletion(
+ _contentStore.DeleteAsync(context, buildIdHash, new DeleteContentOptions() { DeleteLocalOnly = true }));
+ }
+
+ // Hold on to the inner result instead of throwing right away, so the nagle queue is disposed and the stats are traced even when the inner shutdown fails.
+ var innerShutdownResult = await Inner.ShutdownAsync(context);
+ _proactiveCopyGetBulkNagleQueue?.Dispose();
+ Tracer.TraceStatisticsAtShutdown(context, counterSet, prefix: "DistributedContentSessionStats");
+ innerShutdownResult.ThrowIfFailure();
+ return BoolResult.Success;
+ }
+
+ /// <summary>Runs the base class disposal first and then disposes the inner session.</summary>
+ protected override void DisposeCore()
+ {
+ base.DisposeCore();
+
+ Inner.Dispose();
+ }
+
+ /// <summary>Pins a single hash by sending a one-element batch through the bulk pin helper with the default pin configuration.</summary>
+ protected override async Task PinCoreAsync(
+ OperationContext operationContext,
+ ContentHash contentHash,
+ UrgencyHint urgencyHint,
+ Counter retryCounter)
+ {
+ var singleHashBatch = new[] { contentHash };
+ var pinTasks = await PinHelperAsync(operationContext, singleHashBatch, urgencyHint, PinOperationConfiguration.Default());
+ return (await pinTasks.First()).Item;
+ }
+
+ /// <summary>Opens the content from the inner session; when that does not succeed, copies the content locally using local and then global locations, registers the new replica and opens the stream again.</summary>
+ protected override async Task OpenStreamCoreAsync(
+ OperationContext operationContext,
+ ContentHash contentHash,
+ UrgencyHint urgencyHint,
+ Counter retryCounter)
+ {
+ OpenStreamResult streamResult =
+ await Inner.OpenStreamAsync(operationContext, contentHash, operationContext.Token, urgencyHint);
+ if (streamResult.Code == OpenStreamResult.ResultCode.Success)
+ {
+ return streamResult;
+ }
+
+ var contentRegistrationUrgencyHint = urgencyHint; // May be upgraded to RegisterEagerly by the copy step below.
+
+ long? size = null; // Set by the local function once a copy succeeds.
+ GetBulkLocationsResult localGetBulkResult = null;
+
+ // First try to fetch file based on locally stored locations for the hash
+ // Then fallback to fetching file based on global locations minus the locally stored locations which were already checked
+ foreach (var getBulkTask in ContentLocationStoreExtensions.MultiLevelGetLocations(
+ ContentLocationStore,
+ operationContext,
+ new[] { contentHash },
+ operationContext.Token,
+ urgencyHint,
+ subtractLocalResults: true))
+ {
+ var getBulkResult = await getBulkTask;
+ // There is an issue with GetBulkLocationsResult construction from Exception that may lose the information about the origin.
+ // So we rely on the result order of MultiLevelGetLocations method: the first result is always local and the second one is global.
+
+ GetBulkOrigin origin = localGetBulkResult == null ? GetBulkOrigin.Local : GetBulkOrigin.Global;
+ if (origin == GetBulkOrigin.Local)
+ {
+ localGetBulkResult = getBulkResult;
+ }
+
+ // Local function: Use content locations for GetBulk to copy file locally
+ async Task tryCopyContentLocalAsync()
+ {
+ if (!getBulkResult || !getBulkResult.ContentHashesInfo.Any())
+ {
+ return new BoolResult($"Metadata records for hash {contentHash.ToShortString()} not found in content location store.");
+ }
+
+ // Don't reconsider locally stored results that were checked in prior iteration
+ getBulkResult = getBulkResult.Subtract(localGetBulkResult); // NOTE(review): on the local pass both operands are the same instance - confirm Subtract handles that.
+
+ var hashInfo = getBulkResult.ContentHashesInfo.Single();
+
+ if (!CanCopyContentHash(
+ operationContext,
+ hashInfo,
+ isGlobal: getBulkResult.Origin == GetBulkOrigin.Global,
+ out var useInRingLocations,
+ out var errorMessage))
+ {
+ return new BoolResult(errorMessage);
+ }
+
+ // Using in-ring machines if configured
+ var copyResult = await TryCopyAndPutAsync(
+ operationContext,
+ hashInfo,
+ urgencyHint,
+ CopyReason.OpenStream,
+ trace: false,
+ useInRingLocations);
+ if (!copyResult)
+ {
+ return new BoolResult(copyResult);
+ }
+
+ size = copyResult.ContentSize;
+
+ // If configured, register the content eagerly on put to make sure the content is discoverable by the other builders,
+ // even if the current machine used to have the content and evicted it recently.
+ if (Settings.RegisterEagerlyOnPut && !copyResult.ContentAlreadyExistsInCache)
+ {
+ contentRegistrationUrgencyHint = UrgencyHint.RegisterEagerly;
+ }
+
+ return BoolResult.Success;
+ }
+
+ var copyLocalResult = await tryCopyContentLocalAsync();
+
+ // Throw operation canceled to avoid the operations below, which are not valid in the canceled case.
+ operationContext.Token.ThrowIfCancellationRequested();
+
+ if (copyLocalResult.Succeeded)
+ {
+ // Succeeded in copying content locally. No need to try with more content locations
+ break;
+ }
+ else if (origin == GetBulkOrigin.Global)
+ {
+ return new OpenStreamResult(copyLocalResult, OpenStreamResult.ResultCode.ContentNotFound);
+ }
+ }
+
+ Contract.Assert(size != null, "Size should be set if operation succeeded");
+
+ var updateResult = await UpdateContentTrackerWithNewReplicaAsync(
+ operationContext,
+ new[] { new ContentHashWithSize(contentHash, size.Value) },
+ contentRegistrationUrgencyHint);
+ if (!updateResult.Succeeded)
+ {
+ return new OpenStreamResult(updateResult);
+ }
+
+ return await Inner.OpenStreamAsync(operationContext, contentHash, operationContext.Token, urgencyHint); // The content is local now, so open it from the inner session.
+ }
+
+ /// <summary>Explicit IConfigurablePin implementation: runs the pin helper with the urgency hint and cancellation token taken from the given pin configuration.</summary>
+ Task>>> IConfigurablePin.PinAsync(
+ Context context,
+ IReadOnlyList contentHashes,
+ PinOperationConfiguration pinOperationConfiguration)
+ {
+ // The lifetime of this operation should be detached from the lifetime of the session.
+ // But still tracking the lifetime of the store.
+ return WithStoreCancellationAsync(
+ context,
+ opContext => PinHelperAsync(opContext, contentHashes, pinOperationConfiguration.UrgencyHint, pinOperationConfiguration),
+ pinOperationConfiguration.CancellationToken);
+ }
+
+ /// <summary>Bulk pin: forwards to the pin helper with the default pin configuration. The retry and file counters are not used here.</summary>
+ protected override Task>>> PinCoreAsync(
+ OperationContext operationContext,
+ IReadOnlyList contentHashes,
+ UrgencyHint urgencyHint,
+ Counter retryCounter,
+ Counter fileCounter)
+ {
+ return PinHelperAsync(operationContext, contentHashes, urgencyHint, PinOperationConfiguration.Default());
+ }
+ /// <summary>Shared pin implementation: pins through Inner with the remote pinner as fallback (via Workflows.RunWithFallback). With ReturnGlobalExistenceFast, a quick existence check provides the returned results and the full pin continues in the background.</summary>
+ private async Task>>> PinHelperAsync(
+ OperationContext operationContext,
+ IReadOnlyList contentHashes,
+ UrgencyHint urgencyHint,
+ PinOperationConfiguration pinOperationConfiguration)
+ {
+ Contract.Requires(contentHashes != null);
+
+ IEnumerable>> pinResults = null;
+
+ IEnumerable>> intermediateResult = null; // Local pin results of the fast path, reused by the default pin action below.
+ if (pinOperationConfiguration.ReturnGlobalExistenceFast)
+ {
+ Tracer.Debug(operationContext.TracingContext, $"Detected {nameof(PinOperationConfiguration.ReturnGlobalExistenceFast)}");
+
+ // Check globally for existence, but do not copy locally and do not update content tracker.
+ pinResults = await Workflows.RunWithFallback(
+ contentHashes,
+ async hashes =>
+ {
+ intermediateResult = await Inner.PinAsync(operationContext, hashes, operationContext.Token, urgencyHint);
+ return intermediateResult;
+ },
+ hashes => _remotePinner(operationContext, hashes, succeedWithOneLocation: true, urgencyHint),
+ result => result.Succeeded);
+
+ // Replace operation context with a new cancellation token so it can outlast this call.
+ // Using a cancellation token from the store avoids the operation's lifetime exceeding the lifetime of the outer store.
+ operationContext = new OperationContext(operationContext.TracingContext, token: StoreShutdownStartedCancellationToken);
+ }
+
+ // Default pin action
+ var pinTask = Workflows.RunWithFallback(
+ contentHashes,
+ hashes => intermediateResult == null
+ ? Inner.PinAsync(operationContext, hashes, operationContext.Token, urgencyHint)
+ : Task.FromResult(intermediateResult),
+ hashes => _remotePinner(operationContext, hashes, succeedWithOneLocation: false, urgencyHint),
+ result => result.Succeeded,
+ // Exclude the empty hash because it is a special case which is hard coded for place/openstream/pin.
+ async hits => await UpdateContentTrackerWithLocalHitsAsync(
+ operationContext,
+ hits.Where(x => !contentHashes[x.Index].IsEmptyHash()).Select(
+ x => new ContentHashWithSizeAndLastAccessTime(contentHashes[x.Index], x.Item.ContentSize, x.Item.LastAccessTime)).ToList(),
+ urgencyHint));
+
+ // Initiate a proactive copy if just pinned content is under-replicated
+ if (Settings.ProactiveCopyOnPin && Settings.ProactiveCopyMode != ProactiveCopyMode.Disabled)
+ {
+ pinTask = ProactiveCopyOnPinAsync(operationContext, contentHashes, pinTask);
+ }
+
+ if (pinOperationConfiguration.ReturnGlobalExistenceFast)
+ {
+ // Fire off the default pin action, but do not await the result.
+ //
+ // Creating a new OperationContext instance without existing 'CancellationToken',
+ // because the operation we triggered that stored in 'pinTask' can outlive the lifetime of the current instance.
+ // And we don't want the PerformNonResultOperationAsync to fail because the current instance is shut down (or disposed).
+ new OperationContext(operationContext.TracingContext).PerformNonResultOperationAsync(
+ Tracer,
+ () => pinTask,
+ extraEndMessage: results =>
+ {
+ var resultString = string.Join(
+ ",",
+ results.Select(
+ task =>
+ {
+ // Read the completed task synchronously: an async lambda here would make string.Join format Task objects instead of the results.
+ Indexed result = task.GetAwaiter().GetResult();
+ return result != null
+ ? $"{contentHashes[result.Index].ToShortString()}:{result.Item}"
+ : string.Empty;
+ }));
+
+ return $"ConfigurablePin Count={contentHashes.Count}, Hashes=[{resultString}]";
+ },
+ traceErrorsOnly: TraceErrorsOnly,
+ traceOperationStarted: TraceOperationStarted,
+ traceOperationFinished: true,
+ isCritical: false).FireAndForget(operationContext);
+ }
+ else
+ {
+ pinResults = await pinTask;
+ }
+
+ Contract.Assert(pinResults != null);
+ return pinResults;
+ }
+ /// <summary>Awaits the pin results and then starts a proactive copy for each eligible pin. Unless InlineOperationsForTests is set, the copies are fire-and-forget and the original pin results are returned.</summary>
+ private async Task>>> ProactiveCopyOnPinAsync(
+ OperationContext context,
+ IReadOnlyList contentHashes,
+ Task>>> pinTask)
+ {
+ var results = await pinTask;
+
+ // Since the rest of the operation is done asynchronously, using a cancellation context from the outer store.
+ var proactiveCopyTask = WithStoreCancellationAsync(
+ context,
+ async opContext =>
+ {
+ var proactiveTasks = results.Select(resultTask => proactiveCopyOnSinglePinAsync(opContext, resultTask)).ToList();
+
+ // Ensure all tasks are completed, after awaiting outer task
+ await Task.WhenAll(proactiveTasks);
+
+ return proactiveTasks;
+ });
+
+ if (Settings.InlineOperationsForTests)
+ {
+ return await proactiveCopyTask; // Tests observe the results that include the proactive copy outcome.
+ }
+ else
+ {
+ proactiveCopyTask.FireAndForget(context);
+ return results;
+ }
+
+ // Proactive copy an individual pin
+ async Task> proactiveCopyOnSinglePinAsync(OperationContext opContext, Task> resultTask)
+ {
+ Indexed indexedPinResult = await resultTask;
+ var pinResult = indexedPinResult.Item;
+
+ // Local pins and distributed pins which are copied locally allow proactive copy
+ if (pinResult.Succeeded && (!(pinResult is DistributedPinResult distributedPinResult) || distributedPinResult.CopyLocally))
+ {
+ var proactiveCopyResult = await ProactiveCopyIfNeededAsync(
+ opContext,
+ contentHashes[indexedPinResult.Index],
+ tryBuildRing: true,
+ CopyReason.ProactiveCopyOnPin);
+
+ // Only fail if all copies failed.
+ if (!proactiveCopyResult.Succeeded)
+ {
+ return new PinResult(proactiveCopyResult).WithIndex(indexedPinResult.Index);
+ }
+ }
+
+ return indexedPinResult; // Ineligible pins and successful copies keep the original pin result.
+ }
+ }
+
+ /// <summary>Places a single file by running the bulk place helper for a one-element batch under the put/place gate, then attaches the gate metadata to the result.</summary>
+ protected override async Task PlaceFileCoreAsync(
+ OperationContext operationContext,
+ ContentHash contentHash,
+ AbsolutePath path,
+ FileAccessMode accessMode,
+ FileReplacementMode replacementMode,
+ FileRealizationMode realizationMode,
+ UrgencyHint urgencyHint,
+ Counter retryCounter)
+ {
+ var resultWithData = await PerformPlaceFileGatedOperationAsync(
+ operationContext,
+ () => PlaceHelperAsync(
+ operationContext,
+ new[] { new ContentHashWithPath(contentHash, path) },
+ accessMode,
+ replacementMode,
+ realizationMode,
+ urgencyHint
+ ));
+
+ var result = await resultWithData.Result.SingleAwaitIndexed();
+ result.Metadata = resultWithData.Metadata; // Carries the gate wait time and occupancy gathered by the gated operation.
+ return result;
+ }
+
+ /// <summary>Bulk place: runs the place helper under the put/place gate, traces the gate statistics and returns the per-hash results.</summary>
+ protected override async Task>>> PlaceFileCoreAsync(
+ OperationContext operationContext,
+ IReadOnlyList hashesWithPaths,
+ FileAccessMode accessMode,
+ FileReplacementMode replacementMode,
+ FileRealizationMode realizationMode,
+ UrgencyHint urgencyHint,
+ Counter retryCounter)
+ {
+ // The fallback is invoked for cache misses only. This preserves existing behavior of
+ // bubbling up errors with Inner store instead of trying remote.
+ var resultWithData = await PerformPlaceFileGatedOperationAsync(
+ operationContext,
+ () => PlaceHelperAsync(
+ operationContext,
+ hashesWithPaths,
+ accessMode,
+ replacementMode,
+ realizationMode,
+ urgencyHint
+ ));
+
+ // We are tracing here because we did not want to change the signature for PlaceFileCoreAsync, which is implemented in multiple locations
+ operationContext.TracingContext.Debug(
+ $"PlaceFileBulk, Gate.OccupiedCount={resultWithData.Metadata.GateOccupiedCount} Gate.Wait={resultWithData.Metadata.GateWaitTime.TotalMilliseconds}ms Hashes.Count={hashesWithPaths.Count}",
+ component: nameof(DistributedContentSession),
+ operation: "PlaceFileBulk");
+
+ return resultWithData.Result;
+ }
+ /// <summary>Places files from Inner first; entries for which IsPlaceFileSuccess is false are fetched through the content location store, put locally and then placed from Inner with the source rewritten to DatacenterCache.</summary>
+ private Task>>> PlaceHelperAsync(
+ OperationContext operationContext,
+ IReadOnlyList hashesWithPaths,
+ FileAccessMode accessMode,
+ FileReplacementMode replacementMode,
+ FileRealizationMode realizationMode,
+ UrgencyHint urgencyHint)
+ {
+ return Workflows.RunWithFallback(
+ hashesWithPaths,
+ args => Inner.PlaceFileAsync(
+ operationContext,
+ args,
+ accessMode,
+ replacementMode,
+ realizationMode,
+ operationContext.Token,
+ urgencyHint),
+ args => fetchFromMultiLevelContentLocationStoreThenPlaceFileAsync(args),
+ result => IsPlaceFileSuccess(result),
+ async hits => await UpdateContentTrackerWithLocalHitsAsync(
+ operationContext,
+ hits.Select(
+ x => new ContentHashWithSizeAndLastAccessTime(
+ hashesWithPaths[x.Index].Hash,
+ x.Item.FileSize,
+ x.Item.LastAccessTime))
+ .ToList(),
+ urgencyHint));
+
+ Task>>> fetchFromMultiLevelContentLocationStoreThenPlaceFileAsync(
+ IReadOnlyList fetchedContentInfo)
+ {
+ return MultiLevelUtilities.RunMultiLevelAsync(
+ fetchedContentInfo,
+ runFirstLevelAsync: args => FetchFromMultiLevelContentLocationStoreThenPutAsync(
+ operationContext,
+ args,
+ urgencyHint,
+ CopyReason.Place),
+ runSecondLevelAsync: args => innerPlaceAsync(args),
+ // NOTE: We just use the first level result if the fetch using content location store fails because the place cannot succeed since the
+ // content will not have been put into the local CAS
+ useFirstLevelResult: result => !IsPlaceFileSuccess(result));
+ }
+
+ async Task>>> innerPlaceAsync(IReadOnlyList input)
+ {
+ // When the content is obtained from remote we still have to place it from the local.
+ // So in order to return the right source of the file placement
+ // we have to place locally but change the source for each result.
+ var results = await Inner.PlaceFileAsync(
+ operationContext,
+ input,
+ accessMode,
+ replacementMode,
+ realizationMode,
+ operationContext.Token,
+ urgencyHint);
+ var updatedResults = new List>();
+ foreach (var resultTask in results)
+ {
+ var result = await resultTask;
+ updatedResults.Add(
+ IndexedExtensions.WithIndex(
+ result.Item
+ .WithMaterializationSource(PlaceFileResult.Source.DatacenterCache),
+ result.Index));
+ }
+
+ return updatedResults.AsTasks();
+ }
+ }
+ /// <summary>Runs the given place operation under PutAndPlaceFileGate and returns its result together with the gate wait time and occupancy.</summary>
+ private Task>>>> PerformPlaceFileGatedOperationAsync(
+ OperationContext operationContext,
+ Func>>>> func)
+ {
+ return GateExtensions.GatedOperationAsync(
+ PutAndPlaceFileGate,
+ async (timeWaiting, currentCount) =>
+ {
+ var gateOccupiedCount = Settings.MaximumConcurrentPutAndPlaceFileOperations - currentCount; // Presumably currentCount is the gate's remaining count - confirm against GatedOperationAsync.
+
+ var result = await func();
+
+ return new ResultWithMetaData>>>(
+ new ResultMetaData(timeWaiting, gateOccupiedCount),
+ result);
+ },
+ operationContext.Token);
+ }
+ /// <summary>Returns true unless the result code is Error or NotPlacedContentNotFound.</summary>
+ private static bool IsPlaceFileSuccess(PlaceFileResult result)
+ {
+ return !(result.Code == PlaceFileResult.ResultCode.Error || result.Code == PlaceFileResult.ResultCode.NotPlacedContentNotFound);
+ }
+
+ /// <summary>Enumerates the hashes pinned by the inner session when it supports hibernation; otherwise yields nothing.</summary>
+ public IEnumerable EnumeratePinnedContentHashes()
+ {
+ // Only a hibernating inner session can report pinned hashes.
+ var hibernateSession = Inner as IHibernateContentSession;
+ return hibernateSession == null ? Enumerable.Empty() : hibernateSession.EnumeratePinnedContentHashes();
+ }
+
+ /// <summary>Forwards the bulk pin to the inner session when it supports hibernation; otherwise completes immediately.</summary>
+ public Task PinBulkAsync(Context context, IEnumerable contentHashes)
+ {
+ // TODO: Replace PinBulkAsync in hibernate with PinAsync bulk call (bug 1365340)
+ var hibernateSession = Inner as IHibernateContentSession;
+ // Without a hibernating inner session there is nothing to pin.
+ return hibernateSession == null ? Task.FromResult(0) : hibernateSession.PinBulkAsync(context, contentHashes);
+ }
+
+ /// <summary>Forwards the eviction shutdown to the inner session when it supports hibernation; otherwise reports success.</summary>
+ public Task ShutdownEvictionAsync(Context context)
+ {
+ // Only a hibernating inner session has eviction to shut down.
+ var hibernateSession = Inner as IHibernateContentSession;
+ return hibernateSession == null ? BoolResult.SuccessTask : hibernateSession.ShutdownEvictionAsync(context);
+ }
+ /// <summary>Builds the fallback chain that copies content locally: cold storage first (when configured), then local locations, then global locations, then cold storage locations.</summary>
+ private Task>>> FetchFromMultiLevelContentLocationStoreThenPutAsync(
+ OperationContext context,
+ IReadOnlyList hashesWithPaths,
+ UrgencyHint urgencyHint,
+ CopyReason reason)
+ {
+ // First try to place file by fetching files based on locally stored locations for the hash
+ // Then fallback to fetching file based on global locations minus the locally stored locations which were already checked
+
+ var localGetBulkResult = new BuildXL.Utilities.AsyncOut(); // Filled by the local step and read by the global step.
+
+ GetIndexedResults initialFunc = async args =>
+ {
+ var contentHashes = args.Select(p => p.Hash).ToList();
+ localGetBulkResult.Value =
+ await ContentLocationStore.GetBulkAsync(
+ context,
+ contentHashes,
+ context.Token,
+ urgencyHint,
+ GetBulkOrigin.Local);
+ return await FetchFromContentLocationStoreThenPutAsync(
+ context,
+ args,
+ GetBulkOrigin.Local,
+ urgencyHint,
+ localGetBulkResult.Value,
+ reason);
+ };
+
+ GetIndexedResults fallbackFunc = async args =>
+ {
+ var contentHashes = args.Select(p => p.Hash).ToList();
+ var globalGetBulkResult =
+ await ContentLocationStore.GetBulkAsync(
+ context,
+ contentHashes,
+ context.Token,
+ urgencyHint,
+ GetBulkOrigin.Global);
+ globalGetBulkResult =
+ globalGetBulkResult.Subtract(localGetBulkResult.Value); // Drop the locations the local step already tried.
+ return await FetchFromContentLocationStoreThenPutAsync(
+ context,
+ args,
+ GetBulkOrigin.Global,
+ urgencyHint,
+ globalGetBulkResult,
+ reason);
+ };
+
+ // If ColdStorage is ON try to place files from it before use remote locations
+ if (_coldStorage != null)
+ {
+ return Workflows.RunWithFallback(
+ hashesWithPaths,
+ initialFunc: async args => { return await _coldStorage.FetchThenPutBulkAsync(context, args, Inner); },
+ fallbackFunc: initialFunc,
+ secondFallbackFunc: fallbackFunc,
+ thirdFallbackFunc: async args =>
+ {
+ var coldStorageGetBulkResult = _coldStorage.GetBulkLocations(context, args);
+ return await FetchFromContentLocationStoreThenPutAsync(
+ context,
+ args,
+ GetBulkOrigin.ColdStorage,
+ urgencyHint,
+ coldStorageGetBulkResult,
+ reason);
+ },
+ isSuccessFunc: result => IsPlaceFileSuccess(result));
+ }
+
+ return Workflows.RunWithFallback(
+ hashesWithPaths,
+ initialFunc: initialFunc,
+ fallbackFunc: fallbackFunc,
+ isSuccessFunc: result => IsPlaceFileSuccess(result));
+ }
+ /// <summary>Copies each hash in the GetBulk result locally (bounded by Settings.ParallelCopyFilesLimit), registers the successful copies with the content tracker and returns one indexed PlaceFileResult per entry. Exceptions become failed results.</summary>
+ private async Task>>> FetchFromContentLocationStoreThenPutAsync(
+ OperationContext context,
+ IReadOnlyList hashesWithPaths,
+ GetBulkOrigin origin,
+ UrgencyHint urgencyHint,
+ GetBulkLocationsResult getBulkResult,
+ CopyReason reason)
+ {
+ try
+ {
+ // Tracing the hashes here for the entire list, instead of tracing one hash at a time inside TryCopyAndPutAsync method.
+
+ // This returns failure if any item in the batch wasn't copied locally
+ // TODO: split results and call PlaceFile on successfully copied files (bug 1365340)
+ if (!getBulkResult.Succeeded || !getBulkResult.ContentHashesInfo.Any())
+ {
+ return hashesWithPaths.Select(
+ _ => new PlaceFileResult(
+ getBulkResult,
+ PlaceFileResult.ResultCode.NotPlacedContentNotFound,
+ "Metadata records not found in content location store"))
+ .AsIndexedTasks();
+ }
+
+ async Task> copyAndPut(ContentHashWithSizeAndLocations contentHashWithSizeAndLocations, int index)
+ {
+ PlaceFileResult result;
+ try
+ {
+ if (!CanCopyContentHash(
+ context,
+ contentHashWithSizeAndLocations,
+ isGlobal: origin == GetBulkOrigin.Global,
+ out var useInRingMachineLocations,
+ out var errorMessage))
+ {
+ result = PlaceFileResult.CreateContentNotFound(errorMessage);
+ }
+ else
+ {
+ var copyResult = await TryCopyAndPutAsync(
+ context,
+ contentHashWithSizeAndLocations,
+ urgencyHint,
+ reason,
+ // We just traced all the hashes as a result of GetBulk call, no need to trace each individual hash.
+ trace: false,
+ // Using in-ring locations as well if the feature is on.
+ useInRingMachineLocations: useInRingMachineLocations,
+ outputPath: hashesWithPaths[index].Path);
+
+ if (!copyResult)
+ {
+ // For ColdStorage we should treat all errors as cache misses
+ result = origin != GetBulkOrigin.ColdStorage
+ ? new PlaceFileResult(copyResult)
+ : new PlaceFileResult(copyResult, PlaceFileResult.ResultCode.NotPlacedContentNotFound);
+ }
+ else
+ {
+ var source = origin == GetBulkOrigin.ColdStorage
+ ? PlaceFileResult.Source.ColdStorage
+ : PlaceFileResult.Source.DatacenterCache;
+ result = PlaceFileResult.CreateSuccess(PlaceFileResult.ResultCode.PlacedWithMove, copyResult.ContentSize, source);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ // The per-item body should not fail with an exception; convert it into a failed result so that
+ // the error is reported to the caller for this item.
+ result = new PlaceFileResult(e);
+ }
+
+ return result.WithIndex(index);
+ }
+
+ var copyFilesLocallyActionQueue = new ActionQueue(Settings.ParallelCopyFilesLimit);
+ var copyFilesLocally = await copyFilesLocallyActionQueue.SelectAsync(
+ items: getBulkResult.ContentHashesInfo,
+ body: (item, index) => copyAndPut(item, index));
+
+ var updateResults = await UpdateContentTrackerWithNewReplicaAsync(
+ context,
+ copyFilesLocally.Where(r => r.Item.Succeeded).Select(r => new ContentHashWithSize(hashesWithPaths[r.Index].Hash, r.Item.FileSize))
+ .ToList(),
+ urgencyHint);
+
+ if (!updateResults.Succeeded)
+ {
+ return copyFilesLocally.Select(result => new PlaceFileResult(updateResults).WithIndex(result.Index)).AsTasks(); // A failed tracker update fails every entry of the batch.
+ }
+
+ return copyFilesLocally.AsTasks();
+ }
+ catch (Exception ex)
+ {
+ return hashesWithPaths.Select((hash, index) => new PlaceFileResult(ex).WithIndex(index)).AsTasks();
+ }
+ }
+ /// <summary>Decides whether a copy can be attempted: true when the hash has locations, or when in-ring copies are enabled for a global lookup and there are active in-ring machines. When false, message holds the reason.</summary>
+ private bool CanCopyContentHash(
+ Context context,
+ ContentHashWithSizeAndLocations result,
+ bool isGlobal,
+ out bool useInRingMachineLocations,
+ [NotNullWhen(false)] out string message)
+ {
+ useInRingMachineLocations = isGlobal && Settings.UseInRingMachinesForCopies;
+ if (!isLocationsAvailable(out message))
+ {
+ if (useInRingMachineLocations && GetInRingActiveMachines().Length != 0)
+ {
+ string useInRingLocationsMessage =
+ $", but {nameof(DistributedContentStoreSettings.UseInRingMachinesForCopies)} is true. Trying to copy the content from in-ring machines.";
+ Tracer.Debug(context, message + useInRingLocationsMessage);
+ // Still trying copying the content from in-ring machines, even though the location information is lacking.
+ return true;
+ }
+
+ // Tracing only for global locations because they come last.
+ if (isGlobal)
+ {
+ Tracer.Warning(context, message);
+ }
+
+ return false;
+ }
+
+ return true;
+
+ bool isLocationsAvailable([NotNullWhen(false)] out string errorMessage)
+ {
+ errorMessage = null;
+ // Null represents no replicas were ever registered, whereas empty list implies content is missing from all replicas
+ if (result.Locations == null)
+ {
+ errorMessage = $"No replicas registered for hash {result.ContentHash.ToShortString()}";
+ return false;
+ }
+
+ if (!result.Locations.Any())
+ {
+ errorMessage = $"No replicas currently exist in content tracker for hash {result.ContentHash.ToShortString()}";
+ return false;
+ }
+
+ return true;
+ }
+ }
+
+ private async Task TryCopyAndPutAsync(
+ OperationContext operationContext,
+ ContentHashWithSizeAndLocations hashInfo,
+ UrgencyHint urgencyHint,
+ CopyReason reason,
+ bool trace,
+ bool useInRingMachineLocations = false,
+ AbsolutePath outputPath = null)
+ { // Copies the content through DistributedCopier and puts the copied file into the inner session; returns the copier's result.
+ Context context = operationContext;
+ CancellationToken cts = operationContext.Token;
+
+ if (trace)
+ {
+ Tracer.Debug(operationContext, $"Copying {hashInfo.ContentHash.ToShortString()} with {hashInfo.Locations?.Count ?? 0} locations");
+ }
+
+ var copyCompression = CopyCompression.None;
+ var copyCompressionThreshold = Settings.GrpcCopyCompressionSizeThreshold ?? 0; // An unset threshold is treated as 0, which leaves compression off.
+ if (copyCompressionThreshold > 0 && hashInfo.Size > copyCompressionThreshold)
+ {
+ copyCompression = Settings.GrpcCopyCompressionAlgorithm;
+ }
+
+ var copyRequest = new DistributedContentCopier.CopyRequest(
+ _contentStore,
+ hashInfo,
+ reason,
+ HandleCopyAsync: async args =>
+ {
+ (CopyFileResult copyFileResult, AbsolutePath tempLocation, _) = args;
+
+ PutResult innerPutResult;
+ long actualSize = copyFileResult.Size ?? hashInfo.Size; // Prefer the size reported by the copy; fall back to the size from the location record.
+ if (Settings.UseTrustedHash(actualSize) && Inner is ITrustedContentSession trustedInner)
+ {
+ // The file has already been hashed, so we can trust the hash of the file.
+ innerPutResult = await trustedInner.PutTrustedFileAsync(
+ context,
+ new ContentHashWithSize(hashInfo.ContentHash, actualSize),
+ tempLocation,
+ FileRealizationMode.Move,
+ cts,
+ urgencyHint);
+ }
+ else
+ {
+ // Pass the HashType, not the Hash. This prompts a re-hash of the file, which places it where its actual hash requires.
+ // If the actual hash differs from the expected hash, then we fail below and move to the next location.
+ // Also, record the bytes if the file is small enough to be put into the ContentLocationStore.
+ innerPutResult = await Inner.PutFileAsync(
+ context,
+ hashInfo.ContentHash.HashType,
+ tempLocation,
+ FileRealizationMode.Move,
+ cts,
+ urgencyHint);
+ }
+
+ return innerPutResult;
+ },
+ copyCompression,
+ OverrideWorkingFolder: (Inner as ITrustedContentSession)?.TryGetWorkingDirectory(outputPath)); // Null when Inner is not an ITrustedContentSession.
+
+ if (useInRingMachineLocations)
+ { // Add the active in-ring machines to the request.
+ copyRequest = copyRequest with { InRingMachines = GetInRingActiveMachines() };
+ }
+
+ var putResult = await DistributedCopier.TryCopyAndPutAsync(operationContext, copyRequest);
+
+ return putResult;
+ }
+
+ /// <summary>
+ /// Runs a given function in the cancellation context of the outer store.
+ /// </summary>
+ /// <remarks>
+ /// The lifetime of some session-based operations is longer than the session itself.
+ /// For instance, proactive copies or put blob can outlive the lifetime of the session.
+ /// But these operations should not outlive the lifetime of the store, because when the store is closed
+ /// most likely all the operations will fail with some weird errors like "ObjectDisposedException".
+ ///
+ /// This helper method allows running the operations that may outlive the lifetime of the session but should not outlive the lifetime of the store.
+ /// </remarks>
+ protected Task WithStoreCancellationAsync(
+ Context context,
+ Func> func,
+ CancellationToken token = default)
+ {
+ return ((StartupShutdownSlimBase)ContentLocationStore).WithOperationContext(context, token, func);
+ }
+
+ /// <summary>
+ /// Returns a cancellation token that is triggered when the outer store shutdown is started.
+ /// </summary>
+ protected CancellationToken StoreShutdownStartedCancellationToken =>
+ ((StartupShutdownSlimBase)ContentLocationStore).ShutdownStartedCancellationToken;
+
+ private Task UpdateContentTrackerWithNewReplicaAsync(
+ OperationContext context,
+ IReadOnlyList contentHashes,
+ UrgencyHint urgencyHint)
+ { // Registers the given hashes with the content location store as available at the local location.
+ if (contentHashes.Count == 0)
+ { // Nothing to register: succeed without calling the store.
+ return BoolResult.SuccessTask;
+ }
+
+ // TODO: Pass location store option (seems to only be used to prevent updating TTL when replicating for proactive replication) (bug 1365340)
+ return ContentLocationStore.RegisterLocalLocationAsync(context, contentHashes, context.Token, urgencyHint);
+ }
+
+ private Task>>> PinFromMultiLevelContentLocationStore(
+ OperationContext context,
+ IReadOnlyList contentHashes,
+ bool succeedWithOneLocation,
+ UrgencyHint urgencyHint = UrgencyHint.Nominal)
+ { // Pins based on local location data only (when configured so), or on local data with a fallback to global data.
+ // Pinning content based on the number of locations is inherently dangerous in case of eviction storms in a system.
+ // The LLS data might be stale because of event processing delay and the global information can be inaccurate because
+ // the trim events are not sent to the global store.
+ // We can't do anything when the LLS data is stale, but in some stamps it's beneficial not to rely on the global locations.
+ if (Settings.PinConfiguration.UseLocalLocationsOnlyOnUnverifiedPin)
+ {
+ return PinFromContentLocationStoreOriginAsync(
+ context,
+ contentHashes,
+ GetBulkOrigin.Local,
+ succeedWithOneLocation: succeedWithOneLocation,
+ urgencyHint);
+ }
+
+ return Workflows.RunWithFallback( // NOTE(review): presumably re-runs the second delegate for hashes whose first result did not succeed - confirm against Workflows.
+ contentHashes,
+ hashes => PinFromContentLocationStoreOriginAsync(
+ context,
+ hashes,
+ GetBulkOrigin.Local,
+ succeedWithOneLocation: succeedWithOneLocation,
+ urgencyHint),
+ hashes => PinFromContentLocationStoreOriginAsync(
+ context,
+ hashes,
+ GetBulkOrigin.Global,
+ succeedWithOneLocation: succeedWithOneLocation,
+ urgencyHint),
+ result => result.Succeeded);
+ }
+
+ // This method makes one bulk call to the content location store to get the location records for all the given hashes,
+ // and then processes the returned records with a bounded degree of parallelism. NOTE(review): no paging is visible in this method.
+ private async Task>>> PinFromContentLocationStoreOriginAsync(
+ OperationContext operationContext,
+ IReadOnlyList hashes,
+ GetBulkOrigin origin,
+ bool succeedWithOneLocation,
+ UrgencyHint urgency = UrgencyHint.Nominal)
+ {
+ CancellationToken cancel = operationContext.Token;
+ // Create an action block to process all the requested remote pins while limiting the number executed simultaneously.
+ var pinnings = new List(hashes.Count);
+ var pinningAction = ActionBlockSlim.CreateWithAsyncAction(
+ degreeOfParallelism: Settings.PinConfiguration?.MaxIOOperations ?? 1,
+ async pinning => await PinRemoteAsync(
+ operationContext,
+ pinning,
+ isLocal: origin == GetBulkOrigin.Local,
+ updateContentTracker: false, // The tracker is updated once for all local copies after the block completes.
+ succeedWithOneLocation: succeedWithOneLocation),
+ cancellationToken: cancel);
+
+ // Make a bulk call to content location store to get location records for all the hashes.
+ // NOTE: We use GetBulkStackedAsync so that when Global results are retrieved we also include Local results to ensure we get a full view of available content
+ GetBulkLocationsResult pageLookup = await ContentLocationStoreExtensions.GetBulkStackedAsync(
+ ContentLocationStore,
+ operationContext,
+ hashes,
+ cancel,
+ urgency,
+ origin);
+
+ // If successful, fire off the remote pinning logic for each hash. If not, set all pins to failed.
+ if (pageLookup.Succeeded)
+ {
+ foreach (ContentHashWithSizeAndLocations record in pageLookup.ContentHashesInfo)
+ {
+ RemotePinning pinning = new RemotePinning(record);
+ pinnings.Add(pinning);
+ await pinningAction.PostAsync(pinning, cancel);
+ }
+ }
+ else
+ {
+ foreach (ContentHash hash in hashes)
+ {
+ Tracer.Warning(
+ operationContext,
+ $"Pin failed for hash {hash.ToShortString()}: directory query failed with error {pageLookup.ErrorMessage}");
+ RemotePinning pinning = new RemotePinning(new ContentHashWithSizeAndLocations(hash, -1L)) { Result = new PinResult(pageLookup) };
+ pinnings.Add(pinning);
+ }
+ }
+
+ Contract.Assert(pinnings.Count == hashes.Count); // One pinning per requested hash, in the same order.
+
+ // Wait for all the pinning actions to complete.
+ pinningAction.Complete();
+
+ try
+ {
+ await pinningAction.Completion;
+ }
+ catch (TaskCanceledException)
+ {
+ // Cancellation token provided to an action block can be canceled.
+ // Ignoring the exception in this case.
+ }
+
+ // Inform the content directory that we copied the files.
+ // Looking for distributed pin results that were successful by copying the content locally.
+ var localCopies = pinnings.Select((rp, index) => (result: rp, index))
+ .Where(x => x.result.Result is DistributedPinResult dpr && dpr.CopyLocally).ToList();
+
+ BoolResult updated = await UpdateContentTrackerWithNewReplicaAsync(
+ operationContext,
+ localCopies.Select(lc => new ContentHashWithSize(lc.result.Record.ContentHash, lc.result.Record.Size)).ToList(),
+ UrgencyHint.Nominal);
+ if (!updated.Succeeded)
+ {
+ // We failed to update the tracker. Need to update the results.
+ string hashesAsString = string.Join(", ", localCopies.Select(lc => lc.result.Record.ContentHash.ToShortString()));
+ Tracer.Warning(
+ operationContext,
+ $"Pin failed for hashes {hashesAsString}: local copy succeeded, but could not inform content directory due to {updated.ErrorMessage}.");
+ foreach (var (_, index) in localCopies)
+ {
+ pinnings[index].Result = new PinResult(updated);
+ }
+ }
+
+ // A simpler return type would probably do, but higher callers require the Indexed wrapper and that the PinResults be encased in Tasks.
+ return pinnings.Select(x => x.Result ?? createCanceledPutResult()).AsIndexed().AsTasks(); // A pinning whose Result was never set gets the "canceled" error result.
+
+ static PinResult createCanceledPutResult() => new ErrorResult("The operation was canceled").AsResult(); // NOTE(review): despite the name, this produces a PinResult.
+ }
+
+ // The dataflow framework can process only a single object, and returns no output from that processing. By combining the input and output of each remote pinning into a single object,
+ // we can nonetheless use the dataflow framework to process pinnings and read the output from the updated objects afterward.
+ private class RemotePinning
+ {
+ public ContentHashWithSizeAndLocations Record { get; } // Input: the location record being pinned.
+
+ private PinResult _result; // Output: stays null until a result is assigned.
+
+ public PinResult Result
+ {
+ get => _result;
+ set
+ {
+ value!.ContentSize = Record.Size; // Stamps the record's size onto the assigned result; a null value is not expected here.
+ _result = value;
+ }
+ }
+
+ public RemotePinning(ContentHashWithSizeAndLocations record)
+ => Record = record;
+ }
+
+ // Processes a single remote pinning: runs the record-based overload and stores its outcome in pinning.Result.
+ private async Task PinRemoteAsync(
+ OperationContext context,
+ RemotePinning pinning,
+ bool isLocal,
+ bool updateContentTracker = true,
+ bool succeedWithOneLocation = false)
+ {
+ pinning.Result = await PinRemoteAsync(
+ context,
+ pinning.Record,
+ isLocal,
+ updateContentTracker,
+ succeedWithOneLocation: succeedWithOneLocation);
+ }
+
+ // This method processes a single content location record set for pinning and returns the resulting pin result.
+ private async Task PinRemoteAsync(
+ OperationContext operationContext,
+ ContentHashWithSizeAndLocations remote,
+ bool isLocal,
+ bool updateContentTracker = true,
+ bool succeedWithOneLocation = false)
+ {
+ IReadOnlyList locations = remote.Locations;
+
+ // If no remote locations are recorded, we definitely can't pin
+ if (locations == null || locations.Count == 0)
+ {
+ if (!isLocal)
+ {
+ // Trace only when pin failed based on the data from the global store.
+ Tracer.Warning(operationContext, $"Pin failed for hash {remote.ContentHash.ToShortString()}: no remote records.");
+ }
+
+ return DistributedPinResult.ContentNotFound(replicaCount: 0, "No locations found");
+ }
+
+ // When we only require the content to exist at least once anywhere, we can ignore pin thresholds
+ // and return success after finding a single location.
+ if (succeedWithOneLocation && locations.Count >= 1)
+ {
+ return DistributedPinResult.EnoughReplicas(locations.Count, "Global succeeds");
+ }
+
+ if (locations.Count >= Settings.PinConfiguration.PinMinUnverifiedCount)
+ {
+ SessionCounters[Counters.PinUnverifiedCountSatisfied].Increment();
+
+ // Tracing extra data if the locations were merged to separate the local locations and the global locations that we added to the final result.
+ string extraMessage = null;
+ if (!isLocal)
+ {
+ // Extra locations make sense only for the global case when the entries were merged.
+ extraMessage = $"ExtraGlobal: {remote.ExtraMergedLocations}";
+ }
+
+ var result = DistributedPinResult.EnoughReplicas(locations.Count, extraMessage);
+
+ // Triggering an async copy if the number of replicas is close to the PinMinUnverifiedCount threshold.
+ int threshold = Settings.PinConfiguration.PinMinUnverifiedCount +
+ Settings.PinConfiguration.AsyncCopyOnPinThreshold;
+ if (locations.Count < threshold)
+ {
+ Tracer.Info(
+ operationContext,
+ $"Starting asynchronous copy of the content for hash {remote.ContentHash.ToShortString()} because the number of locations '{locations.Count}' is less then a threshold of '{threshold}'.");
+ SessionCounters[Counters.StartCopyForPinWhenUnverifiedCountSatisfied].Increment();
+
+ // For "synchronous" pins the tracker is updated at once for all the hashes for performance reasons,
+ // but for asynchronous copy we always need to update the tracker with a new location.
+ var task = WithStoreCancellationAsync(
+ operationContext.TracingContext,
+ opContext => TryCopyAndPutAndUpdateContentTrackerAsync(
+ opContext,
+ remote,
+ updateContentTracker: true,
+ CopyReason.AsyncCopyOnPin));
+ if (Settings.InlineOperationsForTests)
+ { // Tests can opt into awaiting the copy inline.
+ (await task).TraceIfFailure(operationContext);
+ }
+ else
+ {
+ task.FireAndForget(operationContext.TracingContext, traceErrorResult: true, operation: "AsynchronousCopyOnPin");
+ }
+
+ // Note: Pin result traces with CpA (copied asynchronously) code is to provide the information that the content is being copied asynchronously, and that replica count is enough but not above async copy threshold.
+ // This trace result does not represent that of the async copy since that is done FireAndForget.
+ result = DistributedPinResult.AsynchronousCopy(locations.Count);
+ }
+
+ return result;
+ }
+
+ if (isLocal)
+ {
+ // Don't copy content locally based on locally cached result. So stop here and return content not found.
+ // This method will be called again with global locations at which time we will attempt to copy the files locally.
+ // When allowing global locations to succeed a put, report success.
+ return DistributedPinResult.ContentNotFound(locations.Count);
+ }
+
+ // Previous checks were not sufficient, so copy the file locally.
+ PutResult copy = await TryCopyAndPutAsync(operationContext, remote, UrgencyHint.Nominal, CopyReason.Pin, trace: false);
+ if (copy)
+ {
+ if (!updateContentTracker)
+ { // The caller takes care of updating the content tracker.
+ return DistributedPinResult.SynchronousCopy(locations.Count);
+ }
+
+ // Inform the content directory that we have the file.
+ // We wait for this to complete, rather than doing it fire-and-forget, because another machine in the ring may need the pinned content immediately.
+ BoolResult updated = await UpdateContentTrackerWithNewReplicaAsync(
+ operationContext,
+ new[] { new ContentHashWithSize(remote.ContentHash, copy.ContentSize) },
+ UrgencyHint.Nominal);
+ if (updated.Succeeded)
+ {
+ return DistributedPinResult.SynchronousCopy(locations.Count);
+ }
+ else
+ {
+ // Tracing the error separately.
+ Tracer.Warning(
+ operationContext,
+ $"Pin failed for hash {remote.ContentHash.ToShortString()}: local copy succeeded, but could not inform content directory due to {updated.ErrorMessage}.");
+ return new DistributedPinResult(locations.Count, updated);
+ }
+ }
+ else
+ {
+ // Tracing the error separately.
+ Tracer.Warning(operationContext, $"Pin failed for hash {remote.ContentHash.ToShortString()}: local copy failed with {copy}.");
+ return DistributedPinResult.ContentNotFound(locations.Count);
+ }
+ }
+
+ private async Task TryCopyAndPutAndUpdateContentTrackerAsync(
+ OperationContext operationContext,
+ ContentHashWithSizeAndLocations remote,
+ bool updateContentTracker,
+ CopyReason reason)
+ { // Copies the content locally and, if requested and the copy succeeded, registers the new replica with the content tracker.
+ PutResult copy = await TryCopyAndPutAsync(operationContext, remote, UrgencyHint.Nominal, reason, trace: true);
+ if (copy && updateContentTracker)
+ {
+ return await UpdateContentTrackerWithNewReplicaAsync(
+ operationContext,
+ new[] { new ContentHashWithSize(remote.ContentHash, copy.ContentSize) },
+ UrgencyHint.Nominal);
+ }
+
+ return copy; // The copy failed or no tracker update was requested: the copy result itself is the outcome.
+ }
+
+ private Task UpdateContentTrackerWithLocalHitsAsync(
+ OperationContext context,
+ IReadOnlyList contentHashesWithInfo,
+ UrgencyHint urgencyHint)
+ { // Registers the given hashes with the content tracker as available at the local location.
+ if (Disposed)
+ {
+ // Nothing to do: the session is already disposed.
+ return BoolTask.True;
+ }
+
+ if (contentHashesWithInfo.Count == 0)
+ {
+ // Nothing to do: there are no hashes to register.
+ return BoolTask.True;
+ }
+
+ IReadOnlyList hashesToEagerUpdate =
+ contentHashesWithInfo.Select(x => new ContentHashWithSize(x.Hash, x.Size)).ToList();
+
+ // Wait for update to complete on remaining hashes to cover case where the record has expired and another machine in the ring requests it immediately after this pin succeeds.
+ return UpdateContentTrackerWithNewReplicaAsync(context, hashesToEagerUpdate, urgencyHint);
+ }
+
+ internal async Task> GetLocationsForProactiveCopyAsync(
+ OperationContext context,
+ IReadOnlyList hashes)
+ { // Looks up locations for the hashes (local first, then global) and refreshes the build ring machine cache when it is out of date.
+ var originalLength = hashes.Count;
+ if (_buildIdHash.HasValue && !_buildRingMachinesCache.IsUpToDate())
+ {
+ Tracer.Debug(
+ context,
+ $"{Tracer.Name}.{nameof(GetLocationsForProactiveCopyAsync)}: getting in-ring machines for BuildId='{_buildId}'.");
+ // Add the build id hash as the last item of the lookup so the build ring machines can be updated from its locations.
+ hashes = hashes.AppendItem(_buildIdHash.Value).ToList();
+ }
+
+ var result = await MultiLevelUtilities.RunMultiLevelWithMergeAsync(
+ hashes,
+ inputs => ResultsExtensions.ThrowIfFailureAsync>(
+ ContentLocationStore.GetBulkAsync(context, inputs, context.Token, UrgencyHint.Nominal, GetBulkOrigin.Local),
+ g => g.ContentHashesInfo),
+ inputs => ResultsExtensions.ThrowIfFailureAsync>(
+ ContentLocationStore.GetBulkAsync(context, inputs, context.Token, UrgencyHint.Nominal, GetBulkOrigin.Global),
+ g => g.ContentHashesInfo),
+ mergeResults: ContentHashWithSizeAndLocations.Merge,
+ useFirstLevelResult: result =>
+ { // The local result is enough when it already lists at least ProactiveCopyLocationsThreshold locations.
+ if (result.Locations?.Count >= Settings.ProactiveCopyLocationsThreshold)
+ {
+ SessionCounters[Counters.GetLocationsSatisfiedFromLocal].Increment();
+ return true;
+ }
+ else
+ {
+ SessionCounters[Counters.GetLocationsSatisfiedFromRemote].Increment();
+ return false;
+ }
+ });
+
+ if (hashes.Count != originalLength)
+ { // The build id hash was appended above, so the last result belongs to it and must not be returned to the caller.
+ // Update build ring machines with retrieved locations
+ var buildRingMachines = result.Last().Locations?.AppendItem(LocalCacheRootMachineLocation).ToArray() ??
+ CollectionUtilities.EmptyArray();
+ _buildRingMachinesCache.Update(buildRingMachines);
+ Tracer.Debug(
+ context,
+ $"{Tracer.Name}.{nameof(GetLocationsForProactiveCopyAsync)}: InRingMachines=[{string.Join(", ", buildRingMachines.Select(m => m.Path))}] BuildId='{_buildId}'");
+ return result.Take(originalLength).ToList();
+ }
+ else
+ {
+ return result;
+ }
+ }
+
+ internal async Task ProactiveCopyIfNeededAsync(
+ OperationContext context,
+ ContentHash hash,
+ bool tryBuildRing,
+ CopyReason reason)
+ { // Resolves the location info for 'hash' through _proactiveCopyGetBulkNagleQueue and delegates to the overload taking that info.
+ var nagleQueue = _proactiveCopyGetBulkNagleQueue;
+ if (nagleQueue is null)
+ { // The queue is not available; per the error message below it is expected to exist once StartupAsync has been called.
+ return new ProactiveCopyResult(new ErrorResult("StartupAsync was not called"));
+ }
+
+ ContentHashWithSizeAndLocations result = await nagleQueue.EnqueueAsync(hash);
+ return await ProactiveCopyIfNeededAsync(context, result, tryBuildRing, reason);
+ }
+
+ internal Task ProactiveCopyIfNeededAsync(
+ OperationContext context,
+ ContentHashWithSizeAndLocations info,
+ bool tryBuildRing,
+ CopyReason reason)
+ {
+ // Proactively pushes the content described by 'info' to other machines, unless the hash is empty, a push of
+ // the same hash is already pending, or the content already has enough known replicas.
+ var hash = info.ContentHash;
+ // No reason to push an empty hash to another machine; testing it first also keeps it out of the pending set.
+ if (hash.IsEmptyHash() || !_pendingProactivePuts.Add(hash))
+ {
+ return Task.FromResult(ProactiveCopyResult.CopyNotRequiredResult);
+ }
+
+ var replicatedLocations = (info.Locations ?? CollectionUtilities.EmptyArray()).ToList();
+ if (replicatedLocations.Count >= Settings.ProactiveCopyLocationsThreshold)
+ {
+ // Don't trace this case since it would add too much log traffic.
+ // No copy is started on this path, so the 'finally' below never runs: release the hash here, otherwise it
+ // would stay in the pending set and every later request for it would be skipped.
+ _pendingProactivePuts.Remove(hash);
+ SessionCounters[Counters.ProactiveCopiesSkipped].Increment();
+ return Task.FromResult(ProactiveCopyResult.CopyNotRequiredResult);
+ }
+
+ // By adding the master to replicatedLocations, it will be excluded from proactive replication
+ var masterLocation = _contentStore.LocalLocationStore?.MasterElectionMechanism.Master;
+ if (masterLocation is not null && masterLocation.Value.IsValid)
+ {
+ replicatedLocations.Add(masterLocation.Value);
+ }
+
+ return context.PerformOperationAsync(
+ Tracer,
+ operation: async () =>
+ {
+ try
+ {
+ // Both copies are started before either one is awaited.
+ var outsideRingCopyTask = ProactiveCopyOutsideBuildRingWithRetryAsync(context, info, replicatedLocations, reason);
+ var insideRingCopyTask = ProactiveCopyInsideBuildRingWithRetryAsync(context, info, tryBuildRing, replicatedLocations, reason);
+ await Task.WhenAll(outsideRingCopyTask, insideRingCopyTask);
+
+ var (insideRingRetries, insideRingResult) = await insideRingCopyTask;
+ var (outsideRingRetries, outsideRingResult) = await outsideRingCopyTask;
+ int totalRetries = insideRingRetries + outsideRingRetries;
+ return new ProactiveCopyResult(insideRingResult, outsideRingResult, totalRetries, info.Entry);
+ }
+ finally
+ {
+ // The operation is over (successfully or not), so the hash is no longer pending.
+ _pendingProactivePuts.Remove(hash);
+ }
+ },
+ extraEndMessage: r => $"Hash={info.ContentHash}, Retries={r.TotalRetries}, Reason=[{reason}]");
+ }
+
+ private async Task<(int retries, ProactivePushResult outsideRingCopyResult)> ProactiveCopyOutsideBuildRingWithRetryAsync(
+ OperationContext context,
+ ContentHashWithSize hash,
+ IReadOnlyList replicatedLocations,
+ CopyReason reason)
+ { // Runs the outside-ring copy and repeats it while the result qualifies for retry, up to Settings.ProactiveCopyMaxRetries times.
+ var outsideRingCopyResult = await ProactiveCopyOutsideBuildRingAsync(context, hash, replicatedLocations, reason, retries: 0);
+ int retries = 0; // Counts only the attempts made after the first one.
+ while (outsideRingCopyResult.QualifiesForRetry && retries < Settings.ProactiveCopyMaxRetries)
+ {
+ SessionCounters[Counters.ProactiveCopyRetries].Increment();
+ SessionCounters[Counters.ProactiveCopyOutsideRingRetries].Increment();
+ retries++;
+ outsideRingCopyResult = await ProactiveCopyOutsideBuildRingAsync(context, hash, replicatedLocations, reason, retries);
+ }
+
+ return (retries, outsideRingCopyResult);
+ }
+
+ private async Task ProactiveCopyOutsideBuildRingAsync(
+ OperationContext context,
+ ContentHashWithSize hash,
+ IReadOnlyList replicatedLocations,
+ CopyReason reason,
+ int retries)
+ { // Picks one target machine outside the build ring (a designated location if enabled and available, otherwise a random one) and pushes the content to it.
+ // The first attempt is not considered as retry
+ int attempt = retries + 1;
+ if ((Settings.ProactiveCopyMode & ProactiveCopyMode.OutsideRing) == 0)
+ { // Outside-ring copies are turned off in the settings.
+ return ProactivePushResult.FromPushFileResult(PushFileResult.Disabled(), attempt);
+ }
+
+ Result getLocationResult = null;
+ var source = ProactiveCopyLocationSource.Random;
+
+ // Make sure that the machine is not in the build ring and does not already have the content.
+ var machinesToSkip = replicatedLocations.Concat(BuildRingMachines).ToArray();
+
+ // Try to select one of the designated machines for this hash.
+ if (Settings.ProactiveCopyUsePreferredLocations)
+ {
+ var designatedLocationsResult = ContentLocationStore.GetDesignatedLocations(hash);
+ if (designatedLocationsResult.Succeeded)
+ {
+ // A machine in the build may be a designated location for the hash,
+ // but we won't be pushing to the same machine twice, because the 'replicatedLocations' argument
+ // has a local machine that we're about to push to for the inside-ring copy.
+ // We also want to skip the inside-ring machines for the outside-ring copy task.
+ var candidates = Enumerable.Except(designatedLocationsResult.Value, machinesToSkip).ToArray();
+
+ if (candidates.Length > 0)
+ {
+ getLocationResult = candidates[ThreadSafeRandom.Generator.Next(0, candidates.Length)];
+ source = ProactiveCopyLocationSource.DesignatedLocation;
+ SessionCounters[Counters.ProactiveCopy_OutsideRingFromPreferredLocations].Increment();
+ }
+ }
+ }
+
+ // Try to select one machine at random if no designated location was chosen above.
+ if (getLocationResult?.Succeeded != true)
+ {
+ getLocationResult = ContentLocationStore.GetRandomMachineLocation(except: machinesToSkip);
+ source = ProactiveCopyLocationSource.Random;
+ }
+
+ if (!getLocationResult.Succeeded)
+ { // No target machine could be selected: report the lookup failure as the push result.
+ return ProactivePushResult.FromPushFileResult(new PushFileResult(getLocationResult), attempt);
+ }
+
+ var candidate = getLocationResult.Value;
+ SessionCounters[Counters.ProactiveCopy_OutsideRingCopies].Increment();
+ PushFileResult pushFileResult = await PushContentAsync(context, hash, candidate, isInsideRing: false, reason, source, attempt);
+ return ProactivePushResult.FromPushFileResult(pushFileResult, attempt);
+ }
+
+ /// <summary>
+ /// Gets all the active in-ring machines (excluding the current one).
+ /// </summary>
+ public MachineLocation[] GetInRingActiveMachines()
+ {
+ return Enumerable.Where(BuildRingMachines, m => !m.Equals(LocalCacheRootMachineLocation))
+ .Where(m => ContentLocationStore.IsMachineActive(m))
+ .ToArray();
+ }
+
+ private async Task<(int retries, ProactivePushResult insideRingCopyResult)> ProactiveCopyInsideBuildRingWithRetryAsync(
+ OperationContext context,
+ ContentHashWithSize hash,
+ bool tryBuildRing,
+ IReadOnlyList replicatedLocations,
+ CopyReason reason)
+ { // Runs the inside-ring copy and repeats it while the result qualifies for retry, up to Settings.ProactiveCopyMaxRetries times.
+ ProactivePushResult insideRingCopyResult = await ProactiveCopyInsideBuildRing(
+ context,
+ hash,
+ tryBuildRing,
+ replicatedLocations,
+ reason,
+ retries: 0);
+ int retries = 0; // Counts only the attempts made after the first one.
+ while (insideRingCopyResult.QualifiesForRetry && retries < Settings.ProactiveCopyMaxRetries)
+ {
+ SessionCounters[Counters.ProactiveCopyRetries].Increment();
+ SessionCounters[Counters.ProactiveCopyInsideRingRetries].Increment();
+ retries++;
+ insideRingCopyResult = await ProactiveCopyInsideBuildRing(context, hash, tryBuildRing, replicatedLocations, reason, retries);
+ }
+
+ return (retries, insideRingCopyResult);
+ }
+
+ private async Task ProactiveCopyInsideBuildRing(
+ OperationContext context,
+ ContentHashWithSize hash,
+ bool tryBuildRing,
+ IReadOnlyList replicatedLocations,
+ CopyReason reason,
+ int retries)
+ { // Pushes the content to one randomly chosen active in-ring machine that is not listed in 'replicatedLocations'.
+ // The first attempt is not considered as retry
+ int attempt = retries + 1;
+
+ // Inside-ring copies must be both requested by the caller and enabled in the settings.
+ if (!tryBuildRing || (Settings.ProactiveCopyMode & ProactiveCopyMode.InsideRing) == 0)
+ {
+ return ProactivePushResult.FromPushFileResult(PushFileResult.Disabled(), attempt);
+ }
+
+ if (_buildIdHash == null)
+ {
+ return ProactivePushResult.FromStatus(ProactivePushStatus.BuildIdNotSpecified, attempt);
+ }
+
+ // Having an explicit case to check if the in-ring machine list is empty to separate the case
+ // when the machine list is not empty but all the candidates are unavailable.
+ if (BuildRingMachines.Length == 0)
+ {
+ return ProactivePushResult.FromStatus(ProactivePushStatus.InRingMachineListIsEmpty, attempt);
+ }
+
+ var candidates = GetInRingActiveMachines();
+
+ if (candidates.Length == 0)
+ {
+ return ProactivePushResult.FromStatus(ProactivePushStatus.MachineNotFound, attempt);
+ }
+
+ candidates = candidates.Except(replicatedLocations).ToArray(); // Drop the machines listed in 'replicatedLocations'.
+ if (candidates.Length == 0)
+ {
+ SessionCounters[Counters.ProactiveCopy_InsideRingFullyReplicated].Increment();
+ return ProactivePushResult.FromStatus(ProactivePushStatus.MachineAlreadyHasCopy, attempt);
+ }
+
+ SessionCounters[Counters.ProactiveCopy_InsideRingCopies].Increment();
+ var candidate = candidates[ThreadSafeRandom.Generator.Next(0, candidates.Length)];
+ PushFileResult pushFileResult = await PushContentAsync(
+ context,
+ hash,
+ candidate,
+ isInsideRing: true,
+ reason,
+ ProactiveCopyLocationSource.Random,
+ attempt);
+ return ProactivePushResult.FromPushFileResult(pushFileResult, attempt);
+ }
+
+ private async Task PushContentAsync(
+ OperationContext context,
+ ContentHashWithSize hash,
+ MachineLocation target,
+ bool isInsideRing,
+ CopyReason reason,
+ ProactiveCopyLocationSource source,
+ int attempt)
+ { // Delivers the content to 'target': pushes a stream when Settings.PushProactiveCopies is set, otherwise asks the target to copy the file.
+ // This is here to avoid hanging ProactiveCopyIfNeededAsync on inside/outside ring copies before starting
+ // the other one.
+ await Task.Yield();
+
+ if (Settings.PushProactiveCopies)
+ {
+ // It is possible that this method is used during proactive replication
+ // and the hash was already evicted at the time this method is called.
+ var streamResult = await Inner.OpenStreamAsync(context, hash, context.Token);
+ if (!streamResult.Succeeded)
+ {
+ return PushFileResult.SkipContentUnavailable();
+ }
+
+ using var stream = streamResult.Stream!; // Disposed when the method returns, i.e. after the awaited push below completes.
+
+ return await DistributedCopier.PushFileAsync(
+ context,
+ hash,
+ target,
+ stream,
+ isInsideRing,
+ reason,
+ source,
+ attempt);
+ }
+ else
+ {
+ var requestResult = await DistributedCopier.RequestCopyFileAsync(context, hash, target, isInsideRing, attempt);
+ if (requestResult)
+ { // The size is reported as null on this path.
+ return PushFileResult.PushSucceeded(size: null);
+ }
+
+ return new PushFileResult(requestResult, "Failed requesting a copy");
+ }
+ }
+
+ /// <inheritdoc />
+ protected override CounterSet GetCounters() =>
+ base.GetCounters()
+ .Merge(DistributedCopier.GetCounters())
+ .Merge(CounterUtilities.ToCounterSet((CounterCollection)SessionCounters));
+
///
protected override Task PutFileCoreAsync(
OperationContext operationContext,
@@ -56,13 +1765,15 @@ namespace BuildXL.Cache.ContentStore.Distributed.Sessions
UrgencyHint urgencyHint,
Counter retryCounter)
{
- return PerformPutFileGatedOperationAsync(operationContext, () =>
- {
- return PutCoreAsync(
- operationContext,
- urgencyHint,
- session => session.PutFileAsync(operationContext, hashType, path, realizationMode, operationContext.Token, urgencyHint));
- });
+ return PerformPutFileGatedOperationAsync(
+ operationContext,
+ () =>
+ {
+ return PutCoreAsync(
+ operationContext,
+ urgencyHint,
+ session => session.PutFileAsync(operationContext, hashType, path, realizationMode, operationContext.Token, urgencyHint));
+ });
}
///
@@ -76,26 +1787,30 @@ namespace BuildXL.Cache.ContentStore.Distributed.Sessions
{
// We are intentionally not gating PutStream operations because we don't expect a high number of them at
// the same time.
- return PerformPutFileGatedOperationAsync(operationContext, () =>
- {
- return PutCoreAsync(
- operationContext,
- urgencyHint,
- session => session.PutFileAsync(operationContext, contentHash, path, realizationMode, operationContext.Token, urgencyHint));
- });
+ return PerformPutFileGatedOperationAsync(
+ operationContext,
+ () =>
+ {
+ return PutCoreAsync(
+ operationContext,
+ urgencyHint,
+ session => session.PutFileAsync(operationContext, contentHash, path, realizationMode, operationContext.Token, urgencyHint));
+ });
}
private Task PerformPutFileGatedOperationAsync(OperationContext operationContext, Func> func)
{
- return PutAndPlaceFileGate.GatedOperationAsync(async (timeWaiting, currentCount) =>
- {
- var gateOccupiedCount = Settings.MaximumConcurrentPutAndPlaceFileOperations - currentCount;
+ return PutAndPlaceFileGate.GatedOperationAsync(
+ async (timeWaiting, currentCount) =>
+ {
+ var gateOccupiedCount = Settings.MaximumConcurrentPutAndPlaceFileOperations - currentCount;
- var result = await func();
- result.MetaData = new ResultMetaData(timeWaiting, gateOccupiedCount);
+ var result = await func();
+ result.MetaData = new ResultMetaData(timeWaiting, gateOccupiedCount);
- return result;
- }, operationContext.Token);
+ return result;
+ },
+ operationContext.Token);
}
///
@@ -150,7 +1865,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.Sessions
// Since the rest of the operation is done asynchronously, create new context to stop cancelling operation prematurely.
var proactiveCopyTask = WithStoreCancellationAsync(
context,
- operationContext => ProactiveCopyIfNeededAsync(operationContext, result.ContentHash, tryBuildRing: true, CopyReason.ProactiveCopyOnPut)
+ operationContext => ProactiveCopyIfNeededAsync(
+ operationContext,
+ result.ContentHash,
+ tryBuildRing: true,
+ CopyReason.ProactiveCopyOnPut)
);
if (Settings.InlineOperationsForTests)
@@ -166,7 +1885,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.Sessions
else
{
// Tracing task-related errors because normal failures already traced by the operation provider
- proactiveCopyTask.TraceIfFailure(context, failureSeverity: Severity.Debug, traceTaskExceptionsOnly: true, operation: "ProactiveCopyIfNeeded");
+ proactiveCopyTask.TraceIfFailure(
+ context,
+ failureSeverity: Severity.Debug,
+ traceTaskExceptionsOnly: true,
+ operation: "ProactiveCopyIfNeeded");
}
}
@@ -185,7 +1908,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.Sessions
// is definitely available in the global store.
urgencyHint = UrgencyHint.RegisterEagerly;
}
-
+
var updateResult = await ContentLocationStore.RegisterLocalLocationAsync(
context,
new[] { new ContentHashWithSize(putResult.ContentHash, putResult.ContentSize) },
diff --git a/Public/Src/Cache/ContentStore/Distributed/Sessions/ReadOnlyDistributedContentSession.cs b/Public/Src/Cache/ContentStore/Distributed/Sessions/ReadOnlyDistributedContentSession.cs
deleted file mode 100644
index 401fffe49..000000000
--- a/Public/Src/Cache/ContentStore/Distributed/Sessions/ReadOnlyDistributedContentSession.cs
+++ /dev/null
@@ -1,1562 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-using System;
-using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
-using System.Diagnostics.ContractsLight;
-using System.IO;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using BuildXL.Cache.ContentStore.Distributed.Stores;
-using BuildXL.Cache.ContentStore.Distributed.Utilities;
-using BuildXL.Cache.ContentStore.Extensions;
-using BuildXL.Cache.ContentStore.Hashing;
-using BuildXL.Cache.ContentStore.Interfaces.Distributed;
-using BuildXL.Cache.ContentStore.Interfaces.Extensions;
-using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
-using BuildXL.Cache.ContentStore.Interfaces.Results;
-using BuildXL.Cache.ContentStore.Interfaces.Sessions;
-using BuildXL.Cache.ContentStore.Interfaces.Stores;
-using BuildXL.Cache.ContentStore.Interfaces.Time;
-using BuildXL.Cache.ContentStore.Interfaces.Tracing;
-using BuildXL.Cache.ContentStore.Interfaces.Utils;
-using BuildXL.Cache.ContentStore.Service.Grpc;
-using BuildXL.Cache.ContentStore.Sessions;
-using BuildXL.Cache.ContentStore.Sessions.Internal;
-using BuildXL.Cache.ContentStore.Tracing;
-using BuildXL.Cache.ContentStore.Tracing.Internal;
-using BuildXL.Cache.ContentStore.UtilitiesCore;
-using BuildXL.Cache.ContentStore.Utils;
-using BuildXL.Utilities.Collections;
-using BuildXL.Utilities.ParallelAlgorithms;
-using BuildXL.Utilities.Core.Tasks;
-using BuildXL.Utilities.Tracing;
-using ContentStore.Grpc;
-using PlaceBulkResult = System.Collections.Generic.IEnumerable>>;
-
-#nullable enable
-
-namespace BuildXL.Cache.ContentStore.Distributed.Sessions
-{
- ///
- /// A read only content location based content session with an inner session for storage.
- ///
- public class ReadOnlyDistributedContentSession : ContentSessionBase, IHibernateContentSession, IConfigurablePin
- {
- internal enum Counters
- {
- GetLocationsSatisfiedFromLocal,
- GetLocationsSatisfiedFromRemote,
- PinUnverifiedCountSatisfied,
- StartCopyForPinWhenUnverifiedCountSatisfied,
- ProactiveCopiesSkipped,
- ProactiveCopy_OutsideRingFromPreferredLocations,
- ProactiveCopy_OutsideRingCopies,
- ProactiveCopyRetries,
- ProactiveCopyInsideRingRetries,
- ProactiveCopyOutsideRingRetries,
- ProactiveCopy_InsideRingCopies,
- ProactiveCopy_InsideRingFullyReplicated,
- }
-
- internal CounterCollection SessionCounters { get; } = new CounterCollection();
-
- private string? _buildId = null;
- private ContentHash? _buildIdHash = null;
- private readonly ExpiringValue _buildRingMachinesCache;
-
- private MachineLocation[] BuildRingMachines => _buildRingMachinesCache.GetValueOrDefault() ?? Array.Empty();
-
- private readonly ConcurrentBigSet _pendingProactivePuts = new ConcurrentBigSet();
- private ResultNagleQueue? _proactiveCopyGetBulkNagleQueue;
-
- // The method used for remote pins depends on which pin configuration is enabled.
- private readonly RemotePinAsync _remotePinner;
-
- ///
- /// The store that persists content locations to a persistent store.
- ///
- internal readonly IContentLocationStore ContentLocationStore;
-
- private readonly ColdStorage? _coldStorage;
-
- ///
- /// The machine location for the current cache.
- ///
- protected readonly MachineLocation LocalCacheRootMachineLocation;
-
- ///
- /// The content session that actually stores content.
- ///
- public IContentSession Inner { get; }
-
- ///
- protected override Tracer Tracer { get; } = new Tracer(nameof(DistributedContentSession));
-
- ///
- protected readonly DistributedContentCopier DistributedCopier;
-
- ///
- /// Settings for the session.
- ///
- protected readonly DistributedContentStoreSettings Settings;
-
- ///
- /// Trace only stops and errors to reduce the Kusto traffic.
- ///
- protected override bool TraceOperationStarted => false;
-
- ///
- /// Semaphore that limits the maximum number of concurrent put and place operations
- ///
- protected readonly SemaphoreSlim PutAndPlaceFileGate;
-
- private readonly DistributedContentStore _contentStore;
-
- ///
- public ReadOnlyDistributedContentSession(
- string name,
- IContentSession inner,
- IContentLocationStore contentLocationStore,
- DistributedContentCopier contentCopier,
- DistributedContentStore distributedContentStore,
- MachineLocation localMachineLocation,
- ColdStorage? coldStorage,
- DistributedContentStoreSettings? settings = default)
- : base(name)
- {
- Contract.Requires(name != null);
- Contract.Requires(inner != null);
- Contract.Requires(contentLocationStore != null);
- Contract.Requires(contentLocationStore is StartupShutdownSlimBase, "The store must derive from StartupShutdownSlimBase");
- Contract.Requires(localMachineLocation.IsValid);
-
- Inner = inner;
- ContentLocationStore = contentLocationStore;
- LocalCacheRootMachineLocation = localMachineLocation;
- Settings = settings ?? DistributedContentStoreSettings.DefaultSettings;
- _contentStore = distributedContentStore;
- _remotePinner = PinFromMultiLevelContentLocationStore;
- DistributedCopier = contentCopier;
- PutAndPlaceFileGate = new SemaphoreSlim(Settings.MaximumConcurrentPutAndPlaceFileOperations);
-
- _coldStorage = coldStorage;
-
- _buildRingMachinesCache = new ExpiringValue(
- Settings.ProactiveCopyInRingMachineLocationsExpiryCache,
- SystemClock.Instance,
- originalValue: Array.Empty());
- }
-
- ///
- protected override async Task StartupCoreAsync(OperationContext context)
- {
- var canHibernate = Inner is IHibernateContentSession ? "can" : "cannot";
- Tracer.Debug(context, $"Session {Name} {canHibernate} hibernate");
- await Inner.StartupAsync(context).ThrowIfFailure();
-
- _proactiveCopyGetBulkNagleQueue = new ResultNagleQueue(
- execute: hashes => GetLocationsForProactiveCopyAsync(context.CreateNested(Tracer.Name), hashes),
- maxDegreeOfParallelism: 1,
- interval: Settings.ProactiveCopyGetBulkInterval,
- batchSize: Settings.ProactiveCopyGetBulkBatchSize);
- _proactiveCopyGetBulkNagleQueue.Start();
-
- TryRegisterMachineWithBuildId(context);
-
- return BoolResult.Success;
- }
-
- private void TryRegisterMachineWithBuildId(OperationContext context)
- {
- if (Constants.TryExtractBuildId(Name, out _buildId) && Guid.TryParse(_buildId, out var buildIdGuid))
- {
- // Generate a fake hash for the build and register a content entry in the location store to represent
- // machines in the build ring
- var buildIdContent = buildIdGuid.ToByteArray();
-
- var buildIdHash = GetBuildIdHash(buildIdContent);
-
- var arguments = $"Build={_buildId}, BuildIdHash={buildIdHash.ToShortString()}";
-
- context.PerformOperationAsync(Tracer, async () =>
- {
- // Storing the build id in the cache to prevent this data to be removed during reconciliation.
- using var stream = new MemoryStream(buildIdContent);
- var result = await Inner.PutStreamAsync(context, buildIdHash, stream, CancellationToken.None, UrgencyHint.Nominal);
- result.ThrowIfFailure();
-
- // Even if 'StoreBuildIdInCache' is true we need to register the content manually,
- // because Inner.PutStreamAsync will just add the content to the cache but won't send a registration message.
- await ContentLocationStore.RegisterLocalLocationAsync(context, new[] { new ContentHashWithSize(buildIdHash, buildIdContent.Length) }, context.Token, UrgencyHint.Nominal).ThrowIfFailure();
-
- // Updating the build id field only if the registration or the put operation succeeded.
- _buildIdHash = buildIdHash;
-
- return BoolResult.Success;
- },
- extraStartMessage: arguments,
- extraEndMessage: r => arguments).FireAndForget(context);
- }
- }
-
- private static ContentHash GetBuildIdHash(byte[] buildId) => buildId.CalculateHash(HashType.MD5);
-
- ///
- protected override async Task ShutdownCoreAsync(OperationContext context)
- {
- var counterSet = new CounterSet();
- counterSet.Merge(GetCounters(), $"{Tracer.Name}.");
-
- // Unregister from build machine location set
- if (_buildIdHash.HasValue)
- {
- Guid.TryParse(_buildId, out var buildIdGuid);
- var buildIdHash = GetBuildIdHash(buildIdGuid.ToByteArray());
- Tracer.Debug(context, $"Deleting in-ring mapping from cache: Build={_buildId}, BuildIdHash={buildIdHash.ToShortString()}");
-
- // DeleteAsync will unregister the content as well. No need for calling 'TrimBulkAsync'.
- await _contentStore.DeleteAsync(context, _buildIdHash.Value, new DeleteContentOptions() { DeleteLocalOnly = true })
- .IgnoreErrorsAndReturnCompletion();
- }
-
- await Inner.ShutdownAsync(context).ThrowIfFailure();
-
- _proactiveCopyGetBulkNagleQueue?.Dispose();
- Tracer.TraceStatisticsAtShutdown(context, counterSet, prefix: "DistributedContentSessionStats");
-
- return BoolResult.Success;
- }
-
- ///
- protected override void DisposeCore()
- {
- base.DisposeCore();
-
- Inner.Dispose();
- }
-
- ///
- protected override async Task PinCoreAsync(
- OperationContext operationContext,
- ContentHash contentHash,
- UrgencyHint urgencyHint,
- Counter retryCounter)
- {
- // Call bulk API
- var result = await PinHelperAsync(operationContext, new[] { contentHash }, urgencyHint, PinOperationConfiguration.Default());
- return (await result.First()).Item;
- }
-
- ///
- protected override async Task OpenStreamCoreAsync(
- OperationContext operationContext,
- ContentHash contentHash,
- UrgencyHint urgencyHint,
- Counter retryCounter)
- {
- OpenStreamResult streamResult =
- await Inner.OpenStreamAsync(operationContext, contentHash, operationContext.Token, urgencyHint);
- if (streamResult.Code == OpenStreamResult.ResultCode.Success)
- {
- return streamResult;
- }
-
- var contentRegistrationUrgencyHint = urgencyHint;
-
- long? size = null;
- GetBulkLocationsResult? localGetBulkResult = null;
-
- // First try to fetch file based on locally stored locations for the hash
- // Then fallback to fetching file based on global locations minus the locally stored locations which were already checked
- foreach (var getBulkTask in ContentLocationStore.MultiLevelGetLocations(operationContext, new[] { contentHash }, operationContext.Token, urgencyHint, subtractLocalResults: true))
- {
- var getBulkResult = await getBulkTask;
- // There is an issue with GetBulkLocationsResult construction from Exception that may loose the information about the origin.
- // So we rely on the result order of MultiLevelGetLocations method: the first result is always local and the second one is global.
-
- GetBulkOrigin origin = localGetBulkResult == null ? GetBulkOrigin.Local : GetBulkOrigin.Global;
- if (origin == GetBulkOrigin.Local)
- {
- localGetBulkResult = getBulkResult;
- }
-
- // Local function: Use content locations for GetBulk to copy file locally
- async Task tryCopyContentLocalAsync()
- {
- if (!getBulkResult || !getBulkResult.ContentHashesInfo.Any())
- {
- return new BoolResult($"Metadata records for hash {contentHash.ToShortString()} not found in content location store.");
- }
-
- // Don't reconsider locally stored results that were checked in prior iteration
- getBulkResult = getBulkResult.Subtract(localGetBulkResult);
-
- var hashInfo = getBulkResult.ContentHashesInfo.Single();
-
- if (!CanCopyContentHash(operationContext, hashInfo, isGlobal: getBulkResult.Origin == GetBulkOrigin.Global, out var useInRingLocations, out var errorMessage))
- {
- return new BoolResult(errorMessage);
- }
-
- // Using in-ring machines if configured
- var copyResult = await TryCopyAndPutAsync(operationContext, hashInfo, urgencyHint, CopyReason.OpenStream, trace: false, useInRingLocations);
- if (!copyResult)
- {
- return new BoolResult(copyResult);
- }
-
- size = copyResult.ContentSize;
-
- // If configured, register the content eagerly on put to make sure the content is discoverable by the other builders,
- // even if the current machine used to have the content and evicted it recently.
- if (Settings.RegisterEagerlyOnPut && !copyResult.ContentAlreadyExistsInCache)
- {
- contentRegistrationUrgencyHint = UrgencyHint.RegisterEagerly;
- }
-
- return BoolResult.Success;
- }
-
- var copyLocalResult = await tryCopyContentLocalAsync();
-
- // Throw operation canceled to avoid operations below which are not value for canceled case.
- operationContext.Token.ThrowIfCancellationRequested();
-
- if (copyLocalResult.Succeeded)
- {
- // Succeeded in copying content locally. No need to try with more content locations
- break;
- }
- else if (origin == GetBulkOrigin.Global)
- {
- return new OpenStreamResult(copyLocalResult, OpenStreamResult.ResultCode.ContentNotFound);
- }
- }
-
- Contract.Assert(size != null, "Size should be set if operation succeeded");
-
- var updateResult = await UpdateContentTrackerWithNewReplicaAsync(operationContext, new[] { new ContentHashWithSize(contentHash, size.Value) }, contentRegistrationUrgencyHint);
- if (!updateResult.Succeeded)
- {
- return new OpenStreamResult(updateResult);
- }
-
- return await Inner.OpenStreamAsync(operationContext, contentHash, operationContext.Token, urgencyHint);
- }
-
- ///
- Task>>> IConfigurablePin.PinAsync(Context context, IReadOnlyList contentHashes, PinOperationConfiguration pinOperationConfiguration)
- {
- // The lifetime of this operation should be detached from the lifetime of the session.
- // But still tracking the lifetime of the store.
- return WithStoreCancellationAsync(context,
- opContext => PinHelperAsync(opContext, contentHashes, pinOperationConfiguration.UrgencyHint, pinOperationConfiguration),
- pinOperationConfiguration.CancellationToken);
- }
-
- ///
- protected override Task>>> PinCoreAsync(OperationContext operationContext, IReadOnlyList contentHashes, UrgencyHint urgencyHint, Counter retryCounter, Counter fileCounter)
- {
- return PinHelperAsync(operationContext, contentHashes, urgencyHint, PinOperationConfiguration.Default());
- }
-
- private async Task>>> PinHelperAsync(OperationContext operationContext, IReadOnlyList contentHashes, UrgencyHint urgencyHint, PinOperationConfiguration pinOperationConfiguration)
- {
- Contract.Requires(contentHashes != null);
-
- IEnumerable>>? pinResults = null;
-
- IEnumerable>>? intermediateResult = null;
- if (pinOperationConfiguration.ReturnGlobalExistenceFast)
- {
- Tracer.Debug(operationContext.TracingContext, $"Detected {nameof(PinOperationConfiguration.ReturnGlobalExistenceFast)}");
-
- // Check globally for existence, but do not copy locally and do not update content tracker.
- pinResults = await Workflows.RunWithFallback(
- contentHashes,
- async hashes =>
- {
- intermediateResult = await Inner.PinAsync(operationContext, hashes, operationContext.Token, urgencyHint);
- return intermediateResult;
- },
- hashes => _remotePinner(operationContext, hashes, succeedWithOneLocation: true, urgencyHint),
- result => result.Succeeded);
-
- // Replace operation context with a new cancellation token so it can outlast this call.
- // Using a cancellation token from the store avoid the operations lifetime to be greater than the lifetime of the outer store.
- operationContext = new OperationContext(operationContext.TracingContext, token: StoreShutdownStartedCancellationToken);
- }
-
- // Default pin action
- var pinTask = Workflows.RunWithFallback(
- contentHashes,
- hashes => intermediateResult == null
- ? Inner.PinAsync(operationContext, hashes, operationContext.Token, urgencyHint)
- : Task.FromResult(intermediateResult),
- hashes => _remotePinner(operationContext, hashes, succeedWithOneLocation: false, urgencyHint),
- result => result.Succeeded,
- // Exclude the empty hash because it is a special case which is hard coded for place/openstream/pin.
- async hits => await UpdateContentTrackerWithLocalHitsAsync(
- operationContext,
- hits.Where(x => !contentHashes[x.Index].IsEmptyHash()).Select(
- x => new ContentHashWithSizeAndLastAccessTime(contentHashes[x.Index], x.Item.ContentSize, x.Item.LastAccessTime)).ToList(),
- urgencyHint));
-
- // Initiate a proactive copy if just pinned content is under-replicated
- if (Settings.ProactiveCopyOnPin && Settings.ProactiveCopyMode != ProactiveCopyMode.Disabled)
- {
- pinTask = ProactiveCopyOnPinAsync(operationContext, contentHashes, pinTask);
- }
-
- if (pinOperationConfiguration.ReturnGlobalExistenceFast)
- {
- // Fire off the default pin action, but do not await the result.
- //
- // Creating a new OperationContext instance without existing 'CancellationToken',
- // because the operation we triggered that stored in 'pinTask' can outlive the lifetime of the current instance.
- // And we don't want the PerformNonResultOperationAsync to fail because the current instance is shut down (or disposed).
- new OperationContext(operationContext.TracingContext).PerformNonResultOperationAsync(
- Tracer,
- () => pinTask,
- extraEndMessage: results =>
- {
- var resultString = string.Join(",", results.Select(async task =>
- {
- // Since all bulk operations are constructed with Task.FromResult, it is safe to just access the result;
- Indexed result = await task;
- return result != null ? $"{contentHashes[result.Index].ToShortString()}:{result.Item}" : string.Empty;
- }));
-
- return $"ConfigurablePin Count={contentHashes.Count}, Hashes=[{resultString}]";
- },
- traceErrorsOnly: TraceErrorsOnly,
- traceOperationStarted: TraceOperationStarted,
- traceOperationFinished: true,
- isCritical: false).FireAndForget(operationContext);
- }
- else
- {
- pinResults = await pinTask;
- }
-
- Contract.Assert(pinResults != null);
- return pinResults;
- }
-
- private async Task>>> ProactiveCopyOnPinAsync(OperationContext context, IReadOnlyList contentHashes, Task>>> pinTask)
- {
- var results = await pinTask;
-
- // Since the rest of the operation is done asynchronously, using a cancellation context from the outer store.
- var proactiveCopyTask = WithStoreCancellationAsync(
- context,
- async opContext =>
- {
- var proactiveTasks = results.Select(resultTask => proactiveCopyOnSinglePinAsync(opContext, resultTask)).ToList();
-
- // Ensure all tasks are completed, after awaiting outer task
- await Task.WhenAll(proactiveTasks);
-
- return proactiveTasks;
- });
-
- if (Settings.InlineOperationsForTests)
- {
- return await proactiveCopyTask;
- }
- else
- {
- proactiveCopyTask.FireAndForget(context);
- return results;
- }
-
- // Proactive copy an individual pin
- async Task> proactiveCopyOnSinglePinAsync(OperationContext opContext, Task> resultTask)
- {
- Indexed indexedPinResult = await resultTask;
- var pinResult = indexedPinResult.Item;
-
- // Local pins and distributed pins which are copied locally allow proactive copy
- if (pinResult.Succeeded && (!(pinResult is DistributedPinResult distributedPinResult) || distributedPinResult.CopyLocally))
- {
- var proactiveCopyResult = await ProactiveCopyIfNeededAsync(opContext, contentHashes[indexedPinResult.Index], tryBuildRing: true, CopyReason.ProactiveCopyOnPin);
-
- // Only fail if all copies failed.
- if (!proactiveCopyResult.Succeeded)
- {
- return new PinResult(proactiveCopyResult).WithIndex(indexedPinResult.Index);
- }
- }
-
- return indexedPinResult;
- }
- }
-
- ///
- protected override async Task PlaceFileCoreAsync(
- OperationContext operationContext,
- ContentHash contentHash,
- AbsolutePath path,
- FileAccessMode accessMode,
- FileReplacementMode replacementMode,
- FileRealizationMode realizationMode,
- UrgencyHint urgencyHint,
- Counter retryCounter)
- {
- var resultWithData = await PerformPlaceFileGatedOperationAsync(
- operationContext,
- () => PlaceHelperAsync(
- operationContext,
- new[] { new ContentHashWithPath(contentHash, path) },
- accessMode,
- replacementMode,
- realizationMode,
- urgencyHint
- ));
-
- var result = await resultWithData.Result.SingleAwaitIndexed();
- result.Metadata = resultWithData.Metadata;
- return result;
- }
-
- ///
- protected override async Task PlaceFileCoreAsync(
- OperationContext operationContext,
- IReadOnlyList hashesWithPaths,
- FileAccessMode accessMode,
- FileReplacementMode replacementMode,
- FileRealizationMode realizationMode,
- UrgencyHint urgencyHint,
- Counter retryCounter)
- {
- // The fallback is invoked for cache misses only. This preserves existing behavior of
- // bubbling up errors with Inner store instead of trying remote.
- var resultWithData = await PerformPlaceFileGatedOperationAsync(
- operationContext,
- () => PlaceHelperAsync(
- operationContext,
- hashesWithPaths,
- accessMode,
- replacementMode,
- realizationMode,
- urgencyHint
- ));
-
- // We are tracing here because we did not want to change the signature for PlaceFileCoreAsync, which is implemented in multiple locations
- operationContext.TracingContext.Debug(
- $"PlaceFileBulk, Gate.OccupiedCount={resultWithData.Metadata.GateOccupiedCount} Gate.Wait={resultWithData.Metadata.GateWaitTime.TotalMilliseconds}ms Hashes.Count={hashesWithPaths.Count}",
- component: nameof(ReadOnlyDistributedContentSession),
- operation: "PlaceFileBulk");
-
- return resultWithData.Result;
- }
-
- private Task PlaceHelperAsync(
- OperationContext operationContext,
- IReadOnlyList hashesWithPaths,
- FileAccessMode accessMode,
- FileReplacementMode replacementMode,
- FileRealizationMode realizationMode,
- UrgencyHint urgencyHint)
- {
- return Workflows.RunWithFallback(
- hashesWithPaths,
- args => Inner.PlaceFileAsync(
- operationContext,
- args,
- accessMode,
- replacementMode,
- realizationMode,
- operationContext.Token,
- urgencyHint),
- args => fetchFromMultiLevelContentLocationStoreThenPlaceFileAsync(args),
- result => IsPlaceFileSuccess(result),
- async hits => await UpdateContentTrackerWithLocalHitsAsync(
- operationContext,
- hits.Select(
- x => new ContentHashWithSizeAndLastAccessTime(
- hashesWithPaths[x.Index].Hash,
- x.Item.FileSize,
- x.Item.LastAccessTime))
- .ToList(),
- urgencyHint));
-
- Task fetchFromMultiLevelContentLocationStoreThenPlaceFileAsync(IReadOnlyList fetchedContentInfo)
- {
- return MultiLevelUtilities.RunMultiLevelAsync(
- fetchedContentInfo,
- runFirstLevelAsync: args => FetchFromMultiLevelContentLocationStoreThenPutAsync(operationContext, args, urgencyHint, CopyReason.Place),
- runSecondLevelAsync: args => innerPlaceAsync(args),
- // NOTE: We just use the first level result if the fetch using content location store fails because the place cannot succeed since the
- // content will not have been put into the local CAS
- useFirstLevelResult: result => !IsPlaceFileSuccess(result));
- }
-
- async Task innerPlaceAsync(IReadOnlyList input)
- {
- // When the content is obtained from remote we still have to place it from the local.
- // So in order to return the right source of the file placement
- // we have to place locally but change the source for each result.
- var results = await Inner.PlaceFileAsync(operationContext, input, accessMode, replacementMode, realizationMode, operationContext.Token, urgencyHint);
- var updatedResults = new List>();
- foreach (var resultTask in results)
- {
- var result = await resultTask;
- updatedResults.Add(
- result.Item
- .WithMaterializationSource(PlaceFileResult.Source.DatacenterCache)
- .WithIndex(result.Index));
- }
-
- return updatedResults.AsTasks();
- }
- }
-
- private Task> PerformPlaceFileGatedOperationAsync(OperationContext operationContext, Func> func)
- {
- return PutAndPlaceFileGate.GatedOperationAsync(async (timeWaiting, currentCount) =>
- {
- var gateOccupiedCount = Settings.MaximumConcurrentPutAndPlaceFileOperations - currentCount;
-
- var result = await func();
-
- return new ResultWithMetaData(
- new ResultMetaData(timeWaiting, gateOccupiedCount),
- result);
- }, operationContext.Token);
- }
-
- private static bool IsPlaceFileSuccess(PlaceFileResult result)
- {
- return result.Code != PlaceFileResult.ResultCode.Error && result.Code != PlaceFileResult.ResultCode.NotPlacedContentNotFound;
- }
-
- ///
- public IEnumerable EnumeratePinnedContentHashes()
- {
- return Inner is IHibernateContentSession session
- ? session.EnumeratePinnedContentHashes()
- : Enumerable.Empty();
- }
-
- ///
- public Task PinBulkAsync(Context context, IEnumerable contentHashes)
- {
- // TODO: Replace PinBulkAsync in hibernate with PinAsync bulk call (bug 1365340)
- return Inner is IHibernateContentSession session
- ? session.PinBulkAsync(context, contentHashes)
- : Task.FromResult(0);
- }
-
- ///
- public Task ShutdownEvictionAsync(Context context)
- {
- return Inner is IHibernateContentSession session
- ? session.ShutdownEvictionAsync(context)
- : BoolResult.SuccessTask;
- }
-
- private Task FetchFromMultiLevelContentLocationStoreThenPutAsync(
- OperationContext context,
- IReadOnlyList hashesWithPaths,
- UrgencyHint urgencyHint,
- CopyReason reason)
- {
- // First try to place file by fetching files based on locally stored locations for the hash
- // Then fallback to fetching file based on global locations minus the locally stored locations which were already checked
-
- var localGetBulkResult = new BuildXL.Utilities.AsyncOut();
-
- GetIndexedResults initialFunc = async args =>
- {
- var contentHashes = args.Select(p => p.Hash).ToList();
- localGetBulkResult.Value = await ContentLocationStore.GetBulkAsync(context, contentHashes, context.Token, urgencyHint, GetBulkOrigin.Local);
- return await FetchFromContentLocationStoreThenPutAsync(context, args, GetBulkOrigin.Local, urgencyHint, localGetBulkResult.Value, reason);
- };
-
- GetIndexedResults fallbackFunc = async args =>
- {
- var contentHashes = args.Select(p => p.Hash).ToList();
- var globalGetBulkResult = await ContentLocationStore.GetBulkAsync(context, contentHashes, context.Token, urgencyHint, GetBulkOrigin.Global);
- globalGetBulkResult = globalGetBulkResult.Subtract(localGetBulkResult.Value);
- return await FetchFromContentLocationStoreThenPutAsync(context, args, GetBulkOrigin.Global, urgencyHint, globalGetBulkResult, reason);
- };
-
- // If ColdStorage is ON try to place files from it before use remote locations
- if (_coldStorage != null)
- {
- return Workflows.RunWithFallback(
- hashesWithPaths,
- initialFunc: async args =>
- {
- return await _coldStorage.FetchThenPutBulkAsync(context, args, Inner);
- },
- fallbackFunc: initialFunc,
- secondFallbackFunc: fallbackFunc,
- thirdFallbackFunc: async args =>
- {
- var coldStorageGetBulkResult = _coldStorage.GetBulkLocations(context, args);
- return await FetchFromContentLocationStoreThenPutAsync(context, args, GetBulkOrigin.ColdStorage, urgencyHint, coldStorageGetBulkResult, reason);
- },
- isSuccessFunc: result => IsPlaceFileSuccess(result));
- }
-
- return Workflows.RunWithFallback(
- hashesWithPaths,
- initialFunc: initialFunc,
- fallbackFunc: fallbackFunc,
- isSuccessFunc: result => IsPlaceFileSuccess(result));
- }
-
- private async Task FetchFromContentLocationStoreThenPutAsync(
- OperationContext context,
- IReadOnlyList hashesWithPaths,
- GetBulkOrigin origin,
- UrgencyHint urgencyHint,
- GetBulkLocationsResult getBulkResult,
- CopyReason reason)
- {
- try
- {
- // Tracing the hashes here for the entire list, instead of tracing one hash at a time inside TryCopyAndPutAsync method.
-
- // This returns failure if any item in the batch wasn't copied locally
- // TODO: split results and call PlaceFile on successfully copied files (bug 1365340)
- if (!getBulkResult.Succeeded || !getBulkResult.ContentHashesInfo.Any())
- {
- return hashesWithPaths.Select(
- _ => new PlaceFileResult(
- getBulkResult,
- PlaceFileResult.ResultCode.NotPlacedContentNotFound,
- "Metadata records not found in content location store"))
- .AsIndexedTasks();
- }
-
- async Task> copyAndPut(ContentHashWithSizeAndLocations contentHashWithSizeAndLocations, int index)
- {
- PlaceFileResult result;
- try
- {
- if (!CanCopyContentHash(context, contentHashWithSizeAndLocations, isGlobal: origin == GetBulkOrigin.Global, out var useInRingMachineLocations, out var errorMessage))
- {
- result = PlaceFileResult.CreateContentNotFound(errorMessage);
- }
- else
- {
- var copyResult = await TryCopyAndPutAsync(
- context,
- contentHashWithSizeAndLocations,
- urgencyHint,
- reason,
- // We just traced all the hashes as a result of GetBulk call, no need to trace each individual hash.
- trace: false,
- // Using in-ring locations as well if the feature is on.
- useInRingMachineLocations: useInRingMachineLocations,
- outputPath: hashesWithPaths[index].Path);
-
- if (!copyResult)
- {
- // For ColdStorage we should treat all errors as cache misses
- result = origin != GetBulkOrigin.ColdStorage ? new PlaceFileResult(copyResult) : new PlaceFileResult(copyResult, PlaceFileResult.ResultCode.NotPlacedContentNotFound);
- }
- else
- {
- var source = origin == GetBulkOrigin.ColdStorage
- ? PlaceFileResult.Source.ColdStorage
- : PlaceFileResult.Source.DatacenterCache;
- result = PlaceFileResult.CreateSuccess(PlaceFileResult.ResultCode.PlacedWithMove, copyResult.ContentSize, source);
- }
- }
- }
- catch (Exception e)
- {
- // The transform block should not fail with an exception otherwise the block's state will be changed to failed state and the exception
- // won't be propagated to the caller.
- result = new PlaceFileResult(e);
- }
-
- return result.WithIndex(index);
- }
-
- var copyFilesLocallyActionQueue = new ActionQueue(Settings.ParallelCopyFilesLimit);
- var copyFilesLocally = await copyFilesLocallyActionQueue.SelectAsync(
- items: getBulkResult.ContentHashesInfo,
- body: (item, index) => copyAndPut(item, index));
-
- var updateResults = await UpdateContentTrackerWithNewReplicaAsync(
- context,
- copyFilesLocally.Where(r => r.Item.Succeeded).Select(r => new ContentHashWithSize(hashesWithPaths[r.Index].Hash, r.Item.FileSize)).ToList(),
- urgencyHint);
-
- if (!updateResults.Succeeded)
- {
- return copyFilesLocally.Select(result => new PlaceFileResult(updateResults).WithIndex(result.Index)).AsTasks();
- }
-
- return copyFilesLocally.AsTasks();
- }
- catch (Exception ex)
- {
- return hashesWithPaths.Select((hash, index) => new PlaceFileResult(ex).WithIndex(index)).AsTasks();
- }
- }
-
- private bool CanCopyContentHash(Context context, ContentHashWithSizeAndLocations result, bool isGlobal, out bool useInRingMachineLocations, [NotNullWhen(false)]out string? message)
- {
- useInRingMachineLocations = isGlobal && Settings.UseInRingMachinesForCopies;
- if (!isLocationsAvailable(out message))
- {
- if (useInRingMachineLocations && GetInRingActiveMachines().Length != 0)
- {
- string useInRingLocationsMessage = $", but {nameof(Settings.UseInRingMachinesForCopies)} is true. Trying to copy the content from in-ring machines.";
- Tracer.Debug(context, message + useInRingLocationsMessage);
- // Still trying copying the content from in-ring machines, even though the location information is lacking.
- return true;
- }
-
- // Tracing only for global locations because they come last.
- if (isGlobal)
- {
- Tracer.Warning(context, message);
- }
-
- return false;
- }
-
- return true;
-
- bool isLocationsAvailable([NotNullWhen(false)] out string? errorMessage)
- {
- errorMessage = null;
- // Null represents no replicas were ever registered, where as empty list implies content is missing from all replicas
- if (result.Locations == null)
- {
- errorMessage = $"No replicas registered for hash {result.ContentHash.ToShortString()}";
- return false;
- }
-
- if (!result.Locations.Any())
- {
- errorMessage = $"No replicas currently exist in content tracker for hash {result.ContentHash.ToShortString()}";
- return false;
- }
-
- return true;
- }
- }
-
- private async Task TryCopyAndPutAsync(OperationContext operationContext, ContentHashWithSizeAndLocations hashInfo, UrgencyHint urgencyHint, CopyReason reason, bool trace, bool useInRingMachineLocations = false, AbsolutePath? outputPath = null)
- {
- Context context = operationContext;
- CancellationToken cts = operationContext.Token;
-
- if (trace)
- {
- Tracer.Debug(operationContext, $"Copying {hashInfo.ContentHash.ToShortString()} with {hashInfo.Locations?.Count ?? 0 } locations");
- }
-
- var copyCompression = CopyCompression.None;
- var copyCompressionThreshold = Settings.GrpcCopyCompressionSizeThreshold ?? 0;
- if (copyCompressionThreshold > 0 && hashInfo.Size > copyCompressionThreshold)
- {
- copyCompression = Settings.GrpcCopyCompressionAlgorithm;
- }
-
- var copyRequest = new DistributedContentCopier.CopyRequest(
- _contentStore,
- hashInfo,
- reason,
- HandleCopyAsync: async args =>
- {
- (CopyFileResult copyFileResult, AbsolutePath tempLocation, _) = args;
-
- PutResult innerPutResult;
- long actualSize = copyFileResult.Size ?? hashInfo.Size;
- if (Settings.UseTrustedHash(actualSize) && Inner is ITrustedContentSession trustedInner)
- {
- // The file has already been hashed, so we can trust the hash of the file.
- innerPutResult = await trustedInner.PutTrustedFileAsync(
- context,
- new ContentHashWithSize(hashInfo.ContentHash, actualSize),
- tempLocation,
- FileRealizationMode.Move,
- cts,
- urgencyHint);
- }
- else
- {
- // Pass the HashType, not the Hash. This prompts a re-hash of the file, which places it where its actual hash requires.
- // If the actual hash differs from the expected hash, then we fail below and move to the next location.
- // Also, record the bytes if the file is small enough to be put into the ContentLocationStore.
- innerPutResult = await Inner.PutFileAsync(
- context,
- hashInfo.ContentHash.HashType,
- tempLocation,
- FileRealizationMode.Move,
- cts,
- urgencyHint);
- }
-
- return innerPutResult;
- },
- copyCompression,
- OverrideWorkingFolder: (Inner as ITrustedContentSession)?.TryGetWorkingDirectory(outputPath));
-
- if (useInRingMachineLocations)
- {
- copyRequest = copyRequest with { InRingMachines = GetInRingActiveMachines() };
- }
-
- var putResult = await DistributedCopier.TryCopyAndPutAsync(operationContext, copyRequest);
-
- return putResult;
- }
-
- ///
- /// Runs a given function in the cancellation context of the outer store.
- ///
- ///
- /// The lifetime of some session-based operation is longer than the session itself.
- /// For instance, proactive copies, or put blob can outlive the lifetime of the session.
- /// But these operations should not outlive the lifetime of the store, because when the store is closed
- /// most likely all the operations will fail with some weird errors like "ObjectDisposedException".
- ///
- /// This helper method allows running the operations that may outlive the lifetime of the session but should not outlive the lifetime of the store.
- ///
- protected Task WithStoreCancellationAsync(Context context, Func> func, CancellationToken token = default)
- {
- return ((StartupShutdownSlimBase)ContentLocationStore).WithOperationContext(context, token, func);
- }
-
- ///
- /// Returns a cancellation token that is triggered when the outer store shutdown is started.
- ///
- protected CancellationToken StoreShutdownStartedCancellationToken =>
- ((StartupShutdownSlimBase)ContentLocationStore).ShutdownStartedCancellationToken;
-
- private Task UpdateContentTrackerWithNewReplicaAsync(OperationContext context, IReadOnlyList contentHashes, UrgencyHint urgencyHint)
- {
- if (contentHashes.Count == 0)
- {
- return BoolResult.SuccessTask;
- }
-
- // TODO: Pass location store option (seems to only be used to prevent updating TTL when replicating for proactive replication) (bug 1365340)
- return ContentLocationStore.RegisterLocalLocationAsync(context, contentHashes, context.Token, urgencyHint);
- }
-
- private Task>>> PinFromMultiLevelContentLocationStore(
- OperationContext context,
- IReadOnlyList contentHashes,
- bool succeedWithOneLocation,
- UrgencyHint urgencyHint = UrgencyHint.Nominal)
- {
- // Pinning a content based on a number of location is inherently dangerous in case of eviction storms in a system.
- // The LLS data might be stale because of event processing delay and the global information can be inaccurate because
- // the trim events are not sent to the global store.
- // We can't do anything when the LLS data is stale, but in some stamps its beneficial not to rely on the global loctions.
- if (Settings.PinConfiguration.UseLocalLocationsOnlyOnUnverifiedPin)
- {
- return PinFromContentLocationStoreOriginAsync(
- context,
- contentHashes,
- GetBulkOrigin.Local,
- succeedWithOneLocation: succeedWithOneLocation,
- urgencyHint);
- }
-
- return Workflows.RunWithFallback(
- contentHashes,
- hashes => PinFromContentLocationStoreOriginAsync(context, hashes, GetBulkOrigin.Local, succeedWithOneLocation: succeedWithOneLocation, urgencyHint),
- hashes => PinFromContentLocationStoreOriginAsync(context, hashes, GetBulkOrigin.Global, succeedWithOneLocation: succeedWithOneLocation, urgencyHint),
- result => result.Succeeded);
- }
-
- // This method creates pages of hashes, makes one bulk call to the content location store to get content location record sets for all the hashes on the page,
- // and fires off processing of the returned content location record sets while proceeding to the next page of hashes in parallel.
- private async Task>>> PinFromContentLocationStoreOriginAsync(
- OperationContext operationContext, IReadOnlyList hashes, GetBulkOrigin origin, bool succeedWithOneLocation, UrgencyHint urgency = UrgencyHint.Nominal)
- {
- CancellationToken cancel = operationContext.Token;
- // Create an action block to process all the requested remote pins while limiting the number of simultaneously executed.
- var pinnings = new List(hashes.Count);
- var pinningAction = ActionBlockSlim.CreateWithAsyncAction(
- degreeOfParallelism: Settings.PinConfiguration?.MaxIOOperations ?? 1,
- async pinning => await PinRemoteAsync(
- operationContext,
- pinning,
- isLocal: origin == GetBulkOrigin.Local,
- updateContentTracker: false,
- succeedWithOneLocation: succeedWithOneLocation),
- cancellationToken: cancel);
-
- // Make a bulk call to content location store to get location records for all hashes on the page.
- // NOTE: We use GetBulkStackedAsync so that when Global results are retrieved we also include Local results to ensure we get a full view of available content
- GetBulkLocationsResult pageLookup = await ContentLocationStore.GetBulkStackedAsync(operationContext, hashes, cancel, urgency, origin);
-
- // If successful, fire off the remote pinning logic for each hash. If not, set all pins to failed.
- if (pageLookup.Succeeded)
- {
- foreach (ContentHashWithSizeAndLocations record in pageLookup.ContentHashesInfo)
- {
- RemotePinning pinning = new RemotePinning(record);
- pinnings.Add(pinning);
- await pinningAction.PostAsync(pinning, cancel);
- }
- }
- else
- {
- foreach (ContentHash hash in hashes)
- {
- Tracer.Warning(operationContext, $"Pin failed for hash {hash.ToShortString()}: directory query failed with error {pageLookup.ErrorMessage}");
- RemotePinning pinning = new RemotePinning(new ContentHashWithSizeAndLocations(hash, -1L))
- {
- Result = new PinResult(pageLookup)
- };
- pinnings.Add(pinning);
- }
- }
-
- Contract.Assert(pinnings.Count == hashes.Count);
-
- // Wait for all the pinning actions to complete.
- pinningAction.Complete();
-
- try
- {
- await pinningAction.Completion;
- }
- catch (TaskCanceledException)
- {
- // Cancellation token provided to an action block can be canceled.
- // Ignoring the exception in this case.
- }
-
- // Inform the content directory that we copied the files.
- // Looking for distributed pin results that were successful by copying the content locally.
- var localCopies = pinnings.Select((rp, index) => (result: rp, index)).Where(x => x.result.Result is DistributedPinResult dpr && dpr.CopyLocally).ToList();
-
- BoolResult updated = await UpdateContentTrackerWithNewReplicaAsync(operationContext, localCopies.Select(lc => new ContentHashWithSize(lc.result.Record.ContentHash, lc.result.Record.Size)).ToList(), UrgencyHint.Nominal);
- if (!updated.Succeeded)
- {
- // We failed to update the tracker. Need to update the results.
- string hashesAsString = string.Join(", ", localCopies.Select(lc => lc.result.Record.ContentHash.ToShortString()));
- Tracer.Warning(operationContext, $"Pin failed for hashes {hashesAsString}: local copy succeeded, but could not inform content directory due to {updated.ErrorMessage}.");
- foreach (var (_, index) in localCopies)
- {
- pinnings[index].Result = new PinResult(updated);
- }
- }
-
- // The return type should probably be just Task>, but higher callers require the Indexed wrapper and that the PinResults be encased in Tasks.
- return pinnings.Select(x => x.Result ?? createCanceledPutResult()).AsIndexed().AsTasks();
-
- static PinResult createCanceledPutResult() => new ErrorResult("The operation was canceled").AsResult();
- }
-
- // The dataflow framework can process only a single object, and returns no output from that processing. By combining the input and output of each remote pinning into a single object,
- // we can nonetheless use the dataflow framework to process pinnings and read the output from the updated objects afterward.
- private class RemotePinning
- {
- public ContentHashWithSizeAndLocations Record { get; }
-
- private PinResult? _result;
- public PinResult? Result
- {
- get => _result;
- set
- {
- value!.ContentSize = Record.Size;
- _result = value;
- }
- }
-
- public RemotePinning(ContentHashWithSizeAndLocations record)
- => Record = record;
- }
-
- // This method processes each remote pinning, setting the output when the operation is completed.
- private async Task PinRemoteAsync(
- OperationContext context,
- RemotePinning pinning,
- bool isLocal,
- bool updateContentTracker = true,
- bool succeedWithOneLocation = false)
- {
- pinning.Result = await PinRemoteAsync(context, pinning.Record, isLocal, updateContentTracker, succeedWithOneLocation: succeedWithOneLocation);
- }
-
- // This method processes a single content location record set for pinning.
- private async Task PinRemoteAsync(
- OperationContext operationContext,
- ContentHashWithSizeAndLocations remote,
- bool isLocal,
- bool updateContentTracker = true,
- bool succeedWithOneLocation = false)
- {
- IReadOnlyList? locations = remote.Locations;
-
- // If no remote locations are recorded, we definitely can't pin
- if (locations == null || locations.Count == 0)
- {
- if (!isLocal)
- {
- // Trace only when pin failed based on the data from the global store.
- Tracer.Warning(operationContext, $"Pin failed for hash {remote.ContentHash.ToShortString()}: no remote records.");
- }
-
- return DistributedPinResult.ContentNotFound(replicaCount: 0, "No locations found");
- }
-
- // When we only require the content to exist at least once anywhere, we can ignore pin thresholds
- // and return success after finding a single location.
- if (succeedWithOneLocation && locations.Count >= 1)
- {
- return DistributedPinResult.EnoughReplicas(locations.Count, "Global succeeds");
- }
-
- if (locations.Count >= Settings.PinConfiguration.PinMinUnverifiedCount)
- {
- SessionCounters[Counters.PinUnverifiedCountSatisfied].Increment();
-
- // Tracing extra data if the locations were merged to separate the local locations and the global locations that we added to the final result.
- string? extraMessage = null;
- if (!isLocal)
- {
- // Extra locations does make sense only for the global case when the entries were merged.
- extraMessage = $"ExtraGlobal: {remote.ExtraMergedLocations}";
- }
-
- var result = DistributedPinResult.EnoughReplicas(locations.Count, extraMessage);
-
- // Triggering an async copy if the number of replicas are close to a PinMinUnverifiedCount threshold.
- int threshold = Settings.PinConfiguration.PinMinUnverifiedCount +
- Settings.PinConfiguration.AsyncCopyOnPinThreshold;
- if (locations.Count < threshold)
- {
- Tracer.Info(operationContext, $"Starting asynchronous copy of the content for hash {remote.ContentHash.ToShortString()} because the number of locations '{locations.Count}' is less then a threshold of '{threshold}'.");
- SessionCounters[Counters.StartCopyForPinWhenUnverifiedCountSatisfied].Increment();
-
- // For "synchronous" pins the tracker is updated at once for all the hashes for performance reasons,
- // but for asynchronous copy we always need to update the tracker with a new location.
- var task = WithStoreCancellationAsync(
- operationContext.TracingContext,
- opContext => TryCopyAndPutAndUpdateContentTrackerAsync(opContext, remote, updateContentTracker: true, CopyReason.AsyncCopyOnPin));
- if (Settings.InlineOperationsForTests)
- {
- (await task).TraceIfFailure(operationContext);
- }
- else
- {
- task.FireAndForget(operationContext.TracingContext, traceErrorResult: true, operation: "AsynchronousCopyOnPin");
- }
-
- // Note: Pin result traces with CpA (copied asynchronously) code is to provide the information that the content is being copied asynchronously, and that replica count is enough but not above async copy threshold.
- // This trace result does not represent that of the async copy since that is done FireAndForget.
- result = DistributedPinResult.AsynchronousCopy(locations.Count);
- }
-
- return result;
- }
-
- if (isLocal)
- {
- // Don't copy content locally based on locally cached result. So stop here and return content not found.
- // This method will be called again with global locations at which time we will attempt to copy the files locally.
- // When allowing global locations to succeed a put, report success.
- return DistributedPinResult.ContentNotFound(locations.Count);
- }
-
- // Previous checks were not sufficient, so copy the file locally.
- PutResult copy = await TryCopyAndPutAsync(operationContext, remote, UrgencyHint.Nominal, CopyReason.Pin, trace: false);
- if (copy)
- {
- if (!updateContentTracker)
- {
- return DistributedPinResult.SynchronousCopy(locations.Count);
- }
-
- // Inform the content directory that we have the file.
- // We wait for this to complete, rather than doing it fire-and-forget, because another machine in the ring may need the pinned content immediately.
- BoolResult updated = await UpdateContentTrackerWithNewReplicaAsync(operationContext, new[] { new ContentHashWithSize(remote.ContentHash, copy.ContentSize) }, UrgencyHint.Nominal);
- if (updated.Succeeded)
- {
- return DistributedPinResult.SynchronousCopy(locations.Count);
- }
- else
- {
- // Tracing the error separately.
- Tracer.Warning(operationContext, $"Pin failed for hash {remote.ContentHash.ToShortString()}: local copy succeeded, but could not inform content directory due to {updated.ErrorMessage}.");
- return new DistributedPinResult(locations.Count, updated);
- }
- }
- else
- {
- // Tracing the error separately.
- Tracer.Warning(operationContext, $"Pin failed for hash {remote.ContentHash.ToShortString()}: local copy failed with {copy}.");
- return DistributedPinResult.ContentNotFound(locations.Count);
- }
- }
-
- private async Task TryCopyAndPutAndUpdateContentTrackerAsync(
- OperationContext operationContext,
- ContentHashWithSizeAndLocations remote,
- bool updateContentTracker,
- CopyReason reason)
- {
- PutResult copy = await TryCopyAndPutAsync(operationContext, remote, UrgencyHint.Nominal, reason, trace: true);
- if (copy && updateContentTracker)
- {
- return await UpdateContentTrackerWithNewReplicaAsync(operationContext, new[] { new ContentHashWithSize(remote.ContentHash, copy.ContentSize) }, UrgencyHint.Nominal);
- }
-
- return copy;
- }
-
- private Task UpdateContentTrackerWithLocalHitsAsync(OperationContext context, IReadOnlyList contentHashesWithInfo, UrgencyHint urgencyHint)
- {
- if (Disposed)
- {
- // Nothing to do.
- return BoolTask.True;
- }
-
- if (contentHashesWithInfo.Count == 0)
- {
- // Nothing to do.
- return BoolTask.True;
- }
-
- IReadOnlyList hashesToEagerUpdate = contentHashesWithInfo.Select(x => new ContentHashWithSize(x.Hash, x.Size)).ToList();
-
- // Wait for update to complete on remaining hashes to cover case where the record has expired and another machine in the ring requests it immediately after this pin succeeds.
- return UpdateContentTrackerWithNewReplicaAsync(context, hashesToEagerUpdate, urgencyHint);
- }
-
- internal async Task> GetLocationsForProactiveCopyAsync(
- OperationContext context,
- IReadOnlyList hashes)
- {
- var originalLength = hashes.Count;
- if (_buildIdHash.HasValue && !_buildRingMachinesCache.IsUpToDate())
- {
- Tracer.Debug(context, $"{Tracer.Name}.{nameof(GetLocationsForProactiveCopyAsync)}: getting in-ring machines for BuildId='{_buildId}'.");
- // Add build id hash to hashes so build ring machines can be updated
- hashes = hashes.AppendItem(_buildIdHash.Value).ToList();
- }
-
- var result = await MultiLevelUtilities.RunMultiLevelWithMergeAsync(
- hashes,
- inputs => ContentLocationStore.GetBulkAsync(context, inputs, context.Token, UrgencyHint.Nominal, GetBulkOrigin.Local).ThrowIfFailureAsync(g => g.ContentHashesInfo),
- inputs => ContentLocationStore.GetBulkAsync(context, inputs, context.Token, UrgencyHint.Nominal, GetBulkOrigin.Global).ThrowIfFailureAsync(g => g.ContentHashesInfo),
- mergeResults: ContentHashWithSizeAndLocations.Merge,
- useFirstLevelResult: result =>
- {
- if (result.Locations?.Count >= Settings.ProactiveCopyLocationsThreshold)
- {
- SessionCounters[Counters.GetLocationsSatisfiedFromLocal].Increment();
- return true;
- }
- else
- {
- SessionCounters[Counters.GetLocationsSatisfiedFromRemote].Increment();
- return false;
- }
- });
-
- if (hashes.Count != originalLength)
- {
- // Update build ring machines with retrieved locations
- var buildRingMachines = result.Last().Locations?.AppendItem(LocalCacheRootMachineLocation).ToArray() ?? CollectionUtilities.EmptyArray();
- _buildRingMachinesCache.Update(buildRingMachines);
- Tracer.Debug(context, $"{Tracer.Name}.{nameof(GetLocationsForProactiveCopyAsync)}: InRingMachines=[{string.Join(", ", buildRingMachines.Select(m => m.Path))}] BuildId='{_buildId}'");
- return result.Take(originalLength).ToList();
- }
- else
- {
- return result;
- }
- }
-
- internal async Task ProactiveCopyIfNeededAsync(
- OperationContext context,
- ContentHash hash,
- bool tryBuildRing,
- CopyReason reason)
- {
- var nagleQueue = _proactiveCopyGetBulkNagleQueue;
- if (nagleQueue is null)
- {
- return new ProactiveCopyResult(new ErrorResult("StartupAsync was not called"));
- }
-
- ContentHashWithSizeAndLocations result = await nagleQueue.EnqueueAsync(hash);
- return await ProactiveCopyIfNeededAsync(context, result, tryBuildRing, reason);
- }
-
- internal Task ProactiveCopyIfNeededAsync(
- OperationContext context,
- ContentHashWithSizeAndLocations info,
- bool tryBuildRing,
- CopyReason reason)
- {
- var hash = info.ContentHash;
- if (!_pendingProactivePuts.Add(hash)
- || info.ContentHash.IsEmptyHash()) // No reason to push an empty hash to another machine.
- {
- return Task.FromResult(ProactiveCopyResult.CopyNotRequiredResult);
- }
-
- // Don't trace this case since it would add too much log traffic.
- var replicatedLocations = (info.Locations ?? CollectionUtilities.EmptyArray()).ToList();
-
- if (replicatedLocations.Count >= Settings.ProactiveCopyLocationsThreshold)
- {
- SessionCounters[Counters.ProactiveCopiesSkipped].Increment();
- return Task.FromResult(ProactiveCopyResult.CopyNotRequiredResult);
- }
-
- // By adding the master to replicatedLocations, it will be excluded from proactive replication
- var masterLocation = _contentStore.LocalLocationStore?.MasterElectionMechanism.Master;
- if (masterLocation is not null && masterLocation.Value.IsValid)
- {
- replicatedLocations.Add(masterLocation.Value);
- }
-
- return context.PerformOperationAsync(
- Tracer,
- operation: async () =>
- {
- try
- {
- var outsideRingCopyTask = ProactiveCopyOutsideBuildRingWithRetryAsync(context, info, replicatedLocations, reason);
- var insideRingCopyTask = ProactiveCopyInsideBuildRingWithRetryAsync(context, info, tryBuildRing, replicatedLocations, reason);
- await Task.WhenAll(outsideRingCopyTask, insideRingCopyTask);
-
- var (insideRingRetries, insideRingResult) = await insideRingCopyTask;
- var (outsideRingRetries, outsideRingResult) = await outsideRingCopyTask;
-
- int totalRetries = insideRingRetries + outsideRingRetries;
- return new ProactiveCopyResult(insideRingResult, outsideRingResult, totalRetries, info.Entry);
- }
- finally
- {
- _pendingProactivePuts.Remove(hash);
- }
- },
- extraEndMessage: r => $"Hash={info.ContentHash}, Retries={r.TotalRetries}, Reason=[{reason}]");
- }
-
- private async Task<(int retries, ProactivePushResult outsideRingCopyResult)> ProactiveCopyOutsideBuildRingWithRetryAsync(
- OperationContext context,
- ContentHashWithSize hash,
- IReadOnlyList replicatedLocations,
- CopyReason reason)
- {
- var outsideRingCopyResult = await ProactiveCopyOutsideBuildRingAsync(context, hash, replicatedLocations, reason, retries: 0);
- int retries = 0;
- while (outsideRingCopyResult.QualifiesForRetry && retries < Settings.ProactiveCopyMaxRetries)
- {
- SessionCounters[Counters.ProactiveCopyRetries].Increment();
- SessionCounters[Counters.ProactiveCopyOutsideRingRetries].Increment();
- retries++;
- outsideRingCopyResult = await ProactiveCopyOutsideBuildRingAsync(context, hash, replicatedLocations, reason, retries);
- }
- return (retries, outsideRingCopyResult);
- }
-
- private async Task ProactiveCopyOutsideBuildRingAsync(
- OperationContext context,
- ContentHashWithSize hash,
- IReadOnlyList replicatedLocations,
- CopyReason reason,
- int retries)
- {
- // The first attempt is not considered as retry
- int attempt = retries + 1;
- if ((Settings.ProactiveCopyMode & ProactiveCopyMode.OutsideRing) == 0)
- {
- return ProactivePushResult.FromPushFileResult(PushFileResult.Disabled(), attempt);
- }
-
- Result? getLocationResult = null;
- var source = ProactiveCopyLocationSource.Random;
-
- // Make sure that the machine is not in the build ring and does not already have the content.
- var machinesToSkip = replicatedLocations.Concat(BuildRingMachines).ToArray();
-
- // Try to select one of the designated machines for this hash.
- if (Settings.ProactiveCopyUsePreferredLocations)
- {
- var designatedLocationsResult = ContentLocationStore.GetDesignatedLocations(hash);
- if (designatedLocationsResult.Succeeded)
- {
- // A machine in the build may be a designated location for the hash,
- // but we won't pushing to the same machine twice, because 'replicatedLocations' argument
- // has a local machine that we're about to push for inside the ring copy.
- // We also want to skip inside ring machines for outsidecopy task
- var candidates = designatedLocationsResult.Value
- .Except(machinesToSkip).ToArray();
-
- if (candidates.Length > 0)
- {
- getLocationResult = candidates[ThreadSafeRandom.Generator.Next(0, candidates.Length)];
- source = ProactiveCopyLocationSource.DesignatedLocation;
- SessionCounters[Counters.ProactiveCopy_OutsideRingFromPreferredLocations].Increment();
- }
- }
- }
-
- // Try to select one machine at random.
- if (getLocationResult?.Succeeded != true)
- {
- getLocationResult = ContentLocationStore.GetRandomMachineLocation(except: machinesToSkip);
- source = ProactiveCopyLocationSource.Random;
- }
-
- if (!getLocationResult.Succeeded)
- {
- return ProactivePushResult.FromPushFileResult(new PushFileResult(getLocationResult), attempt);
- }
- var candidate = getLocationResult.Value;
- SessionCounters[Counters.ProactiveCopy_OutsideRingCopies].Increment();
- PushFileResult pushFileResult = await PushContentAsync(context, hash, candidate, isInsideRing: false, reason, source, attempt);
- return ProactivePushResult.FromPushFileResult(pushFileResult, attempt);
- }
-
- ///
- /// Gets all the active in-ring machines (excluding the current one).
- ///
- public MachineLocation[] GetInRingActiveMachines()
- {
- return BuildRingMachines
- .Where(m => !m.Equals(LocalCacheRootMachineLocation))
- .Where(m => ContentLocationStore.IsMachineActive(m))
- .ToArray();
- }
-
- private async Task<(int retries, ProactivePushResult insideRingCopyResult)> ProactiveCopyInsideBuildRingWithRetryAsync(
- OperationContext context,
- ContentHashWithSize hash,
- bool tryBuildRing,
- IReadOnlyList replicatedLocations,
- CopyReason reason)
- {
- ProactivePushResult insideRingCopyResult = await ProactiveCopyInsideBuildRing(context, hash, tryBuildRing, replicatedLocations, reason, retries: 0);
- int retries = 0;
- while (insideRingCopyResult.QualifiesForRetry && retries < Settings.ProactiveCopyMaxRetries)
- {
- SessionCounters[Counters.ProactiveCopyRetries].Increment();
- SessionCounters[Counters.ProactiveCopyInsideRingRetries].Increment();
- retries++;
- insideRingCopyResult = await ProactiveCopyInsideBuildRing(context, hash, tryBuildRing, replicatedLocations, reason, retries);
- }
- return (retries, insideRingCopyResult);
- }
-
- private async Task ProactiveCopyInsideBuildRing(
- OperationContext context,
- ContentHashWithSize hash,
- bool tryBuildRing,
- IReadOnlyList replicatedLocations,
- CopyReason reason,
- int retries)
- {
- // The first attempt is not considered as retry
- int attempt = retries + 1;
-
- // Get random machine inside build ring
- if (!tryBuildRing || (Settings.ProactiveCopyMode & ProactiveCopyMode.InsideRing) == 0)
- {
- return ProactivePushResult.FromPushFileResult(PushFileResult.Disabled(), attempt);
- }
-
- if (_buildIdHash == null)
- {
- return ProactivePushResult.FromStatus(ProactivePushStatus.BuildIdNotSpecified, attempt);
- }
-
- // Having an explicit case to check if the in-ring machine list is empty to separate the case
- // when the machine list is not empty but all the candidates are unavailable.
- if (BuildRingMachines.Length == 0)
- {
- return ProactivePushResult.FromStatus(ProactivePushStatus.InRingMachineListIsEmpty, attempt);
- }
-
- var candidates = GetInRingActiveMachines();
-
- if (candidates.Length == 0)
- {
- return ProactivePushResult.FromStatus(ProactivePushStatus.MachineNotFound, attempt);
- }
-
- candidates = candidates.Except(replicatedLocations).ToArray();
- if (candidates.Length == 0)
- {
- SessionCounters[Counters.ProactiveCopy_InsideRingFullyReplicated].Increment();
- return ProactivePushResult.FromStatus(ProactivePushStatus.MachineAlreadyHasCopy, attempt);
- }
-
- SessionCounters[Counters.ProactiveCopy_InsideRingCopies].Increment();
- var candidate = candidates[ThreadSafeRandom.Generator.Next(0, candidates.Length)];
- PushFileResult pushFileResult = await PushContentAsync(context, hash, candidate, isInsideRing: true, reason, ProactiveCopyLocationSource.Random, attempt);
- return ProactivePushResult.FromPushFileResult(pushFileResult, attempt);
- }
-
- private async Task PushContentAsync(
- OperationContext context,
- ContentHashWithSize hash,
- MachineLocation target,
- bool isInsideRing,
- CopyReason reason,
- ProactiveCopyLocationSource source,
- int attempt)
- {
- // This is here to avoid hanging ProactiveCopyIfNeededAsync on inside/outside ring copies before starting
- // the other one.
- await Task.Yield();
-
- if (Settings.PushProactiveCopies)
- {
- // It is possible that this method is used during proactive replication
- // and the hash was already evicted at the time this method is called.
- var streamResult = await Inner.OpenStreamAsync(context, hash, context.Token);
- if (!streamResult.Succeeded)
- {
- return PushFileResult.SkipContentUnavailable();
- }
-
- using var stream = streamResult.Stream!;
-
- return await DistributedCopier.PushFileAsync(
- context,
- hash,
- target,
- stream,
- isInsideRing,
- reason,
- source,
- attempt);
- }
- else
- {
- var requestResult = await DistributedCopier.RequestCopyFileAsync(context, hash, target, isInsideRing, attempt);
- if (requestResult)
- {
- return PushFileResult.PushSucceeded(size: null);
- }
-
- return new PushFileResult(requestResult, "Failed requesting a copy");
- }
- }
-
- ///
- protected override CounterSet GetCounters() =>
- base.GetCounters()
- .Merge(DistributedCopier.GetCounters())
- .Merge(SessionCounters.ToCounterSet());
- }
-}
diff --git a/Public/Src/Cache/ContentStore/Distributed/Sessions/RemotePinAsync.cs b/Public/Src/Cache/ContentStore/Distributed/Sessions/RemotePinAsync.cs
index aa69a72f8..9cb285452 100644
--- a/Public/Src/Cache/ContentStore/Distributed/Sessions/RemotePinAsync.cs
+++ b/Public/Src/Cache/ContentStore/Distributed/Sessions/RemotePinAsync.cs
@@ -12,7 +12,7 @@ using BuildXL.Cache.ContentStore.Tracing.Internal;
namespace BuildXL.Cache.ContentStore.Distributed.Sessions
{
///
- /// Performs remote pinning for
+ /// Performs remote pinning for
///
public delegate Task>>> RemotePinAsync(
OperationContext context,
diff --git a/Public/Src/Cache/ContentStore/Distributed/Stores/DistributedContentStore.cs b/Public/Src/Cache/ContentStore/Distributed/Stores/DistributedContentStore.cs
index d0a9bc09f..551f01ade 100644
--- a/Public/Src/Cache/ContentStore/Distributed/Stores/DistributedContentStore.cs
+++ b/Public/Src/Cache/ContentStore/Distributed/Stores/DistributedContentStore.cs
@@ -99,8 +99,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.Stores
private readonly DistributedContentCopier _distributedCopier;
private readonly DisposableDirectory _copierWorkingDirectory;
- private Lazy>>? _proactiveCopySession;
- internal Lazy>> ProactiveCopySession => NotNull(_proactiveCopySession, nameof(_proactiveCopySession));
+ private Lazy>>? _proactiveCopySession;
+ internal Lazy>> ProactiveCopySession => NotNull(_proactiveCopySession, nameof(_proactiveCopySession));
private readonly DistributedContentSettings? _distributedContentSettings;
@@ -226,7 +226,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.Stores
return $"Origin=[{info.Origin}] IsPresentInGcs=[{isPresentInGcs}]";
}
- private Task> CreateCopySession(Context context)
+ private Task> CreateCopySession(Context context)
{
var sessionId = Guid.NewGuid().ToString();
@@ -235,11 +235,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.Stores
async () =>
{
// NOTE: We use ImplicitPin.None so that the OpenStream calls triggered by RequestCopy will only pull the content, NOT pin it in the local store.
- var sessionResult = CreateReadOnlySession(operationContext, $"{sessionId}-DefaultCopy", ImplicitPin.None).ThrowIfFailure();
+ var sessionResult = CreateSession(operationContext, $"{sessionId}-DefaultCopy", ImplicitPin.None).ThrowIfFailure();
var session = sessionResult.Session!;
await session.StartupAsync(context).ThrowIfFailure();
- return Result.Success((ReadOnlyDistributedContentSession)session);
+ return Result.Success((DistributedContentSession)session);
});
}
@@ -250,7 +250,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.Stores
var startupTask = base.StartupAsync(context);
- _proactiveCopySession = new Lazy>>(() => CreateCopySession(context));
+ _proactiveCopySession = new Lazy>>(() => CreateCopySession(context));
if (_settings.SetPostInitializationCompletionAfterStartup)
{
@@ -324,7 +324,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.Stores
private Task ProactiveReplicationIterationAsync(
OperationContext context,
- ReadOnlyDistributedContentSession proactiveCopySession,
+ DistributedContentSession proactiveCopySession,
ILocalContentStore localContentStore,
TransitioningContentLocationStore contentLocationStore)
{
@@ -476,31 +476,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.Stores
return BoolResult.Success;
}
- ///
- public CreateSessionResult CreateReadOnlySession(Context context, string name, ImplicitPin implicitPin)
- {
- return CreateReadOnlySessionCall.Run(_tracer, OperationContext(context), name, () =>
- {
- CreateSessionResult innerSessionResult = InnerContentStore.CreateSession(context, name, implicitPin);
-
- if (innerSessionResult.Succeeded)
- {
- var session = new ReadOnlyDistributedContentSession(
- name,
- innerSessionResult.Session,
- ContentLocationStore,
- _distributedCopier,
- this,
- LocalMachineLocation,
- ColdStorage,
- settings: _settings);
- return new CreateSessionResult(session);
- }
-
- return new CreateSessionResult(innerSessionResult, "Could not initialize inner content session with error");
- });
- }
-
///
public CreateSessionResult CreateSession(Context context, string name, ImplicitPin implicitPin)
{
@@ -513,7 +488,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.Stores
var session = new DistributedContentSession(
name,
innerSessionResult.Session,
- _contentLocationStore,
+ ContentLocationStore,
_distributedCopier,
this,
LocalMachineLocation,
diff --git a/Public/Src/Cache/ContentStore/DistributedTest/ContentLocation/LocalLocationStoreDistributedContentTests.cs b/Public/Src/Cache/ContentStore/DistributedTest/ContentLocation/LocalLocationStoreDistributedContentTests.cs
index 1f92cfe5e..07c8e4622 100644
--- a/Public/Src/Cache/ContentStore/DistributedTest/ContentLocation/LocalLocationStoreDistributedContentTests.cs
+++ b/Public/Src/Cache/ContentStore/DistributedTest/ContentLocation/LocalLocationStoreDistributedContentTests.cs
@@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
diff --git a/Public/Src/Cache/ContentStore/DistributedTest/ContentLocation/ProactiveCopyTests.cs b/Public/Src/Cache/ContentStore/DistributedTest/ContentLocation/ProactiveCopyTests.cs
index da7706a65..94a991f0e 100644
--- a/Public/Src/Cache/ContentStore/DistributedTest/ContentLocation/ProactiveCopyTests.cs
+++ b/Public/Src/Cache/ContentStore/DistributedTest/ContentLocation/ProactiveCopyTests.cs
@@ -20,7 +20,7 @@ using BuildXL.Cache.ContentStore.Service.Grpc;
using FluentAssertions;
using Xunit;
-using static BuildXL.Cache.ContentStore.Distributed.Sessions.ReadOnlyDistributedContentSession.Counters;
+using static BuildXL.Cache.ContentStore.Distributed.Sessions.DistributedContentSession.Counters;
namespace ContentStoreTest.Distributed.Sessions
{
diff --git a/Public/Src/Cache/ContentStore/DistributedTest/Sessions/DistributedContentSessionTests.cs b/Public/Src/Cache/ContentStore/DistributedTest/Sessions/DistributedContentSessionTests.cs
index d81c9ba00..1537e1ba6 100644
--- a/Public/Src/Cache/ContentStore/DistributedTest/Sessions/DistributedContentSessionTests.cs
+++ b/Public/Src/Cache/ContentStore/DistributedTest/Sessions/DistributedContentSessionTests.cs
@@ -75,7 +75,7 @@ namespace ContentStoreTest.Distributed.Sessions
}, implicitPin);
}
- protected override Task RunReadOnlyTestAsync(ImplicitPin implicitPin, Func funcAsync)
+ protected override Task RunReadOnlyTestAsync(ImplicitPin implicitPin, Func funcAsync)
{
return RunTestAsync(implicitPin, null, (ctx, session) => funcAsync(ctx, session));
}
diff --git a/Public/Src/Cache/ContentStore/DistributedTest/Sessions/DistributedContentTests.cs b/Public/Src/Cache/ContentStore/DistributedTest/Sessions/DistributedContentTests.cs
index 76755bbec..95804eaf0 100644
--- a/Public/Src/Cache/ContentStore/DistributedTest/Sessions/DistributedContentTests.cs
+++ b/Public/Src/Cache/ContentStore/DistributedTest/Sessions/DistributedContentTests.cs
@@ -1009,7 +1009,7 @@ namespace ContentStoreTest.Distributed.Sessions
}
protected async Task OpenStreamReturnsExpectedFile(
- IReadOnlyContentSession session, Context context, ContentHash hash, byte[] expected)
+ IContentSession session, Context context, ContentHash hash, byte[] expected)
{
OpenStreamResult openResult = await session.OpenStreamAsync(context, hash, Token);
_tracer.Debug(context, $"Validating stream for content hash {hash} returned result {openResult.Code} with diagnostics {openResult} with ErrorMessage {openResult.ErrorMessage} diagnostics {openResult.Diagnostics}");
diff --git a/Public/Src/Cache/ContentStore/DistributedTest/Stores/AzureBlobStorageContentSessionTests.cs b/Public/Src/Cache/ContentStore/DistributedTest/Stores/AzureBlobStorageContentSessionTests.cs
index 038fc8c88..644179523 100644
--- a/Public/Src/Cache/ContentStore/DistributedTest/Stores/AzureBlobStorageContentSessionTests.cs
+++ b/Public/Src/Cache/ContentStore/DistributedTest/Stores/AzureBlobStorageContentSessionTests.cs
@@ -52,7 +52,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test
throw new NotImplementedException();
}
- protected override async Task RunReadOnlyTestAsync(ImplicitPin implicitPin, Func funcAsync)
+ protected override async Task RunReadOnlyTestAsync(ImplicitPin implicitPin, Func funcAsync)
{
using var directory = new DisposableDirectory(FileSystem);
diff --git a/Public/Src/Cache/ContentStore/GrpcTest/TestHangingContentStore.cs b/Public/Src/Cache/ContentStore/GrpcTest/TestHangingContentStore.cs
index 180eda036..fe43a0fa9 100644
--- a/Public/Src/Cache/ContentStore/GrpcTest/TestHangingContentStore.cs
+++ b/Public/Src/Cache/ContentStore/GrpcTest/TestHangingContentStore.cs
@@ -50,12 +50,6 @@ namespace ContentStoreTest.Grpc
return Task.FromResult(BoolResult.Success);
}
- [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
- public CreateSessionResult CreateReadOnlySession(Context context, string name, ImplicitPin implicitPin)
- {
- return new CreateSessionResult(new TestHangingContentSession(name, _useCancellationToken, _hangHasStartedSemaphore));
- }
-
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
public CreateSessionResult CreateSession(Context context, string name, ImplicitPin implicitPin)
{
diff --git a/Public/Src/Cache/ContentStore/GrpcTest/TestTempFileDeletingContentStore.cs b/Public/Src/Cache/ContentStore/GrpcTest/TestTempFileDeletingContentStore.cs
index 388af544f..e92584aaf 100644
--- a/Public/Src/Cache/ContentStore/GrpcTest/TestTempFileDeletingContentStore.cs
+++ b/Public/Src/Cache/ContentStore/GrpcTest/TestTempFileDeletingContentStore.cs
@@ -48,13 +48,6 @@ namespace ContentStoreTest.Grpc
return Task.FromResult(BoolResult.Success);
}
- [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
- public CreateSessionResult CreateReadOnlySession(Context context, string name, ImplicitPin implicitPin)
- {
- return new CreateSessionResult(new TestTempFileDeletingContentSession(name, _fileSystem));
- }
-
- [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
public CreateSessionResult CreateSession(Context context, string name, ImplicitPin implicitPin)
{
return new CreateSessionResult(new TestTempFileDeletingContentSession(name, _fileSystem));
diff --git a/Public/Src/Cache/ContentStore/Interfaces/Sessions/IContentSession.cs b/Public/Src/Cache/ContentStore/Interfaces/Sessions/IContentSession.cs
index 5c69900f3..3d57d137c 100644
--- a/Public/Src/Cache/ContentStore/Interfaces/Sessions/IContentSession.cs
+++ b/Public/Src/Cache/ContentStore/Interfaces/Sessions/IContentSession.cs
@@ -1,12 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
+using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Results;
+using BuildXL.Cache.ContentStore.Interfaces.Stores;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
// ReSharper disable UnusedParameter.Global
@@ -15,8 +17,60 @@ namespace BuildXL.Cache.ContentStore.Interfaces.Sessions
///
/// A related set of accesses to a content store.
///
- public interface IContentSession : IReadOnlyContentSession
+ public interface IContentSession : IName, IStartupShutdown, IConfigurablePin
{
+        /// <summary>
+        ///     Ensure content does not get deleted.
+        /// </summary>
+        Task<PinResult> PinAsync(
+            Context context,
+            ContentHash contentHash,
+            CancellationToken cts,
+            UrgencyHint urgencyHint = UrgencyHint.Nominal);
+
+        /// <summary>
+        ///     Open a stream to content.
+        /// </summary>
+        Task<OpenStreamResult> OpenStreamAsync(
+            Context context,
+            ContentHash contentHash,
+            CancellationToken cts,
+            UrgencyHint urgencyHint = UrgencyHint.Nominal);
+
+        /// <summary>
+        ///     Materialize content to the filesystem.
+        /// </summary>
+        Task<PlaceFileResult> PlaceFileAsync(
+            Context context,
+            ContentHash contentHash,
+            AbsolutePath path,
+            FileAccessMode accessMode,
+            FileReplacementMode replacementMode,
+            FileRealizationMode realizationMode,
+            CancellationToken cts,
+            UrgencyHint urgencyHint = UrgencyHint.Nominal);
+
+        /// <summary>
+        ///     Ensure content does not get deleted in bulk.
+        /// </summary>
+        Task<IEnumerable<Task<Indexed<PinResult>>>> PinAsync(
+            Context context,
+            IReadOnlyList<ContentHash> contentHashes,
+            CancellationToken cts,
+            UrgencyHint urgencyHint = UrgencyHint.Nominal);
+
+        /// <summary>
+        ///     Materialize content to the filesystem in bulk.
+        /// </summary>
+        Task<IEnumerable<Task<Indexed<PlaceFileResult>>>> PlaceFileAsync(
+            Context context,
+            IReadOnlyList<ContentHashWithPath> hashesWithPaths,
+            FileAccessMode accessMode,
+            FileReplacementMode replacementMode,
+            FileRealizationMode realizationMode,
+            CancellationToken cts,
+            UrgencyHint urgencyHint = UrgencyHint.Nominal);
+
///
/// Add content from a file.
///
diff --git a/Public/Src/Cache/ContentStore/Interfaces/Sessions/IReadOnlyContentSession.cs b/Public/Src/Cache/ContentStore/Interfaces/Sessions/IReadOnlyContentSession.cs
deleted file mode 100644
index 49369a64d..000000000
--- a/Public/Src/Cache/ContentStore/Interfaces/Sessions/IReadOnlyContentSession.cs
+++ /dev/null
@@ -1,72 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-using BuildXL.Cache.ContentStore.Hashing;
-using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
-using BuildXL.Cache.ContentStore.Interfaces.Results;
-using BuildXL.Cache.ContentStore.Interfaces.Stores;
-using BuildXL.Cache.ContentStore.Interfaces.Tracing;
-
-namespace BuildXL.Cache.ContentStore.Interfaces.Sessions
-{
- ///
- /// A related set of read accesses to a content store.
- ///
- public interface IReadOnlyContentSession : IName, IStartupShutdown, IConfigurablePin
- {
- ///
- /// Ensure content does not get deleted.
- ///
- Task PinAsync(
- Context context,
- ContentHash contentHash,
- CancellationToken cts,
- UrgencyHint urgencyHint = UrgencyHint.Nominal);
-
- ///
- /// Open a stream to content.
- ///
- Task OpenStreamAsync(
- Context context,
- ContentHash contentHash,
- CancellationToken cts,
- UrgencyHint urgencyHint = UrgencyHint.Nominal);
-
- ///
- /// Materialize content to the filesystem.
- ///
- Task PlaceFileAsync(
- Context context,
- ContentHash contentHash,
- AbsolutePath path,
- FileAccessMode accessMode,
- FileReplacementMode replacementMode,
- FileRealizationMode realizationMode,
- CancellationToken cts,
- UrgencyHint urgencyHint = UrgencyHint.Nominal);
-
- ///
- /// Ensure content does not get deleted in bulk.
- ///
- Task>>> PinAsync(
- Context context,
- IReadOnlyList contentHashes,
- CancellationToken cts,
- UrgencyHint urgencyHint = UrgencyHint.Nominal);
-
- ///
- /// Materialize content to the filesystem in bulk.
- ///
- Task>>> PlaceFileAsync(
- Context context,
- IReadOnlyList hashesWithPaths,
- FileAccessMode accessMode,
- FileReplacementMode replacementMode,
- FileRealizationMode realizationMode,
- CancellationToken cts,
- UrgencyHint urgencyHint = UrgencyHint.Nominal);
- }
-}
diff --git a/Public/Src/Cache/ContentStore/Interfaces/Stores/IContentStore.cs b/Public/Src/Cache/ContentStore/Interfaces/Stores/IContentStore.cs
index c71f46213..11fd6a512 100644
--- a/Public/Src/Cache/ContentStore/Interfaces/Stores/IContentStore.cs
+++ b/Public/Src/Cache/ContentStore/Interfaces/Stores/IContentStore.cs
@@ -26,11 +26,6 @@ namespace BuildXL.Cache.ContentStore.Interfaces.Stores
///
public interface IContentStore : IStartupShutdown
{
- ///
- /// Create a new session that can only read.
- ///
- CreateSessionResult CreateReadOnlySession(Context context, string name, ImplicitPin implicitPin);
-
///
/// Create a new session that can add content as well as read.
///
diff --git a/Public/Src/Cache/ContentStore/InterfacesTest/Sessions/ContentSessionTestsBase.cs b/Public/Src/Cache/ContentStore/InterfacesTest/Sessions/ContentSessionTestsBase.cs
index 86ef5e1ed..c26f9698a 100644
--- a/Public/Src/Cache/ContentStore/InterfacesTest/Sessions/ContentSessionTestsBase.cs
+++ b/Public/Src/Cache/ContentStore/InterfacesTest/Sessions/ContentSessionTestsBase.cs
@@ -63,7 +63,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Sessions
protected abstract IContentStore CreateStore(DisposableDirectory testDirectory, ContentStoreConfiguration configuration);
- protected virtual async Task RunReadOnlyTestAsync(ImplicitPin implicitPin, Func funcAsync)
+ protected virtual async Task RunReadOnlyTestAsync(ImplicitPin implicitPin, Func funcAsync)
{
var context = new Context(Logger);
using (var directory = new DisposableDirectory(FileSystem))
@@ -76,7 +76,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Sessions
{
await store.StartupAsync(context).ShouldBeSuccess();
- var createResult = store.CreateReadOnlySession(context, Name, implicitPin).ShouldBeSuccess();
+ var createResult = store.CreateSession(context, Name, implicitPin).ShouldBeSuccess();
using (var session = createResult.Session)
{
try
diff --git a/Public/Src/Cache/ContentStore/Library/Sessions/FileSystemContentSession.cs b/Public/Src/Cache/ContentStore/Library/Sessions/FileSystemContentSession.cs
index 4bde19218..7e458906c 100644
--- a/Public/Src/Cache/ContentStore/Library/Sessions/FileSystemContentSession.cs
+++ b/Public/Src/Cache/ContentStore/Library/Sessions/FileSystemContentSession.cs
@@ -2,10 +2,13 @@
// Licensed under the MIT License.
using System;
+using System.Collections.Generic;
using System.IO;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Hashing;
+using BuildXL.Cache.ContentStore.Interfaces.Extensions;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Sessions;
@@ -13,22 +16,165 @@ using BuildXL.Cache.ContentStore.Interfaces.Stores;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
using BuildXL.Cache.ContentStore.Sessions.Internal;
using BuildXL.Cache.ContentStore.Stores;
+using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Utilities.Tracing;
namespace BuildXL.Cache.ContentStore.Sessions
{
- ///
- /// An implemented over a
- ///
- public class FileSystemContentSession : ReadOnlyFileSystemContentSession, ITrustedContentSession
+ public class FileSystemContentSession : ContentSessionBase, ITrustedContentSession, IHibernateContentSession
{
///
- /// Initializes a new instance of the class.
+ /// The internal content store backing the session.
///
- public FileSystemContentSession(string name, ImplicitPin implicitPin, FileSystemContentStoreInternal store)
- : base(name, store, implicitPin)
+ protected readonly FileSystemContentStoreInternal Store;
+
+ private readonly PinContext _pinContext;
+ private readonly ImplicitPin _implicitPin;
+
+ /// <inheritdoc />
+ protected override Tracer Tracer { get; } = new Tracer(nameof(FileSystemContentSession));
+
+ /// <inheritdoc />
+ protected override bool TraceErrorsOnly => true; // This type adds nothing in terms of tracing. So configure it to trace errors only.
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public FileSystemContentSession(string name, FileSystemContentStoreInternal store, ImplicitPin implicitPin)
+ : base(name)
{
+ Store = store;
+ _pinContext = Store.CreatePinContext();
+ _implicitPin = implicitPin;
+ }
+
+        /// <inheritdoc />
+        protected override async Task<BoolResult> ShutdownCoreAsync(OperationContext operationContext)
+        {
+            await _pinContext.DisposeAsync(); // dispose this session's pin context before the stats are queried
+
+            var statsResult = await Store.GetStatsAsync(operationContext);
+            if (statsResult.Succeeded)
+            {
+                Tracer.TraceStatisticsAtShutdown(operationContext, statsResult.CounterSet, prefix: "FileSystemContentSessionStats");
+            }
+
+            return BoolResult.Success; // stats tracing is best-effort: success is returned even when GetStatsAsync failed
+        }
+
+ /// <inheritdoc />
+ protected override void DisposeCore()
+ {
+ base.DisposeCore();
+ _pinContext.Dispose(); // synchronous counterpart of the DisposeAsync call made in ShutdownCoreAsync
+ }
+
+        /// <inheritdoc />
+        protected override Task<PinResult> PinCoreAsync(
+            OperationContext operationContext,
+            ContentHash contentHash,
+            UrgencyHint urgencyHint,
+            Counter retryCounter)
+        {
+            return Store.PinAsync(operationContext, contentHash, _pinContext); // urgencyHint and retryCounter are not forwarded to the store
+        }
+
+        /// <inheritdoc />
+        protected override Task<OpenStreamResult> OpenStreamCoreAsync(
+            OperationContext operationContext,
+            ContentHash contentHash,
+            UrgencyHint urgencyHint,
+            Counter retryCounter)
+        {
+            return Store.OpenStreamAsync(operationContext, contentHash, MakePinRequest(ImplicitPin.Get)); // pin request is null unless the session's implicit-pin setting includes Get
+        }
+
+        /// <inheritdoc />
+        protected override Task<PlaceFileResult> PlaceFileCoreAsync(
+            OperationContext operationContext,
+            ContentHash contentHash,
+            AbsolutePath path,
+            FileAccessMode accessMode,
+            FileReplacementMode replacementMode,
+            FileRealizationMode realizationMode,
+            UrgencyHint urgencyHint,
+            Counter retryCounter)
+        {
+            return Store.PlaceFileAsync(
+                operationContext,
+                contentHash,
+                path,
+                accessMode,
+                replacementMode,
+                realizationMode,
+                MakePinRequest(ImplicitPin.Get)); // placing reads content: gate the implicit pin on Get, as OpenStream and the bulk PlaceFile overload do
+        }
+
+        /// <inheritdoc />
+        protected override async Task<IEnumerable<Task<Indexed<PinResult>>>> PinCoreAsync(
+            OperationContext operationContext,
+            IReadOnlyList<ContentHash> contentHashes,
+            UrgencyHint urgencyHint,
+            Counter retryCounter,
+            Counter fileCounter)
+        {
+            return EnumerableExtensions.AsTasks<Indexed<PinResult>>( // the store call is awaited first; its results are then handed to AsTasks
+                (await Store.PinAsync(operationContext, contentHashes, _pinContext, options: null)));
+        }
+
+        /// <inheritdoc />
+        protected override Task<IEnumerable<Task<Indexed<PlaceFileResult>>>> PlaceFileCoreAsync(
+            OperationContext operationContext,
+            IReadOnlyList<ContentHashWithPath> hashesWithPaths,
+            FileAccessMode accessMode,
+            FileReplacementMode replacementMode,
+            FileRealizationMode realizationMode,
+            UrgencyHint urgencyHint,
+            Counter retryCounter)
+        {
+            return Store.PlaceFileAsync(
+                operationContext,
+                hashesWithPaths,
+                accessMode,
+                replacementMode,
+                realizationMode,
+                MakePinRequest(ImplicitPin.Get)); // pin request is null unless the session's implicit-pin setting includes Get
+        }
+
+        /// <inheritdoc />
+        public IEnumerable<ContentHash> EnumeratePinnedContentHashes()
+        {
+            return _pinContext.GetContentHashes(); // hashes held by this session's own pin context
+        }
+
+        /// <inheritdoc />
+        async Task IHibernateContentSession.PinBulkAsync(Context context, IEnumerable<ContentHash> contentHashes)
+        {
+            var contentHashList = contentHashes as List<ContentHash> ?? contentHashes.ToList(); // reuse the caller's list when it already is one
+            // Passing 'RePinFromHibernation' to use more optimal pinning logic.
+            var results = Enumerable.ToList<Indexed<PinResult>>(
+                (await Store.PinAsync(context, contentHashList, _pinContext, new PinBulkOptions() { RePinFromHibernation = true })));
+
+            var failed = results.Where(r => !r.Item.Succeeded);
+            foreach (var result in failed) // failures are only traced; nothing is thrown or returned to the caller
+            {
+                Tracer.Warning(context, $"Failed to pin contentHash=[{contentHashList[result.Index]}]");
+            }
+        }
+
+        /// <inheritdoc />
+        Task<BoolResult> IHibernateContentSession.ShutdownEvictionAsync(Context context)
+        {
+            return Store.ShutdownEvictionAsync(context); // pure pass-through to the backing store
+        }
+
+ ///
+ /// Build a PinRequest based on whether auto-pin configuration matches request.
+ ///
+ protected PinRequest? MakePinRequest(ImplicitPin implicitPin)
+ {
+ return (implicitPin & _implicitPin) != ImplicitPin.None ? new PinRequest(_pinContext) : (PinRequest?)null;
}
///
diff --git a/Public/Src/Cache/ContentStore/Library/Sessions/ReadOnlyFileSystemContentSession.cs b/Public/Src/Cache/ContentStore/Library/Sessions/ReadOnlyFileSystemContentSession.cs
deleted file mode 100644
index 6d7b99c9a..000000000
--- a/Public/Src/Cache/ContentStore/Library/Sessions/ReadOnlyFileSystemContentSession.cs
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
-using BuildXL.Cache.ContentStore.Hashing;
-using BuildXL.Cache.ContentStore.Interfaces.Extensions;
-using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
-using BuildXL.Cache.ContentStore.Interfaces.Logging;
-using BuildXL.Cache.ContentStore.Interfaces.Results;
-using BuildXL.Cache.ContentStore.Interfaces.Sessions;
-using BuildXL.Cache.ContentStore.Interfaces.Stores;
-using BuildXL.Cache.ContentStore.Interfaces.Tracing;
-using BuildXL.Cache.ContentStore.Stores;
-using BuildXL.Cache.ContentStore.Tracing;
-using BuildXL.Cache.ContentStore.Tracing.Internal;
-using BuildXL.Utilities.Tracing;
-
-namespace BuildXL.Cache.ContentStore.Sessions
-{
- ///
- /// An implemented over a
- ///
- public class ReadOnlyFileSystemContentSession : ContentSessionBase, IHibernateContentSession
- {
- ///
- /// The internal content store backing the session.
- ///
- protected readonly FileSystemContentStoreInternal Store;
-
- private readonly PinContext _pinContext;
- private readonly ImplicitPin _implicitPin;
-
- ///
- protected override Tracer Tracer { get; } = new Tracer(nameof(FileSystemContentSession));
-
- ///
- protected override bool TraceErrorsOnly => true; // This type adds nothing in terms of tracing. So configure it to trace errors only.
-
- ///
- /// Initializes a new instance of the class.
- ///
- public ReadOnlyFileSystemContentSession(string name, FileSystemContentStoreInternal store, ImplicitPin implicitPin)
- : base(name)
- {
- Store = store;
- _pinContext = Store.CreatePinContext();
- _implicitPin = implicitPin;
- }
-
- ///
- protected override async Task ShutdownCoreAsync(OperationContext operationContext)
- {
- await _pinContext.DisposeAsync();
-
- var statsResult = await Store.GetStatsAsync(operationContext);
- if (statsResult.Succeeded)
- {
- Tracer.TraceStatisticsAtShutdown(operationContext, statsResult.CounterSet, prefix: "FileSystemContentSessionStats");
- }
-
- return BoolResult.Success;
- }
-
- ///
- protected override void DisposeCore()
- {
- base.DisposeCore();
- _pinContext.Dispose();
- }
-
- ///
- protected override Task PinCoreAsync(OperationContext operationContext, ContentHash contentHash, UrgencyHint urgencyHint, Counter retryCounter)
- {
- return Store.PinAsync(operationContext, contentHash, _pinContext);
- }
-
- ///
- protected override Task OpenStreamCoreAsync(
- OperationContext operationContext, ContentHash contentHash, UrgencyHint urgencyHint, Counter retryCounter)
- {
- return Store.OpenStreamAsync(operationContext, contentHash, MakePinRequest(ImplicitPin.Get));
- }
-
- ///
- protected override Task PlaceFileCoreAsync(
- OperationContext operationContext,
- ContentHash contentHash,
- AbsolutePath path,
- FileAccessMode accessMode,
- FileReplacementMode replacementMode,
- FileRealizationMode realizationMode,
- UrgencyHint urgencyHint,
- Counter retryCounter)
- {
- return Store.PlaceFileAsync(operationContext, contentHash, path, accessMode, replacementMode, realizationMode, MakePinRequest(ImplicitPin.Put));
- }
-
- ///
- protected override async Task>>> PinCoreAsync(
- OperationContext operationContext,
- IReadOnlyList contentHashes,
- UrgencyHint urgencyHint,
- Counter retryCounter,
- Counter fileCounter)
- {
- return (await Store.PinAsync(operationContext, contentHashes, _pinContext, options: null)).AsTasks();
- }
-
- ///
- protected override Task>>> PlaceFileCoreAsync(
- OperationContext operationContext,
- IReadOnlyList hashesWithPaths,
- FileAccessMode accessMode,
- FileReplacementMode replacementMode,
- FileRealizationMode realizationMode,
- UrgencyHint urgencyHint,
- Counter retryCounter)
- {
- return Store.PlaceFileAsync(operationContext, hashesWithPaths, accessMode, replacementMode, realizationMode, MakePinRequest(ImplicitPin.Get));
- }
-
- ///
- public IEnumerable EnumeratePinnedContentHashes()
- {
- return _pinContext.GetContentHashes();
- }
-
- ///
- async Task IHibernateContentSession.PinBulkAsync(Context context, IEnumerable contentHashes)
- {
- var contentHashList = contentHashes as List