From f10240b01c5f842e69378d574442f5701d193d4b Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 15 Sep 2023 22:35:19 +0700 Subject: [PATCH] Fix MongoDb operations to use transaction on both read and write (#347) * Fix journal to use transaction on both read and write * Apply transaction to snapshot store * Code cleanup and documentation * Fix unit test project failed to run * Expand transaction unit test to cover all relevant unit tests * Operations inside transaction should never be ran concurrently --- build-system/azure-pipeline.template.yaml | 4 + build-system/linux-pr-validation.yaml | 4 +- build-system/windows-pr-validation.yaml | 4 +- build-system/windows-release.yaml | 2 + build.fsx | 2 +- .../Akka.Persistence.MongoDb.Tests.csproj | 4 + .../MongoDbAllEventsSpec.cs | 83 +- .../MongoDbCurrentAllEventsSpec.cs | 83 +- ...ongoDbCurrentEventsByPersistenceIdsSpec.cs | 84 +- .../MongoDbCurrentEventsByTagSpec.cs | 106 +-- .../MongoDbCurrentPersistenceIdsSpec.cs | 67 +- .../MongoDbEventsByPersistenceIdSpec.cs | 67 +- .../MongoDbEventsByTagSpec.cs | 98 ++- .../MongoDbJournalPerfSpec.cs | 4 +- .../MongoDbJournalSpec.cs | 54 +- .../MongoDbPersistenceIdsSpec.cs | 69 +- .../MongoDbSnapshotStoreSpec.cs | 52 +- .../MongoDbTransactionAllEventsSpec.cs | 57 -- .../MongoDbTransactionCurrentAllEventsSpec.cs | 62 -- .../MongoDbTransactionSnapshotStoreSpec.cs | 41 - .../Journal/MongoDbJournal.cs | 772 ++++++------------ .../Journal/MongoDbJournalQueries.cs | 225 +++++ .../Query/AllEventsPublisher.cs | 6 +- .../Query/EventByPersistenceIdPublisher.cs | 6 +- .../Query/EventsByTagPublisher.cs | 6 +- .../Query/QueryApi.cs | 10 +- .../Snapshot/MongoDbSnapshotStore.cs | 230 +++--- src/Directory.Build.props | 3 - src/Directory.Packages.props | 56 +- 29 files changed, 1148 insertions(+), 1113 deletions(-) delete mode 100644 src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionAllEventsSpec.cs delete mode 100644 src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionCurrentAllEventsSpec.cs delete mode 100644 src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionSnapshotStoreSpec.cs create mode 100644 src/Akka.Persistence.MongoDb/Journal/MongoDbJournalQueries.cs diff --git a/build-system/azure-pipeline.template.yaml b/build-system/azure-pipeline.template.yaml index 07f45f7..c328a06 100644 --- a/build-system/azure-pipeline.template.yaml +++ b/build-system/azure-pipeline.template.yaml @@ -33,6 +33,8 @@ jobs: filePath: ${{ parameters.scriptFileName }} arguments: ${{ parameters.scriptArgs }} continueOnError: true + env: + TEST_ENVIRONMENT: "CICD" condition: in( variables['Agent.OS'], 'Linux', 'Darwin' ) # Windows - task: BatchScript@1 @@ -41,6 +43,8 @@ jobs: filename: ${{ parameters.scriptFileName }} arguments: ${{ parameters.scriptArgs }} continueOnError: true + env: + TEST_ENVIRONMENT: "CICD" condition: eq( variables['Agent.OS'], 'Windows_NT' ) - task: PublishTestResults@2 inputs: diff --git a/build-system/linux-pr-validation.yaml b/build-system/linux-pr-validation.yaml index fe921d8..afbd925 100644 --- a/build-system/linux-pr-validation.yaml +++ b/build-system/linux-pr-validation.yaml @@ -24,8 +24,8 @@ jobs: - template: azure-pipeline.template.yaml parameters: - name: 'net_6_tests_linux' - displayName: '.NET 6 Unit Tests (Linux)' + name: 'net_7_tests_linux' + displayName: '.NET 7 Unit Tests (Linux)' vmImage: 'ubuntu-latest' scriptFileName: ./build.sh scriptArgs: runTestsNet diff --git a/build-system/windows-pr-validation.yaml b/build-system/windows-pr-validation.yaml index 638c861..c21b507 100644 --- a/build-system/windows-pr-validation.yaml +++ b/build-system/windows-pr-validation.yaml @@ -32,8 +32,8 @@ jobs: - template: azure-pipeline.template.yaml parameters: - name: 'net_6_tests_windows' - displayName: '.NET 6 Unit Tests (Windows)' + name: 'net_7_tests_windows' + displayName: '.NET 7 Unit Tests (Windows)' vmImage: 'windows-latest' scriptFileName: build.cmd scriptArgs: runTestsNet diff --git a/build-system/windows-release.yaml b/build-system/windows-release.yaml index ec06472..31c48e3 100644 --- a/build-system/windows-release.yaml +++ b/build-system/windows-release.yaml @@ -28,6 +28,8 @@ steps: version: 7.0.x - task: BatchScript@1 displayName: 'FAKE Build' + env: + TEST_ENVIRONMENT: "CICD" inputs: filename: build.cmd arguments: 'nuget nugetpublishurl=https://www.nuget.org/api/v2/package nugetkey=$(nugetKey)' diff --git a/build.fsx b/build.fsx index 6c38d7d..b32f1bc 100644 --- a/build.fsx +++ b/build.fsx @@ -52,7 +52,7 @@ let outputBinariesNet = outputBinaries @@ "net5.0" // Configuration values for tests let testNetFrameworkVersion = "net471" let testNetCoreVersion = "netcoreapp3.1" -let testNetVersion = "net6.0" +let testNetVersion = "net7.0" Target "Clean" (fun _ -> ActivateFinalTarget "KillCreatedProcesses" diff --git a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj index 3ee4bfa..e3a58d7 100644 --- a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj +++ b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj @@ -3,6 +3,10 @@ $(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion) + + $(DefineConstants);CICD + + diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbAllEventsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbAllEventsSpec.cs index 1ac4f20..a25478c 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbAllEventsSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbAllEventsSpec.cs @@ -15,45 +15,64 @@ using Xunit.Abstractions; namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbAllEventsSpec: AllEventsSpec, IClassFixture + public class MongoDbTransactionAllEventsSpec : MongoDbAllEventsSpecBase { - private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + public MongoDbTransactionAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true) + { + } + } + + [Collection("MongoDbSpec")] + public class MongoDbAllEventsSpec : MongoDbAllEventsSpecBase + { + public MongoDbAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false) + { + } + } + + public abstract class MongoDbAllEventsSpecBase: AllEventsSpec, IClassFixture + { + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction) { // akka.test.single-expect-default = 10s - var specString = @" - akka.test.single-expect-default = 10s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - auto-initialize = on - collection = ""EventJournal"" - } - } - snapshot-store { - plugin = ""akka.persistence.snapshot-store.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - } - } - query { - mongodb { - class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" - refresh-interval = 1s - } - } - }"; + var specString = $$""" +akka.test.single-expect-default = 10s +akka.persistence { + publish-plugin-commands = on + journal { + plugin = "akka.persistence.journal.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + auto-initialize = on + collection = "EventJournal" + } + } + snapshot-store { + plugin = "akka.persistence.snapshot-store.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + } + } + query { + mongodb { + class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb" + refresh-interval = 1s + } + } +} +"""; return ConfigurationFactory.ParseString(specString); } - - public static readonly AtomicCounter Counter = new AtomicCounter(0); - public MongoDbAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbAllEventsSpec", output) + private static readonly AtomicCounter Counter = new AtomicCounter(0); + + protected MongoDbAllEventsSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbAllEventsSpec", output) { ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentAllEventsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentAllEventsSpec.cs index 17db35c..fa85b68 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentAllEventsSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentAllEventsSpec.cs @@ -15,45 +15,64 @@ using Xunit.Abstractions; namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbCurrentAllEventsSpec : CurrentAllEventsSpec, IClassFixture + public class MongoDbTransactionCurrentAllEventsSpec : MongoDbCurrentAllEventsSpecBase { - private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + public MongoDbTransactionCurrentAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true) + { + } + } + + [Collection("MongoDbSpec")] + public class MongoDbCurrentAllEventsSpec : MongoDbCurrentAllEventsSpecBase + { + public MongoDbCurrentAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false) + { + } + } + + public abstract class MongoDbCurrentAllEventsSpecBase : CurrentAllEventsSpec, IClassFixture + { + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction) { // akka.test.single-expect-default = 10s - var specString = @" - akka.test.single-expect-default = 10s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - auto-initialize = on - collection = ""EventJournal"" - } - } - snapshot-store { - plugin = ""akka.persistence.snapshot-store.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - } - } - query { - mongodb { - class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" - refresh-interval = 1s - } - } - }"; + var specString = $$""" + akka.test.single-expect-default = 10s + akka.persistence { + publish-plugin-commands = on + journal { + plugin = "akka.persistence.journal.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + auto-initialize = on + collection = "EventJournal" + } + } + snapshot-store { + plugin = "akka.persistence.snapshot-store.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + } + } + query { + mongodb { + class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb" + refresh-interval = 1s + } + } + } +"""; return ConfigurationFactory.ParseString(specString); } - - public static readonly AtomicCounter Counter = new AtomicCounter(0); - public MongoDbCurrentAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentAllEventsSpec", output) + private static readonly AtomicCounter Counter = new AtomicCounter(0); + + protected MongoDbCurrentAllEventsSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbCurrentAllEventsSpec", output) { ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs index 44ae51e..3ac590c 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs @@ -16,48 +16,68 @@ using Akka.Util.Internal; namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbCurrentEventsByPersistenceIdsSpec : TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture + public class MongoDbTransactionCurrentEventsByPersistenceIdsSpec: MongoDbCurrentEventsByPersistenceIdsSpecBase { - public static readonly AtomicCounter Counter = new AtomicCounter(0); + public MongoDbTransactionCurrentEventsByPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(output, databaseFixture, true) + { + } + } + + [Collection("MongoDbSpec")] + public class MongoDbCurrentEventsByPersistenceIdsSpec: MongoDbCurrentEventsByPersistenceIdsSpecBase + { + public MongoDbCurrentEventsByPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(output, databaseFixture, false) + { + } + } + + public abstract class MongoDbCurrentEventsByPersistenceIdsSpecBase : TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture + { + private static readonly AtomicCounter Counter = new AtomicCounter(0); private readonly ITestOutputHelper _output; - public MongoDbCurrentEventsByPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) - : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByPersistenceIdsSpec", output) + protected MongoDbCurrentEventsByPersistenceIdsSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbCurrentEventsByPersistenceIdsSpec", output) { _output = output; output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current)); ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); } - private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction) { - var specString = @" - akka.test.single-expect-default = 10s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - auto-initialize = on - collection = ""EventJournal"" - } - } - snapshot-store { - plugin = ""akka.persistence.snapshot-store.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - } - } - query { - mongodb { - class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" - refresh-interval = 1s - } - } - }"; + var specString = $$""" + akka.test.single-expect-default = 10s + akka.persistence { + publish-plugin-commands = on + journal { + plugin = "akka.persistence.journal.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + auto-initialize = on + collection = "EventJournal" + } + } + snapshot-store { + plugin = "akka.persistence.snapshot-store.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + } + } + query { + mongodb { + class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb" + refresh-interval = 1s + } + } + } +"""; return ConfigurationFactory.ParseString(specString); } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs index aab01bc..6f6b340 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs @@ -21,13 +21,28 @@ using System.Diagnostics; namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbCurrentEventsByTagSpec : Akka.Persistence.TCK.Query.CurrentEventsByTagSpec, IClassFixture + public class MongoDbTransactionCurrentEventsByTagSpec : MongoDbCurrentEventsByTagSpecBase { - public static readonly AtomicCounter Counter = new AtomicCounter(0); + public MongoDbTransactionCurrentEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true) + { + } + } + + [Collection("MongoDbSpec")] + public class MongoDbCurrentEventsByTagSpec : MongoDbCurrentEventsByTagSpecBase + { + public MongoDbCurrentEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false) + { + } + } + + public abstract class MongoDbCurrentEventsByTagSpecBase : Akka.Persistence.TCK.Query.CurrentEventsByTagSpec, IClassFixture + { + private static readonly AtomicCounter Counter = new AtomicCounter(0); private readonly ITestOutputHelper _output; - public MongoDbCurrentEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) - : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByTagSpec", output) + protected MongoDbCurrentEventsByTagSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbCurrentEventsByTagSpec", output) { _output = output; @@ -39,53 +54,50 @@ namespace Akka.Persistence.MongoDb.Tests ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10)); } - private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + protected override bool SupportsTagsInEventEnvelope => true; + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction) { - var specString = @" - akka.test.single-expect-default = 3s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - auto-initialize = on - collection = ""EventJournal"" - event-adapters { - color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK"" - } - event-adapter-bindings = { - ""System.String"" = color-tagger - } - } - } - snapshot-store { - plugin = ""akka.persistence.snapshot-store.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - } - } - query { - mongodb { - class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" - refresh-interval = 1s - } - } - }"; + var specString = $$""" +akka.test.single-expect-default = 3s +akka.persistence { + publish-plugin-commands = on + journal { + plugin = "akka.persistence.journal.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + auto-initialize = on + collection = "EventJournal" + event-adapters { + color-tagger = "Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK" + } + event-adapter-bindings = { + "System.String" = color-tagger + } + } + } + snapshot-store { + plugin = "akka.persistence.snapshot-store.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + } + } + query { + mongodb { + class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb" + refresh-interval = 1s + } + } +} +"""; return ConfigurationFactory.ParseString(specString); } - //public override void ReadJournal_query_CurrentEventsByTag_should_find_existing_events() - //{ - // var a = Sys.ActorOf(TestActor.Props("a")); - // a.Tell("warm-up"); - // ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10)); - //} - - internal class TestActor : UntypedPersistentActor { public static Props Props(string persistenceId) => Actor.Props.Create(() => new TestActor(persistenceId)); @@ -126,6 +138,4 @@ namespace Akka.Persistence.MongoDb.Tests } } } - - } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs index 2227284..b4fe70c 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs @@ -21,41 +21,58 @@ using System.Diagnostics; namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbCurrentPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentPersistenceIdsSpec, IClassFixture + public class MongoDbTransactionCurrentPersistenceIdsSpec : MongoDbCurrentPersistenceIdsSpecBase { - public static readonly AtomicCounter Counter = new AtomicCounter(0); + public MongoDbTransactionCurrentPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true) + { + } + } + + [Collection("MongoDbSpec")] + public class MongoDbCurrentPersistenceIdsSpec : MongoDbCurrentPersistenceIdsSpecBase + { + public MongoDbCurrentPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false) + { + } + } + + public abstract class MongoDbCurrentPersistenceIdsSpecBase : Akka.Persistence.TCK.Query.CurrentPersistenceIdsSpec, IClassFixture + { + private static readonly AtomicCounter Counter = new AtomicCounter(0); private readonly ITestOutputHelper _output; - public MongoDbCurrentPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) - : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentPersistenceIdsSpec", output) + protected MongoDbCurrentPersistenceIdsSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbCurrentPersistenceIdsSpec", output) { _output = output; output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current)); ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); } - private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction) { - var specString = @" - akka.test.single-expect-default = 3s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - auto-initialize = on - collection = ""EventJournal"" - } - } - query { - mongodb { - class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" - refresh-interval = 1s - } - } - }"; + var specString = $$""" +akka.test.single-expect-default = 3s +akka.persistence { + publish-plugin-commands = on + journal { + plugin = "akka.persistence.journal.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + auto-initialize = on + collection = "EventJournal" + } + } + query { + mongodb { + class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb" + refresh-interval = 1s + } + } +} +"""; return ConfigurationFactory.ParseString(specString); } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs index d29d0dd..343d3cf 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs @@ -16,42 +16,59 @@ using Akka.Util.Internal; namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbEventsByPersistenceIdSpec : Akka.Persistence.TCK.Query.EventsByPersistenceIdSpec, IClassFixture + public class MongoDbTransactionEventsByPersistenceIdSpec : MongoDbEventsByPersistenceIdSpecBase { - public static readonly AtomicCounter Counter = new AtomicCounter(0); + public MongoDbTransactionEventsByPersistenceIdSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true) + { + } + } + + [Collection("MongoDbSpec")] + public class MongoDbEventsByPersistenceIdSpec : MongoDbEventsByPersistenceIdSpecBase + { + public MongoDbEventsByPersistenceIdSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false) + { + } + } + + public abstract class MongoDbEventsByPersistenceIdSpecBase : Akka.Persistence.TCK.Query.EventsByPersistenceIdSpec, IClassFixture + { + private static readonly AtomicCounter Counter = new AtomicCounter(0); private readonly ITestOutputHelper _output; - public MongoDbEventsByPersistenceIdSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) - : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbEventsByPersistenceIdSpec", output) + protected MongoDbEventsByPersistenceIdSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbEventsByPersistenceIdSpec", output) { _output = output; output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current)); ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); } - private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction) { - var specString = @" - akka.test.single-expect-default = 3s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - auto-initialize = on - collection = ""EventJournal"" - } - } - query { - mongodb { - class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" - refresh-interval = 1s - } - } - }"; + var specString = $$""" +akka.test.single-expect-default = 3s +akka.persistence { + publish-plugin-commands = on + journal { + plugin = "akka.persistence.journal.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + auto-initialize = on + collection = "EventJournal" + } + } + query { + mongodb { + class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb" + refresh-interval = 1s + } + } +} +"""; return ConfigurationFactory.ParseString(specString); } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs index ca0534f..ada4a64 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs @@ -18,13 +18,28 @@ using Akka.Actor; namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbEventsByTagSpec : Akka.Persistence.TCK.Query.EventsByTagSpec, IClassFixture + public class MongoDbTransactionEventsByTagSpec : MongoDbEventsByTagSpecBase { - public static readonly AtomicCounter Counter = new AtomicCounter(0); + public MongoDbTransactionEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, true) + { + } + } + + [Collection("MongoDbSpec")] + public class MongoDbEventsByTagSpec : MongoDbEventsByTagSpecBase + { + public MongoDbEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(output, databaseFixture, false) + { + } + } + + public abstract class MongoDbEventsByTagSpecBase : Akka.Persistence.TCK.Query.EventsByTagSpec, IClassFixture + { + private static readonly AtomicCounter Counter = new AtomicCounter(0); private readonly ITestOutputHelper _output; - public MongoDbEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) - : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByTagSpec", output) + protected MongoDbEventsByTagSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbCurrentEventsByTagSpec", output) { _output = output; output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current)); @@ -35,41 +50,46 @@ namespace Akka.Persistence.MongoDb.Tests ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10)); } - private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + protected override bool SupportsTagsInEventEnvelope => true; + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction) { - var specString = @" - akka.test.single-expect-default = 10s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - auto-initialize = on - collection = ""EventJournal"" - event-adapters { - color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK"" - } - event-adapter-bindings = { - ""System.String"" = color-tagger - } - } - } - snapshot-store { - plugin = ""akka.persistence.snapshot-store.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - } - } - query { - mongodb { - class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" - refresh-interval = 1s - } - } - }"; + var specString = $$""" +akka.test.single-expect-default = 10s +akka.persistence { + publish-plugin-commands = on + journal { + plugin = "akka.persistence.journal.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + auto-initialize = on + collection = "EventJournal" + event-adapters { + color-tagger = "Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK" + } + event-adapter-bindings = { + "System.String" = color-tagger + } + } + } + snapshot-store { + plugin = "akka.persistence.snapshot-store.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + } + } + query { + mongodb { + class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb" + refresh-interval = 1s + } + } +} +"""; return ConfigurationFactory.ParseString(specString); } @@ -114,6 +134,4 @@ namespace Akka.Persistence.MongoDb.Tests } } } - - } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalPerfSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalPerfSpec.cs index 7206f13..f80bb58 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalPerfSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalPerfSpec.cs @@ -1,4 +1,5 @@ -using Akka.Configuration; +#if !CICD +using Akka.Configuration; using Akka.Persistence.TestKit.Performance; using Akka.Util.Internal; using System; @@ -41,3 +42,4 @@ namespace Akka.Persistence.MongoDb.Tests } } } +#endif \ No newline at end of file diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs index 2ba208e..489ea99 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs @@ -12,31 +12,49 @@ using Akka.Configuration; namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbJournalSpec : JournalSpec, IClassFixture + public class MongoDbTransactionJournalSpec : MongoDbJournalSpecBase { - protected override bool SupportsRejectingNonSerializableObjects { get; } = false; + public MongoDbTransactionJournalSpec(DatabaseFixture databaseFixture) : base(databaseFixture, true) + { + } + } + + [Collection("MongoDbSpec")] + public class MongoDbJournalSpec : MongoDbJournalSpecBase + { + public MongoDbJournalSpec(DatabaseFixture databaseFixture) : base(databaseFixture, false) + { + } + } + + public abstract class MongoDbJournalSpecBase : JournalSpec, IClassFixture + { + protected override bool SupportsRejectingNonSerializableObjects { get; } = false; - public MongoDbJournalSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbJournalSpec") + protected MongoDbJournalSpecBase(DatabaseFixture databaseFixture, bool transaction) + : base(CreateSpecConfig(databaseFixture, transaction), "MongoDbJournalSpec") { Initialize(); } - private static Config CreateSpecConfig(DatabaseFixture databaseFixture) + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, bool transaction) { - var specString = @" - akka.test.single-expect-default = 3s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.ConnectionString + @""" - auto-initialize = on - collection = ""EventJournal"" - } - } - }"; + var specString = $$""" +akka.test.single-expect-default = 3s +akka.persistence { + publish-plugin-commands = on + journal { + plugin = "akka.persistence.journal.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.ConnectionString}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + auto-initialize = on + collection = "EventJournal" + } + } +} +"""; return ConfigurationFactory.ParseString(specString) .WithFallback(MongoDbPersistence.DefaultConfiguration()); diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs index f6da123..07bfbaf 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs @@ -25,42 +25,61 @@ using MongoDB.Driver.Core.Misc; namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbPersistenceIdsSpec : Akka.Persistence.TCK.Query.PersistenceIdsSpec, IClassFixture + public class MongoDbTransactionPersistenceIdsSpec : MongoDbPersistenceIdsSpecBase { - public static readonly AtomicCounter Counter = new AtomicCounter(0); + public MongoDbTransactionPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(output, databaseFixture, true) + { + } + } + + [Collection("MongoDbSpec")] + public class MongoDbPersistenceIdsSpec : MongoDbPersistenceIdsSpecBase + { + public MongoDbPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(output, databaseFixture, false) + { + } + } + + public abstract class MongoDbPersistenceIdsSpecBase : Akka.Persistence.TCK.Query.PersistenceIdsSpec, IClassFixture + { + private static readonly AtomicCounter Counter = new AtomicCounter(0); private readonly ITestOutputHelper _output; - public MongoDbPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) - : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbPersistenceIdsSpec", output) + protected MongoDbPersistenceIdsSpecBase(ITestOutputHelper output, DatabaseFixture databaseFixture, bool transaction) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement(), transaction), "MongoDbPersistenceIdsSpec", output) { _output = output; output.WriteLine(databaseFixture.MongoDbConnectionString(Counter.Current)); ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); } - private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id, bool transaction) { - var specString = @" - akka.test.single-expect-default = 3s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - auto-initialize = on - collection = ""EventJournal"" - } - } - query { - mongodb { - class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" - refresh-interval = 1s - } - } - }"; + var specString = $$""" +akka.test.single-expect-default = 3s +akka.persistence { + publish-plugin-commands = on + journal { + plugin = "akka.persistence.journal.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.MongoDbConnectionString(id)}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + auto-initialize = on + collection = "EventJournal" + } + } + query { + mongodb { + class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb" + refresh-interval = 1s + } + } +} +"""; return ConfigurationFactory.ParseString(specString); } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs index 87d9a4e..5b82986 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs @@ -12,29 +12,47 @@ using Akka.Configuration; namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbSnapshotStoreSpec : SnapshotStoreSpec, IClassFixture + public class MongoDbTransactionSnapshotStoreSpec : MongoDbSnapshotStoreSpecBase { - public MongoDbSnapshotStoreSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbSnapshotStoreSpec") + public MongoDbTransactionSnapshotStoreSpec(DatabaseFixture databaseFixture) : base(databaseFixture, true) + { + } + } + + [Collection("MongoDbSpec")] + public class MongoDbSnapshotStoreSpec : MongoDbSnapshotStoreSpecBase + { + public MongoDbSnapshotStoreSpec(DatabaseFixture databaseFixture) : base(databaseFixture, false) + { + } + } + + public abstract class MongoDbSnapshotStoreSpecBase : SnapshotStoreSpec, IClassFixture + { + protected MongoDbSnapshotStoreSpecBase(DatabaseFixture databaseFixture, bool transaction) + : base(CreateSpecConfig(databaseFixture, transaction), "MongoDbSnapshotStoreSpec") { Initialize(); } - private static Config CreateSpecConfig(DatabaseFixture databaseFixture) + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, bool transaction) { - var specString = @" - akka.test.single-expect-default = 3s - akka.persistence { - publish-plugin-commands = on - snapshot-store { - plugin = ""akka.persistence.snapshot-store.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.ConnectionString + @""" - auto-initialize = on - collection = ""SnapshotStore"" - } - } - }"; + var specString = $$""" +akka.test.single-expect-default = 3s +akka.persistence { + publish-plugin-commands = on + snapshot-store { + plugin = "akka.persistence.snapshot-store.mongodb" + mongodb { + class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb" + connection-string = "{{databaseFixture.ConnectionString}}" + use-write-transaction = {{(transaction ? "on" : "off")}} + auto-initialize = on + collection = "SnapshotStore" + } + } +} +"""; return ConfigurationFactory.ParseString(specString); } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionAllEventsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionAllEventsSpec.cs deleted file mode 100644 index 9cea419..0000000 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionAllEventsSpec.cs +++ /dev/null @@ -1,57 +0,0 @@ -using Akka.Configuration; -using Akka.Persistence.MongoDb.Query; -using Akka.Persistence.Query; -using Akka.Persistence.TCK.Query; -using Akka.Util.Internal; -using Xunit.Abstractions; -using Xunit; - -namespace Akka.Persistence.MongoDb.Tests -{ - [Collection("MongoDbSpec")] - public class MongoDbTransactionAllEventsSpec : AllEventsSpec, IClassFixture - { - private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) - { - // akka.test.single-expect-default = 10s - var specString = @" - akka.test.single-expect-default = 10s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - use-write-transaction = on - auto-initialize = on - collection = ""EventJournal"" - } - } - snapshot-store { - plugin = ""akka.persistence.snapshot-store.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - use-write-transaction = on - } - } - query { - mongodb { - class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" - refresh-interval = 1s - } - } - }"; - - return ConfigurationFactory.ParseString(specString); - } - - public static readonly AtomicCounter Counter = new AtomicCounter(0); - - public MongoDbTransactionAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbAllEventsSpec", output) - { - ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); - } - } -} diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionCurrentAllEventsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionCurrentAllEventsSpec.cs deleted file mode 100644 index ebb523a..0000000 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionCurrentAllEventsSpec.cs +++ /dev/null @@ -1,62 +0,0 @@ -using Akka.Configuration; -using Akka.Persistence.MongoDb.Query; -using Akka.Persistence.Query; -using Akka.Persistence.TCK.Query; -using Akka.Util.Internal; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Xunit.Abstractions; -using Xunit; - -namespace Akka.Persistence.MongoDb.Tests -{ - [Collection("MongoDbSpec")] - public class MongoDbTransactionCurrentAllEventsSpec : CurrentAllEventsSpec, IClassFixture - { - private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) - { - // akka.test.single-expect-default = 10s - var specString = @" - akka.test.single-expect-default = 10s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - auto-initialize = on - use-write-transaction = on - collection = ""EventJournal"" - } - } - snapshot-store { - plugin = ""akka.persistence.snapshot-store.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.MongoDbConnectionString(id) + @""" - use-write-transaction = on - } - } - query { - mongodb { - class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" - refresh-interval = 1s - } - } - }"; - - return ConfigurationFactory.ParseString(specString); - } - - public static readonly AtomicCounter Counter = new AtomicCounter(0); - - public MongoDbTransactionCurrentAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentAllEventsSpec", output) - { - ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); - } - } -} diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionSnapshotStoreSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionSnapshotStoreSpec.cs deleted file mode 100644 index edf3fef..0000000 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbTransactionSnapshotStoreSpec.cs +++ /dev/null @@ -1,41 +0,0 @@ -using Akka.Configuration; -using Akka.Persistence.TCK.Snapshot; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Xunit; - -namespace Akka.Persistence.MongoDb.Tests -{ - [Collection("MongoDbSpec")] - public class MongoDbTransactionSnapshotStoreSpec : SnapshotStoreSpec, IClassFixture - { - public MongoDbTransactionSnapshotStoreSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbSnapshotStoreSpec") - { - Initialize(); - } - - private static Config CreateSpecConfig(DatabaseFixture databaseFixture) - { - var specString = @" - akka.test.single-expect-default = 3s - akka.persistence { - publish-plugin-commands = on - snapshot-store { - plugin = ""akka.persistence.snapshot-store.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.ConnectionString + @""" - auto-initialize = on - use-write-transaction = on - collection = ""SnapshotStore"" - } - } - }"; - - return ConfigurationFactory.ParseString(specString); - } - } -} diff --git a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs index 54c68a4..7a115dd 100644 --- a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs +++ b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs @@ -20,6 +20,7 @@ using System.Threading; using System.Threading.Tasks; using Akka.Configuration; +#nullable enable namespace Akka.Persistence.MongoDb.Journal { /// @@ -28,19 +29,14 @@ namespace Akka.Persistence.MongoDb.Journal public class MongoDbJournal : AsyncWriteJournal { private static readonly BsonTimestamp ZeroTimestamp = new(0); + private static readonly ClientSessionOptions EmptySessionOptions = new(); private readonly MongoDbJournalSettings _settings; - private Lazy _mongoDatabase; - private Lazy> _journalCollection; - private Lazy> _metadataCollection; - - private ImmutableDictionary> _persistenceIdSubscribers = - ImmutableDictionary.Create>(); - - private ImmutableDictionary> _tagSubscribers = - ImmutableDictionary.Create>(); - - private readonly HashSet _newEventsSubscriber = new(); + // ReSharper disable InconsistentNaming + private IMongoDatabase? _mongoDatabase_DoNotUseDirectly; + private IMongoCollection? _journalCollection_DoNotUseDirectly; + private IMongoCollection? _metadataCollection_DoNotUseDirectly; + // ReSharper enable InconsistentNaming /// /// Used to cancel all outstanding commands when the actor is stopped. @@ -64,104 +60,133 @@ namespace Akka.Persistence.MongoDb.Journal _serialization = Context.System.Serialization; } - protected override void PreStart() + private IMongoDatabase GetMongoDb() { - base.PreStart(); - - _mongoDatabase = new Lazy(() => + if (_mongoDatabase_DoNotUseDirectly is not null) + return _mongoDatabase_DoNotUseDirectly; + + MongoClient client; + var setupOption = Context.System.Settings.Setup.Get(); + if (setupOption.HasValue && setupOption.Value.JournalConnectionSettings != null) { - MongoClient client; - var setupOption = Context.System.Settings.Setup.Get(); - if (setupOption.HasValue && setupOption.Value.JournalConnectionSettings != null) - { - client = new MongoClient(setupOption.Value.JournalConnectionSettings); - return client.GetDatabase(setupOption.Value.JournalDatabaseName); - } + client = new MongoClient(setupOption.Value.JournalConnectionSettings); + _mongoDatabase_DoNotUseDirectly = client.GetDatabase(setupOption.Value.JournalDatabaseName); + return _mongoDatabase_DoNotUseDirectly; + } - //Default LinqProvider has been changed to LINQ3.LinqProvider can be changed back to LINQ2 in the following way: - var connectionString = new MongoUrl(_settings.ConnectionString); - var clientSettings = MongoClientSettings.FromUrl(connectionString); - clientSettings.LinqProvider = LinqProvider.V2; - client = new MongoClient(clientSettings); - return client.GetDatabase(connectionString.DatabaseName); - }); - _journalCollection = new Lazy>(() => - { - var collection = _mongoDatabase.Value.GetCollection(_settings.Collection); + //Default LinqProvider has been changed to LINQ3.LinqProvider can be changed back to LINQ2 in the following way: + var connectionString = new MongoUrl(_settings.ConnectionString); + var clientSettings = MongoClientSettings.FromUrl(connectionString); + clientSettings.LinqProvider = LinqProvider.V2; + + client = new MongoClient(clientSettings); + _mongoDatabase_DoNotUseDirectly = client.GetDatabase(connectionString.DatabaseName); + return _mongoDatabase_DoNotUseDirectly; + } + + private async Task> GetJournalCollection(CancellationToken token) + { + if (_journalCollection_DoNotUseDirectly is not null) + return _journalCollection_DoNotUseDirectly; + + _journalCollection_DoNotUseDirectly = GetMongoDb().GetCollection(_settings.Collection); + + if (!_settings.AutoInitialize) + return _journalCollection_DoNotUseDirectly; + + var modelForEntryAndSequenceNr = new CreateIndexModel(Builders + .IndexKeys + .Ascending(entry => entry.PersistenceId) + .Descending(entry => entry.SequenceNr)); - if (_settings.AutoInitialize) - { - using var unitedCts = CreatePerCallCts(); - { - var modelForEntryAndSequenceNr = new CreateIndexModel(Builders - .IndexKeys - .Ascending(entry => entry.PersistenceId) - .Descending(entry => entry.SequenceNr)); + await _journalCollection_DoNotUseDirectly.Indexes + .CreateOneAsync(modelForEntryAndSequenceNr, cancellationToken: token); - collection.Indexes - .CreateOneAsync(modelForEntryAndSequenceNr, cancellationToken: unitedCts.Token) - .Wait(); + var modelWithOrdering = new CreateIndexModel( + Builders + .IndexKeys + .Ascending(entry => entry.Ordering)); - var modelWithOrdering = new CreateIndexModel( - Builders - .IndexKeys - .Ascending(entry => entry.Ordering)); + await _journalCollection_DoNotUseDirectly.Indexes + .CreateOneAsync(modelWithOrdering, cancellationToken: token); - collection.Indexes - .CreateOneAsync(modelWithOrdering, cancellationToken: unitedCts.Token) - .Wait(); + await _journalCollection_DoNotUseDirectly.Indexes + .CreateOneAsync(modelWithOrdering, cancellationToken: token); - collection.Indexes - .CreateOne(modelWithOrdering); + var tagsWithOrdering = new CreateIndexModel( + Builders + .IndexKeys + .Ascending(entry => entry.Tags) + .Ascending(entry => entry.Ordering)); - var tagsWithOrdering = new CreateIndexModel( - Builders - .IndexKeys - .Ascending(entry => entry.Tags) - .Ascending(entry => entry.Ordering)); + await _journalCollection_DoNotUseDirectly.Indexes + .CreateOneAsync(tagsWithOrdering, cancellationToken:token); - collection.Indexes - .CreateOneAsync(tagsWithOrdering, cancellationToken:unitedCts.Token) - .Wait(); - } - } - - return collection; - }); - - _metadataCollection = new Lazy>(() => - { - using var unitedCts = CreatePerCallCts(); - { - var collection = _mongoDatabase.Value.GetCollection(_settings.MetadataCollection); - - if (_settings.AutoInitialize) - { - var modelWithAscendingPersistenceId = new CreateIndexModel( - Builders - .IndexKeys - .Ascending(entry => entry.PersistenceId)); - - collection.Indexes - .CreateOneAsync(modelWithAscendingPersistenceId, cancellationToken: unitedCts.Token) - .Wait(); - } - - return collection; - } - }); + return _journalCollection_DoNotUseDirectly; } + private async Task> GetMetadataCollection(CancellationToken token) + { + if (_metadataCollection_DoNotUseDirectly is not null) + return _metadataCollection_DoNotUseDirectly; + + _metadataCollection_DoNotUseDirectly = GetMongoDb() + .GetCollection(_settings.MetadataCollection); + + if (!_settings.AutoInitialize) + return _metadataCollection_DoNotUseDirectly; + + var modelWithAscendingPersistenceId = new CreateIndexModel( + Builders + .IndexKeys + .Ascending(entry => entry.PersistenceId)); + + await _metadataCollection_DoNotUseDirectly.Indexes + .CreateOneAsync(modelWithAscendingPersistenceId, cancellationToken: token); + + return _metadataCollection_DoNotUseDirectly; + } + private CancellationTokenSource CreatePerCallCts() { - var unitedCts = - CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token); + var unitedCts = CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token); unitedCts.CancelAfter(_settings.CallTimeout); return unitedCts; } - public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, - long toSequenceNr, long max, Action recoveryCallback) + private async Task MaybeWithTransaction(Func act, CancellationToken token) + { + if (!_settings.Transaction) + { + await act(null, token); + return; + } + + using var session = await GetMongoDb().Client.StartSessionAsync(EmptySessionOptions, token); + await session.WithTransactionAsync( + async (s, ct) => + { + await act(s, ct); + return Task.FromResult(NotUsed.Instance); + }, cancellationToken:token); + } + + private async Task MaybeWithTransaction(Func> act, CancellationToken token) + { + if (!_settings.Transaction) + return await act(null, token); + + using var session = await GetMongoDb().Client.StartSessionAsync(EmptySessionOptions, token); + return await session.WithTransactionAsync(act, cancellationToken:token); + } + + public override async Task ReplayMessagesAsync( + IActorContext context, + string persistenceId, + long fromSequenceNr, + long toSequenceNr, + long max, + Action recoveryCallback) { // Limit allows only integer var limitValue = max >= int.MaxValue ? int.MaxValue : (int)max; @@ -169,26 +194,18 @@ namespace Akka.Persistence.MongoDb.Journal // Do not replay messages if limit equal zero if (limitValue == 0) return; - - var builder = Builders.Filter; - var filter = builder.Eq(x => x.PersistenceId, persistenceId); - if (fromSequenceNr > 0) - filter &= builder.Gte(x => x.SequenceNr, fromSequenceNr); - if (toSequenceNr != long.MaxValue) - filter &= builder.Lte(x => x.SequenceNr, toSequenceNr); - - var sort = Builders.Sort.Ascending(x => x.SequenceNr); - + using var unitedCts = CreatePerCallCts(); - { - var collections = await _journalCollection.Value - .Find(filter) - .Sort(sort) - .Limit(limitValue) - .ToListAsync(unitedCts.Token); + var journalCollection = await GetJournalCollection(unitedCts.Token); - collections.ForEach(doc => { recoveryCallback(ToPersistenceRepresentation(doc, context.Sender)); }); - } + await MaybeWithTransaction(async (session, ct) => + { + var collections = await journalCollection + .ReplayMessagesQuery(session, persistenceId, fromSequenceNr, toSequenceNr, limitValue) + .ToListAsync(ct); + + collections.ForEach(doc => recoveryCallback(ToPersistenceRepresentation(doc, context.Sender))); + }, unitedCts.Token); } /// @@ -198,63 +215,43 @@ namespace Akka.Persistence.MongoDb.Journal /// TBD private async Task ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) { - /* - * NOTE: limit is used like a pagination value, not a cap on the amount - * of data returned by a query. This was at the root of https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80 - */ - // Limit allows only integer; - var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max; - - var fromSequenceNr = replay.FromOffset; - var toSequenceNr = replay.ToOffset; - var tag = replay.Tag; - - var builder = Builders.Filter; - var seqNoFilter = builder.AnyEq(x => x.Tags, tag); - if (fromSequenceNr > 0) - seqNoFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr)); - if (toSequenceNr != long.MaxValue) - seqNoFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr)); - - - // Need to know what the highest seqNo of this query will be - // and return that as part of the RecoverySuccess message - using var unitedCts = CreatePerCallCts(); + var journalCollection = await GetJournalCollection(unitedCts.Token); + + return await MaybeWithTransaction(async (s, ct) => { - var maxSeqNoEntry = await _journalCollection.Value.Find(seqNoFilter) - .SortByDescending(x => x.Ordering) - .Limit(1) - .SingleOrDefaultAsync(unitedCts.Token); + /* + * NOTE: limit is used like a pagination value, not a cap on the amount + * of data returned by a query. This was at the root of https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80 + */ + // Limit allows only integer; + var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max; + + var fromSequenceNr = replay.FromOffset; + var toSequenceNr = replay.ToOffset; + var tag = replay.Tag; + + // Need to know what the highest seqNo of this query will be + // and return that as part of the RecoverySuccess message + var maxSeqNoEntry = await journalCollection.MaxOrderingIdQuery(s, fromSequenceNr, toSequenceNr, tag) + .SingleOrDefaultAsync(ct); if (maxSeqNoEntry == null) return 0L; // recovered nothing - var maxOrderingId = maxSeqNoEntry.Ordering.Value; - var toSeqNo = Math.Min(toSequenceNr, maxOrderingId); + var maxOrderingId = maxSeqNoEntry.Value; - var readFilter = builder.AnyEq(x => x.Tags, tag); - if (fromSequenceNr > 0) - readFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr)); - if (toSequenceNr != long.MaxValue) - readFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSeqNo)); - var sort = Builders.Sort.Ascending(x => x.Ordering); - - - await _journalCollection.Value - .Find(readFilter) - .Sort(sort) - .Limit(limitValue) + await journalCollection.MessagesQuery(s, fromSequenceNr, toSequenceNr, maxOrderingId, tag, limitValue) .ForEachAsync(entry => { var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender); foreach (var adapted in AdaptFromJournal(persistent)) replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, tag, entry.Ordering.Value), ActorRefs.NoSender); - }, unitedCts.Token); - + }, ct); + return maxOrderingId; - } + }, unitedCts.Token); } /// @@ -265,172 +262,109 @@ namespace Akka.Persistence.MongoDb.Journal /// long public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { - var builder = Builders.Filter; - var filter = builder.Eq(x => x.PersistenceId, persistenceId); - using var unitedCts = CreatePerCallCts(); - { - var metadataHighestSequenceNrTask = _metadataCollection.Value.Find(filter).Project(x => x.SequenceNr) - .FirstOrDefaultAsync(unitedCts.Token); + var token = unitedCts.Token; - var journalHighestSequenceNrTask = _journalCollection.Value.Find(Builders - .Filter.Eq(x => x.PersistenceId, persistenceId)) - .SortByDescending(x => x.SequenceNr) - .Project(x => x.SequenceNr) - .FirstOrDefaultAsync(unitedCts.Token); - - // journal data is usually good enough, except in cases when it's been deleted. - await Task.WhenAll(metadataHighestSequenceNrTask, journalHighestSequenceNrTask); - - return Math.Max(journalHighestSequenceNrTask.Result, metadataHighestSequenceNrTask.Result); - } + return await MaybeWithTransaction( + async (s, ct) => await ReadHighestSequenceNrOperation(s, persistenceId, ct), + token); } - protected override async Task> WriteMessagesAsync(IEnumerable messages) + /// + /// NOTE: This method is meant to be a part of a persistence operation + /// The session parameter signals if this query is being called as part of a transaction block or not + /// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK + /// + private async Task ReadHighestSequenceNrOperation( + IClientSessionHandle? session, + string persistenceId, + CancellationToken token) { - var allTags = ImmutableHashSet.Empty; - var persistentIds = new HashSet(); - var messageList = messages.ToList(); + var journalCollection = await GetJournalCollection(token); + var metadataCollection = await GetMetadataCollection(token); + + var metadataHighestSequenceNr = await metadataCollection + .MaxSequenceNrQuery(session, persistenceId) + .FirstOrDefaultAsync(token); - var writeTasks = messageList.Select(async message => + var journalHighestSequenceNr = await journalCollection + .MaxSequenceNrQuery(session, persistenceId) + .FirstOrDefaultAsync(token); + + // journal data is usually good enough, except in cases when it's been deleted. + return Math.Max(journalHighestSequenceNr, metadataHighestSequenceNr); + } + + protected override async Task> WriteMessagesAsync(IEnumerable messages) + { + using var unitedCts = CreatePerCallCts(); + var journalCollection = await GetJournalCollection(unitedCts.Token); + + var writeTasks = messages.Select(async message => { - var persistentMessages = ((IImmutableList)message.Payload); - - if (HasTagSubscribers) - { - foreach (var p in persistentMessages) - { - if (p.Payload is Tagged t) - { - allTags = allTags.Union(t.Tags); - } - } - } + var persistentMessages = (IImmutableList)message.Payload; var journalEntries = persistentMessages.Select(ToJournalEntry); - await InsertEntries(journalEntries); + await InsertEntries(journalCollection, journalEntries, unitedCts.Token); + }).ToArray(); - if (HasPersistenceIdSubscribers) - persistentIds.Add(message.PersistenceId); - }); - - var result = await Task> + var result = await Task> .Factory - .ContinueWhenAll(writeTasks.ToArray(), - tasks => tasks.Select(t => t.IsFaulted ? TryUnwrapException(t.Exception) : null).ToImmutableList()); - - if (HasPersistenceIdSubscribers) - { - foreach (var id in persistentIds) - { - NotifyPersistenceIdChange(id); - } - } - - if (HasTagSubscribers && allTags.Count != 0) - { - foreach (var tag in allTags) - { - NotifyTagChange(tag); - } - } - - if (HasNewEventSubscribers) - NotifyNewEventAppended(); + .ContinueWhenAll( + tasks: writeTasks.ToArray(), + continuationFunction: tasks => tasks.Select(t => t.IsFaulted ? TryUnwrapException(t.Exception) : null).ToImmutableList(), + cancellationToken: unitedCts.Token); + return result; } - private async ValueTask InsertEntries(IEnumerable entries) + private async ValueTask InsertEntries(IMongoCollection collection, IEnumerable entries, CancellationToken token) { - using var unitedCts = CreatePerCallCts(); + await MaybeWithTransaction(async (session, ct) => { - if (_settings.Transaction) - { - var sessionOptions = new ClientSessionOptions { }; - using var session = - await _journalCollection.Value.Database.Client.StartSessionAsync( - sessionOptions, unitedCts.Token); - // Begin transaction - session.StartTransaction(); - try - { - //https://www.mongodb.com/community/forums/t/insertone-vs-insertmany-is-one-preferred-over-the-other/135982/2 - //https://www.mongodb.com/docs/manual/core/transactions-production-consideration/#runtime-limit - //https://www.mongodb.com/docs/manual/core/transactions-production-consideration/#oplog-size-limit - //https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limits-and-thresholds - //16MB: if is bigger than this that means you do it one by one. LET'S TALK ABOUT THIS - await _journalCollection.Value.InsertManyAsync(session, entries, - new InsertManyOptions { IsOrdered = true }, cancellationToken: unitedCts.Token); - } - catch (Exception ex) - { - using var cancelCts = CreatePerCallCts(); - await session.AbortTransactionAsync(cancelCts.Token); - throw; - } - - //All good , lets commit the transaction - await session.CommitTransactionAsync(unitedCts.Token); - } + //https://www.mongodb.com/community/forums/t/insertone-vs-insertmany-is-one-preferred-over-the-other/135982/2 + //https://www.mongodb.com/docs/manual/core/transactions-production-consideration/#runtime-limit + //https://www.mongodb.com/docs/manual/core/transactions-production-consideration/#oplog-size-limit + //https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limits-and-thresholds + //16MB: if is bigger than this that means you do it one by one. LET'S TALK ABOUT THIS + if(session is not null) + await collection + .InsertManyAsync(session, entries, new InsertManyOptions { IsOrdered = true }, cancellationToken: ct); else - await _journalCollection.Value.InsertManyAsync(entries, new InsertManyOptions { IsOrdered = true }, - unitedCts.Token); - } - } - - private void NotifyNewEventAppended() - { - if (HasNewEventSubscribers) - { - foreach (var subscriber in _newEventsSubscriber) - { - subscriber.Tell(NewEventAppended.Instance); - } - } + await collection + .InsertManyAsync(entries, new InsertManyOptions { IsOrdered = true }, cancellationToken: ct); + }, token); } protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { - var builder = Builders.Filter; - var filter = builder.Eq(x => x.PersistenceId, persistenceId); - - // read highest sequence number before we start - var highestSeqNo = await ReadHighestSequenceNrAsync(persistenceId, 0L); - - if (toSequenceNr != long.MaxValue) - filter &= builder.Lte(x => x.SequenceNr, toSequenceNr); - - // only update the sequence number of the top of the journal - // is about to be deleted. - if (highestSeqNo <= toSequenceNr) - { - await SetHighSequenceId(persistenceId, highestSeqNo); - } - using var unitedCts = CreatePerCallCts(); - if (_settings.Transaction) - { - var sessionOptions = new ClientSessionOptions { }; - using var session = - await _journalCollection.Value.Database.Client.StartSessionAsync( - sessionOptions, unitedCts.Token); - // Begin transaction - session.StartTransaction(); - try - { - await _journalCollection.Value.DeleteManyAsync(session, filter, cancellationToken: unitedCts.Token); - } - catch (Exception ex) - { - using var cancelCts = CreatePerCallCts(); - await session.AbortTransactionAsync(cancelCts.Token); - throw ex; - } + var journalCollection = await GetJournalCollection(unitedCts.Token); + var metadataCollection = await GetMetadataCollection(unitedCts.Token); - await session.CommitTransactionAsync(unitedCts.Token); - } - else - await _journalCollection.Value.DeleteManyAsync(filter, unitedCts.Token); + await MaybeWithTransaction(async (session, token) => + { + var builder = Builders.Filter; + var filter = builder.Eq(x => x.PersistenceId, persistenceId); + + // read highest sequence number before we start + var highestSeqNo = await ReadHighestSequenceNrOperation(session, persistenceId, token); + + if (toSequenceNr != long.MaxValue) + filter &= builder.Lte(x => x.SequenceNr, toSequenceNr); + + // only update the sequence number of the top of the journal + // is about to be deleted. + if (highestSeqNo <= toSequenceNr) + { + await metadataCollection.SetHighSequenceIdQuery(session, persistenceId, highestSeqNo, token); + } + + if(session is not null) + await journalCollection.DeleteManyAsync(session, filter, cancellationToken: token); + else + await journalCollection.DeleteManyAsync(filter, cancellationToken: token); + }, unitedCts.Token); } private JournalEntry ToJournalEntry(IPersistentRepresentation message) @@ -482,7 +416,7 @@ namespace Akka.Persistence.MongoDb.Journal }; } - private static long ToTicks(BsonTimestamp bson) + private static long ToTicks(BsonTimestamp? bson) { // BSON Timestamps are stored natively as Unix epoch seconds + an ordinal value @@ -492,7 +426,7 @@ namespace Akka.Persistence.MongoDb.Journal // which is used entirely for end-user purposes. // // See https://docs.mongodb.com/manual/reference/bson-types/#timestamps - bson = bson ?? ZeroTimestamp; + bson ??= ZeroTimestamp; return DateTimeOffset.FromUnixTimeSeconds(bson.Timestamp).Ticks; } @@ -531,7 +465,7 @@ namespace Akka.Persistence.MongoDb.Journal } int? serializerId = null; - Type type = null; + Type? type = null; // legacy serialization if (!entry.SerializerId.HasValue && !string.IsNullOrEmpty(entry.Manifest)) @@ -541,7 +475,7 @@ namespace Akka.Persistence.MongoDb.Journal if (entry.Payload is byte[] bytes) { - object deserialized = null; + object? deserialized; if (serializerId.HasValue) { deserialized = _serialization.Deserialize(bytes, serializerId.Value, entry.Manifest); @@ -558,53 +492,10 @@ namespace Akka.Persistence.MongoDb.Journal return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering)); } - else // backwards compat for object serialization - Payload was already deserialized by BSON - { - return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest, - entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering)); - } - } - private async Task SetHighSequenceId(string persistenceId, long maxSeqNo) - { - var builder = Builders.Filter; - var filter = builder.Eq(x => x.PersistenceId, persistenceId); - - var metadataEntry = new MetadataEntry - { - Id = persistenceId, - PersistenceId = persistenceId, - SequenceNr = maxSeqNo - }; - - using var unitedCts = CreatePerCallCts(); - { - if (_settings.Transaction) - { - var sessionOptions = new ClientSessionOptions { }; - using var session = - await _journalCollection.Value.Database.Client.StartSessionAsync( - sessionOptions, unitedCts.Token); - // Begin transaction - session.StartTransaction(); - try - { - await _metadataCollection.Value.ReplaceOneAsync(session, filter, metadataEntry, - new ReplaceOptions() { IsUpsert = true }, unitedCts.Token); - } - catch (Exception ex) - { - using var cancellationCts = CreatePerCallCts(); - await session.AbortTransactionAsync(cancellationCts.Token); - throw; - } - - await session.CommitTransactionAsync(unitedCts.Token); - } - else - await _metadataCollection.Value.ReplaceOneAsync(filter, metadataEntry, - new ReplaceOptions() { IsUpsert = true }, unitedCts.Token); - } + // backwards compat for object serialization - Payload was already deserialized by BSON + return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest, + entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering)); } protected override bool ReceivePluginInternal(object message) @@ -621,84 +512,53 @@ namespace Akka.Persistence.MongoDb.Journal .PipeTo(replay.ReplyTo, success: h => new EventReplaySuccess(h), failure: e => new EventReplayFailure(e)); return true; - case SubscribePersistenceId subscribe: - AddPersistenceIdSubscriber(Sender, subscribe.PersistenceId); - Context.Watch(Sender); - return true; case SelectCurrentPersistenceIds request: SelectAllPersistenceIdsAsync(request.Offset) .PipeTo(request.ReplyTo, success: result => new CurrentPersistenceIds(result.Ids, request.Offset)); return true; - case SubscribeTag subscribe: - AddTagSubscriber(Sender, subscribe.Tag); - Context.Watch(Sender); - return true; - case SubscribeNewEvents _: - AddNewEventsSubscriber(Sender); - Context.Watch(Sender); - return true; - case Terminated terminated: - RemoveSubscriber(terminated.ActorRef); - return true; default: return false; } } - private void AddNewEventsSubscriber(IActorRef subscriber) - { - _newEventsSubscriber.Add(subscriber); - } - protected virtual async Task<(IEnumerable Ids, long LastOrdering)> SelectAllPersistenceIdsAsync( long offset) { - var ids = await GetAllPersistenceIds(offset); - var lastOrdering = await GetHighestOrdering(); - return (ids, lastOrdering); + using var unitedCts = CreatePerCallCts(); + var journalCollection = await GetJournalCollection(unitedCts.Token); + + return await MaybeWithTransaction(async (session, token) => + { + var ids = await journalCollection.AllPersistenceIdsQuery(session, offset, token); + var lastOrdering = await journalCollection.HighestOrderingQuery(session, token); + return (ids, lastOrdering); + }, unitedCts.Token); } protected virtual async Task ReplayAllEventsAsync(ReplayAllEvents replay) { - var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max; - - var fromSequenceNr = replay.FromOffset; - var toSequenceNr = replay.ToOffset; - var builder = Builders.Filter; - - var seqNoFilter = builder.Empty; - if (fromSequenceNr > 0) - seqNoFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr)); - if (toSequenceNr != long.MaxValue) - seqNoFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr)); - - - // Need to know what the highest seqNo of this query will be - // and return that as part of the RecoverySuccess message using var unitedCts = CreatePerCallCts(); + var journalCollection = await GetJournalCollection(unitedCts.Token); + + return await MaybeWithTransaction(async (session, token) => { - var maxSeqNoEntry = await _journalCollection.Value.Find(seqNoFilter) - .SortByDescending(x => x.Ordering) - .Limit(1) - .SingleOrDefaultAsync(unitedCts.Token); + var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max; + + var fromSequenceNr = replay.FromOffset; + var toSequenceNr = replay.ToOffset; + + // Need to know what the highest seqNo of this query will be + // and return that as part of the RecoverySuccess message + var maxSeqNoEntry = await journalCollection.MaxOrderingIdQuery(session, fromSequenceNr, toSequenceNr, null) + .SingleOrDefaultAsync(token); if (maxSeqNoEntry == null) return 0L; // recovered nothing + + var maxOrderingId = maxSeqNoEntry.Value; - var maxOrderingId = maxSeqNoEntry.Ordering.Value; - var toSeqNo = Math.Min(toSequenceNr, maxOrderingId); - - var readFilter = builder.Empty; - if (fromSequenceNr > 0) - readFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr)); - if (toSequenceNr != long.MaxValue) - readFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSeqNo)); - var sort = Builders.Sort.Ascending(x => x.Ordering); - - await _journalCollection.Value.Find(readFilter) - .Sort(sort) - .Limit(limitValue) + await journalCollection.MessagesQuery(session, fromSequenceNr, toSequenceNr, maxOrderingId, null, limitValue) .ForEachAsync(entry => { var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender); @@ -706,121 +566,17 @@ namespace Akka.Persistence.MongoDb.Journal { replay.ReplyTo.Tell(new ReplayedEvent(adapted, entry.Ordering.Value), ActorRefs.NoSender); } - }, unitedCts.Token); + }, token); + return maxOrderingId; - } - } - - private void AddTagSubscriber(IActorRef subscriber, string tag) - { - if (!_tagSubscribers.TryGetValue(tag, out var subscriptions)) - { - _tagSubscribers = _tagSubscribers.Add(tag, ImmutableHashSet.Create(subscriber)); - } - else - { - _tagSubscribers = _tagSubscribers.SetItem(tag, subscriptions.Add(subscriber)); - } - } - - private async Task> GetAllPersistenceIds(long offset) - { - using var unitedCts = CreatePerCallCts(); - { - var ids = await _journalCollection.Value - .DistinctAsync(x => x.PersistenceId, entry => entry.Ordering > new BsonTimestamp(offset), - cancellationToken: unitedCts.Token); - - var hashset = new List(); - while (await ids.MoveNextAsync(unitedCts.Token)) - { - hashset.AddRange(ids.Current); - } - - return hashset; - } - } - - private async Task GetHighestOrdering() - { - using var unitedCts = CreatePerCallCts(); - { - var max = await _journalCollection.Value.AsQueryable() - .Select(je => je.Ordering) - .MaxAsync(unitedCts.Token); - - - return max.Value; - } - } - - private void AddPersistenceIdSubscriber(IActorRef subscriber, string persistenceId) - { - if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions)) - { - _persistenceIdSubscribers = - _persistenceIdSubscribers.Add(persistenceId, ImmutableHashSet.Create(subscriber)); - } - else - { - _persistenceIdSubscribers = - _persistenceIdSubscribers.SetItem(persistenceId, subscriptions.Add(subscriber)); - } - } - - private void RemoveSubscriber(IActorRef subscriber) - { - _persistenceIdSubscribers = _persistenceIdSubscribers.SetItems(_persistenceIdSubscribers - .Where(kv => kv.Value.Contains(subscriber)) - .Select(kv => new KeyValuePair>(kv.Key, kv.Value.Remove(subscriber)))); - - _tagSubscribers = _tagSubscribers.SetItems(_tagSubscribers - .Where(kv => kv.Value.Contains(subscriber)) - .Select(kv => new KeyValuePair>(kv.Key, kv.Value.Remove(subscriber)))); - - _newEventsSubscriber.Remove(subscriber); - } - - /// - /// TBD - /// - protected bool HasTagSubscribers => _tagSubscribers.Count != 0; - - /// - /// TBD - /// - protected bool HasNewEventSubscribers => _newEventsSubscriber.Count != 0; - - /// - /// TBD - /// - protected bool HasPersistenceIdSubscribers => _persistenceIdSubscribers.Count != 0; - - - private void NotifyPersistenceIdChange(string persistenceId) - { - if (_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscribers)) - { - var changed = new EventAppended(persistenceId); - foreach (var subscriber in subscribers) - subscriber.Tell(changed); - } - } - - private void NotifyTagChange(string tag) - { - if (_tagSubscribers.TryGetValue(tag, out var subscribers)) - { - var changed = new TaggedEventAppended(tag); - foreach (var subscriber in subscribers) - subscriber.Tell(changed); - } + }, unitedCts.Token); } protected override void PostStop() { // cancel any / all pending operations _pendingCommandsCancellation.Cancel(); + _pendingCommandsCancellation.Dispose(); base.PostStop(); } } diff --git a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournalQueries.cs b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournalQueries.cs new file mode 100644 index 0000000..71d488f --- /dev/null +++ b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournalQueries.cs @@ -0,0 +1,225 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2023 .NET Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using MongoDB.Bson; +using MongoDB.Driver; +using MongoDB.Driver.Linq; + +#nullable enable +namespace Akka.Persistence.MongoDb.Journal; + +internal static class MongoDbJournalQueries +{ + /// + /// NOTE: This query is meant to be a part of a persistence operation + /// The session parameter signals if this query is being called as part of a transaction block or not + /// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK + /// + public static IFindFluent ReplayMessagesQuery( + this IMongoCollection collection, + IClientSessionHandle? session, + string persistenceId, + long fromSequenceNr, + long toSequenceNr, + int limit) + { + var builder = Builders.Filter; + var filter = builder.Eq(x => x.PersistenceId, persistenceId); + if (fromSequenceNr > 0) + filter &= builder.Gte(x => x.SequenceNr, fromSequenceNr); + if (toSequenceNr != long.MaxValue) + filter &= builder.Lte(x => x.SequenceNr, toSequenceNr); + + var sort = Builders.Sort.Ascending(x => x.SequenceNr); + + return (session is not null ? collection.Find(session, filter) : collection.Find(filter)) + .Sort(sort) + .Limit(limit); + } + + /// + /// NOTE: This query is meant to be a part of a persistence operation + /// The session parameter signals if this query is being called as part of a transaction block or not + /// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK + /// + public static IFindFluent MaxOrderingIdQuery( + this IMongoCollection collection, + IClientSessionHandle? session, + long fromSequenceNr, + long toSequenceNr, + string? tag) + { + var builder = Builders.Filter; + + var filter = FilterDefinition.Empty; + if(!string.IsNullOrWhiteSpace(tag)) + filter &= builder.AnyEq(x => x.Tags, tag); + if (fromSequenceNr > 0) + filter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr)); + if (toSequenceNr != long.MaxValue) + filter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr)); + + return (session is not null ? collection.Find(session, filter) : collection.Find(filter)) + .SortByDescending(x => x.Ordering) + .Limit(1) + .Project(e => e.Ordering); + } + + /// + /// NOTE: This query is meant to be a part of a persistence operation + /// The session parameter signals if this query is being called as part of a transaction block or not + /// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK + /// + public static IFindFluent MessagesQuery( + this IMongoCollection collection, + IClientSessionHandle? session, + long fromSequenceNr, + long toSequenceNr, + long maxOrderingId, + string? tag, + int limit + ) + { + var builder = Builders.Filter; + var toSeqNo = Math.Min(toSequenceNr, maxOrderingId); + + var filter = FilterDefinition.Empty; + if(!string.IsNullOrWhiteSpace(tag)) + filter &= builder.AnyEq(x => x.Tags, tag); + if (fromSequenceNr > 0) + filter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr)); + if (toSequenceNr != long.MaxValue) + filter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSeqNo)); + + var sort = Builders.Sort.Ascending(x => x.Ordering); + + return (session is not null ? collection.Find(session, filter) : collection.Find(filter)) + .Sort(sort) + .Limit(limit); + } + + /// + /// NOTE: This query is meant to be a part of a persistence operation + /// The session parameter signals if this query is being called as part of a transaction block or not + /// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK + /// + public static IFindFluent MaxSequenceNrQuery( + this IMongoCollection collection, + IClientSessionHandle? session, + string persistenceId) + { + var filter = Builders.Filter.Eq(x => x.PersistenceId, persistenceId); + + return (session is not null ? collection.Find(session, filter) : collection.Find(filter)) + .Project(x => x.SequenceNr); + } + + /// + /// NOTE: This query is meant to be a part of a persistence operation + /// The session parameter signals if this query is being called as part of a transaction block or not + /// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK + /// + public static IFindFluent MaxSequenceNrQuery( + this IMongoCollection collection, + IClientSessionHandle? session, + string persistenceId) + { + var filter = Builders.Filter.Eq(x => x.PersistenceId, persistenceId); + + return (session is not null ? collection.Find(session, filter) : collection.Find(filter)) + .SortByDescending(x => x.SequenceNr) + .Project(x => x.SequenceNr); + } + + /// + /// NOTE: This query is meant to be a part of a persistence operation + /// The session parameter signals if this query is being called as part of a transaction block or not + /// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK + /// + public static async Task> AllPersistenceIdsQuery(this IMongoCollection collection, IClientSessionHandle? session, long offset, CancellationToken token) + { + IAsyncCursor idCursor; + if (session is not null) + { + idCursor = await collection + .DistinctAsync( + session: session, + field: x => x.PersistenceId, + filter: entry => entry.Ordering > new BsonTimestamp(offset), + cancellationToken: token); + } + else + { + idCursor = await collection + .DistinctAsync( + field: x => x.PersistenceId, + filter: entry => entry.Ordering > new BsonTimestamp(offset), + cancellationToken: token); + } + var hashset = new List(); + while (await idCursor.MoveNextAsync(token)) + { + hashset.AddRange(idCursor.Current); + } + + return hashset; + } + + /// + /// NOTE: This query is meant to be a part of a persistence operation + /// The session parameter signals if this query is being called as part of a transaction block or not + /// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK + /// + public static async Task HighestOrderingQuery(this IMongoCollection collection, IClientSessionHandle? session, CancellationToken token) + { + var max = await (session is not null ? collection.AsQueryable(session) : collection.AsQueryable()) + .Select(je => je.Ordering) + .MaxAsync(token); + + return max.Value; + } + + /// + /// NOTE: This query is meant to be a part of a persistence operation + /// The session parameter signals if this query is being called as part of a transaction block or not + /// NEVER CALL THIS METHOD OUTSIDE OF MaybeWithTransaction BLOCK + /// + public static async Task SetHighSequenceIdQuery(this IMongoCollection collection, IClientSessionHandle? session, string persistenceId, long maxSeqNo, CancellationToken token) + { + var builder = Builders.Filter; + var filter = builder.Eq(x => x.PersistenceId, persistenceId); + + var metadataEntry = new MetadataEntry + { + Id = persistenceId, + PersistenceId = persistenceId, + SequenceNr = maxSeqNo + }; + + if (session is not null) + { + await collection.ReplaceOneAsync( + session: session, + filter: filter, + replacement: metadataEntry, + options: new ReplaceOptions { IsUpsert = true }, + cancellationToken: token); + } + else + { + await collection.ReplaceOneAsync( + filter: filter, + replacement: metadataEntry, + options: new ReplaceOptions { IsUpsert = true }, + cancellationToken: token); + } + } + +} \ No newline at end of file diff --git a/src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs b/src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs index 30d5c00..8c3af70 100644 --- a/src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs +++ b/src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs @@ -78,7 +78,6 @@ namespace Akka.Persistence.MongoDb.Query switch (message) { case AllEventsPublisher.Continue _: - case NewEventAppended _: if (IsTimeForReplay) Replay(); return true; case Request _: @@ -114,7 +113,8 @@ namespace Akka.Persistence.MongoDb.Query persistenceId: replayed.Persistent.PersistenceId, sequenceNr: replayed.Persistent.SequenceNr, timestamp: replayed.Persistent.Timestamp, - @event: replayed.Persistent.Payload)); + @event: replayed.Persistent.Payload, + tags: Array.Empty())); CurrentOffset = replayed.Offset; Buffer.DeliverBuffer(TotalDemand); @@ -135,7 +135,6 @@ namespace Akka.Persistence.MongoDb.Query Context.Stop(Self); return true; case AllEventsPublisher.Continue _: - case NewEventAppended _: return true; default: return false; @@ -163,7 +162,6 @@ namespace Akka.Persistence.MongoDb.Query protected override void ReceiveInitialRequest() { - JournalRef.Tell(SubscribeNewEvents.Instance); Replay(); } diff --git a/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs b/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs index 1d857ca..645d3d6 100644 --- a/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs +++ b/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs @@ -95,7 +95,6 @@ namespace Akka.Persistence.MongoDb.Query switch (message) { case EventsByPersistenceIdPublisher.Continue _: - case EventAppended _: if (IsTimeForReplay) Replay(); break; case Request _: @@ -132,7 +131,8 @@ namespace Akka.Persistence.MongoDb.Query persistenceId: PersistenceId, sequenceNr: seqNr, timestamp: replayed.Persistent.Timestamp, - @event: replayed.Persistent.Payload)); + @event: replayed.Persistent.Payload, + tags: Array.Empty())); CurrentSequenceNr = seqNr + 1; Buffer.DeliverBuffer(TotalDemand); break; @@ -149,7 +149,6 @@ namespace Akka.Persistence.MongoDb.Query Buffer.DeliverBuffer(TotalDemand); break; case EventsByPersistenceIdPublisher.Continue _: - case EventAppended _: // Skip during replay break; case Cancel _: @@ -182,7 +181,6 @@ namespace Akka.Persistence.MongoDb.Query protected override void ReceiveInitialRequest() { - JournalRef.Tell(new SubscribePersistenceId(PersistenceId)); Replay(); } diff --git a/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs b/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs index 8d2829d..ec80637 100644 --- a/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs +++ b/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs @@ -90,7 +90,6 @@ namespace Akka.Persistence.MongoDb.Query switch (message) { case EventsByTagPublisher.Continue _: - case TaggedEventAppended _: if (IsTimeForReplay) Replay(); break; case Request _: @@ -126,7 +125,8 @@ namespace Akka.Persistence.MongoDb.Query persistenceId: replayed.Persistent.PersistenceId, sequenceNr: replayed.Persistent.SequenceNr, timestamp: replayed.Persistent.Timestamp, - @event: replayed.Persistent.Payload)); + @event: replayed.Persistent.Payload, + tags: new [] { replayed.Tag })); CurrentOffset = replayed.Offset; Buffer.DeliverBuffer(TotalDemand); @@ -144,7 +144,6 @@ namespace Akka.Persistence.MongoDb.Query Buffer.DeliverBuffer(TotalDemand); break; case EventsByTagPublisher.Continue _: - case TaggedEventAppended _: // ignore break; case Cancel _: @@ -179,7 +178,6 @@ namespace Akka.Persistence.MongoDb.Query protected override void ReceiveInitialRequest() { - JournalRef.Tell(new SubscribeTag(Tag)); Replay(); } diff --git a/src/Akka.Persistence.MongoDb/Query/QueryApi.cs b/src/Akka.Persistence.MongoDb/Query/QueryApi.cs index 6ba0f37..08dbf39 100644 --- a/src/Akka.Persistence.MongoDb/Query/QueryApi.cs +++ b/src/Akka.Persistence.MongoDb/Query/QueryApi.cs @@ -14,9 +14,7 @@ using System.Collections.Immutable; namespace Akka.Persistence.MongoDb.Query { - /// - /// TBD - /// + [Obsolete("Query is not implemented.")] public interface ISubscriptionCommand { } /// @@ -25,6 +23,7 @@ namespace Akka.Persistence.MongoDb.Query /// the subscriber when has been called. /// [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class SubscribePersistenceId : ISubscriptionCommand { /// @@ -46,6 +45,7 @@ namespace Akka.Persistence.MongoDb.Query /// TBD /// [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class EventAppended : IDeadLetterSuppression { /// @@ -107,6 +107,7 @@ namespace Akka.Persistence.MongoDb.Query /// the subscriber when `asyncWriteMessages` has been called. /// [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class SubscribeNewEvents : ISubscriptionCommand { public static SubscribeNewEvents Instance = new SubscribeNewEvents(); @@ -115,6 +116,7 @@ namespace Akka.Persistence.MongoDb.Query } [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class NewEventAppended : IDeadLetterSuppression { public static NewEventAppended Instance = new NewEventAppended(); @@ -130,6 +132,7 @@ namespace Akka.Persistence.MongoDb.Query /// via an . /// [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class SubscribeTag : ISubscriptionCommand { /// @@ -151,6 +154,7 @@ namespace Akka.Persistence.MongoDb.Query /// TBD /// [Serializable] + [Obsolete("Query is not implemented.", true)] public sealed class TaggedEventAppended : IDeadLetterSuppression { /// diff --git a/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs b/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs index 4c9a808..4485303 100644 --- a/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs +++ b/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs @@ -14,6 +14,7 @@ using Akka.Util; using MongoDB.Driver; using MongoDB.Driver.Linq; +#nullable enable namespace Akka.Persistence.MongoDb.Snapshot { /// @@ -21,8 +22,13 @@ namespace Akka.Persistence.MongoDb.Snapshot /// public class MongoDbSnapshotStore : SnapshotStore { + private static readonly ClientSessionOptions EmptySessionOptions = new(); + private readonly MongoDbSnapshotSettings _settings; - private Lazy> _snapshotCollection; + // ReSharper disable InconsistentNaming + private IMongoDatabase? _mongoDatabase_DoNotUseDirectly; + private IMongoCollection? _snapshotCollection_DoNotUseDirectly; + // ReSharper enable InconsistentNaming /// /// Used to cancel all outstanding commands when the actor is stopped. @@ -53,136 +59,162 @@ namespace Akka.Persistence.MongoDb.Snapshot return unitedCts; } - protected override void PreStart() + private async Task MaybeWithTransaction(Func act, CancellationToken token) { - base.PreStart(); - _snapshotCollection = new Lazy>(() => + if (!_settings.Transaction) { - MongoClient client; - IMongoDatabase snapshot; - var setupOption = Context.System.Settings.Setup.Get(); - if (!setupOption.HasValue || setupOption.Value.SnapshotConnectionSettings == null) + await act(null, token); + return; + } + + using var session = await GetMongoDb().Client.StartSessionAsync(EmptySessionOptions, token); + await session.WithTransactionAsync( + async (s, ct) => { - //Default LinqProvider has been changed to LINQ3.LinqProvider can be changed back to LINQ2 in the following way: - var connectionString = new MongoUrl(_settings.ConnectionString); - var clientSettings = MongoClientSettings.FromUrl(connectionString); - clientSettings.LinqProvider = LinqProvider.V2; - client = new MongoClient(clientSettings); - snapshot = client.GetDatabase(connectionString.DatabaseName); - } - else - { - client = new MongoClient(setupOption.Value.SnapshotConnectionSettings); - snapshot = client.GetDatabase(setupOption.Value.SnapshotDatabaseName); - } + await act(s, ct); + return Task.FromResult(NotUsed.Instance); + }, cancellationToken:token); + } + + private async Task MaybeWithTransaction(Func> act, CancellationToken token) + { + if (!_settings.Transaction) + return await act(null, token); + + using var session = await GetMongoDb().Client.StartSessionAsync(EmptySessionOptions, token); + return await session.WithTransactionAsync(act, cancellationToken:token); + } + + private IMongoDatabase GetMongoDb() + { + if (_mongoDatabase_DoNotUseDirectly is not null) + return _mongoDatabase_DoNotUseDirectly; + + MongoClient client; + var setupOption = Context.System.Settings.Setup.Get(); + if (!setupOption.HasValue || setupOption.Value.SnapshotConnectionSettings == null) + { + //Default LinqProvider has been changed to LINQ3.LinqProvider can be changed back to LINQ2 in the following way: + var connectionString = new MongoUrl(_settings.ConnectionString); + var clientSettings = MongoClientSettings.FromUrl(connectionString); + clientSettings.LinqProvider = LinqProvider.V2; + client = new MongoClient(clientSettings); + _mongoDatabase_DoNotUseDirectly = client.GetDatabase(connectionString.DatabaseName); + return _mongoDatabase_DoNotUseDirectly; + } - var collection = snapshot.GetCollection(_settings.Collection); - if (_settings.AutoInitialize) - { - using var unitedCts = CreatePerCallCts(); - { - var modelWithAscendingPersistenceIdAndDescendingSequenceNr = - new CreateIndexModel(Builders.IndexKeys - .Ascending(entry => entry.PersistenceId) - .Descending(entry => entry.SequenceNr)); - - collection.Indexes - .CreateOneAsync(modelWithAscendingPersistenceIdAndDescendingSequenceNr, - cancellationToken: unitedCts.Token) - .Wait(); - } - } - - return collection; - }); + client = new MongoClient(setupOption.Value.SnapshotConnectionSettings); + _mongoDatabase_DoNotUseDirectly = client.GetDatabase(setupOption.Value.SnapshotDatabaseName); + return _mongoDatabase_DoNotUseDirectly; } + private async Task> GetSnapshotCollection(CancellationToken token) + { + _snapshotCollection_DoNotUseDirectly = GetMongoDb().GetCollection(_settings.Collection); + + if (!_settings.AutoInitialize) + return _snapshotCollection_DoNotUseDirectly; + + var modelWithAscendingPersistenceIdAndDescendingSequenceNr = + new CreateIndexModel(Builders.IndexKeys + .Ascending(entry => entry.PersistenceId) + .Descending(entry => entry.SequenceNr)); + + await _snapshotCollection_DoNotUseDirectly.Indexes + .CreateOneAsync(modelWithAscendingPersistenceIdAndDescendingSequenceNr, + cancellationToken: token); + + return _snapshotCollection_DoNotUseDirectly; + } + protected override void PostStop() { // cancel any pending database commands during shutdown _pendingCommandsCancellation.Cancel(); + _pendingCommandsCancellation.Dispose(); base.PostStop(); } - protected override Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) { - var filter = CreateRangeFilter(persistenceId, criteria); - using var unitedCts = CreatePerCallCts(); - { + var snapshotCollection = await GetSnapshotCollection(unitedCts.Token); - return - _snapshotCollection.Value - .Find(filter) - .SortByDescending(x => x.SequenceNr) - .Limit(1) - .Project(x => ToSelectedSnapshot(x)) - .FirstOrDefaultAsync(unitedCts.Token); - } + return await MaybeWithTransaction(async (session, token) => + { + var filter = CreateRangeFilter(persistenceId, criteria); + return await (session is not null ? snapshotCollection.Find(session, filter) : snapshotCollection.Find(filter)) + .SortByDescending(x => x.SequenceNr) + .Limit(1) + .Project(x => ToSelectedSnapshot(x)) + .FirstOrDefaultAsync(token); + }, unitedCts.Token); } protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot) { using var unitedCts = CreatePerCallCts(); + var snapshotCollection = await GetSnapshotCollection(unitedCts.Token); + + var snapshotEntry = ToSnapshotEntry(metadata, snapshot); + await MaybeWithTransaction(async (session, token) => { - var snapshotEntry = ToSnapshotEntry(metadata, snapshot); - if (_settings.Transaction) + if (session is not null) { - var sessionOptions = new ClientSessionOptions { }; - - using var session = - await _snapshotCollection.Value.Database.Client.StartSessionAsync( - sessionOptions, unitedCts.Token); - // Begin transaction - session.StartTransaction(); - try - { - await _snapshotCollection.Value.ReplaceOneAsync( - session, - CreateSnapshotIdFilter(snapshotEntry.Id), - snapshotEntry, - new ReplaceOptions { IsUpsert = true }, unitedCts.Token); - } - catch (Exception ex) - { - using var cancelCts = CreatePerCallCts(); - await session.AbortTransactionAsync(cancelCts.Token); - throw; - } - - await session.CommitTransactionAsync(unitedCts.Token); + await snapshotCollection.ReplaceOneAsync( + session: session, + filter: CreateSnapshotIdFilter(snapshotEntry.Id), + replacement: snapshotEntry, + options: new ReplaceOptions { IsUpsert = true }, + cancellationToken: token); } else - await _snapshotCollection.Value.ReplaceOneAsync( - CreateSnapshotIdFilter(snapshotEntry.Id), - snapshotEntry, - new ReplaceOptions { IsUpsert = true }, unitedCts.Token); - } + { + await snapshotCollection.ReplaceOneAsync( + filter: CreateSnapshotIdFilter(snapshotEntry.Id), + replacement: snapshotEntry, + options: new ReplaceOptions { IsUpsert = true }, + cancellationToken: token); + } + }, unitedCts.Token); } - protected override Task DeleteAsync(SnapshotMetadata metadata) + protected override async Task DeleteAsync(SnapshotMetadata metadata) { - var builder = Builders.Filter; - var filter = builder.Eq(x => x.PersistenceId, metadata.PersistenceId); - - if (metadata.SequenceNr > 0 && metadata.SequenceNr < long.MaxValue) - filter &= builder.Eq(x => x.SequenceNr, metadata.SequenceNr); - - if (metadata.Timestamp != DateTime.MinValue && metadata.Timestamp != DateTime.MaxValue) - filter &= builder.Eq(x => x.Timestamp, metadata.Timestamp.Ticks); - using var unitedCts = CreatePerCallCts(); + var snapshotCollection = await GetSnapshotCollection(unitedCts.Token); + + await MaybeWithTransaction(async (session, token) => { - return _snapshotCollection.Value.FindOneAndDeleteAsync(filter, cancellationToken:unitedCts.Token); - } + var builder = Builders.Filter; + var filter = builder.Eq(x => x.PersistenceId, metadata.PersistenceId); + + if (metadata.SequenceNr is > 0 and < long.MaxValue) + filter &= builder.Eq(x => x.SequenceNr, metadata.SequenceNr); + + if (metadata.Timestamp != DateTime.MinValue && metadata.Timestamp != DateTime.MaxValue) + filter &= builder.Eq(x => x.Timestamp, metadata.Timestamp.Ticks); + + if(session is not null) + await snapshotCollection.FindOneAndDeleteAsync(session, filter, cancellationToken: token); + else + await snapshotCollection.FindOneAndDeleteAsync(filter, cancellationToken: token); + }, unitedCts.Token); } - protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) { - var filter = CreateRangeFilter(persistenceId, criteria); - using var unitedCts = CreatePerCallCts(); - return _snapshotCollection.Value.DeleteManyAsync(filter, unitedCts.Token); + var snapshotCollection = await GetSnapshotCollection(unitedCts.Token); + + await MaybeWithTransaction(async (session, token) => + { + var filter = CreateRangeFilter(persistenceId, criteria); + if(session is not null) + await snapshotCollection.DeleteManyAsync(session, filter, cancellationToken: token); + else + await snapshotCollection.DeleteManyAsync(filter, token); + }, unitedCts.Token); } private static FilterDefinition CreateSnapshotIdFilter(string snapshotId) @@ -200,7 +232,7 @@ namespace Akka.Persistence.MongoDb.Snapshot var builder = Builders.Filter; var filter = builder.Eq(x => x.PersistenceId, persistenceId); - if (criteria.MaxSequenceNr > 0 && criteria.MaxSequenceNr < long.MaxValue) + if (criteria.MaxSequenceNr is > 0 and < long.MaxValue) filter &= builder.Lte(x => x.SequenceNr, criteria.MaxSequenceNr); if (criteria.MaxTimeStamp != DateTime.MinValue && criteria.MaxTimeStamp != DateTime.MaxValue) @@ -266,7 +298,7 @@ namespace Akka.Persistence.MongoDb.Snapshot } int? serializerId = null; - Type type = null; + Type? type = null; // legacy serialization if (!entry.SerializerId.HasValue && !string.IsNullOrEmpty(entry.Manifest)) diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 2418fd3..fa5ec4b 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -22,9 +22,6 @@ 1.5.12 1.5.12 - 2.5.0 - 17.6.3 - 6.11.0 diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 8f8717e..9379fe7 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -1,29 +1,29 @@ - - true - - - - - - - - - - - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - - - - - - + + true + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + \ No newline at end of file