This commit is contained in:
Gregorius Soedharmo 2022-10-28 17:02:02 +07:00
Родитель 6a88763113
Коммит 60607a94ba
15 изменённых файлов: 228 добавлений и 13 удалений

Просмотреть файл

@ -23,36 +23,128 @@ public sealed class DataGenerator
throw new Exception("Test cluster has not been started yet.");
var region = _testCluster.ShardRegions.First();
await GenerateDataAsync(region, token);
await UntilSnapshotAndClearCompleteAsync(region, token);
await GenerateDataAsync(region, token);
await UntilSnapshotCompleteAsync(region, token);
await GenerateDataAsync(region, token);
await UntilFinishedAsync(region, token);
}
private static async Task GenerateDataAsync(IActorRef region, CancellationToken token)
{
const int pauseEvery = 500;
const int pauseMillis = 500;
var count = 0;
// 4 test types: int, string, ShardedMessage, and CustomShardedMessage
foreach (var i in Enumerable.Range(0, Utils.MessagesPerType))
{
region.Tell(i);
count++;
if (count % pauseEvery == 0)
{
Console.WriteLine($"Sent {count} data, pausing for {pauseMillis} ms.");
await Task.Delay(pauseMillis, token);
}
if (token.IsCancellationRequested)
return;
}
foreach (var i in Enumerable.Range(0, Utils.MessagesPerType))
{
region.Tell(i.ToString());
count++;
if (count % pauseEvery == 0)
{
Console.WriteLine($"Sent {count} data, pausing for {pauseMillis} ms.");
await Task.Delay(pauseMillis, token);
}
if (token.IsCancellationRequested)
return;
}
foreach (var i in Enumerable.Range(0, Utils.MessagesPerType))
{
region.Tell(new ShardedMessage(i));
count++;
if (count % pauseEvery == 0)
{
Console.WriteLine($"Sent {count} data, pausing for {pauseMillis} ms.");
await Task.Delay(pauseMillis, token);
}
if (token.IsCancellationRequested)
return;
}
foreach (var i in Enumerable.Range(0, Utils.MessagesPerType))
{
region.Tell(new CustomShardedMessage(i));
count++;
if (count % pauseEvery == 0)
{
Console.WriteLine($"Sent {count} data, pausing for {pauseMillis} ms.");
await Task.Delay(pauseMillis, token);
}
if (token.IsCancellationRequested)
return;
}
var tasks = Enumerable.Range(0, 100).Select(id => region.Ask<(string, int)>(new Finish(id))).ToList();
}
private static async Task UntilFinishedAsync(IActorRef region, CancellationToken token)
{
var tasks = Enumerable.Range(0, 100).Select(id => region.Ask<(string, StateSnapshot, int, int)>(new Finish(id), token)).ToList();
while (tasks.Count > 0)
{
var task = await Task.WhenAny(tasks);
if (token.IsCancellationRequested)
return;
tasks.Remove(task);
var (id, count) = task.Result;
Console.WriteLine($">>>>> Entity {id} persisted {count} items");
var (id, lastSnapshot, total, persisted) = task.Result;
Console.WriteLine(
$"{id} data received. " +
$"Snapshot: [Total: {lastSnapshot.Total}, Persisted: {lastSnapshot.Persisted}] " +
$"State: [Total: {total}, Persisted: {persisted}]");
}
}
private static async Task UntilSnapshotCompleteAsync(IActorRef region, CancellationToken token)
{
var tasks = Enumerable.Range(0, 100).Select(id => region.Ask<(string, StateSnapshot)>(new TakeSnapshot(id), token)).ToList();
while (tasks.Count > 0)
{
var task = await Task.WhenAny(tasks);
if (token.IsCancellationRequested)
return;
tasks.Remove(task);
var (id, lastSnapshot) = task.Result;
Console.WriteLine(
$"{id} snapshot completed. " +
$"Snapshot: [Total: {lastSnapshot.Total}, Persisted: {lastSnapshot.Persisted}]");
}
}
private static async Task UntilSnapshotAndClearCompleteAsync(IActorRef region, CancellationToken token)
{
var tasks = Enumerable.Range(0, 100).Select(id => region.Ask<(string, StateSnapshot)>(new TakeSnapshotAndClear(id), token)).ToList();
while (tasks.Count > 0)
{
var task = await Task.WhenAny(tasks);
if (token.IsCancellationRequested)
return;
tasks.Remove(task);
var (id, lastSnapshot) = task.Result;
Console.WriteLine(
$"{id} snapshot completed, journal cleared. " +
$"Snapshot: [Total: {lastSnapshot.Total}, Persisted: {lastSnapshot.Persisted}]");
}
}
}

Просмотреть файл

@ -17,6 +17,11 @@ public sealed class EntityActor : ReceivePersistentActor
private readonly ILoggingAdapter _log;
private int _total;
private int _persisted;
private StateSnapshot _lastSnapshot = StateSnapshot.Empty;
private StateSnapshot _savingSnapshot = StateSnapshot.Empty;
private bool _clearing;
private IActorRef? _sender;
public EntityActor(string persistenceId)
{
@ -82,9 +87,71 @@ public sealed class EntityActor : ReceivePersistentActor
Command<Finish>(_ =>
{
Sender.Tell((PersistenceId, _persisted));
Sender.Tell((PersistenceId, _lastSnapshot, _total, _persisted));
});
Command<TakeSnapshotAndClear>(_ =>
{
_sender = Sender;
_clearing = true;
_savingSnapshot = new StateSnapshot(_total, _persisted);
SaveSnapshot(_savingSnapshot);
});
Command<TakeSnapshot>(_ =>
{
_sender = Sender;
_savingSnapshot = new StateSnapshot(_total, _persisted);
SaveSnapshot(_savingSnapshot);
});
Command<SaveSnapshotSuccess>(msg =>
{
_lastSnapshot = _savingSnapshot;
_savingSnapshot = StateSnapshot.Empty;
if(!_clearing)
{
_sender.Tell((PersistenceId, _lastSnapshot));
return;
}
_clearing = false;
DeleteMessages(msg.Metadata.SequenceNr);
});
Command<SaveSnapshotFailure>(fail =>
{
_log.Error(fail.Cause, "SaveSnapshot failed!");
_savingSnapshot = StateSnapshot.Empty;
_sender.Tell((PersistenceId, _lastSnapshot));
});
Command<DeleteMessagesSuccess>(_ =>
{
_sender.Tell((PersistenceId, _lastSnapshot));
});
Command<DeleteMessagesFailure>(fail =>
{
_log.Error(fail.Cause, "DeleteMessages failed!");
_sender.Tell((PersistenceId, _lastSnapshot));
});
Command<RecoveryCompleted>(_ =>
{
_log.Info($"{persistenceId}: Recovery completed. State: [Total:{_total}, Persisted:{_persisted}.]");
});
Recover<SnapshotOffer>(offer =>
{
_lastSnapshot = (StateSnapshot) offer.Snapshot;
_total = _lastSnapshot.Total;
_persisted = _lastSnapshot.Persisted;
_log.Info($"{persistenceId}: Snapshot loaded. State: [Total:{_total}, Persisted:{_persisted}.] " +
$"Metadata: [SequenceNr:{offer.Metadata.SequenceNr}, Timestamp:{offer.Metadata.Timestamp}]");
});
Recover<int>(msg =>
{
_total += msg;
@ -102,6 +169,12 @@ public sealed class EntityActor : ReceivePersistentActor
_total += msg.Message;
_persisted++;
});
Recover<CustomShardedMessage>(msg =>
{
_total += msg.Message;
_persisted++;
});
}
public override string PersistenceId { get; }

Просмотреть файл

@ -21,6 +21,26 @@ public sealed class Finish: IHasEntityId
public string EntityId { get; }
}
public sealed class TakeSnapshotAndClear: IHasEntityId
{
public TakeSnapshotAndClear(int entityId)
{
EntityId = (entityId % Utils.MaxEntities).ToString();
}
public string EntityId { get; }
}
public sealed class TakeSnapshot: IHasEntityId
{
public TakeSnapshot(int entityId)
{
EntityId = (entityId % Utils.MaxEntities).ToString();
}
public string EntityId { get; }
}
public sealed class ShardedMessage: IHasEntityId
{
public ShardedMessage(int message)
@ -46,3 +66,17 @@ public sealed class CustomShardedMessage: IHasEntityId
public int Message { get; }
}
public sealed class StateSnapshot
{
public static readonly StateSnapshot Empty = new StateSnapshot(0, 0);
public StateSnapshot(int total, int persisted)
{
Total = total;
Persisted = persisted;
}
public int Total { get; }
public int Persisted { get; }
}

Просмотреть файл

@ -72,7 +72,9 @@ public sealed class TestCluster: IAsyncDisposable
builder
.ConfigureLoggers(logger => logger.LogLevel = LogLevel.InfoLevel)
.AddHocon("akka.cluster.min-nr-of-members = 3", HoconAddMode.Prepend)
.AddHocon(@"
akka.cluster.min-nr-of-members = 3
akka.cluster.sharding.snapshot-after = 20", HoconAddMode.Prepend)
.WithCustomSerializer(
serializerIdentifier: "customSerializer",
boundTypes: new [] {typeof(CustomShardedMessage)},
@ -120,6 +122,9 @@ public sealed class TestCluster: IAsyncDisposable
}
cts.Cancel();
// wait 2 seconds for everything to settle down
await Task.Delay(2000, token);
}
private async Task ShutdownAsync()

Просмотреть файл

@ -12,7 +12,7 @@ public static class Utils
{
// We're only doing 100 entities
public const int MaxEntities = 100;
public const int TaggedVariants = 3;
private const int TaggedVariants = 3;
public const int MessagesPerType = MaxEntities * TaggedVariants;
public static readonly string[] Tags = { "Tag1", "Tag2", "Tag3", "Tag4" };

Просмотреть файл

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>$(NetCoreFramework)</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>

Просмотреть файл

@ -47,7 +47,7 @@ public class MySqlDocker : DockerContainer
{
try
{
await ExecuteCommandAsync("sh", "-c", $"exec mysqldump -u{User} -p'{Password}' {DatabaseName} > {outputFile}");
await ExecuteCommandAsync("mysqldump", $"-u{User}", $"-p'{Password}'", $"{DatabaseName} > {outputFile}");
}
catch (Exception e)
{

Просмотреть файл

@ -43,12 +43,10 @@ await testCluster.StartAsync();
var generator = new DataGenerator(testCluster);
await generator.GenerateAsync();
//Console.WriteLine(">>>>>>>>>>> Creating backup");
//await docker.DumpDatabase("backup.sql");
Console.WriteLine(">>>>>>>>>>> downloading backup");
//await docker.DownloadAsync("backup.sql", docker.OutputPath, "backup.tar");
await docker.DownloadAsync("/var/lib/mysql/", docker.OutputPath, "mysql.tar");
Console.WriteLine(">>>>>>>>>>> Shutting down test cluster");
await testCluster.DisposeAsync();
Console.WriteLine(">>>>>>>>>>> DONE!");

Просмотреть файл

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>$(NetCoreFramework)</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>

Просмотреть файл

@ -43,6 +43,7 @@ public class PostgreSqlDocker: DockerContainer
$"POSTGRES_USER={User}",
"PGDATA=/data"
};
parameters.Cmd = new List<string> { "postgres", "-c", "max_connections=300" };
}
protected override async Task AfterContainerStartedAsync(CancellationToken cancellationToken)

Просмотреть файл

@ -30,4 +30,7 @@ await docker.DumpDatabaseAsync("backup.sql");
Console.WriteLine(">>>>>>>>>>> downloading backup");
await docker.DownloadAsync("backup.sql", docker.OutputPath, "backup.tar", true, false);
Console.WriteLine(">>>>>>>>>>> Shutting down test cluster");
await testCluster.DisposeAsync();
Console.WriteLine(">>>>>>>>>>> DONE!");

Просмотреть файл

@ -28,7 +28,9 @@ var generator = new DataGenerator(testCluster);
await generator.GenerateAsync();
Console.WriteLine(">>>>>>>>>>> downloading backup");
await docker.DownloadAsync("/var/opt/mssql/data/", docker.OutputPath, "data.tar");
Console.WriteLine(">>>>>>>>>>> Shutting down test cluster");
await testCluster.DisposeAsync();
Console.WriteLine(">>>>>>>>>>> DONE!");

Просмотреть файл

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>$(NetCoreFramework)</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>

Просмотреть файл

@ -47,4 +47,7 @@ await testCluster.StartAsync();
var generator = new DataGenerator(testCluster);
await generator.GenerateAsync();
Console.WriteLine(">>>>>>>>>>> Shutting down test cluster");
await testCluster.DisposeAsync();
Console.WriteLine(">>>>>>>>>>> DONE!");

Просмотреть файл

@ -3,6 +3,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>$(NetCoreFramework)</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>