Merged PR 707482: Don't publish recompute information globally

Don't publish recompute information globally. Also removes the concept of a LocationAddRecentInactiveEager. That feature has been useless for months because machines are filtered out at both local and global, so doing a proactive global registration doesn't help.
This commit is contained in:
Julian Bayardo 2023-03-24 01:24:11 +00:00
Родитель b7431b1efb
Коммит fe43a562a6
10 изменённых файлов: 98 добавлений и 169 удалений

Просмотреть файл

@ -1,19 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System.Linq;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Interfaces.Extensions;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Secrets;
using BuildXL.Cache.ContentStore.Interfaces.Time;
using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.ContentStore.Utils;
using BuildXL.Cache.Host.Configuration;
#nullable enable
@ -54,36 +51,40 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
public Task<Result<RegisterMachineOutput>> RegisterMachinesAsync(OperationContext context, RegisterMachineInput request)
{
return context.PerformOperationAsync(Tracer, async () =>
{
var (currentState, assignedMachineIds) = await _storage.ReadModifyWriteAsync<ClusterStateMachine, MachineId[]>(context, new BlobPath(_configuration.FileName, relative: true), currentState =>
return context.PerformOperationAsync(
Tracer,
async () =>
{
var now = _clock.UtcNow;
MachineId[] assignedMachineIds = new MachineId[request.MachineLocations.Count];
foreach (var (item, index) in request.MachineLocations.AsIndexed())
{
if (currentState.TryResolveMachineId(item, out var machineId))
var (currentState, assignedMachineIds) = await _storage.ReadModifyWriteAsync<ClusterStateMachine, MachineId[]>(
context,
new BlobPath(_configuration.FileName, relative: true),
currentState =>
{
assignedMachineIds[index] = machineId;
}
else
{
(currentState, assignedMachineIds[index]) = currentState.RegisterMachine(item, now);
}
}
var now = _clock.UtcNow;
currentState = currentState.Recompute(_configuration.RecomputeConfiguration, now);
return (currentState, assignedMachineIds);
}).ThrowIfFailureAsync();
MachineId[] assignedMachineIds = new MachineId[request.MachineLocations.Count];
foreach (var (item, index) in request.MachineLocations.AsIndexed())
{
if (currentState.TryResolveMachineId(item, out var machineId))
{
assignedMachineIds[index] = machineId;
}
else
{
(currentState, assignedMachineIds[index]) = currentState.RegisterMachine(item, now);
}
}
var machineMappings = request.MachineLocations
.Zip(assignedMachineIds, (machineLocation, machineId) => new MachineMapping(machineId, machineLocation))
.ToArray();
return (currentState, assignedMachineIds);
}).ThrowIfFailureAsync();
return Result.Success(new RegisterMachineOutput(currentState, machineMappings));
},
traceOperationStarted: false);
var machineMappings = request.MachineLocations
.Zip(assignedMachineIds, (machineLocation, machineId) => new MachineMapping(machineId, machineLocation))
.ToArray();
return Result.Success(new RegisterMachineOutput(currentState, machineMappings));
},
traceOperationStarted: false);
}
public record HeartbeatInput(IReadOnlyList<MachineId> MachineIds, MachineState MachineState);
@ -92,35 +93,49 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
public Task<Result<HeartbeatOutput>> HeartbeatAsync(OperationContext context, HeartbeatInput request)
{
return context.PerformOperationAsync(Tracer, async () =>
{
var (currentState, priorMachineRecords) = await _storage.ReadModifyWriteAsync<ClusterStateMachine, MachineRecord[]>(context, new BlobPath(_configuration.FileName, relative: true), currentState =>
return context.PerformOperationAsync(
Tracer,
async () =>
{
var now = _clock.UtcNow;
var (currentState, priorMachineRecords) = await _storage.ReadModifyWriteAsync<ClusterStateMachine, MachineRecord[]>(
context,
new BlobPath(_configuration.FileName, relative: true),
currentState =>
{
var now = _clock.UtcNow;
var priorMachineRecords = new MachineRecord[request.MachineIds.Count];
foreach (var entry in request.MachineIds.AsIndexed())
{
(currentState, priorMachineRecords[entry.Index]) = currentState.Heartbeat(entry.Item, now, request.MachineState).ThrowIfFailure();
}
var priorMachineRecords = new MachineRecord[request.MachineIds.Count];
foreach (var entry in request.MachineIds.AsIndexed())
{
(currentState, priorMachineRecords[entry.Index]) =
currentState.Heartbeat(entry.Item, now, request.MachineState).ThrowIfFailure();
}
currentState = currentState.Recompute(_configuration.RecomputeConfiguration, now);
return (currentState, priorMachineRecords);
}).ThrowIfFailureAsync();
return (currentState, priorMachineRecords);
}).ThrowIfFailureAsync();
return Result.Success(new HeartbeatOutput(currentState, priorMachineRecords));
},
traceOperationStarted: false);
return Result.Success(new HeartbeatOutput(TransitionInactiveMachines(currentState), priorMachineRecords));
},
traceOperationStarted: false);
}
public Task<Result<ClusterStateMachine>> ReadState(OperationContext context)
{
return context.PerformOperationAsync(Tracer, () =>
{
return _storage.ReadAsync<ClusterStateMachine>(context, new BlobPath(_configuration.FileName, relative: true));
},
traceOperationStarted: false);
return context.PerformOperationAsync(
Tracer,
async () =>
{
var currentState = await _storage.ReadAsync<ClusterStateMachine>(context, new BlobPath(_configuration.FileName, relative: true))
.ThrowIfFailureAsync();
return Result.Success(TransitionInactiveMachines(currentState));
},
traceOperationStarted: false);
}
private ClusterStateMachine TransitionInactiveMachines(ClusterStateMachine currentState)
{
return currentState.TransitionInactiveMachines(_configuration.RecomputeConfiguration, _clock.UtcNow);
}
}
}

Просмотреть файл

@ -9,8 +9,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
public record ClusterStateRecomputeConfiguration
{
public TimeSpan RecomputeFrequency { get; set; } = TimeSpan.FromMinutes(1);
public TimeSpan ActiveToClosedInterval { get; set; } = TimeSpan.FromMinutes(10);
public TimeSpan ActiveToDeadExpiredInterval { get; set; } = TimeSpan.FromHours(1);

Просмотреть файл

@ -23,8 +23,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
internal const MachineState InitialState = MachineState.Open;
public DateTime LastStateMachineRecomputeTimeUtc { get; init; } = DateTime.MinValue;
// Machine IDs have historically been assigned from 1 onwards as an implementation detail. Thus, 0 has been
// deemed to be an invalid machine ID, and is used as such in some parts of the code. This code keeps the
// convention to avoid making major changes.
@ -70,13 +68,17 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
DateTime nowUtc,
MachineState state)
{
Contract.Requires(state != MachineState.Unknown, $"Can't register machine ID `{machineId}` for location `{location}` with initial state `{state}`");
Contract.Requires(
state != MachineState.Unknown,
$"Can't register machine ID `{machineId}` for location `{location}` with initial state `{state}`");
if (machineId.Index < NextMachineId)
{
if (TryResolve(machineId, out var assignedLocation))
{
Contract.Assert(assignedLocation.Equals(location), $"Machine id `{machineId}` has already been allocated to location `{assignedLocation}` and so can't be allocated to `{location}`");
Contract.Assert(
assignedLocation.Equals(location),
$"Machine id `{machineId}` has already been allocated to location `{assignedLocation}` and so can't be allocated to `{location}`");
// Heartbeat can only fail if the machine ID doesn't exist, and we know it does
return Heartbeat(machineId, nowUtc, state).ThrowIfFailure().Next;
@ -85,13 +87,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
var records = Records.ToList();
records.Add(new MachineRecord()
{
Id = machineId,
Location = location,
State = state,
LastHeartbeatTimeUtc = nowUtc,
});
records.Add(
new MachineRecord()
{
Id = machineId, Location = location, State = state, LastHeartbeatTimeUtc = nowUtc,
});
// We sort this list in order to ensure it is easy on the eyes when we need to manually inspect it
records.Sort((a, b) => a.Id.Index.CompareTo(b.Id.Index));
@ -103,19 +103,13 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
var records = Records.ToList();
records.Add(new MachineRecord()
{
Id = machineId,
Location = location,
State = state,
LastHeartbeatTimeUtc = nowUtc,
});
records.Add(
new MachineRecord()
{
Id = machineId, Location = location, State = state, LastHeartbeatTimeUtc = nowUtc,
});
return this with
{
NextMachineId = Math.Max(machineId.Index + 1, NextMachineId + 1),
Records = records,
};
return this with { NextMachineId = Math.Max(machineId.Index + 1, NextMachineId + 1), Records = records, };
}
}
@ -125,11 +119,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
// This will only happen when operating in DistributedContentConsumerOnly mode. These machines get
// registered with an invalid machine ID, so this is the expected response of heartbeat.
return Result.Success((Next: this, Previous: new MachineRecord()
{
State = state,
LastHeartbeatTimeUtc = nowUtc,
}));
return Result.Success((Next: this, Previous: new MachineRecord() { State = state, LastHeartbeatTimeUtc = nowUtc, }));
}
var records = new List<MachineRecord>(capacity: Records.Count);
@ -149,30 +139,22 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
if (previous is null)
{
return Result.FromErrorMessage<(ClusterStateMachine Next, MachineRecord Previous)>($"Failed to find machine id `{machineId}` in records");
return Result.FromErrorMessage<(ClusterStateMachine Next, MachineRecord Previous)>(
$"Failed to find machine id `{machineId}` in records");
}
return Result.Success((Next: this with { Records = records }, Previous: previous));
}
public ClusterStateMachine Recompute(ClusterStateRecomputeConfiguration configuration, DateTime nowUtc)
public ClusterStateMachine TransitionInactiveMachines(ClusterStateRecomputeConfiguration configuration, DateTime nowUtc)
{
if (configuration.RecomputeFrequency != TimeSpan.Zero && LastStateMachineRecomputeTimeUtc.IsRecent(nowUtc, configuration.RecomputeFrequency))
{
return this;
}
var records = new List<MachineRecord>(capacity: Records.Count);
foreach (var record in Records)
{
records.Add(ChangeMachineStateIfNeeded(configuration, nowUtc, record));
}
return this with
{
LastStateMachineRecomputeTimeUtc = nowUtc,
Records = records,
};
return this with { Records = records, };
}
private MachineRecord ChangeMachineStateIfNeeded(
@ -212,7 +194,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
break;
}
default:
throw new NotImplementedException($"Attempt to transition machine record `{current}` failed because the state `{current.State}` is unknown");
throw new NotImplementedException(
$"Attempt to transition machine record `{current}` failed because the state `{current.State}` is unknown");
}
return current;

Просмотреть файл

@ -62,9 +62,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
[CounterType(CounterType.Stopwatch)]
BackgroundTouchBulk,
/// <nodoc />
LocationAddRecentInactiveEager,
/// <nodoc />
LocationAddRecentRemoveEager,

Просмотреть файл

@ -1116,7 +1116,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
EagerGlobal,
EagerGlobalOnPut,
RecentInactiveEagerGlobal,
RecentRemoveEagerGlobal,
LazyEventOnly,
LazyTouchEventOnly,
@ -1138,7 +1137,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
case RegisterAction.EagerGlobal:
case RegisterAction.EagerGlobalOnPut:
case RegisterAction.RecentInactiveEagerGlobal:
case RegisterAction.RecentRemoveEagerGlobal:
return RegisterCoreAction.Global;
case RegisterAction.LazyEventOnly:
@ -1162,14 +1160,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
return RegisterAction.RecentRemoveEagerGlobal;
}
if (ClusterState.LastInactiveTime.IsRecent(now, Configuration.MachineStateRecomputeInterval.Multiply(5)))
{
// The machine was recently inactive. We should eagerly register content for some amount of time (a few heartbeats) because content may be currently filtered from other machines
// local db results due to inactive machines filter.
Counters[ContentLocationStoreCounters.LocationAddRecentInactiveEager].Increment();
return RegisterAction.RecentInactiveEagerGlobal;
}
if (_recentlyAddedHashes.Contains(hash))
{
// Content was recently added for the machine by a prior operation

Просмотреть файл

@ -2451,8 +2451,6 @@ namespace ContentStoreTest.Distributed.Sessions
// Insert random file in session 0
var putResult0 = await workerSession.PutRandomAsync(context, ContentHashType, false, ContentByteCount, Token).ShouldBeSuccess();
worker0.LocalLocationStore.Counters[ContentLocationStoreCounters.LocationAddRecentInactiveEager].Value.Should().Be(0);
var masterResult = await master.GetBulkAsync(
context,
new[] { putResult0.ContentHash },
@ -2490,8 +2488,6 @@ namespace ContentStoreTest.Distributed.Sessions
// Insert random file in session 0
var putResult1 = await workerSession.PutRandomAsync(context, ContentHashType, false, ContentByteCount, Token).ShouldBeSuccess();
worker0.LocalLocationStore.Counters[ContentLocationStoreCounters.LocationAddRecentInactiveEager].Value.Should().Be(0, "New content shouldn't eagerly go to global store because RemoveFromTracker keeps the machine available");
var worker1GlobalResult = await worker1.GetBulkAsync(
context,
new[] { putResult1.ContentHash },
@ -3054,8 +3050,6 @@ namespace ContentStoreTest.Distributed.Sessions
ctx,
MachineState.Unknown).ShouldBeSuccess();
workerState = (await worker.LocalLocationStore.SetOrGetMachineStateAsync(ctx, MachineState.Unknown)).ShouldBeSuccess().Value;
workerState.Should().Be(MachineState.DeadExpired);
master.LocalLocationStore.ClusterState.ClosedMachines.Contains(workerPrimaryMachineId).Should().BeFalse();
master.LocalLocationStore.ClusterState.InactiveMachines.Contains(workerPrimaryMachineId).Should().BeTrue();
});
@ -3102,9 +3096,6 @@ namespace ContentStoreTest.Distributed.Sessions
MachineState.Unknown).ShouldBeSuccess();
master.LocalLocationStore.ClusterState.ClosedMachines.Contains(workerPrimaryMachineId).Should().BeTrue();
master.LocalLocationStore.ClusterState.InactiveMachines.Contains(workerPrimaryMachineId).Should().BeFalse();
workerState = (await worker.LocalLocationStore.SetOrGetMachineStateAsync(ctx, MachineState.Unknown)).ShouldBeSuccess().Value;
workerState.Should().Be(MachineState.Closed);
});
}

Просмотреть файл

@ -552,9 +552,6 @@ namespace ContentStoreTest.Distributed.Sessions
{
configuration.InlinePostInitialization = true;
// Set recompute time to zero to force recomputation on every heartbeat
configuration.MachineStateRecomputeInterval = TimeSpan.Zero;
if (!_tests.UseRealEventHub)
{
// Propagate epoch from normal configuration to in-memory configuration

Просмотреть файл

@ -28,13 +28,14 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation.NuCache
machineId.Index.Should().Be(MachineId.MinValue);
var record = clusterState.GetStatus(machineId).ThrowIfFailure();
record.Should().BeEquivalentTo(new MachineRecord()
{
Id = new MachineId(MachineId.MinValue),
Location = machineLocation,
State = ClusterStateMachine.InitialState,
LastHeartbeatTimeUtc = nowUtc,
});
record.Should().BeEquivalentTo(
new MachineRecord()
{
Id = new MachineId(MachineId.MinValue),
Location = machineLocation,
State = ClusterStateMachine.InitialState,
LastHeartbeatTimeUtc = nowUtc,
});
}
[Fact]
@ -117,48 +118,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation.NuCache
r.State.Should().Be(MachineState.Open);
}
[Fact]
public void HeartbeatDoesntChangeRecomputeTime()
{
var clusterState = new ClusterStateMachine();
MachineId n1Id;
(clusterState, n1Id) = clusterState.RegisterMachine(new MachineLocation("node1"), _clock.UtcNow);
_clock.Increment(TimeSpan.FromMinutes(1));
clusterState = clusterState.Heartbeat(n1Id, _clock.UtcNow, MachineState.Open).ThrowIfFailure().Next;
clusterState.LastStateMachineRecomputeTimeUtc.Should().Be(DateTime.MinValue);
}
[Fact]
public void RecomputeDoesntRunIfNotNeeded()
{
var now = _clock.UtcNow;
var current = new ClusterStateMachine()
{
LastStateMachineRecomputeTimeUtc = now,
};
var cfg = new ClusterStateRecomputeConfiguration()
{
RecomputeFrequency = TimeSpan.FromMilliseconds(1),
};
var next = current.Recompute(cfg, now);
next.Should().BeEquivalentTo(current);
}
[Fact]
public void RecomputeChangesStatesAsExpected()
{
var clusterState = new ClusterStateMachine();
var cfg = new ClusterStateRecomputeConfiguration()
{
// Force recompute to run
RecomputeFrequency = TimeSpan.Zero,
};
var cfg = new ClusterStateRecomputeConfiguration();
var nowUtc = _clock.UtcNow;
@ -194,7 +158,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test.ContentLocation.NuCache
MachineId n7Id;
(clusterState, n7Id) = clusterState.ForceRegisterMachineWithState(n7, nowUtc, MachineState.Closed);
clusterState = clusterState.Recompute(cfg, nowUtc);
clusterState = clusterState.TransitionInactiveMachines(cfg, nowUtc);
clusterState.GetStatus(n1Id).ThrowIfFailure().State.Should().Be(MachineState.DeadUnavailable);
clusterState.GetStatus(n2Id).ThrowIfFailure().State.Should().Be(MachineState.DeadExpired);

Просмотреть файл

@ -761,10 +761,6 @@ namespace BuildXL.Cache.Host.Configuration
[Validation.Range(0, double.MaxValue)]
public double? MaxProcessingDelayToReconcileMinutes { get; set; }
[DataMember]
[Validation.Range(1, double.MaxValue)]
public double? MachineStateRecomputeIntervalMinutes { get; set; }
[DataMember]
[Validation.Range(1, double.MaxValue)]
public double? MachineActiveToClosedIntervalMinutes { get; set; }

Просмотреть файл

@ -198,7 +198,6 @@ namespace BuildXL.Cache.Host.Service.Internal
ApplyIfNotNull(_distributedSettings.ReplicaCreditInMinutes, v => result.ContentLifetime = TimeSpan.FromMinutes(v));
ApplyIfNotNull(_distributedSettings.MachineRisk, v => result.MachineRisk = v);
ApplyIfNotNull(_distributedSettings.LocationEntryExpiryMinutes, v => result.LocationEntryExpiry = TimeSpan.FromMinutes(v));
ApplyIfNotNull(_distributedSettings.MachineStateRecomputeIntervalMinutes, v => result.MachineStateRecomputeInterval = TimeSpan.FromMinutes(v));
ApplyIfNotNull(_distributedSettings.MachineActiveToClosedIntervalMinutes, v => result.MachineActiveToClosedInterval = TimeSpan.FromMinutes(v));
ApplyIfNotNull(_distributedSettings.MachineActiveToExpiredIntervalMinutes, v => result.MachineActiveToExpiredInterval = TimeSpan.FromMinutes(v));
ApplyIfNotNull(_distributedSettings.TouchFrequencyMinutes, v => result.TouchFrequency = TimeSpan.FromMinutes(v));
@ -656,7 +655,6 @@ namespace BuildXL.Cache.Host.Service.Internal
ApplyIfNotNull(_distributedSettings.BlobClusterStateStorageRetryPolicy, v => blobClusterStateStorageConfiguration.RetryPolicy = v);
var gcCfg = new ClusterStateRecomputeConfiguration();
ApplyIfNotNull(_distributedSettings.MachineStateRecomputeIntervalMinutes, v => gcCfg.RecomputeFrequency = TimeSpan.FromMinutes(v));
ApplyIfNotNull(_distributedSettings.MachineActiveToClosedIntervalMinutes, v => gcCfg.ActiveToClosedInterval = TimeSpan.FromMinutes(v));
ApplyIfNotNull(_distributedSettings.MachineActiveToExpiredIntervalMinutes, v => gcCfg.ActiveToDeadExpiredInterval = TimeSpan.FromMinutes(v));
ApplyIfNotNull(_distributedSettings.MachineActiveToExpiredIntervalMinutes, v => gcCfg.ClosedToDeadExpiredInterval = TimeSpan.FromMinutes(v));