зеркало из https://github.com/microsoft/BuildXL.git
Merged PR 542833: Remove in-memory cache for location database
Remove in-memory cache for location database Related work items: #1701136
This commit is contained in:
Родитель
7b2650c5ba
Коммит
e49b9ab000
|
@ -9,7 +9,6 @@ using System.Threading;
|
|||
using System.Threading.Tasks;
|
||||
using BuildXL.Cache.ContentStore.Distributed.NuCache.InMemory;
|
||||
using BuildXL.Cache.ContentStore.Distributed.Utilities;
|
||||
using BuildXL.Cache.ContentStore.Extensions;
|
||||
using BuildXL.Cache.ContentStore.Hashing;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Results;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Time;
|
||||
|
@ -26,6 +25,8 @@ using BuildXL.Utilities.Tracing;
|
|||
using static BuildXL.Cache.ContentStore.Distributed.Tracing.TracingStructuredExtensions;
|
||||
using AbsolutePath = BuildXL.Cache.ContentStore.Interfaces.FileSystem.AbsolutePath;
|
||||
|
||||
#nullable enable
|
||||
|
||||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
||||
{
|
||||
/// <summary>
|
||||
|
@ -58,8 +59,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
|
||||
private readonly Func<IReadOnlyList<MachineId>> _getInactiveMachines;
|
||||
|
||||
private Timer _gcTimer;
|
||||
private NagleQueue<(Context, ShortHash hash, EntryOperation op, OperationReason reason)> _nagleOperationTracer;
|
||||
private Timer? _gcTimer;
|
||||
private NagleQueue<(Context, ShortHash hash, EntryOperation op, OperationReason reason)>? _nagleOperationTracer;
|
||||
private readonly ContentLocationDatabaseConfiguration _configuration;
|
||||
|
||||
/// <nodoc />
|
||||
|
@ -75,56 +76,18 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
/// </summary>
|
||||
private readonly object[] _locks = Enumerable.Range(0, ushort.MaxValue + 1).Select(s => new object()).ToArray();
|
||||
|
||||
/// <summary>
|
||||
/// Whether the cache is currently being used. Can only possibly be true in master. Only meant for testing
|
||||
/// purposes.
|
||||
/// </summary>
|
||||
internal bool IsInMemoryCacheEnabled { get; private set; } = false;
|
||||
|
||||
private readonly FlushableCache _inMemoryCache;
|
||||
|
||||
/// <summary>
|
||||
/// This counter is not exact, but provides an approximate count. It may be thwarted by flushes and cache
|
||||
/// activate/deactivate events. Its only purpose is to roughly help ensure flushes are more frequent as
|
||||
/// more operations are performed.
|
||||
/// </summary>
|
||||
private long _cacheUpdatesSinceLastFlush = 0;
|
||||
|
||||
/// <summary>
|
||||
/// External users should be tests only
|
||||
/// </summary>
|
||||
internal long CacheUpdatesSinceLastFlush => _cacheUpdatesSinceLastFlush;
|
||||
|
||||
/// <summary>
|
||||
/// Controls cache flushing due to timeout.
|
||||
/// </summary>
|
||||
private Timer _inMemoryCacheFlushTimer;
|
||||
|
||||
/// <nodoc />
|
||||
protected readonly object TimerChangeLock = new object();
|
||||
|
||||
private readonly object _cacheFlushTimerLock = new object();
|
||||
|
||||
|
||||
private readonly object _flushTaskLock = new object();
|
||||
|
||||
/// <summary>
|
||||
/// Currently ongoing flush
|
||||
///
|
||||
/// External users should be tests only
|
||||
/// </summary>
|
||||
internal Task FlushTask { get; private set; } = BoolResult.SuccessTask;
|
||||
|
||||
/// <summary>
|
||||
/// Event callback that's triggered when the database is permanently invalidated.
|
||||
/// </summary>
|
||||
public Action<OperationContext, Failure<Exception>> DatabaseInvalidated;
|
||||
public Action<OperationContext, Failure<Exception>>? DatabaseInvalidated;
|
||||
|
||||
/// <nodoc />
|
||||
protected void OnDatabaseInvalidated(OperationContext context, Failure<Exception> failure)
|
||||
{
|
||||
Contract.Requires(failure != null);
|
||||
|
||||
// Notice that no update to the internal state is required when invalidation happens. By definition,
|
||||
// nothing can be done to this instance after invalidation: all incoming and ongoing operations should fail
|
||||
// (because it is triggered by RocksDb). The only way to resume operation is to reload from a checkpoint,
|
||||
|
@ -143,8 +106,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
_configuration = configuration;
|
||||
_getInactiveMachines = getInactiveMachines;
|
||||
|
||||
_inMemoryCache = new FlushableCache(configuration, this);
|
||||
|
||||
_isMetadataGarbageCollectionEnabled = configuration.MetadataGarbageCollectionEnabled;
|
||||
}
|
||||
|
||||
|
@ -186,7 +147,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
// The parameter indicates whether we will be in writeable state or not after this function runs. The
|
||||
// following calls can see if we transition from read/only to read/write by looking at the internal value
|
||||
ConfigureGarbageCollection(isDatabaseWriteable);
|
||||
ConfigureInMemoryDatabaseCache(isDatabaseWriteable);
|
||||
|
||||
IsDatabaseWriteable = isDatabaseWriteable;
|
||||
}
|
||||
|
@ -202,38 +162,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
_isMetadataGarbageCollectionEnabled = isDatabaseWriteable && _configuration.MetadataGarbageCollectionEnabled;
|
||||
|
||||
var nextGcTimeSpan = IsGarbageCollectionEnabled ? _configuration.GarbageCollectionInterval : Timeout.InfiniteTimeSpan;
|
||||
lock (TimerChangeLock)
|
||||
{
|
||||
_gcTimer?.Change(nextGcTimeSpan, Timeout.InfiniteTimeSpan);
|
||||
}
|
||||
}
|
||||
|
||||
private void ConfigureInMemoryDatabaseCache(bool isDatabaseWritable)
|
||||
{
|
||||
if (_configuration.ContentCacheEnabled)
|
||||
{
|
||||
// This clear is actually safe, as no operations should happen concurrently with this function.
|
||||
_inMemoryCache.UnsafeClear();
|
||||
|
||||
Interlocked.Exchange(ref _cacheUpdatesSinceLastFlush, 0);
|
||||
|
||||
lock (_cacheFlushTimerLock)
|
||||
{
|
||||
IsInMemoryCacheEnabled = isDatabaseWritable;
|
||||
}
|
||||
|
||||
ResetFlushTimer();
|
||||
}
|
||||
}
|
||||
|
||||
private void ResetFlushTimer()
|
||||
{
|
||||
lock (_cacheFlushTimerLock)
|
||||
{
|
||||
var cacheFlushTimeSpan = IsInMemoryCacheEnabled
|
||||
? _configuration.CacheFlushingMaximumInterval
|
||||
: Timeout.InfiniteTimeSpan;
|
||||
|
||||
_inMemoryCacheFlushTimer?.Change(cacheFlushTimeSpan, Timeout.InfiniteTimeSpan);
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
|
@ -248,20 +181,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
Timeout.InfiniteTimeSpan);
|
||||
}
|
||||
|
||||
if (_configuration.ContentCacheEnabled && _configuration.CacheFlushingMaximumInterval != Timeout.InfiniteTimeSpan)
|
||||
{
|
||||
_inMemoryCacheFlushTimer = new Timer(
|
||||
_ =>
|
||||
{
|
||||
ForceCacheFlush(context.CreateNested(componentName: nameof(ContentLocationDatabase), caller: nameof(ForceCacheFlush)),
|
||||
counter: ContentLocationDatabaseCounters.NumberOfCacheFlushesTriggeredByTimer,
|
||||
blocking: false);
|
||||
},
|
||||
null,
|
||||
Timeout.InfiniteTimeSpan,
|
||||
Timeout.InfiniteTimeSpan);
|
||||
}
|
||||
|
||||
_nagleOperationTracer = NagleQueue<(Context context, ShortHash hash, EntryOperation op, OperationReason reason)>.Create(
|
||||
ops =>
|
||||
{
|
||||
|
@ -293,7 +212,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
protected override async Task<BoolResult> ShutdownCoreAsync(OperationContext context)
|
||||
protected override Task<BoolResult> ShutdownCoreAsync(OperationContext context)
|
||||
{
|
||||
_nagleOperationTracer?.Dispose();
|
||||
|
||||
|
@ -306,22 +225,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
_gcTimer = null;
|
||||
}
|
||||
|
||||
lock (_cacheFlushTimerLock)
|
||||
{
|
||||
#pragma warning disable AsyncFixer02
|
||||
_inMemoryCacheFlushTimer?.Dispose();
|
||||
#pragma warning restore AsyncFixer02
|
||||
|
||||
_inMemoryCacheFlushTimer = null;
|
||||
}
|
||||
|
||||
// NOTE(jubayard): there could be a flush in progress as this is done. Either way, any writes performed
|
||||
// after the last checkpoint will be completely lost. Since no checkpoints will be created after this runs,
|
||||
// it doesn't make any sense to flush here. However, we can't close the DB or anything like that until this
|
||||
// flush is over.
|
||||
await FlushTask;
|
||||
|
||||
return await base.ShutdownCoreAsync(context);
|
||||
return base.ShutdownCoreAsync(context);
|
||||
}
|
||||
|
||||
/// <nodoc />
|
||||
|
@ -350,16 +254,20 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
public class EnumerationFilter
|
||||
{
|
||||
/// <nodoc />
|
||||
public Func<byte[], bool> ShouldEnumerate { get; set; }
|
||||
public Func<byte[], bool> ShouldEnumerate { get; }
|
||||
|
||||
/// <nodoc />
|
||||
public ShortHash? StartingPoint { get; set; }
|
||||
public ShortHash? StartingPoint { get; }
|
||||
|
||||
/// <nodoc />
|
||||
public EnumerationFilter(Func<byte[], bool> shouldEnumerate, ShortHash? startingPoint) =>
|
||||
(ShouldEnumerate, StartingPoint) = (shouldEnumerate, startingPoint);
|
||||
}
|
||||
|
||||
/// <nodoc />
|
||||
protected abstract IEnumerable<(ShortHash key, ContentLocationEntry entry)> EnumerateEntriesWithSortedKeysFromStorage(
|
||||
OperationContext context,
|
||||
EnumerationFilter filter = null,
|
||||
EnumerationFilter? filter = null,
|
||||
bool returnKeysOnly = false);
|
||||
|
||||
/// <summary>
|
||||
|
@ -367,14 +275,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
/// </summary>
|
||||
public IEnumerable<(ShortHash key, ContentLocationEntry entry)> EnumerateEntriesWithSortedKeys(
|
||||
OperationContext context,
|
||||
EnumerationFilter filter = null)
|
||||
EnumerationFilter? filter = null)
|
||||
{
|
||||
// Flush only when the database is writable (and the cache is enabled).
|
||||
if (IsDatabaseWriteable && IsInMemoryCacheEnabled)
|
||||
{
|
||||
ForceCacheFlush(context, ContentLocationDatabaseCounters.NumberOfCacheFlushesTriggeredByContentEnumeration, blocking: true);
|
||||
}
|
||||
|
||||
return EnumerateEntriesWithSortedKeysFromStorage(context, filter);
|
||||
}
|
||||
|
||||
|
@ -512,7 +414,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
// If there are some bad locations, remove them.
|
||||
Counters[ContentLocationDatabaseCounters.TotalNumberOfCleanedEntries].Increment();
|
||||
Store(context, hash, filteredEntry);
|
||||
_nagleOperationTracer.Enqueue((context, hash, EntryOperation.RemoveMachine, OperationReason.GarbageCollect));
|
||||
_nagleOperationTracer?.Enqueue((context, hash, EntryOperation.RemoveMachine, OperationReason.GarbageCollect));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -632,13 +534,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
{
|
||||
using (Counters[ContentLocationDatabaseCounters.SaveCheckpoint].Start())
|
||||
{
|
||||
if (IsInMemoryCacheEnabled)
|
||||
{
|
||||
ForceCacheFlush(context,
|
||||
counter: ContentLocationDatabaseCounters.NumberOfCacheFlushesTriggeredByCheckpoint,
|
||||
blocking: true);
|
||||
}
|
||||
|
||||
return context.PerformOperation(Tracer,
|
||||
() => SaveCheckpointCore(context, checkpointDirectory),
|
||||
extraStartMessage: $"CheckpointDirectory=[{checkpointDirectory}]",
|
||||
|
@ -672,16 +567,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
{
|
||||
Counters[ContentLocationDatabaseCounters.NumberOfGetOperations].Increment();
|
||||
|
||||
if (IsInMemoryCacheEnabled && _inMemoryCache.TryGetEntry(hash, out entry))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
return TryGetEntryCoreFromStorage(context, hash, out entry);
|
||||
}
|
||||
|
||||
/// <nodoc />
|
||||
internal abstract void Persist(OperationContext context, ShortHash hash, ContentLocationEntry entry);
|
||||
internal abstract void Persist(OperationContext context, ShortHash hash, ContentLocationEntry? entry);
|
||||
|
||||
/// <nodoc />
|
||||
internal virtual void PersistBatch(OperationContext context, IEnumerable<KeyValuePair<ShortHash, ContentLocationEntry>> pairs)
|
||||
|
@ -693,30 +583,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
}
|
||||
|
||||
/// <nodoc />
|
||||
public void Store(OperationContext context, ShortHash hash, ContentLocationEntry entry)
|
||||
public void Store(OperationContext context, ShortHash hash, ContentLocationEntry? entry)
|
||||
{
|
||||
Counters[ContentLocationDatabaseCounters.NumberOfStoreOperations].Increment();
|
||||
|
||||
if (IsInMemoryCacheEnabled)
|
||||
{
|
||||
_inMemoryCache.Store(context, hash, entry);
|
||||
|
||||
var updates = Interlocked.Increment(ref _cacheUpdatesSinceLastFlush);
|
||||
if (_configuration.CacheMaximumUpdatesPerFlush > 0 && updates >= _configuration.CacheMaximumUpdatesPerFlush && FlushTask.IsCompleted)
|
||||
{
|
||||
// We trigger a flush following the indicated number of operations. However, high load can cause
|
||||
// flushes to run for too long, hence, we trigger the logic every time after we go over the
|
||||
// threshold, just to ensure it gets run when it's needed.
|
||||
ForceCacheFlush(context,
|
||||
counter: ContentLocationDatabaseCounters.NumberOfCacheFlushesTriggeredByUpdates,
|
||||
blocking: false);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Persist(context, hash, entry);
|
||||
Counters[ContentLocationDatabaseCounters.NumberOfPersistedEntries].Increment();
|
||||
}
|
||||
}
|
||||
|
||||
/// <nodoc />
|
||||
|
@ -725,96 +596,15 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
Store(context, hash, entry: null);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Forces a cache flush.
|
||||
/// </summary>
|
||||
/// <returns>
|
||||
/// The return value is only relevant for tests, and when the in-memory cache is enabled.
|
||||
///
|
||||
/// It is true if the current thread either performed or waited for a flush to finish.
|
||||
/// </returns>
|
||||
internal bool ForceCacheFlush(OperationContext context, ContentLocationDatabaseCounters? counter = null, bool blocking = true)
|
||||
{
|
||||
if (!IsInMemoryCacheEnabled)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool renewed = false;
|
||||
if (FlushTask.IsCompleted)
|
||||
{
|
||||
lock (_flushTaskLock)
|
||||
{
|
||||
if (FlushTask.IsCompleted)
|
||||
{
|
||||
FlushTask = forceCacheFlushAsync(context, counter);
|
||||
renewed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (blocking)
|
||||
{
|
||||
FlushTask.GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
return renewed && blocking;
|
||||
|
||||
Task forceCacheFlushAsync(OperationContext context, ContentLocationDatabaseCounters? counter = null)
|
||||
{
|
||||
if (!IsInMemoryCacheEnabled)
|
||||
{
|
||||
return BoolResult.SuccessTask;
|
||||
}
|
||||
|
||||
return context.PerformOperationAsync(
|
||||
Tracer,
|
||||
async () =>
|
||||
{
|
||||
// NOTE(jubayard): notice that the count of the dictionary is actually the number of unique
|
||||
// updated entries, which can be much less than the number of actual updates performed (i.e. if
|
||||
// the updates are performed on a single entry). We need to make sure we discount the "precise"
|
||||
// number of updates that are written to disk.
|
||||
long flushedEntries = _cacheUpdatesSinceLastFlush;
|
||||
try
|
||||
{
|
||||
var flushCounters = await _inMemoryCache.FlushAsync(context);
|
||||
return Result.Success(flushCounters);
|
||||
}
|
||||
finally
|
||||
{
|
||||
Interlocked.Add(ref _cacheUpdatesSinceLastFlush, -flushedEntries);
|
||||
ResetFlushTimer();
|
||||
|
||||
if (counter != null)
|
||||
{
|
||||
Counters[counter.Value].Increment();
|
||||
}
|
||||
}
|
||||
}, extraEndMessage: maybeCounters =>
|
||||
{
|
||||
if (!maybeCounters.Succeeded)
|
||||
{
|
||||
return string.Empty;
|
||||
}
|
||||
|
||||
var counters = maybeCounters.Value;
|
||||
return $"Persisted={counters[FlushableCache.FlushableCacheCounters.Persisted].Value} Leftover={counters[FlushableCache.FlushableCacheCounters.Leftover].Value} Growth={counters[FlushableCache.FlushableCacheCounters.Growth].Value} FlushingTime={counters[FlushableCache.FlushableCacheCounters.FlushingTime].Duration.TotalMilliseconds}ms CleanupTime={counters[FlushableCache.FlushableCacheCounters.CleanupTime].Duration.TotalMilliseconds}ms";
|
||||
}).ThrowIfFailure();
|
||||
}
|
||||
}
|
||||
|
||||
private ContentLocationEntry SetMachineExistenceAndUpdateDatabase(OperationContext context, ShortHash hash, MachineId? machine, bool existsOnMachine, long size, UnixTime? lastAccessTime, bool reconciling)
|
||||
{
|
||||
var created = false;
|
||||
var reason = reconciling ? OperationReason.Reconcile : OperationReason.Unknown;
|
||||
var priorLocationCount = 0;
|
||||
lock (GetLock(hash))
|
||||
{
|
||||
if (TryGetEntryCore(context, hash, out var entry))
|
||||
{
|
||||
var initialEntry = entry;
|
||||
priorLocationCount = entry.Locations.Count;
|
||||
|
||||
// Don't update machines if entry already contains the machine
|
||||
var machines = machine != null && (entry.Locations[machine.Value] != existsOnMachine)
|
||||
|
@ -837,11 +627,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
|
||||
if (existsOnMachine)
|
||||
{
|
||||
_nagleOperationTracer.Enqueue((context, hash, initialEntry.Locations.Count == entry.Locations.Count ? EntryOperation.Touch : EntryOperation.AddMachine, reason));
|
||||
_nagleOperationTracer?.Enqueue((context, hash, initialEntry.Locations.Count == entry.Locations.Count ? EntryOperation.Touch : EntryOperation.AddMachine, reason));
|
||||
}
|
||||
else
|
||||
{
|
||||
_nagleOperationTracer.Enqueue((context, hash, machine == null ? EntryOperation.Touch : EntryOperation.RemoveMachine, reason));
|
||||
_nagleOperationTracer?.Enqueue((context, hash, machine == null ? EntryOperation.Touch : EntryOperation.RemoveMachine, reason));
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -852,7 +642,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
return ContentLocationEntry.Missing;
|
||||
}
|
||||
|
||||
lastAccessTime = lastAccessTime ?? Clock.UtcNow;
|
||||
lastAccessTime ??= Clock.UtcNow;
|
||||
var creationTime = UnixTime.Min(lastAccessTime.Value, Clock.UtcNow.ToUnixTime());
|
||||
|
||||
entry = ContentLocationEntry.Create(MachineIdSet.Empty.SetExistence(new[] { machine.Value }, existsOnMachine), size, lastAccessTime.Value, creationTime);
|
||||
|
@ -887,7 +677,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
{
|
||||
Counters[ContentLocationDatabaseCounters.TotalNumberOfCreatedEntries].Increment();
|
||||
Counters[ContentLocationDatabaseCounters.UniqueContentAddedSize].Add(entry.ContentSize);
|
||||
_nagleOperationTracer.Enqueue((context, hash, EntryOperation.Create, reason));
|
||||
_nagleOperationTracer?.Enqueue((context, hash, EntryOperation.Create, reason));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -899,7 +689,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
{
|
||||
Counters[ContentLocationDatabaseCounters.TotalNumberOfDeletedEntries].Increment();
|
||||
Counters[ContentLocationDatabaseCounters.UniqueContentRemovedSize].Add(size);
|
||||
_nagleOperationTracer.Enqueue((context, hash, EntryOperation.Delete, reason));
|
||||
_nagleOperationTracer?.Enqueue((context, hash, EntryOperation.Delete, reason));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -1037,7 +827,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
});
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
/// <nodoc />
|
||||
public abstract Result<long> GetContentDatabaseSizeBytes();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,56 +29,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
/// </summary>
|
||||
public bool StoreClusterState { get; set; } = true;
|
||||
|
||||
/// <summary>
|
||||
/// When activated, the requests effectively sent to the database will be initally done in memory and later on
|
||||
/// flushed to the underlying store.
|
||||
/// </summary>
|
||||
public bool ContentCacheEnabled { get; set; } = false;
|
||||
|
||||
/// <summary>
|
||||
/// Number of threads to use when flushing updates to the underlying storage
|
||||
///
|
||||
/// Only useful when <see cref="ContentCacheEnabled"/> is true.
|
||||
/// </summary>
|
||||
public int FlushDegreeOfParallelism { get; set; } = 1;
|
||||
|
||||
/// <summary>
|
||||
/// Number of entries to pool together into a single transaction when doing multithreaded transactional flush.
|
||||
/// </summary>
|
||||
public int FlushTransactionSize { get; set; } = 100_000;
|
||||
|
||||
/// <summary>
|
||||
/// Whether to use a single transaction to the underlying store when flushing instead of one transaction per
|
||||
/// change.
|
||||
///
|
||||
/// When this setting is on, there is no parallelism done, regardless of
|
||||
/// <see cref="FlushDegreeOfParallelism"/>.
|
||||
///
|
||||
/// Only useful when <see cref="ContentCacheEnabled"/> is true.
|
||||
/// </summary>
|
||||
public bool FlushSingleTransaction { get; set; } = true;
|
||||
|
||||
/// <summary>
|
||||
/// Percentage of records to maintain in memory after flush
|
||||
///
|
||||
/// Only useful when <see cref="ContentCacheEnabled"/> is true.
|
||||
/// </summary>
|
||||
public double FlushPreservePercentInMemory = 0.5;
|
||||
|
||||
/// <summary>
|
||||
/// The maximum number of updates that we are willing to perform in memory before flushing.
|
||||
///
|
||||
/// Only useful when <see cref="ContentCacheEnabled"/> is true.
|
||||
/// </summary>
|
||||
public int CacheMaximumUpdatesPerFlush { get; set; } = 2_500_000;
|
||||
|
||||
/// <summary>
|
||||
/// The maximum amount of time that can pass without a flush.
|
||||
///
|
||||
/// Only useful when <see cref="ContentCacheEnabled"/> is true.
|
||||
/// </summary>
|
||||
public TimeSpan CacheFlushingMaximumInterval { get; set; } = TimeSpan.FromMinutes(1);
|
||||
|
||||
/// <summary>
|
||||
/// Whether to enable garbage collection of metadata
|
||||
/// </summary>
|
||||
|
|
|
@ -87,40 +87,12 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
/// </summary>
|
||||
EpochMatches,
|
||||
|
||||
/// <nodoc />
|
||||
[CounterType(CounterType.Stopwatch)]
|
||||
CacheFlush,
|
||||
|
||||
/// <nodoc />
|
||||
TotalNumberOfCacheFlushes,
|
||||
|
||||
/// <nodoc />
|
||||
NumberOfCacheFlushesTriggeredByUpdates,
|
||||
|
||||
/// <nodoc />
|
||||
NumberOfCacheFlushesTriggeredByTimer,
|
||||
|
||||
/// <nodoc />
|
||||
NumberOfCacheFlushesTriggeredByReconciliation,
|
||||
|
||||
/// <nodoc />
|
||||
NumberOfCacheFlushesTriggeredByCheckpoint,
|
||||
|
||||
/// <nodoc />
|
||||
NumberOfCacheFlushesTriggeredByContentEnumeration,
|
||||
|
||||
/// <nodoc />
|
||||
NumberOfPersistedEntries,
|
||||
|
||||
/// <nodoc />
|
||||
NumberOfGetOperations,
|
||||
|
||||
/// <nodoc />
|
||||
NumberOfStoreOperations,
|
||||
|
||||
/// <nodoc />
|
||||
TotalNumberOfCompletedCacheFlushes,
|
||||
|
||||
/// <nodoc />
|
||||
[CounterType(CounterType.Stopwatch)]
|
||||
GarbageCollectMetadata,
|
||||
|
|
|
@ -10,7 +10,7 @@ using BuildXL.Utilities;
|
|||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
||||
{
|
||||
/// <summary>
|
||||
/// Set of extension methods for <see cref="BuildXL.Cache.ContentStore.Distributed.NuCache.ContentLocationDatabase"/>
|
||||
/// Set of extension methods for <see cref="ContentLocationDatabase"/>
|
||||
/// </summary>
|
||||
public static class ContentLocationDatabaseExtensions
|
||||
{
|
||||
|
@ -44,11 +44,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
MachineId currentMachineId,
|
||||
ShortHash? startingPoint)
|
||||
{
|
||||
var filter = new ContentLocationDatabase.EnumerationFilter
|
||||
{
|
||||
ShouldEnumerate = rawValue => database.HasMachineId(rawValue, currentMachineId.Index),
|
||||
StartingPoint = startingPoint
|
||||
};
|
||||
var filter = new ContentLocationDatabase.EnumerationFilter(
|
||||
rawValue => database.HasMachineId(rawValue, currentMachineId.Index),
|
||||
startingPoint);
|
||||
|
||||
foreach (var (key, entry) in database.EnumerateEntriesWithSortedKeys(context, filter))
|
||||
{
|
||||
|
|
|
@ -1,226 +0,0 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using BuildXL.Cache.ContentStore.Extensions;
|
||||
using BuildXL.Cache.ContentStore.Hashing;
|
||||
using BuildXL.Cache.ContentStore.Tracing.Internal;
|
||||
using BuildXL.Utilities.Collections;
|
||||
using BuildXL.Utilities.ParallelAlgorithms;
|
||||
using BuildXL.Utilities.Tasks;
|
||||
using BuildXL.Utilities.Threading;
|
||||
using BuildXL.Utilities.Tracing;
|
||||
|
||||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
||||
{
|
||||
/// <summary>
|
||||
/// A cache over the key value store in <see cref="ContentLocationDatabase"/>; made especifically for the needs of
|
||||
/// that class.
|
||||
/// </summary>
|
||||
internal class FlushableCache
|
||||
{
|
||||
private readonly ContentLocationDatabaseConfiguration _configuration;
|
||||
private readonly ContentLocationDatabase _database;
|
||||
|
||||
private ConcurrentBigMap<ShortHash, ContentLocationEntry> _cache = new ConcurrentBigMap<ShortHash, ContentLocationEntry>();
|
||||
private ConcurrentBigMap<ShortHash, ContentLocationEntry> _flushingCache = new ConcurrentBigMap<ShortHash, ContentLocationEntry>();
|
||||
|
||||
private readonly SemaphoreSlim _flushMutex = new SemaphoreSlim(1);
|
||||
private readonly ReadWriteLock _exchangeLock = ReadWriteLock.Create();
|
||||
|
||||
public FlushableCache(ContentLocationDatabaseConfiguration configuration, ContentLocationDatabase database)
|
||||
{
|
||||
_configuration = configuration;
|
||||
_database = database;
|
||||
}
|
||||
|
||||
/// <nodoc />
|
||||
public void UnsafeClear()
|
||||
{
|
||||
// Order is important here, inverse order could cause deadlock.
|
||||
using (_flushMutex.AcquireSemaphore())
|
||||
using (_exchangeLock.AcquireWriteLock())
|
||||
{
|
||||
if (_cache.Count != 0)
|
||||
{
|
||||
// Nothing guarantees that some number of updates couldn't have happened in between the last flush
|
||||
// and this reset, because acquisition of the write lock happens after the flush lock. The only way
|
||||
// to deal with this situation is to force a flush before this assignment and after locks have been
|
||||
// acquired. However, this isn't required by our code right now; although it is a simple change.
|
||||
_cache = new ConcurrentBigMap<ShortHash, ContentLocationEntry>();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <nodoc />
|
||||
public void Store(OperationContext context, ShortHash hash, ContentLocationEntry entry)
|
||||
{
|
||||
using (_exchangeLock.AcquireReadLock())
|
||||
{
|
||||
_cache[hash] = entry;
|
||||
}
|
||||
}
|
||||
|
||||
/// <nodoc />
|
||||
public bool TryGetEntry(ShortHash hash, out ContentLocationEntry entry)
|
||||
{
|
||||
using (_exchangeLock.AcquireReadLock())
|
||||
{
|
||||
// The entry could be a tombstone, so we need to make sure the user knows content has actually been
|
||||
// deleted, which is why we check for null.
|
||||
if (_cache.TryGetValue(hash, out entry))
|
||||
{
|
||||
_database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Increment();
|
||||
return entry != null;
|
||||
}
|
||||
else if (_flushingCache.TryGetValue(hash, out entry))
|
||||
{
|
||||
_database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Increment();
|
||||
return entry != null;
|
||||
}
|
||||
}
|
||||
|
||||
_database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Increment();
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <nodoc />
|
||||
public async Task<CounterCollection<FlushableCacheCounters>> FlushAsync(OperationContext context)
|
||||
{
|
||||
// Make it likely that this runs in a separate thread other than the one that triggered the flush
|
||||
await Task.Yield();
|
||||
|
||||
// This lock is required to ensure no flushes happen concurrently. We may loose updates if that happens.
|
||||
// AcquireAsync is used so as to avoid multiple concurrent tasks just waiting; this way we return the
|
||||
// task to the thread pool in between.
|
||||
using (await _flushMutex.AcquireAsync())
|
||||
{
|
||||
return PerformFlush(context);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Needs to take the flushing lock. Called only from <see cref="FlushAsync(OperationContext)"/>. Refactored
|
||||
/// out for clarity.
|
||||
/// </summary>
|
||||
private CounterCollection<FlushableCacheCounters> PerformFlush(OperationContext context)
|
||||
{
|
||||
_database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheFlushes].Increment();
|
||||
var counters = new CounterCollection<FlushableCacheCounters>();
|
||||
|
||||
using (_database.Counters[ContentLocationDatabaseCounters.CacheFlush].Start())
|
||||
{
|
||||
using (_exchangeLock.AcquireWriteLock())
|
||||
{
|
||||
_flushingCache = _cache;
|
||||
_cache = new ConcurrentBigMap<ShortHash, ContentLocationEntry>();
|
||||
}
|
||||
|
||||
using (counters[FlushableCacheCounters.FlushingTime].Start()) {
|
||||
var threads = _configuration.FlushDegreeOfParallelism;
|
||||
if (threads <= 0)
|
||||
{
|
||||
threads = Environment.ProcessorCount;
|
||||
}
|
||||
|
||||
if (_configuration.FlushSingleTransaction)
|
||||
{
|
||||
if (_configuration.FlushDegreeOfParallelism == 1 || _flushingCache.Count <= _configuration.FlushTransactionSize)
|
||||
{
|
||||
_database.PersistBatch(context, _flushingCache);
|
||||
}
|
||||
else
|
||||
{
|
||||
var actionBlock = new ActionBlockSlim<IEnumerable<KeyValuePair<ShortHash, ContentLocationEntry>>>(threads, kvs =>
|
||||
{
|
||||
_database.PersistBatch(context, kvs);
|
||||
});
|
||||
|
||||
foreach (var kvs in _flushingCache.GetPages(_configuration.FlushTransactionSize))
|
||||
{
|
||||
actionBlock.Post(kvs);
|
||||
}
|
||||
|
||||
actionBlock.Complete();
|
||||
actionBlock.CompletionAsync().Wait();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
var actionBlock = new ActionBlockSlim<KeyValuePair<ShortHash, ContentLocationEntry>>(threads, kv =>
|
||||
{
|
||||
// Do not lock on GetLock here, as it will cause a deadlock with
|
||||
// SetMachineExistenceAndUpdateDatabase. It is correct not do take any locks as well, because
|
||||
// no Store can happen while flush is running.
|
||||
_database.Persist(context, kv.Key, kv.Value);
|
||||
});
|
||||
|
||||
foreach (var kv in _flushingCache)
|
||||
{
|
||||
actionBlock.Post(kv);
|
||||
}
|
||||
|
||||
actionBlock.Complete();
|
||||
actionBlock.CompletionAsync().Wait();
|
||||
}
|
||||
}
|
||||
|
||||
counters[FlushableCacheCounters.Persisted].Add(_flushingCache.Count);
|
||||
|
||||
_database.Counters[ContentLocationDatabaseCounters.NumberOfPersistedEntries].Add(_flushingCache.Count);
|
||||
|
||||
using (counters[FlushableCacheCounters.CleanupTime].Start())
|
||||
{
|
||||
if (_configuration.FlushPreservePercentInMemory > 0)
|
||||
{
|
||||
int targetFlushingSize = (int)(_flushingCache.Count * _configuration.FlushPreservePercentInMemory);
|
||||
int removeAmount = _flushingCache.Count - targetFlushingSize;
|
||||
|
||||
foreach (var key in _flushingCache.Keys.Take(removeAmount))
|
||||
{
|
||||
_flushingCache.RemoveKey(key);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
using (_exchangeLock.AcquireWriteLock())
|
||||
{
|
||||
_flushingCache = new ConcurrentBigMap<ShortHash, ContentLocationEntry>();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
counters[FlushableCacheCounters.Leftover].Add(_flushingCache.Count);
|
||||
_database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCompletedCacheFlushes].Increment();
|
||||
}
|
||||
|
||||
counters[FlushableCacheCounters.Growth].Add(_cache.Count);
|
||||
return counters;
|
||||
}
|
||||
|
||||
public enum FlushableCacheCounters
|
||||
{
|
||||
/// <nodoc />
|
||||
Persisted,
|
||||
|
||||
/// <nodoc />
|
||||
Leftover,
|
||||
|
||||
/// <nodoc />
|
||||
Growth,
|
||||
|
||||
/// <nodoc />
|
||||
[CounterType(CounterType.Stopwatch)]
|
||||
FlushingTime,
|
||||
|
||||
/// <nodoc />
|
||||
[CounterType(CounterType.Stopwatch)]
|
||||
CleanupTime
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,7 +5,6 @@ using System;
|
|||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using BuildXL.Cache.ContentStore.Hashing;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Results;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Time;
|
||||
|
|
|
@ -19,7 +19,6 @@ using BuildXL.Cache.ContentStore.Interfaces.Synchronization;
|
|||
using BuildXL.Cache.ContentStore.Interfaces.Time;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Utils;
|
||||
using BuildXL.Cache.ContentStore.Tracing;
|
||||
using BuildXL.Cache.ContentStore.Tracing.Internal;
|
||||
using BuildXL.Cache.ContentStore.Utils;
|
||||
using BuildXL.Cache.MemoizationStore.Interfaces.Results;
|
||||
|
@ -32,6 +31,8 @@ using BuildXL.Utilities.Collections;
|
|||
using BuildXL.Utilities.Tasks;
|
||||
using AbsolutePath = BuildXL.Cache.ContentStore.Interfaces.FileSystem.AbsolutePath;
|
||||
|
||||
#nullable enable annotations
|
||||
|
||||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
||||
{
|
||||
/// <summary>
|
||||
|
@ -92,8 +93,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
public RocksDbContentLocationDatabase(IClock clock, RocksDbContentLocationDatabaseConfiguration configuration, Func<IReadOnlyList<MachineId>> getInactiveMachines)
|
||||
: base(clock, configuration, getInactiveMachines)
|
||||
{
|
||||
Contract.Requires(configuration.FlushPreservePercentInMemory >= 0 && configuration.FlushPreservePercentInMemory <= 1);
|
||||
Contract.Requires(configuration.FlushDegreeOfParallelism > 0);
|
||||
Contract.Requires(configuration.MetadataGarbageCollectionMaximumNumberOfEntriesToKeep > 0);
|
||||
|
||||
_configuration = configuration;
|
||||
|
@ -522,7 +521,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
/// <inheritdoc />
|
||||
protected override IEnumerable<(ShortHash key, ContentLocationEntry entry)> EnumerateEntriesWithSortedKeysFromStorage(
|
||||
OperationContext context,
|
||||
EnumerationFilter valueFilter,
|
||||
EnumerationFilter? valueFilter,
|
||||
bool returnKeysOnly)
|
||||
{
|
||||
var token = context.Token;
|
||||
|
@ -625,7 +624,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
internal override void Persist(OperationContext context, ShortHash hash, ContentLocationEntry entry)
|
||||
internal override void Persist(OperationContext context, ShortHash hash, ContentLocationEntry? entry)
|
||||
{
|
||||
if (entry == null)
|
||||
{
|
||||
|
|
|
@ -19,7 +19,6 @@ using BuildXL.Cache.ContentStore.Tracing.Internal;
|
|||
using BuildXL.Utilities.Collections;
|
||||
using ContentStoreTest.Distributed.Redis;
|
||||
using Xunit;
|
||||
using Xunit.Abstractions;
|
||||
using AbsolutePath = BuildXL.Cache.ContentStore.Interfaces.FileSystem.AbsolutePath;
|
||||
|
||||
namespace ContentStoreTest.Distributed.Sessions
|
||||
|
|
|
@ -2723,8 +2723,6 @@ namespace ContentStoreTest.Distributed.Sessions
|
|||
master.LocalLocationStore.Database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCleanedEntries].Value.Should().Be(0, "No entries should be cleaned before GC is called");
|
||||
master.LocalLocationStore.Database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCollectedEntries].Value.Should().Be(0, "No entries should be cleaned before GC is called");
|
||||
|
||||
master.LocalLocationStore.Database.ForceCacheFlush(context);
|
||||
|
||||
master.LocalLocationStore.Database.GarbageCollect(context);
|
||||
|
||||
master.LocalLocationStore.Database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCollectedEntries].Value.Should().Be(1, "After GC, the entry with only a location from the expired machine should be collected");
|
||||
|
@ -3250,7 +3248,6 @@ namespace ContentStoreTest.Distributed.Sessions
|
|||
var master = context.GetMaster();
|
||||
var sessions = context.Sessions;
|
||||
Warmup(context, maximumBatchSize, warmupBatches);
|
||||
context.GetMaster().LocalLocationStore.Database.ForceCacheFlush(context);
|
||||
PrintCacheStatistics(context);
|
||||
|
||||
{
|
||||
|
@ -3269,7 +3266,6 @@ namespace ContentStoreTest.Distributed.Sessions
|
|||
context.SendEventToMaster(ev);
|
||||
}
|
||||
});
|
||||
context.GetMaster().LocalLocationStore.Database.ForceCacheFlush(context);
|
||||
stopWatch.Stop();
|
||||
|
||||
var ts = stopWatch.Elapsed;
|
||||
|
@ -3367,7 +3363,6 @@ namespace ContentStoreTest.Distributed.Sessions
|
|||
var sessions = context.Sessions;
|
||||
var master = context.GetMaster();
|
||||
Warmup(context, maximumBatchSize, warmupBatches);
|
||||
context.GetMaster().LocalLocationStore.Database.ForceCacheFlush(context);
|
||||
PrintCacheStatistics(context);
|
||||
|
||||
{
|
||||
|
@ -3386,7 +3381,6 @@ namespace ContentStoreTest.Distributed.Sessions
|
|||
context.SendEventToMaster(ev);
|
||||
}
|
||||
});
|
||||
context.GetMaster().LocalLocationStore.Database.ForceCacheFlush(context);
|
||||
stopWatch.Stop();
|
||||
|
||||
var ts = stopWatch.Elapsed;
|
||||
|
@ -3513,7 +3507,6 @@ namespace ContentStoreTest.Distributed.Sessions
|
|||
var sessions = context.Sessions;
|
||||
var master = context.GetMaster();
|
||||
Warmup(context, maximumBatchSize, warmupBatches);
|
||||
context.GetMaster().LocalLocationStore.Database.ForceCacheFlush(context);
|
||||
PrintCacheStatistics(context);
|
||||
|
||||
{
|
||||
|
@ -3532,7 +3525,6 @@ namespace ContentStoreTest.Distributed.Sessions
|
|||
context.SendEventToMaster(ev);
|
||||
}
|
||||
});
|
||||
context.GetMaster().LocalLocationStore.Database.ForceCacheFlush(context);
|
||||
stopWatch.Stop();
|
||||
|
||||
var ts = stopWatch.Elapsed;
|
||||
|
@ -3553,15 +3545,6 @@ namespace ContentStoreTest.Distributed.Sessions
|
|||
var db = context.GetMaster().LocalLocationStore.Database;
|
||||
var counters = db.Counters;
|
||||
|
||||
if (db.IsInMemoryCacheEnabled)
|
||||
{
|
||||
Output.WriteLine("CACHE ENABLED");
|
||||
}
|
||||
else
|
||||
{
|
||||
Output.WriteLine("CACHE DISABLED");
|
||||
}
|
||||
|
||||
Output.WriteLine("[Statistics] NumberOfStoreOperations: " + counters[ContentLocationDatabaseCounters.NumberOfStoreOperations].ToString());
|
||||
Output.WriteLine("[Statistics] NumberOfGetOperations: " + counters[ContentLocationDatabaseCounters.NumberOfGetOperations].ToString());
|
||||
Output.WriteLine("[Statistics] TotalNumberOfCacheHit: " + counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].ToString());
|
||||
|
@ -3572,15 +3555,6 @@ namespace ContentStoreTest.Distributed.Sessions
|
|||
double cacheHitRate = ((double)counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value) / ((double)totalCacheRequests);
|
||||
Output.WriteLine("[Statistics] Cache Hit Rate: " + cacheHitRate.ToString());
|
||||
}
|
||||
|
||||
Output.WriteLine("[Statistics] NumberOfPersistedEntries: " + counters[ContentLocationDatabaseCounters.NumberOfPersistedEntries].ToString());
|
||||
Output.WriteLine("[Statistics] TotalNumberOfCacheFlushes: " + counters[ContentLocationDatabaseCounters.TotalNumberOfCacheFlushes].ToString());
|
||||
Output.WriteLine("[Statistics] NumberOfCacheFlushesTriggeredByUpdates: " + counters[ContentLocationDatabaseCounters.NumberOfCacheFlushesTriggeredByUpdates].ToString());
|
||||
Output.WriteLine("[Statistics] NumberOfCacheFlushesTriggeredByTimer: " + counters[ContentLocationDatabaseCounters.NumberOfCacheFlushesTriggeredByTimer].ToString());
|
||||
Output.WriteLine("[Statistics] NumberOfCacheFlushesTriggeredByGarbageCollection: " + counters[ContentLocationDatabaseCounters.NumberOfCacheFlushesTriggeredByReconciliation].ToString());
|
||||
Output.WriteLine("[Statistics] NumberOfCacheFlushesTriggeredByCheckpoint: " + counters[ContentLocationDatabaseCounters.NumberOfCacheFlushesTriggeredByCheckpoint].ToString());
|
||||
|
||||
Output.WriteLine("[Statistics] CacheFlush: " + counters[ContentLocationDatabaseCounters.CacheFlush].ToString());
|
||||
}
|
||||
|
||||
private static List<List<ContentLocationEventData>> GenerateUniquenessWorkload(int numberOfMachines, float cacheHitRatio, int maximumBatchSize, int operationsPerMachine, int? randomSeedOverride = null)
|
||||
|
|
|
@ -1,265 +0,0 @@
|
|||
using System;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using BuildXL.Cache.ContentStore.Distributed.NuCache;
|
||||
using BuildXL.Cache.ContentStore.Hashing;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
|
||||
using BuildXL.Cache.ContentStore.InterfacesTest;
|
||||
using BuildXL.Cache.ContentStore.InterfacesTest.Results;
|
||||
using BuildXL.Cache.ContentStore.InterfacesTest.Time;
|
||||
using BuildXL.Cache.ContentStore.Tracing.Internal;
|
||||
using ContentStoreTest.Test;
|
||||
using FluentAssertions;
|
||||
using Xunit;
|
||||
using Xunit.Abstractions;
|
||||
|
||||
namespace ContentStoreTest.Distributed.ContentLocation.NuCache
|
||||
{
|
||||
/// <summary>
|
||||
/// These tests are meant to test very basic functionality of the cache. The reason no further testing is done is
|
||||
/// because it would be intruding into the private implementation, instead of just unit testing.
|
||||
/// </summary>
|
||||
public class ContentLocationDatabaseCacheTests : TestBase
|
||||
{
|
||||
protected readonly MemoryClock Clock = new MemoryClock();
|
||||
|
||||
/// <summary>
|
||||
/// Notice that a <see cref="MemoryContentLocationDatabaseConfiguration"/> is used. This is on purpose, to
|
||||
/// avoid dealing with RocksDB.
|
||||
/// </summary>
|
||||
protected ContentLocationDatabaseConfiguration DefaultConfiguration { get; } = new MemoryContentLocationDatabaseConfiguration
|
||||
{
|
||||
ContentCacheEnabled = true,
|
||||
// These ensure no flushing happens unless explicitly directed
|
||||
CacheFlushingMaximumInterval = Timeout.InfiniteTimeSpan,
|
||||
CacheMaximumUpdatesPerFlush = -1
|
||||
};
|
||||
|
||||
public ContentLocationDatabaseCacheTests(ITestOutputHelper output = null)
|
||||
: base(TestGlobal.Logger, output)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
private async Task RunTest(Action<OperationContext, ContentLocationDatabase> action) => await RunCustomTest(DefaultConfiguration, action);
|
||||
|
||||
private async Task RunCustomTest(ContentLocationDatabaseConfiguration configuration, Action<OperationContext, ContentLocationDatabase> action)
|
||||
{
|
||||
var tracingContext = new Context(TestGlobal.Logger);
|
||||
var operationContext = new OperationContext(tracingContext);
|
||||
|
||||
var database = ContentLocationDatabase.Create(Clock, configuration, () => new MachineId[] { });
|
||||
await database.StartupAsync(operationContext).ShouldBeSuccess();
|
||||
database.SetDatabaseMode(isDatabaseWriteable: true);
|
||||
|
||||
action(operationContext, database);
|
||||
|
||||
await database.ShutdownAsync(operationContext).ShouldBeSuccess();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public Task ReadMyWrites()
|
||||
{
|
||||
return RunTest((context, database) =>
|
||||
{
|
||||
var machine = new MachineId(1);
|
||||
var hash = new ShortHash(ContentHash.Random());
|
||||
|
||||
database.LocationAdded(context, hash, machine, 200);
|
||||
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(0);
|
||||
|
||||
database.TryGetEntry(context, hash, out var entry).Should().BeTrue();
|
||||
entry.ContentSize.Should().Be(200);
|
||||
entry.Locations.Count.Should().Be(1);
|
||||
entry.Locations[machine].Should().BeTrue();
|
||||
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(1);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public Task ReadMyDeletes()
|
||||
{
|
||||
return RunTest((context, database) =>
|
||||
{
|
||||
var machine = new MachineId(1);
|
||||
var hash = new ShortHash(ContentHash.Random());
|
||||
|
||||
database.LocationAdded(context, hash, machine, 200);
|
||||
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(0);
|
||||
|
||||
database.LocationRemoved(context, hash, machine);
|
||||
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(1);
|
||||
|
||||
database.TryGetEntry(context, hash, out var entry).Should().BeFalse();
|
||||
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(2);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public Task SubsequentWrites()
|
||||
{
|
||||
return RunTest((context, database) =>
|
||||
{
|
||||
var machine = new MachineId(1);
|
||||
var machine2 = new MachineId(2);
|
||||
var hash = new ShortHash(ContentHash.Random());
|
||||
|
||||
database.LocationAdded(context, hash, machine, 200);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(0);
|
||||
|
||||
database.LocationAdded(context, hash, machine2, 200);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(1);
|
||||
|
||||
database.TryGetEntry(context, hash, out var entry).Should().BeTrue();
|
||||
entry.ContentSize.Should().Be(200);
|
||||
entry.Locations.Count.Should().Be(2);
|
||||
entry.Locations[machine].Should().BeTrue();
|
||||
entry.Locations[machine2].Should().BeTrue();
|
||||
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(2);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public Task SizeChangeOverwrites()
|
||||
{
|
||||
return RunTest((context, database) =>
|
||||
{
|
||||
var machine = new MachineId(1);
|
||||
var machine2 = new MachineId(2);
|
||||
var hash = new ShortHash(ContentHash.Random());
|
||||
|
||||
database.LocationAdded(context, hash, machine, 200);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(0);
|
||||
|
||||
database.LocationAdded(context, hash, machine2, 400);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(1);
|
||||
|
||||
database.TryGetEntry(context, hash, out var entry).Should().BeTrue();
|
||||
entry.ContentSize.Should().Be(400);
|
||||
entry.Locations.Count.Should().Be(2);
|
||||
entry.Locations[machine].Should().BeTrue();
|
||||
entry.Locations[machine2].Should().BeTrue();
|
||||
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(2);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public Task DeleteUnknownDoesNotFailOrModify()
|
||||
{
|
||||
return RunTest((context, database) =>
|
||||
{
|
||||
var machine = new MachineId(1);
|
||||
var hash = new ShortHash(ContentHash.Random());
|
||||
|
||||
database.LocationRemoved(context, hash, machine);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(0);
|
||||
|
||||
database.TryGetEntry(context, hash, out var entry).Should().BeFalse();
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(2);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(0);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public Task FlushSyncsToStorage()
|
||||
{
|
||||
return RunTest((context, database) =>
|
||||
{
|
||||
var machine = new MachineId(1);
|
||||
var hash = new ShortHash(ContentHash.Random());
|
||||
|
||||
database.LocationAdded(context, hash, machine, 200);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(0);
|
||||
|
||||
database.ForceCacheFlush(context).Should().BeTrue();
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheFlushes].Value.Should().Be(1);
|
||||
|
||||
database.TryGetEntry(context, hash, out var entry).Should().BeTrue();
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheMiss].Value.Should().Be(2);
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheHit].Value.Should().Be(0);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public Task PartialFlushingWorks()
|
||||
{
|
||||
ContentLocationDatabaseConfiguration configuration = new MemoryContentLocationDatabaseConfiguration
|
||||
{
|
||||
ContentCacheEnabled = true,
|
||||
// These ensure no flushing happens unless explicitly directed
|
||||
CacheFlushingMaximumInterval = Timeout.InfiniteTimeSpan,
|
||||
CacheMaximumUpdatesPerFlush = -1,
|
||||
FlushPreservePercentInMemory = 0.5,
|
||||
};
|
||||
|
||||
return RunCustomTest(configuration, (context, database) =>
|
||||
{
|
||||
// Setup small test DB
|
||||
foreach (var _ in Enumerable.Range(0, 100))
|
||||
{
|
||||
database.LocationAdded(context, new ShortHash(ContentHash.Random()), new MachineId(1), 200);
|
||||
}
|
||||
|
||||
database.ForceCacheFlush(context, blocking: true).Should().BeTrue();
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheFlushes].Value.Should().Be(1);
|
||||
database.Counters[ContentLocationDatabaseCounters.NumberOfPersistedEntries].Value.Should().Be(100);
|
||||
|
||||
// The second flush will discard the flushing cache, and we haven't added anything in-between
|
||||
database.ForceCacheFlush(context).Should().BeTrue();
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheFlushes].Value.Should().Be(2);
|
||||
database.Counters[ContentLocationDatabaseCounters.NumberOfPersistedEntries].Value.Should().Be(100);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public Task FlushingCountsCorrectly()
|
||||
{
|
||||
ContentLocationDatabaseConfiguration configuration = new MemoryContentLocationDatabaseConfiguration
|
||||
{
|
||||
ContentCacheEnabled = true,
|
||||
// These ensure no flushing happens unless explicitly directed
|
||||
CacheFlushingMaximumInterval = Timeout.InfiniteTimeSpan,
|
||||
CacheMaximumUpdatesPerFlush = 5,
|
||||
FlushPreservePercentInMemory = 0.5,
|
||||
};
|
||||
|
||||
return RunCustomTest(configuration, async (context, database) =>
|
||||
{
|
||||
// If the counting is done correctly, then writing 5 times to the same entry should trigger a single flush
|
||||
var shortHash = new ShortHash(ContentHash.Random());
|
||||
foreach (var i in Enumerable.Range(1, 4))
|
||||
{
|
||||
database.LocationAdded(context, shortHash, MachineId.FromIndex(i), 200);
|
||||
}
|
||||
|
||||
database.CacheUpdatesSinceLastFlush.Should().Be(4);
|
||||
database.LocationAdded(context, shortHash, new MachineId(5), 200);
|
||||
await database.FlushTask;
|
||||
|
||||
database.Counters[ContentLocationDatabaseCounters.TotalNumberOfCacheFlushes].Value.Should().Be(1);
|
||||
database.CacheUpdatesSinceLastFlush.Should().Be(0);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
|
@ -524,31 +524,6 @@ namespace BuildXL.Cache.Host.Configuration
|
|||
[Validation.Range(1, int.MaxValue)]
|
||||
public int? ContentLocationDatabaseEntryTimeToLiveMinutes { get; set; }
|
||||
|
||||
[DataMember]
|
||||
public bool? ContentLocationDatabaseCacheEnabled { get; set; }
|
||||
|
||||
[DataMember]
|
||||
[Validation.Range(1, int.MaxValue)]
|
||||
public int? ContentLocationDatabaseFlushDegreeOfParallelism { get; set; }
|
||||
|
||||
[DataMember]
|
||||
[Validation.Range(1, int.MaxValue)]
|
||||
public int? ContentLocationDatabaseFlushTransactionSize { get; set; }
|
||||
|
||||
[DataMember]
|
||||
public bool? ContentLocationDatabaseFlushSingleTransaction { get; set; }
|
||||
|
||||
[DataMember]
|
||||
[Validation.Range(0, 1)]
|
||||
public double? ContentLocationDatabaseFlushPreservePercentInMemory { get; set; }
|
||||
|
||||
[DataMember]
|
||||
[Validation.Range(1, int.MaxValue)]
|
||||
public int? ContentLocationDatabaseCacheMaximumUpdatesPerFlush { get; set; }
|
||||
|
||||
[DataMember]
|
||||
public TimeSpan? ContentLocationDatabaseCacheFlushingMaximumInterval { get; set; }
|
||||
|
||||
[DataMember]
|
||||
public bool ContentLocationDatabaseLogsBackupEnabled { get; set; }
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@ using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
|
|||
using BuildXL.Cache.ContentStore.Interfaces.Logging;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Results;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Time;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
|
||||
using BuildXL.Cache.ContentStore.Stores;
|
||||
using BuildXL.Cache.ContentStore.Tracing.Internal;
|
||||
|
@ -30,7 +29,6 @@ using BuildXL.Cache.Host.Configuration;
|
|||
using BuildXL.Cache.MemoizationStore.Distributed.Stores;
|
||||
using BuildXL.Cache.MemoizationStore.Interfaces.Stores;
|
||||
using Microsoft.Practices.TransientFaultHandling;
|
||||
using Microsoft.WindowsAzure.Storage.Auth;
|
||||
using static BuildXL.Cache.Host.Service.Internal.ConfigurationHelper;
|
||||
|
||||
namespace BuildXL.Cache.Host.Service.Internal
|
||||
|
@ -217,14 +215,6 @@ namespace BuildXL.Cache.Host.Service.Internal
|
|||
dbConfig.MetadataGarbageCollectionMaximumNumberOfEntriesToKeep = _distributedSettings.MaximumNumberOfMetadataEntriesToStore;
|
||||
}
|
||||
|
||||
ApplyIfNotNull(_distributedSettings.ContentLocationDatabaseCacheEnabled, v => dbConfig.ContentCacheEnabled = v);
|
||||
ApplyIfNotNull(_distributedSettings.ContentLocationDatabaseFlushDegreeOfParallelism, v => dbConfig.FlushDegreeOfParallelism = v);
|
||||
ApplyIfNotNull(_distributedSettings.ContentLocationDatabaseFlushTransactionSize, v => dbConfig.FlushTransactionSize = v);
|
||||
ApplyIfNotNull(_distributedSettings.ContentLocationDatabaseFlushSingleTransaction, v => dbConfig.FlushSingleTransaction = v);
|
||||
ApplyIfNotNull(_distributedSettings.ContentLocationDatabaseFlushPreservePercentInMemory, v => dbConfig.FlushPreservePercentInMemory = v);
|
||||
ApplyIfNotNull(_distributedSettings.ContentLocationDatabaseCacheMaximumUpdatesPerFlush, v => dbConfig.CacheMaximumUpdatesPerFlush = v);
|
||||
ApplyIfNotNull(_distributedSettings.ContentLocationDatabaseCacheFlushingMaximumInterval, v => dbConfig.CacheFlushingMaximumInterval = v);
|
||||
|
||||
ApplyIfNotNull(
|
||||
_distributedSettings.FullRangeCompactionIntervalMinutes,
|
||||
v => dbConfig.FullRangeCompactionInterval = TimeSpan.FromMinutes(v));
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
// Licensed under the MIT License.
|
||||
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using BuildXL.Cache.ContentStore.Interfaces.Distributed;
|
||||
using BuildXL.Cache.Host.Configuration;
|
||||
using FluentAssertions;
|
||||
|
@ -91,23 +92,26 @@ namespace BuildXL.Cache.Host.Test
|
|||
{
|
||||
var settings = DistributedContentSettings.CreateDisabled();
|
||||
|
||||
settings.ContentLocationDatabaseFlushPreservePercentInMemory = -1;
|
||||
settings.BlobExpiryTimeMinutes = -1;
|
||||
var errors = settings.Validate();
|
||||
errors.Count.Should().Be(1);
|
||||
errors[0].Should().Contain(nameof(settings.ContentLocationDatabaseFlushPreservePercentInMemory));
|
||||
errors[0].Should().Contain(nameof(settings.BlobExpiryTimeMinutes));
|
||||
|
||||
settings.ContentLocationDatabaseFlushPreservePercentInMemory = 1.1F;
|
||||
settings = DistributedContentSettings.CreateDisabled();
|
||||
settings.MinimumSpeedInMbPerSec = -1F;
|
||||
errors = settings.Validate();
|
||||
errors.Count.Should().Be(1);
|
||||
errors[0].Should().Contain(nameof(settings.ContentLocationDatabaseFlushPreservePercentInMemory));
|
||||
errors[0].Should().Contain(nameof(settings.MinimumSpeedInMbPerSec));
|
||||
|
||||
settings.ContentLocationDatabaseFlushPreservePercentInMemory = 0;
|
||||
settings = DistributedContentSettings.CreateDisabled();
|
||||
settings.CentralStoragePropagationDelaySeconds = 0;
|
||||
errors = settings.Validate();
|
||||
errors.Should().BeEmpty();
|
||||
errors.Should().BeEmpty(string.Join(", ", errors));
|
||||
|
||||
settings.ContentLocationDatabaseFlushPreservePercentInMemory = 1;
|
||||
settings = DistributedContentSettings.CreateDisabled();
|
||||
settings.BandwidthCheckIntervalSeconds = 1;
|
||||
errors = settings.Validate();
|
||||
errors.Should().BeEmpty();
|
||||
errors.Should().BeEmpty(string.Join(", ", errors));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2679,7 +2679,7 @@
|
|||
"Type": "NuGet",
|
||||
"NuGet": {
|
||||
"Name": "RuntimeContracts",
|
||||
"Version": "0.1.9.1"
|
||||
"Version": "0.1.10"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
|
@ -70,7 +70,7 @@ config({
|
|||
{ id: "CLAP", version: "4.6" },
|
||||
{ id: "CLAP-DotNetCore", version: "4.6" },
|
||||
|
||||
{ id: "RuntimeContracts", version: "0.1.9.1" },
|
||||
{ id: "RuntimeContracts", version: "0.1.10" },
|
||||
|
||||
{ id: "Microsoft.NETFramework.ReferenceAssemblies.net451", version: "1.0.0-alpha-5", osSkip: [ "macOS" ]},
|
||||
{ id: "Microsoft.NETFramework.ReferenceAssemblies.net461", version: "1.0.0-alpha-5", osSkip: [ "macOS" ]},
|
||||
|
|
Загрузка…
Ссылка в новой задаче