[Issue 9195] Evict silos from cluster if they remain in the Joining or Created state for longer than MaxJoinAttemptTime (#9201)

This commit is contained in:
Chris Eckhardt 2024-11-07 16:07:41 -05:00 коммит произвёл GitHub
Родитель 186e5f80a1
Коммит f086d1b98c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
4 изменённых файлов: 296 добавлений и 59 удалений

Просмотреть файл

@ -125,5 +125,10 @@ namespace Orleans.Configuration
/// Gets or sets a value indicating whether to enable probing silos indirectly, via other silos.
/// </summary>
public bool EnableIndirectProbes { get; set; } = true;
/// <summary>
/// Gets or sets a value indicating whether silos which remain in the <c>Joining</c> or <c>Created</c> state
/// for longer than <see cref="MaxJoinAttemptTime"/> are evicted from cluster membership.
/// </summary>
/// <value>Eviction is enabled (<see langword="true"/>) by default.</value>
public bool EvictWhenMaxJoinAttemptTimeExceeded { get; set; } = true;
}
}

Просмотреть файл

@ -78,7 +78,14 @@ namespace Orleans.Runtime.MembershipService
if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Starting to process membership updates");
await foreach (var tableSnapshot in this.membershipService.MembershipTableUpdates.WithCancellation(this.shutdownCancellation.Token))
{
var newMonitoredSilos = this.UpdateMonitoredSilos(tableSnapshot, this.monitoredSilos, DateTime.UtcNow);
var utcNow = DateTime.UtcNow;
var newMonitoredSilos = this.UpdateMonitoredSilos(tableSnapshot, this.monitoredSilos, utcNow);
if (this.clusterMembershipOptions.CurrentValue.EvictWhenMaxJoinAttemptTimeExceeded)
{
await this.EvictStaleStateSilos(tableSnapshot, utcNow);
}
foreach (var pair in this.monitoredSilos)
{
@ -103,6 +110,45 @@ namespace Orleans.Runtime.MembershipService
}
}
/// <summary>
/// Calls <c>TryToSuspectOrKill</c> for every silo in <paramref name="membership"/> which is still in the
/// <see cref="SiloStatus.Created"/> or <see cref="SiloStatus.Joining"/> state even though more than
/// <c>MaxJoinAttemptTime</c> has elapsed since its recorded start time.
/// </summary>
/// <param name="membership">The membership table snapshot whose entries are inspected.</param>
/// <param name="utcNow">The current UTC time against which each entry's start time is compared.</param>
/// <returns>A task which completes once every stale entry has been processed.</returns>
/// <remarks>
/// A failure to suspect or kill one silo is logged and does not prevent the remaining entries from being processed.
/// </remarks>
private async Task EvictStaleStateSilos(
    MembershipTableSnapshot membership,
    DateTime utcNow)
{
    // Read the option once so that every entry in this pass is judged against the same limit.
    var maxJoinAttemptTime = this.clusterMembershipOptions.CurrentValue.MaxJoinAttemptTime;

    foreach (var member in membership.Entries)
    {
        if (!IsCreatedOrJoining(member.Value.Status)
            || !HasExceededMaxJoinTime(startTime: member.Value.StartTime, now: utcNow, maxJoinTime: maxJoinAttemptTime))
        {
            continue;
        }

        try
        {
            if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Stale silo with a joining or created state found, calling `TryToSuspectOrKill`");
            await this.membershipService.TryToSuspectOrKill(member.Key);
        }
        catch (Exception exception)
        {
            // Keep going: one failed eviction attempt must not stop the scan of the remaining entries.
            this.log.LogError(
                exception,
                "Silo {suspectAddress} has had the status `{siloStatus}` for longer than `MaxJoinAttemptTime` but a call to `TryToSuspectOrKill` has failed",
                member.Value.SiloAddress,
                member.Value.Status.ToString());
        }
    }

    static bool IsCreatedOrJoining(SiloStatus status)
    {
        return status == SiloStatus.Created || status == SiloStatus.Joining;
    }

    // Compare the elapsed time instead of computing startTime + maxJoinTime: DateTime.Add throws
    // ArgumentOutOfRangeException when the sum is not representable (for example when the limit is
    // TimeSpan.MaxValue), whereas the difference of two DateTime values always fits in a TimeSpan.
    static bool HasExceededMaxJoinTime(DateTime startTime, DateTime now, TimeSpan maxJoinTime)
    {
        return now - startTime > maxJoinTime;
    }
}
[Pure]
private ImmutableDictionary<SiloAddress, SiloHealthMonitor> UpdateMonitoredSilos(
MembershipTableSnapshot membership,

Просмотреть файл

@ -3,7 +3,6 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NonSilo.Tests.Utilities;
using NSubstitute;
using Orleans;
using Orleans.Configuration;
using Orleans.Runtime;
using Orleans.Runtime.MembershipService;
@ -103,6 +102,42 @@ namespace NonSilo.Tests.Membership
await ClusterHealthMonitor_BasicScenario_Runner(enableIndirectProbes: false, numVotesForDeathDeclaration: 1);
}
/// <summary>
/// Runs the stale <c>Created</c>/<c>Joining</c> silo scenario with <c>EvictWhenMaxJoinAttemptTimeExceeded</c> enabled
/// and a single vote required for a death declaration.
/// </summary>
[Fact]
public async Task ClusterHealthMonitor_SilosWithStaleCreatedOrJoiningState_OneVoteNeededToKill()
    => await ClusterHealthMonitor_StaleJoinOrCreatedSilos_Runner(
        evictWhenMaxJoinAttemptTimeExceeded: true,
        numVotesForDeathDeclaration: 1);
/// <summary>
/// Runs the stale <c>Created</c>/<c>Joining</c> silo scenario with <c>EvictWhenMaxJoinAttemptTimeExceeded</c> enabled
/// and two votes required for a death declaration.
/// </summary>
[Fact]
public async Task ClusterHealthMonitor_SilosWithStaleCreatedOrJoiningState_TwoVotesNeededToKill()
    => await ClusterHealthMonitor_StaleJoinOrCreatedSilos_Runner(
        evictWhenMaxJoinAttemptTimeExceeded: true,
        numVotesForDeathDeclaration: 2);
/// <summary>
/// Runs the stale <c>Created</c>/<c>Joining</c> silo scenario with <c>EvictWhenMaxJoinAttemptTimeExceeded</c> enabled
/// and three votes required for a death declaration.
/// </summary>
[Fact]
public async Task ClusterHealthMonitor_SilosWithStaleCreatedOrJoiningState_ThreeVotesNeededToKill()
    => await ClusterHealthMonitor_StaleJoinOrCreatedSilos_Runner(
        evictWhenMaxJoinAttemptTimeExceeded: true,
        numVotesForDeathDeclaration: 3);
/// <summary>
/// Runs the stale <c>Created</c>/<c>Joining</c> silo scenario with <c>EvictWhenMaxJoinAttemptTimeExceeded</c> disabled
/// (three votes required for a death declaration), in which case the stale silos are expected to keep their status.
/// </summary>
[Fact]
public async Task ClusterHealthMonitor_SilosWithStaleCreatedOrJoiningState_Disabled()
    => await ClusterHealthMonitor_StaleJoinOrCreatedSilos_Runner(
        evictWhenMaxJoinAttemptTimeExceeded: false,
        numVotesForDeathDeclaration: 3);
private async Task ClusterHealthMonitor_BasicScenario_Runner(bool enableIndirectProbes, int? numVotesForDeathDeclaration = default)
{
var clusterMembershipOptions = new ClusterMembershipOptions
@ -115,19 +150,7 @@ namespace NonSilo.Tests.Membership
clusterMembershipOptions.NumVotesForDeathDeclaration = numVotesForDeathDeclaration.Value;
}
var manager = new MembershipTableManager(
localSiloDetails: this.localSiloDetails,
clusterMembershipOptions: Options.Create(clusterMembershipOptions),
membershipTable: membershipTable,
fatalErrorHandler: this.fatalErrorHandler,
gossiper: this.membershipGossiper,
log: this.loggerFactory.CreateLogger<MembershipTableManager>(),
timerFactory: new AsyncTimerFactory(this.loggerFactory),
this.lifecycle);
((ILifecycleParticipant<ISiloLifecycle>)manager).Participate(this.lifecycle);
var membershipService = Substitute.For<IClusterMembershipService>();
membershipService.CurrentSnapshot.ReturnsForAnyArgs(info => manager.MembershipTableSnapshot.CreateClusterMembershipSnapshot());
var testRig = CreateClusterHealthMonitorTestRig(clusterMembershipOptions);
var probeCalls = new ConcurrentQueue<(SiloAddress Target, int ProbeNumber, bool IsIndirect)>();
this.prober.Probe(default, default).ReturnsForAnyArgs(info =>
{
@ -145,31 +168,8 @@ namespace NonSilo.Tests.Membership
});
});
var optionsMonitor = Substitute.For<IOptionsMonitor<ClusterMembershipOptions>>();
optionsMonitor.CurrentValue.ReturnsForAnyArgs(clusterMembershipOptions);
var monitor = new ClusterHealthMonitor(
this.localSiloDetails,
manager,
this.loggerFactory.CreateLogger<ClusterHealthMonitor>(),
optionsMonitor,
this.fatalErrorHandler,
null);
((ILifecycleParticipant<ISiloLifecycle>)monitor).Participate(this.lifecycle);
var testAccessor = (ClusterHealthMonitor.ITestAccessor)monitor;
testAccessor.CreateMonitor = s => new SiloHealthMonitor(
s,
testAccessor.OnProbeResult,
optionsMonitor,
this.loggerFactory,
this.prober,
this.timerFactory,
this.localSiloHealthMonitor,
membershipService,
this.localSiloDetails);
await this.lifecycle.OnStart();
Assert.Empty(testAccessor.MonitoredSilos);
Assert.Empty(testRig.TestAccessor.MonitoredSilos);
var otherSilos = new[]
{
@ -184,7 +184,7 @@ namespace NonSilo.Tests.Membership
Entry(Silo("127.0.0.200:900@100"), SiloStatus.Active)
};
var lastVersion = testAccessor.ObservedVersion;
var lastVersion = testRig.TestAccessor.ObservedVersion;
// Add the new silos
var table = await this.membershipTable.ReadAll();
@ -194,25 +194,25 @@ namespace NonSilo.Tests.Membership
Assert.True(await this.membershipTable.InsertRow(entry, table.Version.Next()));
}
await manager.Refresh();
await testRig.Manager.Refresh();
await Until(() => testAccessor.ObservedVersion > lastVersion);
lastVersion = testAccessor.ObservedVersion;
await Until(() => testRig.TestAccessor.ObservedVersion > lastVersion);
lastVersion = testRig.TestAccessor.ObservedVersion;
// No silos should be monitored by this silo until it becomes active.
Assert.Empty(testAccessor.MonitoredSilos);
Assert.Empty(testRig.TestAccessor.MonitoredSilos);
await manager.UpdateStatus(SiloStatus.Active);
await testRig.Manager.UpdateStatus(SiloStatus.Active);
await Until(() => testAccessor.ObservedVersion > lastVersion);
lastVersion = testAccessor.ObservedVersion;
await Until(() => testRig.TestAccessor.ObservedVersion > lastVersion);
lastVersion = testRig.TestAccessor.ObservedVersion;
// Now that this silo is active, it should be monitoring some fraction of the other active silos
await Until(() => testAccessor.MonitoredSilos.Count > 0);
await Until(() => testRig.TestAccessor.MonitoredSilos.Count > 0);
Assert.NotEmpty(this.timers);
Assert.DoesNotContain(testAccessor.MonitoredSilos, s => s.Key.Equals(this.localSilo));
Assert.Equal(clusterMembershipOptions.NumProbedSilos, testAccessor.MonitoredSilos.Count);
Assert.All(testAccessor.MonitoredSilos, m => m.Key.Equals(m.Value.SiloAddress));
Assert.DoesNotContain(testRig.TestAccessor.MonitoredSilos, s => s.Key.Equals(this.localSilo));
Assert.Equal(clusterMembershipOptions.NumProbedSilos, testRig.TestAccessor.MonitoredSilos.Count);
Assert.All(testRig.TestAccessor.MonitoredSilos, m => m.Key.Equals(m.Value.SiloAddress));
Assert.Empty(probeCalls);
// Check that those silos are actually being probed periodically
@ -226,9 +226,9 @@ namespace NonSilo.Tests.Membership
return probeCalls.Count;
});
Assert.Equal(clusterMembershipOptions.NumProbedSilos, probeCalls.Count);
while (probeCalls.TryDequeue(out var call)) Assert.Contains(testAccessor.MonitoredSilos, k => k.Key.Equals(call.Item1));
while (probeCalls.TryDequeue(out var call)) Assert.Contains(testRig.TestAccessor.MonitoredSilos, k => k.Key.Equals(call.Item1));
var monitoredSilos = testAccessor.MonitoredSilos.Values.ToList();
var monitoredSilos = testRig.TestAccessor.MonitoredSilos.Values.ToList();
foreach (var siloMonitor in monitoredSilos)
{
Assert.Equal(0, ((SiloHealthMonitor.ITestAccessor)siloMonitor).MissedProbes);
@ -306,7 +306,7 @@ namespace NonSilo.Tests.Membership
return;
}
await manager.Refresh();
await testRig.Manager.Refresh();
// Make the probes succeed again.
this.prober.Probe(default, default).ReturnsForAnyArgs(info =>
@ -329,9 +329,9 @@ namespace NonSilo.Tests.Membership
while (probeCalls.TryDequeue(out _)) ;
// Wait for probes to be fired
this.output.WriteLine($"Firing probes for silos: {string.Join(", ", testAccessor.MonitoredSilos.Keys)}");
this.output.WriteLine($"Firing probes for silos: {string.Join(", ", testRig.TestAccessor.MonitoredSilos.Keys)}");
var probesReceived = new HashSet<SiloAddress>();
await UntilEqual(testAccessor.MonitoredSilos.Count, () =>
await UntilEqual(testRig.TestAccessor.MonitoredSilos.Count, () =>
{
if (this.timerCalls.TryDequeue(out var timer))
{
@ -346,7 +346,7 @@ namespace NonSilo.Tests.Membership
return probesReceived.Count;
});
foreach (var siloMonitor in testAccessor.MonitoredSilos.Values)
foreach (var siloMonitor in testRig.TestAccessor.MonitoredSilos.Values)
{
this.output.WriteLine($"Checking missed probes on {siloMonitor.SiloAddress}: {((SiloHealthMonitor.ITestAccessor)siloMonitor).MissedProbes}");
Assert.Equal(0, ((SiloHealthMonitor.ITestAccessor)siloMonitor).MissedProbes);
@ -355,9 +355,134 @@ namespace NonSilo.Tests.Membership
await StopLifecycle();
}
/// <summary>
/// Drives a scenario in which one silo has been <see cref="SiloStatus.Joining"/> and another
/// <see cref="SiloStatus.Created"/> since six minutes ago, pre-seeds all but one of the votes required for a
/// death declaration, starts the lifecycle and then checks the resulting status and vote count of both silos.
/// </summary>
/// <param name="evictWhenMaxJoinAttemptTimeExceeded">
/// Value assigned to <c>EvictWhenMaxJoinAttemptTimeExceeded</c>. When <see langword="true"/> both stale silos are
/// expected to be <see cref="SiloStatus.Dead"/> afterwards; otherwise they are expected to keep their original status.
/// </param>
/// <param name="numVotesForDeathDeclaration">
/// Optional override for <c>NumVotesForDeathDeclaration</c>; the option's own default is used when omitted.
/// </param>
private async Task ClusterHealthMonitor_StaleJoinOrCreatedSilos_Runner(bool evictWhenMaxJoinAttemptTimeExceeded = true, int? numVotesForDeathDeclaration = default)
{
    var clusterMembershipOptions = new ClusterMembershipOptions
    {
        EvictWhenMaxJoinAttemptTimeExceeded = evictWhenMaxJoinAttemptTimeExceeded
    };

    if (numVotesForDeathDeclaration.HasValue)
    {
        clusterMembershipOptions.NumVotesForDeathDeclaration = numVotesForDeathDeclaration.Value;
    }

    var testRig = CreateClusterHealthMonitorTestRig(clusterMembershipOptions);

    var otherSilos = new[]
    {
        Entry(Silo("127.0.0.200:100@100"), SiloStatus.Active),
        Entry(Silo("127.0.0.200:200@100"), SiloStatus.Active),
        Entry(Silo("127.0.0.200:300@100"), SiloStatus.Active),
        Entry(Silo("127.0.0.200:400@100"), SiloStatus.Active),
        Entry(Silo("127.0.0.200:500@100"), SiloStatus.Active),
        Entry(Silo("127.0.0.200:600@100"), SiloStatus.Active),
        Entry(Silo("127.0.0.200:700@100"), SiloStatus.Active),
        Entry(Silo("127.0.0.200:800@100"), SiloStatus.Active),
        Entry(Silo("127.0.0.200:900@100"), SiloStatus.Active)
    };

    var joiningSilo = "127.0.0.200:111@100";
    var createdSilo = "127.0.0.200:112@100";

    // default MaxJoinAttemptTime is 5 minutes, setting it to 6 minutes ago will make sure they are flagged immediately
    var staleCreatedOrJoiningSilos = new[]
    {
        Entry(Silo(joiningSilo), SiloStatus.Joining, DateTime.UtcNow.AddMinutes(-6)),
        Entry(Silo(createdSilo), SiloStatus.Created, DateTime.UtcNow.AddMinutes(-6)),
    };

    otherSilos = [.. otherSilos, .. staleCreatedOrJoiningSilos];

    var lastVersion = testRig.TestAccessor.ObservedVersion;

    // Add the new silos
    var table = await this.membershipTable.ReadAll();
    foreach (var entry in otherSilos)
    {
        // Re-read before every insert so that the next table version is derived from the current one.
        table = await this.membershipTable.ReadAll();
        Assert.True(await this.membershipTable.InsertRow(entry, table.Version.Next()));
    }

    table = await this.membershipTable.ReadAll();
    var joiningEntry = GetEntryFromTable(table, joiningSilo);
    var createdEntry = GetEntryFromTable(table, createdSilo);
    Assert.NotNull(joiningEntry);
    Assert.NotNull(createdEntry);
    Assert.Equal(expected: SiloStatus.Joining, actual: joiningEntry.Item1.Status);
    Assert.Equal(expected: SiloStatus.Created, actual: createdEntry.Item1.Status);

    // We are going to add numVotesForDeathDeclaration - 1 votes to the created or joining silos
    var totalRequiredVotes = clusterMembershipOptions.NumVotesForDeathDeclaration;
    var votesNeeded = totalRequiredVotes - 1;

    // the joining and created silos should not be declared dead until the required number of votes.
    // Note that every pre-seeded vote is recorded with the same suspector address (otherSilos[0]).
    while (votesNeeded > 0)
    {
        table = await this.membershipTable.ReadAll();
        joiningEntry = GetEntryFromTable(table, joiningSilo);
        joiningEntry.Item1.AddSuspector(otherSilos[0].SiloAddress, DateTime.UtcNow);
        Assert.True(await this.membershipTable.UpdateRow(joiningEntry.Item1, joiningEntry.Item2, table.Version.Next()));

        table = await this.membershipTable.ReadAll();
        createdEntry = GetEntryFromTable(table, createdSilo);
        createdEntry.Item1.AddSuspector(otherSilos[0].SiloAddress, DateTime.UtcNow);
        Assert.True(await this.membershipTable.UpdateRow(createdEntry.Item1, createdEntry.Item2, table.Version.Next()));

        votesNeeded--;
    }

    table = await this.membershipTable.ReadAll();
    joiningEntry = GetEntryFromTable(table, joiningSilo);
    createdEntry = GetEntryFromTable(table, createdSilo);

    // Suspect time will be null if numVotesForDeathDeclaration == 1
    if (totalRequiredVotes > 1 && evictWhenMaxJoinAttemptTimeExceeded)
    {
        Assert.Equal(totalRequiredVotes - 1, SuspectCount(joiningEntry));
        Assert.Equal(totalRequiredVotes - 1, SuspectCount(createdEntry));
    }

    // now we start the lifecycle and let the local silo add the final vote.
    await this.lifecycle.OnStart();
    await testRig.Manager.Refresh();

    await Until(() => testRig.TestAccessor.ObservedVersion > lastVersion);

    table = await this.membershipTable.ReadAll();
    joiningEntry = GetEntryFromTable(table, joiningSilo);
    createdEntry = GetEntryFromTable(table, createdSilo);

    // With eviction enabled the final vote is expected on top of the pre-seeded ones;
    // with eviction disabled only the pre-seeded votes are expected.
    var expectedVotes = evictWhenMaxJoinAttemptTimeExceeded
        ? totalRequiredVotes
        : totalRequiredVotes - 1;

    Assert.True(
        expectedVotes <= SuspectCount(joiningEntry),
        $"Expected at least {expectedVotes} votes against the joining silo but found {SuspectCount(joiningEntry)}");
    Assert.True(
        expectedVotes <= SuspectCount(createdEntry),
        $"Expected at least {expectedVotes} votes against the created silo but found {SuspectCount(createdEntry)}");
    Assert.Equal(expected: evictWhenMaxJoinAttemptTimeExceeded ? SiloStatus.Dead : SiloStatus.Joining, actual: joiningEntry.Item1.Status);
    Assert.Equal(expected: evictWhenMaxJoinAttemptTimeExceeded ? SiloStatus.Dead : SiloStatus.Created, actual: createdEntry.Item1.Status);

    await StopLifecycle();

    // Finds the table row whose silo address renders to the given parsable string, or null when absent.
    static Tuple<MembershipEntry, string> GetEntryFromTable(MembershipTableData table, string siloAddress)
    {
        return table.Members.FirstOrDefault(entry => entry.Item1.SiloAddress.ToParsableString() == siloAddress);
    }

    // Number of recorded suspicions for the row; a row which was never suspected may have no list at all.
    static int SuspectCount(Tuple<MembershipEntry, string> row)
    {
        return row.Item1.SuspectTimes?.Count ?? 0;
    }
}
/// <summary>Builds a <see cref="SiloAddress"/> from its parsable string form.</summary>
private static SiloAddress Silo(string value)
{
    return SiloAddress.FromParsableString(value);
}
/// <summary>Creates a membership entry with the given address and status.</summary>
private static MembershipEntry Entry(SiloAddress address, SiloStatus status)
{
    return new MembershipEntry
    {
        SiloAddress = address,
        Status = status,
    };
}
/// <summary>Creates a membership entry with the given address, status and start time.</summary>
private static MembershipEntry Entry(SiloAddress address, SiloStatus status, DateTime startTime = default)
{
    return new MembershipEntry
    {
        SiloAddress = address,
        Status = status,
        StartTime = startTime,
    };
}
private static async Task UntilEqual<T>(T expected, Func<T> getActual)
{
@ -393,5 +518,66 @@ namespace NonSilo.Tests.Membership
await stopped;
}
/// <summary>
/// Holds the objects created for a <see cref="ClusterHealthMonitor"/> test so that a test can reach each of them.
/// </summary>
private class ClusterHealthMonitorTestRig
{
    public readonly MembershipTableManager Manager;
    public readonly IClusterMembershipService MembershipService;
    public readonly IOptionsMonitor<ClusterMembershipOptions> OptionsMonitor;
    public readonly ClusterHealthMonitor.ITestAccessor TestAccessor;

    public ClusterHealthMonitorTestRig(
        MembershipTableManager manager,
        IClusterMembershipService membershipService,
        IOptionsMonitor<ClusterMembershipOptions> optionsMonitor,
        ClusterHealthMonitor.ITestAccessor testAccessor)
    {
        this.Manager = manager;
        this.MembershipService = membershipService;
        this.OptionsMonitor = optionsMonitor;
        this.TestAccessor = testAccessor;
    }
}
/// <summary>
/// Wires up a <see cref="MembershipTableManager"/> and a <see cref="ClusterHealthMonitor"/> against this test
/// class's shared fakes, registers both with the test lifecycle and returns them bundled in a test rig.
/// </summary>
/// <param name="clusterMembershipOptions">The options instance handed to both the manager and the monitor.</param>
/// <returns>A rig exposing the manager, the substituted membership service, the options monitor and the monitor's test accessor.</returns>
private ClusterHealthMonitorTestRig CreateClusterHealthMonitorTestRig(ClusterMembershipOptions clusterMembershipOptions)
{
    // The options monitor always reports the options instance supplied by the caller.
    var options = Substitute.For<IOptionsMonitor<ClusterMembershipOptions>>();
    options.CurrentValue.ReturnsForAnyArgs(clusterMembershipOptions);

    var tableManager = new MembershipTableManager(
        localSiloDetails: this.localSiloDetails,
        clusterMembershipOptions: Options.Create(clusterMembershipOptions),
        membershipTable: this.membershipTable,
        fatalErrorHandler: this.fatalErrorHandler,
        gossiper: this.membershipGossiper,
        log: this.loggerFactory.CreateLogger<MembershipTableManager>(),
        timerFactory: new AsyncTimerFactory(this.loggerFactory),
        this.lifecycle);
    ((ILifecycleParticipant<ISiloLifecycle>)tableManager).Participate(this.lifecycle);

    // The substituted membership service derives its snapshot from the manager's current table snapshot on each access.
    var clusterMembership = Substitute.For<IClusterMembershipService>();
    clusterMembership.CurrentSnapshot.ReturnsForAnyArgs(info => tableManager.MembershipTableSnapshot.CreateClusterMembershipSnapshot());

    var healthMonitor = new ClusterHealthMonitor(
        this.localSiloDetails,
        tableManager,
        this.loggerFactory.CreateLogger<ClusterHealthMonitor>(),
        options,
        this.fatalErrorHandler,
        null);
    ((ILifecycleParticipant<ISiloLifecycle>)healthMonitor).Participate(this.lifecycle);

    // Replace the per-silo monitor factory so that every monitor is built from this test's fakes.
    var accessor = (ClusterHealthMonitor.ITestAccessor)healthMonitor;
    accessor.CreateMonitor = silo => new SiloHealthMonitor(
        silo,
        accessor.OnProbeResult,
        options,
        this.loggerFactory,
        this.prober,
        this.timerFactory,
        this.localSiloHealthMonitor,
        clusterMembership,
        this.localSiloDetails);

    return new ClusterHealthMonitorTestRig(
        manager: tableManager,
        membershipService: clusterMembership,
        optionsMonitor: options,
        testAccessor: accessor);
}
}
}

Просмотреть файл

@ -288,7 +288,7 @@ namespace NonSilo.Tests.Membership
[Fact]
public async Task MembershipTableManager_Superseded()
{
// The table includes a sucessor to this silo.
// The table includes a successor to this silo.
var successor = Entry(Silo("127.0.0.1:100@200"), SiloStatus.Active);
var otherSilos = new[]