- Add Akka.Persistence.MySql exporter
- Add Akka.Persistence.SqlServer exporter
- Add Akka.Persistence.Sqlite exporter
This commit is contained in:
Gregorius Soedharmo 2022-10-01 07:18:39 +07:00
Родитель b8c58b2f2b
Коммит 47ead4974a
29 изменённых файлов: 1464 добавлений и 0 удалений

32
.gitattributes поставляемый Normal file
Просмотреть файл

@ -0,0 +1,32 @@
# Auto detect text files and perform LF normalization
* text=auto
# Custom for Visual Studio
*.cs diff=csharp
*.sln merge=union
*.csproj merge=union
*.vbproj merge=union
*.fsproj merge=union
*.dbproj merge=union
# Standard to msysgit
*.doc diff=astextplain
*.DOC diff=astextplain
*.docx diff=astextplain
*.DOCX diff=astextplain
*.dot diff=astextplain
*.DOT diff=astextplain
*.pdf diff=astextplain
*.PDF diff=astextplain
*.rtf diff=astextplain
*.RTF diff=astextplain
# Needed for Mono build shell script
*.sh -text eol=lf
# Needed for API Approvals
*.txt text eol=crlf
build.sh eol=lf
*.ps1 eol=crlf

227
.gitignore поставляемый Normal file
Просмотреть файл

@ -0,0 +1,227 @@
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
# User-specific files
*.suo
*.user
*.userosscache
*.sln.docstates
# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs
# Approval test outputs
*.received.txt
*.txt.bak
# Build results
[Dd]ebug/
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
x64/
x86/
build/
bld/
[Bb]in/
[Oo]bj/
.fake/
TestResults/
PerfResults/
*.lock.json
# Visual Studo 2015 cache/options directory
.vs/
# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*
# NUNIT
*.VisualState.xml
TestResult.xml
# Build Results of an ATL Project
[Dd]ebugPS/
[Rr]eleasePS/
dlldata.c
*_i.c
*_p.c
*_i.h
*.ilk
*.meta
*.obj
*.pch
*.pdb
*.pgc
*.pgd
*.rsp
*.sbr
*.tlb
*.tli
*.tlh
*.tmp
*.tmp_proj
*.log
*.vspscc
*.vssscc
.builds
*.pidb
*.svclog
*.scc
# Chutzpah Test files
_Chutzpah*
# Visual C++ cache files
ipch/
*.aps
*.ncb
*.opensdf
*.sdf
*.cachefile
# Visual Studio profiler
*.psess
*.vsp
*.vspx
# TFS 2012 Local Workspace
$tf/
# Guidance Automation Toolkit
*.gpState
# ReSharper is a .NET coding add-in
_ReSharper*/
*.[Rr]e[Ss]harper
*.DotSettings.user
# JustCode is a .NET coding addin-in
.JustCode
# TeamCity is a build add-in
_TeamCity*
# DotCover is a Code Coverage Tool
*.dotCover
# NCrunch
_NCrunch_*
.*crunch*.local.xml
# MightyMoose
*.mm.*
AutoTest.Net/
# Web workbench (sass)
.sass-cache/
# Installshield output folder
[Ee]xpress/
# DocProject is a documentation generator add-in
DocProject/buildhelp/
DocProject/Help/*.HxT
DocProject/Help/*.HxC
DocProject/Help/*.hhc
DocProject/Help/*.hhk
DocProject/Help/*.hhp
DocProject/Help/Html2
DocProject/Help/html
# Click-Once directory
publish/
# Publish Web Output
*.[Pp]ublish.xml
*.azurePubxml
# TODO: Comment the next line if you want to checkin your web deploy settings
# but database connection strings (with potential passwords) will be unencrypted
*.pubxml
*.publishproj
# NuGet Packages
*.nupkg
# The packages folder can be ignored because of Package Restore
**/packages/*
# except build/, which is used as an MSBuild target.
!**/packages/build/
# Uncomment if necessary however generally it will be regenerated when needed
#!**/packages/repositories.config
# Windows Azure Build Output
csx/
*.build.csdef
# Windows Store app package directory
AppPackages/
# Others
*.[Cc]ache
ClientBin/
[Ss]tyle[Cc]op.*
~$*
*~
*.dbmdl
*.dbproj.schemaview
*.pfx
*.publishsettings
node_modules/
bower_components/
# RIA/Silverlight projects
Generated_Code/
# Backup & report files from converting an old project file
# to a newer Visual Studio version. Backup files are not needed,
# because we have git ;-)
_UpgradeReport_Files/
Backup*/
UpgradeLog*.XML
UpgradeLog*.htm
# SQL Server files
*.mdf
*.ldf
# Business Intelligence projects
*.rdl.data
*.bim.layout
*.bim_*.settings
# Microsoft Fakes
FakesAssemblies/
# Node.js Tools for Visual Studio
.ntvs_analysis.dat
# Visual Studio 6 build log
*.plg
# Visual Studio 6 workspace options file
*.opt
tools/
build/
.nuget/
.dotnet/
/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.received.txt
launchSettings.json
.idea/
# GhostDoc is a C# XML comment helper
*.[Gg]host[Dd]oc.xml
*.[Gg]host[Dd]oc.user.dic
# CodeRush
.cr/
# Visual Studio Code
.vscode/
# NDepend
*.ndproj
/[Nn][Dd]epend[Oo]ut
.ionide/symbolCache.db

Просмотреть файл

@ -0,0 +1,39 @@

Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sqlite.Exporter", "src\Sqlite.Exporter\Sqlite.Exporter.csproj", "{B3BD02FC-F7D9-48AB-ABAD-BC2FC0CE1220}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MySql.Exporter", "src\MySql.Exporter\MySql.Exporter.csproj", "{7B47BDFB-F324-4242-9149-AF75C347457E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Sql.Exporter.Shared", "src\Akka.Persistence.Sql.Exporter.Shared\Akka.Persistence.Sql.Exporter.Shared.csproj", "{457A6D02-60FE-4622-8966-54A17442BCF4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlServer.Exporter", "src\SqlServer.Exporter\SqlServer.Exporter.csproj", "{5ED55E80-132B-485C-9A07-E927BB27AE07}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{8975719B-F275-4B9D-950F-BBF878EF2D0E}"
ProjectSection(SolutionItems) = preProject
build.ps1 = build.ps1
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{B3BD02FC-F7D9-48AB-ABAD-BC2FC0CE1220}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B3BD02FC-F7D9-48AB-ABAD-BC2FC0CE1220}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B3BD02FC-F7D9-48AB-ABAD-BC2FC0CE1220}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B3BD02FC-F7D9-48AB-ABAD-BC2FC0CE1220}.Release|Any CPU.Build.0 = Release|Any CPU
{7B47BDFB-F324-4242-9149-AF75C347457E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7B47BDFB-F324-4242-9149-AF75C347457E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7B47BDFB-F324-4242-9149-AF75C347457E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7B47BDFB-F324-4242-9149-AF75C347457E}.Release|Any CPU.Build.0 = Release|Any CPU
{457A6D02-60FE-4622-8966-54A17442BCF4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{457A6D02-60FE-4622-8966-54A17442BCF4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{457A6D02-60FE-4622-8966-54A17442BCF4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{457A6D02-60FE-4622-8966-54A17442BCF4}.Release|Any CPU.Build.0 = Release|Any CPU
{5ED55E80-132B-485C-9A07-E927BB27AE07}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5ED55E80-132B-485C-9A07-E927BB27AE07}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5ED55E80-132B-485C-9A07-E927BB27AE07}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5ED55E80-132B-485C-9A07-E927BB27AE07}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal

70
build.ps1 Normal file
Просмотреть файл

@ -0,0 +1,70 @@
[CmdletBinding()]
Param(
[Parameter(
Mandatory=$True,
HelpMessage = "The image version number")]
[string] $version,
[Parameter(
Mandatory=$True,
HelpMessage = "Build type, must be one of these: mysql, sqlite, sqlserver, postgresql, or all")]
[ValidateSet("mysql", "sqlite", "sqlserver", "postgresql", "all")]
[string] $build
)
$currentDir = Get-Location
$Env:OUTPUT = "$PSScriptRoot/bin/output"
if("sqlite" -eq $build -or "all" -eq $build) {
# Build exporter
dotnet build -c Release $PSScriptRoot/src/Sqlite.Exporter/Sqlite.Exporter.csproj -o $PSScriptRoot/bin/build
Set-Location $PSScriptRoot/bin
# Execute exporter
dotnet $PSScriptRoot/bin/build/Sqlite.Exporter.dll
# Copy result
Copy-Item $PSScriptRoot/bin/output/database.db -Destination "$PSScriptRoot/bin/akka-sqlite.$version.db"
# Clean-up
Remove-Item -Recurse -Force $PSScriptRoot/bin/build
Remove-Item -Recurse -Force $PSScriptRoot/bin/output
}
if("mysql" -eq $build -or "all" -eq $build) {
# Build exporter
dotnet build -c Release $PSScriptRoot/src/MySql.Exporter/MySql.Exporter.csproj -o $PSScriptRoot/bin/build
Set-Location $PSScriptRoot/bin
# Execute exporter
dotnet $PSScriptRoot/bin/build/MySql.Exporter.dll
# Build docker image
Copy-Item $PSScriptRoot/src/MySql.Exporter/Dockerfile -Destination $PSScriptRoot/bin
docker build -t akka-mysql:$version .
# Clean-up
Remove-Item -Recurse -Force $PSScriptRoot/bin/build
Remove-Item -Recurse -Force $PSScriptRoot/bin/output
Remove-Item $PSScriptRoot/bin/Dockerfile
}
if("sqlserver" -eq $build -or "all" -eq $build) {
# Build exporter
dotnet build -c Release $PSScriptRoot/src/SqlServer.Exporter/SqlServer.Exporter.csproj -o $PSScriptRoot/bin/build
Set-Location $PSScriptRoot/bin
# Execute exporter
dotnet $PSScriptRoot/bin/build/SqlServer.Exporter.dll
# Build docker image
Copy-Item $PSScriptRoot/src/SqlServer.Exporter/Dockerfile -Destination $PSScriptRoot/bin
docker build -t akka-mysql:$version .
# Clean-up
Remove-Item -Recurse -Force $PSScriptRoot/bin/build
Remove-Item -Recurse -Force $PSScriptRoot/bin/output
Remove-Item $PSScriptRoot/bin/Dockerfile
}
Set-Location $currentDir

Просмотреть файл

@ -0,0 +1,39 @@
// -----------------------------------------------------------------------
// <copyright file="ActorSystemExtensions.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using Akka.Actor;
using Akka.Event;
namespace Akka.Persistence.Sql.Exporter.Shared;
public static class ActorSystemExtensions
{
private const int TotalEvents = 1000;
public static async Task CreateTestData(this ActorSystem system)
{
var log = Logging.GetLogger(system, "SQLExporter");
var actor1 = system.ActorOf(PersistenceActor.Props("one"));
await actor1.Ask<Done>(Ready.Instance);
log.Info($">>>>>>>>>>> Persisting {TotalEvents} events");
foreach (var i in Enumerable.Range(0, TotalEvents))
{
actor1.Tell(new Store(i));
if(i>0 && i%500 == 0)
log.Info($">>>>>>>>>>> Queued: {i} events");
}
log.Info(">>>>>>>>>>> Waiting for all events to be persisted");
var count = 0;
while (count < TotalEvents)
{
count = (int) await actor1.Ask<long>(Finish.Instance);
log.Info($">>>>>>>>>>> Persisted: {count} events");
}
}
}

Просмотреть файл

@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka.Persistence" Version="1.4.43" />
<PackageReference Include="Akka.Persistence.Query" Version="1.4.41" />
<PackageReference Include="Akka.Persistence.Query.Sql" Version="1.4.41" />
<PackageReference Include="Docker.DotNet" Version="3.125.12" />
<PackageReference Include="SharpCompress" Version="0.32.2" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.3" />
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,31 @@
// -----------------------------------------------------------------------
// <copyright file="DirectoryInfoExtensions.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
namespace Akka.Persistence.Sql.Exporter.Shared;
public static class DirectoryInfoExtensions
{
public static void DeleteRecursive(this DirectoryInfo baseDir)
{
if(!baseDir.Exists)
return;
foreach (var dir in baseDir.EnumerateDirectories())
dir.DeleteRecursive();
foreach (var file in baseDir.GetFiles())
file.Delete();
baseDir.Delete(true);
}
public static void Clear(this DirectoryInfo dir)
{
dir.DeleteRecursive();
dir.Create();
}
}

Просмотреть файл

@ -0,0 +1,327 @@
// -----------------------------------------------------------------------
// <copyright file="DockerContainer.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Text;
using Docker.DotNet;
using Docker.DotNet.Models;
using SharpCompress.Archives;
using SharpCompress.Archives.Tar;
using SharpCompress.Common;
using SharpCompress.Readers;
using SharpCompress.Readers.Tar;
namespace Akka.Persistence.Sql.Exporter.Shared;
public abstract class DockerContainer: IAsyncDisposable, IDisposable
{
private Stream? _stream;
private CancellationTokenSource _logsCts = new CancellationTokenSource();
private Task? _readDockerTask;
protected DockerContainer(string imageName, string tag, string containerName)
{
ImageName = imageName;
Tag = tag;
ContainerName = containerName;
Client = new DockerClientConfiguration().CreateClient();
OutputPath = Env.OutputPath;
OutputDirectory = new DirectoryInfo(OutputPath);
if(!OutputDirectory.Exists)
OutputDirectory.Create();
OnStdOut += (_, _) => { };
}
public virtual string DatabaseName => "akka_persistence_tests";
public string OutputPath { get; }
public DirectoryInfo OutputDirectory { get; }
private string ImageName { get; }
private string Tag { get; }
private string FullImageName => $"{ImageName}:{Tag}";
public string ContainerName { get; }
protected virtual string? ReadyMarker { get; } = null;
protected virtual TimeSpan ReadyTimeout { get; } = TimeSpan.FromMinutes(1);
public DockerClient Client { get; }
public event EventHandler<OutputReceivedArgs> OnStdOut;
public async Task StartAsync(CancellationToken cancellationToken = default)
{
var images = await Client.Images.ListImagesAsync(new ImagesListParameters
{
Filters = new Dictionary<string, IDictionary<string, bool>>
{
{
"reference",
new Dictionary<string, bool>
{
{FullImageName, true}
}
}
}
}, cancellationToken);
if (images.Count == 0)
await Client.Images.CreateImageAsync(
new ImagesCreateParameters {FromImage = ImageName, Tag = Tag}, null,
new Progress<JSONMessage>(message =>
{
Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage)
? message.ErrorMessage
: $"{message.ID} {message.Status} {message.ProgressMessage}");
}), cancellationToken);
// configure container parameters
var options = new CreateContainerParameters();
ConfigureContainer(options);
options.Image = FullImageName;
options.Name = ContainerName;
options.Tty = true;
// create the container
await Client.Containers.CreateContainerAsync(options, cancellationToken);
// start the container
await Client.Containers.StartContainerAsync(ContainerName, new ContainerStartParameters(), cancellationToken);
// Create streams
_stream = await Client.Containers.GetContainerLogsAsync(
id: ContainerName,
parameters: new ContainerLogsParameters
{
Follow = true,
ShowStdout = true,
ShowStderr = true,
Timestamps = true
},
cancellationToken: cancellationToken);
_readDockerTask = ReadDockerStreamAsync();
// Wait until container is completely ready
if(ReadyMarker is { })
await AwaitUntilReadyAsync(ReadyMarker, ReadyTimeout);
await AfterContainerStartedAsync(cancellationToken);
}
protected abstract void ConfigureContainer(CreateContainerParameters parameters);
protected virtual Task AfterContainerStartedAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
private async Task AwaitUntilReadyAsync(string marker, TimeSpan timeout)
{
var tcs = new TaskCompletionSource<string>();
void LineProcessor(object? sender, OutputReceivedArgs args)
{
if(args.Output.Contains(marker))
tcs.SetResult(args.Output);
}
OnStdOut += LineProcessor;
using var cts = new CancellationTokenSource(timeout);
try
{
var task = await Task.WhenAny(Task.Delay(Timeout.Infinite, cts.Token), tcs.Task);
if(task == tcs.Task)
return;
throw new Exception($"Docker image failed to run within {timeout}.");
}
finally
{
cts.Cancel();
cts.Dispose();
OnStdOut -= LineProcessor;
}
}
private readonly Dictionary<int, string> _execCache = new ();
public async Task ExecuteCommandAsync(params string[] command)
{
if (!_execCache.TryGetValue(command.GetHashCode(), out var id))
{
var createResponse = await Client.Exec.ExecCreateContainerAsync(ContainerName, new ContainerExecCreateParameters
{
Detach = true,
Cmd = command
});
id = createResponse.ID;
_execCache[command.GetHashCode()] = id;
}
var stream = await Client.Exec.StartAndAttachContainerExecAsync(id, false);
using (stream)
{
var (stdOut, stdErr) = await stream.ReadOutputToEndAsync(default);
if(!string.IsNullOrWhiteSpace(stdOut))
{
Console.WriteLine(">>>>>>>> StdOut");
Console.WriteLine(stdOut);
Console.WriteLine("<<<<<<<< StdOut");
}
if (!string.IsNullOrWhiteSpace(stdErr))
{
Console.WriteLine(">>>>>>>> StdErr");
Console.WriteLine(stdErr);
Console.WriteLine("<<<<<<<< StdErr");
}
}
}
public async Task DownloadAsync(string path, string outputPath, string outputFile, bool extract = false)
{
var response = await Client.Containers.GetArchiveFromContainerAsync(
id: ContainerName,
parameters: new GetArchiveFromContainerParameters { Path = path },
statOnly: false);
var stream = response.Stream;
try
{
var downloadFile = Path.Combine(outputPath, outputFile);
await using (stream)
{
await stream.DumpToFile(downloadFile);
}
if (extract)
{
var directoryName = Path.GetFileNameWithoutExtension(outputFile);
var extractPath = Path.Combine(outputPath, directoryName);
if (!Directory.Exists(extractPath))
Directory.CreateDirectory(extractPath);
using var archive = ArchiveFactory.Open(downloadFile);
using var reader = archive.ExtractAllEntries();
reader.WriteAllToDirectory(extractPath, new ExtractionOptions
{
ExtractFullPath = true,
Overwrite = true
});
}
}
catch (Exception e)
{
Console.WriteLine($"Failed to download file. {e}");
}
}
private async Task ReadDockerStreamAsync()
{
using var reader = new StreamReader(_stream!);
var tcs = new TaskCompletionSource();
_logsCts.Token.Register(() => tcs.SetResult());
while (!_logsCts.IsCancellationRequested)
{
var task = reader.ReadLineAsync();
var result = await Task.WhenAny(tcs.Task, task);
if (result != task)
break;
var line = task.Result;
if (!string.IsNullOrEmpty(line))
OnStdOut(this, new OutputReceivedArgs(line));
}
}
private bool _disposing;
public async ValueTask DisposeAsync()
{
// Perform async cleanup.
await DisposeAsyncCore().ConfigureAwait(false);
Dispose(false);
GC.SuppressFinalize(this);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual async ValueTask DisposeAsyncCore()
{
if (_disposing) return;
_disposing = true;
_logsCts.Cancel();
_logsCts.Dispose();
if (_readDockerTask is { })
await _readDockerTask;
if(_stream is { })
await _stream.DisposeAsync();
try
{
await Client.Containers.StopContainerAsync(
id: ContainerName,
parameters: new ContainerStopParameters());
await Client.Containers.RemoveContainerAsync(
id: ContainerName,
parameters: new ContainerRemoveParameters { Force = true });
}
catch (Exception e)
{
Console.WriteLine($"Failed to stop and/or remove docker container. {e}");
}
finally
{
Client.Dispose();
}
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if(_disposing) return;
_disposing = true;
_logsCts.Cancel();
_logsCts.Dispose();
_readDockerTask?.GetAwaiter().GetResult();
_stream?.Dispose();
try
{
Client.Containers.StopContainerAsync(
id: ContainerName,
parameters: new ContainerStopParameters())
.GetAwaiter().GetResult();
Client.Containers.RemoveContainerAsync(ContainerName, new ContainerRemoveParameters { Force = true })
.GetAwaiter().GetResult();
}
catch (Exception e)
{
Console.WriteLine($"Failed to stop and/or remove docker container. {e}");
}
finally
{
Client.Dispose();
}
}
}
}

Просмотреть файл

@ -0,0 +1,20 @@
// -----------------------------------------------------------------------
// <copyright file="Exporter.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
namespace Akka.Persistence.Sql.Exporter.Shared;
public static class Env
{
public static string OutputPath
{
get
{
var path = Environment.GetEnvironmentVariable("OUTPUT");
return string.IsNullOrEmpty(path) ? Path.GetFullPath("./bin/output") : path;
}
}
}

Просмотреть файл

@ -0,0 +1,39 @@
// -----------------------------------------------------------------------
// <copyright file="Messages.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
namespace Akka.Persistence.Sql.Exporter.Shared;
public sealed class Store
{
public readonly int Value;
public Store(int value)
{
Value = value;
}
}
public sealed class Stored
{
public readonly int Value;
public Stored(int value)
{
Value = value;
}
}
public sealed class Finish
{
public static readonly Finish Instance = new();
private Finish() { }
}
public sealed class Ready
{
public static readonly Ready Instance = new();
private Ready() { }
}

Просмотреть файл

@ -0,0 +1,18 @@
// -----------------------------------------------------------------------
// <copyright file="OutputReceivedArgs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
namespace Akka.Persistence.Sql.Exporter.Shared;
public class OutputReceivedArgs : EventArgs
{
public OutputReceivedArgs(string output)
{
Output = output;
}
public string Output { get; }
}

Просмотреть файл

@ -0,0 +1,62 @@
// -----------------------------------------------------------------------
// <copyright file="PersistenceActor.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using Akka.Actor;
namespace Akka.Persistence.Sql.Exporter.Shared;
public class PersistenceActor: PersistentActor
{
public static Props Props(string id) => Actor.Props.Create(() => new PersistenceActor(id));
private long _count;
private long _state;
public PersistenceActor(string id)
{
PersistenceId = id;
}
protected override bool ReceiveRecover(object message)
{
switch (message)
{
case Stored s:
_count++;
_state += s.Value;
return true;
default:
return false;
}
}
protected override bool ReceiveCommand(object message)
{
switch (message)
{
case Store s:
Persist(new Stored(s.Value), stored =>
{
_count++;
_state += stored.Value;
});
return true;
case Ready _:
Sender.Tell(Done.Instance);
return true;
case Finish _:
Sender.Tell(_count);
return true;
default:
return false;
}
}
public override string PersistenceId { get; }
}

Просмотреть файл

@ -0,0 +1,47 @@
// -----------------------------------------------------------------------
// <copyright file="StreamExtensions.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using System.Buffers;
namespace Akka.Persistence.Sql.Exporter.Shared;
public static class StreamExtensions
{
public static async Task<byte[]> ReadAllBytes(this Stream stream, CancellationToken token)
{
using var ms = new MemoryStream((int) stream.Length);
using var memoryOwner = MemoryPool<byte>.Shared.Rent(1024);
var memory = memoryOwner.Memory;
token.Register(() => throw new OperationCanceledException("Read operation has been cancelled", token));
while (true)
{
var read = await stream.ReadAsync(memory, token);
if (read == 0)
break;
await ms.WriteAsync(memory, token);
}
return ms.ToArray();
}
public static async Task DumpToFile(this Stream stream, string path)
{
if(File.Exists(path)) File.Delete(path);
await using var outStream = File.Open(path, FileMode.OpenOrCreate, FileAccess.Write);
using var memoryOwner = MemoryPool<byte>.Shared.Rent(1024);
var memory = memoryOwner.Memory;
while(true)
{
var read = await stream.ReadAsync(memory);
if (read == 0)
break;
await outStream.WriteAsync(memory[..read]);
}
}
}

Просмотреть файл

@ -0,0 +1,25 @@
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/.idea
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/azds.yaml
**/bin
**/charts
**/docker-compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md

Просмотреть файл

@ -0,0 +1,6 @@
FROM mysql:8
ENV MYSQL_ROOT_PASSWORD=Password12!
ENV MYSQL_DATABASE=akka_persistence_tests
# COPY output/backup.sql /docker-entrypoint-initdb.d/
ADD output/mysql.tar /var/lib

Просмотреть файл

@ -0,0 +1,14 @@
// -----------------------------------------------------------------------
// <copyright file="Globals.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
global using Akka.Actor;
global using Akka.Configuration;
global using Akka.Persistence.MySql;
global using Akka.Persistence.Sql.Exporter.Shared;
global using Akka.Util;
global using Docker.DotNet.Models;
global using MySql.Exporter;
global using System.Data.Common;

Просмотреть файл

@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka.Persistence.MySql" Version="1.4.35" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Akka.Persistence.Sql.Exporter.Shared\Akka.Persistence.Sql.Exporter.Shared.csproj" />
</ItemGroup>
<ItemGroup>
<Compile Remove="output\**" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Remove="output\**" />
</ItemGroup>
<ItemGroup>
<None Remove="output\**" />
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,75 @@
// -----------------------------------------------------------------------
// <copyright file="MySqlDocker.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
namespace MySql.Exporter;
/// <summary>
/// Fixture used to run SQL Server
/// </summary>
public class MySqlDocker : DockerContainer
{
public MySqlDocker() : base("mysql", "8", $"mysql-{Guid.NewGuid():N}")
{
}
public string? ConnectionString { get; private set; }
private int Port { get; } = ThreadLocalRandom.Current.Next(9000, 10000);
private string User { get; } = "root";
private string Password { get; } = "Password12!";
protected override string ReadyMarker => "ready for connections.";
protected override void ConfigureContainer(CreateContainerParameters parameters)
{
parameters.ExposedPorts = new Dictionary<string, EmptyStruct>
{
["3306/tcp"] = new()
};
parameters.HostConfig = new HostConfig
{
PortBindings = new Dictionary<string, IList<PortBinding>>
{
["3306/tcp"] = new List<PortBinding> { new() { HostPort = $"{Port}" } }
}
};
parameters.Env = new[]
{
$"MYSQL_ROOT_PASSWORD={Password}",
$"MYSQL_DATABASE={DatabaseName}",
};
}
public async Task DumpDatabase(string outputFile)
{
try
{
await ExecuteCommandAsync("sh", "-c", $"exec mysqldump -u{User} -p'{Password}' {DatabaseName} > {outputFile}");
}
catch (Exception e)
{
Console.WriteLine($">>>>>>>>>> Failed to execute command. {e}");
}
}
protected override Task AfterContainerStartedAsync(CancellationToken cancellationToken)
{
var connectionString = new DbConnectionStringBuilder
{
["Server"] = "localhost",
["Port"] = Port.ToString(),
["Database"] = DatabaseName,
["User Id"] = User,
["Password"] = Password
};
ConnectionString = connectionString.ToString();
Console.WriteLine($"Connection string: [{ConnectionString}]");
return Task.CompletedTask;
}
}

Просмотреть файл

@ -0,0 +1,43 @@
// -----------------------------------------------------------------------
// <copyright file="Program.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
await using var docker = new MySqlDocker();
docker.OnStdOut += (_, outputArgs) =>
{
Console.WriteLine(outputArgs.Output);
};
await docker.StartAsync();
var config = ConfigurationFactory.ParseString($@"
akka.loglevel = DEBUG
akka.persistence.journal {{
plugin = ""akka.persistence.journal.mysql""
mysql {{
auto-initialize = on
connection-string = ""{docker.ConnectionString}""
}}
}}
akka.persistence.snapshot-store {{
plugin = ""akka.persistence.snapshot-store.mysql""
mysql {{
auto-initialize = on
connection-string = ""{docker.ConnectionString}""
}}
}}").WithFallback(MySqlPersistence.DefaultConfiguration());
using var sys = ActorSystem.Create("actorSystem", config);
await sys.CreateTestData();
//Console.WriteLine(">>>>>>>>>>> Creating backup");
//await docker.DumpDatabase("backup.sql");
Console.WriteLine(">>>>>>>>>>> downloading backup");
//await docker.DownloadAsync("backup.sql", docker.OutputPath, "backup.tar");
await docker.DownloadAsync("/var/lib/mysql/", docker.OutputPath, "mysql.tar");
Console.WriteLine(">>>>>>>>>>> DONE!");

Просмотреть файл

@ -0,0 +1,25 @@
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/.idea
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/azds.yaml
**/bin
**/charts
**/docker-compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md

Просмотреть файл

@ -0,0 +1,56 @@
// -----------------------------------------------------------------------
// <copyright file="DbUtils.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
namespace SqlServer.Exporter;
public static class DbUtils
{
public static async Task Initialize(string connectionString)
{
var connectionBuilder = new SqlConnectionStringBuilder(connectionString);
var databaseName = connectionBuilder.InitialCatalog;
connectionBuilder.InitialCatalog = "master";
var newConnStr = connectionBuilder.ToString();
await using var conn = new SqlConnection(newConnStr);
conn.Open();
await using var cmd = new SqlCommand();
cmd.CommandText = @$"
IF db_id('{databaseName}') IS NULL
BEGIN
CREATE DATABASE {databaseName}
END
";
cmd.Connection = conn;
await cmd.ExecuteScalarAsync();
await DropTablesAsync(conn, databaseName);
}
public static async Task CleanAsync(string connectionString)
{
var connectionBuilder = new SqlConnectionStringBuilder(connectionString);
var databaseName = connectionBuilder.InitialCatalog;
await using var conn = new SqlConnection(connectionString);
conn.Open();
await DropTablesAsync(conn, databaseName);
}
private static async Task DropTablesAsync(SqlConnection conn, string databaseName)
{
await using var cmd = new SqlCommand();
cmd.CommandText = $@"
USE {databaseName};
IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'EventJournal') BEGIN DROP TABLE dbo.EventJournal END;
IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'Metadata') BEGIN DROP TABLE dbo.Metadata END;
IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'SnapshotStore') BEGIN DROP TABLE dbo.SnapshotStore END;";
cmd.Connection = conn;
await cmd.ExecuteNonQueryAsync();
}
}

Просмотреть файл

@ -0,0 +1,7 @@
FROM mcr.microsoft.com/mssql/server:2019-latest
ENV ACCEPT_EULA=Y
ENV MSSQL_SA_PASSWORD=Password12!
ENV MSSQL_PID=Express
ADD output/data.tar /var/opt/mssql

Просмотреть файл

@ -0,0 +1,12 @@
// -----------------------------------------------------------------------
// <copyright file="Globals.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
global using Akka.Actor;
global using Akka.Configuration;
global using Akka.Persistence.SqlServer;
global using Akka.Persistence.Sql.Exporter.Shared;
global using SqlServer.Exporter;
global using Microsoft.Data.SqlClient;

Просмотреть файл

@ -0,0 +1,39 @@
// -----------------------------------------------------------------------
// <copyright file="Program.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
await using var docker = new SqlServerDocker();
docker.OnStdOut += (_, outputArgs) =>
{
Console.WriteLine(outputArgs.Output);
};
await docker.StartAsync();
var config = ConfigurationFactory.ParseString($@"
akka.loglevel = DEBUG
akka.persistence.journal {{
plugin = ""akka.persistence.journal.sql-server""
sql-server {{
auto-initialize = on
connection-string = ""{docker.ConnectionString}""
}}
}}
akka.persistence.snapshot-store {{
plugin = ""akka.persistence.snapshot-store.sql-server""
sql-server {{
auto-initialize = on
connection-string = ""{docker.ConnectionString}""
}}
}}").WithFallback(SqlServerPersistence.DefaultConfiguration());
using var sys = ActorSystem.Create("actorSystem", config);
await sys.CreateTestData();
Console.WriteLine(">>>>>>>>>>> downloading backup");
await docker.DownloadAsync("/var/opt/mssql/data/", docker.OutputPath, "data.tar");
Console.WriteLine(">>>>>>>>>>> DONE!");

Просмотреть файл

@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka.Persistence.SqlServer" Version="1.4.35" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Akka.Persistence.Sql.Exporter.Shared\Akka.Persistence.Sql.Exporter.Shared.csproj" />
</ItemGroup>
<ItemGroup>
<Compile Remove="output\**" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Remove="output\**" />
</ItemGroup>
<ItemGroup>
<None Remove="output\**" />
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,66 @@
// -----------------------------------------------------------------------
// <copyright file="SqlServerDocker.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using System.Data.Common;
using Akka.Util;
using Docker.DotNet.Models;
namespace SqlServer.Exporter;
public class SqlServerDocker: DockerContainer
{
public SqlServerDocker() : base("mcr.microsoft.com/mssql/server", "2019-latest", $"mssql-{Guid.NewGuid():N}")
{
}
public string? ConnectionString { get; private set; }
private int Port { get; } = ThreadLocalRandom.Current.Next(9000, 10000);
private string User { get; } = "sa";
private string Password { get; } = "Password12!";
protected override string ReadyMarker => "Recovery is complete.";
protected override void ConfigureContainer(CreateContainerParameters parameters)
{
parameters.ExposedPorts = new Dictionary<string, EmptyStruct>
{
["1433/tcp"] = new()
};
parameters.HostConfig = new HostConfig
{
PortBindings = new Dictionary<string, IList<PortBinding>>
{
["1433/tcp"] = new List<PortBinding> { new() { HostPort = $"{Port}" } }
}
};
parameters.Env = new[]
{
"ACCEPT_EULA=Y",
$"MSSQL_SA_PASSWORD={Password}",
"MSSQL_PID=Express"
};
}
protected override async Task AfterContainerStartedAsync(CancellationToken cancellationToken)
{
var builder = new DbConnectionStringBuilder
{
["Server"] = $"localhost,{Port}",
["Database"] = DatabaseName,
["User Id"] = User,
["Password"] = Password
};
ConnectionString = builder.ToString();
await DbUtils.Initialize(ConnectionString);
Console.WriteLine($"Connection string: [{ConnectionString}]");
}
}

Просмотреть файл

@ -0,0 +1,10 @@
// -----------------------------------------------------------------------
// <copyright file="Globals.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
global using Akka.Actor;
global using Akka.Configuration;
global using Akka.Persistence.Sql.Exporter.Shared;
global using Akka.Persistence.Sqlite;

Просмотреть файл

@ -0,0 +1,37 @@
// -----------------------------------------------------------------------
// <copyright file="Program.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
var outputPath = Env.OutputPath;
if (!Directory.Exists(outputPath))
Directory.CreateDirectory(outputPath);
var dbFile = Path.Combine(outputPath, "database.db");
if (!File.Exists(dbFile))
File.Create(dbFile).Close();
var uri = new Uri($"file://{dbFile.Replace("\\", "/")}");
var connectionString = $"Filename={uri}";
var config = ConfigurationFactory.ParseString($@"
akka.loglevel = DEBUG
akka.persistence.journal {{
plugin = ""akka.persistence.journal.sqlite""
sqlite {{
auto-initialize = on
connection-string = ""{connectionString}""
}}
}}
akka.persistence.snapshot-store {{
plugin = ""akka.persistence.snapshot-store.sqlite""
sqlite {{
auto-initialize = on
connection-string = ""{connectionString}""
}}
}}").WithFallback(SqlitePersistence.DefaultConfiguration());
var sys = ActorSystem.Create("actorSystem", config);
await sys.CreateTestData();
Console.WriteLine(">>>>>>>>>>> DONE!");

Просмотреть файл

@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka.Persistence.Sqlite" Version="1.4.41" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Akka.Persistence.Sql.Exporter.Shared\Akka.Persistence.Sql.Exporter.Shared.csproj" />
</ItemGroup>
</Project>