Merged PR 666481: Add sst file creation to BlobContentLocationRegistry.

Add sst file creation to BlobContentLocationRegistry.
This commit is contained in:
Lance Collins 2022-06-20 20:17:55 +00:00
Родитель ccf7741680
Коммит 241e999120
51 изменённых файлов: 2044 добавлений и 509 удалений

Просмотреть файл

@ -10,6 +10,7 @@ using System.Net;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed.Utilities;
using BuildXL.Cache.ContentStore.Interfaces.Results;
@ -50,11 +51,18 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
return new BlobName(fileName, IsRelative: true);
}
/// <summary>
/// Converts a nullable file name into a nullable relative <see cref="BlobName"/>.
/// A null file name converts to a null blob name.
/// </summary>
public static implicit operator BlobName?(string? fileName)
{
    if (fileName == null)
    {
        return default(BlobName?);
    }

    return new BlobName(fileName, IsRelative: true);
}
public static BlobName CreateAbsolute(string name) => new BlobName(name, false);
public override string ToString()
{
return Name;
return ToDisplayName();
}
public string ToDisplayName()
@ -147,26 +155,38 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
timeout: _configuration.StorageInteractionTimeout);
}
/// <summary>
/// Gets a <see cref="BlobWrapper"/> over the block blob reference for <paramref name="fileName"/>,
/// bound to the given cancellation token.
/// </summary>
public BlobWrapper GetBlob(CancellationToken token, BlobName fileName)
{
    return WrapBlob(token, fileName, GetBlockBlobReference(fileName));
}
/// <summary>
/// Wraps an existing <see cref="CloudBlockBlob"/> in a <see cref="BlobWrapper"/> tied to this storage instance,
/// the given name and token, and the default blob storage request options.
/// </summary>
internal BlobWrapper WrapBlob(CancellationToken token, BlobName fileName, CloudBlockBlob blob)
    => new BlobWrapper(this, blob, fileName, token, DefaultBlobStorageRequestOptions);
/// <summary>
/// Runs <paramref name="useAsync"/> against a <see cref="BlobWrapper"/> for <paramref name="fileName"/>
/// inside a traced operation with a timeout.
/// </summary>
/// <param name="context">Operation context; the wrapper is bound to the token of the context passed to the inner operation.</param>
/// <param name="fileName">Name of the blob to operate on.</param>
/// <param name="useAsync">Callback that performs the actual work on the wrapped blob.</param>
/// <param name="caller">Caller name forwarded to the traced operation.</param>
/// <param name="endMessageSuffix">Optional suffix appended after the <c>FileName=[...]</c> part of the end message.</param>
/// <param name="timeout">Operation timeout; when null, <c>_configuration.StorageInteractionTimeout</c> is used.</param>
/// <param name="isCritical">Forwarded as-is to the traced operation.</param>
public Task<T> UseBlockBlobAsync<T>(
OperationContext context,
BlobName fileName,
Func<OperationContext, BlobWrapper, Task<T>> useAsync,
[CallerMemberName] string? caller = null,
Func<T, string>? endMessageSuffix = null,
TimeSpan? timeout = null)
TimeSpan? timeout = null,
bool isCritical = false)
where T : ResultBase
{
return context.PerformOperationWithTimeoutAsync(
Tracer,
context =>
{
var blob = GetBlockBlobReference(fileName);
var wrapperBlob = new BlobWrapper(blob, fileName, context.Token, DefaultBlobStorageRequestOptions);
var wrapperBlob = GetBlob(context.Token, fileName);
return useAsync(context, wrapperBlob);
},
extraEndMessage: r => $"FileName=[{GetDisplayPath(fileName)}]{endMessageSuffix?.Invoke(r)}",
traceOperationStarted: false,
caller: caller,
isCritical: isCritical,
timeout: timeout ?? _configuration.StorageInteractionTimeout);
}
@ -496,6 +516,18 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
}
}
/// <summary>
/// Resolves a directory reference for the given name: relative names are resolved under
/// <c>Directory</c>, all other names are resolved directly against the container.
/// </summary>
private CloudBlobDirectory GetDirectoryReference(BlobName fileName)
{
    return fileName.IsRelative
        ? Directory.GetDirectoryReference(fileName.Name)
        : _container.GetDirectoryReference(fileName.Name);
}
public Task<BoolResult> TouchAsync(OperationContext context, BlobName fileName)
{
return context.PerformOperationWithTimeoutAsync(Tracer, async context =>
@ -519,13 +551,13 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// <summary>
/// Lists the blobs in the folder and returns the name of each one as an absolute <see cref="BlobName"/>.
/// </summary>
/// <param name="context">Operation context forwarded to the underlying listing call.</param>
/// <param name="regex">Optional regex forwarded to the underlying listing call.</param>
/// <param name="subDirectoryPath">Optional sub-directory path forwarded to the underlying listing call.</param>
/// <param name="maxResults">Optional result limit forwarded to the underlying listing call.</param>
public IAsyncEnumerable<BlobName> ListBlobsAsync(
public IAsyncEnumerable<BlobName> ListBlobNamesAsync(
OperationContext context,
Regex? regex = null,
string? subDirectoryPath = null,
int? maxResults = null)
{
return ListBlobsCoreAsync(context, regex, subDirectoryPath, maxResults: maxResults).Select(blob => BlobName.CreateAbsolute(blob.Name));
return ListBlobsAsync(context, regex, subDirectoryPath, maxResults: maxResults).Select(blob => BlobName.CreateAbsolute(blob.Name));
}
/// <summary>
@ -537,11 +569,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
Regex? regex = null,
string? subDirectoryPath = null)
{
var blobs = await ListBlobsCoreAsync(
var blobs = await ListBlobsAsync(
context,
regex,
subDirectoryPath,
getMetadata: true,
blobListingDetails: BlobListingDetails.Metadata,
maxResults: maxResults).ToListAsync();
blobs.Sort(LruCompareBlobs);
@ -578,39 +610,46 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// <summary>
/// Lists blobs in folder
/// </summary>
private async IAsyncEnumerable<CloudBlob> ListBlobsCoreAsync(
internal async IAsyncEnumerable<CloudBlob> ListBlobsAsync(
OperationContext context,
Regex? regex = null,
string? subDirectoryPath = null,
bool getMetadata = false,
int? maxResults = null)
BlobName? prefix = null,
BlobListingDetails blobListingDetails = BlobListingDetails.None,
int? maxResults = null,
bool listingSingleBlobSnapshots = false)
{
BlobContinuationToken? continuation = null;
var directory = Directory;
if (prefix != null)
{
directory = GetDirectoryReference(prefix.Value);
}
var delimiter = directory.ServiceClient.DefaultDelimiter;
var listingPrefix = listingSingleBlobSnapshots && directory.Prefix.EndsWith(delimiter)
? directory.Prefix.Substring(0, directory.Prefix.Length - delimiter.Length)
: directory.Prefix;
while (!context.Token.IsCancellationRequested)
{
var directory = Directory;
if (subDirectoryPath != null)
{
directory = directory.GetDirectoryReference(subDirectoryPath);
}
var blobs = await context.PerformOperationWithTimeoutAsync(
Tracer,
async context =>
{
var result = await directory.ListBlobsSegmentedAsync(
var result = await _container.ListBlobsSegmentedAsync(
prefix: listingPrefix,
useFlatBlobListing: true,
blobListingDetails: getMetadata ? BlobListingDetails.Metadata : BlobListingDetails.None,
blobListingDetails: blobListingDetails,
maxResults: maxResults,
currentToken: continuation,
options: null,
options: DefaultBlobStorageRequestOptions,
operationContext: null,
cancellationToken: context.Token);
return Result.Success(result);
},
timeout: _configuration.StorageInteractionTimeout,
extraEndMessage: r => $"ItemCount={r.GetValueOrDefault()?.Results.Count()}").ThrowIfFailureAsync();
extraEndMessage: r => $"Prefix={directory.Prefix} ItemCount={r.GetValueOrDefault()?.Results.Count()}").ThrowIfFailureAsync();
continuation = blobs.ContinuationToken;

Просмотреть файл

@ -5,12 +5,14 @@ using System;
using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Hashing;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using OperationContext = BuildXL.Cache.ContentStore.Tracing.Internal.OperationContext;
#nullable enable
@ -19,8 +21,16 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// <summary>
/// Wrapper which passes common arguments to <see cref="CloudBlockBlob"/> APIs
/// </summary>
public record struct BlobWrapper(CloudBlockBlob Blob, BlobName Name, CancellationToken Token, BlobRequestOptions Options, Microsoft.WindowsAzure.Storage.OperationContext? Context = null)
public record BlobWrapper(
BlobFolderStorage Storage,
CloudBlockBlob Blob,
BlobName Name,
CancellationToken Token,
BlobRequestOptions Options,
Microsoft.WindowsAzure.Storage.OperationContext? Context = null)
{
public AccessCondition? DefaultAccessCondition { get; set; }
public T? GetMetadataOrDefault<T>(Func<string, T> parse, T? defaultValue = default, [CallerMemberName] string key = null!)
{
T parseCore(string value)
@ -35,7 +45,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
}
}
return Blob?.Metadata.TryGetValue(key, out var value) == true && !string.IsNullOrEmpty(value)
return Blob.Metadata.TryGetValue(key, out var value) == true && !string.IsNullOrEmpty(value)
? parseCore(value)
: defaultValue;
}
@ -44,7 +54,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
if (value == null)
{
Blob?.Metadata.Remove(key);
Blob.Metadata.Remove(key);
}
else if (Blob != null)
{
@ -52,19 +62,24 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
}
}
/// <summary>
/// Picks the access condition to send with a request: the explicitly supplied condition
/// when it is non-null, otherwise <see cref="DefaultAccessCondition"/>.
/// </summary>
private AccessCondition? Apply(AccessCondition? condition) => condition ?? DefaultAccessCondition;
internal Task<string> AcquireLeaseAsync(TimeSpan leaseTime, AccessCondition? accessCondition = null)
{
return Blob.AcquireLeaseAsync(leaseTime, proposedLeaseId: null, accessCondition, Options, Context, Token);
return Blob.AcquireLeaseAsync(leaseTime, proposedLeaseId: null, Apply(accessCondition), Options, Context, Token);
}
internal Task<IEnumerable<ListBlockItem>> DownloadBlockListAsync(BlockListingFilter filter, AccessCondition? accessCondition = null)
{
return Blob.DownloadBlockListAsync(filter, accessCondition, Options, Context, Token);
return Blob.DownloadBlockListAsync(filter, Apply(accessCondition), Options, Context, Token);
}
internal Task DownloadRangeToStreamAsync(Stream stream, long? offset, long? length, AccessCondition? accessCondition = null)
{
return Blob.DownloadRangeToStreamAsync(stream, offset, length, accessCondition, Options, Context, Token);
return Blob.DownloadRangeToStreamAsync(stream, offset, length, Apply(accessCondition), Options, Context, Token);
}
internal Task<bool> ExistsAsync()
@ -74,29 +89,56 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
internal Task FetchAttributesAsync(AccessCondition? accessCondition = null)
{
return Blob.FetchAttributesAsync(accessCondition, Options, Context, Token);
return Blob.FetchAttributesAsync(Apply(accessCondition), Options, Context, Token);
}
internal Task PutBlockAsync(string blockId, Stream stream, AccessCondition? accessCondition = null, ContentHash? md5Hash = null)
{
Contract.Requires(md5Hash == null || md5Hash.Value.HashType == HashType.MD5);
var contentMD5 = md5Hash == null ? null : Convert.ToBase64String(md5Hash.Value.ToHashByteArray());
return Blob.PutBlockAsync(blockId, stream, contentMD5, accessCondition, Options, Context, Token);
return Blob.PutBlockAsync(blockId, stream, contentMD5, Apply(accessCondition), Options, Context, Token);
}
internal Task PutBlockListAsync(IEnumerable<string> blockList, AccessCondition? accessCondition = null)
{
return Blob.PutBlockListAsync(blockList, accessCondition, Options, Context, Token);
return Blob.PutBlockListAsync(blockList, Apply(accessCondition), Options, Context, Token);
}
internal Task ReleaseLeaseAsync(AccessCondition accessCondition)
internal Task ReleaseLeaseAsync(AccessCondition leaseCondition)
{
return Blob.ReleaseLeaseAsync(accessCondition, Options, Context, Token);
return Blob.ReleaseLeaseAsync(leaseCondition, Options, Context, Token);
}
internal Task UploadFromByteArrayAsync(ArraySegment<byte> buffer, AccessCondition? accessCondition = null)
{
return Blob.UploadFromByteArrayAsync(buffer.Array, buffer.Offset, buffer.Count, accessCondition, Options, Context, Token);
return Blob.UploadFromByteArrayAsync(buffer.Array, buffer.Offset, buffer.Count, Apply(accessCondition), Options, Context, Token);
}
/// <summary>
/// Lists the snapshots of this blob and projects each one through <paramref name="transformBlob"/>.
/// </summary>
/// <param name="context">Operation context used for the listing; its token is bound to each wrapped snapshot.</param>
/// <param name="transformBlob">Projection applied to a <see cref="BlobWrapper"/> created for each snapshot.</param>
/// <returns>The projected values, materialized into a list.</returns>
internal async Task<List<T>> ListSnapshotsAsync<T>(
    OperationContext context,
    Func<BlobWrapper, T> transformBlob)
{
    // List using this blob's name as prefix, requesting metadata and snapshots.
    var listing = Storage.ListBlobsAsync(
        context,
        regex: null,
        prefix: Name,
        listingSingleBlobSnapshots: true,
        blobListingDetails: BlobListingDetails.Metadata | BlobListingDetails.Snapshots);

    // Keep only snapshot entries that are block blobs.
    var snapshots = listing
        .Where(candidate => candidate.IsSnapshot)
        .OfType<CloudBlockBlob>();

    return await snapshots
        .Select(snapshot =>
        {
            var snapshotName = Name with { SnapshotTime = snapshot.SnapshotTime };
            return transformBlob(Storage.WrapBlob(context.Token, snapshotName, snapshot));
        })
        .ToListAsync();
}
/// <summary>
/// Deletes the blob if it exists, using the wrapper's options and context.
/// </summary>
/// <param name="option">How snapshots are treated by the delete.</param>
/// <param name="accessCondition">Explicit access condition; when null, <see cref="DefaultAccessCondition"/> is used.</param>
/// <param name="token">Cancellation token override; when null, the wrapper's <see cref="Token"/> is used.</param>
internal Task<bool> DeleteIfExistsAsync(DeleteSnapshotsOption option = DeleteSnapshotsOption.None, AccessCondition? accessCondition = null, CancellationToken? token = default)
{
    var effectiveToken = token ?? Token;
    return Blob.DeleteIfExistsAsync(option, Apply(accessCondition), Options, Context, effectiveToken);
}
/// <summary>
/// Creates a snapshot of the blob and returns a wrapper addressed by this blob's name
/// with the snapshot time of the created snapshot.
/// </summary>
/// <param name="accessCondition">Explicit access condition; when null, <see cref="DefaultAccessCondition"/> is used.</param>
internal async Task<BlobWrapper> SnapshotAsync(AccessCondition? accessCondition = null)
{
    var condition = Apply(accessCondition);
    var snapshot = await Blob.SnapshotAsync(metadata: null, condition, Options, Context, Token);

    var snapshotName = Name with { SnapshotTime = snapshot.SnapshotTime };
    return Storage.GetBlob(Token, snapshotName);
}
}
}

Просмотреть файл

@ -1,7 +1,9 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed.NuCache;
using BuildXL.Cache.ContentStore.Interfaces.Stores;
using BuildXL.Cache.ContentStore.Tracing.Internal;
@ -12,6 +14,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
/// </summary>
public interface IRoleObserver : IStartupShutdownSlim
{
public Task OnRoleUpdatedAsync(OperationContext context, Role role);
Task OnRoleUpdatedAsync(OperationContext context, MasterElectionState electionState);
}
}

Просмотреть файл

@ -97,7 +97,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
if (result.Succeeded)
{
_lastGetRoleTime = _clock.UtcNow;
OnRoleUpdated(context, result.Value.Role);
OnRoleUpdated(context, result.Value);
}
return result;
@ -108,17 +108,17 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
var result = await _inner.ReleaseRoleIfNecessaryAsync(context);
if (result.Succeeded)
{
OnRoleUpdated(context, result.Value);
OnRoleUpdated(context, MasterElectionState.DefaultWorker with { Role = result.Value });
}
return result;
}
/// <summary>
/// Forwards the update to the observer, if one is set. The observer's task is not awaited;
/// it is handed to <c>FireAndForget</c>.
/// </summary>
private void OnRoleUpdated(OperationContext context, Role role)
private void OnRoleUpdated(OperationContext context, MasterElectionState electionState)
{
if (_observer != null)
{
_observer.OnRoleUpdatedAsync(context, role).FireAndForget(context);
_observer.OnRoleUpdatedAsync(context, electionState).FireAndForget(context);
}
}
}

Просмотреть файл

@ -129,6 +129,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
private ActionQueue _concurrencyLimitingQueue;
private readonly IClock _clock;
private readonly BlobContentLocationRegistry _registry;
protected override Tracer Tracer { get; } = new Tracer(nameof(ResilientGlobalCacheService));
internal ContentMetadataEventStream EventStream => _eventStream;
@ -198,7 +200,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
CheckpointManager checkpointManager,
RocksDbContentMetadataStore store,
ContentMetadataEventStream eventStream,
IClock clock = null)
IClock clock = null,
BlobContentLocationRegistry registry = null)
: base(store)
{
_configuration = configuration;
@ -206,9 +209,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
_checkpointManager = checkpointManager;
_eventStream = eventStream;
_clock = clock ?? SystemClock.Instance;
_registry = registry;
LinkLifetime(_eventStream);
LinkLifetime(_checkpointManager);
LinkLifetime(registry);
RunInBackground(nameof(CreateCheckpointLoopAsync), CreateCheckpointLoopAsync, fireAndForget: true);
@ -240,6 +245,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
{
if (!ShouldRetry(out var retryReason, out var errorMessage, isShutdown: true))
{
// Stop database updates
_registry?.SetDatabaseUpdateLeaseExpiry(null);
// Stop logging
_eventStream.SetIsLogging(false);
@ -254,8 +262,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
return BoolResult.Success;
}
public async Task OnRoleUpdatedAsync(OperationContext context, Role role)
public async Task OnRoleUpdatedAsync(OperationContext context, MasterElectionState electionState)
{
var role = electionState.Role;
if (!StartupCompleted || ShutdownStarted)
{
return;
@ -267,11 +276,20 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
_lastSuccessfulHeartbeat = _clock.UtcNow;
if (_role != role)
{
// Stop database updates
_registry?.SetDatabaseUpdateLeaseExpiry(null);
_eventStream.SetIsLogging(false);
_hasRestoredCheckpoint = false;
_role = role;
}
if (!ShouldRetry(out _, out _))
{
// Notify registry that master lease is still held to ensure database is updated.
_registry?.SetDatabaseUpdateLeaseExpiry(electionState.MasterLeaseExpiryUtc);
}
if (_role == Role.Master)
{
// Acquire mutex to ensure we cancel the actual outstanding background restore
@ -296,6 +314,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
{
_hasRestoredCheckpoint = true;
_eventStream.SetIsLogging(true);
// Resume database updates
_registry?.SetDatabaseUpdateLeaseExpiry(electionState.MasterLeaseExpiryUtc);
}
}
finally

Просмотреть файл

@ -9,6 +9,7 @@ using System.Diagnostics.SymbolStore;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text.Json;
using System.Threading;
@ -250,6 +251,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
return result;
}
private MergeOperator MergeContentMergeOperator { get; } = MergeOperators.CreateAssociative(
"MergeContent",
merge: RocksDbOperations.MergeLocations,
transformSingle: RocksDbOperations.ProcessSingleLocationEntry);
private bool IsStoredEpochInvalid([NotNullWhen(true)] out string? epoch)
{
TryGetGlobalEntry(nameof(GlobalKeys.StoredEpoch), out epoch);
@ -263,10 +269,13 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
yield return
(
Columns.MergeContent,
MergeOperators.CreateAssociative(
"MergeContent",
merge: RocksDbOperations.MergeLocations,
transformSingle: RocksDbOperations.ProcessSingleLocationEntry)
MergeContentMergeOperator
);
yield return
(
Columns.SstMergeContent,
MergeContentMergeOperator
);
}
@ -690,6 +699,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
return ColumnNames[(int)columnFamily][(int)resolvedGroup];
}
/// <summary>
/// Returns the <see cref="RocksDbStore"/> instance handed out by <c>_keyValueStore.Use</c>.
/// </summary>
/// <remarks>
/// NOTE(review): presumably "unsafe" because the store is used outside the scope of the
/// <c>Use</c> call that produced it - confirm before relying on this outside of tests.
/// </remarks>
internal RocksDbStore UnsafeGetStore()
{
    var storeResult = _keyValueStore.Use(store => store).ToResult();
    return storeResult.Value!;
}
/// <summary>
/// Ingests sst files from the given paths for the <see cref="Columns.SstMergeContent"/> column
/// </summary>
@ -697,7 +711,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
{
return _keyValueStore.Use(store => store.Database.IngestExternalFiles(
files.Select(f => f.Path).ToArray(),
new IngestExternalFileOptions().SetMoveFiles(true),
new IngestExternalFileOptions().SetMoveFiles(true)
,
store.GetColumn(NameOf(Columns.SstMergeContent))))
.ToBoolResult();
}
@ -1119,7 +1134,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
|| store.TryReadValue(key, valueBuffer, NameOf(columns, GetFormerColumnGroup(columns))) >= 0;
}
private bool TryDeserializeValue<TResult>(RocksDbStore store, ReadOnlySpan<byte> key, Columns columns, DeserializeValue<TResult> deserializer, [NotNullWhen(true)] out TResult? result)
public bool TryDeserializeValue<TResult>(RocksDbStore store, ReadOnlySpan<byte> key, Columns columns, DeserializeValue<TResult> deserializer, [NotNullWhen(true)] out TResult? result)
{
return TryDeserializeValue(store, key, NameOf(columns), deserializer, out result)
|| IsRotatedColumn(columns) && TryDeserializeValue(store, key, NameOf(columns, GetFormerColumnGroup(columns)), deserializer, out result);
@ -1302,6 +1317,37 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
});
}
/// <summary>
/// Iterates the <see cref="Columns.SstMergeContent"/> column and invokes <paramref name="onEntry"/>
/// once per machine found in each merged content location entry.
/// </summary>
/// <param name="context">Operation context; its token is passed to the database iteration.</param>
/// <param name="onEntry">Callback receiving one <see cref="MachineContentEntry"/> per (hash, machine) pair.</param>
/// <remarks>
/// Keys whose length differs from the size of <see cref="ShardHash"/> are skipped.
/// The entry size is read with <c>info.Size!.Value</c>, so an entry that lists machines but has
/// no size makes the callback loop throw.
/// </remarks>
public Result<IterateDbContentResult> IterateSstMergeContentEntries(OperationContext context, Action<MachineContentEntry> onEntry)
{
    var arguments = (@this: this, onEntry, context);

    return _keyValueStore.Use(
        static (store, state) =>
        {
            var expectedKeyLength = Unsafe.SizeOf<ShardHash>();
            var columnName = state.@this.NameOf(Columns.SstMergeContent);

            return store.IterateDbContent(
                iterator =>
                {
                    var key = iterator.Key();
                    if (key.Length == expectedKeyLength)
                    {
                        var hash = MemoryMarshal.Read<ShardHash>(key);
                        RocksDbOperations.ReadMergedContentLocationEntry(iterator.Value(), out var machines, out var info);

                        foreach (var machine in machines)
                        {
                            state.onEntry(new MachineContentEntry(hash, machine, info.Size!.Value, info.LatestAccessTime ?? CompactTime.Zero));
                        }
                    }
                },
                columnName,
                startValue: (byte[]?)null,
                state.context.Token);
        },
        arguments)
        .ToResult();
}
/// <inheritdoc />
public override IEnumerable<Result<StrongFingerprint>> EnumerateStrongFingerprints(OperationContext context)
{

Просмотреть файл

@ -9,6 +9,7 @@ using BuildXL.Cache.ContentStore.Distributed.NuCache;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Service;
using BuildXL.Cache.ContentStore.Utils;
using BuildXL.Utilities.Collections;
using BuildXL.Utilities.Serialization;
using RocksDbSharp;
@ -33,17 +34,31 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
/// <summary>
/// Deletes the range of keys in the given partition (i.e. all keys with partition as prefix).
/// </summary>
public static void DeleteLocationEntryPartitionRange<TWriter>(this TWriter writer, byte partition)
public static void DeleteLocationEntryPartitionRange<TWriter>(this TWriter writer, PartitionId partition)
where TWriter : IRocksDbColumnWriter
{
// Create a key that sorts after all keys in the given partition by taking the partition's
// leading byte and suffixing it with byte.MaxValue bytes, so that the key is longer than the buffer's base hash length.
// The next partition id can't be used as the range end because there is no way to represent the one after partition 255.
Span<byte> rangeEnd = stackalloc byte[ShortHash.SerializedLength + 1];
rangeEnd[0] = partition;
Span<byte> rangeEnd = stackalloc byte[ShortHash.SerializedLength + 2];
rangeEnd[0] = partition.EndValue;
rangeEnd.Slice(1).Fill(byte.MaxValue);
writer.DeleteRange(stackalloc[] { partition }, rangeEnd);
writer.DeleteRange(stackalloc[] { partition.StartValue }, rangeEnd);
}
/// <summary>
/// Gets a key for the partition record in the partition's range after all valid shard hash
/// keys but within the range which would be deleted by a DeleteRange operation.
/// </summary>
/// <remarks>
/// Layout: the partition's end value, then byte.MaxValue filler, then a final zero byte,
/// for a total of <c>ShortHash.SerializedLength + 1</c> bytes.
/// </remarks>
public static ReadOnlyArray<byte> GetPartitionRecordKey(this PartitionId partition)
{
    var recordKey = new byte[ShortHash.SerializedLength + 1];

    // Fill everything after the first byte, then overwrite both ends.
    recordKey.AsSpan(1).Fill(byte.MaxValue);
    recordKey[0] = partition.EndValue;
    recordKey[recordKey.Length - 1] = 0;

    return recordKey;
}
/// <summary>

Просмотреть файл

@ -240,7 +240,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
private IAsyncEnumerable<BlobName> ListBlobsRecentFirstAsync(OperationContext context)
{
var blobs = _storage.ListBlobsAsync(context, _blobNameRegex)
var blobs = _storage.ListBlobNamesAsync(context, _blobNameRegex)
.Select(name =>
{
// This should never fail, because ListBlobNamesAsync returns blobs that we know already match.

Просмотреть файл

@ -119,7 +119,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
if (r.Succeeded)
{
// We don't know who the master is any more
_lastElection = new MasterElectionState(Master: default, Role: Role.Worker);
_lastElection = MasterElectionState.DefaultWorker;
}
return Result.Success(Role.Worker);
@ -236,7 +236,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
master = default(MachineLocation);
}
return new MasterElectionState(master, master.Equals(_primaryMachineLocation) ? Role.Master : Role.Worker);
return new MasterElectionState(master, master.Equals(_primaryMachineLocation) ? Role.Master : Role.Worker, MasterLeaseExpiryUtc: lease?.LeaseExpiryTimeUtc);
}
private Task<Result<MasterElectionState>> UpdateRoleAsync(OperationContext context, TryUpdateLease tryUpdateLease)

Просмотреть файл

@ -0,0 +1,225 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using BuildXL.Cache.ContentStore.Distributed.MetadataService;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.Extensions;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Stores;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.ContentStore.Utils;
using BuildXL.Utilities.Collections;
using BuildXL.Utilities.Serialization;
using RocksDbSharp;
using static BuildXL.Cache.ContentStore.Distributed.MetadataService.RocksDbOperations;
namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
public partial class BlobContentLocationRegistry
{
/// <summary>
/// Writes the full and diff entries based on the baseline and current content list
/// </summary>
/// <param name="context">Operation context (not used by this method).</param>
/// <param name="partitionId">Partition whose key range is replaced in the full sst file.</param>
/// <param name="baseline">Listing the diff is computed against.</param>
/// <param name="current">Listing describing the current content.</param>
/// <param name="fullSstWriter">Receives the partition range delete and one put per hash of <paramref name="current"/>.</param>
/// <param name="diffSstWriter">Receives the deletes and merges describing the change from <paramref name="baseline"/>.</param>
/// <param name="counters">Statistics updated while computing and writing the diff.</param>
/// <returns><see cref="BoolResult.Success"/> when both writers have been populated.</returns>
public static BoolResult WriteSstFiles(
    OperationContext context,
    PartitionId partitionId,
    ContentListing baseline,
    ContentListing current,
    IRocksDbColumnWriter fullSstWriter,
    IRocksDbColumnWriter diffSstWriter,
    PartitionUpdateStatistics counters)
{
    // Computes and writes per hash entries of the current listing
    // and returns the enumerator so subsequent step can consume the
    // entries. The enumerator is lazy: entries are only written to the
    // full sst file as it is advanced. It is disposed on exit so the
    // underlying listing enumeration is released even on failure.
    using var fullEntries = EnumerateAndWriteFullSstFile(
        fullSstWriter,
        partitionId,
        current.EnumerateEntries());

    // Compute the differences
    var diffEntries = baseline.EnumerateChanges(
        current.EnumerateEntries(),
        counters.DiffStats,
        synthesizeUniqueHashEntries: true);

    // Write the differences
    WriteDiffSstFile(
        diffSstWriter,
        diffEntries: ComputeDatabaseEntries(diffEntries),
        currentEntries: fullEntries,
        baselineEntries: ComputeDatabaseEntries(baseline.EnumerateEntries()),
        counters);

    // Defensive: the full sst file is only complete once the enumerator has been
    // advanced to the end. The diff step is expected to have done that already, in
    // which case this loop exits immediately.
    while (fullEntries.MoveNext())
    {
    }

    return BoolResult.Success;
}
/// <summary>
/// Lazily writes the full sst content for a partition while handing each per-hash entry to the caller.
/// </summary>
/// <remarks>
/// This is an iterator: nothing is written until the returned enumerator is advanced.
/// The first advance writes the partition range delete; each entry is written
/// (as a put, not a merge) right before it is yielded.
/// </remarks>
private static IEnumerator<LocationEntryBuilder> EnumerateAndWriteFullSstFile(
    IRocksDbColumnWriter writer,
    PartitionId partitionId,
    IEnumerable<MachineContentEntry> entries)
{
    // Clear out whatever the partition previously contained.
    writer.DeleteLocationEntryPartitionRange(partitionId);

    var hashEntries = ComputeDatabaseEntries(entries);
    foreach (var hashEntry in hashEntries)
    {
        hashEntry.Write(writer, merge: false);
        yield return hashEntry;
    }
}
/// <summary>
/// Writes the diff sst file
/// </summary>
/// <param name="writer">Receives deletes for hashes that disappeared and merges for hashes that changed.</param>
/// <param name="diffEntries">Per-hash diff entries, sorted by hash.</param>
/// <param name="currentEntries">Per-hash entries of the current listing, sorted by hash; advanced by this method.</param>
/// <param name="baselineEntries">Per-hash entries of the baseline listing, sorted by hash.</param>
/// <param name="counters">Statistics; deletes written here are recorded in <c>DiffStats.Deletes</c>.</param>
private static void WriteDiffSstFile(
    IRocksDbColumnWriter writer,
    IEnumerable<LocationEntryBuilder> diffEntries,
    IEnumerator<LocationEntryBuilder> currentEntries,
    IEnumerable<LocationEntryBuilder> baselineEntries,
    PartitionUpdateStatistics counters)
{
    // Enumerates the diff entries and writes deletions and returns the existing entries
    IEnumerable<LocationEntryBuilder> enumerateDiffEntriesAndWriteDeletions()
    {
        // Own the diff enumerator here so it is disposed when this iterator completes or is abandoned.
        // (currentEntries is owned by the caller and is intentionally not disposed here.)
        using var diffEnumerator = diffEntries.GetEnumerator();

        foreach (var item in NuCacheCollectionUtilities.DistinctMergeSorted(diffEnumerator, currentEntries, e => e.Hash, e => e.Hash))
        {
            var diffEntry = item.left;
            if (item.mode == MergeMode.LeftOnly)
            {
                // Entry no longer exists in current entries.
                // Just put a delete
                writer.DeleteLocationEntry(diffEntry.Hash);
                counters.DiffStats.Deletes.Add(new MachineContentEntry(diffEntry.Hash, default, diffEntry.Info.Size ?? 0, default));
            }
            else if (item.mode == MergeMode.Both)
            {
                yield return diffEntry;
            }
            else
            {
                // RightOnly case not handled because this is only concerned with entries which appear in the diff.
                // This case should probably not happen since entries are synthesized into diff for every unique hash in the
                // current entries.
            }
        }
    }

    // Enumerates the existing diff entries and writes minimal data to database. (i.e. size, creation time, and last access time
    // are excluded respectively if they are present in the base entry)
    foreach (var item in NuCacheCollectionUtilities.DistinctMergeSorted(enumerateDiffEntriesAndWriteDeletions(), baselineEntries, e => e.Hash, e => e.Hash))
    {
        // RightOnly case not handled because this is only concerned with entries which appear in the diff.
        if (item.mode != MergeMode.RightOnly)
        {
            var diffEntry = item.left;
            if (item.mode == MergeMode.Both)
            {
                var baselineEntry = item.right;
                diffEntry.Info = MachineContentInfo.Diff(baseline: baselineEntry.Info, current: diffEntry.Info);
            }

            if (diffEntry.Entries.Count == 0)
            {
                // This is a touch-only entry
                // Don't need to write size for touch-only entry
                diffEntry.Info.Size = null;

                if (diffEntry.Info.LatestAccessTime == null)
                {
                    // Don't need to write touch-only entry if last access time is not set
                    continue;
                }
            }

            diffEntry.Write(writer, merge: true);
        }
    }
}
/// <summary>
/// Groups consecutive entries that share a shard hash into per-hash <see cref="LocationEntryBuilder"/> values.
/// </summary>
/// <remarks>
/// A single builder instance is reused for every group: it is yielded, then reset and refilled
/// when the enumeration is resumed. Consumers must therefore finish with a yielded value before
/// advancing the enumeration and must not hold on to it afterwards.
/// </remarks>
private static IEnumerable<LocationEntryBuilder> ComputeDatabaseEntries(IEnumerable<MachineContentEntry> entries)
{
    var builder = new LocationEntryBuilder();

    foreach (var machineEntry in entries)
    {
        // A change of hash closes the group accumulated so far.
        var startsNewGroup = builder.HasInfo && builder.Hash != machineEntry.ShardHash;
        if (startsNewGroup)
        {
            yield return builder;
            builder.Reset();
        }

        builder.Add(machineEntry);
    }

    if (!builder.HasInfo)
    {
        yield break;
    }

    // Emit the trailing group.
    yield return builder;
}
/// <summary>
/// Accumulates <see cref="MachineContentEntry"/> values into a per-hash entry
/// </summary>
private class LocationEntryBuilder
{
    /// <summary>
    /// True once at least one entry has been added since construction or the last <see cref="Reset"/>.
    /// </summary>
    public bool HasInfo { get; set; }

    /// <summary>
    /// Location changes collected for the hash. Only entries with a valid location are recorded.
    /// </summary>
    public Buffer<LocationChange> Entries = new();

    /// <summary>
    /// Hash shared by the accumulated entries.
    /// </summary>
    public ShardHash Hash { get; set; } = default;

    /// <summary>
    /// Content info merged from every added entry whose location is not a remove.
    /// </summary>
    public MachineContentInfo Info = MachineContentInfo.Default;

    /// <summary>
    /// Writes the accumulated entry to <paramref name="writer"/>; <paramref name="merge"/> is
    /// forwarded to <c>MergeOrPutContentLocationEntry</c> to choose between a merge and a put.
    /// </summary>
    public void Write(IRocksDbColumnWriter writer, bool merge)
    {
        MergeOrPutContentLocationEntry<IRocksDbColumnWriter, LocationChange>(
            writer,
            Hash,
            Entries.ItemSpan,
            static l => l,
            Info,
            merge);
    }

    /// <summary>
    /// Adds an entry to the group being accumulated.
    /// </summary>
    public void Add(MachineContentEntry entry)
    {
        // The hash is (re)assigned for as long as no location has been recorded yet.
        if (Entries.Count == 0)
        {
            Hash = entry.ShardHash;
        }

        HasInfo = true;

        var location = entry.Location;
        if (!location.IsRemove)
        {
            Info.Merge(entry);
        }

        // Need to check for valid location in this case
        // because entry may just be a synthetic entry
        // which is added for the purpose of touches
        if (location.IsValid)
        {
            Entries.Add(location);
        }
    }

    /// <summary>
    /// Reset the entry
    /// </summary>
    public void Reset()
    {
        Entries.Reset();
        Info = MachineContentInfo.Default;
        Hash = default;
        HasInfo = false;
    }
}
}
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -92,7 +92,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
while (!context.Token.IsCancellationRequested)
{
await PeriodicRestoreCheckpointAsync(context);
await PeriodicRestoreCheckpointAsync(context).IgnoreFailure();
await Task.Delay(_configuration.RestoreCheckpointInterval, context.Token);
}
@ -100,7 +100,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
return BoolResult.Success;
}
private Task PeriodicRestoreCheckpointAsync(OperationContext context)
public Task<BoolResult> PeriodicRestoreCheckpointAsync(OperationContext context)
{
return context.PerformOperationAsync(
Tracer,
@ -109,8 +109,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
var checkpointState = await CheckpointRegistry.GetCheckpointStateAsync(context).ThrowIfFailureAsync();
return await RestoreCheckpointAsync(context, checkpointState);
})
.IgnoreFailure();
});
}
private record DatabaseStats
@ -255,6 +254,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
long uploadSize = 0;
long retainedSize = 0;
int uploadCount = 0;
int retainedCount = 0;
var newManifest = new CheckpointManifest();
await ParallelAlgorithms.WhenDoneAsync(
_configuration.IncrementalCheckpointDegreeOfParallelism,
@ -278,6 +280,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
AddEntry(newManifest, relativePath, storageId);
attemptUpload = false;
Interlocked.Increment(ref retainedCount);
Interlocked.Add(ref retainedSize, _fileSystem.GetFileSize(file));
Counters[ContentLocationStoreCounters.IncrementalCheckpointFilesUploadSkipped].Increment();
}
@ -291,6 +294,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
incrementalCheckpointsPrefix + relativePath).ThrowIfFailureAsync();
AddEntry(newManifest, relativePath, storageId);
Interlocked.Increment(ref uploadCount);
Interlocked.Add(ref uploadSize, _fileSystem.GetFileSize(file));
Counters[ContentLocationStoreCounters.IncrementalCheckpointFilesUploaded].Increment();
}
@ -299,6 +303,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
Tracer.TrackMetric(context, "CheckpointUploadSize", uploadSize);
Tracer.TrackMetric(context, "CheckpointRetainedSize", retainedSize);
Tracer.TrackMetric(context, "CheckpointTotalSize", uploadSize + retainedSize);
Tracer.TrackMetric(context, "CheckpointUploadCount", uploadCount);
Tracer.TrackMetric(context, "CheckpointRetainedCount", retainedCount);
Tracer.TrackMetric(context, "CheckpointTotalCount", uploadCount + retainedCount);
DatabaseWriteManifest(context, newManifest);

Просмотреть файл

@ -10,6 +10,7 @@ using BuildXL.Cache.ContentStore.Utils;
using OperationContext = BuildXL.Cache.ContentStore.Tracing.Internal.OperationContext;
using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System;
#nullable enable
@ -40,7 +41,10 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
LinkLifetime(_storage);
RunInBackground(nameof(BackgroundUpdateAsync), BackgroundUpdateAsync, fireAndForget: true);
if (_configuration.Checkpoint?.UpdateClusterStateInterval > TimeSpan.Zero)
{
RunInBackground(nameof(BackgroundUpdateAsync), BackgroundUpdateAsync, fireAndForget: true);
}
}
protected override async Task<BoolResult> StartupComponentAsync(OperationContext context)

Просмотреть файл

@ -5,8 +5,11 @@ using System;
using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System.IO;
using System.Linq;
using System.Numerics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Stores;
using BuildXL.Utilities.Collections;
@ -17,8 +20,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// </summary>
public record ContentListing : IDisposable
{
internal const int PartitionCount = 256;
private readonly SafeAllocHHandle _handle;
private readonly int _offset;
private int _length;
@ -134,12 +135,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// <summary>
/// Enumerates the partition slices from the listing
/// </summary>
public IEnumerable<ContentListing> GetPartitionSlices()
public IEnumerable<ContentListing> GetPartitionSlices(IEnumerable<PartitionId> ids)
{
int start = 0;
for (int i = 0; i < PartitionCount; i++)
foreach (var partitionId in ids)
{
var partitionId = (byte)i;
var partition = GetPartitionContentSlice(partitionId, ref start);
yield return partition;
}
@ -148,13 +148,13 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// <summary>
/// Gets the slice of the full listing containing the partition's content.
/// </summary>
private ContentListing GetPartitionContentSlice(byte partitionId, ref int start)
private ContentListing GetPartitionContentSlice(PartitionId partitionId, ref int start)
{
var entrySpan = EntrySpan;
for (int i = start; i < entrySpan.Length; i++)
{
var entry = entrySpan[i];
if (entry.PartitionId != partitionId)
if (!partitionId.Contains(entry.PartitionId))
{
var result = GetSlice(start, i - start);
start = i;
@ -169,9 +169,10 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// <summary>
/// Gets an unmanaged stream over the listing.
/// </summary>
public Stream AsStream()
public StreamWithLength AsStream()
{
return new UnmanagedMemoryStream(_handle, _offset * (long)MachineContentEntry.ByteLength, _length * (long)MachineContentEntry.ByteLength, FileAccess.ReadWrite);
var stream = new UnmanagedMemoryStream(_handle, _offset * (long)MachineContentEntry.ByteLength, _length * (long)MachineContentEntry.ByteLength, FileAccess.ReadWrite);
return stream.WithLength(stream.Length);
}
/// <summary>
@ -202,24 +203,36 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// <summary>
/// Computes the difference from this listing to <paramref name="nextEntries"/>.
/// </summary>
public IEnumerable<MachineContentEntry> EnumerateChanges(IEnumerable<MachineContentEntry> nextEntries,
PartitionChangeCounters counters = null)
public IEnumerable<MachineContentEntry> EnumerateChanges(
IEnumerable<MachineContentEntry> nextEntries,
BoxRef<DiffContentStatistics> counters,
bool synthesizeUniqueHashEntries)
{
counters ??= new PartitionChangeCounters();
foreach (var diff in NuCacheCollectionUtilities.DistinctDiffSorted(EnumerateEntries(), nextEntries, i => i))
counters ??= new DiffContentStatistics();
foreach (var diff in NuCacheCollectionUtilities.DistinctMergeSorted(EnumerateEntries(), nextEntries, i => i, i => i))
{
if (diff.mode == MergeMode.LeftOnly)
{
counters.Removes++;
counters.RemoveContentSize += diff.item.Size.Value;
yield return diff.item with { Location = diff.item.Location.AsRemove() };
counters.Value.Removes.Add(diff.left);
yield return diff.left with { Location = diff.left.Location.AsRemove() };
}
else
{
counters.Adds++;
counters.AddContentSize += diff.item.Size.Value;
Contract.Assert(!diff.item.Location.IsRemove);
yield return diff.item;
var entry = diff.Either();
bool isUnique = counters.Value.Total.Add(entry);
if (diff.mode == MergeMode.RightOnly)
{
counters.Value.Adds.Add(entry, isUnique);
yield return diff.right;
}
else if (isUnique && synthesizeUniqueHashEntries)
{
// Synthesize fake entry for hash
// This ensures that touches happen even when no content adds/removes
// have happened for the hash.
// NOTE: This should not be written to the database.
yield return new MachineContentEntry() with { ShardHash = entry.ShardHash };
}
}
}
}
@ -243,12 +256,73 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// <summary>
/// Counters for <see cref="ContentListing.EnumerateChanges"/>
/// </summary>
public record PartitionChangeCounters
public record PartitionUpdateStatistics
{
public long Adds;
public long Removes;
public long AddContentSize;
public long RemoveContentSize;
// TODO: Log(size) statistics?
public DiffContentStatistics DiffStats { get; init; } = new DiffContentStatistics();
}
/// <summary>
/// Running aggregate over a sequence of <see cref="MachineContentEntry"/> values fed through <see cref="Add"/>.
/// </summary>
public record ContentStatistics
{
    /// <summary>Number of entries added.</summary>
    public long TotalCount { get; set; }

    /// <summary>Sum of the sizes of all entries added.</summary>
    public long TotalSize { get; set; }

    /// <summary>Number of added entries which were counted as unique.</summary>
    public long UniqueCount { get; set; }

    /// <summary>Sum of the sizes of the entries which were counted as unique.</summary>
    public long UniqueSize { get; set; }

    /// <summary>Hash of the first entry added (null until an entry is added).</summary>
    public ShardHash? First { get; set; }

    /// <summary>Hash of the most recently added entry (null until an entry is added).</summary>
    public ShardHash? Last { get; set; }

    /// <summary>
    /// Largest index of the first differing byte observed between two consecutively added unique hashes.
    /// </summary>
    public int MaxHashFirstByteDifference { get; set; }

    /// <summary>Merged content info of every entry added.</summary>
    public MachineContentInfo Info = MachineContentInfo.Default;

    /// <summary>
    /// Folds <paramref name="entry"/> into the statistics.
    /// </summary>
    /// <param name="entry">The entry to account for.</param>
    /// <param name="isUnique">
    /// Whether the entry should be counted as unique. When not specified, the entry is considered
    /// unique if its hash differs from the hash of the previously added entry.
    /// </param>
    /// <returns>true if the entry was counted as unique; otherwise false.</returns>
    public bool Add(MachineContentEntry entry, bool? isUnique = default)
    {
        // Capture the previous hash before it is overwritten by the current entry.
        var previousHash = Last;
        Last = entry.ShardHash;
        First ??= Last;
        Info.Merge(entry);

        var entrySize = entry.Size.Value;
        TotalCount++;
        TotalSize += entrySize;

        isUnique ??= previousHash != Last;
        if (!isUnique.Value)
        {
            return false;
        }

        if (previousHash != null)
        {
            // Compare the raw bytes of the previous and current hash to find where they diverge.
            var previousBytes = MemoryMarshal.AsBytes(stackalloc[] { previousHash.Value });
            var currentBytes = MemoryMarshal.AsBytes(stackalloc[] { Last.Value });
            var divergenceIndex = GetFirstByteDifference(previousBytes, currentBytes);
            MaxHashFirstByteDifference = Math.Max(MaxHashFirstByteDifference, divergenceIndex);
        }

        UniqueCount++;
        UniqueSize += entrySize;
        return true;
    }

    /// <summary>
    /// Gets the index of the first position at which the two spans differ, or the length of
    /// <paramref name="bytes1"/> if no difference is found within that length.
    /// </summary>
    private static int GetFirstByteDifference(Span<byte> bytes1, Span<byte> bytes2)
    {
        int index = 0;
        while (index < bytes1.Length && bytes1[index] == bytes2[index])
        {
            index++;
        }

        return index;
    }
}
/// <summary>
/// Groups the <see cref="ContentStatistics"/> buckets populated while diffing two content listings.
/// </summary>
public record DiffContentStatistics()
{
    /// <summary>Statistics for entries reported as added by the diff.</summary>
    public ContentStatistics Adds { get; init; } = new ContentStatistics();

    /// <summary>Statistics for entries reported as removed by the diff.</summary>
    public ContentStatistics Removes { get; init; } = new ContentStatistics();

    /// <summary>Statistics for deletes. NOTE(review): not populated by the code visible here - confirm usage.</summary>
    public ContentStatistics Deletes { get; init; } = new ContentStatistics();

    /// <summary>Statistics for touches. NOTE(review): not populated by the code visible here - confirm usage.</summary>
    public ContentStatistics Touchs { get; init; } = new ContentStatistics();

    /// <summary>Statistics for every entry present on the new side of the diff (added or unchanged).</summary>
    public ContentStatistics Total { get; init; } = new ContentStatistics();
}
/// <summary>

Просмотреть файл

@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Stores;
@ -12,9 +13,10 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// Current master machine
MachineLocation Master,
/// Role of the current machine
Role Role)
Role Role,
DateTime? MasterLeaseExpiryUtc)
{
public static MasterElectionState DefaultWorker = new MasterElectionState(Master: default, Role: Role.Worker);
public static MasterElectionState DefaultWorker = new MasterElectionState(Master: default, Role: Role.Worker, MasterLeaseExpiryUtc: null);
}
/// <nodoc />

Просмотреть файл

@ -304,7 +304,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// </summary>
public CompactTime? LatestAccessTime
{
get => _latestAccessTime > 0 ? new CompactTime(_latestAccessTime) : null;
get => _latestAccessTime != DefaultInvalidLatestAccessTime ? new CompactTime(_latestAccessTime) : null;
}
/// <summary>
@ -313,7 +313,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// </summary>
public CompactTime? EarliestAccessTime
{
get => _earliestAccessTime > 0 ? new CompactTime(_earliestAccessTime) : null;
get => _earliestAccessTime != DefaultInvalidEarliestTime ? new CompactTime(_earliestAccessTime) : null;
}
/// <summary>

Просмотреть файл

@ -109,7 +109,10 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
return DistinctMergeSorted(leftEnumerator, rightEnumerator, getLeftComparable, getRightComparable);
}
private static IEnumerable<(TLeft left, TRight right, MergeMode mode)> DistinctMergeSorted<TLeft, TRight, TComparable>(
/// <summary>
/// Merges two sorted sequences with no duplicates.
/// </summary>
public static IEnumerable<(TLeft left, TRight right, MergeMode mode)> DistinctMergeSorted<TLeft, TRight, TComparable>(
IEnumerator<TLeft> leftEnumerator,
IEnumerator<TRight> rightEnumerator,
Func<TLeft, TComparable> getLeftComparable,

Просмотреть файл

@ -0,0 +1,84 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Linq;
using System.Text.Json.Serialization;
using BuildXL.Cache.ContentStore.Distributed.MetadataService;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Utils;
using BuildXL.Utilities;
using BuildXL.Utilities.Collections;
using BuildXL.Utilities.Tasks;
namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
/// <summary>
/// Defines a hash partition encompassing hashes with prefixes from <paramref name="StartValue"/>
/// to <paramref name="EndValue"/> inclusive.
/// </summary>
/// <param name="StartValue">The first included hash prefix of the range</param>
/// <param name="EndValue">The last included hash prefix of the range</param>
public record struct PartitionId(byte StartValue, byte EndValue)
{
    private const int MaxPartitionCount = 256;

    /// <summary>
    /// Number of hash prefixes included in the partition
    /// </summary>
    /// <remarks>
    /// Computed from the current bounds rather than captured at construction so that the value stays
    /// correct for copies created via <c>with</c> expressions and is non-zero for <c>default</c>
    /// (which would otherwise make <see cref="Index"/> and <see cref="PartitionCount"/> divide by zero).
    /// </remarks>
    public int Width => (EndValue - StartValue) + 1;

    /// <summary>
    /// The index of the partition in the ordered list of partitions
    /// </summary>
    public int Index => StartValue / Width;

    /// <summary>
    /// The total number of partitions.
    /// </summary>
    public int PartitionCount => MaxPartitionCount / Width;

    /// <summary>
    /// The prefix used for blobs representing this partition
    /// </summary>
    public string BlobPrefix => $"{PartitionCount}/{HexUtilities.BytesToHex(new[] { StartValue })}-{HexUtilities.BytesToHex(new[] { EndValue })}";

    /// <summary>
    /// Gets whether the partition contains the hash prefix (i.e. first byte of the hash)
    /// </summary>
    public bool Contains(byte hashPrefix)
    {
        return StartValue <= hashPrefix && hashPrefix <= EndValue;
    }

    /// <inheritdoc />
    public override string ToString()
    {
        return $"[{StartValue}, {EndValue}]";
    }

    /// <summary>
    /// Gets an ordered array of partitions for the given count. NOTE: The input is coerced into a valid
    /// value (i.e. a power of 2 between 1 and 256 inclusive)
    /// </summary>
    /// <param name="partitionCount">The requested number of partitions</param>
    /// <returns>The partitions ordered by their start value, together covering all 256 hash prefixes</returns>
    public static ReadOnlyArray<PartitionId> GetPartitions(int partitionCount)
    {
        // Clamp into [1, 256] before rounding.
        partitionCount = Math.Min(Math.Max(1, partitionCount), MaxPartitionCount);

        // Round up to the next power of two (values which are already a power of two are kept as-is)
        var powerOfTwo = Bits.HighestBitSet((uint)partitionCount);
        partitionCount = (int)(powerOfTwo < partitionCount ? powerOfTwo << 1 : powerOfTwo);

        // Number of hash prefixes covered by each partition
        var perRangeCount = (MaxPartitionCount + partitionCount - 1) / partitionCount;

        return Enumerable.Range(0, partitionCount)
            .Select(i =>
            {
                var start = i * perRangeCount;
                var end = Math.Min(byte.MaxValue, (start + perRangeCount) - 1);
                return new PartitionId((byte)start, (byte)end);
            })
            .ToArray();
    }
}
}

Просмотреть файл

@ -27,7 +27,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// <summary>
/// <see cref="IContentLocationStore"/> implementation that supports old redis and new local location store.
/// </summary>
internal class TransitioningContentLocationStore : StartupShutdownBase, IContentLocationStore, IDistributedLocationStore, IDistributedMachineInfo
public class TransitioningContentLocationStore : StartupShutdownBase, IContentLocationStore, IDistributedLocationStore, IDistributedMachineInfo
{
/// <nodoc />
public ILocalContentStore LocalContentStore { get; }

Просмотреть файл

@ -129,21 +129,25 @@ namespace BuildXL.Cache.ContentStore.Distributed.Services
CentralStorage = Create(() => CreateCentralStorage());
BlobContentLocationRegistry = CreateOptional(
() => Dependencies.DistributedContentSettings.InstanceOrDefault()?.LocationStoreSettings?.EnableBlobContentLocationRegistry == true,
() => Dependencies.DistributedContentSettings.InstanceOrDefault()?.LocationStoreSettings?.EnableBlobContentLocationRegistry == true
&& !Configuration.DistributedContentConsumerOnly,
() => CreateBlobContentLocationRegistry());
}
private BlobContentLocationRegistry CreateBlobContentLocationRegistry()
{
var baseConfiguration = Arguments.Dependencies.DistributedContentSettings.GetRequiredInstance().LocationStoreSettings.BlobContentLocationRegistrySettings ?? new();
var database = (RocksDbContentMetadataDatabase)Dependencies.GlobalCacheCheckpointManager.GetRequiredInstance().Database;
return new BlobContentLocationRegistry(
new BlobContentLocationRegistryConfiguration(Arguments.Dependencies.DistributedContentSettings.GetRequiredInstance().LocationStoreSettings.BlobContentLocationRegistrySettings ?? new())
new BlobContentLocationRegistryConfiguration(baseConfiguration)
{
Credentials = Configuration.AzureBlobStorageCheckpointRegistryConfiguration!.Credentials,
},
ClusterStateManager.Instance,
localContentStore: null, // Set to null initially. Will be populated when content location store is initialized similar to LLS.
Configuration.PrimaryMachineLocation,
Arguments.Clock);
database,
localContentStore: null, // Set to null initially. Will be populated when content location store is initialized similar to LLS.
clock: Arguments.Clock);
}
private ClientGlobalCacheStore CreateClientGlobalCacheStore()

Просмотреть файл

@ -284,7 +284,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.Services
};
if (!GlobalCacheService.IsAvailable
&& DistributedContentSettings.EnableGlobalCacheLocationStoreValidation
&& DistributedContentSettings.GlobalCacheDatabaseValidationMode != DatabaseValidationMode.None
&& DistributedContentSettings.GlobalCacheBackgroundRestore)
{
// Restore checkpoints in checkpoint manager when GCS is unavailable since
@ -371,7 +371,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.Services
checkpointManager,
RocksDbContentMetadataStore.Instance,
eventStream,
clock);
clock,
registry: ContentLocationStoreServices.Instance.BlobContentLocationRegistry.InstanceOrDefault());
return service;
}

Просмотреть файл

@ -170,7 +170,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.Stores
string IDistributedContentCopierHost2.ReportCopyResult(OperationContext context, ContentLocation info, CopyFileResult result)
{
if (_distributedContentSettings?.EnableGlobalCacheLocationStoreValidation != true
if (_distributedContentSettings == null
|| _distributedContentSettings.GlobalCacheDatabaseValidationMode == DatabaseValidationMode.None
|| GlobalCacheCheckpointManager == null
|| LocalLocationStore == null
|| !LocalLocationStore.ClusterState.TryResolveMachineId(info.Machine, out var machineId))
@ -195,25 +196,41 @@ namespace BuildXL.Cache.ContentStore.Distributed.Stores
};
}
bool shouldError = _distributedContentSettings.GlobalCacheDatabaseValidationMode == DatabaseValidationMode.LogAndError
&& result.Succeeded
&& info.Origin == GetBulkOrigin.Local
&& !isPresentInGcs;
// Represent various aspects as parameters in operation so
// they show up as separate dimensions on the metric in MDM.
var presence = isPresentInGcs ? "GcsContains" : "GcsMissing";
var operationResult = new OperationResult(
message: presence,
operationName: presence,
tracerName: $"{nameof(DistributedContentStore)}.{nameof(IDistributedContentCopierHost2.ReportCopyResult)}",
status: result.GetStatus(),
duration: result.Duration,
operationKind: originToKind(info.Origin),
exception: result.Exception,
operationId: context.TracingContext.TraceId,
severity: Severity.Debug);
// Directly calls IOperationLogger interface because we don't want to log a message, just surface a complex metric
if (context.TracingContext.Logger is IOperationLogger logger)
if (shouldError && context.TracingContext.Logger is IStructuredLogger slog)
{
// Represent various aspects as parameters in operation so
// they show up as separate dimensions on the metric in MDM.
logger.OperationFinished(new OperationResult(
message: "",
operationName: presence,
tracerName: $"{nameof(DistributedContentStore)}.{nameof(IDistributedContentCopierHost2.ReportCopyResult)}",
status: result.GetStatus(),
duration: result.Duration,
operationKind: originToKind(info.Origin),
exception: result.Exception,
operationId: context.TracingContext.TraceId,
severity: Severity.Debug));
slog.LogOperationFinished(operationResult);
}
else if (context.TracingContext.Logger is IOperationLogger logger)
{
logger.OperationFinished(operationResult);
}
return $"IsPresentInGcs=[{isPresentInGcs}]";
if (shouldError)
{
new ErrorResult($"Content found in LLS but not GCS DB. {info.Hash} CopyResult={result}").ThrowIfFailure();
}
return $"Origin=[{info.Origin}] IsPresentInGcs=[{isPresentInGcs}]";
}
private Task<Result<ReadOnlyDistributedContentSession>> CreateCopySession(Context context)

Просмотреть файл

@ -35,10 +35,20 @@ namespace BuildXL.Cache.ContentStore.Distributed.Utilities
public static JsonSerializerOptions IndentedSerializationOptions { get; } = GetOptions(indent: true);
/// <summary>
/// Options used when reading deployment configuration.
/// Tolerates trailing commas and skips comments in the parsed JSON.
/// </summary>
public static JsonDocumentOptions DefaultDocumentOptions { get; } = new JsonDocumentOptions
{
    CommentHandling = JsonCommentHandling.Skip,
    AllowTrailingCommas = true,
};
private static JsonSerializerOptions GetOptions(bool indent)
{
return new JsonSerializerOptions()
{
IgnoreReadOnlyProperties = true,
AllowTrailingCommas = true,
ReadCommentHandling = JsonCommentHandling.Skip,
WriteIndented = indent,
@ -51,6 +61,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.Utilities
FuncJsonConverter.Create(ReadMachineId, (writer, value) => writer.WriteNumberValue(value.Index)),
FuncJsonConverter.Create(ReadMachineLocation, (writer, value) => writer.WriteStringValue(value.Path)),
FuncJsonConverter.Create(ReadShortHash, (writer, value) => writer.WriteStringValue(value.HashType == HashType.Unknown ? null : value.ToString())),
FuncJsonConverter.Create(ReadShardHash, (writer, value) => writer.WriteStringValue(value.ToShortHash().HashType == HashType.Unknown ? null : value.ToShortHash().ToString())),
FuncJsonConverter.Create(ReadCompactTime, (writer, value) => writer.WriteStringValue(value.ToDateTime())),
FuncJsonConverter.Create(ReadCompactSize, (writer, value) => writer.WriteNumberValue(value.Value)),
}
};
}
@ -78,6 +91,21 @@ namespace BuildXL.Cache.ContentStore.Distributed.Utilities
return new MachineId(reader.GetInt32());
}
/// <summary>
/// Reads a <see cref="ShardHash"/> by reading a <see cref="ShortHash"/> and converting it to its entry key form.
/// </summary>
private static ShardHash ReadShardHash(ref Utf8JsonReader reader)
{
    ShortHash shortHash = ReadShortHash(ref reader);
    return shortHash.AsEntryKey();
}
/// <summary>
/// Reads a <see cref="CompactSize"/> from the 64-bit integer at the reader's current token.
/// </summary>
private static CompactSize ReadCompactSize(ref Utf8JsonReader reader)
{
    long rawSize = reader.GetInt64();

    // Converted to CompactSize by the type's implicit conversion.
    return rawSize;
}
/// <summary>
/// Reads a <see cref="CompactTime"/> from the date/time value at the reader's current token.
/// </summary>
private static CompactTime ReadCompactTime(ref Utf8JsonReader reader)
{
    DateTime timestamp = reader.GetDateTime();

    // Converted to CompactTime by the type's implicit conversion.
    return timestamp;
}
private static ShortHash ReadShortHash(ref Utf8JsonReader reader)
{
if (reader.TokenType == JsonTokenType.StartObject)
@ -90,15 +118,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.Utilities
return data == null ? default : new ShortHash(data);
}
/// <summary>
/// Options used when reading deployment configuration
/// </summary>
public static JsonDocumentOptions DefaultDocumentOptions { get; } = new JsonDocumentOptions()
{
AllowTrailingCommas = true,
CommentHandling = JsonCommentHandling.Skip
};
/// <summary>
/// Serialize the value to json using <see cref="DefaultSerializationOptions"/>
/// </summary>
@ -120,6 +139,18 @@ namespace BuildXL.Cache.ContentStore.Distributed.Utilities
/// </summary>
public static ValueTask<T> JsonDeserializeAsync<T>(Stream value)
{
#if NETCOREAPP
    if (value is MemoryStream memoryStream && memoryStream.TryGetBuffer(out var buffer))
    {
        // JsonSerializer.DeserializeAsync can fail on reading large streams if there is a value which
        // must be skipped. Workaround this in some cases where the data is in memory
        // and using the synchronous API.
        // NOTE(review): the buffer returned by TryGetBuffer spans the stream's whole content and does not
        // account for the stream's current Position - confirm callers always pass a rewound stream.

        // JsonDocument rents pooled buffers, so it must be disposed to return them to the pool.
        // The deserialized result does not reference the document, so disposing before returning is safe.
        using var doc = JsonDocument.Parse(buffer.AsMemory(), DefaultDocumentOptions);
        var result = JsonSerializer.Deserialize<T>(doc, DefaultSerializationOptions);
        return new ValueTask<T>(result);
    }
#endif

    // Fallback for streams which are not (exposable) in-memory buffers.
    return JsonSerializer.DeserializeAsync<T>(value, DefaultSerializationOptions);
}

Просмотреть файл

@ -343,9 +343,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
Clock = clock;
}
public Task OnRoleUpdatedAsync(OperationContext context, Role role)
public Task OnRoleUpdatedAsync(OperationContext context, MasterElectionState electionState)
{
return RoleQueue.Writer.WriteAsync((role, Clock.UtcNow)).AsTask();
return RoleQueue.Writer.WriteAsync((electionState.Role, Clock.UtcNow)).AsTask();
}
public async Task WaitTillCurrentRoleAsync()
@ -455,4 +455,12 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
await machine.ShutdownAsync(operationContext).ThrowIfFailureAsync();
}
}
/// <summary>
/// Test-only helpers for notifying an <see cref="IRoleObserver"/> using just a <see cref="Role"/>.
/// </summary>
public static class TestRoleObserverExtensions
{
    /// <summary>
    /// Notifies <paramref name="observer"/> of a role update by wrapping <paramref name="role"/> in a
    /// <see cref="MasterElectionState"/> whose remaining values are left at their defaults.
    /// </summary>
    public static Task OnRoleUpdatedAsync(this IRoleObserver observer, OperationContext context, Role role)
    {
        var electionState = new MasterElectionState(default, role, default);
        return observer.OnRoleUpdatedAsync(context, electionState);
    }
}
}

Просмотреть файл

@ -4,16 +4,22 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed.MetadataService;
using BuildXL.Cache.ContentStore.Distributed.NuCache;
using BuildXL.Cache.ContentStore.Distributed.Utilities;
using BuildXL.Cache.ContentStore.FileSystem;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
using BuildXL.Cache.ContentStore.Interfaces.Time;
@ -26,13 +32,20 @@ using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.ContentStore.UtilitiesCore;
using BuildXL.Cache.ContentStore.Utils;
using BuildXL.Cache.Host.Service;
using BuildXL.Engine.Cache.KeyValueStores;
using BuildXL.Utilities.Collections;
using ContentStoreTest.Distributed.Redis;
using ContentStoreTest.Test;
using FluentAssertions;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using RocksDbSharp;
using Test.BuildXL.TestUtilities.Xunit;
using Xunit;
using Xunit.Abstractions;
using static BuildXL.Cache.ContentStore.Distributed.NuCache.BlobContentLocationRegistry;
using OperationContext = BuildXL.Cache.ContentStore.Tracing.Internal.OperationContext;
#nullable enable annotations
@ -40,26 +53,32 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
{
[Collection("Redis-based tests")]
[Trait("Category", "WindowsOSOnly")] // 'redis-server' executable no longer exists
public class BlobContentLocationRegistryTests : TestWithOutput
#if !NETCOREAPP
[TestClassIfSupported(TestRequirements.NotSupported)]
#endif
public class BlobContentLocationRegistryTests : TestBase
{
private readonly static MachineLocation M1 = new MachineLocation("M1");
private readonly LocalRedisFixture _fixture;
public BlobContentLocationRegistryTests(LocalRedisFixture fixture, ITestOutputHelper output)
: base(output)
: base(TestGlobal.Logger, output)
{
_fixture = fixture;
}
[Fact]
public void TestCompat()
{
// Changing this value is a breaking change!! An epoch reset is needed if this changed.
// Changing these values is a breaking change!! An epoch reset is needed if they are changed.
MachineRecord.MaxBlockLength.Should().Be(56);
Unsafe.SizeOf<MachineContentEntry>().Should().Be(24);
}
[Theory]
[MemberData(nameof(TruthTable.GetTable), 2, MemberType = typeof(TruthTable))]
public async Task TestBasicRegister(bool concurrent, bool expireMachine)
[MemberData(nameof(TruthTable.GetTable), 3, MemberType = typeof(TruthTable))]
public async Task TestBasicRegister(bool concurrent, bool expireMachine, bool useMaxPartitionCount)
{
// Increase to find flaky test if needed
int iterationCount = 1;
@ -68,6 +87,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
await RunTest(async context =>
{
var excludedMachine = context.Machines[1];
var firstHash = new ContentHash(HashType.MD5, Guid.Empty.ToByteArray());
excludedMachine.Store.Add(firstHash, DateTime.UtcNow, 42);
bool excludeHeartbeat(TestMachine machine)
{
return expireMachine && machine == excludedMachine;
@ -82,6 +104,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
await context.HeartbeatAllMachines();
await context.UpdateAllMachinePartitionsAsync(concurrent);
await CheckAllPartitionsAsync(context, isExcluded: null);
context.Arguments.Clock.UtcNow += TimeSpan.FromHours(5);
// Heartbeat all machine so all machines are aware of each other and the selected machine has expired
@ -92,44 +116,75 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
await CheckAllPartitionsAsync(context, excludeContentByLocation);
},
machineCount: concurrent ? 10 : 3);
machineCount: concurrent ? 10 : 3,
useMaxPartitionCount: useMaxPartitionCount);
}
}
// Only include some partitions to keep test run times reasonable
private static readonly byte[] IncludedPartitions = new byte[] { 0, 4, 5, 6, 25, 255 };
//private static readonly byte[] IncludedPartitions = new byte[] { 4 };
private static readonly ReadOnlyArray<PartitionId> MaxPartitionCountIncludedPartitions = new byte[] { 0, 4, 5, 6, 25, 255 }
.Select(i => PartitionId.GetPartitions(256)[i])
.ToArray();
private static readonly ReadOnlyArray<PartitionId> FewPartitionCountIncludedPartitions = PartitionId.GetPartitions(1).ToArray();
private static async Task CheckAllPartitionsAsync(TestContext context, Func<MachineLocation, bool> isExcluded)
{
var primary = context.PrimaryMachine;
context.ActualContent.Clear();
context.Arguments.Context.TracingContext.Debug("--- Checking all partitions ---", nameof(BlobContentLocationRegistryTests));
foreach (var partition in IncludedPartitions)
foreach (var partition in context.IncludedPartitions)
{
var listing = await context.PrimaryMachine.Registry.ComputeSortedPartitionContentAsync(context, partition);
var listing = await primary.Registry.ComputeSortedPartitionContentAsync(context, partition);
listing.EntrySpan.Length.Should().Be(listing.FullSpanForTests.Length, $"Partition={partition}");
context.CheckListing(listing, partition);
context.Arguments.Context.TracingContext.Debug($"Partition={partition} Entries={listing.EntrySpan.Length}", nameof(BlobContentLocationRegistryTests));
}
var expectedContent = context.ExpectedContent.Where(t => isExcluded?.Invoke(t.Machine) != true);
var expectedContent = context.ExpectedContent.Where(t => isExcluded?.Invoke(t.Machine) != true).ToHashSet();
// Using this instead for set equivalence since the built-in equivalence is very slow.
context.ActualContent.Except(expectedContent).Should().BeEmpty("Found unexpected content. Actual content minus expected content should be empty");
expectedContent.Except(context.ActualContent).Should().BeEmpty("Could not find expected content. Expected content minus actual content should be empty");
// Ensure database is updated by setting time after interval and triggering update
context.Clock.UtcNow += TestContext.PartitionsUpdateInterval + TimeSpan.FromSeconds(10);
await context.UpdateMachinePartitionsAsync(primary).ShouldBeSuccess();
var clusterState = primary.ClusterStateManager.ClusterState;
var actualDbContent = context.ExpectedContent.Take(0).ToHashSet();
primary.Database.IterateSstMergeContentEntries(context, entry =>
{
entry.Location.IsAdd.Should().BeTrue();
clusterState.TryResolve(entry.Location.AsMachineId(), out var location).Should().BeTrue();
var actualEntry = (entry.Hash, location, entry.Size.Value);
expectedContent.Should().Contain(actualEntry);
actualDbContent.Add(actualEntry).Should().BeTrue();
}).ShouldBeSuccess().Value.ReachedEnd.Should().BeTrue();
expectedContent.Except(actualDbContent).Should().BeEmpty("Could not find expected content in database. Expected content minus actual content should be empty");
}
private async Task RunTest(
Func<TestContext, Task> runTest,
int machineCount = 3,
double machineContentFraction = 0.5,
int totalUniqueContent = 10000)
int totalUniqueContent = 10000,
bool useMaxPartitionCount = true)
{
var includedPartitions = useMaxPartitionCount
? MaxPartitionCountIncludedPartitions
: FewPartitionCountIncludedPartitions;
var tracingContext = new Context(TestGlobal.Logger);
var context = new OperationContext(tracingContext);
var clock = new MemoryClock();
using var storage = AzuriteStorageProcess.CreateAndStartEmpty(_fixture, TestGlobal.Logger);
var arguments = new TestContextArguments(context, storage.ConnectionString, clock);
var arguments = new TestContextArguments(context, storage.ConnectionString, clock, TestRootDirectoryPath, includedPartitions);
var testContext = new TestContext(arguments);
await testContext.StartupAsync(context).ThrowIfFailureAsync();
@ -158,11 +213,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
var maxSizeMask = (1L << maxSizeOffset) - 1;
var size = MemoryMarshal.Read<long>(MemoryMarshal.AsBytes(stackalloc[] { hash.ToFixedBytes() })) & maxSizeMask;
if (IncludedPartitions.Contains(hash[0]))
{
testContext.ExpectedContent.Add(((ShortHash)hash, machine.Location, size));
}
machine.Store.Add(
hashes[hashIndex],
accessTime,
@ -174,7 +224,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
await testContext.ShutdownAsync(context).ThrowIfFailureAsync();
}
private class MockContentStore : TestObserver, ILocalContentStore
private record MockContentStore(TestContext Context, MachineLocation Location) : ILocalContentStore
{
private ConcurrentDictionary<ShortHash, ContentInfo> InfoMap { get; } = new();
private int[] PartitionCounts { get; } = new int[256];
@ -210,26 +260,28 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
public void Add(ContentHash hash, DateTime accessTime, long size)
{
ShortHash shortHash = hash;
var partition = shortHash[0];
if (InfoMap.TryAdd(hash, new ContentInfo(hash, size, accessTime)))
{
var partition = MemoryMarshal.Read<byte>(MemoryMarshal.AsBytes(stackalloc[] { hash.ToFixedBytes() }));
PartitionCounts[partition]++;
}
}
internal override void OnPutBlock(byte partitionId, ContentListing partition)
{
if (PartitionCounts[partitionId] != partition.EntrySpan.Length)
{
if (Context.IncludedPartitionIndices.Contains(hash[0]))
{
Context.ExpectedContent.Add((shortHash, Location, size));
}
}
}
}
private record TestContextArguments(OperationContext Context, string ConnectionString, MemoryClock Clock);
private record TestContextArguments(OperationContext Context,
string ConnectionString,
MemoryClock Clock,
AbsolutePath Path,
ReadOnlyArray<PartitionId> IncludedPartitions);
private record TestMachine(
int Index,
RocksDbContentMetadataDatabase Database,
MockContentStore Store,
ClusterStateManager ClusterStateManager,
BlobContentLocationRegistry Registry,
@ -247,8 +299,16 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
public KeyedList<MachineLocation, TestMachine> Machines { get; } = new();
public static readonly TimeSpan PartitionsUpdateInterval = TimeSpan.FromMinutes(5);
public TestMachine PrimaryMachine => Machines[0];
public ReadOnlyArray<PartitionId> IncludedPartitions => Arguments.IncludedPartitions;
public byte[] IncludedPartitionIndices => Arguments.IncludedPartitions
.SelectMany(p => Enumerable.Range(p.StartValue, (p.EndValue - p.StartValue) + 1))
.Select(i => (byte)i)
.ToArray();
public HashSet<(ShortHash Hash, MachineLocation Machine, long Size)> ExpectedContent = new();
public HashSet<(ShortHash Hash, MachineLocation Machine, long Size)> ActualContent = new();
@ -256,24 +316,39 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
public MemoryClock Clock => Arguments.Clock;
public DisposableDirectory Directory { get; }
public Guid TestUniqueId = Guid.NewGuid();
public TestContext(TestContextArguments data)
{
Arguments = data;
Directory = new DisposableDirectory(PassThroughFileSystem.Default, data.Path);
}
protected override void DisposeCore()
{
Directory.Dispose();
}
public Task CreateAndStartAsync()
{
var index = Machines.Count;
var configuration = new BlobContentLocationRegistryConfiguration()
{
FolderName = TestUniqueId.ToString(),
Credentials = new AzureBlobStorageCredentials(Arguments.ConnectionString),
PerPartitionDelayInterval = TimeSpan.Zero,
PartitionsUpdateInterval = TimeSpan.FromMinutes(5),
UpdateInBackground = false
PartitionsUpdateInterval = PartitionsUpdateInterval,
UpdateInBackground = false,
PartitionCount = IncludedPartitions[0].PartitionCount,
UpdateDatabase = index == 0
};
var location = GetRandomLocation();
var store = new MockContentStore();
var store = new MockContentStore(this, location);
var clusterStateManager = new ClusterStateManager(
new LocalLocationStoreConfiguration()
@ -289,15 +364,22 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
Clock),
Clock);
var database = new RocksDbContentMetadataDatabase(
Clock,
new RocksDbContentMetadataDatabaseConfiguration(Directory.Path / Machines.Count.ToString())
{
UseMergeOperators = true
});
var registry = new BlobContentLocationRegistry(
configuration,
clusterStateManager,
store,
location,
Arguments.Clock);
database,
store,
Clock);
var machine = new TestMachine(Machines.Count, store, clusterStateManager, registry, location);
registry.Observer = store;
var machine = new TestMachine(index, database, store, clusterStateManager, registry, location);
Machines.Add(machine);
@ -318,7 +400,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
// Heartbeat machine so it knows about other machines
await machine.ClusterStateManager.HeartbeatAsync(this, MachineState.Open).ShouldBeSuccess();
}
}
}
@ -349,6 +430,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
public Task<BoolResult> UpdateMachinePartitionsAsync(TestMachine machine)
{
// Ensure the primary machine is updating its database
PrimaryMachine.Registry.SetDatabaseUpdateLeaseExpiry(Clock.UtcNow + TimeSpan.FromMinutes(5));
return machine.Registry.UpdatePartitionsAsync(this,
excludePartition: partition => !IncludedPartitions.Contains(partition))
.ThrowIfFailureAsync(s => s);
@ -379,7 +463,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
return result;
}
internal void CheckListing(ContentListing listing, byte partitionId)
internal void CheckListing(ContentListing listing, PartitionId partitionId)
{
var clusterState = PrimaryMachine.ClusterStateManager.ClusterState;
var entries = listing.EntrySpan.ToArray();
@ -387,8 +471,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
{
var entry = entries[i];
entry.PartitionId.Should().Be(partitionId);
entry.Hash[0].Should().Be(partitionId);
entry.PartitionId.Should().Be(entry.Hash[0]);
partitionId.Contains(entry.PartitionId).Should().BeTrue();
clusterState.TryResolve(entry.Location.AsMachineId(), out var machineLocation).Should().BeTrue();
ActualContent.Add((entry.Hash, machineLocation, entry.Size.Value)).Should().BeTrue();

Просмотреть файл

@ -2,7 +2,9 @@
// Licensed under the MIT License.
using System;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed.NuCache;
@ -57,6 +59,35 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
_fixture = fixture;
}
/// <summary>
/// This test is for a bug in Azurite (the Azure storage emulator)
/// where creating a snapshot causes PutBlock operations with a lease to fail.
/// </summary>
/// <remarks>
/// Sequence: upload a blob, acquire a 60 second lease on it, snapshot it, then put an
/// empty block using the lease as the access condition.
/// </remarks>
[Fact]
public async Task TestStorage()
{
    using var storage = AzuriteStorageProcess.CreateAndStartEmpty(_fixture, TestGlobal.Logger);

    var creds = new AzureBlobStorageCredentials(storage.ConnectionString);
    var client = creds.CreateCloudBlobClient();

    var container = client.GetContainerReference("test");
    await container.CreateIfNotExistsAsync();

    var blob = container.GetBlockBlobReference("test/sub/blob.out.bin");
    var bytes = Encoding.UTF8.GetBytes("hello");
    await blob.UploadFromByteArrayAsync(bytes, 0, bytes.Length);

    var leaseId = await blob.AcquireLeaseAsync(TimeSpan.FromSeconds(60));

    // Only the act of snapshotting matters here; the returned snapshot is not used.
    await blob.SnapshotAsync();

    // Dispose the (empty) block payload once the call completes.
    using var emptyBlock = new MemoryStream();
    await blob.PutBlockAsync("0000", emptyBlock, null, Microsoft.WindowsAzure.Storage.AccessCondition.GenerateLeaseCondition(leaseId), null, null);
}
[Fact]
public Task CreatesMissingContainerOnWrite()
{

Просмотреть файл

@ -0,0 +1,113 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.InterfacesTest.Results;
using BuildXL.Cache.Host.Configuration;
using ContentStoreTest.Distributed.Redis;
using FluentAssertions;
using Test.BuildXL.TestUtilities.Xunit;
using Xunit;
using Xunit.Abstractions;
namespace ContentStoreTest.Distributed.Sessions
{
/// <summary>
/// Runs the inherited <see cref="LocalLocationStoreDistributedContentTests"/> suite with the blob content
/// location registry enabled. Checkpoint creation and restore are overridden so that they also go through
/// the registry, the content metadata service and the global cache checkpoint manager.
/// </summary>
[Trait("Category", "Integration")]
[Trait("Category", "LongRunningTest")]
[Collection("Redis-based tests")]
#if !NETCOREAPP
[TestClassIfSupported(TestRequirements.NotSupported)]
#endif
public class BlobLocationRegistryDistributedContentTests : LocalLocationStoreDistributedContentTests
{
    /// <nodoc />
    public BlobLocationRegistryDistributedContentTests(LocalRedisFixture redis, ITestOutputHelper output)
        : base(redis, output)
    {
    }

    // Disable flaky test.
    public override Task PinWithUnverifiedCountAndStartCopyWithThreshold(int threshold)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Applies the registry-specific settings on top of the incoming settings and then
    /// hands them to the base implementation.
    /// </summary>
    protected override TestDistributedContentSettings ModifySettings(TestDistributedContentSettings dcs)
    {
        if (dcs.IsMasterEligible)
        {
            // Enable GCS on the master machine so checkpoint can be created.
            dcs.MemoizationContentMetadataStoreModeOverride = ContentMetadataStoreMode.WriteBothPreferRedis;
            dcs.ContentMetadataEnableResilience = true;
        }
        else
        {
            // Prevent workers from processing partition to simplify test logging so that
            // partition writes only come from one machine (typically during CreateCheckpointAsync below)
            dcs.LocationStoreSettings.BlobContentLocationRegistrySettings.ProcessPartitions = false;
        }

        var registrySettings = dcs.LocationStoreSettings.BlobContentLocationRegistrySettings;
        registrySettings.PartitionsUpdateInterval = "1m";
        registrySettings.PartitionCount = 2;
        registrySettings.PerPartitionDelayInterval = TimeSpan.Zero;
        registrySettings.UpdateInBackground = false;
        registrySettings.UpdateDatabase = true;

        dcs.LocationStoreSettings.EnableBlobContentLocationRegistry = true;
        dcs.GlobalCacheDatabaseValidationMode = DatabaseValidationMode.LogAndError;
        dcs.ContentMetadataUseMergeWrites = true;
        dcs.EnableIndependentBackgroundMasterElection = true;

        return base.ModifySettings(dcs);
    }

    /// <summary>
    /// Performs the base restore and then a periodic restore through the
    /// global cache checkpoint manager of the referenced store.
    /// </summary>
    protected override async Task<BoolResult> RestoreCheckpointAsync(InstanceRef storeRef, TestContext context)
    {
        await base.RestoreCheckpointAsync(storeRef, context).ShouldBeSuccess();

        var index = storeRef.ResolveIndex(context);
        var checkpointManager = context.GetServices(index).Dependencies.GlobalCacheCheckpointManager.GetRequiredInstance();
        await checkpointManager.PeriodicRestoreCheckpointAsync(context).ShouldBeSuccess();

        return BoolResult.Success;
    }

    /// <summary>
    /// Creates a checkpoint on the master. The referenced store is asserted to be the master.
    /// </summary>
    /// <remarks>
    /// Order: update the registries of every other store, run the base checkpoint creation,
    /// advance the clock and update the master's registry, then create a checkpoint through
    /// the content metadata service.
    /// </remarks>
    protected override async Task<BoolResult> CreateCheckpointAsync(InstanceRef storeRef, TestContext context)
    {
        var index = storeRef.ResolveIndex(context);
        index.Should().Be(context.GetMasterIndex());

        await UpdateAllRegistriesAsync(context, index);

        await base.CreateCheckpointAsync(storeRef, context).ShouldBeSuccess();

        var registry = context.GetBlobContentLocationRegistry(index);

        // Increment time to ensure database is updated on master
        TestClock.UtcNow += TimeSpan.FromMinutes(2);
        await registry.UpdatePartitionsAsync(context).ShouldBeSuccess();

        var service = context.GetContentMetadataService(index);
        await service.CreateCheckpointAsync(context).ShouldBeSuccess();

        return BoolResult.Success;
    }

    /// <summary>
    /// Updates the partitions of each store's registry in index order,
    /// skipping the store at <paramref name="excludeIndex"/> when one is given.
    /// </summary>
    private static async Task UpdateAllRegistriesAsync(TestContext context, int? excludeIndex = null)
    {
        for (int i = 0; i < context.Stores.Count; i++)
        {
            if (i == excludeIndex)
            {
                continue;
            }

            var registry = context.GetBlobContentLocationRegistry(i);
            await registry.UpdatePartitionsAsync(context).ShouldBeSuccess();
        }
    }
}
}

Просмотреть файл

@ -51,6 +51,24 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation
_fixture = fixture;
}
/// <summary>
/// For every requested partition count from 1 through 255, checks that the returned partitions
/// number between the requested count and twice that count, share one width, have non-inverted
/// ranges, and are strictly ordered without overlap.
/// </summary>
[Fact]
public void TestPartitionIds()
{
    for (int partitionCount = 1; partitionCount < 256; partitionCount++)
    {
        var partitions = PartitionId.GetPartitions(partitionCount);

        partitions.Length.Should().BeGreaterOrEqualTo(partitionCount);
        partitions.Length.Should().BeLessOrEqualTo(partitionCount * 2);
        partitions.All(p => p.Width == partitions[0].Width).Should().BeTrue();
        partitions.All(p => p.EndValue >= p.StartValue).Should().BeTrue();

        // Each partition must begin after the one before it ends.
        for (int next = 1; next < partitions.Length; next++)
        {
            var previous = partitions[next - 1];
            partitions[next].StartValue.Should().BeGreaterThan(previous.EndValue);
        }
    }
}
[Fact]
public void TestEntryFormat()
{

Просмотреть файл

@ -8,6 +8,7 @@ using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed;
using BuildXL.Cache.ContentStore.Distributed.MetadataService;
using BuildXL.Cache.ContentStore.Distributed.NuCache;
using BuildXL.Cache.ContentStore.Distributed.Test.MetadataService;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Sessions;
using BuildXL.Cache.ContentStore.Interfaces.Time;
@ -51,8 +52,9 @@ namespace ContentStoreTest.Distributed.Sessions
protected override DistributedCacheServiceArguments ModifyArguments(DistributedCacheServiceArguments arguments)
{
arguments.Configuration.DistributedContentSettings.EnableGlobalCacheLocationStoreValidation = true;
arguments.Configuration.DistributedContentSettings.GlobalCacheDatabaseValidationMode = DatabaseValidationMode.Log;
arguments.Configuration.DistributedContentSettings.ContentMetadataUseMergeWrites = UseMergeOperators;
arguments.Configuration.DistributedContentSettings.EnableIndependentBackgroundMasterElection = true;
return base.ModifyArguments(arguments);
}

Просмотреть файл

@ -476,7 +476,7 @@ namespace ContentStoreTest.Distributed.Sessions
[Theory]
[InlineData(1)]
[InlineData(0)]
public Task PinWithUnverifiedCountAndStartCopy(int threshold)
public virtual Task PinWithUnverifiedCountAndStartCopyWithThreshold(int threshold)
{
_overrideDistributed = s =>
{
@ -1615,12 +1615,12 @@ namespace ContentStoreTest.Distributed.Sessions
await workerStore.RegisterLocalLocationAsync(context, new[] { new ContentHashWithSize(hash, 120) }, Token, UrgencyHint.Nominal, touch: true).ShouldBeSuccess();
TestClock.UtcNow += TimeSpan.FromMinutes(2);
await masterStore.LocalLocationStore.HeartbeatAsync(context).ShouldBeSuccess();
await CreateCheckpointAsync(masterStore, context).ShouldBeSuccess();
for (int sessionIndex = 0; sessionIndex < storeCount; sessionIndex++)
{
// Heartbeat to ensure machine receives checkpoint
await context.GetLocationStore(sessionIndex).LocalLocationStore.HeartbeatAsync(context).ShouldBeSuccess();
await RestoreCheckpointAsync(sessionIndex, context).ShouldBeSuccess();
// Pin the content in the session which should fail with content not found
await PinContentForSession(sessionIndex).ShouldBeContentNotFound();
@ -2006,13 +2006,13 @@ namespace ContentStoreTest.Distributed.Sessions
TestClock.UtcNow += TimeSpan.FromMinutes(masterLeaseExpiryTime.TotalMinutes / 2);
// Save checkpoint by heartbeating master
await masterRedisStore.LocalLocationStore.HeartbeatAsync(context).ShouldBeSuccess();
await CreateCheckpointAsync(masterRedisStore, context).ShouldBeSuccess();
// Verify file was uploaded
// Verify file was skipped (if not first iteration)
// Restore checkpoint by heartbeating worker
await workerRedisStore.LocalLocationStore.HeartbeatAsync(context).ShouldBeSuccess();
await RestoreCheckpointAsync(workerRedisStore, context).ShouldBeSuccess();
// Files should be uploaded by master and downloaded by worker
diff(masterRedisStore.LocalLocationStore.Counters, masterCounters, ContentLocationStoreCounters.IncrementalCheckpointFilesUploaded).Should().BePositive();
@ -2152,10 +2152,10 @@ namespace ContentStoreTest.Distributed.Sessions
TestClock.UtcNow += TimeSpan.FromMinutes(masterLeaseExpiryTime.TotalMinutes / 2);
// Save checkpoint by heartbeating master
await masterRedisStore.LocalLocationStore.HeartbeatAsync(context).ShouldBeSuccess();
await CreateCheckpointAsync(masterRedisStore, context).ShouldBeSuccess();
// Restore checkpoint by heartbeating worker
await workerRedisStore.LocalLocationStore.HeartbeatAsync(context).ShouldBeSuccess();
await RestoreCheckpointAsync(workerRedisStore, context).ShouldBeSuccess();
// Files should be uploaded by master and downloaded by worker
diff(masterRedisStore.LocalLocationStore.Counters, masterCounters, ContentLocationStoreCounters.IncrementalCheckpointFilesUploaded).Should().BePositive();
@ -3046,38 +3046,6 @@ namespace ContentStoreTest.Distributed.Sessions
return putResult0.ContentHash;
}
private async Task UploadCheckpointOnMasterAndRestoreOnWorkers(TestContext context, bool reconcile = false, string clearStoragePrefix = null)
{
// Update time to trigger checkpoint upload and restore on master and workers respectively
TestClock.UtcNow += TimeSpan.FromMinutes(2);
var masterStore = context.GetMaster();
// Heartbeat master first to upload checkpoint
await masterStore.LocalLocationStore.HeartbeatAsync(context).ShouldBeSuccess();
if (reconcile)
{
await masterStore.ReconcileAsync(context, force: true).ShouldBeSuccess();
}
if (clearStoragePrefix != null)
{
await StorageProcess.ClearAsync(clearStoragePrefix);
}
// Next heartbeat workers to restore checkpoint
foreach (var workerStore in context.EnumerateWorkers())
{
await workerStore.LocalLocationStore.HeartbeatAsync(context).ShouldBeSuccess();
if (reconcile)
{
await workerStore.ReconcileAsync(context, force: true).ShouldBeSuccess();
}
}
}
#region SAS Tokens Tests
[Fact(Skip = "For manual testing only. Requires storage account credentials")]
public async Task BlobCentralStorageCredentialsUpdate()

Просмотреть файл

@ -4,12 +4,14 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed;
using BuildXL.Cache.ContentStore.Distributed.MetadataService;
using BuildXL.Cache.ContentStore.Distributed.NuCache;
using BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming;
using BuildXL.Cache.ContentStore.Distributed.Redis;
using BuildXL.Cache.ContentStore.Distributed.Stores;
@ -360,6 +362,7 @@ namespace ContentStoreTest.Distributed.Sessions
};
}
settings = ModifySettings(settings);
var configuration = new DistributedCacheServiceConfiguration(localCasSettings, settings);
var arguments = new DistributedCacheServiceArguments(
@ -383,6 +386,8 @@ namespace ContentStoreTest.Distributed.Sessions
return CreateStore(context, arguments);
}
/// <summary>
/// Hook that lets derived test classes adjust the distributed content settings.
/// The base implementation returns <paramref name="dcs"/> unchanged.
/// </summary>
protected virtual TestDistributedContentSettings ModifySettings(TestDistributedContentSettings dcs)
{
    return dcs;
}
protected virtual TestServerProvider CreateStore(Context context, DistributedCacheServiceArguments arguments)
{
if (UseGrpcServer)
@ -397,6 +402,80 @@ namespace ContentStoreTest.Distributed.Sessions
}
}
/// <summary>
/// Default checkpoint creation: heartbeats the local location store of the referenced instance.
/// </summary>
protected virtual Task<BoolResult> CreateCheckpointAsync(InstanceRef storeRef, TestContext context)
{
    var storeIndex = storeRef.ResolveIndex(context);
    var localLocationStore = context.GetLocalLocationStore(storeIndex);
    return localLocationStore.HeartbeatAsync(context);
}
/// <summary>
/// Default checkpoint restore: heartbeats the local location store of the referenced instance.
/// </summary>
protected virtual Task<BoolResult> RestoreCheckpointAsync(InstanceRef storeRef, TestContext context)
{
    var storeIndex = storeRef.ResolveIndex(context);
    var localLocationStore = context.GetLocalLocationStore(storeIndex);
    return localLocationStore.HeartbeatAsync(context);
}
/// <summary>
/// Advances the test clock by two minutes, creates a checkpoint on the master and then
/// restores it on every worker.
/// </summary>
/// <param name="context">The test context providing the master and worker stores.</param>
/// <param name="reconcile">When true, each store is force-reconciled right after its checkpoint step.</param>
/// <param name="clearStoragePrefix">
/// When non-null, passed to <c>StorageProcess.ClearAsync</c> after the master step and before any worker restores.
/// </param>
protected async Task UploadCheckpointOnMasterAndRestoreOnWorkers(TestContext context, bool reconcile = false, string clearStoragePrefix = null)
{
    // Move time forward so checkpoint upload (master) and restore (workers) are triggered
    TestClock.UtcNow += TimeSpan.FromMinutes(2);

    var master = context.GetMaster();

    // The master goes first so that a checkpoint exists for the workers
    await CreateCheckpointAsync(master, context).ShouldBeSuccess();
    if (reconcile)
    {
        await master.ReconcileAsync(context, force: true).ShouldBeSuccess();
    }

    if (clearStoragePrefix is not null)
    {
        await StorageProcess.ClearAsync(clearStoragePrefix);
    }

    // Workers follow, each restoring the checkpoint
    foreach (var worker in context.EnumerateWorkers())
    {
        await RestoreCheckpointAsync(worker, context).ShouldBeSuccess();
        if (reconcile)
        {
            await worker.ReconcileAsync(context, force: true).ShouldBeSuccess();
        }
    }
}
/// <summary>
/// Refers to one store instance in a test context, either directly by index or by one of
/// its store objects. Implicit conversions allow passing an index, a local location store
/// or a location store wherever an <see cref="InstanceRef"/> is expected.
/// </summary>
protected record struct InstanceRef(
    LocalLocationStore LocalLocationStore = null,
    TransitioningContentLocationStore LocationStore = null,
    int? Index = null)
{
    /// <summary>
    /// Resolves the store index: the explicit <c>Index</c> wins, then a lookup by
    /// <c>LocalLocationStore</c>, then a lookup by <c>LocationStore</c>.
    /// Raises a contract assertion failure when none of them yields an index.
    /// </summary>
    public int ResolveIndex(TestContext context)
    {
        return Index
            ?? ResolveIndex(context, LocalLocationStore, (c, i) => c.GetLocalLocationStore(i))
            ?? ResolveIndex(context, LocationStore, (c, i) => c.GetLocationStore(i))
            ?? throw Contract.AssertFailure("Could not find instance");
    }

    /// <summary>
    /// Finds the index of the store for which <paramref name="getIndexInstance"/> returns
    /// <paramref name="instance"/> (compared by reference).
    /// </summary>
    /// <returns>The matching index, or null when there is no match or <paramref name="instance"/> is null.</returns>
    public int? ResolveIndex<T>(TestContext context, T instance, Func<TestContext, int, T> getIndexInstance)
        where T : class
    {
        // A reference that was never supplied cannot identify a store. Bail out before
        // touching the per-store getters, and never let null "match" a store that yields null.
        if (instance is null)
        {
            return null;
        }

        for (int i = 0; i < context.Stores.Count; i++)
        {
            if (getIndexInstance(context, i) == instance)
            {
                return i;
            }
        }

        return null;
    }

    public static implicit operator InstanceRef(LocalLocationStore value) => new InstanceRef(LocalLocationStore: value);

    public static implicit operator InstanceRef(int value) => new InstanceRef(Index: value);

    public static implicit operator InstanceRef(TransitioningContentLocationStore value) => new InstanceRef(LocationStore: value);
}
protected async Task OpenStreamAndDisposeAsync(IContentSession session, Context context, ContentHash hash)
{
var openResult = await session.OpenStreamAsync(context, hash, Token).ShouldBeSuccess();

Просмотреть файл

@ -1,8 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using BuildXL.Cache.ContentStore.Distributed.NuCache;
@ -21,6 +23,25 @@ namespace ContentStoreTest.Distributed.ContentLocation.NuCache
public ShortHashTests(ITestOutputHelper helper) => _helper = helper;
// Member data for theories: one row per distinct entry of HashInfoLookup.All(), holding that entry's HashType.
public static IEnumerable<object[]> HashTypes => HashInfoLookup.All().Distinct().Select(i => new object[] { i.HashType });
/// <summary>
/// Checks that the short string form of a random hash parses back to the same short hash,
/// both through <c>ShortHash.TryParse</c> and through the string constructor.
/// </summary>
[Theory]
[MemberData(nameof(HashTypes))]
public void ParseAllHashTypes(HashType hashType)
{
    var contentHash = ContentHash.Random(hashType);
    var text = contentHash.ToShortString();
    var expected = contentHash.AsShortHash();

    // Round-trip through TryParse
    var parsedOk = ShortHash.TryParse(text, out var parsed);
    parsedOk.Should().BeTrue();
    parsed.Should().Be(expected);

    // Round-trip through the constructor
    var constructed = new ShortHash(text);
    constructed.Should().Be(expected);
}
[Fact]
public void ShortHashBinaryRoundtrip()
{

Просмотреть файл

@ -269,4 +269,12 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.MetadataService
_redisFixture.Dispose();
}
}
/// <summary>
/// Test-only convenience extensions.
/// </summary>
public static partial class TestExtensions
{
    /// <summary>
    /// Notifies <paramref name="service"/> of a role update by wrapping <paramref name="role"/> in a
    /// <see cref="MasterElectionState"/> whose other two arguments are left at their defaults.
    /// </summary>
    public static Task OnRoleUpdatedAsync(this ResilientGlobalCacheService service, OperationContext context, Role role)
    {
        var electionState = new MasterElectionState(default, role, default);
        return service.OnRoleUpdatedAsync(context, electionState);
    }
}
}

Просмотреть файл

@ -363,6 +363,11 @@ namespace ContentStoreTest.Distributed.Sessions
return GetServices(idx).RedisGlobalStore.Instance;
}
/// <summary>
/// Gets the required blob content location registry instance from the services of the store at <paramref name="idx"/>.
/// </summary>
internal BlobContentLocationRegistry GetBlobContentLocationRegistry(int idx) =>
    GetServices(idx).BlobContentLocationRegistry.GetRequiredInstance();
internal TransitioningContentLocationStore GetLocationStore(int idx) =>
((TransitioningContentLocationStore)GetDistributedSession(idx).ContentLocationStore);

Просмотреть файл

@ -428,7 +428,7 @@ namespace BuildXL.Cache.ContentStore.Hashing
/// </summary>
public static bool TryParse(string serialized, out ContentHash contentHash)
{
return TryParse(serialized, out contentHash, null);
return TryParse(serialized, out contentHash, isShortHash: false);
}
/// <summary>
@ -436,13 +436,13 @@ namespace BuildXL.Cache.ContentStore.Hashing
/// </summary>
public static bool TryParse(HashType hashType, string serialized, out ContentHash contentHash)
{
return TryParse(hashType, serialized, out contentHash, null);
return TryParse(hashType, serialized, out contentHash, isShortHash: false);
}
/// <summary>
/// Attempt to create from a known type string.
/// </summary>
internal static bool TryParse(string serialized, out ContentHash contentHash, int? expectedStringLength)
internal static bool TryParse(string serialized, out ContentHash contentHash, bool isShortHash)
{
Contract.Requires(serialized != null);
@ -473,17 +473,17 @@ namespace BuildXL.Cache.ContentStore.Hashing
return false;
}
return TryParse(hashType, hash, out contentHash, expectedStringLength);
return TryParse(hashType, hash, out contentHash, isShortHash);
}
/// <summary>
/// Attempt to create from a known type and string (without type).
/// </summary>
internal static bool TryParse(HashType hashType, string serialized, out ContentHash contentHash, int? expectedStringLength)
internal static bool TryParse(HashType hashType, string serialized, out ContentHash contentHash, bool isShortHash)
{
Contract.Requires(serialized != null);
if (serialized.Length != (expectedStringLength ?? HashInfoLookup.Find(hashType).StringLength))
if (serialized.Length != (isShortHash ? ShortHash.HashStringLength : HashInfoLookup.Find(hashType).StringLength))
{
contentHash = default(ContentHash);
return false;
@ -497,10 +497,13 @@ namespace BuildXL.Cache.ContentStore.Hashing
contentHash = new ContentHash(hashType, bytes);
if (HashInfoLookup.Find(hashType) is TaggedHashInfo && !AlgorithmIdHelpers.IsHashTagValid(contentHash))
if (!isShortHash)
{
contentHash = default(ContentHash);
return false;
if (HashInfoLookup.Find(hashType) is TaggedHashInfo && !AlgorithmIdHelpers.IsHashTagValid(contentHash))
{
contentHash = default(ContentHash);
return false;
}
}
return true;

Просмотреть файл

@ -29,6 +29,11 @@ namespace BuildXL.Cache.ContentStore.Hashing
/// </summary>
public const int HashLength = SerializedLength - 1;
/// <summary>
/// The length in hex characters of the hash portion of a short hash. NOTE: This does NOT include characters for the hash type
/// </summary>
public const int HashStringLength = HashLength * 2;
/// <nodoc />
public readonly ShortReadOnlyFixedBytes Value;
@ -78,7 +83,7 @@ namespace BuildXL.Cache.ContentStore.Hashing
/// </summary>
public static bool TryParse(string str, out ShortHash result)
{
if (ContentHash.TryParse(str, out var longHash, expectedStringLength: HashLength * 2))
if (ContentHash.TryParse(str, out var longHash, isShortHash: true))
{
result = longHash.AsShortHash();
return true;

Просмотреть файл

@ -235,11 +235,11 @@ namespace BuildXL.Cache.ContentStore.Interfaces.Results
/// <summary>
/// Maps result into different result type or propagates error to result type
/// </summary>
public static Result<TResult> Select<T, TResult>(this Result<T> result, Func<T, TResult> selector)
public static Result<TResult> Select<T, TResult>(this Result<T> result, Func<T, TResult> selector, bool isNullAllowed = false)
{
if (result.Succeeded)
{
return Result.Success(selector(result.Value));
return Result.Success(selector(result.Value), isNullAllowed: isNullAllowed);
}
else
{

Просмотреть файл

@ -20,6 +20,12 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
private static readonly byte[] B2 =
new List<byte> {2}.Concat(Enumerable.Repeat((byte)0, ContentHash.MaxHashByteLength - 1)).ToArray();
public static IEnumerable<object[]> HashTypes => HashInfoLookup.All().Distinct().Select(i => new object[] { i.HashType });
public static IEnumerable<object[]> HashTypesWithByteLengths => HashInfoLookup.All().Distinct().Select(i => new object[] { i.HashType, i.ByteLength });
public static IEnumerable<object[]> HashTypesWithStringLengths => HashInfoLookup.All().Distinct().Select(i => new object[] { i.HashType, i.StringLength });
[Fact]
public void TestGetHashCodeWithDefaultInstanceShouldNotThrow()
{
@ -29,12 +35,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5, 16)]
[InlineData(HashType.SHA1, 20)]
[InlineData(HashType.SHA256, 32)]
[InlineData(HashType.Vso0, 33)]
[InlineData(HashType.Dedup64K, 33)]
[InlineData(HashType.Dedup1024K, 33)]
[MemberData(nameof(HashTypesWithByteLengths))]
public void ValidByteLength(HashType hashType, int length)
{
var randomHash = ContentHash.Random(hashType);
@ -43,24 +44,14 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5, 32)]
[InlineData(HashType.SHA1, 40)]
[InlineData(HashType.SHA256, 64)]
[InlineData(HashType.Vso0, 66)]
[InlineData(HashType.Dedup64K, 66)]
[InlineData(HashType.Dedup1024K, 66)]
[MemberData(nameof(HashTypesWithStringLengths))]
public void ValidStringLength(HashType hashType, int length)
{
Assert.Equal(length, ContentHash.Random(hashType).StringLength);
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void RandomValue(HashType hashType)
{
var v = ContentHash.Random(hashType);
@ -71,15 +62,17 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
{
Assert.Equal(v[hashInfo.ByteLength - 1], taggedHashInfo.AlgorithmId);
}
var stringHash = v.Serialize();
Assert.True(ContentHash.TryParse(stringHash, out var parsedHash));
Assert.Equal(v, parsedHash);
parsedHash = new ContentHash(stringHash);
Assert.Equal(v, parsedHash);
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void MismatchLengthThrows(HashType hashType)
{
var b = Enumerable.Repeat((byte)0, 15).ToArray();
@ -104,12 +97,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void EqualsTrueReferenceType(HashType hashType)
{
var hash = ContentHash.Random(hashType);
@ -117,12 +105,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void EqualsFalseReferenceType(HashType hashType)
{
var h1 = ContentHash.Random(hashType);
@ -131,12 +114,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void GetHashCodeEqual(HashType hashType)
{
var h1 = new ContentHash(hashType, Zeros);
@ -145,12 +123,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void GetHashCodeNotEqual(HashType hashType)
{
var h1 = ContentHash.Random(hashType);
@ -159,12 +132,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void CompareToEqual(HashType hashType)
{
var h1 = new ContentHash(hashType, Zeros);
@ -173,12 +141,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void CompareToLessThan(HashType hashType)
{
var h1 = new ContentHash(hashType, B1);
@ -187,12 +150,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void CompareToGreaterThan(HashType hashType)
{
var h1 = new ContentHash(hashType, B1);
@ -216,12 +174,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void EqualityOperatorTrue(HashType hashType)
{
var hash1 = new ContentHash(hashType, B1);
@ -230,12 +183,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void EqualityOperatorFalse(HashType hashType)
{
var h1 = new ContentHash(hashType, B1);
@ -244,12 +192,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void InequalityOperatorFalse(HashType hashType)
{
var h1 = new ContentHash(hashType, B1);
@ -258,12 +201,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void InequalityOperatorTrue(HashType hashType)
{
var h1 = new ContentHash(hashType, B1);
@ -276,12 +214,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void EqualContentHashRoundTripViaSpan(HashType hashType)
{
var h1 = ContentHash.Random(hashType);
@ -294,12 +227,7 @@ namespace BuildXL.Cache.ContentStore.InterfacesTest.Hashing
}
[Theory]
[InlineData(HashType.MD5)]
[InlineData(HashType.SHA1)]
[InlineData(HashType.SHA256)]
[InlineData(HashType.Vso0)]
[InlineData(HashType.Dedup64K)]
[InlineData(HashType.Dedup1024K)]
[MemberData(nameof(HashTypes))]
public void EqualContentHashRoundTripViaHexString(HashType hashType)
{
var h1 = ContentHash.Random(hashType);

Просмотреть файл

@ -167,7 +167,7 @@ namespace BuildXL.Cache.ContentStore.Extensions
/// <summary>
/// Pseudorandomly enumerates the range from [0, <paramref name="length"/>)
/// </summary>
public static IEnumerable<int> PseudoRandomEnumerate(int length)
public static IEnumerable<int> PseudoRandomEnumerateRange(int length)
{
var offset = ThreadSafeRandom.Generator.Next(0, length);
var current = ThreadSafeRandom.Generator.Next(0, length);
@ -178,6 +178,17 @@ namespace BuildXL.Cache.ContentStore.Extensions
}
}
/// <summary>
/// Lazily yields every item of <paramref name="list"/>, visiting the indices in the
/// order produced by <see cref="PseudoRandomEnumerateRange"/> for the list's count.
/// </summary>
public static IEnumerable<T> PseudoRandomEnumerate<T>(this IReadOnlyList<T> list)
{
    var itemCount = list.Count;
    foreach (var position in PseudoRandomEnumerateRange(itemCount))
    {
        var item = list[position];
        yield return item;
    }
}
/// <summary>
/// Gets a unique, pseudorandom value between [0, length).
///

Просмотреть файл

@ -24,18 +24,20 @@ namespace ContentStoreTest.Extensions
{
for (int length = 0; length < 1024; length++)
{
var randomRange = EnumerableExtensions.PseudoRandomEnumerate(length).ToArray();
var sortedRange = Enumerable.Range(0, length).ToArray();
var randomRange = EnumerableExtensions.PseudoRandomEnumerateRange(length).ToArray();
var randomEnumeration = sortedRange.PseudoRandomEnumerate().ToArray();
// Technically, there is a possibility that the random range will match the sorted
// range. Use a high enough length so that possibility is excluded.
if (length > 10)
{
Assert.NotEqual(sortedRange, randomRange);
Assert.NotEqual(sortedRange, randomEnumeration);
}
Assert.Equal(sortedRange, randomRange.OrderBy(i => i));
Assert.Equal(sortedRange, randomEnumeration.OrderBy(i => i));
}
}

Просмотреть файл

@ -681,7 +681,7 @@ namespace BuildXL.Cache.Host.Configuration
#endregion
[DataMember]
public bool EnableGlobalCacheLocationStoreValidation { get; set; } = false;
public EnumSetting<DatabaseValidationMode> GlobalCacheDatabaseValidationMode { get; set; } = DatabaseValidationMode.None;
[DataMember]
public bool IsMasterEligible { get; set; } = false;
@ -1162,7 +1162,7 @@ namespace BuildXL.Cache.Host.Configuration
public bool ContentMetadataDisableDatabaseRegisterLocation { get; set; }
[DataMember]
public bool ContentMetadataEnableResilience { get; set; }
public bool ContentMetadataEnableResilience { get; set; } = true;
[DataMember]
public bool ContentMetadataOptimizeWrites { get; set; }
@ -1325,6 +1325,24 @@ namespace BuildXL.Cache.Host.Configuration
public LocalLocationStoreSettings LocationStoreSettings { get; set; } = new();
}
/// <summary>
/// Selects whether database validation is skipped, only logged, or logged and raised as an error.
/// </summary>
public enum DatabaseValidationMode
{
    /// <summary>
    /// No validation
    /// </summary>
    None = 0,

    /// <summary>
    /// Log metrics and presence in GCS for all copies
    /// </summary>
    Log = 1,

    /// <summary>
    /// Log and raise error. This is mainly for unit test validation.
    /// </summary>
    LogAndError = 2,
}
[Flags]
public enum RegisterHintHandling
{

Просмотреть файл

@ -42,7 +42,7 @@ namespace BuildXL.Cache.Host.Configuration
/// </summary>
public bool EnableBlobContentLocationRegistry { get; set; }
public BlobContentLocationRegistrySettings BlobContentLocationRegistrySettings { get; }
public BlobContentLocationRegistrySettings BlobContentLocationRegistrySettings { get; set; } = new BlobContentLocationRegistrySettings();
}
public record BlobContentLocationRegistrySettings
@ -53,7 +53,7 @@ namespace BuildXL.Cache.Host.Configuration
public string FolderName { get; set; } = "partitions";
public string PartitionCheckpointManifestFileName { get; set; } = "manifest.json";
public string PartitionCheckpointManifestFileName { get; set; } = "manifest.v2.json";
/// <summary>
/// Indicates whether partitions are updated in the background on a timer loop
@ -68,6 +68,36 @@ namespace BuildXL.Cache.Host.Configuration
/// <summary>
/// Interval between updates of partitions output blob
/// </summary>
public TimeSpanSetting PartitionsUpdateInterval { get; set; } = TimeSpan.FromMinutes(5);
public TimeSpanSetting PartitionsUpdateInterval { get; set; } = TimeSpan.FromMinutes(10);
/// <summary>
/// Gets whether partitions should be processed into output blobs (i.e. containing sst files and content listings)
/// </summary>
public bool ProcessPartitions { get; set; } = true;
/// <summary>
/// Maximum number of diff sst snapshots for a particular partition allowed before using full sst snapshot instead.
/// </summary>
public int MaxSnapshotChainLength { get; set; } = 5;
/// <summary>
/// Maximum number of snapshots retained for a particular partition: twice <see cref="MaxSnapshotChainLength"/>, but never less than 1.
/// </summary>
public int MaxRetainedSnapshots => Math.Max(1, (MaxSnapshotChainLength * 2));
/// <summary>
/// Maximum parallelism for sst file download
/// </summary>
public int MaxDegreeOfParallelism { get; set; } = 4;
/// <summary>
/// Gets whether the local database should be updated with sst files
/// </summary>
public bool UpdateDatabase { get; set; } = false;
/// <summary>
/// The number of partitions to create. Changing this number causes partitions to be recomputed.
/// </summary>
public int PartitionCount { get; set; } = 256;
}
}

Просмотреть файл

@ -12,6 +12,7 @@ using BuildXL.Cache.ContentStore.Distributed;
using BuildXL.Cache.ContentStore.Distributed.MetadataService;
using BuildXL.Cache.ContentStore.Distributed.NuCache;
using BuildXL.Cache.ContentStore.Distributed.Stores;
using BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Sessions;

Просмотреть файл

@ -472,32 +472,6 @@ namespace TypeScript.Net.Extensions
}
}
/// <summary>
/// Converts sequence to dictionary, but accepts duplicate keys. First will win.
/// </summary>
public static Dictionary<TKey, TValue> ToDictionarySafe<T, TKey, TValue>(this IEnumerable<T> source, Func<T, TKey> keySelector,
Func<T, TValue> valueSelector)
{
Contract.Requires(source != null);
Contract.Requires(keySelector != null);
Contract.Requires(valueSelector != null);
Dictionary<TKey, TValue> result = new Dictionary<TKey, TValue>();
foreach (var element in source)
{
var key = keySelector(element);
var value = valueSelector(element);
if (!result.ContainsKey(key))
{
result.Add(key, value);
}
}
return result;
}
/// <summary>
/// Returns whether a <paramref name="source"/> is null or empty.
/// </summary>

Просмотреть файл

@ -9,6 +9,7 @@ using System.Linq;
using System.Numerics;
using BuildXL.FrontEnd.Script.Constants;
using BuildXL.Utilities;
using BuildXL.Utilities.Collections;
using TypeScript.Net.Diagnostics;
using TypeScript.Net.Extensions;
using TypeScript.Net.Types;

Просмотреть файл

@ -190,6 +190,57 @@ namespace BuildXL.Utilities.Collections
return array;
}
/// <summary>
/// Builds a dictionary from <paramref name="source"/> whose values are the elements themselves,
/// keyed by <paramref name="keySelector"/>. Duplicate keys are accepted: the first element seen
/// for a key is kept and later elements with that key are ignored.
/// </summary>
public static Dictionary<TKey, TValue> ToDictionarySafe<TKey, TValue>(this IEnumerable<TValue> source, Func<TValue, TKey> keySelector)
    where TKey : notnull
{
    Contract.Requires(source != null);
    Contract.Requires(keySelector != null);

    var firstByKey = new Dictionary<TKey, TValue>();
    foreach (var item in source)
    {
        var itemKey = keySelector(item);
        if (firstByKey.ContainsKey(itemKey))
        {
            // An earlier element already owns this key; the first one wins.
            continue;
        }

        firstByKey.Add(itemKey, item);
    }

    return firstByKey;
}
/// <summary>
/// Builds a dictionary from <paramref name="source"/>, projecting each element to a key with
/// <paramref name="keySelector"/> and to a value with <paramref name="valueSelector"/>.
/// Duplicate keys are accepted: the first element seen for a key is kept and later ones are ignored.
/// </summary>
public static Dictionary<TKey, TValue> ToDictionarySafe<T, TKey, TValue>(this IEnumerable<T> source, Func<T, TKey> keySelector,
    Func<T, TValue> valueSelector)
    where TKey : notnull
{
    Contract.Requires(source != null);
    Contract.Requires(keySelector != null);
    Contract.Requires(valueSelector != null);

    var firstByKey = new Dictionary<TKey, TValue>();
    foreach (var item in source)
    {
        // Both selectors run for every element, including ones whose key is already taken.
        var itemKey = keySelector(item);
        var itemValue = valueSelector(item);
        if (firstByKey.ContainsKey(itemKey))
        {
            continue;
        }

        firstByKey.Add(itemKey, itemValue);
    }

    return firstByKey;
}
/// <summary>
/// Clones the existing dictionary with no enumerator allocations.
/// </summary>

Просмотреть файл

@ -76,6 +76,8 @@ namespace Test.BuildXL.TestUtilities.Xunit
return;
}
CheckRequirement(TestRequirements.NotSupported, () => "Test is marked not supported.");
CheckRequirement(
TestRequirements.Admin,
() =>

Просмотреть файл

@ -67,5 +67,10 @@ namespace Test.BuildXL.TestUtilities.Xunit
/// Requires running on either Windows or Mac operating system (excluding Linux)
/// </summary>
WindowsOrMacOs = 1 << 9,
/// <summary>
/// Used to disable a test. Typically used with #if directives.
/// </summary>
NotSupported = 1 << 10,
}
}

Просмотреть файл

@ -19,4 +19,20 @@ namespace BuildXL.Utilities
return value.Value;
}
}
/// <summary>
/// Helper methods for <see cref="AsyncOut{T}"/>
/// </summary>
public static class AsyncOut
{
    /// <summary>
    /// Creates a new <see cref="AsyncOut{T}"/>, assigns it to <paramref name="value"/> and returns that
    /// same instance, so it can be declared inline at the call site much like the (out T parameter) pattern.
    /// Usage: await ExecuteAsync(AsyncOut.Var&lt;T&gt;(out var outParam));
    /// </summary>
    public static AsyncOut<T> Var<T>(out AsyncOut<T> value)
    {
        return value = new AsyncOut<T>();
    }
}
}

Просмотреть файл

@ -141,7 +141,7 @@
"Type": "NuGet",
"NuGet": {
"Name": "BuildXL.Azurite.Executables",
"Version": "1.0.0-CI-20220125-034149"
"Version": "1.0.0-CI-20220612-002122"
}
}
},
@ -2868,7 +2868,7 @@
"Type": "NuGet",
"NuGet": {
"Name": "RocksDbNative",
"Version": "6.10.2-b20220610.3"
"Version": "6.10.2-b20220610.4"
}
}
},
@ -2877,7 +2877,7 @@
"Type": "NuGet",
"NuGet": {
"Name": "RocksDbSharp",
"Version": "6.10.2-b20220610.3"
"Version": "6.10.2-b20220610.4"
}
}
},

Просмотреть файл

@ -170,11 +170,11 @@ config({
{ id: "Microsoft.Windows.ProjFS", version: "1.2.19351.1" },
// RocksDb
{ id: "RocksDbSharp", version: "6.10.2-b20220610.3", alias: "RocksDbSharpSigned",
{ id: "RocksDbSharp", version: "6.10.2-b20220610.4", alias: "RocksDbSharpSigned",
dependentPackageIdsToSkip: [ "System.Memory" ],
dependentPackageIdsToIgnore: [ "System.Memory" ]
},
{ id: "RocksDbNative", version: "6.10.2-b20220610.3" },
{ id: "RocksDbNative", version: "6.10.2-b20220610.4" },
{ id: "JsonDiffPatch.Net", version: "2.1.0" },
@ -257,7 +257,7 @@ config({
// Azurite node app compiled to standalone executable
// Sources for this package are: https://github.com/Azure/Azurite
// This package is produced by the pipeline: https://dev.azure.com/mseng/Domino/_build?definitionId=13199
{ id: "BuildXL.Azurite.Executables", version: "1.0.0-CI-20220125-034149" },
{ id: "BuildXL.Azurite.Executables", version: "1.0.0-CI-20220612-002122" },
// It turns out Redis-64 ( https://www.nuget.org/packages/redis-64/ ) was deprecated several years
// ago, and so we can't even build with it due to component governance. We don't actually care about