Create test data infrastructure

This commit is contained in:
Gregorius Soedharmo 2022-10-12 00:50:07 +07:00
Родитель 2fb3e0c616
Коммит bb7a114a53
14 изменённых файлов: 370 добавлений и 131 удалений

Просмотреть файл

@ -1,39 +0,0 @@
// -----------------------------------------------------------------------
// <copyright file="ActorSystemExtensions.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using Akka.Actor;
using Akka.Event;
namespace Akka.Persistence.Sql.Exporter.Shared;
public static class ActorSystemExtensions
{
private const int TotalEvents = 1000;
public static async Task CreateTestData(this ActorSystem system)
{
var log = Logging.GetLogger(system, "SQLExporter");
var actor1 = system.ActorOf(PersistenceActor.Props("one"));
await actor1.Ask<Done>(Ready.Instance);
log.Info($">>>>>>>>>>> Persisting {TotalEvents} events");
foreach (var i in Enumerable.Range(0, TotalEvents))
{
actor1.Tell(new Store(i));
if(i>0 && i%500 == 0)
log.Info($">>>>>>>>>>> Queued: {i} events");
}
log.Info(">>>>>>>>>>> Waiting for all events to be persisted");
var count = 0;
while (count < TotalEvents)
{
count = (int) await actor1.Ask<long>(Finish.Instance);
log.Info($">>>>>>>>>>> Persisted: {count} events");
}
}
}

Просмотреть файл

@ -7,6 +7,8 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka.Cluster.Hosting" Version="0.5.0" />
<PackageReference Include="Akka.Cluster.Sharding" Version="1.4.43" />
<PackageReference Include="Akka.Persistence" Version="1.4.43" />
<PackageReference Include="Akka.Persistence.Query" Version="1.4.41" />
<PackageReference Include="Akka.Persistence.Query.Sql" Version="1.4.41" />

Просмотреть файл

@ -1,62 +0,0 @@
// -----------------------------------------------------------------------
// <copyright file="PersistenceActor.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using Akka.Actor;
namespace Akka.Persistence.Sql.Exporter.Shared;
public class PersistenceActor: PersistentActor
{
public static Props Props(string id) => Actor.Props.Create(() => new PersistenceActor(id));
private long _count;
private long _state;
public PersistenceActor(string id)
{
PersistenceId = id;
}
protected override bool ReceiveRecover(object message)
{
switch (message)
{
case Stored s:
_count++;
_state += s.Value;
return true;
default:
return false;
}
}
protected override bool ReceiveCommand(object message)
{
switch (message)
{
case Store s:
Persist(new Stored(s.Value), stored =>
{
_count++;
_state += stored.Value;
});
return true;
case Ready _:
Sender.Tell(Done.Instance);
return true;
case Finish _:
Sender.Tell(_count);
return true;
default:
return false;
}
}
public override string PersistenceId { get; }
}

Просмотреть файл

@ -0,0 +1,29 @@
// -----------------------------------------------------------------------
// <copyright file="DataGenerator.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
namespace Akka.Persistence.Sql.Exporter.Shared.Test;
public sealed class DataGenerator
{
public static readonly string[] Tags = { "Tag1", "Tag2", "Tag3", "Tag4" };
private readonly TestCluster _testCluster;
public DataGenerator(TestCluster testCluster)
{
_testCluster = testCluster;
}
public async Task GenerateAsync(CancellationToken token = default)
{
if (!_testCluster.IsStarted)
throw new Exception("Test cluster has not been started yet.");
var region = _testCluster.ShardRegions.First();
}
}

Просмотреть файл

@ -0,0 +1,113 @@
// -----------------------------------------------------------------------
// <copyright file="PersistenceActor.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.Event;
using Akka.Persistence.Journal;
namespace Akka.Persistence.Sql.Exporter.Shared.Test;
public sealed class ShardedMessage
{
public ShardedMessage(string entityId, int message)
{
EntityId = entityId;
Message = message;
}
public string EntityId { get; }
public int Message { get; }
}
public sealed class MessageExtractor : HashCodeMessageExtractor
{
// We're only doing 100 entities
private const int MaxEntities = 100;
/// <summary>
/// We only ever run three nodes, so ~10 shards per node
/// </summary>
public MessageExtractor() : base(30)
{
}
public override string? EntityId(object message)
=> message switch
{
int i => (i % MaxEntities).ToString(),
string str => (int.Parse(str) % MaxEntities).ToString(),
ShardedMessage msg => msg.EntityId,
ShardingEnvelope msg => msg.EntityId,
_ => null
};
}
public sealed class EntityActor : ReceivePersistentActor
{
public static Props Props(string id) => Actor.Props.Create(() => new EntityActor(id));
private ILoggingAdapter _log;
private int _total;
private readonly string[] _tags = DataGenerator.Tags;
public EntityActor(string persistenceId)
{
_log = Context.GetLogger();
PersistenceId = persistenceId;
Command<int>(msg => Persist(msg, i =>
{
_total += i;
}));
Command<string>(msg => Persist(msg, str =>
{
_total += int.Parse(str);
}));
Command<ShardedMessage>(msg =>
{
object obj = (msg.Message % 4) switch
{
0 => msg,
1 => new Tagged(msg, new[] { _tags[0] }),
2 => new Tagged(msg, new[] { _tags[0], _tags[1] }),
_ => new Tagged(msg, new[] { _tags[0], _tags[1], _tags[2] })
};
if(obj is Tagged tagged)
{
Persist(tagged, sm => { _total += ((ShardedMessage)sm.Payload).Message; });
}
else
{
Persist(msg, sm => { _total += sm.Message; });
}
});
Command<Finish>(_ =>
{
Sender.Tell(_total);
});
Recover<int>(msg => _total += msg);
Recover<string>(msg => _total += int.Parse(msg));
Recover<ShardedMessage>(msg => _total += msg.Message);
}
public override string PersistenceId { get; }
protected override void PreStart()
{
_log.Debug($"EntityActor({PersistenceId}) started");
base.PreStart();
}
}

Просмотреть файл

@ -0,0 +1,37 @@
// -----------------------------------------------------------------------
// <copyright file="EventAdapter.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using Akka.Persistence.Journal;
namespace Akka.Persistence.Sql.Exporter.Shared.Test;
public sealed class EventAdapter: IWriteEventAdapter
{
private readonly string[] _tags = DataGenerator.Tags;
public string Manifest(object evt) => string.Empty;
public object ToJournal(object evt)
{
if (evt is not int && evt is not string)
return evt;
var value = evt switch
{
int i => i,
string str => int.Parse(str),
_ => throw new Exception($"Unknown type: {evt.GetType()}")
};
return (value % 4) switch
{
0 => evt,
1 => new Tagged(evt, new[]{ _tags[0] }),
2 => new Tagged(evt, new[]{ _tags[0], _tags[1] }),
_ => new Tagged(evt, new[]{ _tags[0], _tags[1], _tags[2] })
};
}
}

Просмотреть файл

@ -4,7 +4,7 @@
// </copyright>
// -----------------------------------------------------------------------
namespace Akka.Persistence.Sql.Exporter.Shared;
namespace Akka.Persistence.Sql.Exporter.Shared.Test;
public sealed class Store
{

Просмотреть файл

@ -0,0 +1,139 @@
// -----------------------------------------------------------------------
// <copyright file="TestCluster.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using Akka.Actor;
using Akka.Cluster.Hosting;
using Akka.Cluster.Sharding;
using Akka.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using LogLevel = Akka.Event.LogLevel;
namespace Akka.Persistence.Sql.Exporter.Shared.Test;
public sealed class TestCluster: IAsyncDisposable
{
private readonly IHost _host1;
private readonly IHost _host2;
private readonly IHost _host3;
private readonly TimeSpan _clusterStartTimeout;
public TestCluster(
Action<AkkaConfigurationBuilder, IServiceProvider> setup,
float clusterStartTimeoutInSeconds = 10)
{
_clusterStartTimeout = TimeSpan.FromSeconds(clusterStartTimeoutInSeconds);
_host1 = CreateHost(setup);
_host2 = CreateHost(setup);
_host3 = CreateHost(setup);
}
public bool IsStarted => ShardRegions.Count > 0;
public ImmutableList<IActorRef> ShardRegions { get; private set; } = ImmutableList<IActorRef>.Empty;
public async Task StartAsync(CancellationToken token = default)
{
await Task.WhenAll(
_host1.StartAsync(token),
_host2.StartAsync(token),
_host3.StartAsync(token)
);
await StartClusterAsync(token);
ShardRegions = ShardRegions.Add(_host1.Services.GetRequiredService<ActorRegistry>().Get<ShardRegion>());
ShardRegions = ShardRegions.Add(_host2.Services.GetRequiredService<ActorRegistry>().Get<ShardRegion>());
ShardRegions = ShardRegions.Add(_host3.Services.GetRequiredService<ActorRegistry>().Get<ShardRegion>());
}
private static IHost CreateHost(Action<AkkaConfigurationBuilder, IServiceProvider> setup)
=> new HostBuilder()
.ConfigureLogging(logger => { logger.AddConsole(); })
.ConfigureServices((_, services) =>
{
services.AddAkka("TestSystem", (builder, provider) =>
{
builder
.AddHocon("akka.cluster.min-nr-of-members = 3", HoconAddMode.Prepend)
.ConfigureLoggers(logger => logger.LogLevel = LogLevel.DebugLevel)
.WithClustering()
.WithShardRegion<ShardRegion>(
"test",
EntityActor.Props,
new MessageExtractor(),
new ShardOptions
{
RememberEntities = true,
StateStoreMode = StateStoreMode.Persistence
});
setup(builder, provider);
});
}).Build();
[SuppressMessage("ReSharper", "MethodHasAsyncOverloadWithCancellation")]
private async Task StartClusterAsync(CancellationToken token)
{
var node1 = _host1.Services.GetRequiredService<ActorSystem>();
var node2 = _host2.Services.GetRequiredService<ActorSystem>();
var node3 = _host3.Services.GetRequiredService<ActorSystem>();
var clusterTcs = new TaskCompletionSource();
// We're not using JoinAsync, this method is doing what JoinAsync would, but with multiple nodes at once.
var cluster = Cluster.Cluster.Get(node1);
var address = cluster.SelfAddress;
cluster.RegisterOnMemberUp(() => clusterTcs.SetResult());
cluster.Join(address);
Cluster.Cluster.Get(node2).Join(address);
Cluster.Cluster.Get(node3).Join(address);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
cts.CancelAfter(_clusterStartTimeout);
var task = await Task.WhenAny(clusterTcs.Task, Task.Delay(Timeout.Infinite, cts.Token));
if(task != clusterTcs.Task)
{
await ShutdownAsync();
throw new TimeoutException($"Cluster failed to form in {_clusterStartTimeout.TotalSeconds} seconds");
}
cts.Cancel();
}
private async Task ShutdownAsync()
{
var tasks = new List<Task>();
if(_host1 is IAsyncDisposable asyncHost1)
tasks.Add(asyncHost1.DisposeAsync().AsTask());
else
_host1.Dispose();
if(_host2 is IAsyncDisposable asyncHost2)
tasks.Add(asyncHost2.DisposeAsync().AsTask());
else
_host2.Dispose();
if(_host3 is IAsyncDisposable asyncHost3)
tasks.Add(asyncHost3.DisposeAsync().AsTask());
else
_host3.Dispose();
if(tasks.Count > 0)
await Task.WhenAll(tasks);
}
public async ValueTask DisposeAsync()
{
await ShutdownAsync();
}
}

Просмотреть файл

@ -15,7 +15,7 @@ public class MySqlDocker : DockerContainer
{
}
public string? ConnectionString { get; private set; }
public string ConnectionString { get; private set; } = "";
private int Port { get; } = ThreadLocalRandom.Current.Next(9000, 10000);
private string User { get; } = "root";

Просмотреть файл

@ -4,14 +4,19 @@
// </copyright>
// -----------------------------------------------------------------------
using Akka.Hosting;
using Akka.Persistence.Sql.Exporter.Shared.Test;
await using var docker = new MySqlDocker();
docker.OnStdOut += (_, outputArgs) =>
{
Console.WriteLine(outputArgs.Output);
};
await docker.StartAsync();
var config = ConfigurationFactory.ParseString($@"
void Setup(AkkaConfigurationBuilder builder, IServiceProvider provider)
{
var config = ConfigurationFactory.ParseString($@"
akka.loglevel = DEBUG
akka.persistence.journal {{
plugin = ""akka.persistence.journal.mysql""
@ -27,10 +32,16 @@ var config = ConfigurationFactory.ParseString($@"
connection-string = ""{docker.ConnectionString}""
}}
}}").WithFallback(MySqlPersistence.DefaultConfiguration());
using var sys = ActorSystem.Create("actorSystem", config);
await sys.CreateTestData();
builder
.AddHocon(config);
}
await using var testCluster = new TestCluster(Setup);
await testCluster.StartAsync();
var generator = new DataGenerator(testCluster);
await generator.GenerateAsync();
//Console.WriteLine(">>>>>>>>>>> Creating backup");
//await docker.DumpDatabase("backup.sql");

Просмотреть файл

@ -4,6 +4,10 @@
// </copyright>
// -----------------------------------------------------------------------
using Akka.Hosting;
using Akka.Persistence.Sql.Exporter.Shared.Test;
using Akka.Persistence.SqlServer.Hosting;
await using var docker = new SqlServerDocker();
docker.OnStdOut += (_, outputArgs) =>
{
@ -11,26 +15,17 @@ docker.OnStdOut += (_, outputArgs) =>
};
await docker.StartAsync();
var config = ConfigurationFactory.ParseString($@"
akka.loglevel = DEBUG
akka.persistence.journal {{
plugin = ""akka.persistence.journal.sql-server""
sql-server {{
auto-initialize = on
connection-string = ""{docker.ConnectionString}""
}}
}}
akka.persistence.snapshot-store {{
plugin = ""akka.persistence.snapshot-store.sql-server""
sql-server {{
auto-initialize = on
connection-string = ""{docker.ConnectionString}""
}}
}}").WithFallback(SqlServerPersistence.DefaultConfiguration());
void Setup(AkkaConfigurationBuilder builder, IServiceProvider provider)
{
builder
.WithSqlServerPersistence(docker.ConnectionString, autoInitialize:true);
}
using var sys = ActorSystem.Create("actorSystem", config);
await using var testCluster = new TestCluster(Setup);
await testCluster.StartAsync();
await sys.CreateTestData();
var generator = new DataGenerator(testCluster);
await generator.GenerateAsync();
Console.WriteLine(">>>>>>>>>>> downloading backup");

Просмотреть файл

@ -10,6 +10,7 @@
<ItemGroup>
<PackageReference Include="Akka.Persistence.SqlServer" Version="1.4.35" />
<PackageReference Include="Akka.Persistence.SqlServer.Hosting" Version="0.5.0" />
</ItemGroup>
<ItemGroup>

Просмотреть файл

@ -17,7 +17,7 @@ public class SqlServerDocker: DockerContainer
}
public string? ConnectionString { get; private set; }
public string ConnectionString { get; private set; } = "";
private int Port { get; } = ThreadLocalRandom.Current.Next(9000, 10000);

Просмотреть файл

@ -4,6 +4,9 @@
// </copyright>
// -----------------------------------------------------------------------
using Akka.Hosting;
using Akka.Persistence.Sql.Exporter.Shared.Test;
var outputPath = Env.OutputPath;
if (!Directory.Exists(outputPath))
Directory.CreateDirectory(outputPath);
@ -15,7 +18,9 @@ if (!File.Exists(dbFile))
var uri = new Uri($"file://{dbFile.Replace("\\", "/")}");
var connectionString = $"Filename={uri}";
var config = ConfigurationFactory.ParseString($@"
void Setup(AkkaConfigurationBuilder builder, IServiceProvider provider)
{
var config = ConfigurationFactory.ParseString($@"
akka.loglevel = DEBUG
akka.persistence.journal {{
plugin = ""akka.persistence.journal.sqlite""
@ -31,7 +36,15 @@ var config = ConfigurationFactory.ParseString($@"
connection-string = ""{connectionString}""
}}
}}").WithFallback(SqlitePersistence.DefaultConfiguration());
var sys = ActorSystem.Create("actorSystem", config);
await sys.CreateTestData();
builder
.AddHocon(config);
}
await using var testCluster = new TestCluster(Setup);
await testCluster.StartAsync();
var generator = new DataGenerator(testCluster);
await generator.GenerateAsync();
Console.WriteLine(">>>>>>>>>>> DONE!");