updated build scripts for Akka.Persistence.Cassandra

This commit is contained in:
Aaron Stannard 2015-06-28 11:11:40 -07:00
Родитель 2e48840a5d
Коммит f980679490
4 изменённых файлов: 313 добавлений и 471 удалений

366
README.md
Просмотреть файл

@ -1,88 +1,330 @@
## Akka.Persistence.PostgreSql
Akka.Persistence.Cassandra
==========================
A replicated journal and snapshot store implementation for Akka.Persistence backed by
[Apache Cassandra](http://planetcassandra.org/).
Akka Persistence journal and snapshot store backed by PostgreSql database.
**WARNING: Akka.Persistence.PostgreSql plugin is still in beta and it's mechanics described below may be still subject to change**.
### Setup
To activate the journal plugin, add the following lines to actor system configuration file:
**WARNING: The Akka.Persistence.Cassandra plugin is still in beta and the mechanics described below are subject to
change.**
Quick Start
-----------
To activate the journal plugin, add the following line to the actor system configuration file:
```
akka.persistence.journal.plugin = "akka.persistence.journal.postgresql"
akka.persistence.journal.postgresql.connection-string = "<database connection string>"
akka.persistence.journal.plugin = "cassandra-journal"
```
Similar configuration may be used to setup a PostgreSql snapshot store:
To activate the snapshot store plugin, add the following line to the actor system configuration file:
```
akka.persistence.snasphot-store.plugin = "akka.persistence.snasphot-store.postgresql"
akka.persistence.snasphot-store.postgresql.connection-string = "<database connection string>"
akka.persistence.snasphot-store.plugin = "cassandra-snapshot-store"
```
The default configuration will try to connect to a Cassandra cluster running on `127.0.0.1` for persisting messages
and snapshots. More information on the available configuration options is in the sections below.
Remember that connection string must be provided separately to Journal and Snapshot Store. To finish setup simply initialize plugin using: `PostgreSqlPersistence.Init(actorSystem);`
Connecting to the Cluster
-------------------------
Both the journal and the snapshot store plugins use the [DataStax .NET Driver](https://github.com/datastax/csharp-driver)
for Cassandra to communicate with the cluster. The driver has an `ISession` object which is used to execute statements
against the cluster (very similar to a `DbConnection` object in ADO.NET). You can control the creation and
configuration of these session instance(s) by modifying the configuration under `cassandra-sessions`. Out of the
box, both the journal and the snapshot store plugin will try to use a session called `default`. You can override
the settings for that session with the following configuration keys:
- `cassandra-sessions.default.contact-points`: A comma-seperated list of contact points in the cluster in the format
of either `host` or `host:port`. Default value is *`[ "127.0.0.1" ]`*.
- `cassandra-sessions.default.port`: Default port for contact points in the cluster, used if a contact point is not
in [host:port] format. Default value is *`9042`*.
- `cassandra-sessions.default.credentials.username`: The username to login to Cassandra hosts. No authentication is
used by default.
- `cassandra-sessions.default.credentials.password`: The password corresponding to the username. No authentication
is used by default.
- `cassandra-sessions.default.ssl`: Boolean value indicating whether to use SSL when connecting to the cluster. No
default value is set and so SSL is not used by default.
- `cassandra-sessions.default.compression`: The [type of compression](https://github.com/datastax/csharp-driver/blob/master/src/Cassandra/CompressionType.cs)
to use when communicating with the cluster. No default value is set and so compression is not used by default.
If you require more advanced configuration of the `ISession` object than the options provided here (for example, to
use a different session for the journal and snapshot store plugins or to configure the session via code or manage
it with an IoC container), see the [Advanced Session Management](#advanced-session-management) section below.
Journal
-------
### Features
- All operations of the journal plugin API are fully supported
- Uses Cassandra in a log-oriented way (i.e. data is only ever inserted but never updated)
- Uses marker records for permanent deletes to try and avoid the problem of [reading many tombstones](http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets)
when replaying messages.
- Messages for a single persistence Id are partitioned across the cluster to avoid unbounded partition
growth and support scalability by adding more nodes to the cluster.
### Configuration
As mentioned in the Quick Start section, you can activate the journal plugin by adding the following line to your
actor system configuration file:
```
akka.persistence.journal.plugin = "cassandra-journal"
```
You can also override the journal's default settings with the following configuration keys:
- `cassandra-journal.class`: The Type name of the Cassandra journal plugin. Default value is *`Akka.Persistence.Cassandra.Journal.CassandraJournal, Akka.Persistence.Cassandra`*.
- `cassandra-journal.session-key`: The name (key) of the session to use when resolving an `ISession` instance. When
using default session management, this points at a configuration section under `cassandra-sessions` where the
session's configuration is found. Default value is *`default`*.
- `cassandra-journal.use-quoted-identifiers`: Whether or not to quote the table and keyspace names when executing
statements against Cassandra. Default value is *`false`*.
- `cassandra-journal.keyspace`: The keyspace to be created/used by the journal. Default value is *`akkanet`*.
- `cassandra-journal.keyspace-creation-options`: A string to be appended to the `CREATE KEYSPACE` statement after
the `WITH` clause when the keyspace is automatically created. Use this to define options like the replication
strategy. Default value is *`REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`*.
- `cassandra-journal.keyspace-autocreate`: When true, the journal will automatically try to create the keyspace if
it doesn't already exist on startup. Default value is *`true`*.
- `cassandra-journal.table`: The name of the table to be created/used by the journal. Default value is *`messages`*.
- `cassandra-journal.table-creation-properties`: A string to be appended to the `CREATE TABLE` statement after the
`WITH` clause. Use this to define advanced table options like `gc_grace_seconds` or one of the other many table
options. Default value is *an empty string*.
- `cassandra-journal.partition-size`: The approximate number of message rows to store in a single partition. Cannot
be changed after table creation. Default value is *`5000000`*.
- `cassandra-journal.max-result-size`: The maximum number of messages to retrieve in a single request to Cassandra
when replaying messages. Default value is *`50001`*.
- `cassandra-journal.read-consistency`: The consistency level to use for read operations. Default value is *`Quorum`*.
- `cassandra-journal.write-consistency`: The consistency level to use for write operations. Default value is
*`Quorum`*.
Both journal and snapshot store share the same configuration keys (however they resides in separate scopes, so they are definied distinctly for either journal or snapshot store):
The default value for read and write consistency levels ensure that persistent actors can read their own writes.
Consider using `LocalQuorum` for both reads and writes if using a Cassandra cluster with multiple datacenters.
- `class` (string with fully qualified type name) - determines class to be used as a persistent journal. Default: *Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql* (for journal) and *Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql* (for snapshot store).
- `plugin-dispatcher` (string with configuration path) - describes a message dispatcher for persistent journal. Default: *akka.actor.default-dispatcher*
- `connection-string` - connection string used to access PostgreSql database. Default: *none*.
- `connection-timeout` - timespan determining default connection timeouts on database-related operations. Default: *30s*
- `schema-name` - name of the database schema, where journal or snapshot store tables should be placed. Default: *public*
- `table-name` - name of the table used by either journal or snapshot store. Default: *event_journal* (for journal) or *snapshot_store* (for snapshot store)
- `auto-initialize` - flag determining if journal or snapshot store related tables should by automatically created when they have not been found in connected database. Default: *false*
Snapshot Store
--------------
### Features
- Snapshot IO is done in a fully asynchronous fashion, including deletes (the snapshot store plugin API only
directly specifies synchronous methods for doing deletes)
### Custom SQL data queries
### Configuration
As mentioned in the Quick Start section, you can activate the snapshot store plugin by adding the following line
to your actor system configuration file:
```
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
```
You can also override the snapshot store's default settings with the following configuration keys:
- `cassandra-snapshot-store.class`: The Type name of the Cassandra snapshot store plugin. Default value is
*`Akka.Persistence.Cassandra.Snapshot.CassandraSnapshotStore, Akka.Persistence.Cassandra`*.
- `cassandra-snapshot-store.session-key`: The name (key) of the session to use when resolving an `ISession`
instance. When using default session management, this points at a configuration section under `cassandra-sessions`
where the session's configuration is found. Default value is *`default`*.
- `cassandra-snapshot-store.use-quoted-identifiers`: Whether or not to quote the table and keyspace names when
executing statements against Cassandra. Default value is *`false`*.
- `cassandra-snapshot-store.keyspace`: The keyspace to be created/used by the snapshot store. Default value is
*`akkanet`*.
- `cassandra-snapshot-store.keyspace-creation-options`: A string to be appended to the `CREATE KEYSPACE` statement
after the `WITH` clause when the keyspace is automatically created. Use this to define options like the replication
strategy. Default value is *`REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`*.
- `cassandra-snapshot-store.keyspace-autocreate`: When true, the snapshot store will automatically try to create
the keyspace if it doesn't already exist on startup. Default value is *`true`*.
- `cassandra-snapshot-store.table`: The name of the table to be created/used by the snapshot store. Default value
is *`snapshots`*.
- `cassandra-snapshot-store.table-creation-properties`: A string to be appended to the `CREATE TABLE` statement
after the `WITH` clause. Use this to define advanced table options like `gc_grace_seconds` or one of the other
many table options. Default value is *an empty string*.
- `cassandra-snapshot-store.max-metadata-result-size`: The maximum number of snapshot metadata instances to
retrieve in a single request when trying to find a snapshot that matches criteria. Default value is *`10`*.
- `cassandra-snapshot-store.read-consistency`: The consistency level to use for read operations. Default value
is *`One`*.
- `cassandra-snapshot-store.write-consistency`: The consistency level to use for write operations. Default value
is *`One`*.
PostgreSql persistence plugin defines a default table schema used for both journal and snapshot store.
Consider using `LocalOne` consistency level for both reads and writes if using a Cassandra cluster with multiple
datacenters.
**EventJournal table**:
Advanced Session Management
---------------------------
In some advanced scenarios, you may want to have more control over how `ISession` instances are created. Some
example scenarios might include:
- to use a different session instance for the journal and snapshot store plugins (i.e. maybe you have more than one
Cassandra cluster and are storing journal messages and snapshots in different clusters)
- to access more advanced configuration options for building the session instance in code using the DataStax
driver's cluster builder API directly
- to use session instances that have already been registered with an IoC container and are being managed there
+----------------+-------------+------------+---------------+---------+
| persistence_id | sequence_nr | is_deleted | payload_type | payload |
+----------------+-------------+------------+---------------+---------+
| varchar(200) | bigint | boolean | varchar(500) | bytea |
+----------------+-------------+------------+---------------+---------+
**SnapshotStore table**:
+----------------+--------------+--------------------------+------------------+---------------+----------+
| persistence_id | sequence_nr | created_at | created_at_ticks | snapshot_type | snapshot |
+----------------+--------------+--------------------------+------------------+--------------------------+
| varchar(200) | bigint | timestamp with time zone | smallint | varchar(500) | bytea |
+----------------+--------------+--------------------------+------------------+--------------------------+
If you want more control over how session instances are created or managed, you have two options depending on how
much control you need.
**created_at and created_at_ticks - The max precision of a PostgreSQL timestamp is 6. The max precision of a .Net DateTime object is 7. Because of this differences, the additional ticks are saved in a separate column and combined during deserialization. There is also a check constraint restricting created_at_ticks to the range [0,10) to ensure that there are no precision differences in the opposite direction.**
Underneath Akka.Persistence.PostgreSql uses the Npgsql library to communicate with the database. You may choose not to use a dedicated built in ones, but to create your own being better fit for your use case. To do so, you have to create your own versions of `IJournalQueryBuilder` and `IJournalQueryMapper` (for custom journals) or `ISnapshotQueryBuilder` and `ISnapshotQueryMapper` (for custom snapshot store) and then attach inside journal, just like in the example below:
```csharp
class MyCustomPostgreSqlJournal: Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal
{
public MyCustomPostgreSqlJournal() : base()
{
QueryBuilder = new MyCustomJournalQueryBuilder();
QueryMapper = new MyCustomJournalQueryMapper();
### Defining multiple session instances in the `cassandra-sessions` section
It is possible to define configuration for more than one session instance under the `cassandra-sessions` section of
your actor system's configuration file. To do this, just create your own section with a unique name/key for the
sub-section. All of the same options listed above in the [Connecting to the Cluster](#connecting-to-the-cluster)
can then be used to configure that session. For example, I might define seperate configurations for my journal and
snapshot store plugins like this:
```
cassandra-sessions {
my-journal-session {
contact-points = [ "10.1.1.1", "10.1.1.2" ]
port = 9042
credentials {
username = "myusername"
password = "mypassword"
}
}
my-snapshot-session {
contact-points = [ "10.2.1.1:9142", "10.2.1.2:9142" ]
}
}
```
The final step is to setup your custom journal using akka config:
I can then tell the journal and snapshot store plugins to use those sessions by overriding each plugin's `session-key`
configuration like this:
```
akka.persistence.journal.postgresql.class = "MyModule.MyCustomPostgreSqlJournal, MyModule"
cassandra-journal.session-key = "my-journal-session"
cassandra-snapshot-store.session-key = "my-snapshot-session"
```
### Controlling session configuration and management with code
You can also override how sessions are created, managed and resolved with your own code. Session management is
done as its own plugin for Akka.NET and a default implementation that uses the `cassandra-sessions` section is
provided out of the box. If you want to provide your own implementation for doing this (for example, to manage
sessions with an IoC container or use the DataStax driver's cluster builder API to do more advanced configuration),
here are the steps you'll need to follow:
1. Create a class that implements the `IManageSessions` interface from `Akka.Persistence.Cassandra.SessionManagement`.
This interface is simple and just requires that you provide a way for resolving and releasing session instances. For
example:
```cs
public class MySessionManager : IManageSessions
{
public override ISession ResolveSession(string key)
{
// Do something here to get the ISession instance (pull from IoC container, etc)
}
public override ISession ReleaseSession(ISession session)
{
// Do something here to release the session instance if necessary
}
}
```
1. Next, you'll need to create an extension id provider class by inheriting from
`ExtensionIdProvider<IManageSessions>`. This class is responsible for actually providing a copy of your
`IManageSessions` implementation. For example:
```cs
public class MySessionExtension : ExtensionIdProvider<IManageSessions>
{
public override IManageSessions CreateExtension(ExtendedActorSystem system)
{
// Return a copy of your implementation of IManageSessions
return new MySessionManager();
}
}
```
1. Lastly, you'll need to register your extension with the actor system when creating it in your application. For
example:
```cs
var actorSystem = ActorSystem.Create("MyApplicationActorSystem");
var extensionId = new MySessionExtension();
actorSystem.RegisterExtension(extensionId);
```
The journal and snapshot store plugins will now call your code when resolving or releasing sessions.
### Tests
The PostgreSql tests are packaged as a separate build task with a target of "RunPostgreSqlTests".
The Cassandra tests are packaged and run as part of the default "All" build task.
In order to run the tests, you must do the following things:
1. Download and install PostgreSql from: http://www.postgresql.org/download/
2. Install PostgreSql with the default settings. The default connection string uses the following credentials:
1. Username: postgres
2. Password: postgres
3. A custom app.config file can be used and needs to be placed in the same folder as the dll
1. Download and install DataStax Community Edition of Cassandra from http://planetcassandra.org/cassandra/
2. Install Cassandra with the default settings. The default connection string will connect to a Cassandra instance running at **127.0.0.1:9042** with no authentication. Here's the full default settings for an Akka.Persistence.Cassandra connection:
```xml
cassandra-sessions {
# The "default" Cassandra session, used by both the journal and snapshot store if not changed in
# the cassandra-journal and cassandra-snapshot-store configuration sections below
default {
# Comma-seperated list of contact points in the cluster in the format of either [host] or [host:port]
contact-points = [ "127.0.0.1" ]
# Default port for contact points in the cluster, used if a contact point is not in [host:port] format
port = 9042
}
}
cassandra-journal {
# Type name of the cassandra journal plugin
class = "Akka.Persistence.Cassandra.Journal.CassandraJournal, Akka.Persistence.Cassandra"
# The name (key) of the session to use when resolving an ISession instance. When using default session management,
# this points at configuration under the "cassandra-sessions" section where the session's configuration is found.
session-key = "default"
# Whether or not to quote table and keyspace names when executing statements against Cassandra
use-quoted-identifiers = false
# The keyspace to be created/used by the journal
keyspace = "akkanet"
# A string to be appended to the CREATE KEYSPACE statement after the WITH clause when the keyspace is
# automatically created. Use this to define options like replication strategy.
keyspace-creation-options = "REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
# When true the journal will automatically try to create the keyspace if it doesn't already exist on start
keyspace-autocreate = true
# Name of the table to be created/used by the journal
table = "messages"
# A string to be appended to the CREATE TABLE statement after the WITH clause. Use this to define things
# like gc_grace_seconds or one of the many other table options.
table-creation-properties = ""
# The approximate number of rows per partition to use. Cannot be changed after table creation.
partition-size = 5000000
# The maximum number of messages to retrieve in one request when replaying messages
max-result-size = 50001
# Consistency level for reads
read-consistency = "Quorum"
# Consistency level for writes
write-consistency = "Quorum"
}
cassandra-snapshot-store {
# Type name of the cassandra snapshot store plugin
class = "Akka.Persistence.Cassandra.Snapshot.CassandraSnapshotStore, Akka.Persistence.Cassandra"
# The name (key) of the session to use when resolving an ISession instance. When using default session management,
# this points at configuration under the "cassandra-sessions" section where the session's configuration is found.
session-key = "default"
# Whether or not to quote table and keyspace names when executing statements against Cassandra
use-quoted-identifiers = false
# The keyspace to be created/used by the snapshot store
keyspace = "akkanet"
# A string to be appended to the CREATE KEYSPACE statement after the WITH clause when the keyspace is
# automatically created. Use this to define options like replication strategy.
keyspace-creation-options = "REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
# When true the journal will automatically try to create the keyspace if it doesn't already exist on start
keyspace-autocreate = true
# Name of the table to be created/used by the snapshot store
table = "snapshots"
# A string to be appended to the CREATE TABLE statement after the WITH clause. Use this to define things
# like gc_grace_seconds or one of the many other table options.
table-creation-properties = ""
# The maximum number of snapshot metadata instances to retrieve in a single request when trying to find a
# snapshot that matches the criteria
max-metadata-result-size = 10
# Consistency level for reads
read-consistency = "One"
# Consistency level for writes
write-consistency = "One"
}
```

191
build.fsx
Просмотреть файл

@ -7,8 +7,6 @@ open System.IO
open System.Text
open Fake
open Fake.FileUtils
open Fake.MSTest
open Fake.NUnitCommon
open Fake.TaskRunnerHelper
open Fake.ProcessHelper
@ -26,9 +24,6 @@ let company = "Akka.NET Team"
let description = "Akka.NET is a port of the popular Java/Scala framework Akka to .NET"
let tags = ["akka";"actors";"actor";"model";"Akka";"concurrency"]
let configuration = "Release"
let toolDir = "tools"
let CloudCopyDir = toolDir @@ "CloudCopy"
let AzCopyDir = toolDir @@ "AzCopy"
// Read release notes and version
@ -57,11 +52,11 @@ let nugetDir = binDir @@ "nuget"
let workingDir = binDir @@ "build"
let libDir = workingDir @@ @"lib\net45\"
let nugetExe = FullName @"src\.nuget\NuGet.exe"
let docDir = "bin" @@ "doc"
let slnFile = "./src/Akka.Persistence.Cassandra.sln"
open Fake.RestorePackageHelper
Target "RestorePackages" (fun _ ->
"./src/Akka.sln"
slnFile
|> RestoreMSSolutionPackages (fun p ->
{ p with
OutputPath = "./src/packages"
@ -87,80 +82,16 @@ Target "AssemblyInfo" <| fun _ ->
Attribute.Version version
Attribute.FileVersion version ] <| AssemblyInfoFileConfig(false)
for file in !! "src/**/AssemblyInfo.fs" do
let title =
file
|> Path.GetDirectoryName
|> Path.GetDirectoryName
|> Path.GetFileName
CreateFSharpAssemblyInfo file [
Attribute.Title title
Attribute.Product product
Attribute.Description description
Attribute.Copyright copyright
Attribute.Company company
Attribute.ComVisible false
Attribute.CLSCompliant true
Attribute.Version version
Attribute.FileVersion version ]
//--------------------------------------------------------------------------------
// Build the solution
Target "Build" <| fun _ ->
!!"src/Akka.sln"
!! slnFile
|> MSBuildRelease "" "Rebuild"
|> ignore
Target "BuildMono" <| fun _ ->
!!"src/Akka.sln"
|> MSBuild "" "Rebuild" [("Configuration","Release Mono")]
|> ignore
//--------------------------------------------------------------------------------
// Build the docs
Target "Docs" <| fun _ ->
!! "documentation/akkadoc.shfbproj"
|> MSBuildRelease "" "Rebuild"
|> ignore
//--------------------------------------------------------------------------------
// Push DOCs content to Windows Azure blob storage
Target "AzureDocsDeploy" (fun _ ->
let rec pushToAzure docDir azureUrl container azureKey trialsLeft =
let tracing = enableProcessTracing
enableProcessTracing <- false
let arguments = sprintf "/Source:%s /Dest:%s /DestKey:%s /S /Y /SetContentType" (Path.GetFullPath docDir) (azureUrl @@ container) azureKey
tracefn "Pushing docs to %s. Attempts left: %d" (azureUrl) trialsLeft
try
let result = ExecProcess(fun info ->
info.FileName <- AzCopyDir @@ "AzCopy.exe"
info.Arguments <- arguments) (TimeSpan.FromMinutes 120.0) //takes a very long time to upload
if result <> 0 then failwithf "Error during AzCopy.exe upload to azure."
with exn ->
if (trialsLeft > 0) then (pushToAzure docDir azureUrl container azureKey (trialsLeft-1))
else raise exn
let canPush = hasBuildParam "azureKey" && hasBuildParam "azureUrl"
if (canPush) then
printfn "Uploading API docs to Azure..."
let azureUrl = getBuildParam "azureUrl"
let azureKey = (getBuildParam "azureKey") + "==" //hack, because it looks like FAKE arg parsing chops off the "==" that gets tacked onto the end of each Azure storage key
if(isUnstableDocs) then
pushToAzure docDir azureUrl "unstable" azureKey 3
if(not isUnstableDocs) then
pushToAzure docDir azureUrl "stable" azureKey 3
pushToAzure docDir azureUrl release.NugetVersion azureKey 3
if(not canPush) then
printfn "Missing required paraments to push docs to Azure. Run build HelpDocs to find out!"
)
Target "PublishDocs" DoNothing
//--------------------------------------------------------------------------------
// Copy the build output to bin directory
@ -172,26 +103,7 @@ Target "CopyOutput" <| fun _ ->
let src = "src" @@ project @@ @"bin/Release/"
let dst = binDir @@ project
CopyDir dst src allFiles
[ "core/Akka"
"core/Akka.FSharp"
"core/Akka.TestKit"
"core/Akka.Remote"
"core/Akka.Remote.TestKit"
"core/Akka.Cluster"
"core/Akka.MultiNodeTestRunner"
"core/Akka.Persistence"
"core/Akka.Persistence.FSharp"
"core/Akka.Persistence.TestKit"
"contrib/loggers/Akka.Logger.slf4net"
"contrib/loggers/Akka.Logger.NLog"
"contrib/loggers/Akka.Logger.Serilog"
"contrib/dependencyinjection/Akka.DI.Core"
"contrib/dependencyinjection/Akka.DI.AutoFac"
"contrib/dependencyinjection/Akka.DI.CastleWindsor"
"contrib/dependencyinjection/Akka.DI.Ninject"
"contrib/testkits/Akka.TestKit.Xunit"
"contrib/testkits/Akka.TestKit.NUnit"
"contrib/testkits/Akka.TestKit.Xunit2"
[ "Akka.Persistence.Cassandra"
]
|> List.iter copyOutput
@ -213,32 +125,7 @@ Target "CleanTests" <| fun _ ->
open XUnit2Helper
Target "RunTests" <| fun _ ->
let msTestAssemblies = !! "src/**/bin/Release/Akka.TestKit.VsTest.Tests.dll"
let nunitTestAssemblies = !! "src/**/bin/Release/Akka.TestKit.NUnit.Tests.dll"
let xunitTestAssemblies = !! "src/**/bin/Release/*.Tests.dll" --
"src/**/bin/Release/Akka.TestKit.VsTest.Tests.dll" --
"src/**/bin/Release/Akka.TestKit.NUnit.Tests.dll" --
"src/**/bin/Release/Akka.Persistence.SqlServer.Tests.dll" --
"src/**/bin/Release/Akka.Persistence.PostgreSql.Tests.dll" --
"src/**/bin/Release/Akka.Persistence.Cassandra.Tests.dll"
mkdir testOutput
MSTest (fun p -> p) msTestAssemblies
nunitTestAssemblies
|> NUnit (fun p ->
{p with
DisableShadowCopy = true;
OutputFile = testOutput + @"\NUnitTestResults.xml"})
let xunitToolPath = findToolInSubPath "xunit.console.exe" "src/packages/xunit.runner.console*/tools"
printfn "Using XUnit runner: %s" xunitToolPath
xUnit2
(fun p -> { p with OutputDir = testOutput; ToolPath = xunitToolPath })
xunitTestAssemblies
Target "RunTestsMono" <| fun _ ->
let xunitTestAssemblies = !! "src/**/bin/Release Mono/*.Tests.dll"
let xunitTestAssemblies = !! "src/**/bin/Release/*.Tests.dll"
mkdir testOutput
@ -248,48 +135,6 @@ Target "RunTestsMono" <| fun _ ->
(fun p -> { p with OutputDir = testOutput; ToolPath = xunitToolPath })
xunitTestAssemblies
Target "MultiNodeTests" <| fun _ ->
let multiNodeTestPath = findToolInSubPath "Akka.MultiNodeTestRunner.exe" "bin/core/Akka.MultiNodeTestRunner*"
printfn "Using MultiNodeTestRunner: %s" multiNodeTestPath
let spec = getBuildParam "spec"
let args = new StringBuilder()
|> append "Akka.MultiNodeTests.dll"
|> append "-Dmultinode.enable-filesink=on"
|> appendIfNotNullOrEmpty spec "-Dmultinode.test-spec="
|> toText
let result = ExecProcess(fun info ->
info.FileName <- multiNodeTestPath
info.WorkingDirectory <- (Path.GetDirectoryName (FullName multiNodeTestPath))
info.Arguments <- args) (System.TimeSpan.FromMinutes 60.0) (* This is a VERY long running task. *)
if result <> 0 then failwithf "MultiNodeTestRunner failed. %s %s" multiNodeTestPath args
Target "RunSqlServerTests" <| fun _ ->
let sqlServerTests = !! "src/**/bin/Release/Akka.Persistence.SqlServer.Tests.dll"
let xunitToolPath = findToolInSubPath "xunit.console.exe" "src/packages/xunit.runner.console*/tools"
printfn "Using XUnit runner: %s" xunitToolPath
xUnit
(fun p -> { p with OutputDir = testOutput; ToolPath = xunitToolPath })
sqlServerTests
Target "RunPostgreSqlTests" <| fun _ ->
let postgreSqlTests = !! "src/**/bin/Release/Akka.Persistence.PostgreSql.Tests.dll"
let xunitToolPath = findToolInSubPath "xunit.console.exe" "src/packages/xunit.runner.console*/tools"
printfn "Using XUnit runner: %s" xunitToolPath
xUnit2
(fun p -> { p with OutputDir = testOutput; ToolPath = xunitToolPath })
postgreSqlTests
Target "RunCassandraTests" <| fun _ ->
let cassandraTests = !! "src/**/bin/Release/Akka.Persistence.Cassandra.Tests.dll"
let xunitToolPath = findToolInSubPath "xunit.console.exe" "src/packages/xunit.runner.console*/tools"
printfn "Using XUnit runner: %s" xunitToolPath
xUnit2
(fun p -> { p with OutputDir = testOutput; ToolPath = xunitToolPath })
cassandraTests
//--------------------------------------------------------------------------------
// Nuget targets
//--------------------------------------------------------------------------------
@ -298,15 +143,9 @@ module Nuget =
// add Akka dependency for other projects
let getAkkaDependency project =
match project with
| "Akka" -> []
| "Akka.Cluster" -> ["Akka.Remote", release.NugetVersion]
| persistence when (persistence.Contains("Sql") && not (persistence.Equals("Akka.Persistence.Sql.Common"))) -> ["Akka.Persistence.Sql.Common", preReleaseVersion]
| persistence when (persistence.StartsWith("Akka.Persistence.")) -> ["Akka.Persistence", preReleaseVersion]
| di when (di.StartsWith("Akka.DI.") && not (di.EndsWith("Core"))) -> ["Akka.DI.Core", release.NugetVersion]
| testkit when testkit.StartsWith("Akka.TestKit.") -> ["Akka.TestKit", release.NugetVersion]
| _ -> ["Akka", release.NugetVersion]
| _ -> []
// used to add -pre suffix to pre-release packages
// used to add -pre suffix to pre-release packages
let getProjectVersion project =
match project with
| "Akka.Cluster" -> preReleaseVersion
@ -390,8 +229,7 @@ let createNugetPackages _ =
// Create both normal nuget package and symbols nuget package.
// Uses the files we copied to workingDir and outputs to nugetdir
pack nugetDir NugetSymbolPackage.Nuspec
removeDir workingDir
let publishNugetPackages _ =
let rec publishPackage url accessKey trialsLeft packageFile =
@ -554,22 +392,9 @@ Target "HelpDocs" <| fun _ ->
"CleanNuget" ==> "CreateNuget"
"CleanNuget" ==> "BuildRelease" ==> "Nuget"
//docs dependencies
"BuildRelease" ==> "Docs" ==> "AzureDocsDeploy" ==> "PublishDocs"
Target "All" DoNothing
"BuildRelease" ==> "All"
"RunTests" ==> "All"
"MultiNodeTests" ==> "All"
"Nuget" ==> "All"
Target "AllTests" DoNothing //used for Mono builds, due to Mono 4.0 bug with FAKE / NuGet https://github.com/fsharp/fsharp/issues/427
"BuildRelease" ==> "AllTests"
"RunTests" ==> "AllTests"
"MultiNodeTests" ==> "AllTests"
"BuildRelease" ==> "RunSqlServerTests"
"BuildRelease" ==> "RunPostgreSqlTests"
"BuildRelease" ==> "RunCassandraTests"
RunTargetOrDefault "Help"

Просмотреть файл

@ -13,8 +13,7 @@ mono $SCRIPT_PATH/src/.nuget/NuGet.exe update -self
mono $SCRIPT_PATH/src/.nuget/NuGet.exe install FAKE -OutputDirectory $SCRIPT_PATH/src/packages -ExcludeVersion -Version 3.28.8
mono $SCRIPT_PATH/src/.nuget/NuGet.exe install xunit.runners -OutputDirectory $SCRIPT_PATH/src/packages/FAKE -ExcludeVersion -Version 1.9.2
mono $SCRIPT_PATH/src/.nuget/NuGet.exe install nunit.runners -OutputDirectory $SCRIPT_PATH/src/packages/FAKE -ExcludeVersion -Version 2.6.4
mono $SCRIPT_PATH/src/.nuget/NuGet.exe install xunit.runners -OutputDirectory $SCRIPT_PATH/src/packages/FAKE -ExcludeVersion -Version 2.0.0
if ! [ -e $SCRIPT_PATH/src/packages/SourceLink.Fake/tools/SourceLink.fsx ] ; then
mono $SCRIPT_PATH/src/.nuget/NuGet.exe install SourceLink.Fake -OutputDirectory $SCRIPT_PATH/src/packages -ExcludeVersion

Просмотреть файл

@ -1,224 +0,0 @@
Akka.Persistence.Cassandra
==========================
A replicated journal and snapshot store implementation for Akka.Persistence backed by
[Apache Cassandra](http://planetcassandra.org/).
**WARNING: The Akka.Persistence.Cassandra plugin is still in beta and the mechanics described below are subject to
change.**
Quick Start
-----------
To activate the journal plugin, add the following line to the actor system configuration file:
```
akka.persistence.journal.plugin = "cassandra-journal"
```
To activate the snapshot store plugin, add the following line to the actor system configuration file:
```
akka.persistence.snasphot-store.plugin = "cassandra-snapshot-store"
```
The default configuration will try to connect to a Cassandra cluster running on `127.0.0.1` for persisting messages
and snapshots. More information on the available configuration options is in the sections below.
Connecting to the Cluster
-------------------------
Both the journal and the snapshot store plugins use the [DataStax .NET Driver](https://github.com/datastax/csharp-driver)
for Cassandra to communicate with the cluster. The driver has an `ISession` object which is used to execute statements
against the cluster (very similar to a `DbConnection` object in ADO.NET). You can control the creation and
configuration of these session instance(s) by modifying the configuration under `cassandra-sessions`. Out of the
box, both the journal and the snapshot store plugin will try to use a session called `default`. You can override
the settings for that session with the following configuration keys:
- `cassandra-sessions.default.contact-points`: A comma-seperated list of contact points in the cluster in the format
of either `host` or `host:port`. Default value is *`[ "127.0.0.1" ]`*.
- `cassandra-sessions.default.port`: Default port for contact points in the cluster, used if a contact point is not
in [host:port] format. Default value is *`9042`*.
- `cassandra-sessions.default.credentials.username`: The username to login to Cassandra hosts. No authentication is
used by default.
- `cassandra-sessions.default.credentials.password`: The password corresponding to the username. No authentication
is used by default.
- `cassandra-sessions.default.ssl`: Boolean value indicating whether to use SSL when connecting to the cluster. No
default value is set and so SSL is not used by default.
- `cassandra-sessions.default.compression`: The [type of compression](https://github.com/datastax/csharp-driver/blob/master/src/Cassandra/CompressionType.cs)
to use when communicating with the cluster. No default value is set and so compression is not used by default.
If you require more advanced configuration of the `ISession` object than the options provided here (for example, to
use a different session for the journal and snapshot store plugins or to configure the session via code or manage
it with an IoC container), see the [Advanced Session Management](#advanced-session-management) section below.
Journal
-------
### Features
- All operations of the journal plugin API are fully supported
- Uses Cassandra in a log-oriented way (i.e. data is only ever inserted but never updated)
- Uses marker records for permanent deletes to try and avoid the problem of [reading many tombstones](http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets)
when replaying messages.
- Messages for a single persistence Id are partitioned across the cluster to avoid unbounded partition
growth and support scalability by adding more nodes to the cluster.
### Configuration
As mentioned in the Quick Start section, you can activate the journal plugin by adding the following line to your
actor system configuration file:
```
akka.persistence.journal.plugin = "cassandra-journal"
```
You can also override the journal's default settings with the following configuration keys:
- `cassandra-journal.class`: The Type name of the Cassandra journal plugin. Default value is *`Akka.Persistence.Cassandra.Journal.CassandraJournal, Akka.Persistence.Cassandra`*.
- `cassandra-journal.session-key`: The name (key) of the session to use when resolving an `ISession` instance. When
using default session management, this points at a configuration section under `cassandra-sessions` where the
session's configuration is found. Default value is *`default`*.
- `cassandra-journal.use-quoted-identifiers`: Whether or not to quote the table and keyspace names when executing
statements against Cassandra. Default value is *`false`*.
- `cassandra-journal.keyspace`: The keyspace to be created/used by the journal. Default value is *`akkanet`*.
- `cassandra-journal.keyspace-creation-options`: A string to be appended to the `CREATE KEYSPACE` statement after
the `WITH` clause when the keyspace is automatically created. Use this to define options like the replication
strategy. Default value is *`REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`*.
- `cassandra-journal.keyspace-autocreate`: When true, the journal will automatically try to create the keyspace if
it doesn't already exist on startup. Default value is *`true`*.
- `cassandra-journal.table`: The name of the table to be created/used by the journal. Default value is *`messages`*.
- `cassandra-journal.table-creation-properties`: A string to be appended to the `CREATE TABLE` statement after the
`WITH` clause. Use this to define advanced table options like `gc_grace_seconds` or one of the other many table
options. Default value is *an empty string*.
- `cassandra-journal.partition-size`: The approximate number of message rows to store in a single partition. Cannot
be changed after table creation. Default value is *`5000000`*.
- `cassandra-journal.max-result-size`: The maximum number of messages to retrieve in a single request to Cassandra
when replaying messages. Default value is *`50001`*.
- `cassandra-journal.read-consistency`: The consistency level to use for read operations. Default value is *`Quorum`*.
- `cassandra-journal.write-consistency`: The consistency level to use for write operations. Default value is
*`Quorum`*.
The default value for read and write consistency levels ensure that persistent actors can read their own writes.
Consider using `LocalQuorum` for both reads and writes if using a Cassandra cluster with multiple datacenters.
Snapshot Store
--------------
### Features
- Snapshot IO is done in a fully asynchronous fashion, including deletes (the snapshot store plugin API only
directly specifies synchronous methods for doing deletes)
### Configuration
As mentioned in the Quick Start section, you can activate the snapshot store plugin by adding the following line
to your actor system configuration file:
```
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
```
You can also override the snapshot store's default settings with the following configuration keys:
- `cassandra-snapshot-store.class`: The Type name of the Cassandra snapshot store plugin. Default value is
*`Akka.Persistence.Cassandra.Snapshot.CassandraSnapshotStore, Akka.Persistence.Cassandra`*.
- `cassandra-snapshot-store.session-key`: The name (key) of the session to use when resolving an `ISession`
instance. When using default session management, this points at a configuration section under `cassandra-sessions`
where the session's configuration is found. Default value is *`default`*.
- `cassandra-snapshot-store.use-quoted-identifiers`: Whether or not to quote the table and keyspace names when
executing statements against Cassandra. Default value is *`false`*.
- `cassandra-snapshot-store.keyspace`: The keyspace to be created/used by the snapshot store. Default value is
*`akkanet`*.
- `cassandra-snapshot-store.keyspace-creation-options`: A string to be appended to the `CREATE KEYSPACE` statement
after the `WITH` clause when the keyspace is automatically created. Use this to define options like the replication
strategy. Default value is *`REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`*.
- `cassandra-snapshot-store.keyspace-autocreate`: When true, the snapshot store will automatically try to create
the keyspace if it doesn't already exist on startup. Default value is *`true`*.
- `cassandra-snapshot-store.table`: The name of the table to be created/used by the snapshot store. Default value
is *`snapshots`*.
- `cassandra-snapshot-store.table-creation-properties`: A string to be appended to the `CREATE TABLE` statement
after the `WITH` clause. Use this to define advanced table options like `gc_grace_seconds` or one of the other
many table options. Default value is *an empty string*.
- `cassandra-snapshot-store.max-metadata-result-size`: The maximum number of snapshot metadata instances to
retrieve in a single request when trying to find a snapshot that matches criteria. Default value is *`10`*.
- `cassandra-snapshot-store.read-consistency`: The consistency level to use for read operations. Default value
is *`One`*.
- `cassandra-snapshot-store.write-consistency`: The consistency level to use for write operations. Default value
is *`One`*.
Consider using `LocalOne` consistency level for both reads and writes if using a Cassandra cluster with multiple
datacenters.
Advanced Session Management
---------------------------
In some advanced scenarios, you may want to have more control over how `ISession` instances are created. Some
example scenarios might include:
- to use a different session instance for the journal and snapshot store plugins (i.e. maybe you have more than one
Cassandra cluster and are storing journal messages and snapshots in different clusters)
- to access more advanced configuration options for building the session instance in code using the DataStax
driver's cluster builder API directly
- to use session instances that have already been registered with an IoC container and are being managed there
If you want more control over how session instances are created or managed, you have two options depending on how
much control you need.
### Defining multiple session instances in the `cassandra-sessions` section
It is possible to define configuration for more than one session instance under the `cassandra-sessions` section of
your actor system's configuration file. To do this, just create your own section with a unique name/key for the
sub-section. All of the same options listed above in the [Connecting to the Cluster](#connecting-to-the-cluster)
can then be used to configure that session. For example, I might define seperate configurations for my journal and
snapshot store plugins like this:
```
cassandra-sessions {
my-journal-session {
contact-points = [ "10.1.1.1", "10.1.1.2" ]
port = 9042
credentials {
username = "myusername"
password = "mypassword"
}
}
my-snapshot-session {
contact-points = [ "10.2.1.1:9142", "10.2.1.2:9142" ]
}
}
```
I can then tell the journal and snapshot store plugins to use those sessions by overriding each plugin's `session-key`
configuration like this:
```
cassandra-journal.session-key = "my-journal-session"
cassandra-snapshot-store.session-key = "my-snapshot-session"
```
### Controlling session configuration and management with code
You can also override how sessions are created, managed and resolved with your own code. Session management is
done as its own plugin for Akka.NET and a default implementation that uses the `cassandra-sessions` section is
provided out of the box. If you want to provide your own implementation for doing this (for example, to manage
sessions with an IoC container or use the DataStax driver's cluster builder API to do more advanced configuration),
here are the steps you'll need to follow:
1. Create a class that implements the `IManageSessions` interface from `Akka.Persistence.Cassandra.SessionManagement`.
This interface is simple and just requires that you provide a way for resolving and releasing session instances. For
example:
```cs
public class MySessionManager : IManageSessions
{
public override ISession ResolveSession(string key)
{
// Do something here to get the ISession instance (pull from IoC container, etc)
}
public override ISession ReleaseSession(ISession session)
{
// Do something here to release the session instance if necessary
}
}
```
1. Next, you'll need to create an extension id provider class by inheriting from
`ExtensionIdProvider<IManageSessions>`. This class is responsible for actually providing a copy of your
`IManageSessions` implementation. For example:
```cs
public class MySessionExtension : ExtensionIdProvider<IManageSessions>
{
public override IManageSessions CreateExtension(ExtendedActorSystem system)
{
// Return a copy of your implementation of IManageSessions
return new MySessionManager();
}
}
```
1. Lastly, you'll need to register your extension with the actor system when creating it in your application. For
example:
```cs
var actorSystem = ActorSystem.Create("MyApplicationActorSystem");
var extensionId = new MySessionExtension();
actorSystem.RegisterExtension(extensionId);
```
The journal and snapshot store plugins will now call your code when resolving or releasing sessions.