Fix MongoDb operations to use transaction on both read and write (#347)

* Fix journal to use transaction on both read and write

* Apply transaction to snapshot store

* Code cleanup and documentation

* Fix unit test project failed to run

* Expand transaction unit test to cover all relevant unit tests

* Operations inside transaction should never be ran concurrently
This commit is contained in:
Gregorius Soedharmo 2023-09-15 22:35:19 +07:00 коммит произвёл GitHub
Родитель ef7bed77e9
Коммит f10240b01c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
29 изменённых файлов: 1148 добавлений и 1113 удалений

Просмотреть файл

@ -33,6 +33,8 @@ jobs:
filePath: ${{ parameters.scriptFileName }}
arguments: ${{ parameters.scriptArgs }}
continueOnError: true
env:
TEST_ENVIRONMENT: "CICD"
condition: in( variables['Agent.OS'], 'Linux', 'Darwin' )
# Windows
- task: BatchScript@1
@ -41,6 +43,8 @@ jobs:
filename: ${{ parameters.scriptFileName }}
arguments: ${{ parameters.scriptArgs }}
continueOnError: true
env:
TEST_ENVIRONMENT: "CICD"
condition: eq( variables['Agent.OS'], 'Windows_NT' )
- task: PublishTestResults@2
inputs:

Просмотреть файл

@ -24,8 +24,8 @@ jobs:
- template: azure-pipeline.template.yaml
parameters:
name: 'net_6_tests_linux'
displayName: '.NET 6 Unit Tests (Linux)'
name: 'net_7_tests_linux'
displayName: '.NET 7 Unit Tests (Linux)'
vmImage: 'ubuntu-latest'
scriptFileName: ./build.sh
scriptArgs: runTestsNet

Просмотреть файл

@ -32,8 +32,8 @@ jobs:
- template: azure-pipeline.template.yaml
parameters:
name: 'net_6_tests_windows'
displayName: '.NET 6 Unit Tests (Windows)'
name: 'net_7_tests_windows'
displayName: '.NET 7 Unit Tests (Windows)'
vmImage: 'windows-latest'
scriptFileName: build.cmd
scriptArgs: runTestsNet

Просмотреть файл

@ -28,6 +28,8 @@ steps:
version: 7.0.x
- task: BatchScript@1
displayName: 'FAKE Build'
env:
TEST_ENVIRONMENT: "CICD"
inputs:
filename: build.cmd
arguments: 'nuget nugetpublishurl=https://www.nuget.org/api/v2/package nugetkey=$(nugetKey)'

Просмотреть файл

@ -52,7 +52,7 @@ let outputBinariesNet = outputBinaries @@ "net5.0"
// Configuration values for tests
let testNetFrameworkVersion = "net471"
let testNetCoreVersion = "netcoreapp3.1"
let testNetVersion = "net6.0"
let testNetVersion = "net7.0"
Target "Clean" (fun _ ->
ActivateFinalTarget "KillCreatedProcesses"

Просмотреть файл

@ -3,6 +3,10 @@
<TargetFrameworks>$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
</PropertyGroup>
<PropertyGroup>
<DefineConstants Condition="'$(TEST_ENVIRONMENT)' == 'CICD'" >$(DefineConstants);CICD</DefineConstants>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="FluentAssertions" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />

Просмотреть файл

@ -15,45 +15,64 @@ using Xunit.Abstractions;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbAllEventsSpec: AllEventsSpec, IClassFixture<DatabaseFixture>
public class MongoDbTransactionAllEventsSpec : MongoDbAllEventsSpecBase
{
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
public MongoDbTransactionAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true)
{
}
}
[Collection("MongoDbSpec")]
public class MongoDbAllEventsSpec : MongoDbAllEventsSpecBase
{
public MongoDbAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false)
{
}
}
public abstract class MongoDbAllEventsSpecBase: AllEventsSpec, IClassFixture<DatabaseFixture>
{
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction)
{
// akka.test.single-expect-default = 10s
var specString = @"
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
auto-initialize = on
collection = ""EventJournal""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";
var specString = $$"""
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
auto-initialize = on
collection = "EventJournal"
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
}
}
query {
mongodb {
class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"
refresh-interval = 1s
}
}
}
""";
return ConfigurationFactory.ParseString(specString);
}
public static readonly AtomicCounter Counter = new AtomicCounter(0);
public MongoDbAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbAllEventsSpec", output)
private static readonly AtomicCounter Counter = new AtomicCounter(0);
protected MongoDbAllEventsSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbAllEventsSpec", output)
{
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}

Просмотреть файл

@ -15,45 +15,64 @@ using Xunit.Abstractions;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbCurrentAllEventsSpec : CurrentAllEventsSpec, IClassFixture<DatabaseFixture>
public class MongoDbTransactionCurrentAllEventsSpec : MongoDbCurrentAllEventsSpecBase
{
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
public MongoDbTransactionCurrentAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true)
{
}
}
[Collection("MongoDbSpec")]
public class MongoDbCurrentAllEventsSpec : MongoDbCurrentAllEventsSpecBase
{
public MongoDbCurrentAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false)
{
}
}
public abstract class MongoDbCurrentAllEventsSpecBase : CurrentAllEventsSpec, IClassFixture<DatabaseFixture>
{
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction)
{
// akka.test.single-expect-default = 10s
var specString = @"
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
auto-initialize = on
collection = ""EventJournal""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";
var specString = $$"""
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
auto-initialize = on
collection = "EventJournal"
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
}
}
query {
mongodb {
class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"
refresh-interval = 1s
}
}
}
""";
return ConfigurationFactory.ParseString(specString);
}
public static readonly AtomicCounter Counter = new AtomicCounter(0);
public MongoDbCurrentAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentAllEventsSpec", output)
private static readonly AtomicCounter Counter = new AtomicCounter(0);
protected MongoDbCurrentAllEventsSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbCurrentAllEventsSpec", output)
{
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}

Просмотреть файл

@ -16,48 +16,68 @@ using Akka.Util.Internal;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbCurrentEventsByPersistenceIdsSpec : TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
public class MongoDbTransactionCurrentEventsByPersistenceIdsSpec: MongoDbCurrentEventsByPersistenceIdsSpecBase
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
public MongoDbTransactionCurrentEventsByPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(output, databaseFixture, true)
{
}
}
[Collection("MongoDbSpec")]
public class MongoDbCurrentEventsByPersistenceIdsSpec: MongoDbCurrentEventsByPersistenceIdsSpecBase
{
public MongoDbCurrentEventsByPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(output, databaseFixture, false)
{
}
}
public abstract class MongoDbCurrentEventsByPersistenceIdsSpecBase : TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
{
private static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;
public MongoDbCurrentEventsByPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByPersistenceIdsSpec", output)
protected MongoDbCurrentEventsByPersistenceIdsSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbCurrentEventsByPersistenceIdsSpec", output)
{
_output = output;
output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current));
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction)
{
var specString = @"
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
auto-initialize = on
collection = ""EventJournal""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";
var specString = $$"""
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
auto-initialize = on
collection = "EventJournal"
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
}
}
query {
mongodb {
class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"
refresh-interval = 1s
}
}
}
""";
return ConfigurationFactory.ParseString(specString);
}

Просмотреть файл

@ -21,13 +21,28 @@ using System.Diagnostics;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbCurrentEventsByTagSpec : Akka.Persistence.TCK.Query.CurrentEventsByTagSpec, IClassFixture<DatabaseFixture>
public class MongoDbTransactionCurrentEventsByTagSpec : MongoDbCurrentEventsByTagSpecBase
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
public MongoDbTransactionCurrentEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true)
{
}
}
[Collection("MongoDbSpec")]
public class MongoDbCurrentEventsByTagSpec : MongoDbCurrentEventsByTagSpecBase
{
public MongoDbCurrentEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false)
{
}
}
public abstract class MongoDbCurrentEventsByTagSpecBase : Akka.Persistence.TCK.Query.CurrentEventsByTagSpec, IClassFixture<DatabaseFixture>
{
private static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;
public MongoDbCurrentEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByTagSpec", output)
protected MongoDbCurrentEventsByTagSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbCurrentEventsByTagSpec", output)
{
_output = output;
@ -39,53 +54,50 @@ namespace Akka.Persistence.MongoDb.Tests
ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10));
}
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
protected override bool SupportsTagsInEventEnvelope => true;
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
auto-initialize = on
collection = ""EventJournal""
event-adapters {
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
}
event-adapter-bindings = {
""System.String"" = color-tagger
}
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";
var specString = $$"""
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
auto-initialize = on
collection = "EventJournal"
event-adapters {
color-tagger = "Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK"
}
event-adapter-bindings = {
"System.String" = color-tagger
}
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
}
}
query {
mongodb {
class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"
refresh-interval = 1s
}
}
}
""";
return ConfigurationFactory.ParseString(specString);
}
//public override void ReadJournal_query_CurrentEventsByTag_should_find_existing_events()
//{
// var a = Sys.ActorOf(TestActor.Props("a"));
// a.Tell("warm-up");
// ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10));
//}
internal class TestActor : UntypedPersistentActor
{
public static Props Props(string persistenceId) => Actor.Props.Create(() => new TestActor(persistenceId));
@ -126,6 +138,4 @@ namespace Akka.Persistence.MongoDb.Tests
}
}
}
}

Просмотреть файл

@ -21,41 +21,58 @@ using System.Diagnostics;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbCurrentPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentPersistenceIdsSpec, IClassFixture<DatabaseFixture>
public class MongoDbTransactionCurrentPersistenceIdsSpec : MongoDbCurrentPersistenceIdsSpecBase
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
public MongoDbTransactionCurrentPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true)
{
}
}
[Collection("MongoDbSpec")]
public class MongoDbCurrentPersistenceIdsSpec : MongoDbCurrentPersistenceIdsSpecBase
{
public MongoDbCurrentPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false)
{
}
}
public abstract class MongoDbCurrentPersistenceIdsSpecBase : Akka.Persistence.TCK.Query.CurrentPersistenceIdsSpec, IClassFixture<DatabaseFixture>
{
private static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;
public MongoDbCurrentPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentPersistenceIdsSpec", output)
protected MongoDbCurrentPersistenceIdsSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbCurrentPersistenceIdsSpec", output)
{
_output = output;
output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current));
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
auto-initialize = on
collection = ""EventJournal""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";
var specString = $$"""
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
auto-initialize = on
collection = "EventJournal"
}
}
query {
mongodb {
class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"
refresh-interval = 1s
}
}
}
""";
return ConfigurationFactory.ParseString(specString);
}

Просмотреть файл

@ -16,42 +16,59 @@ using Akka.Util.Internal;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbEventsByPersistenceIdSpec : Akka.Persistence.TCK.Query.EventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
public class MongoDbTransactionEventsByPersistenceIdSpec : MongoDbEventsByPersistenceIdSpecBase
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
public MongoDbTransactionEventsByPersistenceIdSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true)
{
}
}
[Collection("MongoDbSpec")]
public class MongoDbEventsByPersistenceIdSpec : MongoDbEventsByPersistenceIdSpecBase
{
public MongoDbEventsByPersistenceIdSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false)
{
}
}
public abstract class MongoDbEventsByPersistenceIdSpecBase : Akka.Persistence.TCK.Query.EventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
{
private static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;
public MongoDbEventsByPersistenceIdSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbEventsByPersistenceIdSpec", output)
protected MongoDbEventsByPersistenceIdSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbEventsByPersistenceIdSpec", output)
{
_output = output;
output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current));
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
auto-initialize = on
collection = ""EventJournal""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";
var specString = $$"""
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
auto-initialize = on
collection = "EventJournal"
}
}
query {
mongodb {
class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"
refresh-interval = 1s
}
}
}
""";
return ConfigurationFactory.ParseString(specString);
}

Просмотреть файл

@ -18,13 +18,28 @@ using Akka.Actor;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbEventsByTagSpec : Akka.Persistence.TCK.Query.EventsByTagSpec, IClassFixture<DatabaseFixture>
public class MongoDbTransactionEventsByTagSpec : MongoDbEventsByTagSpecBase
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
public MongoDbTransactionEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true)
{
}
}
[Collection("MongoDbSpec")]
public class MongoDbEventsByTagSpec : MongoDbEventsByTagSpecBase
{
public MongoDbEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false)
{
}
}
public abstract class MongoDbEventsByTagSpecBase : Akka.Persistence.TCK.Query.EventsByTagSpec, IClassFixture<DatabaseFixture>
{
private static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;
public MongoDbEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByTagSpec", output)
protected MongoDbEventsByTagSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbCurrentEventsByTagSpec", output)
{
_output = output;
output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current));
@ -35,41 +50,46 @@ namespace Akka.Persistence.MongoDb.Tests
ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10));
}
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
protected override bool SupportsTagsInEventEnvelope => true;
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction)
{
var specString = @"
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
auto-initialize = on
collection = ""EventJournal""
event-adapters {
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
}
event-adapter-bindings = {
""System.String"" = color-tagger
}
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";
var specString = $$"""
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
auto-initialize = on
collection = "EventJournal"
event-adapters {
color-tagger = "Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK"
}
event-adapter-bindings = {
"System.String" = color-tagger
}
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
}
}
query {
mongodb {
class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"
refresh-interval = 1s
}
}
}
""";
return ConfigurationFactory.ParseString(specString);
}
@ -114,6 +134,4 @@ namespace Akka.Persistence.MongoDb.Tests
}
}
}
}

Просмотреть файл

@ -1,4 +1,5 @@
using Akka.Configuration;
#if !CICD
using Akka.Configuration;
using Akka.Persistence.TestKit.Performance;
using Akka.Util.Internal;
using System;
@ -41,3 +42,4 @@ namespace Akka.Persistence.MongoDb.Tests
}
}
}
#endif

Просмотреть файл

@ -12,31 +12,49 @@ using Akka.Configuration;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbJournalSpec : JournalSpec, IClassFixture<DatabaseFixture>
public class MongoDbTransactionJournalSpec : MongoDbJournalSpecBase
{
protected override bool SupportsRejectingNonSerializableObjects { get; } = false;
public MongoDbTransactionJournalSpec(DatabaseFixture databaseFixture) : base(databaseFixture, true)
{
}
}
[Collection("MongoDbSpec")]
public class MongoDbJournalSpec : MongoDbJournalSpecBase
{
public MongoDbJournalSpec(DatabaseFixture databaseFixture) : base(databaseFixture, false)
{
}
}
public abstract class MongoDbJournalSpecBase : JournalSpec, IClassFixture<DatabaseFixture>
{
protected override bool SupportsRejectingNonSerializableObjects { get; } = false;
public MongoDbJournalSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbJournalSpec")
protected MongoDbJournalSpecBase(DatabaseFixture databaseFixture, bool transaction)
: base(CreateSpecConfig(databaseFixture, transaction), "MongoDbJournalSpec")
{
Initialize();
}
private static Config CreateSpecConfig(DatabaseFixture databaseFixture)
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, bool transaction)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + @"""
auto-initialize = on
collection = ""EventJournal""
}
}
}";
var specString = $$"""
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.ConnectionString}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
auto-initialize = on
collection = "EventJournal"
}
}
}
""";
return ConfigurationFactory.ParseString(specString)
.WithFallback(MongoDbPersistence.DefaultConfiguration());

Просмотреть файл

@ -25,42 +25,61 @@ using MongoDB.Driver.Core.Misc;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbPersistenceIdsSpec : Akka.Persistence.TCK.Query.PersistenceIdsSpec, IClassFixture<DatabaseFixture>
public class MongoDbTransactionPersistenceIdsSpec : MongoDbPersistenceIdsSpecBase
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
public MongoDbTransactionPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(output, databaseFixture, true)
{
}
}
[Collection("MongoDbSpec")]
public class MongoDbPersistenceIdsSpec : MongoDbPersistenceIdsSpecBase
{
public MongoDbPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(output, databaseFixture, false)
{
}
}
public abstract class MongoDbPersistenceIdsSpecBase : Akka.Persistence.TCK.Query.PersistenceIdsSpec, IClassFixture<DatabaseFixture>
{
private static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;
public MongoDbPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbPersistenceIdsSpec", output)
protected MongoDbPersistenceIdsSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbPersistenceIdsSpec", output)
{
_output = output;
output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current));
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
auto-initialize = on
collection = ""EventJournal""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";
var specString = $$"""
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
auto-initialize = on
collection = "EventJournal"
}
}
query {
mongodb {
class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"
refresh-interval = 1s
}
}
}
""";
return ConfigurationFactory.ParseString(specString);
}

Просмотреть файл

@ -12,29 +12,47 @@ using Akka.Configuration;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbSnapshotStoreSpec : SnapshotStoreSpec, IClassFixture<DatabaseFixture>
public class MongoDbTransactionSnapshotStoreSpec : MongoDbSnapshotStoreSpecBase
{
public MongoDbSnapshotStoreSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbSnapshotStoreSpec")
public MongoDbTransactionSnapshotStoreSpec(DatabaseFixture databaseFixture) : base(databaseFixture, true)
{
}
}
[Collection("MongoDbSpec")]
public class MongoDbSnapshotStoreSpec : MongoDbSnapshotStoreSpecBase
{
public MongoDbSnapshotStoreSpec(DatabaseFixture databaseFixture) : base(databaseFixture, false)
{
}
}
public abstract class MongoDbSnapshotStoreSpecBase : SnapshotStoreSpec, IClassFixture<DatabaseFixture>
{
protected MongoDbSnapshotStoreSpecBase(DatabaseFixture databaseFixture, bool transaction)
: base(CreateSpecConfig(databaseFixture, transaction), "MongoDbSnapshotStoreSpec")
{
Initialize();
}
private static Config CreateSpecConfig(DatabaseFixture databaseFixture)
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, bool transaction)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + @"""
auto-initialize = on
collection = ""SnapshotStore""
}
}
}";
var specString = $$"""
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
connection-string = "{{databaseFixture.ConnectionString}}"
use-write-transaction = {{(transaction ? "on" : "off")}}
auto-initialize = on
collection = "SnapshotStore"
}
}
}
""";
return ConfigurationFactory.ParseString(specString);
}

Просмотреть файл

@ -1,57 +0,0 @@
using Akka.Configuration;
using Akka.Persistence.MongoDb.Query;
using Akka.Persistence.Query;
using Akka.Persistence.TCK.Query;
using Akka.Util.Internal;
using Xunit.Abstractions;
using Xunit;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbTransactionAllEventsSpec : AllEventsSpec, IClassFixture<DatabaseFixture>
{
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
{
// akka.test.single-expect-default = 10s
var specString = @"
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
use-write-transaction = on
auto-initialize = on
collection = ""EventJournal""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
use-write-transaction = on
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";
return ConfigurationFactory.ParseString(specString);
}
public static readonly AtomicCounter Counter = new AtomicCounter(0);
public MongoDbTransactionAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbAllEventsSpec", output)
{
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}
}
}

Просмотреть файл

@ -1,62 +0,0 @@
using Akka.Configuration;
using Akka.Persistence.MongoDb.Query;
using Akka.Persistence.Query;
using Akka.Persistence.TCK.Query;
using Akka.Util.Internal;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Xunit.Abstractions;
using Xunit;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbTransactionCurrentAllEventsSpec : CurrentAllEventsSpec, IClassFixture<DatabaseFixture>
{
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
{
// akka.test.single-expect-default = 10s
var specString = @"
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
auto-initialize = on
use-write-transaction = on
collection = ""EventJournal""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @"""
use-write-transaction = on
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";
return ConfigurationFactory.ParseString(specString);
}
public static readonly AtomicCounter Counter = new AtomicCounter(0);
public MongoDbTransactionCurrentAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentAllEventsSpec", output)
{
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}
}
}

Просмотреть файл

@ -1,41 +0,0 @@
using Akka.Configuration;
using Akka.Persistence.TCK.Snapshot;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Xunit;
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbTransactionSnapshotStoreSpec : SnapshotStoreSpec, IClassFixture<DatabaseFixture>
{
public MongoDbTransactionSnapshotStoreSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbSnapshotStoreSpec")
{
Initialize();
}
private static Config CreateSpecConfig(DatabaseFixture databaseFixture)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + @"""
auto-initialize = on
use-write-transaction = on
collection = ""SnapshotStore""
}
}
}";
return ConfigurationFactory.ParseString(specString);
}
}
}

Просмотреть файл

@ -20,6 +20,7 @@ using System.Threading;
using System.Threading.Tasks;
using Akka.Configuration;
#nullable enable
namespace Akka.Persistence.MongoDb.Journal
{
/// <summary>
@ -28,19 +29,14 @@ namespace Akka.Persistence.MongoDb.Journal
public class MongoDbJournal : AsyncWriteJournal
{
private static readonly BsonTimestamp ZeroTimestamp = new(0);
private static readonly ClientSessionOptions EmptySessionOptions = new();
private readonly MongoDbJournalSettings _settings;
private Lazy<IMongoDatabase> _mongoDatabase;
private Lazy<IMongoCollection<JournalEntry>> _journalCollection;
private Lazy<IMongoCollection<MetadataEntry>> _metadataCollection;
private ImmutableDictionary<string, IImmutableSet<IActorRef>> _persistenceIdSubscribers =
ImmutableDictionary.Create<string, IImmutableSet<IActorRef>>();
private ImmutableDictionary<string, IImmutableSet<IActorRef>> _tagSubscribers =
ImmutableDictionary.Create<string, IImmutableSet<IActorRef>>();
private readonly HashSet<IActorRef> _newEventsSubscriber = new();
// ReSharper disable InconsistentNaming
private IMongoDatabase? _mongoDatabase_DoNotUseDirectly;
private IMongoCollection<JournalEntry>? _journalCollection_DoNotUseDirectly;
private IMongoCollection<MetadataEntry>? _metadataCollection_DoNotUseDirectly;
// ReSharper enable InconsistentNaming
/// <summary>
/// Used to cancel all outstanding commands when the actor is stopped.
@ -64,104 +60,133 @@ namespace Akka.Persistence.MongoDb.Journal
_serialization = Context.System.Serialization;
}
protected override void PreStart()
private IMongoDatabase GetMongoDb()
{
base.PreStart();
_mongoDatabase = new Lazy<IMongoDatabase>(() =>
if (_mongoDatabase_DoNotUseDirectly is not null)
return _mongoDatabase_DoNotUseDirectly;
MongoClient client;
var setupOption = Context.System.Settings.Setup.Get<MongoDbPersistenceSetup>();
if (setupOption.HasValue && setupOption.Value.JournalConnectionSettings != null)
{
MongoClient client;
var setupOption = Context.System.Settings.Setup.Get<MongoDbPersistenceSetup>();
if (setupOption.HasValue && setupOption.Value.JournalConnectionSettings != null)
{
client = new MongoClient(setupOption.Value.JournalConnectionSettings);
return client.GetDatabase(setupOption.Value.JournalDatabaseName);
}
client = new MongoClient(setupOption.Value.JournalConnectionSettings);
_mongoDatabase_DoNotUseDirectly = client.GetDatabase(setupOption.Value.JournalDatabaseName);
return _mongoDatabase_DoNotUseDirectly;
}
//Default LinqProvider has been changed to LINQ3.LinqProvider can be changed back to LINQ2 in the following way:
var connectionString = new MongoUrl(_settings.ConnectionString);
var clientSettings = MongoClientSettings.FromUrl(connectionString);
clientSettings.LinqProvider = LinqProvider.V2;
client = new MongoClient(clientSettings);
return client.GetDatabase(connectionString.DatabaseName);
});
_journalCollection = new Lazy<IMongoCollection<JournalEntry>>(() =>
{
var collection = _mongoDatabase.Value.GetCollection<JournalEntry>(_settings.Collection);
//Default LinqProvider has been changed to LINQ3.LinqProvider can be changed back to LINQ2 in the following way:
var connectionString = new MongoUrl(_settings.ConnectionString);
var clientSettings = MongoClientSettings.FromUrl(connectionString);
clientSettings.LinqProvider = LinqProvider.V2;
client = new MongoClient(clientSettings);
_mongoDatabase_DoNotUseDirectly = client.GetDatabase(connectionString.DatabaseName);
return _mongoDatabase_DoNotUseDirectly;
}
private async Task<IMongoCollection<JournalEntry>> GetJournalCollection(CancellationToken token)
{
if (_journalCollection_DoNotUseDirectly is not null)
return _journalCollection_DoNotUseDirectly;
_journalCollection_DoNotUseDirectly = GetMongoDb().GetCollection<JournalEntry>(_settings.Collection);
if (!_settings.AutoInitialize)
return _journalCollection_DoNotUseDirectly;
var modelForEntryAndSequenceNr = new CreateIndexModel<JournalEntry>(Builders<JournalEntry>
.IndexKeys
.Ascending(entry => entry.PersistenceId)
.Descending(entry => entry.SequenceNr));
if (_settings.AutoInitialize)
{
using var unitedCts = CreatePerCallCts();
{
var modelForEntryAndSequenceNr = new CreateIndexModel<JournalEntry>(Builders<JournalEntry>
.IndexKeys
.Ascending(entry => entry.PersistenceId)
.Descending(entry => entry.SequenceNr));
await _journalCollection_DoNotUseDirectly.Indexes
.CreateOneAsync(modelForEntryAndSequenceNr, cancellationToken: token);
collection.Indexes
.CreateOneAsync(modelForEntryAndSequenceNr, cancellationToken: unitedCts.Token)
.Wait();
var modelWithOrdering = new CreateIndexModel<JournalEntry>(
Builders<JournalEntry>
.IndexKeys
.Ascending(entry => entry.Ordering));
var modelWithOrdering = new CreateIndexModel<JournalEntry>(
Builders<JournalEntry>
.IndexKeys
.Ascending(entry => entry.Ordering));
await _journalCollection_DoNotUseDirectly.Indexes
.CreateOneAsync(modelWithOrdering, cancellationToken: token);
collection.Indexes
.CreateOneAsync(modelWithOrdering, cancellationToken: unitedCts.Token)
.Wait();
await _journalCollection_DoNotUseDirectly.Indexes
.CreateOneAsync(modelWithOrdering, cancellationToken: token);
collection.Indexes
.CreateOne(modelWithOrdering);
var tagsWithOrdering = new CreateIndexModel<JournalEntry>(
Builders<JournalEntry>
.IndexKeys
.Ascending(entry => entry.Tags)
.Ascending(entry => entry.Ordering));
var tagsWithOrdering = new CreateIndexModel<JournalEntry>(
Builders<JournalEntry>
.IndexKeys
.Ascending(entry => entry.Tags)
.Ascending(entry => entry.Ordering));
await _journalCollection_DoNotUseDirectly.Indexes
.CreateOneAsync(tagsWithOrdering, cancellationToken:token);
collection.Indexes
.CreateOneAsync(tagsWithOrdering, cancellationToken:unitedCts.Token)
.Wait();
}
}
return collection;
});
_metadataCollection = new Lazy<IMongoCollection<MetadataEntry>>(() =>
{
using var unitedCts = CreatePerCallCts();
{
var collection = _mongoDatabase.Value.GetCollection<MetadataEntry>(_settings.MetadataCollection);
if (_settings.AutoInitialize)
{
var modelWithAscendingPersistenceId = new CreateIndexModel<MetadataEntry>(
Builders<MetadataEntry>
.IndexKeys
.Ascending(entry => entry.PersistenceId));
collection.Indexes
.CreateOneAsync(modelWithAscendingPersistenceId, cancellationToken: unitedCts.Token)
.Wait();
}
return collection;
}
});
return _journalCollection_DoNotUseDirectly;
}
private async Task<IMongoCollection<MetadataEntry>> GetMetadataCollection(CancellationToken token)
{
if (_metadataCollection_DoNotUseDirectly is not null)
return _metadataCollection_DoNotUseDirectly;
_metadataCollection_DoNotUseDirectly = GetMongoDb()
.GetCollection<MetadataEntry>(_settings.MetadataCollection);
if (!_settings.AutoInitialize)
return _metadataCollection_DoNotUseDirectly;
var modelWithAscendingPersistenceId = new CreateIndexModel<MetadataEntry>(
Builders<MetadataEntry>
.IndexKeys
.Ascending(entry => entry.PersistenceId));
await _metadataCollection_DoNotUseDirectly.Indexes
.CreateOneAsync(modelWithAscendingPersistenceId, cancellationToken: token);
return _metadataCollection_DoNotUseDirectly;
}
private CancellationTokenSource CreatePerCallCts()
{
var unitedCts =
CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token);
var unitedCts = CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token);
unitedCts.CancelAfter(_settings.CallTimeout);
return unitedCts;
}
public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr,
long toSequenceNr, long max, Action<IPersistentRepresentation> recoveryCallback)
private async Task MaybeWithTransaction(Func<IClientSessionHandle?, CancellationToken, Task> act, CancellationToken token)
{
if (!_settings.Transaction)
{
await act(null, token);
return;
}
using var session = await GetMongoDb().Client.StartSessionAsync(EmptySessionOptions, token);
await session.WithTransactionAsync(
async (s, ct) =>
{
await act(s, ct);
return Task.FromResult(NotUsed.Instance);
}, cancellationToken:token);
}
private async Task<T> MaybeWithTransaction<T>(Func<IClientSessionHandle?, CancellationToken, Task<T>> act, CancellationToken token)
{
if (!_settings.Transaction)
return await act(null, token);
using var session = await GetMongoDb().Client.StartSessionAsync(EmptySessionOptions, token);
return await session.WithTransactionAsync(act, cancellationToken:token);
}
public override async Task ReplayMessagesAsync(
IActorContext context,
string persistenceId,
long fromSequenceNr,
long toSequenceNr,
long max,
Action<IPersistentRepresentation> recoveryCallback)
{
// Limit allows only integer
var limitValue = max >= int.MaxValue ? int.MaxValue : (int)max;
@ -169,26 +194,18 @@ namespace Akka.Persistence.MongoDb.Journal
// Do not replay messages if limit equal zero
if (limitValue == 0)
return;
var builder = Builders<JournalEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
if (fromSequenceNr > 0)
filter &= builder.Gte(x => x.SequenceNr, fromSequenceNr);
if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.SequenceNr, toSequenceNr);
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.SequenceNr);
using var unitedCts = CreatePerCallCts();
{
var collections = await _journalCollection.Value
.Find(filter)
.Sort(sort)
.Limit(limitValue)
.ToListAsync(unitedCts.Token);
var journalCollection = await GetJournalCollection(unitedCts.Token);
collections.ForEach(doc => { recoveryCallback(ToPersistenceRepresentation(doc, context.Sender)); });
}
await MaybeWithTransaction(async (session, ct) =>
{
var collections = await journalCollection
.ReplayMessagesQuery(session, persistenceId, fromSequenceNr, toSequenceNr, limitValue)
.ToListAsync(ct);
collections.ForEach(doc => recoveryCallback(ToPersistenceRepresentation(doc, context.Sender)));
}, unitedCts.Token);
}
/// <summary>
@ -198,63 +215,43 @@ namespace Akka.Persistence.MongoDb.Journal
/// <returns>TBD</returns>
private async Task<long> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
{
/*
* NOTE: limit is used like a pagination value, not a cap on the amount
* of data returned by a query. This was at the root of https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80
*/
// Limit allows only integer;
var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max;
var fromSequenceNr = replay.FromOffset;
var toSequenceNr = replay.ToOffset;
var tag = replay.Tag;
var builder = Builders<JournalEntry>.Filter;
var seqNoFilter = builder.AnyEq(x => x.Tags, tag);
if (fromSequenceNr > 0)
seqNoFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
if (toSequenceNr != long.MaxValue)
seqNoFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr));
// Need to know what the highest seqNo of this query will be
// and return that as part of the RecoverySuccess message
using var unitedCts = CreatePerCallCts();
var journalCollection = await GetJournalCollection(unitedCts.Token);
return await MaybeWithTransaction(async (s, ct) =>
{
var maxSeqNoEntry = await _journalCollection.Value.Find(seqNoFilter)
.SortByDescending(x => x.Ordering)
.Limit(1)
.SingleOrDefaultAsync(unitedCts.Token);
/*
* NOTE: limit is used like a pagination value, not a cap on the amount
* of data returned by a query. This was at the root of https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80
*/
// Limit allows only integer;
var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max;
var fromSequenceNr = replay.FromOffset;
var toSequenceNr = replay.ToOffset;
var tag = replay.Tag;
// Need to know what the highest seqNo of this query will be
// and return that as part of the RecoverySuccess message
var maxSeqNoEntry = await journalCollection.MaxOrderingIdQuery(s, fromSequenceNr, toSequenceNr, tag)
.SingleOrDefaultAsync(ct);
if (maxSeqNoEntry == null)
return 0L; // recovered nothing
var maxOrderingId = maxSeqNoEntry.Ordering.Value;
var toSeqNo = Math.Min(toSequenceNr, maxOrderingId);
var maxOrderingId = maxSeqNoEntry.Value;
var readFilter = builder.AnyEq(x => x.Tags, tag);
if (fromSequenceNr > 0)
readFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
if (toSequenceNr != long.MaxValue)
readFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSeqNo));
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.Ordering);
await _journalCollection.Value
.Find(readFilter)
.Sort(sort)
.Limit(limitValue)
await journalCollection.MessagesQuery(s, fromSequenceNr, toSequenceNr, maxOrderingId, tag, limitValue)
.ForEachAsync(entry =>
{
var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender);
foreach (var adapted in AdaptFromJournal(persistent))
replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, tag, entry.Ordering.Value),
ActorRefs.NoSender);
}, unitedCts.Token);
}, ct);
return maxOrderingId;
}
}, unitedCts.Token);
}
/// <summary>
@ -265,172 +262,109 @@ namespace Akka.Persistence.MongoDb.Journal
/// <returns>long</returns>
public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
{
var builder = Builders<MetadataEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
using var unitedCts = CreatePerCallCts();
{
var metadataHighestSequenceNrTask = _metadataCollection.Value.Find(filter).Project(x => x.SequenceNr)
.FirstOrDefaultAsync(unitedCts.Token);
var token = unitedCts.Token;
var journalHighestSequenceNrTask = _journalCollection.Value.Find(Builders<JournalEntry>
.Filter.Eq(x => x.PersistenceId, persistenceId))
.SortByDescending(x => x.SequenceNr)
.Project(x => x.SequenceNr)
.FirstOrDefaultAsync(unitedCts.Token);
// journal data is usually good enough, except in cases when it's been deleted.
await Task.WhenAll(metadataHighestSequenceNrTask, journalHighestSequenceNrTask);
return Math.Max(journalHighestSequenceNrTask.Result, metadataHighestSequenceNrTask.Result);
}
return await MaybeWithTransaction(
async (s, ct) => await ReadHighestSequenceNrOperation(s, persistenceId, ct),
token);
}
protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
/// <summary>
/// NOTE: This method is meant to be a part of a persistence operation
/// The session parameter signals if this query is being called as part of a transaction block or not
/// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK
/// </summary>
private async Task<long> ReadHighestSequenceNrOperation(
IClientSessionHandle? session,
string persistenceId,
CancellationToken token)
{
var allTags = ImmutableHashSet<string>.Empty;
var persistentIds = new HashSet<string>();
var messageList = messages.ToList();
var journalCollection = await GetJournalCollection(token);
var metadataCollection = await GetMetadataCollection(token);
var metadataHighestSequenceNr = await metadataCollection
.MaxSequenceNrQuery(session, persistenceId)
.FirstOrDefaultAsync(token);
var writeTasks = messageList.Select(async message =>
var journalHighestSequenceNr = await journalCollection
.MaxSequenceNrQuery(session, persistenceId)
.FirstOrDefaultAsync(token);
// journal data is usually good enough, except in cases when it's been deleted.
return Math.Max(journalHighestSequenceNr, metadataHighestSequenceNr);
}
protected override async Task<IImmutableList<Exception?>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
{
using var unitedCts = CreatePerCallCts();
var journalCollection = await GetJournalCollection(unitedCts.Token);
var writeTasks = messages.Select(async message =>
{
var persistentMessages = ((IImmutableList<IPersistentRepresentation>)message.Payload);
if (HasTagSubscribers)
{
foreach (var p in persistentMessages)
{
if (p.Payload is Tagged t)
{
allTags = allTags.Union(t.Tags);
}
}
}
var persistentMessages = (IImmutableList<IPersistentRepresentation>)message.Payload;
var journalEntries = persistentMessages.Select(ToJournalEntry);
await InsertEntries(journalEntries);
await InsertEntries(journalCollection, journalEntries, unitedCts.Token);
}).ToArray();
if (HasPersistenceIdSubscribers)
persistentIds.Add(message.PersistenceId);
});
var result = await Task<IImmutableList<Exception>>
var result = await Task<IImmutableList<Exception?>>
.Factory
.ContinueWhenAll(writeTasks.ToArray(),
tasks => tasks.Select(t => t.IsFaulted ? TryUnwrapException(t.Exception) : null).ToImmutableList());
if (HasPersistenceIdSubscribers)
{
foreach (var id in persistentIds)
{
NotifyPersistenceIdChange(id);
}
}
if (HasTagSubscribers && allTags.Count != 0)
{
foreach (var tag in allTags)
{
NotifyTagChange(tag);
}
}
if (HasNewEventSubscribers)
NotifyNewEventAppended();
.ContinueWhenAll(
tasks: writeTasks.ToArray(),
continuationFunction: tasks => tasks.Select(t => t.IsFaulted ? TryUnwrapException(t.Exception) : null).ToImmutableList(),
cancellationToken: unitedCts.Token);
return result;
}
private async ValueTask InsertEntries(IEnumerable<JournalEntry> entries)
private async ValueTask InsertEntries(IMongoCollection<JournalEntry> collection, IEnumerable<JournalEntry> entries, CancellationToken token)
{
using var unitedCts = CreatePerCallCts();
await MaybeWithTransaction(async (session, ct) =>
{
if (_settings.Transaction)
{
var sessionOptions = new ClientSessionOptions { };
using var session =
await _journalCollection.Value.Database.Client.StartSessionAsync(
sessionOptions, unitedCts.Token);
// Begin transaction
session.StartTransaction();
try
{
//https://www.mongodb.com/community/forums/t/insertone-vs-insertmany-is-one-preferred-over-the-other/135982/2
//https://www.mongodb.com/docs/manual/core/transactions-production-consideration/#runtime-limit
//https://www.mongodb.com/docs/manual/core/transactions-production-consideration/#oplog-size-limit
//https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limits-and-thresholds
//16MB: if is bigger than this that means you do it one by one. LET'S TALK ABOUT THIS
await _journalCollection.Value.InsertManyAsync(session, entries,
new InsertManyOptions { IsOrdered = true }, cancellationToken: unitedCts.Token);
}
catch (Exception ex)
{
using var cancelCts = CreatePerCallCts();
await session.AbortTransactionAsync(cancelCts.Token);
throw;
}
//All good , lets commit the transaction
await session.CommitTransactionAsync(unitedCts.Token);
}
//https://www.mongodb.com/community/forums/t/insertone-vs-insertmany-is-one-preferred-over-the-other/135982/2
//https://www.mongodb.com/docs/manual/core/transactions-production-consideration/#runtime-limit
//https://www.mongodb.com/docs/manual/core/transactions-production-consideration/#oplog-size-limit
//https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limits-and-thresholds
//16MB: if is bigger than this that means you do it one by one. LET'S TALK ABOUT THIS
if(session is not null)
await collection
.InsertManyAsync(session, entries, new InsertManyOptions { IsOrdered = true }, cancellationToken: ct);
else
await _journalCollection.Value.InsertManyAsync(entries, new InsertManyOptions { IsOrdered = true },
unitedCts.Token);
}
}
private void NotifyNewEventAppended()
{
if (HasNewEventSubscribers)
{
foreach (var subscriber in _newEventsSubscriber)
{
subscriber.Tell(NewEventAppended.Instance);
}
}
await collection
.InsertManyAsync(entries, new InsertManyOptions { IsOrdered = true }, cancellationToken: ct);
}, token);
}
protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
{
var builder = Builders<JournalEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
// read highest sequence number before we start
var highestSeqNo = await ReadHighestSequenceNrAsync(persistenceId, 0L);
if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.SequenceNr, toSequenceNr);
// only update the sequence number of the top of the journal
// is about to be deleted.
if (highestSeqNo <= toSequenceNr)
{
await SetHighSequenceId(persistenceId, highestSeqNo);
}
using var unitedCts = CreatePerCallCts();
if (_settings.Transaction)
{
var sessionOptions = new ClientSessionOptions { };
using var session =
await _journalCollection.Value.Database.Client.StartSessionAsync(
sessionOptions, unitedCts.Token);
// Begin transaction
session.StartTransaction();
try
{
await _journalCollection.Value.DeleteManyAsync(session, filter, cancellationToken: unitedCts.Token);
}
catch (Exception ex)
{
using var cancelCts = CreatePerCallCts();
await session.AbortTransactionAsync(cancelCts.Token);
throw ex;
}
var journalCollection = await GetJournalCollection(unitedCts.Token);
var metadataCollection = await GetMetadataCollection(unitedCts.Token);
await session.CommitTransactionAsync(unitedCts.Token);
}
else
await _journalCollection.Value.DeleteManyAsync(filter, unitedCts.Token);
await MaybeWithTransaction(async (session, token) =>
{
var builder = Builders<JournalEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
// read highest sequence number before we start
var highestSeqNo = await ReadHighestSequenceNrOperation(session, persistenceId, token);
if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.SequenceNr, toSequenceNr);
// only update the sequence number of the top of the journal
// is about to be deleted.
if (highestSeqNo <= toSequenceNr)
{
await metadataCollection.SetHighSequenceIdQuery(session, persistenceId, highestSeqNo, token);
}
if(session is not null)
await journalCollection.DeleteManyAsync(session, filter, cancellationToken: token);
else
await journalCollection.DeleteManyAsync(filter, cancellationToken: token);
}, unitedCts.Token);
}
private JournalEntry ToJournalEntry(IPersistentRepresentation message)
@ -482,7 +416,7 @@ namespace Akka.Persistence.MongoDb.Journal
};
}
private static long ToTicks(BsonTimestamp bson)
private static long ToTicks(BsonTimestamp? bson)
{
// BSON Timestamps are stored natively as Unix epoch seconds + an ordinal value
@ -492,7 +426,7 @@ namespace Akka.Persistence.MongoDb.Journal
// which is used entirely for end-user purposes.
//
// See https://docs.mongodb.com/manual/reference/bson-types/#timestamps
bson = bson ?? ZeroTimestamp;
bson ??= ZeroTimestamp;
return DateTimeOffset.FromUnixTimeSeconds(bson.Timestamp).Ticks;
}
@ -531,7 +465,7 @@ namespace Akka.Persistence.MongoDb.Journal
}
int? serializerId = null;
Type type = null;
Type? type = null;
// legacy serialization
if (!entry.SerializerId.HasValue && !string.IsNullOrEmpty(entry.Manifest))
@ -541,7 +475,7 @@ namespace Akka.Persistence.MongoDb.Journal
if (entry.Payload is byte[] bytes)
{
object deserialized = null;
object? deserialized;
if (serializerId.HasValue)
{
deserialized = _serialization.Deserialize(bytes, serializerId.Value, entry.Manifest);
@ -558,53 +492,10 @@ namespace Akka.Persistence.MongoDb.Journal
return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest,
entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering));
}
else // backwards compat for object serialization - Payload was already deserialized by BSON
{
return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest,
entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering));
}
}
private async Task SetHighSequenceId(string persistenceId, long maxSeqNo)
{
var builder = Builders<MetadataEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
var metadataEntry = new MetadataEntry
{
Id = persistenceId,
PersistenceId = persistenceId,
SequenceNr = maxSeqNo
};
using var unitedCts = CreatePerCallCts();
{
if (_settings.Transaction)
{
var sessionOptions = new ClientSessionOptions { };
using var session =
await _journalCollection.Value.Database.Client.StartSessionAsync(
sessionOptions, unitedCts.Token);
// Begin transaction
session.StartTransaction();
try
{
await _metadataCollection.Value.ReplaceOneAsync(session, filter, metadataEntry,
new ReplaceOptions() { IsUpsert = true }, unitedCts.Token);
}
catch (Exception ex)
{
using var cancellationCts = CreatePerCallCts();
await session.AbortTransactionAsync(cancellationCts.Token);
throw;
}
await session.CommitTransactionAsync(unitedCts.Token);
}
else
await _metadataCollection.Value.ReplaceOneAsync(filter, metadataEntry,
new ReplaceOptions() { IsUpsert = true }, unitedCts.Token);
}
// backwards compat for object serialization - Payload was already deserialized by BSON
return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest,
entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering));
}
protected override bool ReceivePluginInternal(object message)
@ -621,84 +512,53 @@ namespace Akka.Persistence.MongoDb.Journal
.PipeTo(replay.ReplyTo, success: h => new EventReplaySuccess(h),
failure: e => new EventReplayFailure(e));
return true;
case SubscribePersistenceId subscribe:
AddPersistenceIdSubscriber(Sender, subscribe.PersistenceId);
Context.Watch(Sender);
return true;
case SelectCurrentPersistenceIds request:
SelectAllPersistenceIdsAsync(request.Offset)
.PipeTo(request.ReplyTo,
success: result => new CurrentPersistenceIds(result.Ids, request.Offset));
return true;
case SubscribeTag subscribe:
AddTagSubscriber(Sender, subscribe.Tag);
Context.Watch(Sender);
return true;
case SubscribeNewEvents _:
AddNewEventsSubscriber(Sender);
Context.Watch(Sender);
return true;
case Terminated terminated:
RemoveSubscriber(terminated.ActorRef);
return true;
default:
return false;
}
}
private void AddNewEventsSubscriber(IActorRef subscriber)
{
_newEventsSubscriber.Add(subscriber);
}
protected virtual async Task<(IEnumerable<string> Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(
long offset)
{
var ids = await GetAllPersistenceIds(offset);
var lastOrdering = await GetHighestOrdering();
return (ids, lastOrdering);
using var unitedCts = CreatePerCallCts();
var journalCollection = await GetJournalCollection(unitedCts.Token);
return await MaybeWithTransaction(async (session, token) =>
{
var ids = await journalCollection.AllPersistenceIdsQuery(session, offset, token);
var lastOrdering = await journalCollection.HighestOrderingQuery(session, token);
return (ids, lastOrdering);
}, unitedCts.Token);
}
protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
{
var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max;
var fromSequenceNr = replay.FromOffset;
var toSequenceNr = replay.ToOffset;
var builder = Builders<JournalEntry>.Filter;
var seqNoFilter = builder.Empty;
if (fromSequenceNr > 0)
seqNoFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
if (toSequenceNr != long.MaxValue)
seqNoFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr));
// Need to know what the highest seqNo of this query will be
// and return that as part of the RecoverySuccess message
using var unitedCts = CreatePerCallCts();
var journalCollection = await GetJournalCollection(unitedCts.Token);
return await MaybeWithTransaction(async (session, token) =>
{
var maxSeqNoEntry = await _journalCollection.Value.Find(seqNoFilter)
.SortByDescending(x => x.Ordering)
.Limit(1)
.SingleOrDefaultAsync(unitedCts.Token);
var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max;
var fromSequenceNr = replay.FromOffset;
var toSequenceNr = replay.ToOffset;
// Need to know what the highest seqNo of this query will be
// and return that as part of the RecoverySuccess message
var maxSeqNoEntry = await journalCollection.MaxOrderingIdQuery(session, fromSequenceNr, toSequenceNr, null)
.SingleOrDefaultAsync(token);
if (maxSeqNoEntry == null)
return 0L; // recovered nothing
var maxOrderingId = maxSeqNoEntry.Value;
var maxOrderingId = maxSeqNoEntry.Ordering.Value;
var toSeqNo = Math.Min(toSequenceNr, maxOrderingId);
var readFilter = builder.Empty;
if (fromSequenceNr > 0)
readFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
if (toSequenceNr != long.MaxValue)
readFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSeqNo));
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.Ordering);
await _journalCollection.Value.Find(readFilter)
.Sort(sort)
.Limit(limitValue)
await journalCollection.MessagesQuery(session, fromSequenceNr, toSequenceNr, maxOrderingId, null, limitValue)
.ForEachAsync(entry =>
{
var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender);
@ -706,121 +566,17 @@ namespace Akka.Persistence.MongoDb.Journal
{
replay.ReplyTo.Tell(new ReplayedEvent(adapted, entry.Ordering.Value), ActorRefs.NoSender);
}
}, unitedCts.Token);
}, token);
return maxOrderingId;
}
}
private void AddTagSubscriber(IActorRef subscriber, string tag)
{
if (!_tagSubscribers.TryGetValue(tag, out var subscriptions))
{
_tagSubscribers = _tagSubscribers.Add(tag, ImmutableHashSet.Create(subscriber));
}
else
{
_tagSubscribers = _tagSubscribers.SetItem(tag, subscriptions.Add(subscriber));
}
}
private async Task<IEnumerable<string>> GetAllPersistenceIds(long offset)
{
using var unitedCts = CreatePerCallCts();
{
var ids = await _journalCollection.Value
.DistinctAsync(x => x.PersistenceId, entry => entry.Ordering > new BsonTimestamp(offset),
cancellationToken: unitedCts.Token);
var hashset = new List<string>();
while (await ids.MoveNextAsync(unitedCts.Token))
{
hashset.AddRange(ids.Current);
}
return hashset;
}
}
private async Task<long> GetHighestOrdering()
{
using var unitedCts = CreatePerCallCts();
{
var max = await _journalCollection.Value.AsQueryable()
.Select(je => je.Ordering)
.MaxAsync(unitedCts.Token);
return max.Value;
}
}
private void AddPersistenceIdSubscriber(IActorRef subscriber, string persistenceId)
{
if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions))
{
_persistenceIdSubscribers =
_persistenceIdSubscribers.Add(persistenceId, ImmutableHashSet.Create(subscriber));
}
else
{
_persistenceIdSubscribers =
_persistenceIdSubscribers.SetItem(persistenceId, subscriptions.Add(subscriber));
}
}
private void RemoveSubscriber(IActorRef subscriber)
{
_persistenceIdSubscribers = _persistenceIdSubscribers.SetItems(_persistenceIdSubscribers
.Where(kv => kv.Value.Contains(subscriber))
.Select(kv => new KeyValuePair<string, IImmutableSet<IActorRef>>(kv.Key, kv.Value.Remove(subscriber))));
_tagSubscribers = _tagSubscribers.SetItems(_tagSubscribers
.Where(kv => kv.Value.Contains(subscriber))
.Select(kv => new KeyValuePair<string, IImmutableSet<IActorRef>>(kv.Key, kv.Value.Remove(subscriber))));
_newEventsSubscriber.Remove(subscriber);
}
/// <summary>
/// TBD
/// </summary>
protected bool HasTagSubscribers => _tagSubscribers.Count != 0;
/// <summary>
/// TBD
/// </summary>
protected bool HasNewEventSubscribers => _newEventsSubscriber.Count != 0;
/// <summary>
/// TBD
/// </summary>
protected bool HasPersistenceIdSubscribers => _persistenceIdSubscribers.Count != 0;
private void NotifyPersistenceIdChange(string persistenceId)
{
if (_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscribers))
{
var changed = new EventAppended(persistenceId);
foreach (var subscriber in subscribers)
subscriber.Tell(changed);
}
}
private void NotifyTagChange(string tag)
{
if (_tagSubscribers.TryGetValue(tag, out var subscribers))
{
var changed = new TaggedEventAppended(tag);
foreach (var subscriber in subscribers)
subscriber.Tell(changed);
}
}, unitedCts.Token);
}
protected override void PostStop()
{
// cancel any / all pending operations
_pendingCommandsCancellation.Cancel();
_pendingCommandsCancellation.Dispose();
base.PostStop();
}
}

Просмотреть файл

@ -0,0 +1,225 @@
// -----------------------------------------------------------------------
// <copyright file="MongoDbJournalQueries.cs" company="Petabridge, LLC">
// Copyright (C) 2015-2023 .NET Petabridge, LLC
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
#nullable enable
namespace Akka.Persistence.MongoDb.Journal;
internal static class MongoDbJournalQueries
{
/// <summary>
/// NOTE: This query is meant to be a part of a persistence operation
/// The session parameter signals if this query is being called as part of a transaction block or not
/// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK
/// </summary>
public static IFindFluent<JournalEntry, JournalEntry> ReplayMessagesQuery(
this IMongoCollection<JournalEntry> collection,
IClientSessionHandle? session,
string persistenceId,
long fromSequenceNr,
long toSequenceNr,
int limit)
{
var builder = Builders<JournalEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
if (fromSequenceNr > 0)
filter &= builder.Gte(x => x.SequenceNr, fromSequenceNr);
if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.SequenceNr, toSequenceNr);
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.SequenceNr);
return (session is not null ? collection.Find(session, filter) : collection.Find(filter))
.Sort(sort)
.Limit(limit);
}
/// <summary>
/// NOTE: This query is meant to be a part of a persistence operation
/// The session parameter signals if this query is being called as part of a transaction block or not
/// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK
/// </summary>
public static IFindFluent<JournalEntry, BsonTimestamp> MaxOrderingIdQuery(
this IMongoCollection<JournalEntry> collection,
IClientSessionHandle? session,
long fromSequenceNr,
long toSequenceNr,
string? tag)
{
var builder = Builders<JournalEntry>.Filter;
var filter = FilterDefinition<JournalEntry>.Empty;
if(!string.IsNullOrWhiteSpace(tag))
filter &= builder.AnyEq(x => x.Tags, tag);
if (fromSequenceNr > 0)
filter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr));
return (session is not null ? collection.Find(session, filter) : collection.Find(filter))
.SortByDescending(x => x.Ordering)
.Limit(1)
.Project(e => e.Ordering);
}
/// <summary>
/// NOTE: This query is meant to be a part of a persistence operation
/// The session parameter signals if this query is being called as part of a transaction block or not
/// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK
/// </summary>
public static IFindFluent<JournalEntry, JournalEntry> MessagesQuery(
this IMongoCollection<JournalEntry> collection,
IClientSessionHandle? session,
long fromSequenceNr,
long toSequenceNr,
long maxOrderingId,
string? tag,
int limit
)
{
var builder = Builders<JournalEntry>.Filter;
var toSeqNo = Math.Min(toSequenceNr, maxOrderingId);
var filter = FilterDefinition<JournalEntry>.Empty;
if(!string.IsNullOrWhiteSpace(tag))
filter &= builder.AnyEq(x => x.Tags, tag);
if (fromSequenceNr > 0)
filter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSeqNo));
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.Ordering);
return (session is not null ? collection.Find(session, filter) : collection.Find(filter))
.Sort(sort)
.Limit(limit);
}
/// <summary>
/// NOTE: This query is meant to be a part of a persistence operation
/// The session parameter signals if this query is being called as part of a transaction block or not
/// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK
/// </summary>
public static IFindFluent<MetadataEntry, long> MaxSequenceNrQuery(
this IMongoCollection<MetadataEntry> collection,
IClientSessionHandle? session,
string persistenceId)
{
var filter = Builders<MetadataEntry>.Filter.Eq(x => x.PersistenceId, persistenceId);
return (session is not null ? collection.Find(session, filter) : collection.Find(filter))
.Project(x => x.SequenceNr);
}
/// <summary>
/// NOTE: This query is meant to be a part of a persistence operation
/// The session parameter signals if this query is being called as part of a transaction block or not
/// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK
/// </summary>
public static IFindFluent<JournalEntry, long> MaxSequenceNrQuery(
this IMongoCollection<JournalEntry> collection,
IClientSessionHandle? session,
string persistenceId)
{
var filter = Builders<JournalEntry>.Filter.Eq(x => x.PersistenceId, persistenceId);
return (session is not null ? collection.Find(session, filter) : collection.Find(filter))
.SortByDescending(x => x.SequenceNr)
.Project(x => x.SequenceNr);
}
/// <summary>
/// NOTE: This query is meant to be a part of a persistence operation
/// The session parameter signals if this query is being called as part of a transaction block or not
/// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK
/// </summary>
public static async Task<IEnumerable<string>> AllPersistenceIdsQuery(this IMongoCollection<JournalEntry> collection, IClientSessionHandle? session, long offset, CancellationToken token)
{
IAsyncCursor<string> idCursor;
if (session is not null)
{
idCursor = await collection
.DistinctAsync(
session: session,
field: x => x.PersistenceId,
filter: entry => entry.Ordering > new BsonTimestamp(offset),
cancellationToken: token);
}
else
{
idCursor = await collection
.DistinctAsync(
field: x => x.PersistenceId,
filter: entry => entry.Ordering > new BsonTimestamp(offset),
cancellationToken: token);
}
var hashset = new List<string>();
while (await idCursor.MoveNextAsync(token))
{
hashset.AddRange(idCursor.Current);
}
return hashset;
}
/// <summary>
/// NOTE: This query is meant to be a part of a persistence operation
/// The session parameter signals if this query is being called as part of a transaction block or not
/// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK
/// </summary>
public static async Task<long> HighestOrderingQuery(this IMongoCollection<JournalEntry> collection, IClientSessionHandle? session, CancellationToken token)
{
var max = await (session is not null ? collection.AsQueryable(session) : collection.AsQueryable())
.Select(je => je.Ordering)
.MaxAsync(token);
return max.Value;
}
/// <summary>
/// NOTE: This query is meant to be a part of a persistence operation
/// The session parameter signals if this query is being called as part of a transaction block or not
/// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK
/// </summary>
public static async Task SetHighSequenceIdQuery(this IMongoCollection<MetadataEntry> collection, IClientSessionHandle? session, string persistenceId, long maxSeqNo, CancellationToken token)
{
var builder = Builders<MetadataEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
var metadataEntry = new MetadataEntry
{
Id = persistenceId,
PersistenceId = persistenceId,
SequenceNr = maxSeqNo
};
if (session is not null)
{
await collection.ReplaceOneAsync(
session: session,
filter: filter,
replacement: metadataEntry,
options: new ReplaceOptions { IsUpsert = true },
cancellationToken: token);
}
else
{
await collection.ReplaceOneAsync(
filter: filter,
replacement: metadataEntry,
options: new ReplaceOptions { IsUpsert = true },
cancellationToken: token);
}
}
}

Просмотреть файл

@ -78,7 +78,6 @@ namespace Akka.Persistence.MongoDb.Query
switch (message)
{
case AllEventsPublisher.Continue _:
case NewEventAppended _:
if (IsTimeForReplay) Replay();
return true;
case Request _:
@ -114,7 +113,8 @@ namespace Akka.Persistence.MongoDb.Query
persistenceId: replayed.Persistent.PersistenceId,
sequenceNr: replayed.Persistent.SequenceNr,
timestamp: replayed.Persistent.Timestamp,
@event: replayed.Persistent.Payload));
@event: replayed.Persistent.Payload,
tags: Array.Empty<string>()));
CurrentOffset = replayed.Offset;
Buffer.DeliverBuffer(TotalDemand);
@ -135,7 +135,6 @@ namespace Akka.Persistence.MongoDb.Query
Context.Stop(Self);
return true;
case AllEventsPublisher.Continue _:
case NewEventAppended _:
return true;
default:
return false;
@ -163,7 +162,6 @@ namespace Akka.Persistence.MongoDb.Query
protected override void ReceiveInitialRequest()
{
JournalRef.Tell(SubscribeNewEvents.Instance);
Replay();
}

Просмотреть файл

@ -95,7 +95,6 @@ namespace Akka.Persistence.MongoDb.Query
switch (message)
{
case EventsByPersistenceIdPublisher.Continue _:
case EventAppended _:
if (IsTimeForReplay) Replay();
break;
case Request _:
@ -132,7 +131,8 @@ namespace Akka.Persistence.MongoDb.Query
persistenceId: PersistenceId,
sequenceNr: seqNr,
timestamp: replayed.Persistent.Timestamp,
@event: replayed.Persistent.Payload));
@event: replayed.Persistent.Payload,
tags: Array.Empty<string>()));
CurrentSequenceNr = seqNr + 1;
Buffer.DeliverBuffer(TotalDemand);
break;
@ -149,7 +149,6 @@ namespace Akka.Persistence.MongoDb.Query
Buffer.DeliverBuffer(TotalDemand);
break;
case EventsByPersistenceIdPublisher.Continue _:
case EventAppended _:
// Skip during replay
break;
case Cancel _:
@ -182,7 +181,6 @@ namespace Akka.Persistence.MongoDb.Query
protected override void ReceiveInitialRequest()
{
JournalRef.Tell(new SubscribePersistenceId(PersistenceId));
Replay();
}

Просмотреть файл

@ -90,7 +90,6 @@ namespace Akka.Persistence.MongoDb.Query
switch (message)
{
case EventsByTagPublisher.Continue _:
case TaggedEventAppended _:
if (IsTimeForReplay) Replay();
break;
case Request _:
@ -126,7 +125,8 @@ namespace Akka.Persistence.MongoDb.Query
persistenceId: replayed.Persistent.PersistenceId,
sequenceNr: replayed.Persistent.SequenceNr,
timestamp: replayed.Persistent.Timestamp,
@event: replayed.Persistent.Payload));
@event: replayed.Persistent.Payload,
tags: new [] { replayed.Tag }));
CurrentOffset = replayed.Offset;
Buffer.DeliverBuffer(TotalDemand);
@ -144,7 +144,6 @@ namespace Akka.Persistence.MongoDb.Query
Buffer.DeliverBuffer(TotalDemand);
break;
case EventsByTagPublisher.Continue _:
case TaggedEventAppended _:
// ignore
break;
case Cancel _:
@ -179,7 +178,6 @@ namespace Akka.Persistence.MongoDb.Query
protected override void ReceiveInitialRequest()
{
JournalRef.Tell(new SubscribeTag(Tag));
Replay();
}

Просмотреть файл

@ -14,9 +14,7 @@ using System.Collections.Immutable;
namespace Akka.Persistence.MongoDb.Query
{
/// <summary>
/// TBD
/// </summary>
[Obsolete("Query is not implemented.")]
public interface ISubscriptionCommand { }
/// <summary>
@ -25,6 +23,7 @@ namespace Akka.Persistence.MongoDb.Query
/// the subscriber when <see cref="AsyncWriteJournal.WriteMessagesAsync"/> has been called.
/// </summary>
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class SubscribePersistenceId : ISubscriptionCommand
{
/// <summary>
@ -46,6 +45,7 @@ namespace Akka.Persistence.MongoDb.Query
/// TBD
/// </summary>
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class EventAppended : IDeadLetterSuppression
{
/// <summary>
@ -107,6 +107,7 @@ namespace Akka.Persistence.MongoDb.Query
/// the subscriber when `asyncWriteMessages` has been called.
/// </summary>
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class SubscribeNewEvents : ISubscriptionCommand
{
public static SubscribeNewEvents Instance = new SubscribeNewEvents();
@ -115,6 +116,7 @@ namespace Akka.Persistence.MongoDb.Query
}
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class NewEventAppended : IDeadLetterSuppression
{
public static NewEventAppended Instance = new NewEventAppended();
@ -130,6 +132,7 @@ namespace Akka.Persistence.MongoDb.Query
/// via an <see cref="IEventAdapter"/>.
/// </summary>
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class SubscribeTag : ISubscriptionCommand
{
/// <summary>
@ -151,6 +154,7 @@ namespace Akka.Persistence.MongoDb.Query
/// TBD
/// </summary>
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class TaggedEventAppended : IDeadLetterSuppression
{
/// <summary>

Просмотреть файл

@ -14,6 +14,7 @@ using Akka.Util;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
#nullable enable
namespace Akka.Persistence.MongoDb.Snapshot
{
/// <summary>
@ -21,8 +22,13 @@ namespace Akka.Persistence.MongoDb.Snapshot
/// </summary>
public class MongoDbSnapshotStore : SnapshotStore
{
private static readonly ClientSessionOptions EmptySessionOptions = new();
private readonly MongoDbSnapshotSettings _settings;
private Lazy<IMongoCollection<SnapshotEntry>> _snapshotCollection;
// ReSharper disable InconsistentNaming
private IMongoDatabase? _mongoDatabase_DoNotUseDirectly;
private IMongoCollection<SnapshotEntry>? _snapshotCollection_DoNotUseDirectly;
// ReSharper enable InconsistentNaming
/// <summary>
/// Used to cancel all outstanding commands when the actor is stopped.
@ -53,136 +59,162 @@ namespace Akka.Persistence.MongoDb.Snapshot
return unitedCts;
}
protected override void PreStart()
private async Task MaybeWithTransaction(Func<IClientSessionHandle?, CancellationToken, Task> act, CancellationToken token)
{
base.PreStart();
_snapshotCollection = new Lazy<IMongoCollection<SnapshotEntry>>(() =>
if (!_settings.Transaction)
{
MongoClient client;
IMongoDatabase snapshot;
var setupOption = Context.System.Settings.Setup.Get<MongoDbPersistenceSetup>();
if (!setupOption.HasValue || setupOption.Value.SnapshotConnectionSettings == null)
await act(null, token);
return;
}
using var session = await GetMongoDb().Client.StartSessionAsync(EmptySessionOptions, token);
await session.WithTransactionAsync(
async (s, ct) =>
{
//Default LinqProvider has been changed to LINQ3.LinqProvider can be changed back to LINQ2 in the following way:
var connectionString = new MongoUrl(_settings.ConnectionString);
var clientSettings = MongoClientSettings.FromUrl(connectionString);
clientSettings.LinqProvider = LinqProvider.V2;
client = new MongoClient(clientSettings);
snapshot = client.GetDatabase(connectionString.DatabaseName);
}
else
{
client = new MongoClient(setupOption.Value.SnapshotConnectionSettings);
snapshot = client.GetDatabase(setupOption.Value.SnapshotDatabaseName);
}
await act(s, ct);
return Task.FromResult(NotUsed.Instance);
}, cancellationToken:token);
}
private async Task<T> MaybeWithTransaction<T>(Func<IClientSessionHandle?, CancellationToken, Task<T>> act, CancellationToken token)
{
if (!_settings.Transaction)
return await act(null, token);
using var session = await GetMongoDb().Client.StartSessionAsync(EmptySessionOptions, token);
return await session.WithTransactionAsync(act, cancellationToken:token);
}
private IMongoDatabase GetMongoDb()
{
if (_mongoDatabase_DoNotUseDirectly is not null)
return _mongoDatabase_DoNotUseDirectly;
MongoClient client;
var setupOption = Context.System.Settings.Setup.Get<MongoDbPersistenceSetup>();
if (!setupOption.HasValue || setupOption.Value.SnapshotConnectionSettings == null)
{
//Default LinqProvider has been changed to LINQ3.LinqProvider can be changed back to LINQ2 in the following way:
var connectionString = new MongoUrl(_settings.ConnectionString);
var clientSettings = MongoClientSettings.FromUrl(connectionString);
clientSettings.LinqProvider = LinqProvider.V2;
client = new MongoClient(clientSettings);
_mongoDatabase_DoNotUseDirectly = client.GetDatabase(connectionString.DatabaseName);
return _mongoDatabase_DoNotUseDirectly;
}
var collection = snapshot.GetCollection<SnapshotEntry>(_settings.Collection);
if (_settings.AutoInitialize)
{
using var unitedCts = CreatePerCallCts();
{
var modelWithAscendingPersistenceIdAndDescendingSequenceNr =
new CreateIndexModel<SnapshotEntry>(Builders<SnapshotEntry>.IndexKeys
.Ascending(entry => entry.PersistenceId)
.Descending(entry => entry.SequenceNr));
collection.Indexes
.CreateOneAsync(modelWithAscendingPersistenceIdAndDescendingSequenceNr,
cancellationToken: unitedCts.Token)
.Wait();
}
}
return collection;
});
client = new MongoClient(setupOption.Value.SnapshotConnectionSettings);
_mongoDatabase_DoNotUseDirectly = client.GetDatabase(setupOption.Value.SnapshotDatabaseName);
return _mongoDatabase_DoNotUseDirectly;
}
private async Task<IMongoCollection<SnapshotEntry>> GetSnapshotCollection(CancellationToken token)
{
_snapshotCollection_DoNotUseDirectly = GetMongoDb().GetCollection<SnapshotEntry>(_settings.Collection);
if (!_settings.AutoInitialize)
return _snapshotCollection_DoNotUseDirectly;
var modelWithAscendingPersistenceIdAndDescendingSequenceNr =
new CreateIndexModel<SnapshotEntry>(Builders<SnapshotEntry>.IndexKeys
.Ascending(entry => entry.PersistenceId)
.Descending(entry => entry.SequenceNr));
await _snapshotCollection_DoNotUseDirectly.Indexes
.CreateOneAsync(modelWithAscendingPersistenceIdAndDescendingSequenceNr,
cancellationToken: token);
return _snapshotCollection_DoNotUseDirectly;
}
protected override void PostStop()
{
// cancel any pending database commands during shutdown
_pendingCommandsCancellation.Cancel();
_pendingCommandsCancellation.Dispose();
base.PostStop();
}
protected override Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
{
var filter = CreateRangeFilter(persistenceId, criteria);
using var unitedCts = CreatePerCallCts();
{
var snapshotCollection = await GetSnapshotCollection(unitedCts.Token);
return
_snapshotCollection.Value
.Find(filter)
.SortByDescending(x => x.SequenceNr)
.Limit(1)
.Project(x => ToSelectedSnapshot(x))
.FirstOrDefaultAsync(unitedCts.Token);
}
return await MaybeWithTransaction(async (session, token) =>
{
var filter = CreateRangeFilter(persistenceId, criteria);
return await (session is not null ? snapshotCollection.Find(session, filter) : snapshotCollection.Find(filter))
.SortByDescending(x => x.SequenceNr)
.Limit(1)
.Project(x => ToSelectedSnapshot(x))
.FirstOrDefaultAsync(token);
}, unitedCts.Token);
}
protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
{
using var unitedCts = CreatePerCallCts();
var snapshotCollection = await GetSnapshotCollection(unitedCts.Token);
var snapshotEntry = ToSnapshotEntry(metadata, snapshot);
await MaybeWithTransaction(async (session, token) =>
{
var snapshotEntry = ToSnapshotEntry(metadata, snapshot);
if (_settings.Transaction)
if (session is not null)
{
var sessionOptions = new ClientSessionOptions { };
using var session =
await _snapshotCollection.Value.Database.Client.StartSessionAsync(
sessionOptions, unitedCts.Token);
// Begin transaction
session.StartTransaction();
try
{
await _snapshotCollection.Value.ReplaceOneAsync(
session,
CreateSnapshotIdFilter(snapshotEntry.Id),
snapshotEntry,
new ReplaceOptions { IsUpsert = true }, unitedCts.Token);
}
catch (Exception ex)
{
using var cancelCts = CreatePerCallCts();
await session.AbortTransactionAsync(cancelCts.Token);
throw;
}
await session.CommitTransactionAsync(unitedCts.Token);
await snapshotCollection.ReplaceOneAsync(
session: session,
filter: CreateSnapshotIdFilter(snapshotEntry.Id),
replacement: snapshotEntry,
options: new ReplaceOptions { IsUpsert = true },
cancellationToken: token);
}
else
await _snapshotCollection.Value.ReplaceOneAsync(
CreateSnapshotIdFilter(snapshotEntry.Id),
snapshotEntry,
new ReplaceOptions { IsUpsert = true }, unitedCts.Token);
}
{
await snapshotCollection.ReplaceOneAsync(
filter: CreateSnapshotIdFilter(snapshotEntry.Id),
replacement: snapshotEntry,
options: new ReplaceOptions { IsUpsert = true },
cancellationToken: token);
}
}, unitedCts.Token);
}
protected override Task DeleteAsync(SnapshotMetadata metadata)
protected override async Task DeleteAsync(SnapshotMetadata metadata)
{
var builder = Builders<SnapshotEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, metadata.PersistenceId);
if (metadata.SequenceNr > 0 && metadata.SequenceNr < long.MaxValue)
filter &= builder.Eq(x => x.SequenceNr, metadata.SequenceNr);
if (metadata.Timestamp != DateTime.MinValue && metadata.Timestamp != DateTime.MaxValue)
filter &= builder.Eq(x => x.Timestamp, metadata.Timestamp.Ticks);
using var unitedCts = CreatePerCallCts();
var snapshotCollection = await GetSnapshotCollection(unitedCts.Token);
await MaybeWithTransaction(async (session, token) =>
{
return _snapshotCollection.Value.FindOneAndDeleteAsync(filter, cancellationToken:unitedCts.Token);
}
var builder = Builders<SnapshotEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, metadata.PersistenceId);
if (metadata.SequenceNr is > 0 and < long.MaxValue)
filter &= builder.Eq(x => x.SequenceNr, metadata.SequenceNr);
if (metadata.Timestamp != DateTime.MinValue && metadata.Timestamp != DateTime.MaxValue)
filter &= builder.Eq(x => x.Timestamp, metadata.Timestamp.Ticks);
if(session is not null)
await snapshotCollection.FindOneAndDeleteAsync(session, filter, cancellationToken: token);
else
await snapshotCollection.FindOneAndDeleteAsync(filter, cancellationToken: token);
}, unitedCts.Token);
}
protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
{
var filter = CreateRangeFilter(persistenceId, criteria);
using var unitedCts = CreatePerCallCts();
return _snapshotCollection.Value.DeleteManyAsync(filter, unitedCts.Token);
var snapshotCollection = await GetSnapshotCollection(unitedCts.Token);
await MaybeWithTransaction(async (session, token) =>
{
var filter = CreateRangeFilter(persistenceId, criteria);
if(session is not null)
await snapshotCollection.DeleteManyAsync(session, filter, cancellationToken: token);
else
await snapshotCollection.DeleteManyAsync(filter, token);
}, unitedCts.Token);
}
private static FilterDefinition<SnapshotEntry> CreateSnapshotIdFilter(string snapshotId)
@ -200,7 +232,7 @@ namespace Akka.Persistence.MongoDb.Snapshot
var builder = Builders<SnapshotEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
if (criteria.MaxSequenceNr > 0 && criteria.MaxSequenceNr < long.MaxValue)
if (criteria.MaxSequenceNr is > 0 and < long.MaxValue)
filter &= builder.Lte(x => x.SequenceNr, criteria.MaxSequenceNr);
if (criteria.MaxTimeStamp != DateTime.MinValue && criteria.MaxTimeStamp != DateTime.MaxValue)
@ -266,7 +298,7 @@ namespace Akka.Persistence.MongoDb.Snapshot
}
int? serializerId = null;
Type type = null;
Type? type = null;
// legacy serialization
if (!entry.SerializerId.HasValue && !string.IsNullOrEmpty(entry.Manifest))

Просмотреть файл

@ -22,9 +22,6 @@
<AkkaVersion>1.5.12</AkkaVersion>
<AkkaHostingVersion>1.5.12</AkkaHostingVersion>
<XunitVersion>2.5.0</XunitVersion>
<TestSdkVersion>17.6.3</TestSdkVersion>
<FluentAssertionsVersion>6.11.0</FluentAssertionsVersion>
</PropertyGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>

Просмотреть файл

@ -1,29 +1,29 @@
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<!-- App dependencies -->
<ItemGroup>
<PackageVersion Include="Akka.Persistence.Hosting" Version="$(AkkaHostingVersion) "/>
<PackageVersion Include="Akka.Persistence.Query" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka.Streams" Version="$(AkkaVersion) "/>
<PackageVersion Include="MongoDB.Driver" Version="2.20.0" />
</ItemGroup>
<!-- Test dependencies -->
<ItemGroup>
<PackageVersion Include="FluentAssertions" Version="$(FluentAssertionsVersion) "/>
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion) "/>
<PackageVersion Include="xunit.runner.visualstudio" Version="$(XunitVersion)">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
<PackageVersion Include="xunit" Version="$(XunitVersion) "/>
<PackageVersion Include="Akka.Persistence.TCK" Version="$(AkkaVersion) "/>
<PackageVersion Include="Mongo2Go" Version="2.2.16 "/>
<PackageVersion Include="System.Net.NetworkInformation" Version="4.3.0 "/>
</ItemGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1 "/>
</ItemGroup>
</Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<!-- App dependencies -->
<ItemGroup>
<PackageVersion Include="Akka.Persistence.Hosting" Version="$(AkkaHostingVersion) " />
<PackageVersion Include="Akka.Persistence.Query" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka.Streams" Version="$(AkkaVersion) " />
<PackageVersion Include="MongoDB.Driver" Version="2.20.0" />
</ItemGroup>
<!-- Test dependencies -->
<ItemGroup>
<PackageVersion Include="FluentAssertions" Version="5.10.3" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.7.2" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.4.5">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
<PackageVersion Include="xunit" Version="2.5.0" />
<PackageVersion Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageVersion Include="Mongo2Go" Version="2.2.16" />
<PackageVersion Include="System.Net.NetworkInformation" Version="4.3.0" />
</ItemGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1 " />
</ItemGroup>
</Project>