Merged PR 728101: BlobLifetimeManager supports multiple universes/namespaces, reading from the change feed, and checkpointing

As preparation for checkpointing, which requires that garbage collection happen for the whole storage account at once, it makes sense to first support garbage-collecting multiple namespaces/universes in a single run. The idea is that instead of accessing the DB directly, callers go through an IAccessor, which limits the view of the database to a single namespace. In practice, this means each accessor gets a unique set of RocksDb column families that it accesses. Other than that, the logic to create/manage the database stays the same.
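
To make the accessor idea concrete, here is a minimal, self-contained sketch of the pattern. The names (NamespaceId, LifetimeDb, and the dictionary-backed "column families") are illustrative stand-ins for the real BlobNamespaceId and RocksDbLifetimeDatabase.IAccessor, not the PR's actual implementation:

using System.Collections.Generic;

public readonly record struct NamespaceId(string Universe, string Namespace)
{
    public override string ToString() => $"{Universe}-{Namespace}";
}

public class LifetimeDb
{
    // All column families live in the database; an accessor can only reach the ones
    // derived from its own universe/namespace pair.
    private readonly Dictionary<string, Dictionary<string, long>> _columnFamilies = new();

    public IAccessor GetAccessor(NamespaceId namespaceId) => new Accessor(this, namespaceId);

    public interface IAccessor
    {
        void AddContent(string hash, long length);
        long? GetContentLength(string hash);
    }

    private Dictionary<string, long> Family(string name)
        => _columnFamilies.TryGetValue(name, out var family) ? family : _columnFamilies[name] = new();

    private sealed class Accessor : IAccessor
    {
        private readonly LifetimeDb _db;
        private readonly string _family;

        public Accessor(LifetimeDb db, NamespaceId id)
        {
            _db = db;
            // One column family per namespace; everything outside the accessor is
            // namespace-agnostic, so the create/manage logic stays shared.
            _family = $"content-{id}";
        }

        public void AddContent(string hash, long length) => _db.Family(_family)[hash] = length;

        public long? GetContentLength(string hash)
            => _db.Family(_family).TryGetValue(hash, out var length) ? length : null;
    }
}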

Another change is that subsequent runs can now update their view of the world by reading Azure Storage's change feed. This is essential because otherwise nothing works: on the first run nothing is evictable, since we touch everything; and by the second run so much time has passed that, without refreshing our view, we might delete blobs that have acquired new references.
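
Roughly, the catch-up loop looks like the sketch below, using the same Azure.Storage.Blobs.ChangeFeed calls the PR adds; how the cursor is loaded and stored per account is a hypothetical stand-in for what the database actually persists:

using System;
using System.Threading.Tasks;
using Azure.Storage.Blobs.ChangeFeed;

public static class ChangeFeedCatchUp
{
    public static async Task<string?> ConsumeNewChangesAsync(
        BlobChangeFeedClient client,
        string? cursorFromLastRun,            // null on the first incremental run
        DateTimeOffset? databaseCreationTime, // fallback starting point when there is no cursor
        Action<BlobChangeFeedEvent> apply)
    {
        // First incremental run: start from the time the database was built.
        // Later runs: resume from the cursor persisted at the end of the previous run.
        var pages = cursorFromLastRun is null
            ? client.GetChangesAsync(start: databaseCreationTime).AsPages()
            : client.GetChangesAsync(cursorFromLastRun).AsPages();

        var newCursor = cursorFromLastRun;
        await foreach (var page in pages)
        {
            foreach (var change in page.Values)
            {
                // Only creations matter for now: the GC service assumes it is the only thing deleting blobs.
                if (change.EventType == BlobChangeFeedEventType.BlobCreated)
                {
                    apply(change);
                }
            }

            // Persisting the page's continuation token is what lets the next run pick up
            // exactly where this one left off instead of re-enumerating the whole account.
            newCursor = page.ContinuationToken ?? newCursor;
        }

        return newCursor;
    }
}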

Finally, with both of these changes in place, I also implemented checkpointing. The checkpoint and all its data live in separate containers in the 0th shard of the cache, since different-sized caches _are different caches_, regardless of whether they share accounts. Ideally we would never hit this case, since we're the ones doing the resharding, but we already have the problem today because some of our tests do not use all 100 accounts we've provisioned.
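
As a rough illustration of the placement rule (the account ordering and container name below are assumptions, not the PR's actual naming; the real salt comes from the sharding-scheme matrix, which is why ShardedBlobCacheTopology.GenerateMatrix becomes public in this change):

using System;
using System.Collections.Generic;
using System.Linq;

public static class CheckpointPlacement
{
    // Hypothetical helper: pick the account that acts as the 0th shard and a checkpoint
    // container name salted by the sharding matrix, so differently-sized caches that share
    // accounts keep their checkpoints apart.
    public static (string Account, string Container) Locate(
        IReadOnlyList<string> accountNames,
        string metadataMatrix)
    {
        var shardZero = accountNames.OrderBy(name => name, StringComparer.Ordinal).First();
        return (shardZero, $"checkpoints-{metadataMatrix}".ToLowerInvariant());
    }
}
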
Juan Carlos Guzman Islas 2023-08-15 22:43:19 +00:00
Parent b64183f7f7
Commit 911efd84c6
37 changed files with 2214 additions and 655 deletions

View file

@ -167,6 +167,7 @@ export function getAzureBlobStorageSdkPackagesWithoutNetStandard() : (Managed.Ma
importFrom("Azure.Storage.Common").pkg,
importFrom("Azure.Core").pkg,
importFrom("Azure.Storage.Blobs.Batch").pkg,
importFrom("Azure.Storage.Blobs.ChangeFeed").pkg,
];
}

View file

@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Text.RegularExpressions;
namespace BuildXL.Cache.ContentStore.Distributed.Blob
{
/// <summary>
/// This absolute path is obtained from the Azure Blob change feed. It uniquely identifies a blob within the cache.
/// </summary>
public readonly record struct AbsoluteBlobPath(BlobCacheStorageAccountName Account, BlobCacheContainerName Container, BlobPath Path)
{
private readonly static Regex BlobChangeFeedEventSubjectRegex = new(@"/blobServices/default/containers/(?<container>[^/]+)/blobs/(?<path>.+)", RegexOptions.IgnoreCase | RegexOptions.Compiled | RegexOptions.CultureInvariant);
public static AbsoluteBlobPath ParseFromChangeEventSubject(BlobCacheStorageAccountName account, string subject)
{
var match = BlobChangeFeedEventSubjectRegex.Match(subject);
if (!match.Success)
{
throw new ArgumentException($"Failed to match {nameof(BlobChangeFeedEventSubjectRegex)} to {subject}", nameof(subject));
}
var container = BlobCacheContainerName.Parse(match.Groups["container"].Value);
var path = new BlobPath(match.Groups["path"].Value, relative: false);
return new(Account: account, Container: container, Path: path);
}
}
}

View file

@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace BuildXL.Cache.ContentStore.Distributed.Blob
{
/// <summary>
/// This uniquely describes a namespace in a blob cache. Each namespace is garbage-collected
/// as a separate cache from other namespaces
/// </summary>
public readonly record struct BlobNamespaceId(string Universe, string Namespace)
{
public override string ToString() => $"{Universe}-{Namespace}";
}
}

View file

@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Linq;
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
#nullable enable
namespace BuildXL.Cache.ContentStore.Distributed.Blob
{
public class EnvironmentVariableCacheSecretsProvider : StaticBlobCacheSecretsProvider
{
public EnvironmentVariableCacheSecretsProvider(string environmentVariableName)
: base(ExtractCredsFromEnvironmentVariable(environmentVariableName))
{
}
public static Dictionary<BlobCacheStorageAccountName, AzureStorageCredentials> ExtractCredsFromEnvironmentVariable(string environmentVariableName)
{
var connectionStringsString = Environment.GetEnvironmentVariable(environmentVariableName);
if (string.IsNullOrEmpty(connectionStringsString))
{
throw new ArgumentException($"Connections strings for the L3 cache must be provided via the {environmentVariableName} environment variable " +
$"in the format of comma-separated strings.");
}
var connectionStrings = connectionStringsString.Split(',');
var creds = connectionStrings.Select(connString => new AzureStorageCredentials(new PlainTextSecret(connString))).ToArray();
return creds.ToDictionary(
cred => BlobCacheStorageAccountName.Parse(cred.GetAccountName()),
cred => cred);
}
}
}

View file

@ -1,9 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System.Collections.Generic;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Utilities.Collections;
#nullable enable
@ -19,6 +21,5 @@ public interface IBlobCacheSecretsProvider
/// </summary>
public Task<AzureStorageCredentials> RetrieveBlobCredentialsAsync(
OperationContext context,
BlobCacheStorageAccountName account,
BlobCacheContainerName container);
BlobCacheStorageAccountName account);
}

View file

@ -116,7 +116,7 @@ public class ShardedBlobCacheTopology : IBlobCacheTopology
}).ToArray();
}
internal static (string Metadata, string Content) GenerateMatrix(ShardingScheme scheme)
public static (string Metadata, string Content) GenerateMatrix(ShardingScheme scheme)
{
// The matrix here ensures that metadata does not overlap across sharding schemes. Basically, whenever we add
// or remove shards (or change the sharding algorithm), we will get a new salt. This salt will force us to use
@ -206,7 +206,7 @@ public class ShardedBlobCacheTopology : IBlobCacheTopology
Tracer,
async context =>
{
var credentials = await _configuration.SecretsProvider.RetrieveBlobCredentialsAsync(context, account, container);
var credentials = await _configuration.SecretsProvider.RetrieveBlobCredentialsAsync(context, account);
BlobClientOptions blobClientOptions = new(BlobClientOptions.ServiceVersion.V2021_02_12)
{

View file

@ -33,7 +33,7 @@ public enum ShardingAlgorithm
/// <summary>
/// Specifies a sharding scheme.
/// </summary>
public record ShardingScheme(ShardingAlgorithm Scheme, List<BlobCacheStorageAccountName> Accounts)
public record ShardingScheme(ShardingAlgorithm Scheme, IReadOnlyList<BlobCacheStorageAccountName> Accounts)
{
public IShardingScheme<int, BlobCacheStorageAccountName> Create()
{

View file

@ -3,6 +3,7 @@
#nullable enable
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
using BuildXL.Cache.ContentStore.Tracing;
@ -17,23 +18,28 @@ public class StaticBlobCacheSecretsProvider : IBlobCacheSecretsProvider
{
protected static Tracer Tracer { get; } = new(nameof(StaticBlobCacheSecretsProvider));
public IReadOnlyList<BlobCacheStorageAccountName> ConfiguredAccounts => _accounts;
private readonly AzureStorageCredentials? _fallback;
private readonly IReadOnlyDictionary<BlobCacheStorageAccountName, AzureStorageCredentials> _credentials = new Dictionary<BlobCacheStorageAccountName, AzureStorageCredentials>();
private readonly IReadOnlyList<BlobCacheStorageAccountName> _accounts;
public StaticBlobCacheSecretsProvider(IReadOnlyDictionary<BlobCacheStorageAccountName, AzureStorageCredentials> credentials, AzureStorageCredentials? fallback = null)
{
_credentials = credentials;
_accounts = _credentials.Keys.ToArray();
_fallback = fallback;
}
public StaticBlobCacheSecretsProvider(AzureStorageCredentials fallback)
{
_fallback = fallback;
_accounts = _credentials.Keys.ToArray();
}
public Task<AzureStorageCredentials> RetrieveBlobCredentialsAsync(OperationContext context, BlobCacheStorageAccountName account, BlobCacheContainerName container)
public Task<AzureStorageCredentials> RetrieveBlobCredentialsAsync(OperationContext context, BlobCacheStorageAccountName account)
{
Tracer.Info(context, $"Fetching credentials. Account=[{account}] Container=[{container}]");
Tracer.Info(context, $"Fetching credentials. Account=[{account}]");
if (_credentials.TryGetValue(account, out var credentials))
{
@ -45,6 +51,6 @@ public class StaticBlobCacheSecretsProvider : IBlobCacheSecretsProvider
return Task.FromResult(_fallback);
}
throw new KeyNotFoundException($"Credentials are unavailable for storage account {account} and container {container}");
throw new KeyNotFoundException($"Credentials are unavailable for storage account {account}");
}
}

View file

@ -77,7 +77,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
/// <inheritdoc />
public CheckpointManager(
ContentLocationDatabase database,
ICheckpointable database,
ICheckpointRegistry checkpointRegistry,
CentralStorage storage,
CheckpointManagerConfiguration configuration,

View file

@ -5,6 +5,7 @@ using System;
using System.Text.RegularExpressions;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;
@ -130,6 +131,28 @@ namespace BuildXL.Cache.ContentStore.Interfaces.Secrets
};
}
/// <nodoc />
public BlobChangeFeedClient CreateBlobChangeFeedClient(BlobClientOptions? blobClientOptions = null, BlobChangeFeedClientOptions? changeFeedClientOptions = null)
{
// We default to this specific version because tests run against the Azurite emulator. The emulator doesn't
// currently support any higher version than this, and we won't upgrade it because its build process is
// weird as hell and they don't just provide binaries.
blobClientOptions ??= new BlobClientOptions(BlobClientOptions.ServiceVersion.V2021_02_12);
changeFeedClientOptions ??= new BlobChangeFeedClientOptions();
return _secret switch
{
PlainTextSecret plainText => new BlobChangeFeedClient(connectionString: plainText.Secret, blobClientOptions, changeFeedClientOptions),
UpdatingSasToken sasToken => new BlobChangeFeedClient(
serviceUri: new Uri($"https://{sasToken.Token.StorageAccount}.blob.core.windows.net/"),
credential: CreateV12StorageCredentialsFromSasToken(sasToken),
blobClientOptions,
changeFeedClientOptions),
_ => throw new NotImplementedException($"Unknown secret type `{_secret.GetType()}`")
};
}
/// <nodoc />
public BlobContainerClient CreateContainerClient(string containerName, BlobClientOptions? blobClientOptions = null)
{

View file

@ -3,7 +3,7 @@
namespace App {
@@public
export const exe = !BuildXLSdk.Flags.isMicrosoftInternal ? undefined : BuildXLSdk.executable({
export const exe = BuildXLSdk.executable({
assemblyName: "BuildXL.Cache.BlobLifetimeManager",
sources: globR(d`.`,"*.cs"),
references: [
@ -12,10 +12,11 @@ namespace App {
importFrom("BuildXL.Cache.ContentStore").Distributed.dll,
importFrom("BuildXL.Cache.ContentStore").Interfaces.dll,
importFrom("BuildXL.Cache.ContentStore").Hashing.dll,
importFrom("BuildXL.Cache.ContentStore").Library.dll,
importFrom("BuildXL.Cache.ContentStore").UtilitiesCore.dll,
importFrom("BuildXL.Cache.MemoizationStore").Interfaces.dll,
importFrom("BuildXL.Utilities").dll,
...importFrom("BuildXL.Cache.ContentStore").getAzureBlobStorageSdkPackages(true),
],
tools: {
csc: {

View file

@ -4,25 +4,25 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed.Blob;
using BuildXL.Cache.ContentStore.Interfaces.Logging;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
using BuildXL.Cache.ContentStore.Interfaces.Time;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
using BuildXL.Cache.ContentStore.Logging;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.BlobLifetimeManager.Library;
using BuildXL.Cache.ContentStore.Distributed.Utilities;
using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Utilities;
using CLAP;
using CLAP.Validation;
using BuildXL.Cache.ContentStore.Interfaces.Time;
namespace BuildXL.Cache.BlobLifetimeManager
{
public class Program
{
private const string ConnectionStringsEnvironmentVariable = "LIFETIME_MANAGER_CONNECTION_STRINGS";
private static readonly Tracer Tracer = new(nameof(Program));
private static void Main(string[] args)
{
@ -34,28 +34,14 @@ namespace BuildXL.Cache.BlobLifetimeManager
Description = $"Run the lifetime manager. Note that connections strings for the L3 cache must be provided via the {ConnectionStringsEnvironmentVariable} " +
$"environment variable in the format of comma-separated strings.")]
public static void Run(
[Description("Max size for the L3 in MB.")]
[Required]
[MoreThan(0)]
long maxSizeMB,
[Required]
[Description("Cache universe for which to perform GC.")]
string cacheUniverse,
[Required]
[Description("Cache namespace for which to perform GC.")]
string cacheNamespace,
[Description("Database path to use if you don't want to create a new one from scratch.")]
[DefaultValue(null)]
string? databasePath,
[Description("Path to the garbage collection configuration. The configuration should be a JSON document with a list of objects: { Universe:string, Namespace:string, MaxSizeGb:number }")]
string configPath,
[Description("Perform a dry run. Delete operations against blob storage will be logged, but not performed.")]
[DefaultValue(false)]
bool dryRun,
[Description("Degree of parallelism to use for each shard when enumerating content from blob storage.")]
[DefaultValue(100)]
int contentDegreeOfParallelism,
@ -64,9 +50,30 @@ namespace BuildXL.Cache.BlobLifetimeManager
[DefaultValue(10)]
int fingerprintDegreeOfParallelism,
[Required]
[Description("ID of the current GC run. This is used to identify which run produced a checkpoint.")]
string runId,
[DefaultValue(Severity.Debug)]
Severity logSeverity,
[Description("Whether to write logs to a file on disk.")]
[DefaultValue(false)]
bool enableFileLogging,
bool debug)
{
RunCoreAsync(configPath, dryRun, contentDegreeOfParallelism, fingerprintDegreeOfParallelism, runId, logSeverity, enableFileLogging, debug).GetAwaiter().GetResult();
}
public static async Task RunCoreAsync(
string configPath,
bool dryRun,
int contentDegreeOfParallelism,
int fingerprintDegreeOfParallelism,
string runId,
Severity logSeverity,
bool enableFileLogging,
bool debug)
{
if (debug)
@ -74,122 +81,47 @@ namespace BuildXL.Cache.BlobLifetimeManager
Debugger.Launch();
}
var connectionStringsString = Environment.GetEnvironmentVariable(ConnectionStringsEnvironmentVariable);
if (string.IsNullOrEmpty(connectionStringsString))
if (!File.Exists(configPath))
{
throw new ArgumentException($"Connections strings for the L3 cache must be provided via the {ConnectionStringsEnvironmentVariable} environment variable " +
$"in the format of comma-separated strings.");
Console.WriteLine($"Config file does not exist: {configPath}");
return;
}
var connectionStrings = connectionStringsString.Split(',');
BlobQuotaKeeperConfig config;
try
{
using var stream = File.OpenRead(configPath);
config = await JsonUtilities.JsonDeserializeAsync<BlobQuotaKeeperConfig>(stream);
}
catch (Exception e)
{
Console.WriteLine($"Failed to read configuration file {configPath}: {e}");
return;
}
RunCoreAsync(
maxSizeMB * 1024 * 1024,
connectionStrings,
cacheUniverse,
cacheNamespace,
databasePath,
logSeverity,
fingerprintDegreeOfParallelism,
var secretsProvider = new EnvironmentVariableCacheSecretsProvider(ConnectionStringsEnvironmentVariable);
var accounts = secretsProvider.ConfiguredAccounts;
using var logger = new Logger(new ConsoleLog(logSeverity, printSeverity: true));
if (enableFileLogging)
{
logger.AddLog(new FileLog(Path.Join(Environment.CurrentDirectory, "BlobLifetimeManager.log")));
}
using var cts = new ConsoleCancellationSource();
var context = new OperationContext(new Context(logger), cts.Token);
await new Library.BlobLifetimeManager().RunAsync(
context,
config,
secretsProvider,
accounts,
SystemClock.Instance,
runId,
contentDegreeOfParallelism,
dryRun).Wait();
}
private static async Task RunCoreAsync(
long maxSize,
string[] connectionStrings,
string cacheUniverse,
string cacheNamespace,
string? databasePath,
Severity logSeverity,
int fingerprintDegreeOfParallelism,
int contentDegreeOfParallelism,
bool dryRun)
{
using var logger = new Logger(new ConsoleLog(logSeverity), new FileLog(Path.Join(Environment.CurrentDirectory, "BlobLifetimeManager.log")));
var context = new OperationContext(new Context(logger));
var creds = connectionStrings.Select(connString => new AzureStorageCredentials(new PlainTextSecret(connString)));
var nameToCred = creds.ToDictionary(
cred => BlobCacheStorageAccountName.Parse(cred.GetAccountName()),
cred => cred);
var shardingScheme = new ShardingScheme(ShardingAlgorithm.JumpHash, nameToCred.Keys.ToList());
var config = new ShardedBlobCacheTopology.Configuration(
shardingScheme,
new StaticBlobCacheSecretsProvider(nameToCred),
cacheUniverse,
cacheNamespace);
var topology = new ShardedBlobCacheTopology(config);
RocksDbLifetimeDatabase db;
if (string.IsNullOrEmpty(databasePath))
{
// TODO: for the sake of keeping PRs as small as possible, we're not implementing checkpointing yet.
// Checkpointing should be created in the future to avoid the cost of enumerating the L3 every time this runs.
var dbCreator = new LifetimeDatabaseCreator(SystemClock.Instance, topology!);
db = await dbCreator.CreateAsync(context, contentDegreeOfParallelism, fingerprintDegreeOfParallelism);
}
else
{
if (dryRun)
{
logger.Debug("Dry run is being used with a pre-existing DB. Cloning DB to prevent state changes in future runs.");
var clonePath = databasePath + "_clone";
if(Directory.Exists(clonePath))
{
Directory.Delete(clonePath, true);
}
Directory.CreateDirectory(clonePath);
copyDirectory(databasePath, clonePath);
databasePath = clonePath;
static void copyDirectory(string sourceDir, string destinationDir)
{
var dir = new DirectoryInfo(sourceDir);
DirectoryInfo[] dirs = dir.GetDirectories();
Directory.CreateDirectory(destinationDir);
foreach (FileInfo file in dir.GetFiles())
{
string targetFilePath = Path.Combine(destinationDir, file.Name);
file.CopyTo(targetFilePath);
}
foreach (DirectoryInfo subDir in dirs)
{
string newDestinationDir = Path.Combine(destinationDir, subDir.Name);
copyDirectory(subDir.FullName, newDestinationDir);
}
}
}
db = RocksDbLifetimeDatabase.Create(
new RocksDbLifetimeDatabase.Configuration
{
DatabasePath = databasePath,
LruEnumerationPercentileStep = 0.05,
LruEnumerationBatchSize = 1000,
},
SystemClock.Instance).ThrowIfFailure();
}
Console.CancelKeyPress += delegate {
db.Dispose();
logger.Dispose();
};
using (db)
{
var manager = new Library.BlobLifetimeManager(db, topology!, SystemClock.Instance);
_ = await manager.GarbageCollectAsync(context, maxSize, dryRun, contentDegreeOfParallelism, fingerprintDegreeOfParallelism);
}
fingerprintDegreeOfParallelism,
dryRun);
}
}
}

View file

@ -10,8 +10,7 @@ export {BuildXLSdk};
namespace Default {
@@public
export const deployment: Deployment.Definition = !BuildXLSdk.Flags.isMicrosoftInternal ? undefined :
{
export const deployment: Deployment.Definition = {
contents: [
{
subfolder: r`App`,

View file

@ -0,0 +1,329 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System.Collections.Generic;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed.Blob;
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
using BuildXL.Cache.ContentStore.Interfaces.Time;
using System.Linq;
using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using Azure.Storage.Blobs.ChangeFeed;
using BuildXL.Utilities.Core.Tasks;
using System;
using Azure;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using System.Threading;
using Microsoft.WindowsAzure.Storage;
using OperationContext = BuildXL.Cache.ContentStore.Tracing.Internal.OperationContext;
namespace BuildXL.Cache.BlobLifetimeManager.Library
{
/// <summary>
/// <see cref="BlobChangeFeedEvent"/> can't be extended or instantiated, so we have to create interfaces around it for testing.
/// </summary>
internal interface IBlobChangeFeedEvent
{
DateTimeOffset EventTime { get; }
BlobChangeFeedEventType EventType { get; }
string Subject { get; }
long ContentLength { get; }
}
/// <summary>
/// For the sake of testing, and because the Azure emulator does not support the change feed, this interface encapsulates the operations
/// performed with <see cref="BlobChangeFeedClient"/>
/// </summary>
internal interface IChangeFeedClient
{
IAsyncEnumerable<Page<IBlobChangeFeedEvent>> GetChangesAsync(string? continuationToken);
IAsyncEnumerable<Page<IBlobChangeFeedEvent>> GetChangesAsync(DateTime? startTimeUtc);
}
/// <summary>
/// Reads events from the Azure Storage change feed for each account in the cache and dispatches them to the database updater. This ensures that
/// our view of the remote is accurate.
/// </summary>
public class AzureStorageChangeFeedEventDispatcher
{
private static readonly Tracer Tracer = new(nameof(AzureStorageChangeFeedEventDispatcher));
private readonly IBlobCacheSecretsProvider _secretsProvider;
private readonly IReadOnlyList<BlobCacheStorageAccountName> _accounts;
private readonly LifetimeDatabaseUpdater _updater;
private readonly RocksDbLifetimeDatabase _db;
private readonly IClock _clock;
private readonly string _metadataMatrix;
private readonly string _contentMatrix;
public AzureStorageChangeFeedEventDispatcher(
IBlobCacheSecretsProvider secretsProvider,
IReadOnlyList<BlobCacheStorageAccountName> accounts,
LifetimeDatabaseUpdater updater,
RocksDbLifetimeDatabase db,
IClock clock,
string metadataMatrix,
string contentMatrix)
{
_secretsProvider = secretsProvider;
_accounts = accounts;
_updater = updater;
_db = db;
_clock = clock;
_metadataMatrix = metadataMatrix;
_contentMatrix = contentMatrix;
}
public Task<BoolResult> ConsumeNewChangesAsync(OperationContext context)
{
return context.PerformOperationAsync(
Tracer,
async () =>
{
var now = _clock.UtcNow;
using var cts = new CancellationTokenSource();
// It should be OK to do this unbounded, since we never expect a number of accounts big enough to overwhelm the system with tasks.
var tasks = _accounts.Select(accountName =>
{
return Task.Run(async () =>
{
var creds = await _secretsProvider.RetrieveBlobCredentialsAsync(context, accountName);
return await ConsumeAccountChanges(context, now, cts, accountName, creds);
});
});
await TaskUtilities.SafeWhenAll(tasks);
var aggregatedResult = BoolResult.Success;
foreach (var task in tasks)
{
var result = await task;
if (!result.Succeeded)
{
aggregatedResult &= result;
}
}
return aggregatedResult;
});
}
private async Task<BoolResult> ConsumeAccountChanges(
OperationContext context,
DateTime now,
CancellationTokenSource cts,
BlobCacheStorageAccountName accountName,
AzureStorageCredentials creds)
{
OperationContext nestedContext = context.CreateNested("StorageAccountChangeFeed").WithCancellationToken(cts.Token);
var changeFeedClient = CreateChangeFeedClient(creds);
IAsyncEnumerable<Page<IBlobChangeFeedEvent>> pagesEnumerable;
var continuationToken = _db.GetCursor(accountName.AccountName);
if (continuationToken is null)
{
var creationDate = _db.GetCreationTime();
Tracer.Debug(nestedContext, $"Starting enumeration of change feed for account=[{accountName.AccountName}] " +
$"with startTimeUtc=[{creationDate.ToString() ?? "null"}]");
pagesEnumerable = changeFeedClient.GetChangesAsync(creationDate);
}
else
{
Tracer.Debug(nestedContext, $"Starting enumeration of change feed for account=[{accountName.AccountName}] with cursor=[{continuationToken ?? "null"}]");
pagesEnumerable = changeFeedClient.GetChangesAsync(continuationToken);
}
var enumerator = pagesEnumerable.GetAsyncEnumerator();
while (!nestedContext.Token.IsCancellationRequested)
{
var hasMore = await nestedContext.PerformNonResultOperationAsync(
Tracer,
() => enumerator.MoveNextAsync().AsTask(),
caller: "GetChangeFeedPage",
traceOperationStarted: false,
extraEndMessage: hasMore => $"ContinuationToken=[{continuationToken}], HasMore=[{hasMore}], NextContinuationToken=[{(hasMore ? enumerator.Current.ContinuationToken : null)}]");
if (!hasMore)
{
break;
}
var page = enumerator.Current;
var maxDateProcessed = await ProcessPageAsync(nestedContext, page, accountName, continuationToken);
if (nestedContext.Token.IsCancellationRequested)
{
break;
}
if (!maxDateProcessed.Succeeded)
{
// We've failed to process a page. This is unrecoverable. Cancel further page processing.
cts.Cancel();
return maxDateProcessed;
}
continuationToken = page.ContinuationToken;
if (continuationToken is not null)
{
_db.SetCursor(accountName.AccountName, continuationToken);
}
if (maxDateProcessed.Value > now)
{
break;
}
}
return nestedContext.Token.IsCancellationRequested
? new BoolResult("Cancellation was requested")
: BoolResult.Success;
}
internal virtual IChangeFeedClient CreateChangeFeedClient(AzureStorageCredentials creds)
{
return new AzureChangeFeedClientWrapper(creds.CreateBlobChangeFeedClient());
}
private Task<Result<DateTime?>> ProcessPageAsync(
OperationContext context,
Page<IBlobChangeFeedEvent> page,
BlobCacheStorageAccountName accountName,
string? pageId)
{
return context.PerformOperationAsync(
Tracer,
async () =>
{
DateTime? maxDate = null;
foreach (var change in page.Values)
{
if (context.Token.IsCancellationRequested)
{
return new Result<DateTime?>(maxDate, isNullAllowed: true);
}
if (change is null)
{
// Not sure why this would be null, but the SDK makes it an option.
Tracer.Debug(context, $"Found null change in page=[{pageId ?? "null"}]");
continue;
}
maxDate = maxDate < change.EventTime.UtcDateTime ? change.EventTime.UtcDateTime : maxDate;
// For now we'll ignore everything except blob creations. We'll assume that this GC service is the only thing deleting blobs.
if (change.EventType != BlobChangeFeedEventType.BlobCreated)
{
continue;
}
AbsoluteBlobPath blobPath;
try
{
blobPath = AbsoluteBlobPath.ParseFromChangeEventSubject(accountName, change.Subject);
}
catch (Exception e)
{
Tracer.Debug(context, e, $"Failed to parse blob path from subject {change.Subject}.");
continue;
}
var namespaceId = new BlobNamespaceId(blobPath.Container.Universe, blobPath.Container.Namespace);
var blobLength = change.ContentLength;
switch (blobPath.Container.Purpose)
{
case BlobCacheContainerPurpose.Content:
{
// If resharding happened, we don't want to process events for the other shard configuration.
if (blobPath.Container.Matrix.Equals(_contentMatrix, StringComparison.OrdinalIgnoreCase))
{
_updater.ContentCreated(context, namespaceId, blobPath.Path.Path, blobLength);
}
break;
}
case BlobCacheContainerPurpose.Metadata:
{
// If resharding happened, we don't want to process events for the other shard configuration.
if (blobPath.Container.Matrix.Equals(_metadataMatrix, StringComparison.OrdinalIgnoreCase))
{
var result = await _updater.ContentHashListCreatedAsync(context, namespaceId, blobPath.Path.Path, blobLength);
}
break;
}
default:
throw new NotSupportedException($"{blobPath.Container.Purpose} is not a supported purpose");
}
}
return new Result<DateTime?>(maxDate, isNullAllowed: true);
},
traceOperationStarted: false);
}
/// <summary>
/// Wrapper around <see cref="BlobChangeFeedClient"/> to be able to use our own defined interfaces.
/// </summary>
private class AzureChangeFeedClientWrapper : IChangeFeedClient
{
private readonly BlobChangeFeedClient _client;
public AzureChangeFeedClientWrapper(BlobChangeFeedClient client) => _client = client;
public async IAsyncEnumerable<Page<IBlobChangeFeedEvent>> GetChangesAsync(string? continuationToken)
{
var enumerator = _client.GetChangesAsync(continuationToken).AsPages().GetAsyncEnumerator();
while (await enumerator.MoveNextAsync())
{
var page = enumerator.Current;
var changes = page.Values.Select(c => new BlobChangeFeedEventWrapper(c)).ToArray();
var newPage = Page<IBlobChangeFeedEvent>.FromValues(changes, page.ContinuationToken, page.GetRawResponse());
yield return newPage;
}
}
public async IAsyncEnumerable<Page<IBlobChangeFeedEvent>> GetChangesAsync(DateTime? startTimeUtc)
{
var enumerator = _client.GetChangesAsync(start: startTimeUtc).AsPages().GetAsyncEnumerator();
while (await enumerator.MoveNextAsync())
{
var page = enumerator.Current;
var changes = page.Values.Select(c => new BlobChangeFeedEventWrapper(c)).ToArray();
var newPage = Page<IBlobChangeFeedEvent>.FromValues(changes, page.ContinuationToken, page.GetRawResponse());
yield return newPage;
}
}
}
/// <summary>
/// Wrapper around <see cref="BlobChangeFeedEvent"/> to be able to use our own defined interfaces.
/// </summary>
internal class BlobChangeFeedEventWrapper : IBlobChangeFeedEvent
{
private readonly BlobChangeFeedEvent _inner;
public BlobChangeFeedEventWrapper(BlobChangeFeedEvent inner) => _inner = inner;
public DateTimeOffset EventTime => _inner.EventTime;
public BlobChangeFeedEventType EventType => _inner.EventType;
public string Subject => _inner.Subject;
public long ContentLength => _inner.EventData.ContentLength;
}
}
}

View file

@ -8,11 +8,11 @@ using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using BuildXL.Cache.ContentStore.Distributed.Blob;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Time;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.MemoizationStore.Stores;
@ -22,36 +22,39 @@ using BuildXL.Utilities.ParallelAlgorithms;
namespace BuildXL.Cache.BlobLifetimeManager.Library
{
/// <summary>
/// This class takes in a <see cref="RocksDbLifetimeDatabase"/> and starts performing garbage collection based on the LRU enumeration
/// of fingerprints provided by the DB. When performing GC, this class will make sure that both the database and the remote cache
/// This class takes in a <see cref="RocksDbLifetimeDatabase"/> and starts freeing up space based on the LRU enumeration
/// of fingerprints provided by the DB. When performing deletions, this class will make sure that both the database and the remote cache
/// reflect the changes necessary
/// </summary>
public class BlobLifetimeManager
public class BlobQuotaKeeper
{
private static readonly Tracer Tracer = new(nameof(BlobLifetimeManager));
private static readonly Tracer Tracer = new(nameof(BlobQuotaKeeper));
private readonly RocksDbLifetimeDatabase _database;
private readonly RocksDbLifetimeDatabase.IAccessor _database;
private readonly IBlobCacheTopology _topology;
private readonly IClock _clock;
private readonly TimeSpan _lastAccessTimeDeletionThreshold;
public BlobLifetimeManager(
RocksDbLifetimeDatabase database,
public BlobQuotaKeeper(
RocksDbLifetimeDatabase.IAccessor database,
IBlobCacheTopology topology,
TimeSpan lastAccessTimeDeletionThreshold,
IClock clock)
{
_database = database;
_topology = topology;
_lastAccessTimeDeletionThreshold = lastAccessTimeDeletionThreshold;
_clock = clock;
}
/// <summary>
/// Performs GC based on LRU ordering for fingerprints as per the database. Once a fingerprint is deleted from the remote, we will
/// Performs fingerprint deletions based on LRU ordering for fingerprints as per the database. Once a fingerprint is deleted from the remote, we will
/// decrease the reference count for all of its contents, and attempt to delete those contents which reach a reference count of zero.
///
/// For content that already has a reference count of zero, we only perform deletions when a certain amount of time has passed since
/// it was last accessed.
/// </summary>
public Task<BoolResult> GarbageCollectAsync(
public Task<Result<long>> EnsureUnderQuota(
OperationContext context,
long maxSize,
bool dryRun,
@ -61,23 +64,23 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
Contract.Requires(contentDegreeOfParallelism > 0);
Contract.Requires(fingerprintDegreeOfParallelism > 0);
return context.PerformOperationAsync(
return context.PerformOperationAsync<Result<long>>(
Tracer,
async () =>
{
var enumerationResult = _database.GetLruOrderedContentHashLists(context).ThrowIfFailure();
var enumerationResult = _database.GetLruOrderedContentHashLists(context);
var currentSize = enumerationResult.TotalSize;
Tracer.Info(context, $"Total L3 size is calculated to be {currentSize / 1024.0 / 1024:0.0}MB.");
Tracer.Info(context, $"Total L3 size is calculated to be {currentSize}.");
if (currentSize < maxSize)
{
Tracer.Info(context, $"Total L3 size is smaller than the max configured size. Terminating garbage collection. CurrentSize=[{currentSize / 1024.0 / 1024:0.0}MB], MaxSize=[{maxSize / 1024.0 / 1024:0.0}MB]");
return BoolResult.Success;
return currentSize;
}
var cts = new CancellationTokenSource();
Tracer.Info(context, "Starting enumeration of zero-reference content for garbage collection.");
using var underQuotaCts = new CancellationTokenSource();
await ParallelAlgorithms.EnumerateAsync<(ContentHash hash, long length)>(
enumerationResult.ZeroReferenceBlobs,
contentDegreeOfParallelism,
@ -91,15 +94,14 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
if (currentSize <= maxSize)
{
cts.Cancel();
underQuotaCts.Cancel();
}
},
cts.Token);
underQuotaCts.Token);
if (cts.IsCancellationRequested)
if (underQuotaCts.IsCancellationRequested)
{
Tracer.Info(context, $"Total L3 size is now under the max configured size. CurrentSize=[{currentSize / 1024.0 / 1024:0.0}MB], MaxSize=[{maxSize / 1024.0 / 1024:0.0}MB]");
return BoolResult.Success;
return currentSize;
}
var tryDeleteContentHashActionBlock = ActionBlockSlim.CreateWithAsyncAction<(ContentHash hash, TaskCompletionSource<object?> tcs, OperationContext context)>(
@ -143,13 +145,13 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
},
context.Token);
cts = new CancellationTokenSource();
Tracer.Info(context, "Starting LRU enumeration for fingerprint garbage collection.");
await ParallelAlgorithms.EnumerateAsync(
enumerationResult.LruOrderedContentHashLists,
fingerprintDegreeOfParallelism,
async chl =>
{
var opContext = context.CreateNested(nameof(BlobLifetimeManager), caller: "TryDeleteContentHashList");
var opContext = context.CreateNested(nameof(BlobQuotaKeeper), caller: "TryDeleteContentHashList");
try
{
@ -167,7 +169,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
Interlocked.Add(ref currentSize, -chl.BlobSize);
_database.DeleteContentHashList(chl.BlobName, chl.Hashes).ThrowIfFailure();
_database.DeleteContentHashList(chl.BlobName, chl.Hashes);
// At this point the CHL is deleted and ref count for all content is decremented. Check which content is safe to delete.
var tasks = new List<Task>();
@ -181,11 +183,11 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
await TaskUtilities.SafeWhenAll(tasks);
Tracer.Debug(opContext, $"Current size: {currentSize / 1024.0 / 1024 / 1024:0.0}GB, max size: {maxSize / 1024.0 / 1024 / 1024:0.0}GB");
Tracer.Debug(opContext, $"Current size: {currentSize}, max size: {maxSize}");
if (currentSize <= maxSize)
{
cts.Cancel();
underQuotaCts.Cancel();
}
}
catch (Exception ex)
@ -193,11 +195,11 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
Tracer.Error(opContext, ex, $"Error when processing fingerprint blob {chl.BlobName}.");
}
},
cts.Token);
underQuotaCts.Token);
Tracer.Info(context, $"Total L3 size is now under the max configured size. CurrentSize=[{currentSize / 1024.0 / 1024:0.0}MB], MaxSize=[{maxSize / 1024.0 / 1024:0.0}MB]");
return BoolResult.Success;
});
return currentSize;
},
extraEndMessage: result => $"MaxSize=[{maxSize}], CurrentSize=[{(result.Succeeded ? result.Value : null)}]");
}
private async Task<bool> TryDeleteContentAsync(OperationContext context, ContentHash contentHash, bool dryRun, long contentSize)
@ -208,10 +210,10 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
// Because of this, the current design is that clients will update the last access time of a blob when they get a content cache hit while uploading the contents of a new strong fingerprint.
// On the GC side of things, what this means is that we have to check that the content has not been accessed recently.
var lastAccessTime = await GetLastAccessTimeAsync(context, client);
if (lastAccessTime > _clock.UtcNow.AddDays(-1))
if (lastAccessTime > _clock.UtcNow.Add(-_lastAccessTimeDeletionThreshold))
{
Tracer.Debug(context,
$"Skipping deletion of {contentHash.ToShortString()} because it has been accessed too recently to be deleted.");
$"Skipping deletion of {contentHash.ToShortString()} because it has been accessed too recently to be deleted. LastAccessTime=[{lastAccessTime}]");
return false;
}
@ -220,18 +222,18 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
{
Tracer.Debug(
context,
$"DRY RUN: DELETE ContentHash=[{contentHash.ToShortString()}], BlobSize=[{contentSize}]");
$"DRY RUN: DELETE ContentHash=[{contentHash.ToShortString()}], BlobSize=[{contentSize}], Shard=[{client.AccountName}]");
}
else
{
var result = await DeleteBlobFromStorageAsync(context, client, contentSize);
var result = await DeleteBlobFromStorageAsync(context, client, contentSize, lastAccessTime);
if (!result.Succeeded)
{
return false;
}
}
_database.DeleteContent(contentHash).ThrowIfFailure();
_database.DeleteContent(contentHash);
return true;
}
@ -263,7 +265,15 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
var updatedHashList = contentHashList with { LastAccessTime = currentLastAccessTime.Value };
_database.UpdateContentHashListLastAccessTime(updatedHashList).ThrowIfFailure();
_database.UpdateContentHashListLastAccessTime(updatedHashList);
return false;
}
if (currentLastAccessTime > _clock.UtcNow.Add(-_lastAccessTimeDeletionThreshold))
{
Tracer.Debug(context,
$"Skipping deletion of {contentHashList.BlobName} because it has been accessed too recently to be deleted. LastAccessTime=[{currentLastAccessTime}]");
return false;
}
@ -276,36 +286,45 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
}
else
{
var result = await DeleteBlobFromStorageAsync(context, client, contentHashList.BlobSize);
var result = await DeleteBlobFromStorageAsync(context, client, contentHashList.BlobSize, currentLastAccessTime);
return result.Succeeded;
}
return true;
}
private static Task<BoolResult> DeleteBlobFromStorageAsync(OperationContext context, BlobClient client, long size)
internal static Task<Result<bool>> DeleteBlobFromStorageAsync(OperationContext context, BlobClient client, long size, DateTime? lastAccessTime)
{
return context.PerformOperationAsync(
return context.PerformOperationAsync<Result<bool>>(
Tracer,
async () =>
{
var response = await client.DeleteAsync();
return response.IsError
? new BoolResult($"Failed to delete blob {client.Name}. Error ({response.Status}): {response.ReasonPhrase}")
: BoolResult.Success;
try
{
var response = await client.DeleteAsync();
return response.IsError
? new Result<bool>($"Failed to delete blob {client.Name}. Error ({response.Status}): {response.ReasonPhrase}")
: true;
}
catch (RequestFailedException e) when (e.ErrorCode == BlobErrorCode.BlobNotFound)
{
// Consider this a success, but trace for diagnostic purposes.
return false;
}
},
extraEndMessage: result => $"BlobName=[{client.Name}], Size={size}");
traceOperationStarted: false,
extraEndMessage: result =>
$"BlobName=[{client.Name}], Size=[{size}], Shard=[{client.AccountName}], LastAccessTime=[{lastAccessTime}], BlobExisted=[{(result.Succeeded ? result.Value : null)}]");
}
private static async Task<DateTime?> GetLastAccessTimeAsync(OperationContext context, BlobClient client)
{
const int BlobNotFound = 404;
try
{
var response = await client.GetPropertiesAsync(cancellationToken: context.Token);
return response.Value.LastAccessed.UtcDateTime;
}
catch (RequestFailedException e) when (e.Status == BlobNotFound)
catch (RequestFailedException e) when (e.ErrorCode == BlobErrorCode.BlobNotFound)
{
return null;
}

View file

@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
namespace BuildXL.Cache.BlobLifetimeManager.Library
{
public class BlobQuotaKeeperConfig
{
public required TimeSpan LastAccessTimeDeletionThreshold { get; init; }
public required List<GarbageCollectionNamespaceConfig> Namespaces { get; init; }
public double LruEnumerationPercentileStep { get; init; } = 0.05;
public int LruEnumerationBatchSize { get; init; } = 1000;
}
public record GarbageCollectionNamespaceConfig(string Universe, string Namespace, double MaxSizeGb);
}

View file

@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System.Diagnostics.CodeAnalysis;
namespace BuildXL.Cache.BlobLifetimeManager.Library
{
internal static class BlobUtilities
{
/// <summary>
/// Extracts the name of a blob, without the ".blob" extension.
/// </summary>
public static bool TryExtractBlobName(string fullName, [NotNullWhen(true)] out string? blobName)
{
if (!fullName.EndsWith(".blob"))
{
blobName = null;
return false;
}
blobName = fullName[..^".blob".Length];
return true;
}
}
}

View file

@ -3,7 +3,7 @@
namespace Library {
@@public
export const dll = !BuildXLSdk.Flags.isMicrosoftInternal ? undefined : BuildXLSdk.library({
export const dll = BuildXLSdk.library({
assemblyName: "BuildXL.Cache.BlobLifetimeManager.Library",
sources: globR(d`.`,"*.cs"),
references: [
@ -19,6 +19,7 @@ namespace Library {
importFrom("BuildXL.Utilities").dll,
importFrom("BuildXL.Utilities").Utilities.Core.dll,
importFrom("BuildXL.Utilities").KeyValueStore.dll,
importFrom("BuildXL.Utilities").Native.dll,
...importFrom("BuildXL.Cache.ContentStore").getAzureBlobStorageSdkPackages(true),
...importFrom("Sdk.Selfhost.RocksDbSharp").pkgs,

View file

@ -8,6 +8,7 @@ using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using BuildXL.Cache.ContentStore.Distributed.Blob;
@ -21,7 +22,6 @@ using BuildXL.Cache.MemoizationStore.Interfaces.Sessions;
using BuildXL.Utilities;
using BuildXL.Utilities.Core.Tasks;
using BuildXL.Utilities.ParallelAlgorithms;
using static BuildXL.Cache.BlobLifetimeManager.Library.RocksDbLifetimeDatabase;
namespace BuildXL.Cache.BlobLifetimeManager.Library
{
@ -29,42 +29,63 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
/// This class is responsible for creating a database "from zero". This means that it will enumerate all of the contents of
/// the blob cache and construct a database that tracks content blobs' reference counts, as well as blob lengths.
/// </summary>
public class LifetimeDatabaseCreator
public static class LifetimeDatabaseCreator
{
private static Tracer Tracer { get; } = new(nameof(LifetimeDatabaseCreator));
private record ProcessFingerprintRequest(OperationContext Context, BlobContainerClient Container, BlobItem Blob, RocksDbLifetimeDatabase Database);
internal record ProcessFingerprintRequest(
OperationContext Context,
BlobContainerClient Container,
string BlobName,
long BlobLength,
RocksDbLifetimeDatabase.IAccessor Database,
IBlobCacheTopology Topology);
private readonly IClock _clock;
private readonly IBlobCacheTopology _topology;
private readonly SerializationPool _serializationPool = new();
private static readonly SerializationPool SerializationPool = new();
public LifetimeDatabaseCreator(IClock clock, IBlobCacheTopology topology)
{
_clock = clock;
_topology = topology;
}
public async Task<RocksDbLifetimeDatabase> CreateAsync(
public static Task<Result<RocksDbLifetimeDatabase>> CreateAsync(
OperationContext context,
RocksDbLifetimeDatabase.Configuration configuration,
IClock clock,
int contentDegreeOfParallelism,
int fingerprintDegreeOfParallelism)
int fingerprintDegreeOfParallelism,
Func<BlobNamespaceId, IBlobCacheTopology> topologyFactory)
{
Contract.Requires(contentDegreeOfParallelism > 0);
Contract.Requires(fingerprintDegreeOfParallelism > 0);
return context.PerformOperationAsync(
Tracer,
async () =>
{
Contract.Requires(contentDegreeOfParallelism > 0);
Contract.Requires(fingerprintDegreeOfParallelism > 0);
RocksDbLifetimeDatabase database = CreateNewDatabase();
RocksDbLifetimeDatabase database = CreateNewDatabase(configuration, clock);
_ = await ProcessIndividualContentBlobsAsync(context, contentDegreeOfParallelism, database);
await ProcessRemoteFingerprintsAsync(context, fingerprintDegreeOfParallelism, database);
// TODO: consider doing this in parallel, although it could be argued that if each of these calls
// is making full use of resources, it can be better to just do this sequentially.
foreach (var namespaceId in configuration.BlobNamespaceIds)
{
var accessor = database.GetAccessor(namespaceId);
var topology = topologyFactory(namespaceId);
return database;
_ = await ProcessIndividualContentBlobsAsync(context, contentDegreeOfParallelism, accessor, topology);
var fingerprintsResult = await ProcessRemoteFingerprintsAsync(context, fingerprintDegreeOfParallelism, accessor, topology, clock);
if (!fingerprintsResult.Succeeded)
{
// If we fail to process remote fingerprints, we fall into an unrecoverable state. We must fail.
return new Result<RocksDbLifetimeDatabase>(fingerprintsResult);
}
}
return database;
});
}
private Task<BoolResult> ProcessIndividualContentBlobsAsync(
private static Task<BoolResult> ProcessIndividualContentBlobsAsync(
OperationContext context,
int degreeOfParallelism,
RocksDbLifetimeDatabase database)
RocksDbLifetimeDatabase.IAccessor database,
IBlobCacheTopology topology)
{
long count = 0;
@ -72,38 +93,14 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
Tracer,
async () =>
{
var clientEnumerationTasks = new List<Task>();
var actionBlock = ActionBlockSlim.Create<(BlobItem blob, BlobContainerClient container)>(
var processContentActionBlock = ActionBlockSlim.Create<(BlobItem blob, BlobContainerClient container)>(
degreeOfParallelism: degreeOfParallelism,
async blobAndContainer =>
{
var blob = blobAndContainer.blob;
var container = blobAndContainer.container;
try
{
if (!blob.Name.EndsWith(".blob"))
{
return;
}
var lastAccessTime = GetLastAccessTime(blob);
var hashString = blob.Name[..^".blob".Length];
if (!ContentHash.TryParse(hashString, out var hash))
{
Tracer.Warning(context, $"Failed to parse content hash from blob name {blob.Name}");
return;
}
var length = await GetBlobLengthAsync(blob, container);
database.AddContent(hash, length).ThrowIfFailure();
}
catch (Exception ex)
{
Tracer.Error(context, ex, $"Error when processing content. Account=[{container.AccountName}], Container=[{container.Name}], Blob={blob.Name}");
}
await ProcessContentBlobAsync(context, blob, container, database);
if (Interlocked.Increment(ref count) % 1000 == 0)
{
@ -111,7 +108,8 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
}
});
await foreach (var container in _topology.EnumerateClientsAsync(context, BlobCacheContainerPurpose.Content))
var clientEnumerationTasks = new List<Task>();
await foreach (var container in topology.EnumerateClientsAsync(context, BlobCacheContainerPurpose.Content))
{
var enumerationTask = Task.Run(async () =>
{
@ -119,7 +117,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
await foreach (var blob in container.GetBlobsAsync(cancellationToken: context.Token))
{
actionBlock.Post((blob, container));
processContentActionBlock.Post((blob, container));
}
});
@ -127,18 +125,48 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
}
await TaskUtilities.SafeWhenAll(clientEnumerationTasks);
actionBlock.Complete();
await actionBlock.Completion;
processContentActionBlock.Complete();
await processContentActionBlock.Completion;
return BoolResult.Success;
},
extraEndMessage: _ => $"TotalProcessed={count}");
}
private Task ProcessRemoteFingerprintsAsync(
private static async Task ProcessContentBlobAsync(
OperationContext context,
BlobItem blob,
BlobContainerClient container,
RocksDbLifetimeDatabase.IAccessor database)
{
try
{
if (!BlobUtilities.TryExtractBlobName(blob.Name, out var hashString))
{
return;
}
if (!ContentHash.TryParse(hashString, out var hash))
{
Tracer.Warning(context, $"Failed to parse content hash from blob name {blob.Name}");
return;
}
var length = await GetBlobLengthAsync(blob, container);
database.AddContent(hash, length);
}
catch (Exception ex)
{
Tracer.Error(context, ex, $"Error when processing content. Account=[{container.AccountName}], Container=[{container.Name}], Blob={blob.Name}");
}
}
private static Task<BoolResult> ProcessRemoteFingerprintsAsync(
OperationContext context,
int degreeOfParallelism,
RocksDbLifetimeDatabase database)
RocksDbLifetimeDatabase.IAccessor database,
IBlobCacheTopology topology,
IClock clock)
{
long count = 0;
@ -146,21 +174,25 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
Tracer,
async () =>
{
var clientEnumerationTasks = new List<Task>();
var actionBlock = ActionBlockSlim.Create<ProcessFingerprintRequest>(
BoolResult errorResult = BoolResult.Success;
using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.Token);
var processFingerprintActionBlock = ActionBlockSlim.Create<ProcessFingerprintRequest>(
degreeOfParallelism: degreeOfParallelism,
async request =>
{
await DownloadAndProcessContentHashListAsync(request.Context, request.Container, request.Blob, request.Database);
if (Interlocked.Increment(ref count) % 1000 == 0)
var processResult = await DownloadAndProcessContentHashListAsync(request.Context, request.Container, request.BlobName, request.BlobLength, request.Database, request.Topology, clock);
if (!processResult.Succeeded)
{
Tracer.Info(context, $"Processed {count} fingerprints so far.");
errorResult &= new BoolResult(processResult);
cts.Cancel();
}
});
await foreach (var container in _topology.EnumerateClientsAsync(context, BlobCacheContainerPurpose.Metadata))
Interlocked.Increment(ref count);
},
cancellationToken: cts.Token);
var clientEnumerationTasks = new List<Task>();
await foreach (var container in topology.EnumerateClientsAsync(context, BlobCacheContainerPurpose.Metadata))
{
var enumerationTask = Task.Run(async () =>
{
@ -168,7 +200,12 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
await foreach (var blob in container.GetBlobsAsync(cancellationToken: context.Token))
{
actionBlock.Post(new ProcessFingerprintRequest(context, container, blob, database));
if (cts.IsCancellationRequested)
{
break;
}
processFingerprintActionBlock.Post(new ProcessFingerprintRequest(context, container, blob.Name, blob.Properties.ContentLength!.Value, database, topology));
}
});
@ -176,8 +213,21 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
}
await TaskUtilities.SafeWhenAll(clientEnumerationTasks);
actionBlock.Complete();
await actionBlock.Completion;
processFingerprintActionBlock.Complete();
await processFingerprintActionBlock.Completion;
if (cts.Token.IsCancellationRequested)
{
if (context.Token.IsCancellationRequested)
{
return new BoolResult("Operation was cancelled");
}
else
{
// Cancellation was requested because we failed to process one or more content hash lists.
return errorResult;
}
}
return BoolResult.Success;
},
@ -197,91 +247,146 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
return properties.Value.ContentLength;
}
private RocksDbLifetimeDatabase CreateNewDatabase()
private static RocksDbLifetimeDatabase CreateNewDatabase(
RocksDbLifetimeDatabase.Configuration configuration,
IClock clock)
{
var temp = Path.Combine(Path.GetTempPath(), "LifetimeDatabase", Guid.NewGuid().ToString());
var db = RocksDbLifetimeDatabase.Create(
new RocksDbLifetimeDatabase.Configuration
{
DatabasePath = temp,
LruEnumerationPercentileStep = 0.05,
LruEnumerationBatchSize = 1000,
},
_clock)
.ThrowIfFailure();
var db = RocksDbLifetimeDatabase.Create(configuration, clock);
db.SetCreationTime(clock.UtcNow);
return db;
}
private async Task DownloadAndProcessContentHashListAsync(
OperationContext context,
BlobContainerClient container,
BlobItem blob,
RocksDbLifetimeDatabase database)
internal enum ProcessContentHashListResult
{
try
{
var blobClient = container.GetBlobClient(blob.Name);
using var stream = new MemoryStream();
var response = await blobClient.DownloadToAsync(stream, context.Token);
if (response.IsError)
{
Tracer.Debug(context, $"Failed to download content hash list: {response.ReasonPhrase}");
return;
}
// Reset the position of the stream so we can read from it.
stream.Position = 0;
var metadataEntry = _serializationPool.Deserialize(stream, ContentStore.Distributed.NuCache.MetadataEntry.Deserialize);
var contentHashList = metadataEntry.ContentHashListWithDeterminism;
var lastAccessTime = GetLastAccessTime(blob);
await ProcessContentHashListAsync(context, blob, contentHashList, lastAccessTime, database);
}
catch (Exception ex)
{
Tracer.Error(context, ex, $"Error when processing fingerprint. Account=[{container.AccountName}], Container=[{container.Name}], Blob={blob.Name}");
}
Success,
ContentHashListDoesNotExist,
InvalidAndDeleted,
}
private async Task ProcessContentHashListAsync(
internal static Task<Result<ProcessContentHashListResult>> DownloadAndProcessContentHashListAsync(
OperationContext context,
BlobItem blob,
BlobContainerClient container,
string blobName,
long blobLength,
RocksDbLifetimeDatabase.IAccessor database,
IBlobCacheTopology topology,
IClock clock)
{
return context.PerformOperationAsync(
Tracer,
async () =>
{
var blobClient = container.GetBlobClient(blobName);
// TODO: consider using pooled memory streams.
using var stream = new MemoryStream();
try
{
var response = await blobClient.DownloadToAsync(stream, context.Token);
if (response.IsError)
{
return new Result<ProcessContentHashListResult>($"Download of the content hash list failed with status code: {response.Status}");
}
}
catch (RequestFailedException e) when (e.ErrorCode == BlobErrorCode.BlobNotFound)
{
// The CHL no longer exists. We can continue without making any changes.
return ProcessContentHashListResult.ContentHashListDoesNotExist;
}
// Reset the position of the stream so we can read from it.
stream.Position = 0;
var metadataEntry = SerializationPool.Deserialize(stream, MetadataEntry.Deserialize);
var contentHashList = metadataEntry.ContentHashListWithDeterminism;
// We use "now" as last access time because we just downloaded the blob, and thus we just affected the CHL's blob last access time.
// Adding a minute to account for clocks not being in sync. In the grand scheme of things, an error of one minute shouldn't make a difference.
var lastAccessTime = clock.UtcNow.AddMinutes(1);
var processResult = await ProcessContentHashListAsync(context, blobName, blobLength, contentHashList, lastAccessTime, database, topology);
if (!processResult.Succeeded)
{
return new Result<ProcessContentHashListResult>(processResult);
}
if (!processResult.Value)
{
// This signals that one of the referenced blobs does not exist. The CHL is invalid and should be deleted.
var deleteResult = await BlobQuotaKeeper.DeleteBlobFromStorageAsync(context, blobClient, blobLength, lastAccessTime);
if (!deleteResult.Succeeded)
{
return new Result<ProcessContentHashListResult>(deleteResult, "Failed to delete invalid content hash list.");
}
return ProcessContentHashListResult.InvalidAndDeleted;
}
return ProcessContentHashListResult.Success;
},
traceOperationStarted: false,
extraEndMessage: result => $"ReturnCode=[{(result.Succeeded ? result.Value : "Failure")}], Account=[{container.AccountName}], Container=[{container.Name}], Blob=[{blobName}]");
}
private static async Task<Result<bool>> ProcessContentHashListAsync(
OperationContext context,
string blobName,
long blobLength,
ContentHashListWithDeterminism contentHashList,
DateTime lastAccessTime,
RocksDbLifetimeDatabase database)
RocksDbLifetimeDatabase.IAccessor database,
IBlobCacheTopology topology)
{
var hashes = new List<(ContentHash hash, long size)>();
foreach (var contentHash in contentHashList.ContentHashList!.Hashes)
{
try
{
if (contentHash.IsEmptyHash())
{
hashes.Add((contentHash, 0));
continue;
}
var exists = database.GetContentEntry(contentHash) is not null;
var length = 0L;
if (!exists)
{
BlobClient client = await _topology.GetBlobClientAsync(context, contentHash);
var response = await client.GetPropertiesAsync(cancellationToken: context.Token);
length = response.Value.ContentLength;
BlobClient client = await topology.GetBlobClientAsync(context, contentHash);
try
{
var response = await client.GetPropertiesAsync(cancellationToken: context.Token);
length = response.Value.ContentLength;
}
catch (RequestFailedException e) when (e.ErrorCode == BlobErrorCode.BlobNotFound)
{
Tracer.Warning(context, $"Non-registrated content was not found in blob storage. Hash=[{contentHash.ToShortHash()}], CHL=[{blobName}]");
return false;
}
}
hashes.Add((contentHash, length));
}
catch (Exception ex)
{
Tracer.Error(context, ex, $"Error when incrementing reference count for {contentHash.ToShortString()}");
return new Result<bool>(ex, $"Error when incrementing reference count for {contentHash.ToShortString()}");
}
}
database.AddContentHashList(
new ContentHashList(
blob.Name,
blobName,
lastAccessTime,
contentHashList.ContentHashList!.Hashes.ToArray(),
blob.Properties.ContentLength!.Value),
hashes).ThrowIfFailure();
blobLength),
hashes);
return true;
}
private static DateTime GetLastAccessTime(

View file

@ -0,0 +1,120 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed.Blob;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Time;
using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.MemoizationStore.Interfaces.Sessions;
using BuildXL.Cache.MemoizationStore.Stores;
using BuildXL.Utilities.ParallelAlgorithms;
namespace BuildXL.Cache.BlobLifetimeManager.Library
{
public class LifetimeDatabaseUpdater
{
private static readonly Tracer Tracer = new(nameof(LifetimeDatabaseUpdater));
private readonly Dictionary<BlobNamespaceId, IBlobCacheTopology> _topologies;
private readonly Dictionary<BlobNamespaceId, RocksDbLifetimeDatabase.IAccessor> _accessors;
private readonly IClock _clock;
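// Content hash list events require a blob download before processing; this action block throttles them to the configured degree of parallelism.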
private readonly ActionBlockSlim<(LifetimeDatabaseCreator.ProcessFingerprintRequest, TaskCompletionSource<Result<LifetimeDatabaseCreator.ProcessContentHashListResult>>)> _fingerprintCreatedActionBlock;
public LifetimeDatabaseUpdater(
Dictionary<BlobNamespaceId, IBlobCacheTopology> topologies,
Dictionary<BlobNamespaceId, RocksDbLifetimeDatabase.IAccessor> accessors,
IClock clock,
int fingerprintsDegreeOfParallelism)
{
_topologies = topologies;
_accessors = accessors;
_clock = clock;
_fingerprintCreatedActionBlock = ActionBlockSlim.CreateWithAsyncAction<(LifetimeDatabaseCreator.ProcessFingerprintRequest, TaskCompletionSource<Result<LifetimeDatabaseCreator.ProcessContentHashListResult>>)>(
new ActionBlockSlimConfiguration(DegreeOfParallelism: fingerprintsDegreeOfParallelism),
async tpl =>
{
var (request, tcs) = tpl;
try
{
var result = await LifetimeDatabaseCreator.DownloadAndProcessContentHashListAsync(
request.Context, request.Container, request.BlobName, request.BlobLength, request.Database, request.Topology, clock);
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.SetResult(new Result<LifetimeDatabaseCreator.ProcessContentHashListResult>(ex));
}
});
}
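/// <summary>
/// Handles a blob-created change feed event for a content blob by registering the hash and size with the
/// namespace's database accessor. Events for untracked namespaces or unparsable blob names are ignored.
/// </summary>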
public void ContentCreated(OperationContext context, BlobNamespaceId namespaceId, string blobName, long length)
{
if (!_accessors.TryGetValue(namespaceId, out var db))
{
Tracer.Debug(context, $"Ignoring creation of content with name={blobName} because Namespace={namespaceId} isn't being tracked.");
return;
}
if (!BlobUtilities.TryExtractBlobName(blobName, out var hashString))
{
return;
}
if (!ContentHash.TryParse(hashString, out var hash))
{
// If a random file is created, ignore it.
Tracer.Warning(context, $"Failed to parse content hash from BlobName=[{blobName}]. Ignoring blob.");
return;
}
db.AddContent(hash, length);
}
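/// <summary>
/// Handles a blob-created change feed event for a content hash list. If the strong fingerprint is not already
/// in the database, the blob is downloaded and processed on the throttled action block.
/// </summary>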
internal async Task<Result<LifetimeDatabaseCreator.ProcessContentHashListResult>> ContentHashListCreatedAsync(
OperationContext context,
BlobNamespaceId namespaceId,
string blobName,
long blobLength)
{
if (!_accessors.TryGetValue(namespaceId, out var db) ||
!_topologies.TryGetValue(namespaceId, out var topology))
{
Tracer.Debug(context, $"Ignoring creation of content hash list with path {blobName} because Namespace={namespaceId} isn't being tracked.");
return LifetimeDatabaseCreator.ProcessContentHashListResult.Success;
}
StrongFingerprint strongFingerprint;
try
{
strongFingerprint = AzureBlobStorageMetadataStore.ExtractStrongFingerprintFromPath(blobName);
}
catch (Exception e)
{
Tracer.Debug(context, e, $"Failed to parse strong fingerprint from BlobName=[{blobName}]. Ignoring blob.");
return LifetimeDatabaseCreator.ProcessContentHashListResult.ContentHashListDoesNotExist;
}
if (db.GetContentHashList(strongFingerprint, out _) is not null)
{
// Already in DB. Ignoring update.
return LifetimeDatabaseCreator.ProcessContentHashListResult.Success;
}
var containerClient = await topology.GetContainerClientAsync(context, BlobCacheShardingKey.FromWeakFingerprint(strongFingerprint.WeakFingerprint));
var tcs = new TaskCompletionSource<Result<LifetimeDatabaseCreator.ProcessContentHashListResult>>();
var request = new LifetimeDatabaseCreator.ProcessFingerprintRequest(context, containerClient, blobName, blobLength, db, topology);
_fingerprintCreatedActionBlock.Post((request, tcs));
var result = await tcs.Task;
return result;
}
}
}

View file

@ -0,0 +1,318 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed;
using BuildXL.Cache.ContentStore.Distributed.Blob;
using BuildXL.Cache.ContentStore.Distributed.NuCache;
using BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
using BuildXL.Cache.ContentStore.Interfaces.Time;
using BuildXL.Cache.ContentStore.Timers;
using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Utilities.Core;
namespace BuildXL.Cache.BlobLifetimeManager.Library
{
public class BlobLifetimeManager
{
private static readonly Tracer Tracer = new(nameof(BlobLifetimeManager));
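/// <summary>
/// Runs a single garbage collection pass: acquire the GC lease, restore the lifetime database from the latest
/// checkpoint (or build it from a full enumeration on the first run), apply new change feed events, enforce the
/// configured quota for each namespace, and publish a new checkpoint (unless running in dry-run mode).
/// </summary>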
public async Task RunAsync(
OperationContext context,
BlobQuotaKeeperConfig config,
IBlobCacheSecretsProvider secretsProvider,
IReadOnlyList<BlobCacheStorageAccountName> accountNames,
IClock clock,
string runId,
int contentDegreeOfParallelism,
int fingerprintDegreeOfParallelism,
bool dryRun)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.Token);
context = context.WithCancellationToken(cts.Token);
var shardingScheme = new ShardingScheme(ShardingAlgorithm.JumpHash, accountNames);
var (metadataMatrix, contentMatrix) = ShardedBlobCacheTopology.GenerateMatrix(shardingScheme);
var maxSizes = config.Namespaces.ToDictionary(
c => new BlobNamespaceId(c.Universe, c.Namespace),
c => (long)(c.MaxSizeGb * 1024 * 1024 * 1024));
var namespaces = maxSizes.Keys.ToList();
var topologies = namespaces.ToDictionary(
n => n,
n => (IBlobCacheTopology)new ShardedBlobCacheTopology(
new ShardedBlobCacheTopology.Configuration(
shardingScheme,
secretsProvider,
n.Universe,
n.Namespace)));
// Using the 0th shard of the cache so that checkpoint data is preserved across re-sharding.
// However, the container name needs to be differentiated between reshardings.
var checkpointContainerName = $"checkpoints-{metadataMatrix}";
var checkpointManagerStorageAccount = accountNames.MinBy(account => ((BlobCacheStorageShardingAccountName)account).ShardId);
Contract.Assert(checkpointManagerStorageAccount is not null);
var checkpointManagerStorageCreds = await secretsProvider.RetrieveBlobCredentialsAsync(context, checkpointManagerStorageAccount);
var machineLocation = new MachineLocation(runId);
await RunWithLeaseAsync(
context,
checkpointManagerStorageCreds,
checkpointContainerName,
machineLocation,
cts,
clock,
async () =>
{
await LogConfigAndAccountDifferencesAsync(context, secretsProvider, accountNames, config, metadataMatrix, contentMatrix);
var temp = Path.Combine(Path.GetTempPath(), "LifetimeDatabase", Guid.NewGuid().ToString());
var dbConfig = new RocksDbLifetimeDatabase.Configuration
{
DatabasePath = temp,
LruEnumerationPercentileStep = config.LruEnumerationPercentileStep,
LruEnumerationBatchSize = config.LruEnumerationBatchSize,
BlobNamespaceIds = namespaces,
};
var checkpointable = new RocksDbLifetimeDatabase.CheckpointableLifetimeDatabase(
dbConfig,
clock);
var registry = new AzureBlobStorageCheckpointRegistry(
new AzureBlobStorageCheckpointRegistryConfiguration
{
Storage = new AzureBlobStorageCheckpointRegistryConfiguration.StorageSettings(
Credentials: checkpointManagerStorageCreds,
ContainerName: checkpointContainerName)
});
var centralStorage = new BlobCentralStorage(new BlobCentralStoreConfiguration(
checkpointManagerStorageCreds,
containerName: checkpointContainerName,
checkpointsKey: "lifetime-manager"
));
var checkpointManager = new CheckpointManager(
checkpointable,
registry,
centralStorage,
new CheckpointManagerConfiguration(
WorkingDirectory: new ContentStore.Interfaces.FileSystem.AbsolutePath(Path.Combine(Path.GetTempPath(), "CheckpointManager")),
PrimaryMachineLocation: machineLocation)
{
// We don't want to restore checkpoints on a loop.
RestoreCheckpoints = false,
},
new CounterCollection<ContentLocationStoreCounters>(),
clock);
await checkpointManager.StartupAsync(context).ThrowIfFailure();
var checkpointStateResult = await registry.GetCheckpointStateAsync(context).ThrowIfFailure();
RocksDbLifetimeDatabase db;
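// An empty checkpoint ID means no previous run has published a checkpoint, so the database must be built from a full enumeration of the cache.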
var firstRun = string.IsNullOrEmpty(checkpointStateResult.Value?.CheckpointId);
if (!firstRun)
{
await checkpointManager.RestoreCheckpointAsync(context, checkpointStateResult.Value!).ThrowIfFailure();
db = checkpointable.GetDatabase();
}
else
{
db = await LifetimeDatabaseCreator.CreateAsync(
context,
dbConfig,
clock,
contentDegreeOfParallelism,
fingerprintDegreeOfParallelism,
namespaceId => topologies[namespaceId]).ThrowIfFailureAsync();
checkpointable.Database = db;
}
using (db)
{
var accessors = namespaces.ToDictionary(
n => n,
db.GetAccessor);
if (!firstRun)
{
// Only get updates from Azure if the database already existed
var updater = new LifetimeDatabaseUpdater(topologies, accessors, clock, fingerprintDegreeOfParallelism);
AzureStorageChangeFeedEventDispatcher dispatcher =
CreateDispatcher(secretsProvider, accountNames, metadataMatrix, contentMatrix, db, updater, clock);
await dispatcher.ConsumeNewChangesAsync(context).ThrowIfFailure();
}
// TODO: consider doing this in parallel, although it could be argued that if each of these calls
// is making full use of resources, it can be better to just do this sequentially.
foreach (var namespaceId in namespaces)
{
context.Token.ThrowIfCancellationRequested();
var accessor = accessors[namespaceId];
var quotaKeeper = new BlobQuotaKeeper(accessor, topologies[namespaceId], config.LastAccessTimeDeletionThreshold, clock);
_ = await quotaKeeper.EnsureUnderQuota(context, maxSizes[namespaceId], dryRun, contentDegreeOfParallelism, fingerprintDegreeOfParallelism);
}
if (!dryRun)
{
context.Token.ThrowIfCancellationRequested();
await checkpointManager.CreateCheckpointAsync(
context,
new EventSequencePoint(clock.UtcNow),
maxEventProcessingDelay: null)
.ThrowIfFailure();
}
}
});
}
protected virtual AzureStorageChangeFeedEventDispatcher CreateDispatcher(
IBlobCacheSecretsProvider secretsProvider,
IReadOnlyList<BlobCacheStorageAccountName> accountNames,
string metadataMatrix,
string contentMatrix,
RocksDbLifetimeDatabase db,
LifetimeDatabaseUpdater updater,
IClock clock)
{
return new AzureStorageChangeFeedEventDispatcher(secretsProvider, accountNames, updater, db, clock, metadataMatrix, contentMatrix);
}
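/// <summary>
/// Ensures that only one GC run operates on the cache at a time by acquiring the master lease via blob storage.
/// The lease is re-checked on a timer, and the run is cancelled if it can no longer be confirmed.
/// </summary>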
private static async Task RunWithLeaseAsync(
OperationContext context,
AzureStorageCredentials checkpointManagerStorageAccount,
string checkpointContainerName,
MachineLocation machineLocation,
CancellationTokenSource cts,
IClock clock,
Func<Task> run)
{
var masterSelection = new AzureBlobStorageMasterElectionMechanism(
new AzureBlobStorageMasterElectionMechanismConfiguration()
{
Storage = new AzureBlobStorageMasterElectionMechanismConfiguration.StorageSettings(
checkpointManagerStorageAccount,
ContainerName: checkpointContainerName),
ReleaseLeaseOnShutdown = true,
},
machineLocation,
clock);
await masterSelection.StartupAsync(context).ThrowIfFailure();
var state = await masterSelection.GetRoleAsync(context).ThrowIfFailureAsync();
if (state.Role != Role.Master)
{
throw new InvalidOperationException(
$"Failed to get lease on the blob cache. This indicates another GC run is already happening. The cache is currently leased to: {state.Master.Path}");
}
try
{
using var timer = new IntervalTimer(async () =>
{
var stateResult = await masterSelection.GetRoleAsync(context);
if (!stateResult.Succeeded || stateResult.Value.Role != Role.Master)
{
Tracer.Error(
context,
$"Failed to renew lease on the blob cache. Aborting since this could lead to multiple GC runs running at the same time.");
cts.Cancel();
}
},
period: TimeSpan.FromMinutes(5),
dueTime: TimeSpan.FromMinutes(5));
await run();
}
finally
{
await masterSelection.ShutdownAsync(context).ThrowIfFailure();
}
}
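/// <summary>
/// Logs warnings for containers that exist in the storage accounts but are missing from the configuration (or whose
/// matrix does not match the current sharding scheme), and for configured namespaces with no matching containers.
/// </summary>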
private static async Task LogConfigAndAccountDifferencesAsync(
OperationContext context,
IBlobCacheSecretsProvider secretsProvider,
IReadOnlyList<BlobCacheStorageAccountName> accounts,
BlobQuotaKeeperConfig config,
string metadataMatrix,
string contentMatrix)
{
var configuredNamespaces = config.Namespaces.Select(config => (config.Universe, config.Namespace)).ToHashSet();
var enumeratedNamespaces = new HashSet<(string Universe, string Namespace)>();
// Start by enumerating all accounts and their containers, attempting to parse their namespaces.
foreach (var account in accounts)
{
var cred = await secretsProvider.RetrieveBlobCredentialsAsync(context, account);
var client = cred.CreateBlobServiceClient();
await foreach (var container in client.GetBlobContainersAsync())
{
try
{
var name = BlobCacheContainerName.Parse(container.Name);
if (!configuredNamespaces.Contains((name.Universe, name.Namespace)))
{
Tracer.Warning(context, $"Container {container.Name} in account {client.AccountName} has " +
$"(Universe=[{name.Universe}], Namespace=[{name.Namespace}]), which is not listed in the configuration.");
}
else if (name.Purpose == BlobCacheContainerPurpose.Metadata)
{
if (name.Matrix != metadataMatrix)
{
Tracer.Warning(context, $"Container {container.Name} in account {client.AccountName} has Matrix=[{name.Matrix}], which does" +
$"not match current matrix=[{metadataMatrix}]. Resharding is likely to have occurred.");
continue;
}
}
else if (name.Purpose == BlobCacheContainerPurpose.Content)
{
if (name.Matrix != contentMatrix)
{
Tracer.Warning(context, $"Container {container.Name} in account {client.AccountName} has Matrix=[{name.Matrix}], which does" +
$"not match current matrix=[{contentMatrix}]. Resharding is likely to have occurred.");
continue;
}
}
enumeratedNamespaces.Add((name.Universe, name.Namespace));
}
catch (FormatException)
{
Tracer.Warning(context, $"Failed to parse container name {container.Name} in account {client.AccountName}");
}
}
}
// Now find all configured namespaces with no matching containers.
foreach (var (universe, @namespace) in configuredNamespaces.Except(enumeratedNamespaces))
{
Tracer.Warning(context, $"(Universe=[{universe}], Namespace=[{@namespace}]) was found in configuration but no matching containers were found.");
}
}
}
}

View file

@ -3,10 +3,15 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.ContractsLight;
using System.IO;
using System.Linq;
using System.Threading;
using BuildXL.Cache.ContentStore.Distributed.Blob;
using BuildXL.Cache.ContentStore.Distributed.NuCache;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Time;
using BuildXL.Cache.ContentStore.Tracing;
@ -15,8 +20,8 @@ using BuildXL.Cache.ContentStore.Utils;
using BuildXL.Cache.MemoizationStore.Interfaces.Sessions;
using BuildXL.Cache.MemoizationStore.Stores;
using BuildXL.Engine.Cache.KeyValueStores;
using BuildXL.Native.IO;
using BuildXL.Utilities;
using BuildXL.Utilities.Core;
using BuildXL.Utilities.Serialization;
using RocksDbSharp;
using OperationContext = BuildXL.Cache.ContentStore.Tracing.Internal.OperationContext;
@ -30,6 +35,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
/// </summary>
public class RocksDbLifetimeDatabase : IDisposable
{
private const string CreationTimeKey = "__creation_time__";
private static readonly Tracer Tracer = new(nameof(RocksDbLifetimeDatabase));
public class Configuration
@ -55,6 +61,30 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
/// in batches, so that operations against the L3 can happen in between usages of the database.
/// </summary>
public int LruEnumerationBatchSize { get; set; } = 1000;
public required IReadOnlyList<BlobNamespaceId> BlobNamespaceIds { get; set; }
}
/// <summary>
/// There's multiple column families in this usage of RocksDB. Their usages are documented below.
/// </summary>
private enum ColumnFamily
{
/// <summary>
/// Stores a mapping from <see cref="ContentHash"/> to <see cref="ContentEntry"/>. This will allow us to keep track
/// of which content is ready to be removed from the L3, as well as how large it is.
/// </summary>
Content,
/// <summary>
/// Stores a mapping from <see cref="StrongFingerprint"/> to <see cref="MetadataEntry"/>.
/// This allows us to keep track of which strong fingerprints have been accessed least recently, as well as which
/// content hashes are contained within their content hash lists.
/// </summary>
Fingerprints,
/// <summary>
/// Stores a mapping from storage account to a cursor to the latest change feed event processed.
/// </summary>
Cursors,
}
internal record ContentEntry(long BlobSize, int ReferenceCount)
@ -80,7 +110,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
{
writer.Write(BlobSize);
writer.Write(LastAccessTime);
writer.Write(Hashes, (ref SpanWriter w, ContentHash hash) => w.Write(hash));
writer.Write(Hashes, (ref SpanWriter w, ContentHash hash) => HashSerializationExtensions.Write(ref w, hash));
}
public static MetadataEntry Deserialize(SpanReader reader)
@ -93,104 +123,96 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
}
}
/// <summary>
/// There's multiple column families in this usage of RocksDB. Their usages are documented below.
/// </summary>
private enum Columns
{
/// <summary>
/// Stores a mapping from <see cref="ContentHash"/> to <see cref="ContentEntry"/>. This will allow us to keep track
/// of which content is ready to be removed from the L3, as well as how large it is.
/// </summary>
Content,
/// Stores a mapping from <see cref="StrongFingerprint"/> to <see cref="MetadataEntry"/>.
/// This allows us to keep track of which strong fingerprints have been accessed least recently, as well as which
/// content hashes are contained within its content hash list.
Fingerprints,
}
private readonly KeyValueStoreAccessor _keyValueStore;
private readonly RocksDb _db;
private readonly SerializationPool _serializationPool = new();
private readonly Configuration _configuration;
private readonly IClock _clock;
private readonly ColumnFamilyHandle _contentCf;
private readonly ColumnFamilyHandle _fingerprintCf;
private readonly ColumnFamilyHandle _cursorsCf;
private readonly ReadOptions _readOptions = new ReadOptions()
.SetVerifyChecksums(true)
.SetTotalOrderSeek(true);
private readonly WriteOptions _writeOptions = new WriteOptions()
.SetSync(false)
.DisableWal(disable: 1);
protected RocksDbLifetimeDatabase(
Configuration configuration,
IClock clock,
KeyValueStoreAccessor keyValueStore,
ColumnFamilyHandle contentCf,
ColumnFamilyHandle fingerprintCf)
RocksDb db)
{
_configuration = configuration;
_clock = clock;
_keyValueStore = keyValueStore;
_contentCf = contentCf;
_fingerprintCf = fingerprintCf;
_db = db;
_cursorsCf = _db.GetColumnFamily(nameof(ColumnFamily.Cursors));
}
public static Result<RocksDbLifetimeDatabase> Create(
public static RocksDbLifetimeDatabase Create(
Configuration configuration,
IClock clock)
{
var possibleStore = CreateAccessor(configuration);
KeyValueStoreAccessor keyValueStore;
if (possibleStore.Succeeded)
{
keyValueStore = possibleStore.Result;
var (contentCf, fingerprintCf) = GetColumnFamilies(keyValueStore);
return new RocksDbLifetimeDatabase(configuration, clock, keyValueStore, contentCf, fingerprintCf);
}
return new Result<RocksDbLifetimeDatabase>($"Failed to initialize a RocksDb store at {configuration.DatabasePath}:", possibleStore.Failure.DescribeIncludingInnerFailures());
RocksDb db = CreateDb(configuration);
return new RocksDbLifetimeDatabase(configuration, clock, db);
}
internal static (ColumnFamilyHandle contentCf, ColumnFamilyHandle fingerprintCf) GetColumnFamilies(KeyValueStoreAccessor keyValueStore)
// There is a bug in our RocksDb library that requires that we keep a reference to created ColumnFamilyOptions,
// otherwise an Exception will be thrown with the error:
// A callback was made on a garbage collected delegate of type 'RocksDbSharp!RocksDbSharp.ColumnFamilyOptions+GetMergeOperator::Invoke'.
// Bug#2090673
private static readonly List<ColumnFamilyOptions> Hack = new List<ColumnFamilyOptions>();
protected static RocksDb CreateDb(Configuration configuration)
{
ColumnFamilyHandle? contentCf = null, fingerprintCf = null;
var result = keyValueStore.Use(store =>
var options = new DbOptions();
options.EnableStatistics();
options.SetAdviseRandomOnOpen(true);
options.SetCreateIfMissing(true);
options.SetCreateMissingColumnFamilies(true);
options.SetParanoidChecks(true);
options.SetMaxOpenFiles(int.MaxValue);
options.SetKeepLogFileNum(5);
options.SetMaxLogFileSize(100_000);
options.SetAllowConcurrentMemtableWrite(true);
// The default column family is unused on purpose.
var defaultOptions = new ColumnFamilyOptions() { };
var columnFamilies = new ColumnFamilies(defaultOptions);
var cursorOptions = new ColumnFamilyOptions()
.SetCompression(Compression.Zstd);
columnFamilies.Add(new ColumnFamilies.Descriptor(nameof(ColumnFamily.Cursors), cursorOptions));
foreach (var namespaceId in configuration.BlobNamespaceIds)
{
contentCf = store.GetColumn(nameof(Columns.Content));
fingerprintCf = store.GetColumn(nameof(Columns.Fingerprints));
}).ThrowIfFailure();
Contract.Assert(contentCf != null);
Contract.Assert(fingerprintCf != null);
return (contentCf, fingerprintCf);
}
protected static Possible<KeyValueStoreAccessor> CreateAccessor(Configuration configuration)
{
// WIP: review these settings. Mostly copy/pasted from RocksDbContentLocationDatabase
var settings = new RocksDbStoreConfiguration(configuration.DatabasePath)
{
AdditionalColumns = new[] { nameof(Columns.Content), nameof(Columns.Fingerprints) },
RotateLogsMaxFileSizeBytes = (ulong)"1MB".ToSize(),
RotateLogsNumFiles = 10,
RotateLogsMaxAge = TimeSpan.FromHours(3),
FastOpen = true,
ReadOnly = false,
DisableAutomaticCompactions = false,
LeveledCompactionDynamicLevelTargetSizes = true,
Compression = Compression.Zstd,
UseReadOptionsWithSetTotalOrderSeekInDbEnumeration = true,
UseReadOptionsWithSetTotalOrderSeekInGarbageCollection = true,
};
settings.MergeOperators.Add(
nameof(Columns.Content),
MergeOperators.CreateAssociative(
"ContentRefCountMergeOperator",
var contentOptions = new ColumnFamilyOptions()
.SetCompression(Compression.Zstd)
.SetMergeOperator(MergeOperators.CreateAssociative(
$"ContentRefCountMergeOperator_{namespaceId.Universe}_{namespaceId.Namespace}",
(key, value1, value2, result) =>
MergeContentRefCount(value1, value2, result)));
return KeyValueStoreAccessor.Open(settings);
var fingerprintsOptions = new ColumnFamilyOptions()
.SetCompression(Compression.Zstd);
columnFamilies.Add(new ColumnFamilies.Descriptor(GetColumnFamilyName(ColumnFamily.Content, namespaceId), contentOptions));
columnFamilies.Add(new ColumnFamilies.Descriptor(GetColumnFamilyName(ColumnFamily.Fingerprints, namespaceId), fingerprintsOptions));
Hack.Add(contentOptions);
Hack.Add(fingerprintsOptions);
}
Directory.CreateDirectory(configuration.DatabasePath);
var db = RocksDb.Open(options, path: configuration.DatabasePath, columnFamilies);
return db;
}
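/// <summary>
/// Returns a view of the database scoped to a single namespace, backed by that namespace's column families.
/// </summary>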
public IAccessor GetAccessor(BlobNamespaceId namespaceId)
{
return new Accessor(this, namespaceId);
}
private static bool MergeContentRefCount(
@ -214,7 +236,29 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
return true;
}
public BoolResult AddContentHashList(ContentHashList contentHashList, IReadOnlyCollection<(ContentHash hash, long size)> hashes)
public void SetCreationTime(DateTime creationTimeUtc)
{
using var key = _serializationPool.SerializePooled(CreationTimeKey, static (string s, ref SpanWriter writer) => writer.Write(s));
var existing = Get<DateTime?>(key.WrittenSpan, cfHandle: null, reader => reader.ReadDateTime());
Contract.Assert(existing is null, "DB creation time can only be set once.");
using var value = _serializationPool.SerializePooled(creationTimeUtc, static (DateTime instance, ref SpanWriter writer) => writer.Write(instance));
_db.Put(key.WrittenSpan, value.WrittenSpan, cf: null);
}
public DateTime? GetCreationTime()
{
using var key = _serializationPool.SerializePooled(CreationTimeKey, static (string s, ref SpanWriter writer) => writer.Write(s));
return Get<DateTime?>(key.WrittenSpan, cfHandle: null, reader => reader.ReadDateTime());
}
private void AddContentHashList(
ContentHashList contentHashList,
IReadOnlyCollection<(ContentHash hash, long size)> hashes,
ColumnFamilyHandle contentCf,
ColumnFamilyHandle fingerprintCf)
{
var batch = new WriteBatch();
@ -231,13 +275,13 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
var value = _serializationPool.SerializePooled(entry, static (MetadataEntry instance, ref SpanWriter writer) => instance.Serialize(writer));
disposables.Add(value);
batch.Put(key.WrittenSpan, value.WrittenSpan, _fingerprintCf);
batch.Put(key.WrittenSpan, value.WrittenSpan, fingerprintCf);
foreach (var (hash, size) in hashes)
{
var hashKey = _serializationPool.SerializePooled(
hash,
static (ContentHash instance, ref SpanWriter writer) => writer.Write(instance));
static (ContentHash instance, ref SpanWriter writer) => HashSerializationExtensions.Write(ref writer, instance));
disposables.Add(hashKey);
var contentEntry = new ContentEntry(BlobSize: size, ReferenceCount: 1);
@ -247,17 +291,10 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
static (ContentEntry instance, ref SpanWriter writer) => instance.Serialize(writer));
disposables.Add(valueSpan);
batch.Merge(hashKey.WrittenSpan, valueSpan.WrittenSpan, _contentCf);
batch.Merge(hashKey.WrittenSpan, valueSpan.WrittenSpan, contentCf);
}
var status = _keyValueStore.Use(store =>
{
store.ApplyBatch(batch);
});
return status.Succeeded
? BoolResult.Success
: new BoolResult(status.Failure.DescribeIncludingInnerFailures());
_db.Write(batch, _writeOptions);
}
finally
{
@ -268,9 +305,11 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
}
}
public BoolResult DeleteContentHashList(
private void DeleteContentHashList(
string contentHashListName,
IReadOnlyCollection<ContentHash> hashes)
IReadOnlyCollection<ContentHash> hashes,
ColumnFamilyHandle contentCf,
ColumnFamilyHandle fingerprintCf)
{
var batch = new WriteBatch();
@ -282,14 +321,14 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
var key = _serializationPool.SerializePooled(contentHashListName, static (string s, ref SpanWriter writer) => writer.Write(s));
disposables.Add(key);
batch.Delete(key.WrittenSpan, _fingerprintCf);
batch.Delete(key.WrittenSpan, fingerprintCf);
// Decrement reference count for all the contents.
foreach (var hash in hashes)
{
var hashKey = _serializationPool.SerializePooled(
hash,
static (ContentHash instance, ref SpanWriter writer) => writer.Write(instance));
static (ContentHash instance, ref SpanWriter writer) => HashSerializationExtensions.Write(ref writer, instance));
disposables.Add(hashKey);
var value = new ContentEntry(BlobSize: 0, ReferenceCount: -1);
@ -299,17 +338,10 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
static (ContentEntry instance, ref SpanWriter writer) => instance.Serialize(writer));
disposables.Add(valueSpan);
batch.Merge(hashKey.WrittenSpan, valueSpan.WrittenSpan, _contentCf);
batch.Merge(hashKey.WrittenSpan, valueSpan.WrittenSpan, contentCf);
}
var status = _keyValueStore.Use(store =>
{
store.ApplyBatch(batch);
});
return status.Succeeded
? BoolResult.Success
: new BoolResult(status.Failure.DescribeIncludingInnerFailures());
_db.Write(batch, _writeOptions);
}
finally
{
@ -320,29 +352,33 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
}
}
public BoolResult AddContent(ContentHash hash, long blobSize)
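// Change feed cursors are persisted per storage account so that a later GC run can resume consuming events where the previous run stopped.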
public string? GetCursor(string accountName)
{
var status = _keyValueStore.Use(store =>
{
using var key = _serializationPool.SerializePooled(
hash,
static (ContentHash instance, ref SpanWriter writer) => writer.Write(instance));
var value = new ContentEntry(BlobSize: blobSize, ReferenceCount: 0);
using var valueSpan = _serializationPool.SerializePooled(
value,
static (ContentEntry instance, ref SpanWriter writer) => instance.Serialize(writer));
store.Merge(key.WrittenSpan, valueSpan.WrittenSpan, columnFamilyName: nameof(Columns.Content));
});
return status.Succeeded
? BoolResult.Success
: new BoolResult(status.Failure.DescribeIncludingInnerFailures());
using var key = _serializationPool.SerializePooled(accountName, static (string instance, ref SpanWriter writer) => writer.Write(instance));
return Get(key.WrittenSpan, _cursorsCf, static reader => reader.ReadString());
}
public Result<EnumerationResult> GetLruOrderedContentHashLists(OperationContext context)
public void SetCursor(string accountName, string cursor)
{
using var key = _serializationPool.SerializePooled(accountName, static (string instance, ref SpanWriter writer) => writer.Write(instance));
using var value = _serializationPool.SerializePooled(cursor, static (string instance, ref SpanWriter writer) => writer.Write(instance));
_db.Put(key.WrittenSpan, value.WrittenSpan, _cursorsCf);
}
private void AddContent(ContentHash hash, long blobSize, ColumnFamilyHandle contentCf)
{
using var key = _serializationPool.SerializePooled(hash, static (ContentHash instance, ref SpanWriter writer) => HashSerializationExtensions.Write(ref writer, instance));
var value = new ContentEntry(BlobSize: blobSize, ReferenceCount: 0);
using var valueSpan = _serializationPool.SerializePooled(
value,
static (ContentEntry instance, ref SpanWriter writer) => instance.Serialize(writer));
_db.Merge(key.WrittenSpan, valueSpan.WrittenSpan, contentCf);
}
private EnumerationResult GetLruOrderedContentHashLists(OperationContext context, ColumnFamilyHandle fingerprintsCf, ColumnFamilyHandle contentCf)
{
// LRU enumeration works by first doing a first pass of the entire database to
// estimate the percentiles for content hash lists' last access time. After that first pass,
@ -359,44 +395,29 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
long firstPassScannedEntries = 0;
var lastAccessDeltaSketch = new DDSketch();
var status = _keyValueStore.Use(store =>
{
store.PrefixLookup(
state: string.Empty,
prefix: ReadOnlySpan<byte>.Empty,
columnFamilyName: nameof(Columns.Fingerprints),
(state, key, value) =>
IterateDbContent(
iterator =>
{
MetadataEntry metadataEntry;
try
{
if (context.Token.IsCancellationRequested)
{
return false;
}
metadataEntry = MetadataEntry.Deserialize(iterator.Value().AsReader());
}
catch
{
Tracer.Error(context, $"Failure to deserialize span: {Convert.ToHexString(iterator.Value())}");
throw;
}
MetadataEntry metadataEntry;
try
{
metadataEntry = MetadataEntry.Deserialize(value.AsReader());
}
catch
{
Tracer.Error(context, $"Failure to deserialize span: {Convert.ToHexString(value)}");
throw;
}
var lastAccessDelta = now - metadataEntry.LastAccessTime;
lastAccessDeltaSketch.Insert(lastAccessDelta.TotalMilliseconds);
var lastAccessDelta = now - metadataEntry.LastAccessTime;
lastAccessDeltaSketch.Insert(lastAccessDelta.TotalMilliseconds);
totalSize += metadataEntry.BlobSize;
firstPassScannedEntries++;
return true;
});
});
if (!status.Succeeded)
{
return new Result<EnumerationResult>(status.Failure.DescribeIncludingInnerFailures());
}
totalSize += metadataEntry.BlobSize;
firstPassScannedEntries++;
},
fingerprintsCf,
startKey: null,
context.Token);
IEnumerable<ContentHashList> chlEnumerable = Array.Empty<ContentHashList>();
@ -421,7 +442,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
var newQuantile = Math.Max(0, previousQuantile - _configuration.LruEnumerationPercentileStep);
var upperLimit = now - TimeSpan.FromMilliseconds(lastAccessDeltaSketch.Quantile(newQuantile));
chlEnumerable = chlEnumerable.Concat(EnumerateContentHashLists(context, previousLimit, upperLimit));
chlEnumerable = chlEnumerable.Concat(EnumerateContentHashLists(context, previousLimit, upperLimit, fingerprintsCf));
previousLimit = upperLimit;
previousQuantile = newQuantile;
@ -433,36 +454,29 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
long zeroRefContent = 0;
long zeroRefContentSize = 0;
long contentBlobCount = 0;
status = _keyValueStore.Use(store =>
{
store.IterateDbContent(
iterator =>
IterateDbContent(
iterator =>
{
var contentEntry = ContentEntry.Deserialize(iterator.Value().AsReader());
totalSize += contentEntry.BlobSize;
firstPassScannedEntries++;
contentBlobCount++;
if (contentEntry.ReferenceCount == 0)
{
var contentEntry = ContentEntry.Deserialize(iterator.Value().AsReader());
totalSize += contentEntry.BlobSize;
firstPassScannedEntries++;
contentBlobCount++;
if (contentEntry.ReferenceCount == 0)
{
zeroRefContent++;
zeroRefContentSize += contentEntry.BlobSize;
}
},
columnFamilyName: nameof(Columns.Content),
startValue: (byte[]?)null,
context.Token);
});
if (!status.Succeeded)
{
return new Result<EnumerationResult>(status.Failure.DescribeIncludingInnerFailures());
}
zeroRefContent++;
zeroRefContentSize += contentEntry.BlobSize;
}
},
contentCf,
startKey: null,
context.Token);
if (firstPassScannedEntries == 0)
{
Tracer.Info(context, $"No entries in database. Early stopping.");
return new Result<EnumerationResult>(new EnumerationResult(Array.Empty<ContentHashList>(), Array.Empty<(ContentHash, long)>(), totalSize: 0));
return new EnumerationResult(Array.Empty<ContentHashList>(), Array.Empty<(ContentHash, long)>(), totalSize: 0);
}
Tracer.Info(context, $"Content enumeration complete. TotalContentBlobs=[{contentBlobCount}] ZeroReferenceBlobCount=[{zeroRefContent}], ZeroReferenceBlobSize=[{zeroRefContentSize}]");
@ -472,15 +486,16 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
context.Token.ThrowIfCancellationRequested();
var zeroReferenceEnumerable = EnumerateZeroRefContent(context);
var zeroReferenceEnumerable = EnumerateZeroRefContent(context, contentCf);
return new Result<EnumerationResult>(new EnumerationResult(chlEnumerable, zeroReferenceEnumerable, totalSize));
return new EnumerationResult(chlEnumerable, zeroReferenceEnumerable, totalSize);
}
private IEnumerable<ContentHashList> EnumerateContentHashLists(
OperationContext context,
DateTime lowerLimitNonInclusive,
DateTime upperLimitInclusive)
DateTime upperLimitInclusive,
ColumnFamilyHandle fingerprintsCf)
{
Tracer.Debug(context, $"Starting enumeration of DB for values in range ({lowerLimitNonInclusive}, {upperLimitInclusive}]");
@ -494,7 +509,8 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
lowerLimitNonInclusive,
upperLimitInclusive,
_configuration.LruEnumerationBatchSize,
nextKey)),
nextKey,
fingerprintsCf)),
messageFactory: r => r.Succeeded
? $"ReachedEnd={r.Value.reachedEnd}, NextKey=[{(r.Value.nextKey is null ? "null" : Convert.ToHexString(r.Value.nextKey))}]"
: string.Empty
@ -524,7 +540,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
yield break;
}
private IEnumerable<(ContentHash content, long length)> EnumerateZeroRefContent(OperationContext context)
private IEnumerable<(ContentHash content, long length)> EnumerateZeroRefContent(OperationContext context, ColumnFamilyHandle contentCf)
{
Tracer.Info(context, "Starting enumeration of zero-reference content");
@ -536,7 +552,8 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
() => new Result<(IReadOnlyList<(ContentHash, long)>, bool reachedEnd, byte[]? nextKey)>(GetZeroRefContentHashBatch(
context,
_configuration.LruEnumerationBatchSize,
nextKey)),
nextKey,
contentCf)),
messageFactory: r => r.Succeeded
? $"ReachedEnd={r.Value.reachedEnd}, NextKey=[{(r.Value.nextKey is null ? "null" : Convert.ToHexString(r.Value.nextKey))}]"
: string.Empty
@ -573,7 +590,8 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
DateTime lowerLimitNonInclusive,
DateTime upperLimitInclusive,
int limit,
byte[]? startKey)
byte[]? startKey,
ColumnFamilyHandle fingerprintsCf)
{
return GetEnumerationBatch(
context,
@ -592,13 +610,14 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
result.Add(new ContentHashList(blobName, metadataEntry.LastAccessTime, metadataEntry.Hashes, metadataEntry.BlobSize));
}
},
nameof(Columns.Fingerprints));
fingerprintsCf);
}
private (IReadOnlyList<(ContentHash, long)> batch, bool reachedEnd, byte[]? next) GetZeroRefContentHashBatch(
OperationContext context,
int limit,
byte[]? startKey)
byte[]? startKey,
ColumnFamilyHandle contentCf)
{
return GetEnumerationBatch(
context,
@ -614,17 +633,18 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
result.Add((hash, contentEntry.BlobSize));
}
},
nameof(Columns.Content));
contentCf);
}
private delegate void EnumerationBatchHandler<T>(ref ReadOnlySpan<byte> key, ref ReadOnlySpan<byte> value, IList<T> result);
private delegate T SpanReaderDeserializer<T>(SpanReader reader);
private (IReadOnlyList<T> batch, bool reachedEnd, byte[]? next) GetEnumerationBatch<T>(
OperationContext context,
int limit,
byte[]? startKey,
EnumerationBatchHandler<T> handle,
string columnFamily)
ColumnFamilyHandle cfHandle)
{
Contract.Requires(limit > 0);
@ -632,112 +652,271 @@ namespace BuildXL.Cache.BlobLifetimeManager.Library
byte[]? nextInitialKey = null;
var cts = CancellationTokenSource.CreateLinkedTokenSource(context.Token);
var status = _keyValueStore!.Use(store =>
{
return store.IterateDbContent(
iterator =>
var iterationResult = IterateDbContent(
iterator =>
{
var key = iterator.Key();
if (result.Count == limit)
{
var key = iterator.Key();
nextInitialKey = key.ToArray();
cts.Cancel();
return;
}
if (result.Count == limit)
{
nextInitialKey = key.ToArray();
cts.Cancel();
return;
}
var value = iterator.Value();
var value = iterator.Value();
handle(ref key, ref value, result);
},
cfHandle,
startKey,
cts.Token);
handle(ref key, ref value, result);
},
columnFamilyName: columnFamily,
startValue: startKey,
cts.Token);
}).ThrowIfFailure();
return (result, status.Result.ReachedEnd, nextInitialKey);
return (result, iterationResult.ReachedEnd, nextInitialKey);
}
public BoolResult UpdateContentHashListLastAccessTime(ContentHashList contentHashList)
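// Iterates the given column family starting at startKey (or the first key when null), invoking the callback for each
// entry until the end is reached or the token is cancelled.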
private IterateDbContentResult IterateDbContent(
Action<Iterator> onNextItem,
ColumnFamilyHandle cfHandle,
byte[]? startKey,
CancellationToken token)
{
using (var iterator = _db.NewIterator(cfHandle, _readOptions))
{
if (startKey is null)
{
iterator.SeekToFirst();
}
else
{
iterator.Seek(startKey);
}
while (iterator.Valid() && !token.IsCancellationRequested)
{
onNextItem(iterator);
iterator.Next();
}
}
return new IterateDbContentResult() { ReachedEnd = !token.IsCancellationRequested, Canceled = token.IsCancellationRequested, };
}
private void UpdateContentHashListLastAccessTime(ContentHashList contentHashList, ColumnFamilyHandle fingerprintsCf)
{
// We can simply override the existing value.
var entry = new MetadataEntry(contentHashList.BlobSize, contentHashList.LastAccessTime, contentHashList.Hashes);
using var key = _serializationPool.SerializePooled(contentHashList.BlobName, static (string s, ref SpanWriter writer) => writer.Write(s));
using var value = _serializationPool.SerializePooled(entry, static (MetadataEntry instance, ref SpanWriter writer) => instance.Serialize(writer));
var status = _keyValueStore.Use(store =>
{
store.Put(key.WrittenSpan, value.WrittenSpan, nameof(Columns.Fingerprints));
});
return status.Succeeded
? BoolResult.Success
: new BoolResult(status.Failure.DescribeIncludingInnerFailures());
_db.Put(key.WrittenSpan, value.WrittenSpan, fingerprintsCf, _writeOptions);
}
public BoolResult DeleteContent(ContentHash hash)
private void DeleteContent(ContentHash hash, ColumnFamilyHandle contentCf)
{
var status = _keyValueStore.Use(store =>
{
using var key = _serializationPool.SerializePooled(
using var key = _serializationPool.SerializePooled(
hash,
static (ContentHash instance, ref SpanWriter writer) => writer.Write(instance));
static (ContentHash instance, ref SpanWriter writer) => HashSerializationExtensions.Write(ref writer, instance));
store.Remove(key.WrittenSpan, columnFamilyName: nameof(Columns.Content));
});
_db.Remove(key.WrittenSpan, contentCf, _writeOptions);
}
if (!status.Succeeded)
private MetadataEntry? GetContentHashList(StrongFingerprint strongFingerprint, ColumnFamilyHandle fingerprintsCf, out string? blobPath)
{
blobPath = AzureBlobStorageMetadataStore.GetBlobPath(strongFingerprint);
using var key = _serializationPool.SerializePooled(blobPath, static (string s, ref SpanWriter writer) => writer.Write(s));
return Get(key.WrittenSpan, fingerprintsCf, MetadataEntry.Deserialize);
}
private ContentEntry? GetContentEntry(ContentHash hash, ColumnFamilyHandle contentCf)
{
using var key = _serializationPool.SerializePooled(
hash,
static (ContentHash instance, ref SpanWriter writer) => HashSerializationExtensions.Write(ref writer, instance));
return Get(key.WrittenSpan, contentCf, ContentEntry.Deserialize);
}
private T? Get<T>(ReadOnlySpan<byte> key, ColumnFamilyHandle? cfHandle, SpanReaderDeserializer<T> deserializer)
{
var pinnable = _db.UnsafeGetPinnable(key, cfHandle, _readOptions);
if (pinnable is null)
{
return new BoolResult(status.Failure.DescribeIncludingInnerFailures());
return default;
}
return BoolResult.Success;
}
internal MetadataEntry? TryGetContentHashList(StrongFingerprint strongFingerprint, out string? blobPath)
{
string? path = null;
var result = _keyValueStore.Use(store =>
using (pinnable)
{
path = AzureBlobStorageMetadataStore.GetBlobPath(strongFingerprint);
using var key = _serializationPool.SerializePooled(path, static (string s, ref SpanWriter writer) => writer.Write(s));
var found = store.TryDeserializeValue(
key.WrittenSpan,
columnFamilyName: nameof(Columns.Fingerprints),
MetadataEntry.Deserialize,
out var result);
return result;
});
blobPath = path;
return result.Result;
}
internal ContentEntry? GetContentEntry(ContentHash hash)
{
var result = _keyValueStore.Use(store =>
{
using var key = _serializationPool.SerializePooled(
hash,
static (ContentHash instance, ref SpanWriter writer) => writer.Write(instance));
var found = store.TryDeserializeValue<ContentEntry>(
key.WrittenSpan,
columnFamilyName: nameof(Columns.Content),
ContentEntry.Deserialize,
out var result);
return result;
});
return result.Result;
var spanReader = pinnable.Value.UnsafePin().AsReader();
return deserializer(spanReader)!;
}
}
public void Dispose()
{
_keyValueStore.Dispose();
_db.Dispose();
}
private static string GetColumnFamilyName(ColumnFamily column, BlobNamespaceId namespaceId) => $"{column}-{namespaceId}";
/// <summary>
/// Provides a view into the database where all the data corresponds to a given cache namespace.
/// </summary>
public interface IAccessor
{
void AddContent(ContentHash hash, long blobSize);
internal ContentEntry? GetContentEntry(ContentHash hash);
void DeleteContent(ContentHash hash);
void AddContentHashList(ContentHashList contentHashList, IReadOnlyCollection<(ContentHash hash, long size)> hashes);
internal MetadataEntry? GetContentHashList(StrongFingerprint strongFingerprint, out string? blobPath);
void DeleteContentHashList(string contentHashListName, IReadOnlyCollection<ContentHash> hashes);
EnumerationResult GetLruOrderedContentHashLists(OperationContext context);
void UpdateContentHashListLastAccessTime(ContentHashList contentHashList);
}
private class Accessor : IAccessor
{
private readonly RocksDbLifetimeDatabase _database;
private readonly BlobNamespaceId _namespaceId;
private readonly ColumnFamilyHandle _contentCf;
private readonly ColumnFamilyHandle _fingerprintCf;
public Accessor(RocksDbLifetimeDatabase database, BlobNamespaceId namespaceId)
{
_database = database;
_namespaceId = namespaceId;
_contentCf = database._db.GetColumnFamily(GetColumnFamilyName(ColumnFamily.Content));
_fingerprintCf = database._db.GetColumnFamily(GetColumnFamilyName(ColumnFamily.Fingerprints));
}
public void AddContent(ContentHash hash, long blobSize) => _database.AddContent(hash, blobSize, _contentCf);
public void AddContentHashList(ContentHashList contentHashList, IReadOnlyCollection<(ContentHash hash, long size)> hashes)
=> _database.AddContentHashList(contentHashList, hashes, _contentCf, _fingerprintCf);
public void DeleteContent(ContentHash hash) => _database.DeleteContent(hash, _contentCf);
public void DeleteContentHashList(string contentHashListName, IReadOnlyCollection<ContentHash> hashes)
=> _database.DeleteContentHashList(contentHashListName, hashes, _contentCf, _fingerprintCf);
public EnumerationResult GetLruOrderedContentHashLists(OperationContext context)
=> _database.GetLruOrderedContentHashLists(context, _fingerprintCf, _contentCf);
public void UpdateContentHashListLastAccessTime(ContentHashList contentHashList)
=> _database.UpdateContentHashListLastAccessTime(contentHashList, _fingerprintCf);
private string GetColumnFamilyName(ColumnFamily column) => RocksDbLifetimeDatabase.GetColumnFamilyName(column, _namespaceId);
public ContentEntry? GetContentEntry(ContentHash hash) => _database.GetContentEntry(hash, _contentCf);
MetadataEntry? IAccessor.GetContentHashList(StrongFingerprint strongFingerprint, out string? blobPath)
=> _database.GetContentHashList(strongFingerprint, _fingerprintCf, out blobPath);
}
/// <summary>
/// <see cref="CheckpointManager"/> is designed around having a database that will be modified on the fly.
/// <see cref="RocksDbLifetimeDatabase"/> does not need this use case, since it is only restored during start of execution, and saved during
/// the end of execution of GC. Thus, it's possible for the DB to have a simpler design/state if we add this wrapper.
/// </summary>
public class CheckpointableLifetimeDatabase : StartupShutdownBase, ICheckpointable
{
protected override Tracer Tracer { get; } = new Tracer(nameof(CheckpointableLifetimeDatabase));
public RocksDbLifetimeDatabase? Database { get; set; }
private readonly Configuration _config;
private readonly IClock _clock;
public CheckpointableLifetimeDatabase(Configuration config, IClock clock)
{
_config = config;
_clock = clock;
}
public RocksDbLifetimeDatabase GetDatabase()
{
if (Database is null)
{
throw new InvalidOperationException("A checkpoint must be restored before attempting to use the database.");
}
return Database;
}
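// RocksDB SST files are immutable once written, which allows the checkpoint manager to avoid re-uploading them across checkpoints.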
public bool IsImmutable(AbsolutePath filePath)
{
return filePath.Path.EndsWith(".sst", StringComparison.OrdinalIgnoreCase);
}
public BoolResult RestoreCheckpoint(OperationContext context, AbsolutePath checkpointDirectory)
{
Contract.Assert(Database is null);
return context.PerformOperation(
Tracer,
() =>
{
// Make sure the parent directory exists, but the destination directory doesn't.
Directory.CreateDirectory(_config.DatabasePath);
FileUtilities.DeleteDirectoryContents(_config.DatabasePath, deleteRootDirectory: true);
Directory.Move(checkpointDirectory.ToString(), _config.DatabasePath);
var db = CreateDb(_config);
Database = new RocksDbLifetimeDatabase(_config, _clock, db);
return BoolResult.Success;
});
}
public BoolResult SaveCheckpoint(OperationContext context, AbsolutePath checkpointDirectory)
{
Contract.Assert(Database is not null);
return context.PerformOperation(
Tracer,
() =>
{
// Make sure the parent directory exists, but the destination directory doesn't.
Directory.CreateDirectory(checkpointDirectory.Path);
FileUtilities.DeleteDirectoryContents(checkpointDirectory.Path, deleteRootDirectory: true);
var checkpoint = Database._db.Checkpoint();
checkpoint.Save(checkpointDirectory.Path);
return BoolResult.Success;
});
}
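// Global entries live in the default column family and back the ICheckpointable key/value API that the checkpoint
// manager uses to persist its own metadata alongside the database.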
public void SetGlobalEntry(string key, string? value)
{
Contract.Assert(Database is not null);
using var keySpan = Database._serializationPool.SerializePooled(key, static (string instance, ref SpanWriter writer) => writer.Write(instance));
if (value is null)
{
Database._db.Remove(keySpan.WrittenSpan);
}
else
{
using var valueSpan = Database._serializationPool.SerializePooled(value, static (string instance, ref SpanWriter writer) => writer.Write(instance));
Database._db.Put(keySpan.WrittenSpan, valueSpan.WrittenSpan);
}
}
public bool TryGetGlobalEntry(string key, [NotNullWhen(true)] out string? value)
{
Contract.Assert(Database is not null);
using var keySpan = Database._serializationPool.SerializePooled(key, static (string instance, ref SpanWriter writer) => writer.Write(instance));
value = Database.Get(keySpan.WrittenSpan, cfHandle: null, (SpanReader r) => r.ReadString());
return value is not null;
}
}
}

View file

@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using BuildXL.Cache.ContentStore.Distributed.Blob;
using FluentAssertions;
using Xunit;
namespace BuildXL.Cache.BlobLifetimeManager.Test
{
public class AbsoluteBlobPathTests
{
[Fact]
public void CanParseFromSubject()
{
var container = "contentv0-matrix-universe-namespace";
var path = @"some\path\to\the\file.blob";
var subject = $@"/blobServices/default/containers/{container}/blobs/{path}";
var accountName = BlobCacheStorageAccountName.Parse("theaccount");
var absolutePath = AbsoluteBlobPath.ParseFromChangeEventSubject(accountName, subject);
absolutePath.Account.Should().Be(accountName);
absolutePath.Path.Path.Should().Be(path);
absolutePath.Container.ContainerName.Should().Be(container);
}
}
}

View file

@ -21,14 +21,15 @@ using ContentStoreTest.Test;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using System.IO;
namespace BuildXL.Cache.BlobLifetimeManager.Test
{
[Collection("Redis-based tests")]
[Trait("Category", "WindowsOSOnly")] // 'redis-server' executable no longer exists
public class BlobLifetimeManagerTests : LifetimeDatabaseTestBase
public class BlobGarbageCollectorTests : LifetimeDatabaseTestBase
{
public BlobLifetimeManagerTests(LocalRedisFixture redis, ITestOutputHelper output)
public BlobGarbageCollectorTests(LocalRedisFixture redis, ITestOutputHelper output)
: base(redis, output) { }
[Fact]
@ -36,7 +37,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
{
var context = new OperationContext(new Context(TestGlobal.Logger));
return RunTest(context,
async (topology, session) =>
async (topology, session, namespaceId, _) =>
{
var putResult = await session.PutRandomAsync(context, HashType.Vso0, provideHash: false, size: 1, CancellationToken.None).ThrowIfFailure();
var openResult = await session.OpenStreamAsync(context, putResult.ContentHash, CancellationToken.None).ThrowIfFailure();
@ -53,10 +54,24 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
var addChlResult = await session.AddOrGetContentHashListAsync(context, fp, chl, CancellationToken.None).ThrowIfFailure();
await session.GetContentHashListAsync(context, fp, CancellationToken.None).ThrowIfFailure();
using var db = await new LifetimeDatabaseCreator(SystemClock.Instance, topology).CreateAsync(context, contentDegreeOfParallelism: 1, fingerprintDegreeOfParallelism: 1);
var temp = Path.Combine(Path.GetTempPath(), "LifetimeDatabase", Guid.NewGuid().ToString());
var dbConfig = new RocksDbLifetimeDatabase.Configuration
{
DatabasePath = temp,
LruEnumerationPercentileStep = 0.05,
LruEnumerationBatchSize = 1000,
BlobNamespaceIds = new[] { namespaceId },
};
using var db = await LifetimeDatabaseCreator.CreateAsync(
context,
dbConfig,
SystemClock.Instance,
contentDegreeOfParallelism: 1,
fingerprintDegreeOfParallelism: 1,
n => topology).ThrowIfFailureAsync();
var manager = new Library.BlobLifetimeManager(db, topology, SystemClock.Instance);
await manager.GarbageCollectAsync(context, maxSize: 0, dryRun: false, contentDegreeOfParallelism: 1, fingerprintDegreeOfParallelism: 1).ThrowIfFailure();
var manager = new BlobQuotaKeeper(db.GetAccessor(namespaceId), topology, lastAccessTimeDeletionThreshold: TimeSpan.Zero, SystemClock.Instance);
await manager.EnsureUnderQuota(context, maxSize: 0, dryRun: false, contentDegreeOfParallelism: 1, fingerprintDegreeOfParallelism: 1).ThrowIfFailure();
(await session.OpenStreamAsync(context, putResult.ContentHash, CancellationToken.None)).Code.Should().Be(OpenStreamResult.ResultCode.ContentNotFound);
(await session.OpenStreamAsync(context, putResult2.ContentHash, CancellationToken.None)).Code.Should().Be(OpenStreamResult.ResultCode.ContentNotFound);
@ -70,7 +85,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
{
var context = new OperationContext(new Context(TestGlobal.Logger));
return RunTest(context,
async (topology, session) =>
async (topology, session, namespaceId, secretsProvider) =>
{
// Because the LRU ordering isn't expected to be perfect (we're only calculating rough quantile estimates),
// we need a deterministic seed for our randomness to avoid test flakiness.
@ -110,7 +125,23 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
fingerprints.Add(sf);
}
using var db = await new LifetimeDatabaseCreator(SystemClock.Instance, topology).CreateAsync(context, contentDegreeOfParallelism: 1, fingerprintDegreeOfParallelism: 1);
var temp = Path.Combine(Path.GetTempPath(), "LifetimeDatabase", Guid.NewGuid().ToString());
var dbConfig = new RocksDbLifetimeDatabase.Configuration
{
DatabasePath = temp,
LruEnumerationPercentileStep = 0.05,
LruEnumerationBatchSize = 1000,
BlobNamespaceIds = new[] { namespaceId },
};
using var db = await LifetimeDatabaseCreator.CreateAsync(
context,
dbConfig,
SystemClock.Instance,
contentDegreeOfParallelism: 1,
fingerprintDegreeOfParallelism: 1,
n => topology).ThrowIfFailureAsync();
var accessor = db.GetAccessor(namespaceId);
// This here is a bit of a hack. Since we don't have any control of the azure emulator's clock, we'll have to trick
// the manager into thinking that the last access times are what we want them to be. Otherwise, the emulator gives us identical
@ -123,13 +154,12 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
for (var i = 0; i < totalFingerprints; i++)
{
clock.Increment(TimeSpan.FromHours(1));
var entry = db.TryGetContentHashList(fingerprints[i], out var blobPath);
db.UpdateContentHashListLastAccessTime(new Library.ContentHashList(BlobName: blobPath!, LastAccessTime: clock.UtcNow, entry!.Hashes, entry.BlobSize))
.ThrowIfFailure();
var entry = accessor.GetContentHashList(fingerprints[i], out var blobPath);
accessor.UpdateContentHashListLastAccessTime(new Library.ContentHashList(BlobName: blobPath!, LastAccessTime: clock.UtcNow, entry!.Hashes, entry.BlobSize));
}
// Since all CHLs have the same number of hashes, they should all have the same size
var metadataSize = db.TryGetContentHashList(fingerprints[0], out _)!.BlobSize;
var metadataSize = accessor.GetContentHashList(fingerprints[0], out _)!.BlobSize;
var fingerprintsToKeep = 2;
var contentToKeep = fingerprintsToKeep * contentPerFingerprint + 1;
@ -137,8 +167,8 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
var fingerprintSizeToKeep = fingerprintsToKeep * metadataSize;
var maxSize = contentSizeToKeep + fingerprintSizeToKeep;
var manager = new Library.BlobLifetimeManager(db, topology, clock);
await manager.GarbageCollectAsync(context, maxSize: maxSize, dryRun: false, contentDegreeOfParallelism: 1, fingerprintDegreeOfParallelism: 1).ThrowIfFailure();
var manager = new Library.BlobQuotaKeeper(accessor, topology, lastAccessTimeDeletionThreshold: TimeSpan.Zero, clock);
await manager.EnsureUnderQuota(context, maxSize: maxSize, dryRun: false, contentDegreeOfParallelism: 1, fingerprintDegreeOfParallelism: 1).ThrowIfFailure();
for (var i = 0; i < (totalContent - contentToKeep); i++)
{

View file

@ -0,0 +1,267 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Threading;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Sessions;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.BlobLifetimeManager.Library;
using BuildXL.Cache.MemoizationStore.Interfaces.Sessions;
using ContentStoreTest.Distributed.Redis;
using ContentStoreTest.Test;
using Xunit.Abstractions;
using System.Threading.Tasks;
using Xunit;
using System.Collections.Generic;
using BuildXL.Cache.ContentStore.Distributed.Blob;
using BuildXL.Cache.ContentStore.Interfaces.Time;
using Azure;
using Azure.Storage.Blobs.ChangeFeed;
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
using System.Linq;
using BuildXL.Cache.MemoizationStore.Stores;
using System.IO;
using Azure.Core;
using System.Diagnostics.CodeAnalysis;
using FluentAssertions;
namespace BuildXL.Cache.BlobLifetimeManager.Test
{
public class BlobLifetimeManagerTests : LifetimeDatabaseTestBase
{
public BlobLifetimeManagerTests(LocalRedisFixture redis, ITestOutputHelper output) : base(redis, output)
{
}
[Fact]
public Task BlobLifetimeManagerTest()
{
var context = new OperationContext(new Context(TestGlobal.Logger));
return RunTest(context,
async (topology, session, namespaceId, secretsProvider) =>
{
var cycles = 10;
var fpsPerCycle = 3;
var contentPerFp = 2;
var contentSize = 10;
const int FpSize = 44; // This is the size of a fingerprint without counting the CHL.
const int HashSizeInFp = 34;
var fpBlobSize = FpSize + contentPerFp * HashSizeInFp;
var totalSizePerFp = fpBlobSize + contentPerFp * contentSize;
var gcLimit = totalSizePerFp * fpsPerCycle * 2;
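// Worked out with the constants above: fpBlobSize = 44 + 2 * 34 = 112 bytes, totalSizePerFp = 112 + 2 * 10 = 132 bytes,
// so gcLimit = 132 * 3 * 2 = 792 bytes, i.e. roughly two cycles' worth of data.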
cycles.Should().BeGreaterThan(2, "We want to actually garbage collect content");
var config = new BlobQuotaKeeperConfig
{
LastAccessTimeDeletionThreshold = TimeSpan.Zero,
Namespaces = new List<GarbageCollectionNamespaceConfig>
{
// Each fingerprint accounts for 132 bytes (1 CHL blob + 2 pieces of content); the last argument converts gcLimit from bytes to GB.
new GarbageCollectionNamespaceConfig(namespaceId.Universe, namespaceId.Namespace, (1.0 / 1024 / 1024 / 1024) * gcLimit)
}
};
var accounts = new List<BlobCacheStorageAccountName>();
await foreach (var client in topology.EnumerateClientsAsync(context, BlobCacheContainerPurpose.Metadata))
{
accounts.Add(BlobCacheStorageAccountName.Parse(client.AccountName));
}
var pages = new List<Page<IBlobChangeFeedEvent>>();
for (var i = 0; i < cycles; i++)
{
foreach (var _ in Enumerable.Range(0, fpsPerCycle))
{
await PutRandomContentHashListAsync(context, session, contentCount: 3, contentSize: 10, topology, pages, SystemClock.Instance);
}
await new TestBlobLifetimeManager(pages).RunAsync(
context,
config,
secretsProvider,
accounts,
SystemClock.Instance,
runId: $"run#{i}",
contentDegreeOfParallelism: 1,
fingerprintDegreeOfParallelism: 1,
dryRun: false);
}
});
}
private static async Task PutRandomContentHashListAsync(
OperationContext context,
ICacheSession session,
int contentCount,
int contentSize,
IBlobCacheTopology topology,
List<Page<IBlobChangeFeedEvent>> pages,
IClock clock)
{
var changes = new List<IBlobChangeFeedEvent>();
var hashes = new ContentHash[contentCount];
for (var i = 0; i < contentCount; i++)
{
var putResult = await session.PutRandomAsync(context, HashType.Vso0, provideHash: false, size: contentSize, CancellationToken.None).ThrowIfFailure();
hashes[i] = putResult.ContentHash;
var blobClient = await topology.GetBlobClientAsync(context, putResult.ContentHash);
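// Build the subject in the same "/blobServices/default/containers/<container>/blobs/<path>" shape that real
// change feed events use, so it can be parsed like a genuine event.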
var fullName = $"/blobServices/default/containers/{blobClient.BlobContainerName}/blobs/{blobClient.Name}";
var change = new MockChange()
{
Subject = fullName,
EventTime = clock.GetUtcNow(),
EventType = BlobChangeFeedEventType.BlobCreated,
ContentLength = contentSize,
};
changes.Add(change);
}
var chl = new ContentHashListWithDeterminism(
new MemoizationStore.Interfaces.Sessions.ContentHashList(hashes), CacheDeterminism.None);
var fp = StrongFingerprint.Random();
var r = await session.AddOrGetContentHashListAsync(context, fp, chl, CancellationToken.None).ThrowIfFailure();
var containerClient = await topology.GetContainerClientAsync(context, BlobCacheShardingKey.FromWeakFingerprint(fp.WeakFingerprint));
var blobName = AzureBlobStorageMetadataStore.GetBlobPath(fp);
var size = (await containerClient.GetBlobClient(blobName).GetPropertiesAsync()).Value.ContentLength;
var fpChange = new MockChange()
{
Subject = $"/blobServices/default/containers/{containerClient.Name}/blobs/{blobName}",
EventTime = clock.GetUtcNow(),
EventType = BlobChangeFeedEventType.BlobCreated,
ContentLength = size,
};
changes.Add(fpChange);
TestDispatcher.AddPage(changes, pages);
}
private class TestBlobLifetimeManager : Library.BlobLifetimeManager
{
public TestDispatcher? TestDispatcher;
private readonly List<Page<IBlobChangeFeedEvent>> _pages;
public TestBlobLifetimeManager(List<Page<IBlobChangeFeedEvent>> pages) => _pages = pages;
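// Swap the real change feed dispatcher for one that replays the in-memory pages built by the test.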
protected override AzureStorageChangeFeedEventDispatcher CreateDispatcher(
IBlobCacheSecretsProvider secretsProvider,
IReadOnlyList<BlobCacheStorageAccountName> accountNames,
string metadataMatrix,
string contentMatrix,
RocksDbLifetimeDatabase db,
LifetimeDatabaseUpdater updater,
IClock clock)
{
return TestDispatcher = new TestDispatcher(secretsProvider, accountNames, updater, db, clock, metadataMatrix, contentMatrix, _pages);
}
}
private class TestDispatcher : AzureStorageChangeFeedEventDispatcher
{
public readonly List<Page<IBlobChangeFeedEvent>> Pages;
public TestDispatcher(
IBlobCacheSecretsProvider secretsProvider,
IReadOnlyList<BlobCacheStorageAccountName> accounts,
LifetimeDatabaseUpdater updater,
RocksDbLifetimeDatabase db,
IClock clock,
string metadataMatrix,
string contentMatrix,
List<Page<IBlobChangeFeedEvent>> pages)
: base(secretsProvider, accounts, updater, db, clock, metadataMatrix, contentMatrix)
=> Pages = pages;
internal override IChangeFeedClient CreateChangeFeedClient(AzureStorageCredentials creds)
{
return new TestFeedClient(Pages);
}
public static void AddPage(List<IBlobChangeFeedEvent> changes, List<Page<IBlobChangeFeedEvent>> pages)
{
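// Each page's continuation token records how many pages existed before it was added.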
var page = Page<IBlobChangeFeedEvent>.FromValues(changes, continuationToken: pages.Count.ToString(), new MockResponse());
pages.Add(page);
}
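// Minimal Azure.Response stub: Page<T>.FromValues requires a Response instance, but the tests never inspect it.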
private class MockResponse : Response
{
public override int Status => 200;
public override string ReasonPhrase => string.Empty;
public override Stream? ContentStream { get => null; set { } }
public override string ClientRequestId { get => string.Empty; set { } }
public override void Dispose()
{
}
protected override bool ContainsHeader(string name) => false;
protected override IEnumerable<HttpHeader> EnumerateHeaders() => Enumerable.Empty<HttpHeader>();
protected override bool TryGetHeader(string name, [NotNullWhen(true)] out string? value)
{
value = null;
return false;
}
protected override bool TryGetHeaderValues(string name, [NotNullWhen(true)] out IEnumerable<string>? values)
{
values = null;
return false;
}
}
}
private class TestFeedClient : IChangeFeedClient
{
private readonly List<Page<IBlobChangeFeedEvent>> _pages;
public TestFeedClient(List<Page<IBlobChangeFeedEvent>> pages) => _pages = pages;
public async IAsyncEnumerable<Page<IBlobChangeFeedEvent>> GetChangesAsync(string? continuationToken)
{
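// Interpret the continuation token as the number of pages to skip from the start of the in-memory list.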
if (!int.TryParse(continuationToken, out var skip))
{
skip = 0;
}
foreach (var page in _pages.Skip(skip))
{
await Task.Yield();
yield return page;
}
}
public async IAsyncEnumerable<Page<IBlobChangeFeedEvent>> GetChangesAsync(DateTime? startTimeUtc)
{
// This is intended to skip events that happened before DB creation. In the case of these tests, there are none.
foreach (var page in _pages)
{
await Task.Yield();
yield return page;
}
}
}
private class MockChange : IBlobChangeFeedEvent
{
public DateTimeOffset EventTime { get; init; }
public BlobChangeFeedEventType EventType { get; init; }
public required string Subject { get; init; }
public long ContentLength { get; init; }
}
}
}

Просмотреть файл

@@ -3,7 +3,7 @@
namespace BlobLifetimeManagerTest {
@@public
export const dll = !BuildXLSdk.Flags.isMicrosoftInternal ? undefined : BuildXLSdk.cacheTest({
export const dll = BuildXLSdk.cacheTest({
assemblyName: "BuildXL.Cache.BlobLifetimeManager.Test",
sources: globR(d`.`, "*.cs"),
skipTestRun: BuildXLSdk.restrictTestRunToSomeQualifiers,

Просмотреть файл

@@ -19,6 +19,7 @@ using ContentStoreTest.Test;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using System.IO;
namespace BuildXL.Cache.BlobLifetimeManager.Test
{
@@ -34,7 +35,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
{
var context = new OperationContext(new Context(TestGlobal.Logger));
return RunTest(context,
async (topology, session) =>
async (topology, session, namespaceId, secretsProvider) =>
{
// Add content/fingerprints to the L3
@@ -63,26 +64,42 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
await session.AddOrGetContentHashListAsync(context, sf2, contentHashList2, CancellationToken.None).ThrowIfFailure();
// Create the DB
var creator = new LifetimeDatabaseCreator(SystemClock.Instance, topology);
using var db = await creator.CreateAsync(context, contentDegreeOfParallelism: 1, fingerprintDegreeOfParallelism: 1);
var temp = Path.Combine(Path.GetTempPath(), "LifetimeDatabase", Guid.NewGuid().ToString());
var dbConfig = new RocksDbLifetimeDatabase.Configuration
{
DatabasePath = temp,
LruEnumerationPercentileStep = 0.05,
LruEnumerationBatchSize = 1000,
BlobNamespaceIds = new[] { namespaceId },
};
var contentEntry = db.GetContentEntry(contentResults[0]);
using var db = await LifetimeDatabaseCreator.CreateAsync(
context,
dbConfig,
SystemClock.Instance,
contentDegreeOfParallelism: 1,
fingerprintDegreeOfParallelism: 1,
n => topology).ThrowIfFailureAsync();
var accessor = db.GetAccessor(namespaceId);
var contentEntry = accessor.GetContentEntry(contentResults[0]);
contentEntry!.BlobSize.Should().Be(16);
contentEntry!.ReferenceCount.Should().Be(1);
contentEntry = db.GetContentEntry(contentResults[1]);
contentEntry = accessor.GetContentEntry(contentResults[1]);
contentEntry!.BlobSize.Should().Be(16);
contentEntry!.ReferenceCount.Should().Be(2);
contentEntry = db.GetContentEntry(contentResults[2]);
contentEntry = accessor.GetContentEntry(contentResults[2]);
contentEntry!.BlobSize.Should().Be(16);
contentEntry!.ReferenceCount.Should().Be(1);
var fpEntry = db.TryGetContentHashList(sf1, out _);
var fpEntry = accessor.GetContentHashList(sf1, out _);
fpEntry!.Hashes.Should().ContainInOrder(contentHashList1.ContentHashList!.Hashes);
fpEntry.LastAccessTime.Should().BeAfter(DateTime.UtcNow.AddMinutes(-1));
fpEntry = db.TryGetContentHashList(sf2, out _);
fpEntry = accessor.GetContentHashList(sf2, out _);
fpEntry!.Hashes.Should().BeEquivalentTo(contentHashList2.ContentHashList!.Hashes);
fpEntry.LastAccessTime.Should().BeAfter(DateTime.UtcNow.AddMinutes(-1));
});

Просмотреть файл

@@ -5,6 +5,7 @@ using System;
using System.Diagnostics.ContractsLight;
using System.Linq;
using System.Threading.Tasks;
using BuildXL.Cache.BlobLifetimeManager.Library;
using BuildXL.Cache.ContentStore.Distributed.Blob;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
@@ -40,7 +41,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
public LifetimeDatabaseTestBase(LocalRedisFixture redis, ITestOutputHelper output)
: base(output) => _fixture = redis;
protected async Task RunTest(OperationContext context, Func<IBlobCacheTopology, ICacheSession, Task> run)
protected async Task RunTest(OperationContext context, Func<IBlobCacheTopology, ICacheSession, BlobNamespaceId, IBlobCacheSecretsProvider, Task> run)
{
var shards = Enumerable.Range(0, 10).Select(shard => (BlobCacheStorageAccountName)new BlobCacheStorageShardingAccountName("0123456789", shard, "testing")).ToList();
@@ -58,12 +59,15 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
return (Account: account, Credentials: credentials);
}).ToDictionary(kvp => kvp.Account, kvp => kvp.Credentials);
var secretsProvider = new StaticBlobCacheSecretsProvider(credentials);
var namespaceId = new BlobNamespaceId(_runId, "default");
var topology = new ShardedBlobCacheTopology(
new ShardedBlobCacheTopology.Configuration(
new ShardingScheme(ShardingAlgorithm.JumpHash, credentials.Keys.ToList()),
SecretsProvider: new StaticBlobCacheSecretsProvider(credentials),
Universe: _runId,
Namespace: "default"));
SecretsProvider: secretsProvider,
namespaceId.Universe,
namespaceId.Namespace));
var blobMetadataStore = new AzureBlobStorageMetadataStore(new BlobMetadataStoreConfiguration
{
@@ -109,7 +113,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
await session!.StartupAsync(context).ThrowIfFailure();
await run(topology, session);
await run(topology, session, namespaceId, secretsProvider);
}
}
}

Просмотреть файл

@@ -15,19 +15,20 @@ using BuildXL.Cache.ContentStore.InterfacesTest.Time;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.ContentStore.UtilitiesCore;
using BuildXL.Cache.BlobLifetimeManager.Library;
using BuildXL.Engine.Cache.KeyValueStores;
using ContentStoreTest.Test;
using FluentAssertions;
using RocksDbSharp;
using Xunit;
using Xunit.Abstractions;
using static BuildXL.Cache.BlobLifetimeManager.Library.RocksDbLifetimeDatabase;
using BuildXL.Cache.ContentStore.Distributed.Blob;
namespace BuildXL.Cache.BlobLifetimeManager.Test
{
public class RocksDbLifetimeDatabaseTests : TestWithOutput
{
private readonly MemoryClock _clock = new();
private static readonly BlobNamespaceId NamespaceId = new BlobNamespaceId("universe", "namespace");
public RocksDbLifetimeDatabaseTests(ITestOutputHelper output) : base(output)
{
@@ -42,6 +43,7 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
var config = new Configuration
{
DatabasePath = temp.Path.Path,
BlobNamespaceIds = new[] { NamespaceId }
};
setupConfig?.Invoke(config);
@@ -62,6 +64,8 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
// perfectly ordered.
using var db = CreateDatabase(config => config.LruEnumerationPercentileStep = 1.0 / chlCount);
var accessor = db.GetAccessor(NamespaceId);
// Make the distribution uniform to mitigate non-determinism as much as possible.
var lastAccessTimes = Enumerable.Range(1, chlCount).Select(hoursAgo => _clock.UtcNow.AddHours(-hoursAgo)).ToList();
ThreadSafeRandom.Shuffle(lastAccessTimes);
@@ -72,11 +76,11 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
// Add content to the DB to verify total size is accurate.
foreach (var chlWithContentLength in chlsWithContentLengths)
{
db.AddContentHashList(chlWithContentLength.chl, chlWithContentLength.hashesWithLengths).ThrowIfFailure();
accessor.AddContentHashList(chlWithContentLength.chl, chlWithContentLength.hashesWithLengths);
contentSize += chlWithContentLength.hashesWithLengths.Sum(h => h.length);
}
var result = db.GetLruOrderedContentHashLists(context).ThrowIfFailure();
var result = accessor.GetLruOrderedContentHashLists(context);
result.TotalSize.Should().Be(chlsWithContentLengths.Sum(chl => chl.chl.BlobSize) + contentSize);
var currentBatch = 0;
var currentLowerLimitNonInclusive = DateTime.MinValue;
@@ -106,14 +110,14 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
// Now check that deletion works
foreach (var chlWithContentLengths in chlsWithContentLengths)
{
db.DeleteContentHashList(chlWithContentLengths.chl.BlobName, chlWithContentLengths.chl.Hashes).ThrowIfFailure();
accessor.DeleteContentHashList(chlWithContentLengths.chl.BlobName, chlWithContentLengths.chl.Hashes);
foreach (var hash in chlWithContentLengths.hashesWithLengths)
{
db.DeleteContent(hash.hash).ThrowIfFailure();
accessor.DeleteContent(hash.hash);
}
}
result = db.GetLruOrderedContentHashLists(context).ThrowIfFailure();
result = accessor.GetLruOrderedContentHashLists(context);
result.TotalSize.Should().Be(0);
result.LruOrderedContentHashLists.Count().Should().Be(0);
}
@@ -152,14 +156,16 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
config.LruEnumerationPercentileStep = 1.0;
});
var accessor = db.GetAccessor(NamespaceId);
var chlsWithContentLengths = Enumerable.Range(0, chlCount).Select(_ => RandomContentHashList()).ToArray();
foreach (var chlWithContentLengths in chlsWithContentLengths)
{
db.AddContentHashList(chlWithContentLengths.chl, chlWithContentLengths.hashesWithLengths).ThrowIfFailure();
accessor.AddContentHashList(chlWithContentLengths.chl, chlWithContentLengths.hashesWithLengths);
}
var result = db.GetLruOrderedContentHashLists(context).ThrowIfFailure();
var result = accessor.GetLruOrderedContentHashLists(context);
var current = 0;
foreach (var chl in result.LruOrderedContentHashLists)
{
@@ -172,48 +178,68 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
public void IncrementAndDecrementRefCount()
{
using var db = CreateDatabase();
var accessor = db.GetAccessor(NamespaceId);
var hash = ContentHash.Random();
var chlWithContentLengths1 = RandomContentHashList((hash, 1L));
db.AddContentHashList(chlWithContentLengths1.chl, chlWithContentLengths1.hashesWithLengths).ThrowIfFailure();
db.GetContentEntry(hash)!.ReferenceCount.Should().Be(1);
db.GetContentEntry(hash)!.BlobSize.Should().Be(1);
accessor.AddContentHashList(chlWithContentLengths1.chl, chlWithContentLengths1.hashesWithLengths);
accessor.GetContentEntry(hash)!.ReferenceCount.Should().Be(1);
accessor.GetContentEntry(hash)!.BlobSize.Should().Be(1);
var chlWithContentLengths2 = RandomContentHashList((hash, 0L));
db.AddContentHashList(chlWithContentLengths2.chl, chlWithContentLengths2.hashesWithLengths).ThrowIfFailure();
db.GetContentEntry(hash)!.ReferenceCount.Should().Be(2);
db.GetContentEntry(hash)!.BlobSize.Should().Be(1);
accessor.AddContentHashList(chlWithContentLengths2.chl, chlWithContentLengths2.hashesWithLengths);
accessor.GetContentEntry(hash)!.ReferenceCount.Should().Be(2);
accessor.GetContentEntry(hash)!.BlobSize.Should().Be(1);
db.DeleteContentHashList(chlWithContentLengths1.chl.BlobName, chlWithContentLengths1.chl.Hashes).ThrowIfFailure();
db.GetContentEntry(hash)!.ReferenceCount.Should().Be(1);
db.GetContentEntry(hash)!.BlobSize.Should().Be(1);
accessor.DeleteContentHashList(chlWithContentLengths1.chl.BlobName, chlWithContentLengths1.chl.Hashes);
accessor.GetContentEntry(hash)!.ReferenceCount.Should().Be(1);
accessor.GetContentEntry(hash)!.BlobSize.Should().Be(1);
db.DeleteContentHashList(chlWithContentLengths2.chl.BlobName, chlWithContentLengths2.chl.Hashes).ThrowIfFailure();
db.GetContentEntry(hash)!.ReferenceCount.Should().Be(0);
db.GetContentEntry(hash)!.BlobSize.Should().Be(1);
accessor.DeleteContentHashList(chlWithContentLengths2.chl.BlobName, chlWithContentLengths2.chl.Hashes);
accessor.GetContentEntry(hash)!.ReferenceCount.Should().Be(0);
accessor.GetContentEntry(hash)!.BlobSize.Should().Be(1);
}
[Fact]
public void DeleteContent()
{
using var db = CreateDatabase();
var accessor = db.GetAccessor(NamespaceId);
var chlWithContentLengths = RandomContentHashList();
db.AddContentHashList(chlWithContentLengths.chl, chlWithContentLengths.hashesWithLengths).ThrowIfFailure();
accessor.AddContentHashList(chlWithContentLengths.chl, chlWithContentLengths.hashesWithLengths);
var fpSize = chlWithContentLengths.chl.BlobSize + chlWithContentLengths.hashesWithLengths.Sum(h => h.length);
var hash = ContentHash.Random();
db.AddContent(hash, 1).ThrowIfFailure();
accessor.AddContent(hash, 1);
var context = new OperationContext(new Context(TestGlobal.Logger));
var result = db.GetLruOrderedContentHashLists(context).ThrowIfFailure();
var result = accessor.GetLruOrderedContentHashLists(context);
result.TotalSize.Should().Be(fpSize + 1);
db.DeleteContent(hash).ThrowIfFailure();
result = db.GetLruOrderedContentHashLists(context).ThrowIfFailure();
accessor.DeleteContent(hash);
result = accessor.GetLruOrderedContentHashLists(context);
result.TotalSize.Should().Be(fpSize);
}
[Fact]
public void CursorSetAndGet()
{
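// Cursors are keyed by account name and the latest value wins; presumably they record how far into each
// account's change feed a previous garbage collection run got.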
using var db = CreateDatabase();
db.SetCursor("account1", "c1");
db.SetCursor("account2", "c2");
db.GetCursor("account1").Should().Be("c1");
db.GetCursor("account2").Should().Be("c2");
db.SetCursor("account1", "c3");
db.SetCursor("account2", "c4");
db.GetCursor("account1").Should().Be("c3");
db.GetCursor("account2").Should().Be("c4");
}
}
public class BatchCountingDatabase : RocksDbLifetimeDatabase
@@ -221,10 +247,8 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
private BatchCountingDatabase(
Configuration configuration,
IClock clock,
KeyValueStoreAccessor keyValueStore,
ColumnFamilyHandle content,
ColumnFamilyHandle fingerprint)
: base(configuration, clock, keyValueStore, content, fingerprint)
RocksDb db)
: base(configuration, clock, db)
{
}
@@ -232,22 +256,20 @@ namespace BuildXL.Cache.BlobLifetimeManager.Test
Configuration configuration,
IClock clock)
{
var possibleStore = CreateAccessor(configuration);
var (contentCf, fingerprintCf) = GetColumnFamilies(possibleStore.Result);
return new BatchCountingDatabase(configuration, clock, possibleStore.Result, contentCf, fingerprintCf);
var db = CreateDb(configuration);
return new BatchCountingDatabase(configuration, clock, db);
}
public int BatchCount { get; set; }
public DateTime CurrentUpperLimitInclusive { get; set; }
public DateTime CurrentLowerLimitNonInclusive { get; set; }
protected override (IReadOnlyList<ContentHashList> batch, bool reachedEnd, byte[]? next) GetEnumerateContentHashListsBatch(OperationContext context, DateTime lowerLimitNonInclusive, DateTime upperLimitInclusive, int limit, byte[]? startKey)
protected override (IReadOnlyList<ContentHashList> batch, bool reachedEnd, byte[]? next) GetEnumerateContentHashListsBatch(OperationContext context, DateTime lowerLimitNonInclusive, DateTime upperLimitInclusive, int limit, byte[]? startKey, ColumnFamilyHandle fingerprintsCf)
{
BatchCount++;
CurrentLowerLimitNonInclusive = lowerLimitNonInclusive;
CurrentUpperLimitInclusive = upperLimitInclusive;
return base.GetEnumerateContentHashListsBatch(context, lowerLimitNonInclusive, upperLimitInclusive, limit, startKey);
return base.GetEnumerateContentHashListsBatch(context, lowerLimitNonInclusive, upperLimitInclusive, limit, startKey, fingerprintsCf);
}
}
}

Просмотреть файл

@@ -43,7 +43,7 @@ namespace BuildXL.Cache.MemoizationStore.Distributed.Test
{
PublishAsynchronously = publishAsynchronously,
Configuration = new Stores.AzureBlobStorageCacheFactory.Configuration(
ShardingScheme: new ShardingScheme(ShardingAlgorithm.SingleShard, new() { BlobCacheStorageAccountName.Parse("devstoreaccount1") }),
ShardingScheme: new ShardingScheme(ShardingAlgorithm.SingleShard, new List<BlobCacheStorageAccountName>() { BlobCacheStorageAccountName.Parse("devstoreaccount1") }),
Universe: "default",
Namespace: "default",
RetentionPolicyInDays: 0)

Просмотреть файл

@@ -1,4 +1,4 @@
$rgName = "blobl3-juguzman-centralus"
$rgName = "blobl3-cbtest-centralus"
$subscription = "bf933bbb-8131-491c-81d9-26d7b6f327fa"
az account set --subscription $subscription

Просмотреть файл

@@ -20,6 +20,12 @@ param shards int
@maxLength(9)
param purpose string
@allowed([
'service'
'engine'
])
param gcStrategy string
// The following creates one storage account per shard that's going to be used. This is where fingerprints and content
// are stored, and what both the datacenter and dev cache will access to obtain cache hits.
module shard 'shard.bicep' = [for shard in range(0, shards): {
@@ -30,5 +36,6 @@ module shard 'shard.bicep' = [for shard in range(0, shards): {
kind: kind
shard: shard
purpose: purpose
gcStrategy: gcStrategy
}
}]

Просмотреть файл

@@ -27,6 +27,12 @@ param shards int
@description('String that indicates who or what this instance is for. MUST be all lowercase letters and numbers')
param purpose string
@allowed([
'service'
'engine'
])
param gcStrategy string = 'service'
targetScope = 'subscription'
resource resourceGroup 'Microsoft.Resources/resourceGroups@2021-04-01' = [for locidx in range(0, length(locations)): {
@@ -43,5 +49,6 @@ module BlobL3Module 'location.bicep' = [for locidx in range(0, length(locations)
kind: kind
shards: shards
purpose: purpose
gcStrategy: gcStrategy
}
}]

Просмотреть файл

@@ -4,16 +4,18 @@
"parameters": {
"locations": {
"value": [
"eastus2",
"northcentralus",
"southcentralus",
"centralus"
]
},
"shards": {
"value": 10
"value": 100
},
"purpose": {
"value": "jubayard"
"value": "juguzman"
},
"gcStrategy": {
"value": "service"
}
}
}

Просмотреть файл

@@ -20,6 +20,12 @@ param shard int
@maxLength(9)
param purpose string
@allowed([
'service'
'engine'
])
param gcStrategy string
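// 'engine' keeps the storage account's built-in lifecycle management policy (declared below) to expire blobs;
// 'service' omits that policy and leaves cleanup to the garbage collection service instead.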
// See: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-feature-support-in-storage-accounts
// SKU:
// 'Premium_LRS'
@@ -80,7 +86,7 @@ resource storageAccount 'Microsoft.Storage/storageAccounts@2021-09-01' = {
}
}
resource lifecycleManagement 'managementPolicies@2021-09-01' = {
resource lifecycleManagement 'managementPolicies@2021-09-01' = if (gcStrategy == 'engine') {
name: 'default'
dependsOn: [
blobService

Просмотреть файл

@@ -154,6 +154,15 @@
}
}
},
{
"Component": {
"Type": "NuGet",
"NuGet": {
"Name": "Azure.Storage.Blobs.ChangeFeed",
"Version": "12.0.0-preview.34"
}
}
},
{
"Component": {
"Type": "NuGet",

Просмотреть файл

@@ -233,6 +233,8 @@ config({
{ id: "System.IO.Hashing", version: "6.0.0",
dependentPackageIdsToSkip: [ "System.Buffers", "System.Memory" ] },
{ id: "Azure.Storage.Blobs.Batch", version: "12.10.0" },
{ id: "Azure.Storage.Blobs.ChangeFeed", version: "12.0.0-preview.34",
dependentPackageIdsToSkip: [ "System.Text.Json" ] },
// xUnit
{ id: "xunit.abstractions", version: "2.0.3" },