* overhaul state API to allow state initialization (#9)

* add simple load test to local tests.

* fix ride-sharing benchmark

* Add Functions Host for tests, add simple load tests for AF and DF

* update placement support

* update and fix samples

* update functions host

* fix logging in emulator

* revise handling of activities to support placement and locking
This commit is contained in:
Sebastian Burckhardt 2019-01-29 10:58:57 -08:00 коммит произвёл GitHub
Родитель 66b88523dd
Коммит 42cf23ca79
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
119 изменённых файлов: 3320 добавлений и 1100 удалений

Просмотреть файл

@ -19,45 +19,33 @@ namespace Bank.Service
public string Owner;
[DataMember]
public DateTime? Created;
public DateTime Created;
[DataMember]
public DateTime? LastModified;
public DateTime LastModified;
[DataMember]
public int Balance;
[DataMember]
AccountType Type;
public enum AccountType { Checking, Savings }
public void On(ISubscriptionContext<Guid> context, UserSignedUp evt)
{
Owner = evt.UserId;
Owner = evt.UserId;
Created = evt.Timestamp;
LastModified = evt.Timestamp;
if (evt.CheckingAccountId == context.Key)
{
Type = AccountType.Checking;
Balance = 10;
}
else if (evt.SavingsAccountId == context.Key)
{
Type = AccountType.Savings;
Balance = 0;
}
Balance = 0;
}
public void On(ISubscriptionContext<Guid> context, AmountTransferred evt)
{
// if this account is the source of the transfer, decrease balance by amount
if (evt.TransferRequest.FromAccount == context.Key)
{
Balance -= evt.TransferRequest.Amount;
}
else if (evt.TransferRequest.ToAccount == context.Key)
// if this account is the destination of the transfer, increase balance by amount
if (evt.TransferRequest.ToAccount == context.Key)
{
Balance += evt.TransferRequest.Amount;
}

Просмотреть файл

@ -11,8 +11,8 @@ using System.Threading.Tasks;
namespace Bank.Service
{
[DataContract]
public class CheckAccount :
IRead<AccountState, CheckAccount.Response>,
public class GetAccountInfo :
IRead<AccountState, GetAccountInfo.Response>,
IAccountAffinity
{
public Guid AccountId { get; set; }
@ -20,9 +20,6 @@ namespace Bank.Service
[DataContract]
public class Response
{
[DataMember]
public bool Exists;
[DataMember]
public int Balance;
@ -32,20 +29,11 @@ namespace Bank.Service
public Response Execute(IReadContext<AccountState> context)
{
var accountInfo = context.State;
if (!accountInfo.Created.HasValue)
return new Response()
{
return null;
}
else
{
return new Response()
{
Balance = accountInfo.Balance,
Owner = accountInfo.Owner
};
}
Balance = context.State.Balance,
Owner = context.State.Owner
};
}
}
}

Просмотреть файл

@ -28,26 +28,36 @@ namespace Bank.Service
[Lock]
public async Task<int> Execute(IOrchestrationContext context)
{
// first, perform authentication
await context.PerformRead(new Authentication()
// first, check the supplied credentials
bool credentialsValidated = false;
try
{
UserId = UserId,
Credentials = Credentials
});
// check the account
var accountInfo = await context.PerformRead(
new CheckAccount()
credentialsValidated = await context.PerformRead(new CheckCredentials()
{
AccountId = AccountId
UserId = UserId,
Credentials = Credentials
});
if (accountInfo.Owner != UserId)
{
throw new InvalidOperationException("only owner of account can check balance");
}
catch (KeyNotFoundException) { }
return accountInfo.Balance;
if (!credentialsValidated)
throw new InvalidOperationException("Unauthorized");
// if the specified account exists and is owned by this user, return balance
try
{
var accountInfo = await context.PerformRead(
new GetAccountInfo()
{
AccountId = AccountId
});
if (accountInfo.Owner == UserId)
return accountInfo.Balance;
}
catch (KeyNotFoundException) { }
throw new InvalidOperationException("no such account");
}
}
}

Просмотреть файл

@ -43,21 +43,19 @@ namespace Bank.Service
[Lock]
public async Task<UnitType> Execute(IOrchestrationContext context)
{
var t1 = context.PerformRead(new CheckUseridAvailable() { UserId = UserId });
var t2 = context.PerformRead(new CheckAccount() { AccountId = CheckingAccountId });
var t3 = context.PerformRead(new CheckAccount() { AccountId = SavingsAccountId });
var t4 = context.ReadDateTimeUtcNow();
// we want to check that none of the ids clash with existing ones
var userExists = context.StateExists<UserState,IUserAffinity,string>(UserId);
var checkingExists = context.StateExists<AccountState,IAccountAffinity,Guid>(CheckingAccountId);
var savingsExists = context.StateExists<AccountState, IAccountAffinity, Guid>(SavingsAccountId);
var available = await t1;
var clash1 = (await t2) != null;
var clash2 = (await t3) != null;
DateTime timestamp = await t4;
if (! available)
// we want to record a timestamp for the creation
var timestamp = await context.ReadDateTimeUtcNow();
if (await userExists)
{
throw new Exception("user already exists");
}
if (clash1 || clash2)
if (await checkingExists || await savingsExists)
{
throw new Exception("account id already exists");
}

Просмотреть файл

@ -40,34 +40,29 @@ namespace Bank.Service
public async Task<bool> Execute(IOrchestrationContext context)
{
// perform authentication
await context.PerformRead(new Authentication()
await context.PerformRead(new CheckCredentials()
{
UserId = UserId,
Credentials = Credentials
});
// check all involved state so we can validate preconditions
var t1 = context.PerformRead(new CheckAccount() { AccountId = FromAccount });
var t2 = context.PerformRead(new CheckAccount() { AccountId = ToAccount });
var t1 = context.PerformRead(new GetAccountInfo() { AccountId = FromAccount });
var t2 = context.PerformRead(new GetAccountInfo() { AccountId = ToAccount });
// get a timestamp
var timestamp = await context.ReadDateTimeUtcNow();
// wait for the checks to complete. This ensures both accounts exist.
// (otherwise an exception is thrown)
var fromAccount = await t1;
var toAccount = await t2;
GetAccountInfo.Response fromAccountInfo = await t1;
GetAccountInfo.Response toAccountInfo = await t2;
if (fromAccount == null)
throw new KeyNotFoundException($"no such account: {fromAccount}");
if (toAccount == null)
throw new KeyNotFoundException($"no such account: {toAccount}");
if (fromAccount.Owner != UserId)
if (fromAccountInfo.Owner != UserId)
{
throw new InvalidOperationException("only owner of account can issue transfer");
}
else if (fromAccount.Balance < Amount)
else if (fromAccountInfo.Balance < Amount)
{
return false;
}
@ -82,6 +77,5 @@ namespace Bank.Service
return true;
}
}
}
}

Просмотреть файл

@ -1,46 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using ReactiveMachine;
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
namespace Bank.Service
{
[DataContract]
internal class Authentication :
IRead<UserState, UnitType>,
IUserAffinity
{
[DataMember]
public string UserId { get; set; }
[DataMember]
public string Credentials;
public UnitType Execute(IReadContext<UserState> context)
{
// retrieve the information for this user
var userInfo = context.State;
// if authentication fails, raise event and throw exception
if (!Validate(Credentials, userInfo.InitialCredentials))
{
throw new InvalidOperationException("Unauthorized");
}
// TODO
// escape transaction to report failed login
return UnitType.Value;
}
private bool Validate(string credentials, string initialCredentials)
{
return credentials == initialCredentials; // TODO make more interesting
}
}
}

Просмотреть файл

@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using ReactiveMachine;
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
namespace Bank.Service
{
[DataContract]
internal class CheckCredentials :
IRead<UserState, bool>,
IUserAffinity
{
[DataMember]
public string UserId { get; set; }
[DataMember]
public string Credentials;
public bool Execute(IReadContext<UserState> context)
{
// in reality this would be something more interesting,
// e.g. validate a crypto-signed token
return Credentials == context.State.InitialCredentials;
}
}
}

Просмотреть файл

@ -11,15 +11,15 @@ using System.Threading.Tasks;
namespace Bank.Service
{
[DataContract]
public class CheckUseridAvailable :
IRead<UserState, bool>,
public class GetUserInfo :
IRead<UserState, string>,
IUserAffinity
{
public string UserId { get; set; }
public bool Execute(IReadContext<UserState> context)
public string Execute(IReadContext<UserState> context)
{
return ! context.State.Created.HasValue;
return context.State.FullName;
}
}
}

Просмотреть файл

@ -3,8 +3,10 @@
using Bank.Service;
using Bank.Tests;
using Microsoft.Extensions.Logging;
using ReactiveMachine;
using System;
using System.Diagnostics;
namespace Bank.OnEmulator
{
@ -12,13 +14,29 @@ namespace Bank.OnEmulator
{
static void Main(string[] args)
{
if (args.Length < 1 || !uint.TryParse(args[0], out var numNodes))
numNodes = 4;
var configuration = new EmulatorHost.Configuration()
{
MultiThreaded = true,
ApplicationLogLevel = LogLevel.Trace,
RuntimeLogLevel = LogLevel.Trace,
FileLogLevel = LogLevel.Trace,
LocalLogDirectory = "C:\\logs\\",
};
var loggingConfig = new ReactiveMachine.LoggingConfiguration()
{
//SendLogLevel = LogLevel.Trace,
//LockLogLevel = LogLevel.Trace
};
Console.WriteLine("Building Application...");
var compiler = new ApplicationCompiler();
compiler.AddService<BankTestsService>();
compiler.AddService<BankTestsService>();
compiler.AddBuildStep(sb => sb
.SetConfiguration(configuration)
.SetConfiguration(loggingConfig));
var compiled = compiler.Compile(numNodes);
Console.WriteLine("Building Host...");

Просмотреть файл

@ -59,12 +59,12 @@ namespace Counter.Benchmark.OnEmulator
RoundTripProcessStateEvery = int.MaxValue,
DeliverStaleExternalsOneOutOf = 1,
ConsoleLogLevel = LogLevel.Information,
ConsoleLogLevel = LogLevel.Debug,
FileLogLevel = Debugger.IsAttached ? LogLevel.Trace : LogLevel.None,
LocalLogDirectory = "C:\\logs\\",
ApplicationLogLevel = LogLevel.Trace, // log through runtime
HostLogLevel = LogLevel.Trace,
HostLogLevel = LogLevel.Warning,
RuntimeLogLevel = LogLevel.Trace
};
@ -75,11 +75,17 @@ namespace Counter.Benchmark.OnEmulator
CollectThroughput = (System.Diagnostics.Debugger.IsAttached || appConfig.IsLoadLoopsExperiment),
};
var runtimeLoggingConfig = new LoggingConfiguration()
{
//SendLogLevel = LogLevel.Trace,
//ProgressLogLevel = LogLevel.None,
};
var application = new ApplicationCompiler()
.SetConfiguration(appConfig)
.SetConfiguration(telemetryConfig)
.SetConfiguration(hostConfig)
.SetConfiguration(runtimeLoggingConfig)
.AddService<CounterBenchmarkService>()
.Compile(appConfig.NumberCounterProcesses + appConfig.NumberGeneratorProcesses);

Просмотреть файл

@ -97,7 +97,7 @@ namespace Counter.Benchmark.OnFunctions
{
// connection strings
StorageConnectionString = System.Environment.GetEnvironmentVariable("AzureWebJobsStorage"),
ehConnectionString = System.Environment.GetEnvironmentVariable("EVENTHUBS_CONNECTION_STRING"),
EventHubsConnectionString = System.Environment.GetEnvironmentVariable("EVENTHUBS_CONNECTION_STRING"),
AppInsightsInstrumentationKey = System.Environment.GetEnvironmentVariable("APPINSIGHTS_INSTRUMENTATIONKEY"),
// logging sources : specify levels to be generated
@ -117,10 +117,5 @@ namespace Counter.Benchmark.OnFunctions
MaxReceiveBatchSize = 10000,
};
}
public IEnumerable<Type> GetResultTypes()
{
yield break;
}
}
}

Просмотреть файл

@ -67,7 +67,7 @@ namespace Counter.Service.OnFunctions
{
// connection strings
StorageConnectionString = System.Environment.GetEnvironmentVariable("AzureWebJobsStorage"),
ehConnectionString = System.Environment.GetEnvironmentVariable("EVENTHUBS_CONNECTION_STRING"),
EventHubsConnectionString = System.Environment.GetEnvironmentVariable("EVENTHUBS_CONNECTION_STRING"),
AppInsightsInstrumentationKey = System.Environment.GetEnvironmentVariable("APPINSIGHTS_INSTRUMENTATIONKEY"),
// logging sources : specify levels to be generated

Просмотреть файл

@ -29,6 +29,7 @@ namespace Counter.Service
[DataMember]
public uint CounterId { get; set; }
[CreateIfNotExists]
public int Execute(IUpdateContext<Counter2> context)
{
return ++(context.State.Count);

Просмотреть файл

@ -28,7 +28,7 @@ namespace Harness
[DataMember]
public IWorkloadGenerator Workload;
[CreateIfNotExists]
public UnitType Execute(IUpdateContext<GeneratorState> context)
{
if (!context.State.ExperimentIsFinished)

Просмотреть файл

@ -18,6 +18,7 @@ namespace Harness
[DataMember]
public uint GeneratorNumber { get; set; }
[CreateIfNotExists]
public UnitType Execute(IUpdateContext<GeneratorState> context)
{
context.State.ExperimentIsFinished = true;

Просмотреть файл

@ -13,7 +13,7 @@ using System.Threading.Tasks;
namespace Miner.Service
{
[DataContract]
public class SearchPortion : IAtLeastOnceActivity<List<long>>
public class SearchPortion : IActivity<List<long>>
{
public TimeSpan TimeLimit => TimeSpan.FromSeconds(30);

Просмотреть файл

@ -15,7 +15,6 @@ namespace RideSharing.Benchmark.OnEmulator
{
static void Main(string[] args)
{
var applicationConfiguration = new RideSharing.Benchmark.Configuration()
{
NumberGeneratorProcesses = 1,

Просмотреть файл

@ -12,6 +12,7 @@ namespace RideSharing.Benchmark
{
public void Build(IServiceBuilder builder)
{
builder.BuildService<RideSharingService>();
builder.ScanThisDLL();
}
}

Просмотреть файл

@ -17,18 +17,13 @@ namespace RideSharing
GeoLocation Location { get; }
}
public struct GeoLocation : IEquatable<GeoLocation>, IUseConsistentHash
public struct GeoLocation : ICustomKeyType<GeoLocation>
{
public GeoLocation Location => this;
// for this simple sample, we just use zipcode
public int ZipCode;
public ulong GetHashInput()
{
return (ulong) ZipCode;
}
public GeoLocation(int zipCode)
{
this.ZipCode = zipCode;
@ -41,11 +36,28 @@ namespace RideSharing
yield return new GeoLocation(zipCode);
}
}
public bool Equals(GeoLocation other)
public override bool Equals(object obj)
{
return Equals(ZipCode, other.ZipCode);
return (obj is GeoLocation other) && this.ZipCode == other.ZipCode;
}
public override int GetHashCode()
{
return ZipCode.GetHashCode();
}
#region ICustomKeyType
public Func<GeoLocation, uint, uint> RoundRobinPlacer =>
(location, numprocs) => 0;
public Func<GeoLocation, GeoLocation, int> Comparator =>
(location1, location2) => location1.ZipCode.CompareTo(location2.ZipCode);
public Func<GeoLocation, uint, uint> JumpConsistentHasher =>
(location, numprocs) => ReactiveMachine.Util.JumpConsistentHash.Compute((ulong)location.ZipCode, numprocs);
#endregion
}

Просмотреть файл

@ -6,6 +6,7 @@
<ItemGroup>
<ProjectReference Include="..\..\ReactiveMachine.Abstractions\ReactiveMachine.Abstractions.csproj" />
<ProjectReference Include="..\..\ReactiveMachine.Compiler\ReactiveMachine.Compiler.csproj" />
</ItemGroup>
</Project>

Просмотреть файл

@ -53,7 +53,7 @@ namespace HelloWorld.Service.OnFunctions
{
// connection strings
StorageConnectionString = System.Environment.GetEnvironmentVariable("AzureWebJobsStorage"),
ehConnectionString = System.Environment.GetEnvironmentVariable("EVENTHUBS_CONNECTION_STRING"),
EventHubsConnectionString = System.Environment.GetEnvironmentVariable("EVENTHUBS_CONNECTION_STRING"),
AppInsightsInstrumentationKey = System.Environment.GetEnvironmentVariable("APPINSIGHTS_INSTRUMENTATIONKEY"),
// logging sources : specify levels to be generated
@ -73,10 +73,5 @@ namespace HelloWorld.Service.OnFunctions
MaxReceiveBatchSize = 10000,
};
}
public IEnumerable<Type> GetResultTypes()
{
yield return typeof(string);
}
}
}

Просмотреть файл

@ -5,11 +5,6 @@ VisualStudioVersion = 15.0.28010.2003
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloWorld.Service", "HelloWorld.Service\HelloWorld.Service.csproj", "{96B4C69F-AC50-4575-9B3E-4915F053D1B2}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{23E91679-B0D7-47E1-9180-1FD8DEEAC081}"
ProjectSection(SolutionItems) = preProject
readme.md = readme.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloWorld.Test.OnEmulator", "HelloWorld.Test.OnEmulator\HelloWorld.Test.OnEmulator.csproj", "{1A7543E5-C61A-4529-A5CA-A1843F516B87}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloWorld.Test", "HelloWorld.Test\HelloWorld.Test.csproj", "{B58CC879-2BA0-4A23-BD63-891DB55F5B38}"

Просмотреть файл

@ -1,35 +0,0 @@
# HelloWorld Sample
## Purpose
This sample illustrates how to use the NuGet packages to build and deploy.
It does NOT show the capabilities of the programming model. Check the samples in the Application folder for to find out more about the various features of Reactive Machine.
## Description of Projects
HelloWorld.Service:
defines the application logic. For this HelloWorld example the service contains a single orchestration that writes "Hello World" to the log.
HelloWorld.Test:
defines a service that, on startup, calls the HelloWorld service a specified number of times.
HelloWorld.Test.OnEmulator:
an emulator deployment of the HelloWorld test.
HelloWorld.Test.OnFunctions:
an Azure Functions deployment of the HelloWorld test.
NOTE: this is a preliminary version of the sample.
The intention is to host the HelloWorld service in Azure functions using an Http binding.

Просмотреть файл

@ -21,7 +21,7 @@ namespace FunctionsHost
public String StorageConnectionString { get; set; }
[JsonIgnore] // we want no connection strings in logs
public String ehConnectionString { get; set; }
public String EventHubsConnectionString { get; set; }
[JsonIgnore] // we want no connection strings in logs
public String AppInsightsInstrumentationKey { get; set; }

Просмотреть файл

@ -19,10 +19,10 @@ namespace FunctionsHost
public class Client<TStaticApplicationInfo> : IClient, IClientInternal
where TStaticApplicationInfo : IStaticApplicationInfo, new()
{
private static Object lockable = new object();
private static readonly Object lockable = new object();
private static Client<TStaticApplicationInfo> instance;
private readonly IStaticApplicationInfo applicationInfo;
private readonly TStaticApplicationInfo applicationInfo;
private readonly FunctionsHostConfiguration configuration;
private readonly uint processId;
private readonly EventHubsConnections connections;
@ -61,7 +61,7 @@ namespace FunctionsHost
this.application = Compilation.Compile<TStaticApplicationInfo>(applicationInfo, configuration);
this.processId = (uint)random.Next((int)application.NumberProcesses); // we connect to a randomly selected process
this.responsePartition = (uint)random.Next(ResponseSender.NumberPartitions); // we receive on a randomly selected partition
this.connections = new EventHubsConnections(processId, logger, configuration.ehConnectionString);
this.connections = new EventHubsConnections(processId, logger, configuration.EventHubsConnectionString);
this.requestSender = new RequestSender(processId, connections, logger, new DataContractSerializer(typeof(List<IMessage>), application.SerializableTypes), configuration);
this.responseSenders = new Dictionary<uint, ResponseSender>();
this.continuations = new ConcurrentDictionary<Guid, object>();

Просмотреть файл

@ -8,7 +8,7 @@ using System.Threading.Tasks;
namespace FunctionsHost
{
[DataContract]
internal class ClientRequestResponseNotification<TApplicationInfo,TResult> : IAtLeastOnceActivity<UnitType>
internal class ClientRequestResponseNotification<TApplicationInfo,TResult> : IActivity<UnitType>
where TApplicationInfo: IStaticApplicationInfo, new()
{
[DataMember]

Просмотреть файл

@ -1,39 +0,0 @@
using ReactiveMachine;
using System;
using System.Collections.Generic;
using System.Text;
namespace FunctionsHost
{
internal static class Compilation
{
internal static ICompiledApplication Compile<TApplicationInfo>(IStaticApplicationInfo info, FunctionsHostConfiguration configuration)
where TApplicationInfo : IStaticApplicationInfo, new()
{
var compiler = new ReactiveMachine.ApplicationCompiler();
compiler
.SetConfiguration(configuration)
.AddBuildStep(serviceBuilder =>
{
var m = typeof(Compilation).GetMethod(nameof(Compilation.DefineForResultType));
foreach (var t in info.GetResultTypes())
{
var mg = m.MakeGenericMethod(typeof(TApplicationInfo), t);
mg.Invoke(null, new object[] { serviceBuilder });
}
})
;
return info.Build(compiler);
}
public static void DefineForResultType<TStaticApplicationInfo, TResult>(IServiceBuilder serviceBuilder)
where TStaticApplicationInfo : IStaticApplicationInfo, new()
{
serviceBuilder
.DefineOrchestration<ClientRequestOrchestration<TStaticApplicationInfo, TResult>, UnitType>()
.DefineAtLeastOnceActivity<ClientRequestResponseNotification<TStaticApplicationInfo, TResult>, UnitType>()
.RegisterSerializableType(typeof(ResponseMessage<TResult>))
;
}
}
}

Просмотреть файл

@ -0,0 +1,37 @@
using ReactiveMachine;
using System;
using System.Collections.Generic;
using System.Text;
namespace FunctionsHost
{
internal static class Compilation
{
public static ICompiledApplication Compile<TStaticApplicationInfo>(TStaticApplicationInfo info, FunctionsHostConfiguration configuration)
where TStaticApplicationInfo : IStaticApplicationInfo, new()
{
var compiler = new Compiler<TStaticApplicationInfo>();
compiler.SetConfiguration(configuration);
return info.Build(compiler);
}
internal class Compiler<TStaticApplicationInfo> : ApplicationCompiler, IDerivedDefiner
where TStaticApplicationInfo : IStaticApplicationInfo, new()
{
public override ICompiledApplication Compile(uint numberProcesses)
{
AddBuildStep(serviceBuilder => serviceBuilder.DefineDerived(this));
return base.Compile(numberProcesses);
}
public void DefineForEachOrchestration<TRequest, TResult>(IServiceBuilder serviceBuilder)
{
serviceBuilder
.DefineOrchestration<ClientRequestOrchestration<TStaticApplicationInfo, TResult>, UnitType>()
.DefineActivity<ClientRequestResponseNotification<TStaticApplicationInfo, TResult>, UnitType>()
.RegisterSerializableType(typeof(ResponseMessage<TResult>))
;
}
}
}
}

Просмотреть файл

@ -39,7 +39,7 @@ namespace FunctionsHost
private TelemetryCollector blobTelemetryListener;
private readonly Stopwatch stopwatch;
private readonly Guid invocationId;
private bool collectHostEvents;
private readonly bool collectHostEvents;
public CombinedLogger CombinedLogger;
public ILogger HostLogger;
@ -52,7 +52,7 @@ namespace FunctionsHost
private readonly DataContractSerializer payloadSerializer;
private readonly DataContractSerializer payloadSerializerLoopback;
public Host(IStaticApplicationInfo applicationInfo, FunctionsHostConfiguration configuration, ILogger logger, uint processId, Stopwatch stopwatch, Guid invocationId)
public Host(TStaticApplicationInfo applicationInfo, FunctionsHostConfiguration configuration, ILogger logger, uint processId, Stopwatch stopwatch, Guid invocationId)
{
this.processId = processId;
this.stopwatch = stopwatch;
@ -71,7 +71,7 @@ namespace FunctionsHost
this.invocationId = invocationId;
this.payloadSerializer = new DataContractSerializer(typeof(List<KeyValuePair<long,IMessage>>), application.SerializableTypes);
this.payloadSerializerLoopback = new DataContractSerializer(typeof(List<IMessage>), application.SerializableTypes);
this.Connections = new EventHubsConnections(processId, HostLogger, configuration.ehConnectionString);
this.Connections = new EventHubsConnections(processId, HostLogger, configuration.EventHubsConnectionString);
if (application.TryGetConfiguration<ReactiveMachine.TelemetryBlobWriter.Configuration>(out var config))
{

Просмотреть файл

@ -129,7 +129,7 @@ namespace FunctionsHost
}
}
private static async Task RunHost(IStaticApplicationInfo applicationInfo, FunctionsHostConfiguration configuration, uint processId,
private static async Task RunHost(TStaticApplicationInfo applicationInfo, FunctionsHostConfiguration configuration, uint processId,
ILogger hostlogger, ILogger logger, LeaseManager leaseManager, System.Diagnostics.Stopwatch stopwatch, Guid invocationId)
{
try

Просмотреть файл

@ -27,13 +27,6 @@ namespace FunctionsHost
/// <returns></returns>
FunctionsHostConfiguration GetHostConfiguration();
/// <summary>
/// A list of types that may be returned by requests, and that must
/// therefore be serializable
/// </summary>
/// <returns></returns>
IEnumerable<Type> GetResultTypes();
/// <summary>
/// The build recipe for creating the application
/// </summary>

Просмотреть файл

@ -51,12 +51,12 @@ namespace FunctionsHost
}
if ((ex.RequestInformation.HttpStatusCode == 404) && (information.ErrorCode.Equals(BlobErrorCodeStrings.ContainerNotFound)))
{
logger.LogWarning("Invalid application state (Container not found). Ignoring Doorbell.");
logger.LogWarning("Invalid application state (Container not found). Ignoring Doorbell. Did you forget to initialize the service?");
return false;
}
if ((ex.RequestInformation.HttpStatusCode == 404) && (information.ErrorCode.Equals(BlobErrorCodeStrings.BlobNotFound)))
{
logger.LogWarning("Invalid application state (Blob not found). Ignoring Doorbell.");
logger.LogWarning("Invalid application state (Blob not found). Ignoring Doorbell. Did you forget to initialize the service?");
return false;
}

Просмотреть файл

@ -57,7 +57,7 @@ namespace EmulatorHost
}
else
{
new MultiThreadedSimulation(configuration, application, deploymentId, deploymentTimestamp, hostLogger).Run(application);
new MultiThreadedSimulation(configuration, application, deploymentId, deploymentTimestamp, hostLogger, this).Run(application);
}
if (Debugger.IsAttached)
@ -91,8 +91,15 @@ namespace EmulatorHost
return null;
}
public void FlushLog()
{
if (streamWriter != null)
streamWriter.Flush();
}
public void Shutdown()
{
FlushLog();
Environment.Exit(0);
}
}

Просмотреть файл

@ -28,19 +28,21 @@ namespace EmulatorHost
private readonly Configuration configuration;
private readonly ICompiledApplication application;
private ILogger logger;
private Emulator emulator;
public MultiThreadedSimulation(Configuration configuration, ICompiledApplication application, string deploymentId, DateTime deploymentTimestamp, ILogger logger)
public MultiThreadedSimulation(Configuration configuration, ICompiledApplication application, string deploymentId, DateTime deploymentTimestamp, ILogger logger, Emulator emulator)
{
this.configuration = configuration;
this.application = application;
this.deploymentId = deploymentId;
this.deploymentTimestamp = deploymentTimestamp;
this.logger = logger;
this.emulator = emulator;
}
DataContractSerializer _serializer;
static List<IMessage> empty = new List<IMessage>();
static readonly List<IMessage> empty = new List<IMessage>();
private Random random = new Random(0);
@ -105,6 +107,8 @@ namespace EmulatorHost
if (!someoneBusy)
break;
emulator.FlushLog();
shutdown.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10));
}

Просмотреть файл

@ -25,7 +25,7 @@ namespace EmulatorHost
private readonly DateTime deploymentTimestamp;
private readonly Configuration configuration;
private readonly ICompiledApplication application;
private ILogger logger;
private readonly ILogger logger;
public SingleThreadSimulation(Configuration configuration, ICompiledApplication application, string deploymentId, DateTime deploymentTimestamp, ILogger logger)
{
@ -38,7 +38,7 @@ namespace EmulatorHost
DataContractSerializer _serializer;
static List<IMessage> empty = new List<IMessage>();
static readonly List<IMessage> empty = new List<IMessage>();
private Random random = new Random(0);
@ -153,7 +153,7 @@ namespace EmulatorHost
}
}
private object sgl = new object();
private readonly object sgl = new object();
private int messageCount;
public void HandleGlobalException(Exception e)

Просмотреть файл

@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.Collections.Generic;
using System.Text;
namespace ReactiveMachine
{
/// <summary>
/// If placed on the Execute method of an orchestration, the orchestration acuqires
/// locks on all of its affinities prior to executing the body.
/// </summary>
[AttributeUsage(AttributeTargets.Method, Inherited = false, AllowMultiple = false)]
public class CreateIfNotExistsAttribute : Attribute
{
}
}

Просмотреть файл

@ -11,7 +11,7 @@ namespace ReactiveMachine
/// If placed on an orchestration, each instance of that orchestration is executed on a randomly selected process.
/// </summary>
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Class, Inherited = false, AllowMultiple = false)]
public class RandomPlacementAttribute : System.Attribute
public class DistributeAttribute : System.Attribute
{
}
}

Просмотреть файл

@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace ReactiveMachine
{
public interface IDerivedDefiner
{
void DefineForEachOrchestration<TRequest, TResponse>(IServiceBuilder serviceBuilder);
}
}

Просмотреть файл

@ -56,5 +56,8 @@ namespace ReactiveMachine
IPlacementBuilder PlaceOnCaller<TOperation>()
where TOperation : INonAffineOperation;
IPlacementBuilder PlaceRandomly<TOperation>()
where TOperation : INonAffineOperation;
}
}

Просмотреть файл

@ -56,13 +56,11 @@ namespace ReactiveMachine
where TState : IState
where TRequest : IRead<TState, TReturn>;
IServiceBuilder DefineAtLeastOnceActivity<TRequest, TReturn>()
where TRequest : IAtLeastOnceActivity<TReturn>;
IServiceBuilder DefineActivity<TRequest, TReturn>()
where TRequest : IActivity<TReturn>;
IServiceBuilder DefineAtMostOnceActivity<TRequest, TReturn>()
where TRequest : IAtMostOnceActivity<TReturn>;
IServiceBuilder DefineDerived(IDerivedDefiner definer);
}
}

Просмотреть файл

@ -48,10 +48,6 @@ namespace ReactiveMachine
/// <summary>
/// the log level used for information on progress
/// </summary>
public LogLevel ProgressLogLevel { get; set; } = LogLevel.Information;
public LogLevel ProgressLogLevel { get; set; } = LogLevel.Debug;
}
}

Просмотреть файл

@ -32,7 +32,7 @@ namespace ReactiveMachine
where TState: IState
{
CheckTimeArgument(delay);
context.ForkOrchestration(new Extensions.ForkedLocalUpdate<TState, TReturn>()
context.ForkOrchestration(new Extensions.ForkedUpdate<TState, TReturn>()
{
Delay = delay,
Update = update
@ -49,7 +49,7 @@ namespace ReactiveMachine
});
}
public static void ScheduleActivity<TReturn>(this IContextWithForks context, TimeSpan delay, IActivityBase<TReturn> activity)
public static void ScheduleActivity<TReturn>(this IContextWithForks context, TimeSpan delay, IActivity<TReturn> activity)
{
CheckTimeArgument(delay);
context.ForkOrchestration(new Extensions.ForkedActivity<TReturn>()

Просмотреть файл

@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
namespace ReactiveMachine.Extensions
{
[DataContract]
public class DeterminizationActivity<TReturn> : IActivity<TReturn>
{
[DataMember]
public TReturn Value;
public TimeSpan TimeLimit => TimeSpan.FromHours(1); // irrelevant - executes instantly
public Task<TReturn> Execute(IContext context)
{
return Task.FromResult(Value);
}
public override string ToString()
{
return $"Determinize<{typeof(TReturn).Name}>";
}
}
}

Просмотреть файл

@ -13,17 +13,14 @@ namespace ReactiveMachine.Extensions
public class ForkedActivity<TReturn> : IOrchestration<UnitType>
{
[DataMember]
public IActivityBase<TReturn> Activity;
public IActivity<TReturn> Activity;
[DataMember]
public TimeSpan Delay;
public override string ToString()
{
if (Delay != TimeSpan.Zero)
return $"Forked-{Activity}";
else
return $"Forked-{Delay}-{Activity}";
return $"Scheduled-{Delay}-{Activity}";
}
public async Task<UnitType> Execute(IOrchestrationContext context)

Просмотреть файл

@ -20,10 +20,7 @@ namespace ReactiveMachine.Extensions
public override string ToString()
{
if (Delay != TimeSpan.Zero)
return $"Forked-{Event}";
else
return $"Forked-{Delay}-{Event}";
return $"Scheduled-{Delay}-{Event}";
}
public async Task<UnitType> Execute(IOrchestrationContext context)

Просмотреть файл

@ -20,10 +20,7 @@ namespace ReactiveMachine.Extensions
public override string ToString()
{
if (Delay != TimeSpan.Zero)
return $"Forked-{Orchestration}";
else
return $"Forked-{Delay}-{Orchestration}";
return $"Scheduled-{Delay}-{Orchestration}";
}
public async Task<UnitType> Execute(IOrchestrationContext context)

Просмотреть файл

@ -10,7 +10,7 @@ using System.Threading.Tasks;
namespace ReactiveMachine.Extensions
{
[DataContract]
public class ForkedLocalUpdate<TState, TReturn> : IOrchestration<UnitType>
public class ForkedUpdate<TState, TReturn> : IOrchestration<UnitType>
where TState : IState
{
[DataMember]
@ -21,10 +21,7 @@ namespace ReactiveMachine.Extensions
public override string ToString()
{
if (Delay != TimeSpan.Zero)
return $"Forked-{Update}";
else
return $"Forked-{Delay}-{Update}";
return $"Scheduled-{Delay}-{Update}";
}
public async Task<UnitType> Execute(IOrchestrationContext context)

Просмотреть файл

@ -19,8 +19,11 @@ namespace ReactiveMachine.Extensions
public static void DefineInternalExtensions(IServiceBuilder builder)
{
builder
.DefineAtLeastOnceActivity<StableDelay, UnitType>()
.DefineActivity<StableDelay, UnitType>()
.DefineOrchestration<ForkedEvent, UnitType>()
.DefineActivity<DeterminizationActivity<Guid>, Guid>()
.DefineActivity<DeterminizationActivity<int>, int>()
.DefineActivity<DeterminizationActivity<DateTime>, DateTime>()
;
}
@ -44,7 +47,7 @@ namespace ReactiveMachine.Extensions
else
{
builder.DefineOrchestration<UpdateWrapper<TState, TReturn>, TReturn>();
builder.DefineOrchestration<ForkedLocalUpdate<TState, TReturn>, UnitType>();
builder.DefineOrchestration<ForkedUpdate<TState, TReturn>, UnitType>();
}
}
}

Просмотреть файл

@ -10,7 +10,7 @@ using System.Threading.Tasks;
namespace ReactiveMachine.Extensions
{
[DataContract]
public class StableDelay : IAtLeastOnceActivity<UnitType>
public class StableDelay : IActivity<UnitType>
{
[DataMember]
public DateTime TargetTime;

Просмотреть файл

@ -19,7 +19,7 @@ namespace ReactiveMachine
void RegisterSend(Action<uint, IMessage> action);
void RegisterSerializableType<Type>();
void RegisterSerializableType(Type type);
void RegisterGlobalShutdown(Action globalShutdown);

Просмотреть файл

@ -9,29 +9,16 @@ using System.Threading.Tasks;
namespace ReactiveMachine
{
/// <summary>
/// An activity that returns a value of type {TReturn}.
/// An activity that returns a value of type {TReturn}. An activity executes at least once, and may (in rare failure cases) execute more than once.
/// </summary>
public interface IActivityBase<TReturn> : IActivity
public interface IActivity<TReturn> : IActivity
{
TimeSpan TimeLimit { get; }
Task<TReturn> Execute(IContext context);
}
/// <summary>
/// An activity that executes at least once, and may (in rare failure cases) execute more than once.
/// </summary>
public interface IAtLeastOnceActivity<TReturn> : IActivityBase<TReturn>
{
}
/// <summary>
/// An activity that executes either once, or (in the rare case of uncertainty) executes a custom handler
/// </summary>
public interface IAtMostOnceActivity<TReturn> : IActivityBase<TReturn>
{
Task<TReturn> AfterFault(IContext context);
}

Просмотреть файл

@ -44,6 +44,13 @@ namespace ReactiveMachine
{
}
/// <summary>
/// Context for initializing state when it is first accessed
/// </summary>
public interface IInitializationContext : IOrchestrationContext
{
}
/// <summary>
/// Context for executing user code that updates state in response to an event.
/// </summary>
@ -101,10 +108,14 @@ namespace ReactiveMachine
{
Task<TReturn> PerformOrchestration<TReturn>(IOrchestration<TReturn> orchestration);
Task<TReturn> PerformActivity<TReturn>(IActivityBase<TReturn> activity);
Task<TReturn> PerformActivity<TReturn>(IActivity<TReturn> activity);
Task PerformEvent<TEvent>(TEvent evt) where TEvent : IEvent;
Task<bool> StateExists<TState, TAffinity, TKey>(TKey key)
where TState : IPartitionedState<TAffinity, TKey>
where TAffinity : IPartitionedAffinity<TAffinity, TKey>;
Task Finish();
}

Просмотреть файл

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace ReactiveMachine
{
public interface ICustomKeyType<TKey>
{
Func<TKey, uint, uint> RoundRobinPlacer { get; }
Func<TKey, TKey, int> Comparator { get; }
Func<TKey, uint, uint> JumpConsistentHasher { get; }
}
}

Просмотреть файл

@ -43,4 +43,5 @@ namespace ReactiveMachine
{
}
}

Просмотреть файл

@ -1,23 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.Collections.Generic;
using System.Text;
namespace ReactiveMachine
{
public interface IUseConsistentHash
{
ulong GetHashInput();
}
public interface IUseCustomPlacement
{
uint GetProcess(uint NumberProcesses);
}
}

Просмотреть файл

@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace ReactiveMachine
{
@ -34,6 +35,14 @@ namespace ReactiveMachine
void On(ISubscriptionContext<TKey> context, TEvent evt);
}
public interface IInitialize
{
Task OnInitialize(IInitializationContext context);
}
public interface IInitialize<TKey>
{
Task OnInitialize(IInitializationContext context, TKey key);
}
}

Просмотреть файл

@ -33,5 +33,10 @@ namespace ReactiveMachine
{
return 1234;
}
public override string ToString()
{
return "unit";
}
}
}

Просмотреть файл

@ -53,7 +53,7 @@ namespace ReactiveMachine
return this;
}
public ICompiledApplication Compile(uint numberProcesses)
public virtual ICompiledApplication Compile(uint numberProcesses)
{
NumberProcesses = numberProcesses;
@ -61,6 +61,8 @@ namespace ReactiveMachine
// build (and then discard) a process for the sake of catching errors now
var p = (Process)MakeProcess(0);
// extract configurations
Configurations = p.Configurations;
return this;
@ -108,7 +110,6 @@ namespace ReactiveMachine
Orchestration = orchestration
};
}
}

Просмотреть файл

@ -48,9 +48,9 @@ namespace ReactiveMachine.Compiler
Send = action;
}
public void RegisterSerializableType<Type>()
public void RegisterSerializableType(Type type)
{
SerializableTypeSet.Add(typeof(Type));
SerializableTypeSet.Add(type);
}
public void RegisterGlobalShutdown(Action globalShutdown)

Просмотреть файл

@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using ReactiveMachine.Util;
using System;
using System.Collections.Generic;
using System.Text;
@ -137,7 +138,7 @@ namespace ReactiveMachine.Compiler
{
throw new BuilderException($"undefined key {typeof(TAffinity).FullName}");
}
activityInfo.AffinitizationKey = keyInfo;
activityInfo.PlacementAffinity = keyInfo;
}
else
throw new NotImplementedException();
@ -153,6 +154,7 @@ namespace ReactiveMachine.Compiler
throw new BuilderException($"undefined orchestration {typeof(TOperation).FullName}");
}
orchestrationInfo.PlacementAffinity = null;
orchestrationInfo.DistributeRandomly = false;
}
else if (typeof(IActivity).IsAssignableFrom(typeof(TOperation)))
{
@ -160,7 +162,33 @@ namespace ReactiveMachine.Compiler
{
throw new BuilderException($"undefined activity {typeof(TOperation).FullName}");
}
activityInfo.AffinitizationKey = null;
activityInfo.PlacementAffinity = null;
activityInfo.DistributeRandomly = false;
}
else
throw new NotImplementedException();
return this;
}
IPlacementBuilder IPlacementBuilder.PlaceRandomly<TOperation>()
{
if (typeof(IOrchestration).IsAssignableFrom(typeof(TOperation)))
{
if (!process.Orchestrations.TryGetValue(typeof(TOperation), out var orchestrationInfo))
{
throw new BuilderException($"undefined orchestration {typeof(TOperation).FullName}");
}
orchestrationInfo.PlacementAffinity = null;
orchestrationInfo.DistributeRandomly = true;
}
else if (typeof(IActivity).IsAssignableFrom(typeof(TOperation)))
{
if (!process.Activities.TryGetValue(typeof(TOperation), out var activityInfo))
{
throw new BuilderException($"undefined activity {typeof(TOperation).FullName}");
}
activityInfo.PlacementAffinity = null;
activityInfo.DistributeRandomly = true;
}
else
throw new NotImplementedException();

Просмотреть файл

@ -35,18 +35,13 @@ namespace ReactiveMachine.Compiler
public uint NumberProcesses { get; internal set; }
private uint Pad;
// orchestrations in progress
// orchestrations, activities, and remote forks in progress
public Dictionary<ulong, IOrchestrationState> OrchestrationStates = new Dictionary<ulong, IOrchestrationState>();
public bool IsPrimary;
public Dictionary<ulong, ActivityState> PendingActivities = new Dictionary<ulong, ActivityState>();
public Dictionary<ulong, IActivityState> PendingActivities = new Dictionary<ulong, IActivityState>();
public Dictionary<ulong, FinishState> FinishStates = new Dictionary<ulong, FinishState>();
internal struct ActivityState
{
public Task Task;
public string Name;
}
public bool IsPrimary;
// clocks
public ulong DeliveryCounter;
public ulong OperationCounter;
@ -97,10 +92,29 @@ namespace ReactiveMachine.Compiler
}
if (OrchestrationStates.Count > 0)
{
var op = OrchestrationStates.First().Value.WaitingFor;
while (OrchestrationStates.ContainsKey(op.Key))
op = OrchestrationStates[op.Key].WaitingFor;
ProgressTracer?.Invoke($"Waiting for o{op.Key:d10}-{op.Value}");
if (ProgressTracer != null)
{
var orchestration = OrchestrationStates.First().Value;
while (true)
{
var response = orchestration.WaitingFor;
if (!response.HasValue)
{
ProgressTracer?.Invoke($"Waiting for orchestration {orchestration}");
break;
}
else if (OrchestrationStates.TryGetValue(response.Value.Key, out var o))
{
orchestration = o;
continue;
}
else
{
ProgressTracer?.Invoke($"Waiting for response o{response.Value.Key:d10}-{response.Value.Value}");
break;
}
}
}
return true;
}
return false;
@ -122,16 +136,19 @@ namespace ReactiveMachine.Compiler
if (LowerBounds[origin] < opid)
LowerBounds[origin] = opid;
}
public void HandleGlobalException(Exception e)
public void CheckForUnhandledException(object result)
{
if (HostServices.GlobalExceptionHandler != null)
if (Serializer.DeserializeException(result, out var exceptionResult))
{
HostServices.GlobalExceptionHandler(e);
}
else
{
RuntimeLogger.LogError($"!!! Unhandled Exception: {e}");
if (HostServices.GlobalExceptionHandler != null)
{
HostServices.GlobalExceptionHandler(exceptionResult);
}
else
{
RuntimeLogger.LogError($"!!! Unhandled Exception: {exceptionResult}");
}
}
}
@ -190,11 +207,15 @@ namespace ReactiveMachine.Compiler
HostServices.SerializableTypeSet.Add(typeof(DataContractSerializedExceptionResult));
HostServices.SerializableTypeSet.Add(typeof(ClassicallySerializedExceptionResult));
HostServices.SerializableTypeSet.Add(typeof(NonserializedExceptionResult));
HostServices.SerializableTypeSet.Add(typeof(SynchronizationDisciplineException));
HostServices.SerializableTypeSet.Add(typeof(List<IRestorable>));
HostServices.SerializableTypeSet.Add(typeof(KeyNotFoundException));
HostServices.SerializableTypeSet.Add(typeof(TimeoutException));
HostServices.SerializableTypeSet.Add(typeof(UnitType));
HostServices.SerializableTypeSet.Add(typeof(RequestFinish));
HostServices.SerializableTypeSet.Add(typeof(PerformFinish));
HostServices.SerializableTypeSet.Add(typeof(FinishState));
HostServices.SerializableTypeSet.Add(typeof(AckFinish));
HostServices.SerializableTypeSet.Add(typeof(RecordActivityResult));
HostServices.SerializableTypeSet.Add(typeof(RespondToActivity));
HostServices.SerializableTypeSet.Add(typeof(ExternalRequest));
HostServices.SerializableTypeSet.Add(typeof(EnqueueStartup));
@ -263,30 +284,13 @@ namespace ReactiveMachine.Compiler
SnapshotTracer?.Invoke($"Becoming Primary");
IsPrimary = true;
foreach (var t in PendingActivities)
foreach (var t in PendingActivities.Values)
{
ActivityTracer?.Invoke($" Restoring activity o{t.Key:D10}");
t.Value.Task.Start();
t.BecomePrimary();
}
}
}
// TODO avoid memory leak on secondaries
internal void AddActivity(ulong opid, string name, Task task)
{
PendingActivities.Add(opid, new ActivityState() { Name = name, Task = task });
if (IsPrimary)
{
task.Start();
}
}
internal void RemoveActivity(ulong opid)
{
PendingActivities.Remove(opid);
}
public void SaveToSnapshot(Snapshot s)
{
s.ProcessId = ProcessId;
@ -303,6 +307,8 @@ namespace ReactiveMachine.Compiler
x.SaveStateTo(s);
foreach (var x in OrchestrationStates.Values)
x.SaveStateTo(s);
foreach (var x in PendingActivities.Values)
x.SaveStateTo(s);
foreach (var x in FinishStates.Values)
x.SaveStateTo(s);
}
@ -327,7 +333,7 @@ namespace ReactiveMachine.Compiler
public void Send(uint destination, Message m)
{
SendTracer?.Invoke($" {m} -->p{destination:D3}");
SendTracer?.Invoke($" -->p{destination:D3} {m}");
if (destination == ProcessId)
{

Просмотреть файл

@ -165,8 +165,7 @@ namespace ReactiveMachine.Compiler
{
var orchestrationPrefix = GetGenericTypeNamePrefix(typeof(IOrchestration<>));
var readOrchestrationPrefix = GetGenericTypeNamePrefix(typeof(IReadOrchestration<>));
var atLeastOnceActivityPrefix = GetGenericTypeNamePrefix(typeof(IAtLeastOnceActivity<>));
var atMostOnceActivityPrefix = GetGenericTypeNamePrefix(typeof(IAtMostOnceActivity<>));
var atLeastOnceActivityPrefix = GetGenericTypeNamePrefix(typeof(IActivity<>));
var updatePrefix = GetGenericTypeNamePrefix(typeof(IUpdate<,>));
var readPrefix = GetGenericTypeNamePrefix(typeof(IRead<,>));
@ -179,7 +178,7 @@ namespace ReactiveMachine.Compiler
{
var name = GetGenericTypeNamePrefix(i);
if (name == orchestrationPrefix || name == readOrchestrationPrefix ||
name == atLeastOnceActivityPrefix || name == atMostOnceActivityPrefix ||
name == atLeastOnceActivityPrefix ||
name == updatePrefix || name == readPrefix)
{
if (spec != null)
@ -213,14 +212,7 @@ namespace ReactiveMachine.Compiler
else if (specname == atLeastOnceActivityPrefix)
{
var returntype = spec.GenericTypeArguments[0];
var m = typeof(IServiceBuilder).GetMethod(nameof(IServiceBuilder.DefineAtLeastOnceActivity));
var mg = m.MakeGenericMethod(o, returntype);
mg.Invoke(serviceBuilder, new object[0]);
}
else if (specname == atMostOnceActivityPrefix)
{
var returntype = spec.GenericTypeArguments[0];
var m = typeof(IServiceBuilder).GetMethod(nameof(IServiceBuilder.DefineAtMostOnceActivity));
var m = typeof(IServiceBuilder).GetMethod(nameof(IServiceBuilder.DefineActivity));
var mg = m.MakeGenericMethod(o, returntype);
mg.Invoke(serviceBuilder, new object[0]);
}

Просмотреть файл

@ -73,6 +73,15 @@ namespace ReactiveMachine.Compiler
return this;
}
IServiceBuilder IServiceBuilder.DefineDerived(IDerivedDefiner definer)
{
foreach (var o in process.Orchestrations.Values.ToList())
{
o.CallDerivedDefiner(definer, this);
}
return this;
}
IServiceBuilder IServiceBuilder.DefinePartitionedAffinity<TAffinity, TKey>()
{
var type = typeof(TAffinity);
@ -140,23 +149,15 @@ namespace ReactiveMachine.Compiler
return this;
}
IServiceBuilder IServiceBuilder.DefineAtLeastOnceActivity<TRequest, TReturn>()
IServiceBuilder IServiceBuilder.DefineActivity<TRequest, TReturn>()
{
var type = typeof(TRequest);
if (process.Activities.ContainsKey(type)) return this;
new ActivityInfo<TRequest, TReturn>(process, ActivityType.AtLeastOnce);
new ActivityInfo<TRequest, TReturn>(process);
Placements.Add(ReflectionServiceBuilder.DefaultPlacement<TRequest>(process.Affinities.Keys));
return this;
}
IServiceBuilder IServiceBuilder.DefineAtMostOnceActivity<TRequest, TReturn>()
{
var type = typeof(TRequest);
if (process.Activities.ContainsKey(type)) return this;
new ActivityInfo<TRequest, TReturn>(process, ActivityType.AtMostOnce);
Placements.Add(ReflectionServiceBuilder.DefaultPlacement<TRequest>(process.Affinities.Keys));
return this;
}
IServiceBuilder IServiceBuilder.DefineEvent<TEvent>()
{

Просмотреть файл

@ -0,0 +1,72 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
namespace ReactiveMachine.Compiler
{
/// <summary>
/// A request message for an activity
/// </summary>
[DataContract]
internal class PerformActivity<TActivity> : RequestMessage
{
[DataMember]
public TActivity Request;
[IgnoreDataMember]
internal override MessageType MessageType => MessageType.PerformActivity;
public override string ToString()
{
return $"{base.ToString()} PerformActivity<{typeof(TActivity).Name}>";
}
internal override void Apply(Process process)
{
var activityInfo = process.Activities[typeof(TActivity)];
activityInfo.ProcessRequest(this);
}
}
/// <summary>
/// A response message from an activity
/// </summary>
[DataContract]
internal class RespondToActivity : ResultMessage
{
[IgnoreDataMember]
internal override MessageType MessageType => MessageType.RespondToActivity;
public override string ToString()
{
return $"{base.ToString()} RespondToActivity";
}
}
[DataContract]
internal class RecordActivityResult : Message
{
[DataMember]
public object Result;
[IgnoreDataMember]
internal override MessageType MessageType => MessageType.RecordActivityResult;
public override string ToString()
{
return $"o{Opid:D10} RecordActivityResult";
}
internal override void Apply(Process process)
{
if (process.PendingActivities.TryGetValue(Opid, out var activityState))
{
activityState.RecordResult(Result);
}
}
}
}

Просмотреть файл

@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
namespace ReactiveMachine.Compiler
{
[DataContract]
internal class PerformDeterminize : RequestMessage
{
internal override MessageType MessageType => MessageType.PerformDeterminize;
internal override void Apply(Process process)
{
throw new NotImplementedException(); // is never sent to process
}
public override string ToString()
{
return $"{base.ToString()} PerformDeterminize";
}
}
}

Просмотреть файл

@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Text;
@ -10,7 +11,7 @@ namespace ReactiveMachine.Compiler
{
[DataContract]
internal class PerformEvent : QueuedMessage
internal abstract class EventMessage : QueuedMessage
{
[DataMember]
public IEvent Event;
@ -18,20 +19,14 @@ namespace ReactiveMachine.Compiler
[DataMember]
public int Position;
[DataMember]
public int PendingAcks;
[IgnoreDataMember]
protected List<IPartitionEffect> Effects; // we cache this because it can be a bit heavy to compute
[IgnoreDataMember]
internal override object Payload => Event;
public virtual PerformEvent NextMessage(ulong timestamp)
{
return new PerformEvent()
{ Clock = timestamp, Effects = Effects, Event = Event, Opid = Opid, Parent = Parent, Position = Position + 1 };
}
[IgnoreDataMember]
internal override MessageType MessageType => MessageType.PerformEvent;
internal override string LabelForTelemetry => Event.ToString();
public List<IPartitionEffect> GetEffects(Process process)
{
@ -47,6 +42,8 @@ namespace ReactiveMachine.Compiler
{
return GetCurrent(process).UntypedKey;
}
public abstract EventMessage NextMessage(ulong timestamp);
internal override void Apply(Process process)
{
@ -54,22 +51,117 @@ namespace ReactiveMachine.Compiler
affinityInfo.PartitionLock.EnterLock(this);
}
internal override object Execute<TKey>(Process process, ulong opid)
internal override void Enter<TKey>(Process process, TKey localkey, Stopwatch stopwatch, out bool isExiting)
{
var peff = (PartitionEffect<TKey>) GetCurrent(process);
var effects = GetEffects(process);
var timestamp = process.NextOpid;
bool isCoordinator = Position == effects.Count - 1;
// kick off any initializations if needed, counting how many acks we must wait for
var peff = (PartitionEffect<TKey>)GetCurrent(process);
foreach (var s in peff.AffectedStates)
((IStateInfoWithKey<TKey>)s).GetInstance(peff.Key.Key).Execute(Event, opid, true);
return null;
if (!((IStateInfoWithKey<TKey>)s).TryGetInstance(peff.Key.Key, out var instance, true, Opid))
{
PendingAcks++;
}
// if we are not the last partition with an effect, forward to next
if (! isCoordinator)
{
PendingAcks++;
var nextReq = NextMessage(timestamp);
var destination = nextReq.GetCurrent(process).Locate(process);
process.Send(destination, nextReq);
isExiting = false; // stays in the lock
}
else
{
// send commit messages to remote participants
// so they can apply the effects and then exit
for (int i = 0; i < effects.Count - 1; i++)
{
var key = effects[i].UntypedKey;
var destination = key.Locate(process);
var message = new CommitEvent() { PartitionKey = key, OriginalOpid = Opid };
message.Clock = timestamp;
message.Parent = Parent;
message.Opid = Opid;
process.Send(destination, message);
}
// return ack to orchestration
if (MessageType == MessageType.PerformEvent)
{
process.Send(process.GetOrigin(Opid), new AckEvent()
{ Opid = Opid, Parent = Parent, Clock = process.NextOpid });
}
TryCommit<TKey>(process, stopwatch, out isExiting);
}
}
public override string ToString()
private void TryCommit<TKey>(Process process, Stopwatch stopwatch, out bool isExiting)
{
return $"{base.ToString()} PerformEvent({Position})";
if (PendingAcks > 0)
{
isExiting = false;
return;
}
ApplyEffects<TKey>(process);
process.Telemetry?.OnApplicationEvent(
processId: process.ProcessId,
id: Opid.ToString(),
name: LabelForTelemetry.ToString(),
parent: Parent.ToString(),
opSide: OperationSide.Callee,
opType: OperationType.Event,
duration: stopwatch.Elapsed.TotalMilliseconds
);
isExiting = true;
}
internal override void Update<TKey>(Process process, TKey localkey, ProtocolMessage protocolMessage, Stopwatch stopwatch, out bool exiting)
{
PendingAcks--;
TryCommit<TKey>(process, stopwatch, out exiting);
}
private void ApplyEffects<TKey>(Process process)
{
// apply the event to all the affected states
var peff = (PartitionEffect<TKey>)GetCurrent(process);
foreach (var s in peff.AffectedStates)
{
((IStateInfoWithKey<TKey>)s).TryGetInstance(peff.Key.Key, out var instance, false);
instance.Execute(Event, Opid, StateOperation.Event);
}
}
}
[DataContract]
internal class ForkEvent : PerformEvent
internal class PerformEvent : EventMessage
{
public override string ToString()
{
return $"{base.ToString()} PerformEvent({Position})";
}
internal override MessageType MessageType => MessageType.PerformEvent;
public override EventMessage NextMessage(ulong timestamp)
{
return new PerformEvent()
{ Clock = timestamp, Effects = Effects, Event = Event,
LockedByCaller = LockedByCaller, PendingAcks = 0,
Opid = Opid, Parent = Parent, Position = Position + 1 };
}
}
[DataContract]
internal class ForkEvent : EventMessage
{
public override string ToString()
{
@ -78,15 +170,15 @@ namespace ReactiveMachine.Compiler
internal override MessageType MessageType => MessageType.ForkEvent;
public override PerformEvent NextMessage(ulong timestamp)
public override EventMessage NextMessage(ulong timestamp)
{
return new ForkEvent()
{ Clock = timestamp, Effects = Effects, Event = Event, Opid = Opid, Parent = Parent, Position = Position + 1 };
{ Clock = timestamp, Effects = Effects, Event = Event,
Opid = Opid, Parent = Parent, Position = Position + 1 };
}
}
[DataContract]
internal class AckEvent : ResponseMessage
{
@ -96,14 +188,18 @@ namespace ReactiveMachine.Compiler
{
return $"{base.ToString()} AckEvent";
}
}
internal override void Apply(Process process)
[DataContract]
internal class CommitEvent : ProtocolMessage
{
internal override MessageType MessageType => MessageType.CommitEvent;
public override string ToString()
{
process.OrchestrationStates[Parent].Continue(Opid, Clock, MessageType.AckEvent, UnitType.Value);
return $"{base.ToString()} CommitEvent";
}
}
}

Просмотреть файл

@ -10,9 +10,9 @@ namespace ReactiveMachine.Compiler
{
[DataContract]
internal class RequestFinish : RequestMessage
internal class PerformFinish : RequestMessage
{
internal override MessageType MessageType => MessageType.RequestFinish;
internal override MessageType MessageType => MessageType.PerformFinish;
internal override void Apply(Process process)
{
@ -33,7 +33,7 @@ namespace ReactiveMachine.Compiler
public override string ToString()
{
return $"{base.ToString()} RequestFinish";
return $"{base.ToString()} PerformFinish";
}
}
@ -49,11 +49,6 @@ namespace ReactiveMachine.Compiler
{
return $"{base.ToString()} AckFinish";
}
internal override void Apply(Process process)
{
process.OrchestrationStates[Parent].Continue(Opid, Clock, MessageType.AckEvent, UnitType.Value);
}
}

Просмотреть файл

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Text;
namespace ReactiveMachine.Compiler
{
[DataContract]
internal class AckInitialization : ProtocolMessage
{
internal override MessageType MessageType => MessageType.AckInitialization;
public override string ToString()
{
return $"{base.ToString()} AckInitialization";
}
}
}

Просмотреть файл

@ -3,82 +3,180 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Text;
namespace ReactiveMachine.Compiler
{
[DataContract]
internal class ForkLocal<TState> : QueuedMessage
internal abstract class LocalMessage<TState> : QueuedMessage
{
[DataMember]
public object LocalOperation;
[IgnoreDataMember]
private IStateInfo stateInfo;
private IStateInfo GetStateInfo(Process process)
protected IStateInfo GetStateInfo(Process process)
{
return stateInfo ?? (stateInfo = process.States[typeof(TState)]);
}
[IgnoreDataMember]
internal override MessageType MessageType => MessageType.ForkLocal;
[IgnoreDataMember]
internal override object Payload => LocalOperation;
internal override void Apply(Process process)
{
GetStateInfo(process).AffinityInfo.PartitionLock.EnterLock(this);
}
internal override void Enter<TKey>(Process process, TKey localkey, Stopwatch stopwatch, out bool isExiting)
{
var completed = Execute<TKey>(process, localkey, out var result);
if (!completed)
{ // we did not actually execute, but are waiting for initialization ack
isExiting = false;
return; // keep lock
}
if (MessageType != MessageType.ForkUpdate)
process.Send(process.GetOrigin(Opid), new RespondToLocal() { Opid = Opid, Parent = Parent, Result = result });
else
process.CheckForUnhandledException(result);
process.Telemetry?.OnApplicationEvent(
processId: process.ProcessId,
id: Opid.ToString(),
name: LabelForTelemetry,
parent: Parent.ToString(),
opSide: OperationSide.Callee,
opType: OperationType.Local,
duration: stopwatch.Elapsed.TotalMilliseconds
);
isExiting = true;
}
internal abstract bool Execute<TKey>(Process process, TKey localkey, out object result);
}
[DataContract]
internal abstract class LocalOperation<TState> : LocalMessage<TState>
{
[DataMember]
public object Operation;
internal override string LabelForTelemetry => Operation.ToString();
internal override IPartitionKey GetPartitionKey(Process process)
{
return GetStateInfo(process).GetPartitionKeyForLocalOperation(Operation);
}
internal override bool Execute<TKey>(Process process, TKey localkey, out object result)
{
var state = (IStateInfoWithKey<TKey>)process.States[typeof(TState)];
bool createIfNotExist = state.IsCreateIfNotExists(Operation.GetType());
var success = state.TryGetInstance(localkey, out var instance, createIfNotExist, Opid);
if (success)
{
result = instance.Execute(Operation, Opid, StateOperation.ReadOrUpdate);
return true;
}
else
{
if (!createIfNotExist)
{
result = process.Serializer.SerializeException(new KeyNotFoundException($"no state for key {localkey} ({typeof(TState)})"));
return true;
}
else
{
// we kicked off an initialization
result = null;
return false;
}
}
}
internal override void Update<TKey>(Process process, TKey localkey, ProtocolMessage protocolMessage, Stopwatch stopwatch, out bool exiting)
{
// try again now that initialization is done
Enter(process, localkey, stopwatch, out exiting);
}
}
[DataContract]
internal class ForkUpdate<TState> : LocalOperation<TState>
{
internal override MessageType MessageType => MessageType.ForkUpdate;
public override string ToString()
{
return $"{base.ToString()} ForkLocal<{typeof(TState).Name}>";
return $"{base.ToString()} ForkUpdate<{typeof(TState).Name}> {this.Operation}";
}
}
[DataContract]
internal class PerformLocal<TState> : LocalOperation<TState>
{
internal override MessageType MessageType => MessageType.PerformLocal;
public override string ToString()
{
return $"{base.ToString()} PerformLocal<{typeof(TState).Name}> {this.Operation}";
}
}
[DataContract]
internal class PerformPing<TState> : LocalMessage<TState>
{
[DataMember]
public IPartitionKey Key;
[IgnoreDataMember]
internal override string LabelForTelemetry => Key.ToString();
[IgnoreDataMember]
internal override MessageType MessageType => MessageType.PerformPing;
public override string ToString()
{
return $"{base.ToString()} PerformPing<{typeof(TState).Name}>";
}
internal override IPartitionKey GetPartitionKey(Process process)
{
return GetStateInfo(process).GetPartitionKeyForLocalOperation(LocalOperation);
return Key;
}
internal override object Execute<TKey>(Process process, ulong opid)
internal override bool Execute<TKey>(Process process, TKey localkey, out object result)
{
var state = (IStateInfoWithKey<TKey>)process.States[typeof(TState)];
var partitionKey = (PartitionKey<TKey>)GetPartitionKey(process);
var instance = state.GetInstance(partitionKey.Key);
return instance.Execute(LocalOperation, opid, false);
if (state.TryGetInstance(localkey, out _, false))
{
result = true;
}
else
{
result = false;
}
return true;
}
}
[DataContract]
internal class RequestLocal<TState> : ForkLocal<TState>
{
internal override MessageType MessageType => MessageType.RequestLocal;
public override string ToString()
{
return $"{base.ToString()} RequestLocal<{typeof(TState).Name}>";
}
}
/// <summary>
/// A response message from a local operation
/// </summary>
[DataContract]
internal class RespondToLocal : ResultMessage
{
internal override MessageType MessageType => MessageType.RespondToLocal;
internal override void Apply(Process process)
{
process.OrchestrationStates[Parent].Continue(Opid, Clock, MessageType.RespondToLocal, Result);
}
public override string ToString()
{
return $"{base.ToString()} RespondToLocal";
}
}
}

Просмотреть файл

@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Text;
@ -18,8 +19,11 @@ namespace ReactiveMachine.Compiler
[DataMember]
public int Position;
[DataMember]
public Dictionary<ulong, QueuedMessage> Requests;
[IgnoreDataMember]
internal override object Payload => null;
internal override string LabelForTelemetry => "Acquire";
public AcquireLock NextMessage(ulong timestamp)
{
@ -41,9 +45,52 @@ namespace ReactiveMachine.Compiler
affinityInfo.PartitionLock.EnterLock(this);
}
internal override object Execute<TKey>(Process process, ulong opid)
internal override void Enter<TKey>(Process process, TKey localkey, Stopwatch stopwatch, out bool exitImmediately)
{
return null;
var timestamp = process.NextOpid;
// if we are not the last partition to lock send next lock message
if (Position < LockSet.Count - 1)
{
var nextReq = NextMessage(timestamp);
var destination = nextReq.LockSet[nextReq.Position].Locate(process);
process.Send(destination, nextReq);
}
// if we are the last partition to lock return ack to orchestration
else
{
process.Send(process.GetOrigin(Opid), new GrantLock()
{
Opid = Opid,
Parent = Parent,
Clock = timestamp
});
}
exitImmediately = false; // stays in the lock
}
internal void Add(QueuedMessage request)
{
if (Requests == null)
Requests = new Dictionary<ulong, QueuedMessage>();
Requests.Add(request.Opid, request);
}
internal override void Update<TKey>(Process process, TKey localkey, ProtocolMessage protocolMessage, Stopwatch stopwatch, out bool exiting)
{
if (protocolMessage.OriginalOpid == this.Opid)
{
exiting = true;
}
else
{
exiting = false;
Requests[protocolMessage.OriginalOpid].Update(process, localkey, protocolMessage, stopwatch, out var innerIsExiting);
if (innerIsExiting)
Requests.Remove(protocolMessage.OriginalOpid);
}
}
public override string ToString()
@ -63,28 +110,13 @@ namespace ReactiveMachine.Compiler
return $"{base.ToString()} GrantLock";
}
internal override void Apply(Process process)
{
process.OrchestrationStates[Parent].Continue(Opid, Clock, MessageType.GrantLock, UnitType.Value);
}
}
}
[DataContract]
internal class ReleaseLock : Message
internal class ReleaseLock : ProtocolMessage
{
internal override MessageType MessageType => MessageType.ReleaseLock;
[DataMember]
public IPartitionKey Key;
[DataMember]
public ulong LockOpid;
internal override void Apply(Process process)
{
var PartitionLock = process.AffinityIndex[Key.Index].PartitionLock;
PartitionLock.ExitLock(Key, LockOpid);
}
public override string ToString()
{
return $"{base.ToString()} ReleaseLock";

Просмотреть файл

@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
@ -41,53 +42,24 @@ namespace ReactiveMachine.Compiler
}
[DataContract]
internal abstract class QueuedMessage : RequestMessage
{
internal abstract IPartitionKey GetPartitionKey(Process process);
internal abstract object Execute<TKey>(Process process, ulong opid);
[IgnoreDataMember]
internal abstract object Payload { get; }
}
/// <summary>
/// A message that will trigger a continuation when received
/// A message that can advance the state machine at a partition lock
/// </summary>
[DataContract]
internal abstract class ResponseMessage : Message
{
// we call this on messages that are not already serialized/deserialized
internal virtual void AntiAlias(IDeepCopier deepCopier)
{
}
public override string ToString()
{
return $"o{Parent:D10}.o{Opid:D10}";
}
}
/// <summary>
/// A response message containing a result or exception
/// </summary>
[DataContract]
internal abstract class ResultMessage : ResponseMessage
internal abstract class ProtocolMessage : Message
{
[DataMember]
public object Result { get; set; }
public IPartitionKey PartitionKey;
internal override void AntiAlias(IDeepCopier deepCopier)
[DataMember]
public ulong OriginalOpid;
internal override void Apply(Process process)
{
if (! ((Result is Exception) || (Result is UnitType)))
Result = deepCopier.DeepCopy(Result);
var PartitionLock = process.AffinityIndex[PartitionKey.Index].PartitionLock;
PartitionLock.UpdateLock(this);
}
}
}

Просмотреть файл

@ -12,31 +12,41 @@ namespace ReactiveMachine.Compiler
{
None,
ForkOperation,
RequestOperation,
RespondToOperation,
ForkOrchestration,
PerformOrchestration,
RespondToOrchestration,
ForkLocal,
RequestLocal,
ForkUpdate,
PerformLocal,
RespondToLocal,
ForkEvent,
PerformEvent,
AckEvent,
UnlockEvent,
CommitEvent,
AcquireLock,
GrantLock,
ReleaseLock,
RequestExternal,
PerformActivity,
RecordActivityResult,
RespondToActivity,
RequestFinish,
PerformDeterminize,
RespondToDeterminize,
PerformFinish,
AckFinish,
PerformPing,
RespondToPing,
AckInitialization,
EnqueueStartup,
ExternalRequest
ExternalRequest,
RegisterProcess
}
internal static class MessageTypeExtensions
@ -45,7 +55,7 @@ namespace ReactiveMachine.Compiler
{
switch (type)
{
case MessageType.RespondToOperation:
case MessageType.RespondToOrchestration:
case MessageType.RespondToActivity:
case MessageType.RespondToLocal:
case MessageType.AckEvent:
@ -60,8 +70,8 @@ namespace ReactiveMachine.Compiler
{
switch (type)
{
case MessageType.ForkOperation:
case MessageType.ForkLocal:
case MessageType.ForkOrchestration:
case MessageType.ForkUpdate:
case MessageType.ForkEvent:
return true;
default:

Просмотреть файл

@ -8,60 +8,59 @@ using System.Text;
namespace ReactiveMachine.Compiler
{
[DataContract]
internal class ForkOperation<TOperation> : RequestMessage
internal abstract class OrchestrationMessage<TOrchestration> : RequestMessage
{
[DataMember]
public TOperation Request;
public TOrchestration Request;
}
internal override MessageType MessageType => MessageType.ForkOperation;
[DataContract]
internal class ForkOrchestration<TOrchestration> : OrchestrationMessage<TOrchestration>
{
internal override MessageType MessageType => MessageType.ForkOrchestration;
public override string ToString()
{
return $"{base.ToString()} ForkOperation<{typeof(TOperation).Name}>";
return $"{base.ToString()} ForkOrchestration<{typeof(TOrchestration).Name}> {Request}";
}
internal override void Apply(Process process)
{
var orchestrationInfo = process.Orchestrations[typeof(TOperation)];
orchestrationInfo.ProcessRequest(this);
var orchestrationInfo = process.Orchestrations[typeof(TOrchestration)];
orchestrationInfo.ProcessRequest(this, OrchestrationType.Fork);
}
}
[DataContract]
internal class RequestOperation<TOperation> : ForkOperation<TOperation>
internal class PerformOrchestration<TOrchestration> : OrchestrationMessage<TOrchestration>
{
internal override MessageType MessageType => MessageType.RequestOperation;
internal override MessageType MessageType => MessageType.PerformOrchestration;
public override string ToString()
{
return $"{base.ToString()} RequestOperation<{typeof(TOperation).Name}>";
return $"{base.ToString()} PerformOrchestration<{typeof(TOrchestration).Name}> {Request}";
}
internal override void Apply(Process process)
{
var orchestrationInfo = process.Orchestrations[typeof(TOperation)];
orchestrationInfo.ProcessRequest(this);
var orchestrationInfo = process.Orchestrations[typeof(TOrchestration)];
orchestrationInfo.ProcessRequest(this, OrchestrationType.Perform);
}
}
/// <summary>
/// A response message from an orchestration
/// </summary>
[DataContract]
internal class RespondToOperation : ResultMessage
internal class RespondToOrchestration : ResultMessage
{
internal override MessageType MessageType => MessageType.RespondToOperation;
internal override MessageType MessageType => MessageType.RespondToOrchestration;
public override string ToString()
{
return $"{base.ToString()} RespondToOperation";
}
internal override void Apply(Process process)
{
process.OrchestrationStates[Parent].Continue(Opid, Clock, MessageType.RespondToOperation, Result);
return $"{base.ToString()} RespondToOrchestration";
}
}
}

Просмотреть файл

@ -0,0 +1,50 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Text;
namespace ReactiveMachine.Compiler
{
[DataContract]
internal abstract class QueuedMessage : RequestMessage
{
internal abstract IPartitionKey GetPartitionKey(Process process);
internal abstract void Enter<TKey>(Process process, TKey localkey, Stopwatch stopwatch, out bool exitImmediately);
internal virtual void Update<TKey>(Process process, TKey localkey, ProtocolMessage protocolMessage, Stopwatch stopwatch, out bool exiting)
{
throw new NotImplementedException();
}
[IgnoreDataMember]
internal abstract string LabelForTelemetry { get; }
internal void OnEnter(Process process)
{
// trace enter
process.LockTracer?.Invoke($"p{process.ProcessId:D3} {GetPartitionKey(process)} Enter {Opid}");
// if this is a fork, add it to list of pending forked operations by this parent
if (MessageType.IsFork())
{
if (!process.FinishStates.TryGetValue(Parent, out var finishState))
process.FinishStates[Parent] = finishState = new FinishState(process, Parent);
finishState.AddPending(Opid);
}
}
internal void OnExit(Process process)
{
// trace exit
process.LockTracer?.Invoke($"p{process.ProcessId:D3} {GetPartitionKey(process)} Exit {Opid}");
// if this was a fork, remove it from the list of pending forked operations
if (MessageType.IsFork())
{
process.FinishStates[Parent].RemovePending(Opid);
}
}
}
}

Просмотреть файл

@ -0,0 +1,69 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
namespace ReactiveMachine.Compiler
{
/// <summary>
/// A message that will trigger a continuation in an orchestration waiting for a response
/// </summary>
[DataContract]
internal abstract class ResponseMessage : Message
{
// we call this on messages that are not already serialized/deserialized
internal virtual void AntiAlias(IDeepCopier deepCopier)
{
}
public override string ToString()
{
return $"o{Parent:D10}.o{Opid:D10}";
}
[IgnoreDataMember]
internal virtual object ResultForContinuation => UnitType.Value;
internal override void Apply(Process process)
{
if (process.OrchestrationStates.TryGetValue(Parent, out var orchestrationState))
{
orchestrationState.Continue(Opid, Clock, MessageType, ResultForContinuation);
}
else
{
throw new Exception("internal error: missing continuation");
}
}
}
/// <summary>
/// A response message containing a result or exception
/// </summary>
[DataContract]
internal abstract class ResultMessage : ResponseMessage
{
[DataMember]
public object Result { get; set; }
[IgnoreDataMember]
internal override object ResultForContinuation => Result;
internal override void AntiAlias(IDeepCopier deepCopier)
{
if (!((Result is Exception) || (Result is UnitType)))
Result = deepCopier.DeepCopy(Result);
}
internal override void Apply(Process process)
{
process.OrchestrationStates[Parent].Continue(Opid, Clock, MessageType, Result);
}
}
}

Просмотреть файл

@ -9,7 +9,6 @@ using System.Text;
namespace ReactiveMachine.Compiler
{
[DataContract]
internal class EnqueueStartup : RequestMessage
{
@ -26,7 +25,7 @@ namespace ReactiveMachine.Compiler
{
var opInfo = process.Orchestrations[orchestration.GetType()];
var opid = process.NextOpid;
opInfo.CanExecuteLocally(orchestration, out var dest);
opInfo.CanExecuteLocally(orchestration, opid, out var dest);
var msg = opInfo.CreateForkMessage(orchestration);
msg.Opid = opid;
msg.Clock = 0;
@ -53,7 +52,7 @@ namespace ReactiveMachine.Compiler
{
var opInfo = process.Orchestrations[Orchestration.GetType()];
var opid = process.NextOpid;
opInfo.CanExecuteLocally(Orchestration, out var dest);
opInfo.CanExecuteLocally(Orchestration, opid, out var dest);
var msg = opInfo.CreateForkMessage(Orchestration);
msg.Opid = opid;
msg.Clock = 0;
@ -62,23 +61,4 @@ namespace ReactiveMachine.Compiler
}
}
[DataContract]
internal class RespondToActivity : ResultMessage
{
[DataMember]
public Guid InstanceId;
internal override MessageType MessageType => MessageType.RespondToActivity;
public override string ToString()
{
return $"{base.ToString()} RespondToActivity {InstanceId}";
}
internal override void Apply(Process process)
{
if (process.OrchestrationStates.TryGetValue(Parent, out var orchestrationState))
orchestrationState.Continue(Opid, Clock, MessageType.RespondToActivity, Result);
}
}
}

Просмотреть файл

@ -0,0 +1,38 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace ReactiveMachine.Compiler
{
class FNVHash
{
// non-cryptographic fast hash FNV-1
// from https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function
const uint FnvPrime = unchecked(16777619);
const uint FnvOffsetBasis = unchecked(2166136261);
public static ulong ComputeHash(ulong opid)
{
unchecked
{
var hash = 0xcbf29ce484222325ul; // FNV_offset_basis
var prime = 0x100000001b3u; // FNV_prime
hash *= ((uint)opid & 0xFF);
hash ^= prime;
opid >>= 8;
hash *= ((uint)opid & 0xFF);
hash ^= prime;
opid >>= 8;
hash *= ((uint)opid & 0xFF);
hash ^= prime;
opid >>= 8;
hash *= ((uint)opid & 0xFF);
hash ^= prime;
return hash;
}
}
}
}

Просмотреть файл

@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using ReactiveMachine.Util;
using System;
using System.Collections.Generic;
using System.Text;
@ -71,10 +72,13 @@ namespace ReactiveMachine.Compiler
{
return new Func<string, uint, uint>((o, n) => JumpConsistentHash.Compute(o, n));
}
else if (typeof(ICustomKeyType<T>).IsAssignableFrom(typeof(T)))
{
return ((ICustomKeyType<T>)Activator.CreateInstance(typeof(T))).JumpConsistentHasher;
}
else
{
throw new BuilderException($"type {typeof(T)} cannot be used as a key");
//return new Func<sbyte, uint>((o) => ConsistentHashing.Compute(serializer.SerializeObject(o), num_buckets));
throw new BuilderException($"type {typeof(T)} cannot be used as a key.");
}
}
@ -124,9 +128,13 @@ namespace ReactiveMachine.Compiler
{
return new Func<T, uint, uint>((o, n) => Convert.ToUInt32(o) / chunksize % n);
}
else if (typeof(ICustomKeyType<T>).IsAssignableFrom(typeof(T)))
{
return ((ICustomKeyType<T>)Activator.CreateInstance(typeof(T))).RoundRobinPlacer;
}
else
{
throw new BuilderException($"type {typeof(T)} does not allow round-robin placement");
throw new BuilderException($"type {typeof(T)} does not allow round-robin placement.");
}
}
@ -192,9 +200,13 @@ namespace ReactiveMachine.Compiler
{
return new Func<T, T, int>((a, b) => Convert.ToUInt32(a).CompareTo(Convert.ToUInt32(b)));
}
else if (typeof(ICustomKeyType<T>).IsAssignableFrom(typeof(T)))
{
return ((ICustomKeyType<T>)Activator.CreateInstance(typeof(T))).Comparator;
}
else
{
throw new BuilderException($"type {typeof(T)} cannot be used as a key");
throw new BuilderException($"type {typeof(T)} cannot be used as a key.");
}
}
}

Просмотреть файл

@ -26,6 +26,8 @@ namespace ReactiveMachine.Compiler
[DataMember]
public TKey Key;
// we need not restore this, as it is only used by events and locked orchestrations
// that sort partition keys; and those are re-creating the keys when loaded back from storage
[IgnoreDataMember]
public Func<TKey, TKey, int> Comparator;

Просмотреть файл

@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -15,42 +16,62 @@ namespace ReactiveMachine.Compiler
{
IEnumerable<Type> SerializableTypes();
IAffinityInfo AffinitizationKey { set; }
IAffinityInfo PlacementAffinity { set; }
bool DistributeRandomly { set; }
bool RequiresLocks(object request, out List<IPartitionKey> list);
bool CanExecuteLocally(object request, ulong opid, out uint destination);
void DefineExtensions(IServiceBuilder serviceBuilder);
void ProcessRequest(RequestMessage message);
RequestMessage CreateRequestMessage(object activity);
}
internal interface IActivityInfo<TReturn>
{
Task<TReturn> Perform(IActivityBase<TReturn> request, IOrchestrationState orchestrationState);
}
internal enum ActivityType
{
AtLeastOnce,
AtMostOnce,
}
internal class ActivityInfo<TRequest, TReturn> : IActivityInfo, IActivityInfo<TReturn>, IContext, ILogger
where TRequest : IActivityBase<TReturn>
internal class ActivityInfo<TRequest, TReturn> : IActivityInfo, IContext, ILogger
where TRequest : IActivity<TReturn>
{
private readonly Process process;
private readonly ActivityType type;
//TODO: honor this when launching activities
public IAffinityInfo AffinitizationKey { private get; set; }
public IAffinityInfo PlacementAffinity { private get; set; }
public ActivityInfo(Process process, ActivityType type)
public bool DistributeRandomly { private get; set; }
private readonly bool requirelocks;
public List<IAffinityInfo> AffinityList;
public ActivityInfo(Process process)
{
this.process = process;
this.type = type;
process.Activities[typeof(TRequest)] = this;
// use reflection to obtain affinity and locking information
var canRouteToPrefix = ReflectionServiceBuilder.GetGenericTypeNamePrefix(typeof(ICanRouteTo<>));
foreach (var i in typeof(TRequest).GetInterfaces())
if (ReflectionServiceBuilder.GetGenericTypeNamePrefix(i) == canRouteToPrefix)
{
var gt = i.GenericTypeArguments;
var affinityInfo = process.Affinities[gt[0]];
(AffinityList ?? (AffinityList = new List<IAffinityInfo>())).Add(affinityInfo);
}
var method = typeof(TRequest).GetMethod("Execute");
requirelocks = method.GetCustomAttributes(typeof(LockAttribute), false).Count() > 0;
if (requirelocks && (AffinityList == null || AffinityList.Count == 0))
throw new BuilderException($"To use {nameof(LockAttribute)} on Execute function of {typeof(TRequest).FullName}, you must define at least one affinity.");
}
public IEnumerable<Type> SerializableTypes()
{
yield return typeof(TRequest);
yield return typeof(TReturn);
yield return typeof(PerformActivity<TRequest>);
}
IExceptionSerializer IContext.ExceptionSerializer => process.Serializer;
@ -60,37 +81,43 @@ namespace ReactiveMachine.Compiler
Extensions.Register.DefineActivityExtensions<TRequest, TReturn>(serviceBuilder);
}
public Task<TReturn> Perform(IActivityBase<TReturn> r, IOrchestrationState orchestrationState)
public bool CanExecuteLocally(object request, ulong opid, out uint destination)
{
var request = (TRequest)r;
return orchestrationState.Activity(
(originalInstanceId) => RunWithTimeout(request, originalInstanceId == process.InstanceId),
request.ToString());
}
private async Task<TReturn> Execute(TRequest request, bool guaranteedFirst)
{
if (guaranteedFirst
|| type == ActivityType.AtLeastOnce)
if (PlacementAffinity != null)
{
return await request.Execute(this);
destination = PlacementAffinity.LocateAffinity(request);
}
else if (type == ActivityType.AtMostOnce)
else if (DistributeRandomly)
{
var a = (IAtMostOnceActivity<TReturn>)request;
return await a.AfterFault(this);
destination = (uint)(FNVHash.ComputeHash(opid) % process.NumberProcesses);
}
else
{
throw new Exception("internal error: unmatched activity type");
destination = process.ProcessId;
}
return destination == process.ProcessId;
}
private async Task<TReturn> RunWithTimeout(TRequest request, bool guaranteedFirst)
public RequestMessage CreateRequestMessage(object activity)
{
return new PerformActivity<TRequest>()
{
Request = (TRequest)activity,
};
}
public void ProcessRequest(RequestMessage req)
{
var request = (PerformActivity<TRequest>)req;
new ActivityState<TRequest, TReturn>(process, request);
}
public async Task<TReturn> RunWithTimeout(TRequest request)
{
var timelimit = request.TimeLimit;
var taskToComplete = Task.Run(() => Execute(request, guaranteedFirst));
var taskToComplete = Task.Run(() => request.Execute(this));
var timeoutCancellationTokenSource = new CancellationTokenSource();
var completedTask = await Task.WhenAny(taskToComplete, Task.Delay(timelimit, timeoutCancellationTokenSource.Token));
@ -107,6 +134,28 @@ namespace ReactiveMachine.Compiler
throw new TimeoutException(String.Format($"Activity {request} has timed out after {0}.", timelimit));
}
public bool RequiresLocks(object request, out List<IPartitionKey> list)
{
if (!requirelocks)
{
list = null;
return false;
}
else
{
if (AffinityList.Count == 1)
list = AffinityList[0].GetAffinityKeys(request).ToList();
else
{
list = new List<IPartitionKey>();
foreach (var a in AffinityList)
foreach (var k in a.GetAffinityKeys(request))
list.Add(k);
}
return true;
}
}
ILogger IContext.Logger => this;
void ILogger.Log<TState1>(LogLevel logLevel, EventId eventId, TState1 state, Exception exception, Func<TState1, Exception, string> formatter)

Просмотреть файл

@ -35,6 +35,8 @@ namespace ReactiveMachine.Compiler
TKey GetAffinityKey(object obj);
uint LocateKey(TKey key);
PartitionKey<TKey> MakePartitionKey(TKey key);
}
internal abstract class AffinityInfo<TAffinity, TKey> : IAffinityInfo, IAffinityInfoByKeyType<TKey>
@ -58,6 +60,16 @@ namespace ReactiveMachine.Compiler
process.AffinityIndex.Add(this);
}
public PartitionKey<TKey> MakePartitionKey(TKey key)
{
return new PartitionKey<TKey>()
{
Key = key,
Index = Index,
Comparator = Comparator
};
}
public IPartitionLock PartitionLock { get; set; }
public abstract bool Singleton { get; }
@ -112,6 +124,7 @@ namespace ReactiveMachine.Compiler
}
}
}
}
internal class SingletonAffinityInfo<TAffinity> : AffinityInfo<TAffinity, UnitType>

Просмотреть файл

@ -13,15 +13,20 @@ namespace ReactiveMachine.Compiler
{
IEnumerable<Type> SerializableTypes();
bool CanExecuteLocally(object request, out uint destination);
void CallDerivedDefiner(IDerivedDefiner definer, IServiceBuilder builder);
void ProcessRequest(RequestMessage request);
bool CanExecuteLocally(object request, ulong opid, out uint destination);
void ProcessRequest(RequestMessage request, OrchestrationType orchestrationType);
RequestMessage CreateForkMessage(IOrchestration orchestration);
RequestMessage CreateRequestMessage(object orchestration);
IAffinityInfo PlacementAffinity { set; }
bool DistributeRandomly { set; }
bool RequiresLocks(object request, out List<IPartitionKey> list);
void DefineExtensions(IServiceBuilder serviceBuilder);
@ -30,73 +35,103 @@ namespace ReactiveMachine.Compiler
class OrchestrationInfo<TRequest, TReturn> : IOrchestrationInfo
where TRequest : IOrchestrationBase<TReturn>
{
private readonly Process process;
public readonly Process Process;
private readonly bool requirelocks;
private readonly bool IsInitialization;
public IAffinityInfo PlacementAffinity { private get; set; }
public bool DistributeRandomly { private get; set; }
public List<IAffinityInfo> Affinities;
public List<IAffinityInfo> AffinityList;
// constructor for user-defined orchestration
public OrchestrationInfo(Process process)
{
this.process = process;
this.Process = process;
process.Orchestrations[typeof(TRequest)] = this;
if (typeof(IInitializationRequest).IsAssignableFrom(typeof(TRequest)))
{
// this is an initialization orchestration
IsInitialization = true;
requirelocks = true;
}
else
{
IsInitialization = false;
// use reflection to obtain affinity and locking information
var canRouteToPrefix = ReflectionServiceBuilder.GetGenericTypeNamePrefix(typeof(ICanRouteTo<>));
foreach (var i in typeof(TRequest).GetInterfaces())
if (ReflectionServiceBuilder.GetGenericTypeNamePrefix(i) == canRouteToPrefix)
{
var gt = i.GenericTypeArguments;
var affinityInfo = process.Affinities[gt[0]];
(AffinityList ?? (AffinityList = new List<IAffinityInfo>())).Add(affinityInfo);
}
var canRouteToPrefix = ReflectionServiceBuilder.GetGenericTypeNamePrefix(typeof(ICanRouteTo<>));
foreach (var i in typeof(TRequest).GetInterfaces())
if (ReflectionServiceBuilder.GetGenericTypeNamePrefix(i) == canRouteToPrefix)
{
var gt = i.GenericTypeArguments;
var affinityInfo = process.Affinities[gt[0]];
(Affinities ?? (Affinities = new List<IAffinityInfo>())).Add(affinityInfo);
}
var method = typeof(TRequest).GetMethod("Execute");
requirelocks = method.GetCustomAttributes(typeof(LockAttribute), false).Count() > 0;
var method = typeof(TRequest).GetMethod("Execute");
requirelocks = method.GetCustomAttributes(typeof(LockAttribute), false).Count() > 0;
if (requirelocks && (Affinities == null || Affinities.Count == 0))
throw new BuilderException($"To use {nameof(LockAttribute)} on Execute function of {typeof(TRequest).FullName}, you must define at least one affinity.");
if (requirelocks && (AffinityList == null || AffinityList.Count == 0))
throw new BuilderException($"To use {nameof(LockAttribute)} on Execute function of {typeof(TRequest).FullName}, you must define at least one affinity.");
}
}
public IEnumerable<Type> SerializableTypes()
{
yield return typeof(TRequest);
yield return typeof(TReturn);
yield return typeof(ForkOperation<TRequest>);
yield return typeof(RequestOperation<TRequest>);
yield return typeof(RespondToOperation);
yield return typeof(ForkOrchestration<TRequest>);
yield return typeof(PerformOrchestration<TRequest>);
yield return typeof(RespondToOrchestration);
yield return typeof(AckInitialization);
yield return typeof(OrchestrationState<TRequest,TReturn>);
}
public void CallDerivedDefiner(IDerivedDefiner definer, IServiceBuilder builder)
{
definer.DefineForEachOrchestration<TRequest, TReturn>(builder);
}
public void DefineExtensions(IServiceBuilder serviceBuilder)
{
Extensions.Register.DefineOrchestrationExtensions<TRequest, TReturn>(serviceBuilder);
}
public bool CanExecuteLocally(object request, out uint destination)
public bool CanExecuteLocally(object request, ulong opid, out uint destination)
{
if (PlacementAffinity != null)
{
destination = PlacementAffinity.LocateAffinity(request);
}
else if (DistributeRandomly)
{
destination = (uint)(FNVHash.ComputeHash(opid) % Process.NumberProcesses);
}
else
{
destination = process.ProcessId;
destination = Process.ProcessId;
}
return destination == process.ProcessId;
return destination == Process.ProcessId;
}
public void ProcessRequest(RequestMessage request)
public void ProcessRequest(RequestMessage request, OrchestrationType orchestrationType)
{
var state = new OrchestrationState<TRequest, TReturn>((ForkOperation<TRequest>)request);
process.OrchestrationStates[request.Opid] = state;
state.StartOrResume(process, this);
new OrchestrationState<TRequest, TReturn>(
Process,
this,
request.Opid,
((OrchestrationMessage<TRequest>)request).Request,
orchestrationType,
request.LockedByCaller,
request.Parent,
request.Clock);
}
public RequestMessage CreateForkMessage(IOrchestration orchestration)
{
return new ForkOperation<TRequest>()
return new ForkOrchestration<TRequest>()
{
Request = (TRequest) orchestration
};
@ -104,7 +139,7 @@ namespace ReactiveMachine.Compiler
public RequestMessage CreateRequestMessage(object orchestration)
{
return new RequestOperation<TRequest>()
return new PerformOrchestration<TRequest>()
{
Request = (TRequest) orchestration,
};
@ -117,14 +152,21 @@ namespace ReactiveMachine.Compiler
list = null;
return false;
}
else if (IsInitialization)
{
list = new List<IPartitionKey>() {
((IInitializationRequest)request).GetPartitionKey()
};
return true;
}
else
{
if (Affinities.Count == 1)
list = Affinities[0].GetAffinityKeys(request).ToList();
if (AffinityList.Count == 1)
list = AffinityList[0].GetAffinityKeys(request).ToList();
else
{
list = new List<IPartitionKey>();
foreach (var a in Affinities)
foreach (var a in AffinityList)
foreach (var k in a.GetAffinityKeys(request))
list.Add(k);
}

Просмотреть файл

@ -3,10 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
namespace ReactiveMachine.Compiler
{
@ -16,7 +13,7 @@ namespace ReactiveMachine.Compiler
void ProcessSubscriptions();
RequestMessage CreateLocalMessage(object local, ulong parent, bool fork);
RequestMessage CreateLocalMessage(object local, ulong parent, MessageType mtype);
IAffinityInfo AffinityInfo { get; }
@ -24,13 +21,13 @@ namespace ReactiveMachine.Compiler
IPartitionKey GetPartitionKeyForLocalOperation(object localOperation);
void DefineExtensions(IServiceBuilder serviceBuilder);
void DefineExtensions(IServiceBuilder serviceBuilder);
}
internal interface IStateInfoWithKey<TKey>
{
IStateInstance GetInstance(TKey key);
bool TryGetInstance(TKey key, out IStateInstance instance, bool createIfNotExist, ulong parent = 0);
bool IsCreateIfNotExists(Type operationType);
}
internal interface IStateInfo<TState>
@ -39,14 +36,21 @@ namespace ReactiveMachine.Compiler
}
internal class StateInfo<TState, TAffinity, TKey> : IStateInfo, IStateInfo<TState>, IStateInfoWithKey<TKey>
internal interface IStateInfoWithStateAndKey<TState, TKey>
{
void SetInitializationResult(TKey key, TState state);
}
internal class StateInfo<TState, TAffinity, TKey> : IStateInfo, IStateInfo<TState>, IStateInfoWithKey<TKey>, IStateInfoWithStateAndKey<TState, TKey>
where TState : IState<TAffinity>, new()
where TAffinity : IAffinitySpec<TAffinity>
{
private readonly Process process;
private readonly Dictionary<TKey, StateContext<TState, TAffinity,TKey>> states = new Dictionary<TKey, StateContext<TState, TAffinity, TKey>>();
private readonly AffinityInfo<TAffinity,TKey> keyInfo;
private readonly List<IOperationInfo> operations = new List<IOperationInfo>();
private readonly Dictionary<Type,IOperationInfo> operations = new Dictionary<Type, IOperationInfo>();
private bool HasInitializer;
public StateInfo(Process process, AffinityInfo<TAffinity, TKey> keyInfo)
{
@ -60,11 +64,12 @@ namespace ReactiveMachine.Compiler
{
yield return typeof(TState);
yield return typeof(StateState<TState>);
yield return typeof(ForkLocal<TState>);
yield return typeof(RequestLocal<TState>);
yield return typeof(ForkUpdate<TState>);
yield return typeof(PerformLocal<TState>);
yield return typeof(PerformPing<TState>);
yield return typeof(RespondToLocal);
foreach (var o in operations)
foreach (var t in o.SerializableTypes())
foreach (var kvp in operations)
foreach (var t in kvp.Value.SerializableTypes())
yield return t;
}
@ -85,23 +90,65 @@ namespace ReactiveMachine.Compiler
states.Clear();
}
public StateContext<TState, TAffinity, TKey> GetInstance(TKey key)
public void Restore(object keyValue, TState state)
{
var key = (TKey)keyValue;
if (!states.TryGetValue(key, out var instance))
{
states[key] = instance = CreateStateInstance(key);
states[key] = instance = new StateContext<TState, TAffinity, TKey>(process, key);
}
return instance;
instance.Restore(state);
}
IStateInstance IStateInfoWithKey<TKey>.GetInstance(TKey key)
bool IStateInfoWithKey<TKey>.TryGetInstance(TKey key, out IStateInstance instance, bool createIfNotExist, ulong parent)
{
return GetInstance(key);
}
if (states.TryGetValue(key, out var entry))
{
instance = entry;
return true;
}
else
{
if (createIfNotExist)
{
instance = states[key] = new StateContext<TState, TAffinity, TKey>(process, key);
public StateContext<TState, TAffinity, TKey> CreateStateInstance(TKey key)
{
return new StateContext<TState, TAffinity, TKey>(process, key);
if (!HasInitializer)
{
return true;
}
else
{
var initialization = new Initialization<TState, TKey>()
{
PartitionKey = keyInfo.MakePartitionKey(key),
Singleton = keyInfo.Singleton,
};
var orchestrationInfo =
(OrchestrationInfo<Initialization<TState, TKey>, UnitType>)
process.Orchestrations[typeof(Initialization<TState, TKey>)];
var clock = process.OperationCounter;
var orchestrationState =
new OrchestrationState<Initialization<TState, TKey>, UnitType>(
process,
orchestrationInfo,
process.NextOpid,
initialization,
OrchestrationType.Initialize,
true,
parent,
clock);
return false;
}
}
else
{
instance = null;
return false;
}
}
}
@ -112,6 +159,9 @@ namespace ReactiveMachine.Compiler
// use reflection to find the interfaces, check the types, and subscribe
var singletonSubscribeName = typeof(ISubscribe<,>).Name;
var partitionedSubscribeName = typeof(ISubscribe<,,>).Name;
var singletonInitialization = typeof(IInitialize).Name;
var partitionedInitialization = typeof(IInitialize<>).Name;
foreach (var i in typeof(TState).GetInterfaces())
if (i.Name == singletonSubscribeName || i.Name == partitionedSubscribeName)
{
@ -142,6 +192,28 @@ namespace ReactiveMachine.Compiler
}
eventInfo.Subscribe(this);
}
else if (i.Name == singletonInitialization || i.Name == partitionedInitialization)
{
if (i.Name == singletonInitialization)
{
if (!keyInfo.Singleton)
throw new BuilderException($"invalid initialization interface on state {typeof(TState).Name}: must use {partitionedInitialization} with keytype parameter");
}
else if (i.Name == partitionedInitialization)
{
if (keyInfo.Singleton)
throw new BuilderException($"invalid initialization interface on state {typeof(TState).Name}: must use {singletonInitialization}");
var keytype = i.GenericTypeArguments[0];
if (keytype != typeof(TKey))
{
throw new BuilderException($"invalid initialization interface on state {typeof(TState).Name}: must use key type {keytype.Name}");
}
}
HasInitializer = true;
}
if (HasInitializer)
new OrchestrationInfo<Initialization<TState, TKey>,UnitType>(process);
}
public IPartitionKey GetPartitionKeyForLocalOperation(object localOperation)
@ -154,53 +226,63 @@ namespace ReactiveMachine.Compiler
};
}
public void Restore(object keyValue, TState state)
{
var instance = GetInstance((TKey) keyValue);
instance.Restore(state);
}
public RequestMessage CreateForkLocalMessage(object op, ulong parent)
{
return new ForkLocal<TState>()
return new ForkUpdate<TState>()
{
LocalOperation = op,
Operation = op,
Parent = parent
};
}
public RequestMessage CreateLocalMessage(object op, ulong parent, bool fork)
public RequestMessage CreateLocalMessage(object payload, ulong parent, MessageType mtype)
{
if (fork)
switch (mtype)
{
return new ForkLocal<TState>()
{
LocalOperation = op,
Parent = parent
};
}
else
{
return new RequestLocal<TState>()
{
LocalOperation = op,
Parent = parent
};
case MessageType.ForkUpdate:
return new ForkUpdate<TState>()
{
Operation = payload,
Parent = parent
};
case MessageType.PerformLocal:
return new PerformLocal<TState>()
{
Operation = payload,
Parent = parent
};
case MessageType.PerformPing:
return new PerformPing<TState>()
{
Key = (IPartitionKey) payload,
Parent = parent
};
default: throw new Exception("unhandled case");
}
}
public void RegisterOperation<TRequest, TReturn>(bool isRead)
{
operations.Add(new OperationInfo<TRequest,TReturn>()
{
IsRead = isRead
});
operations.Add(typeof(TRequest), new OperationInfo<TRequest, TReturn>(isRead));
}
public void DefineExtensions(IServiceBuilder serviceBuilder)
{
foreach (var op in operations)
op.DefineExtensions(serviceBuilder);
foreach (var kvp in operations)
kvp.Value.DefineExtensions(serviceBuilder);
}
public bool IsCreateIfNotExists(Type operationType)
{
return operations[operationType].CreateIfNotExists;
}
public void SetInitializationResult(TKey key, TState state)
{
states[key].SetInitializationResult(state);
}
public interface IOperationInfo
@ -208,11 +290,25 @@ namespace ReactiveMachine.Compiler
IEnumerable<Type> SerializableTypes();
void DefineExtensions(IServiceBuilder serviceBuilder);
bool CreateIfNotExists { get; }
}
public class OperationInfo<TRequest, TReturn> : IOperationInfo
{
public bool IsRead;
public bool CreateIfNotExists { get; private set; }
public OperationInfo(bool isRead)
{
this.IsRead = isRead;
var method = typeof(TRequest).GetMethod("Execute");
CreateIfNotExists = method.GetCustomAttributes(typeof(CreateIfNotExistsAttribute), false).Count() > 0;
if (isRead && CreateIfNotExists)
throw new BuilderException($"The attribute {nameof(CreateIfNotExistsAttribute)} is not allowed on read operations. Consider making this an update operation instead.");
}
public IEnumerable<Type> SerializableTypes()
{

Просмотреть файл

@ -0,0 +1,121 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
namespace ReactiveMachine.Compiler
{
internal interface IActivityState
{
void RecordResult(object result);
void BecomePrimary();
void SaveStateTo(Snapshot snapshot);
string Name { get; }
}
[DataContract]
internal class ActivityState<TRequest, TReturn> : IActivityState, IRestorable
where TRequest : IActivity<TReturn>
{
[DataMember]
private PerformActivity<TRequest> message;
[IgnoreDataMember]
private Process process;
[IgnoreDataMember]
private Task task;
public string Name => message.Request.ToString();
public ActivityState(Process process, PerformActivity<TRequest> message)
{
this.process = process;
this.message = message;
process.PendingActivities.Add(message.Opid, this);
if (process.IsPrimary)
{
task = Start();
}
}
public void BecomePrimary()
{
task = Start();
}
public void SaveStateTo(Snapshot snapshot)
{
snapshot.StatePieces.Add(this);
}
public void RestoreStateTo(Process process)
{
this.process = process;
}
private async Task Start()
{
process.ActivityTracer?.Invoke($" Starting activity o{message.Opid:D10} {message.Request}");
var stopwatch = process.Telemetry != null ? new Stopwatch() : null;
stopwatch?.Start();
var info = (ActivityInfo<TRequest, TReturn>)process.Activities[typeof(TRequest)];
object r;
try
{
r = await info.RunWithTimeout(message.Request);
}
catch (Exception e)
{
r = process.Serializer.SerializeException(e);
}
stopwatch?.Stop();
process.ActivityTracer?.Invoke($" Completed activity o{message.Opid:D10} {message.Request}");
process.Telemetry?.OnApplicationEvent(
processId: process.ProcessId,
id: message.Opid.ToString(),
name: message.Request.ToString(),
parent: message.Parent.ToString(),
opSide: OperationSide.Callee,
opType: OperationType.Activity,
duration: stopwatch.Elapsed.TotalMilliseconds
);
process.HostServices.Send(process.ProcessId, new RecordActivityResult()
{
Opid = message.Opid,
Parent = message.Parent,
Clock = message.Clock,
Result = r
});
}
public virtual void RecordResult(object result)
{
process.HostServices.Send(process.GetOrigin(message.Opid), new RespondToActivity()
{
Opid = message.Opid,
Parent = message.Parent,
Clock = message.Clock,
Result = result
});
process.PendingActivities.Remove(message.Opid);
}
}
}

Просмотреть файл

@ -0,0 +1,57 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
namespace ReactiveMachine.Compiler
{
internal interface IInitializationRequest
{
IPartitionKey GetPartitionKey();
void SetStateToFinalResult(Process process);
}
// the following objects mimic requests issued by users but are actually generated
// only from within the runtime, in StateInstance.
[DataContract]
internal class Initialization<TState, TKey> : IOrchestration<UnitType>, IInitializationRequest
where TState : new()
{
[DataMember]
public PartitionKey<TKey> PartitionKey { get; set; }
[DataMember]
public bool Singleton;
[IgnoreDataMember]
public TState State;
public async Task<UnitType> Execute(IOrchestrationContext context)
{
State = new TState(); // start with default constructor
if (Singleton)
await ((IInitialize)State).OnInitialize((IInitializationContext)context);
else
await ((IInitialize<TKey>)State).OnInitialize((IInitializationContext)context, PartitionKey.Key);
return UnitType.Value;
}
IPartitionKey IInitializationRequest.GetPartitionKey()
{
return PartitionKey;
}
public void SetStateToFinalResult(Process process)
{
((IStateInfoWithStateAndKey<TState, TKey>)process.States[typeof(TState)]).SetInitializationResult(PartitionKey.Key, State);
}
}
}

Просмотреть файл

@ -31,9 +31,11 @@ namespace ReactiveMachine.Compiler
public Stopwatch Stopwatch;
}
[IgnoreDataMember]
public bool IsEmpty => head == null;
public QueuedMessage Holder => head.Request;
public void Enqueue(QueuedMessage request, Stopwatch stopwatch)
{
Process.LockTracer?.Invoke($"p{Process.ProcessId:D3} {request.GetPartitionKey(Process)} Enqueue {request.Opid}");
@ -41,7 +43,7 @@ namespace ReactiveMachine.Compiler
{
Request = request,
Stopwatch = stopwatch,
};
};
if (head == null)
{
head = tail = rinfo;
@ -53,37 +55,23 @@ namespace ReactiveMachine.Compiler
}
}
public void Remove(ulong opid, out QueuedMessage msg)
public void Update<TKey>(TKey localkey, ProtocolMessage protocolMessage)
{
if (head.Request.Opid == opid)
{
msg = head.Request;
head = head.Next;
}
else
{
var pos = head;
while (pos.Next.Request.Opid != opid)
{
pos = pos.Next;
}
msg = pos.Next.Request;
if (tail == pos.Next)
tail = pos;
pos.Next = pos.Next.Next;
}
Process.LockTracer?.Invoke($"p{Process.ProcessId:D3} {msg.GetPartitionKey(Process)} Removed {opid}");
}
if (head == null)
throw new Exception("internal error: received update for request not holding lock");
public void EnterNextHolder<TKey>(TKey localkey, Func<ulong,TKey,QueuedMessage,Stopwatch,MessageType,ulong,bool> entering)
{
// move head forward past all entries that immediately leave the lock after entering
while (head != null && !entering(head.Request.Opid, localkey, head.Request, head.Stopwatch, head.Request.MessageType, head.Request.Parent))
head.Request.Update(Process, localkey, protocolMessage, head.Stopwatch, out var exiting);
while (exiting)
{
if (head.Request.MessageType.IsFork())
Process.FinishStates[head.Request.Parent].RemovePending(head.Request.Opid);
head.Request.OnExit(Process);
head = head.Next;
continue;
if (head == null)
return;
head.Request.OnEnter(Process);
head.Request.Enter(Process, localkey, head.Stopwatch, out exiting);
}
}
}

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the MIT license.
using Microsoft.Extensions.Logging;
using ReactiveMachine.Extensions;
using System;
using System.Collections.Generic;
using System.Diagnostics;
@ -19,27 +20,49 @@ namespace ReactiveMachine.Compiler
void SaveStateTo(Snapshot snapshot);
Task<T> Activity<T>(Func<Guid, Task<T>> body, string name);
KeyValuePair<ulong, string>? WaitingFor { get; }
KeyValuePair<ulong, string> WaitingFor { get; }
object RequestObject { get; }
}
internal enum OrchestrationType
{
Perform,
Fork,
Initialize
}
[DataContract]
internal class OrchestrationState<TRequest, TReturn> : IOrchestrationState, IOrchestrationContext, IReadOrchestrationContext, ILogger
internal class OrchestrationState<TRequest, TReturn> : IOrchestrationState, IInitializationContext, IReadOrchestrationContext, ILogger
where TRequest : IOrchestrationBase<TReturn>
{
public OrchestrationState(ForkOperation<TRequest> request)
public OrchestrationState(
Process process,
OrchestrationInfo<TRequest, TReturn> info,
ulong opid,
TRequest request,
OrchestrationType orchestrationType,
bool lockedByCaller,
ulong parent,
ulong clock)
{
this.Opid = request.Opid;
this.Request = request.Request;
this.Parent = request.Parent;
this.StartingClock = request.Clock;
this.ExpectsResponse = !request.MessageType.IsFork();
this.LockedByCaller = request.LockedByCaller;
this.Opid = opid;
this.Request = request;
this.Parent = parent;
this.StartingClock = clock;
this.LockedByCaller = lockedByCaller;
this.OrchestrationType = orchestrationType;
this.History = new List<Record>();
this.Continuations = new Dictionary<ulong, IContinuationInfo>();
this.process = process;
this.info = info;
process.OrchestrationStates[opid] = this;
StartOrResume();
}
[DataMember]
public readonly ulong Opid;
@ -47,7 +70,7 @@ namespace ReactiveMachine.Compiler
public readonly TRequest Request;
[DataMember]
public readonly bool ExpectsResponse;
public readonly OrchestrationType OrchestrationType;
[DataMember]
internal bool LockedByCaller;
@ -86,7 +109,7 @@ namespace ReactiveMachine.Compiler
private Process process;
[IgnoreDataMember]
private OrchestrationInfo<TRequest,TReturn> info;
private OrchestrationInfo<TRequest, TReturn> info;
[IgnoreDataMember]
internal int HistoryPosition;
@ -106,7 +129,8 @@ namespace ReactiveMachine.Compiler
[IgnoreDataMember]
private ulong LockOpid;
public object RequestObject => Request;
private interface IContinuationInfo
{
void Continue(OrchestrationState<TRequest, TReturn> state, ulong opid, ulong clock, MessageType type, object value);
@ -145,10 +169,8 @@ namespace ReactiveMachine.Compiler
IExceptionSerializer IContext.ExceptionSerializer => process.Serializer;
public void StartOrResume(Process process, OrchestrationInfo<TRequest,TReturn> info)
public void StartOrResume()
{
this.process = process;
this.info = info;
Clock = StartingClock;
Continuations = new Dictionary<ulong, IContinuationInfo>();
@ -177,10 +199,13 @@ namespace ReactiveMachine.Compiler
public void RestoreStateTo(Process process)
{
this.process = process;
this.info = (OrchestrationInfo<TRequest, TReturn>)process.Orchestrations[typeof(TRequest)];
process.RecordReplayTracer?.Invoke($"[o{Opid:D10}-Replay] Replaying o{Opid:D10} {typeof(TRequest)}");
process.OrchestrationStates[Opid] = this;
StartOrResume(process, (OrchestrationInfo<TRequest,TReturn>) process.Orchestrations[typeof(TRequest)]);
StartOrResume();
Debug.Assert(Continuations.Count != 0);
while (HistoryPosition < History.Count)
@ -192,7 +217,6 @@ namespace ReactiveMachine.Compiler
}
}
public void Continue(ulong opid, ulong clock, MessageType type, object value)
{
Continuations.TryGetValue(opid, out var continuationInfo);
@ -209,10 +233,10 @@ namespace ReactiveMachine.Compiler
public override string ToString()
{
return $"{Opid}-{Request.GetType().FullName}";
return $"o{Opid:d10}-{Request}";
}
public KeyValuePair<ulong, string> WaitingFor
public KeyValuePair<ulong, string>? WaitingFor
{
get
{
@ -220,11 +244,10 @@ namespace ReactiveMachine.Compiler
if (kvp.Value != null)
return new KeyValuePair<ulong, string>(kvp.Key, kvp.Value.ToString());
else
return default(KeyValuePair<ulong, string>);
return null;
}
}
public async Task<Random> NewRandom()
{
return new Random(await Determinize(new Random().Next()));
@ -247,7 +270,10 @@ namespace ReactiveMachine.Compiler
public Task<T> Determinize<T>(T value)
{
return Activity<T>((replayed) => Task.FromResult(value), $"Determinize<{typeof(T).Name}>");
return PerformActivity(new DeterminizationActivity<T>()
{
Value = value
});
}
public TConfiguration GetConfiguration<TConfiguration>()
@ -260,74 +286,6 @@ namespace ReactiveMachine.Compiler
process.HostServices.GlobalShutdown?.Invoke();
}
public Task<T> Activity<T>(Func<Guid, Task<T>> body, string name)
{
// on send, always issue the activity (but pass recorded instance id, if replaying)
RecordOrReplayCall(MessageType.RequestExternal, out var opid, out var clock, out var instanceId);
process.AddActivity(opid, name, new Task(async () =>
{
var instanceIdOnStart = process.InstanceId;
process.ActivityTracer?.Invoke($" Starting activity o{opid:D10} {name} {instanceIdOnStart}");
var stopwatch = process.Telemetry != null ? new Stopwatch() : null;
stopwatch?.Start();
object r;
try
{
r = await body(instanceId);
}
catch (Exception e)
{
r = process.Serializer.SerializeException(e);
}
stopwatch?.Stop();
process.ActivityTracer?.Invoke($" Completed activity o{opid:D10} {name} {instanceIdOnStart}");
process.Telemetry?.OnApplicationEvent(
processId: process.ProcessId,
id: opid.ToString(),
name: name,
parent: Opid.ToString(),
opSide: OperationSide.Caller,
opType: OperationType.Activity,
duration: stopwatch.Elapsed.TotalMilliseconds
);
process.HostServices.Send(process.ProcessId, new RespondToActivity()
{
Opid = opid,
Parent = this.Opid,
Clock = clock,
InstanceId = instanceIdOnStart,
Result = r
});
}));
// replay the return or create a continuation
if (!ReplayReturn(MessageType.RespondToActivity, opid, out var result))
{
var continuationInfo = new ContinuationInfo<T>(name, OperationType.Activity);
Continuations[opid] = continuationInfo;
return continuationInfo.Tcs.Task;
}
else
{
process.RemoveActivity(opid);
if (process.Serializer.DeserializeException(result, out var e))
{
return Task.FromException<T>(e);
}
else
{
return Task.FromResult((T)result);
}
}
}
public async Task Run()
{
Stopwatch messageProcessingTimer = new Stopwatch();
@ -370,38 +328,53 @@ namespace ReactiveMachine.Compiler
messageProcessingTimer.Stop();
var elapsed = messageProcessingTimer.Elapsed.TotalMilliseconds;
if (ExpectsResponse)
{
var response = new RespondToOperation()
{
Opid = Opid,
Result = result,
Clock = Clock,
Parent = Parent,
};
process.Send(process.GetOrigin(Opid), response);
}
else if (process.FinishStates.TryGetValue(Parent, out var finishState))
{
finishState.RemovePending(Opid);
}
process.Telemetry?.OnApplicationEvent(
processId: process.ProcessId,
id: Opid.ToString(),
name: Request.ToString(),
parent: Parent.ToString(),
opSide: OperationSide.Callee,
opType: OperationType.Orchestration,
duration: elapsed
);
processId: process.ProcessId,
id: Opid.ToString(),
name: Request.ToString(),
parent: Parent.ToString(),
opSide: OperationSide.Callee,
opType: OperationType.Orchestration,
duration: elapsed
);
if (!ExpectsResponse && process.Serializer.DeserializeException(result, out var exceptionResult))
switch (OrchestrationType)
{
process.HandleGlobalException(exceptionResult);
case OrchestrationType.Perform:
var response = new RespondToOrchestration()
{
Opid = Opid,
Result = result,
Clock = Clock,
Parent = Parent,
};
process.Send(process.GetOrigin(Opid), response);
break;
case OrchestrationType.Fork:
process.CheckForUnhandledException(result);
if (process.FinishStates.TryGetValue(Parent, out var finishState))
{
finishState.RemovePending(Opid);
}
break;
case OrchestrationType.Initialize:
process.CheckForUnhandledException(result);
var initializationRequest = (IInitializationRequest)Request;
initializationRequest.SetStateToFinalResult(process);
var initresponse = new AckInitialization()
{
OriginalOpid = Parent,
PartitionKey = initializationRequest.GetPartitionKey(),
Opid = Opid,
Clock = Clock,
Parent = Parent,
};
process.Send(process.GetOrigin(Opid), initresponse);
break;
}
}
private bool RecordOrReplayCall(MessageType type, out ulong opid, out ulong clock, out Guid instanceId)
{
if (HistoryPosition < History.Count)
@ -411,7 +384,7 @@ namespace ReactiveMachine.Compiler
System.Diagnostics.Debug.Assert(entry.clock == Clock);
clock = entry.clock;
opid = entry.opid;
instanceId = (entry.type == MessageType.RequestExternal) ? (Guid)entry.value : process.InstanceId;
instanceId = (entry.type == MessageType.PerformActivity) ? (Guid)entry.value : process.InstanceId;
process.RecordReplayTracer?.Invoke($"[o{Opid:D10}-Replay] {entry}");
return true;
}
@ -421,7 +394,7 @@ namespace ReactiveMachine.Compiler
opid = process.NextOpid;
clock = Clock;
var entry = new Record() { type = type, opid = opid, clock = Clock };
if (type == MessageType.RequestExternal) entry.value = instanceId = process.InstanceId;
if (type == MessageType.PerformActivity) entry.value = instanceId = process.InstanceId;
History.Add(entry);
HistoryPosition++;
process.RecordReplayTracer?.Invoke($" [o{Opid:D10}-Record] {entry}");
@ -443,7 +416,7 @@ namespace ReactiveMachine.Compiler
}
else
{
if(!ExpectsResponse && Continuations.Count == 0)
if(OrchestrationType == OrchestrationType.Fork && Continuations.Count == 0)
{
if (!process.FinishStates.TryGetValue(Parent, out var finishState))
process.FinishStates[Parent] = finishState = new FinishState(process, Parent);
@ -453,7 +426,6 @@ namespace ReactiveMachine.Compiler
return false;
}
}
public void OnResult<TReturn2>(MessageType type, ulong opid, ulong clock, TaskCompletionSource<TReturn2> tcs, string opName, OperationType opType, double elapsed, object value)
{
if (HistoryPosition < History.Count)
@ -471,9 +443,6 @@ namespace ReactiveMachine.Compiler
HistoryPosition++;
Continuations.Remove(opid);
if (type == MessageType.RespondToActivity)
process.RemoveActivity(opid);
process.Telemetry?.OnApplicationEvent(
processId: process.ProcessId,
id: opid.ToString(),
@ -498,14 +467,16 @@ namespace ReactiveMachine.Compiler
Task AcquireLocks()
{
if (!RecordOrReplayCall(MessageType.AcquireLock, out var opid, out var clock, out var instanceId))
if (!RecordOrReplayCall(MessageType.AcquireLock, out var opid, out var clock, out _))
{
var destination = LockSet[0].Locate(process);
var message = new AcquireLock();
message.Opid = opid;
message.Clock = clock;
message.Parent = this.Opid;
message.LockSet = LockSet;
var message = new AcquireLock
{
Opid = opid,
Clock = clock,
Parent = this.Opid,
LockSet = LockSet
};
process.Send(destination, message);
}
LockOpid = opid;
@ -523,12 +494,12 @@ namespace ReactiveMachine.Compiler
void ReleaseLocks()
{
if (!RecordOrReplayCall(MessageType.ReleaseLock, out var opid, out var clock, out var instanceId))
if (!RecordOrReplayCall(MessageType.ReleaseLock, out var opid, out var clock, out _))
{
foreach (var key in LockSet)
{
var destination = key.Locate(process);
var message = new ReleaseLock() { Key = key, LockOpid = LockOpid};
var message = new ReleaseLock() { PartitionKey = key, OriginalOpid = LockOpid};
message.Opid = opid;
message.Clock = clock;
message.Parent = this.Opid;
@ -539,11 +510,11 @@ namespace ReactiveMachine.Compiler
public void ForkOrchestration<TReturn2>(IOrchestration<TReturn2> orchestration)
{
if (!RecordOrReplayCall(MessageType.ForkOperation, out var opid, out var clock, out var instanceId))
if (!RecordOrReplayCall(MessageType.ForkOrchestration, out var opid, out var clock, out _))
{
if (!process.Orchestrations.TryGetValue(orchestration.GetType(), out var orchestrationInfo))
throw new BuilderException($"undefined orchestration type {orchestration.GetType().FullName}.");
orchestrationInfo.CanExecuteLocally(orchestration, out var destination);
orchestrationInfo.CanExecuteLocally(orchestration, opid, out var destination);
(ForkedDestinations ?? (ForkedDestinations = new SortedSet<uint>())).Add(destination);
var message = orchestrationInfo.CreateForkMessage(orchestration);
message.Opid = opid;
@ -575,7 +546,7 @@ namespace ReactiveMachine.Compiler
}
private Task<TReturn2> PerformLocal<TState, TReturn2>(object localOperation)
{
if (!RecordOrReplayCall(MessageType.RequestLocal, out var opid, out var clock, out var instanceId))
if (!RecordOrReplayCall(MessageType.PerformLocal, out var opid, out var clock, out _))
{
if (!process.States.TryGetValue(typeof(TState), out var stateInfo))
throw new BuilderException($"undefined state {typeof(TState).FullName}.");
@ -586,7 +557,7 @@ namespace ReactiveMachine.Compiler
throw new SynchronizationDisciplineException("to perform a local operation in a locked context, the caller must include its affinity");
}
var destination = stateInfo.AffinityInfo.LocateAffinity(localOperation);
var message = stateInfo.CreateLocalMessage(localOperation, this.Opid, false);
var message = stateInfo.CreateLocalMessage(localOperation, this.Opid, MessageType.PerformLocal);
message.Opid = opid;
message.Clock = clock;
message.Parent = this.Opid;
@ -614,13 +585,13 @@ namespace ReactiveMachine.Compiler
void IContextWithForks.ForkUpdate<TState, TReturn2>(IUpdate<TState, TReturn2> update)
{
if (!RecordOrReplayCall(MessageType.ForkLocal, out var opid, out var clock, out var instanceId))
if (!RecordOrReplayCall(MessageType.ForkUpdate, out var opid, out var clock, out _))
{
if (!process.States.TryGetValue(typeof(TState), out var stateInfo))
throw new BuilderException($"undefined state {typeof(TState).FullName}.");
var destination = stateInfo.AffinityInfo.LocateAffinity(update);
(ForkedDestinations ?? (ForkedDestinations = new SortedSet<uint>())).Add(destination);
var message = stateInfo.CreateLocalMessage(update, this.Opid, true);
var message = stateInfo.CreateLocalMessage(update, this.Opid, MessageType.ForkUpdate);
message.Opid = opid;
message.Clock = clock;
message.Parent = this.Opid;
@ -639,6 +610,43 @@ namespace ReactiveMachine.Compiler
}
}
Task<bool> IOrchestrationContext.StateExists<TState, TAffinity, TKey>(TKey key)
{
if (!RecordOrReplayCall(MessageType.PerformPing, out var opid, out var clock, out _))
{
var keyInfo = (AffinityInfo<TAffinity, TKey>)process.Affinities[typeof(TAffinity)];
var pkey = new PartitionKey<TKey>()
{
Index = keyInfo.Index,
Key = key,
Comparator = keyInfo.Comparator
};
if (LockSet == null || !LockSet.Contains(pkey))
{
throw new SynchronizationDisciplineException($"{nameof(IOrchestrationContext.StateExists)} can only be called from a context that has already locked the key");
}
if (!process.States.TryGetValue(typeof(TState), out var stateInfo))
throw new BuilderException($"undefined state {typeof(TState).FullName}.");
var destination = pkey.Locate(process);
var message = stateInfo.CreateLocalMessage(pkey, this.Opid, MessageType.PerformPing);
message.Opid = opid;
message.Clock = clock;
message.Parent = this.Opid;
message.LockedByCaller = (LockSet != null);
process.Send(destination, message);
}
if (!ReplayReturn(MessageType.RespondToLocal, opid, out var result))
{
var continuationInfo = new ContinuationInfo<bool>("StateExists", OperationType.Local);
Continuations[opid] = continuationInfo;
return continuationInfo.Tcs.Task;
}
else
{
return Task.FromResult((bool) result);
}
}
public Task<TReturn2> PerformOrchestration<TReturn2>(IReadOrchestration<TReturn2> orchestration)
{
return PerformOrchestration<TReturn2>((IOrchestration<TReturn2>) orchestration);
@ -646,7 +654,7 @@ namespace ReactiveMachine.Compiler
public Task<TReturn2> PerformOrchestration<TReturn2>(IOrchestration<TReturn2> orchestration)
{
if (!RecordOrReplayCall(MessageType.RequestOperation, out var opid, out var clock, out var instanceId))
if (!RecordOrReplayCall(MessageType.PerformOrchestration, out var opid, out var clock, out _))
{
if (!process.Orchestrations.TryGetValue(orchestration.GetType(), out var orchestrationInfo))
throw new BuilderException($"undefined orchestration type {orchestration.GetType().FullName}.");
@ -658,7 +666,7 @@ namespace ReactiveMachine.Compiler
if (!LockSet.Contains(l))
throw new SynchronizationDisciplineException("to perform an orchestration in a locked context, the caller's affinities must contain the callee's affinities");
}
orchestrationInfo.CanExecuteLocally(orchestration, out uint destination);
orchestrationInfo.CanExecuteLocally(orchestration, opid, out uint destination);
var message = orchestrationInfo.CreateRequestMessage(orchestration);
message.Opid = opid;
message.Clock = clock;
@ -666,7 +674,7 @@ namespace ReactiveMachine.Compiler
message.LockedByCaller = (LockSet != null);
process.Send(destination, message);
}
if (!ReplayReturn(MessageType.RespondToOperation, opid, out var result))
if (!ReplayReturn(MessageType.RespondToOrchestration, opid, out var result))
{
var continuationInfo = new ContinuationInfo<TReturn2>(orchestration.ToString(), OperationType.Orchestration);
Continuations[opid] = continuationInfo;
@ -699,7 +707,7 @@ namespace ReactiveMachine.Compiler
if (!LockSet.Contains(e.UntypedKey))
throw new SynchronizationDisciplineException("to perform an event in a locked context, the caller's affinities must contain the event's affinities");
if (!RecordOrReplayCall(MessageType.PerformEvent, out var opid, out var clock, out var instanceId))
if (!RecordOrReplayCall(MessageType.PerformEvent, out var opid, out var clock, out _))
{
var destination = effects[0].Locate(process);
var message = eventInfo.CreateMessage(false, evt, 0);
@ -728,7 +736,7 @@ namespace ReactiveMachine.Compiler
if (effects.Count != 0)
{
if (!RecordOrReplayCall(MessageType.ForkEvent, out var opid, out var clock, out var instanceId))
if (!RecordOrReplayCall(MessageType.ForkEvent, out var opid, out var clock, out _))
{
var destination = effects[0].Locate(process);
(ForkedDestinations ?? (ForkedDestinations = new SortedSet<uint>())).Add(destination);
@ -752,20 +760,55 @@ namespace ReactiveMachine.Compiler
}
}
public Task<T> PerformActivity<T>(IActivityBase<T> activity)
public Task<TReturn2> PerformActivity<TReturn2>(IActivity<TReturn2> activity)
{
if (!process.Activities.TryGetValue(activity.GetType(), out var activityInfo))
throw new BuilderException($"undefined activity type {activity.GetType().FullName}.");
if (LockSet != null)
if (!RecordOrReplayCall(MessageType.PerformActivity, out var opid, out var clock, out _))
{
//TODO : check if the activity declares any affinities
}
if (!process.Activities.TryGetValue(activity.GetType(), out var activityInfo))
throw new BuilderException($"undefined activity type {activity.GetType().FullName}.");
return ((IActivityInfo<T>)activityInfo).Perform(activity, this);
if (activityInfo.RequiresLocks(activity, out var locks))
{
if (LockSet != null)
{
foreach (var l in locks)
if (!LockSet.Contains(l))
throw new SynchronizationDisciplineException("to perform an activity in a locked context, the caller's affinities must contain the callee's affinities");
}
else
{
throw new SynchronizationDisciplineException("orchestrations can not call a locked affinity unless they already hold the lock");
}
}
activityInfo.CanExecuteLocally(activity, opid, out uint destination);
var message = activityInfo.CreateRequestMessage(activity);
message.Opid = opid;
message.Clock = clock;
message.Parent = this.Opid;
message.LockedByCaller = (LockSet != null);
process.Send(destination, message);
}
if (!ReplayReturn(MessageType.RespondToActivity, opid, out var result))
{
var continuationInfo = new ContinuationInfo<TReturn2>(activity.ToString(), OperationType.Activity);
Continuations[opid] = continuationInfo;
return continuationInfo.Tcs.Task;
}
else
{
if (process.Serializer.DeserializeException(result, out var e))
{
return Task.FromException<TReturn2>(e);
}
else
{
return Task.FromResult((TReturn2)result);
}
}
}
public Task Finish()
{
@ -781,12 +824,14 @@ namespace ReactiveMachine.Compiler
int opIdPos = 0;
foreach (var destination in ForkedDestinations)
{
if (!RecordOrReplayCall(MessageType.RequestFinish, out opIds[opIdPos], out var clock, out var instanceId))
if (!RecordOrReplayCall(MessageType.PerformFinish, out opIds[opIdPos], out var clock, out _))
{
var message = new RequestFinish();
message.Opid = opIds[opIdPos];
message.Clock = clock;
message.Parent = this.Opid;
var message = new PerformFinish
{
Opid = opIds[opIdPos],
Clock = clock,
Parent = this.Opid
};
process.Send(destination, message);
}
opIdPos++;
@ -804,7 +849,6 @@ namespace ReactiveMachine.Compiler
return Task.WhenAll(finishAcks);
}
public ILogger Logger => this;
void ILogger.Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
@ -832,5 +876,7 @@ namespace ReactiveMachine.Compiler
{
return process.ApplicationLogger?.BeginScope<TState1>(state);
}
}
}
}

Просмотреть файл

@ -19,10 +19,9 @@ namespace ReactiveMachine.Compiler
void EnterLock(QueuedMessage request);
void ExitLock(IPartitionKey key, ulong opid);
void UpdateLock(ProtocolMessage message);
}
internal class PartitionLock<TAffinity, TKey> :
IPartitionLock
where TAffinity : IAffinitySpec<TAffinity>
@ -47,6 +46,7 @@ namespace ReactiveMachine.Compiler
yield return typeof(ForkEvent);
yield return typeof(PerformEvent);
yield return typeof(AckEvent);
yield return typeof(CommitEvent);
yield return typeof(AcquireLock);
yield return typeof(GrantLock);
yield return typeof(ReleaseLock);
@ -81,177 +81,65 @@ namespace ReactiveMachine.Compiler
{
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
bool freshlyCreated = false;
bool alreadyProcessed = false;
bool exitsImmediately = false;
var opid = request.Opid;
var localkey = (PartitionKey<TKey>) request.GetPartitionKey(process);
var parent = request.Parent;
var messageType = request.MessageType;
var partitionkey = request.GetPartitionKey(process);
TKey localkey = ((PartitionKey<TKey>)partitionkey).Key;
if (request.LockedByCaller)
{
// lock has already been acquired - so we need not touch the queue
Entering(opid, localkey.Key, request, stopwatch, messageType, parent);
request.OnEnter(process);
request.Enter(process, localkey, stopwatch, out exitsImmediately);
if (exitsImmediately)
{
request.OnExit(process);
}
else
{
((AcquireLock)lockqueues[localkey].Holder).Add(request);
}
}
else
{
if (!lockqueues.TryGetValue(localkey.Key, out var queue))
if (!lockqueues.TryGetValue(localkey, out var queue)
|| queue.IsEmpty)
{
queue = new LockQueue();
queue.Process = process;
freshlyCreated = true;
}
if (queue.IsEmpty)
{
if (Entering(opid, localkey.Key, request, stopwatch, messageType, parent))
request.OnEnter(process);
request.Enter(process, localkey, stopwatch, out exitsImmediately);
if (exitsImmediately)
{
queue.Enqueue(request, stopwatch);
if (freshlyCreated)
lockqueues[localkey.Key] = queue;
request.OnExit(process);
}
else
{
alreadyProcessed = true;
if (queue == null)
{
queue = new LockQueue() { Process = process };
lockqueues[localkey] = queue;
}
queue.Enqueue(request, stopwatch);
}
}
else
{
queue.Enqueue(request, stopwatch);
}
if (!alreadyProcessed && messageType.IsFork())
{
if (!process.FinishStates.TryGetValue(parent, out var finishState))
process.FinishStates[parent] = finishState = new FinishState(process, parent);
finishState.AddPending(opid);
}
}
if (!exitsImmediately && messageType.IsFork())
{
if (!process.FinishStates.TryGetValue(parent, out var finishState))
process.FinishStates[parent] = finishState = new FinishState(process, parent);
finishState.AddPending(opid);
}
}
private bool Entering(ulong opid, TKey localkey, QueuedMessage payload, Stopwatch stopwatch, MessageType messageType, ulong parent)
public void UpdateLock(ProtocolMessage message)
{
process.LockTracer?.Invoke($"p{process.ProcessId:D3} {payload.GetPartitionKey(process)} Enter {payload.Opid}");
var timestamp = process.NextOpid;
switch (messageType)
{
case (MessageType.RequestLocal):
case (MessageType.ForkLocal):
var result = payload.Execute<TKey>(process, opid);
if (messageType == MessageType.RequestLocal)
process.Send(process.GetOrigin(opid), new RespondToLocal() { Opid = opid, Parent = parent, Result = result });
process.Telemetry?.OnApplicationEvent(
processId: process.ProcessId,
id: opid.ToString(),
name: payload.Payload.ToString(),
parent: parent.ToString(),
opSide: OperationSide.Callee,
opType: OperationType.Local,
duration: stopwatch.Elapsed.TotalMilliseconds
);
process.LockTracer?.Invoke($"p{process.ProcessId:D3} {payload.GetPartitionKey(process)} Exit {payload.Opid}");
return false; // in and out of the lock
case (MessageType.PerformEvent):
case (MessageType.ForkEvent):
{
var req = (PerformEvent)payload;
var effects = req.GetEffects(process);
// apply the event to all the affected states
req.Execute<TKey>(process, opid);
// if we are not the last partition with an effect, forward to next
if (req.Position < effects.Count - 1)
{
var nextReq = req.NextMessage(timestamp);
var destination = nextReq.GetCurrent(process).Locate(process);
process.Send(destination, nextReq);
return true; // stays in the lock
}
// if we are the last partition
else
{
// if we are not running in locked orchestration,
// release all previously locked partitions now
if (! payload.LockedByCaller && effects.Count > 1)
for (int i = 0; i < effects.Count - 1; i++)
{
var key = effects[i].UntypedKey;
var destination = key.Locate(process);
var message = new ReleaseLock() { Key = key, LockOpid = req.Opid };
message.Clock = timestamp;
message.Parent = req.Parent;
message.Opid = opid;
process.Send(destination, message);
}
// return ack to orchestration
if (messageType == MessageType.PerformEvent)
{
process.Send(process.GetOrigin(opid), new AckEvent()
{ Opid = opid, Parent = parent, Clock = timestamp });
}
process.Telemetry?.OnApplicationEvent(
processId: process.ProcessId,
id: opid.ToString(),
name: payload.Payload.ToString(),
parent: parent.ToString(),
opSide: OperationSide.Callee,
opType: OperationType.Event,
duration: stopwatch.Elapsed.TotalMilliseconds
);
process.LockTracer?.Invoke($"p{process.ProcessId:D3} {payload.GetPartitionKey(process)} Exit {payload.Opid}");
return false; // in and out of the lock
}
}
case (MessageType.AcquireLock):
{
var req = (AcquireLock)payload;
// if we are not the last partition to lock send next lock message
if (req.Position < req.LockSet.Count - 1)
{
var nextReq = req.NextMessage(timestamp);
var destination = nextReq.LockSet[nextReq.Position].Locate(process);
process.Send(destination, nextReq);
}
// if we are the last partition to lock return ack to orchestration
else
{
process.Send(process.GetOrigin(opid), new GrantLock()
{ Opid = opid, Parent = parent, Clock = timestamp });
}
return true; // stays in the lock
}
default:
throw new InvalidOperationException("unhandled case for messageType");
}
}
public void ExitLock(IPartitionKey key, ulong exitingOpid)
{
var localkey = ((PartitionKey<TKey>)key).Key;
var localkey = ((PartitionKey<TKey>)message.PartitionKey).Key;
var queue = lockqueues[localkey];
queue.Remove(exitingOpid, out var msg);
if (msg.MessageType.IsFork())
{
process.FinishStates[msg.Parent].RemovePending(exitingOpid);
}
if (!queue.IsEmpty)
{
queue.EnterNextHolder(localkey, Entering);
}
queue.Update(localkey, message);
}
}
}

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the MIT license.
using Microsoft.Extensions.Logging;
using ReactiveMachine.Extensions;
using System;
using System.Collections.Generic;
using System.Diagnostics;
@ -12,7 +13,7 @@ namespace ReactiveMachine.Compiler
{
internal interface IStateInstance
{
object Execute(dynamic op, ulong opid, bool evt);
object Execute(dynamic op, ulong opid, StateOperation operation);
}
internal interface IStateInstance<TState, TAffinity>
@ -24,6 +25,12 @@ namespace ReactiveMachine.Compiler
TState State { get; }
}
public enum StateOperation
{
ReadOrUpdate,
Event,
}
internal class StateContext<TState, TAffinity, TKey> :
IStateInstance,
IStateInstance<TState, TAffinity>,
@ -90,11 +97,11 @@ namespace ReactiveMachine.Compiler
{
if (!process.Orchestrations.TryGetValue(orchestration.GetType(), out var orchestrationInfo))
throw new BuilderException($"undefined orchestration type {orchestration.GetType().FullName}.");
orchestrationInfo.CanExecuteLocally(orchestration, out uint destination);
var opid = process.NextOpid;
var message = orchestrationInfo.CreateForkMessage(orchestration);
message.Opid = opid;
message.Parent = CurrentOpid;
orchestrationInfo.CanExecuteLocally(orchestration, opid, out uint destination);
process.Send(destination, message);
process.Telemetry?.OnApplicationEvent(
@ -114,7 +121,7 @@ namespace ReactiveMachine.Compiler
throw new BuilderException($"undefined state {typeof(TAffinity).FullName}.");
var destination = stateInfo.AffinityInfo.LocateAffinity(update);
var opid = process.NextOpid;
var message = stateInfo.CreateLocalMessage(update, CurrentOpid, true);
var message = stateInfo.CreateLocalMessage(update, CurrentOpid, MessageType.ForkUpdate);
message.Opid = opid;
message.Parent = CurrentOpid;
process.Send(destination, message);
@ -135,25 +142,33 @@ namespace ReactiveMachine.Compiler
process.HostServices.GlobalShutdown();
}
public object Execute(dynamic op, ulong opid, bool evt)
internal void SetInitializationResult(TState state)
{
this.state = state;
}
public object Execute(dynamic op, ulong opid, StateOperation operationType)
{
CurrentOpid = opid;
try
{
if (evt)
switch(operationType)
{
dynamic s = State;
s.On(this, op);
return null;
}
else
{
return op.Execute(this);
case StateOperation.ReadOrUpdate:
return op.Execute(this);
case StateOperation.Event:
dynamic s = State;
s.On(this, op);
return null;
default:
throw new Exception("internal error: unhandled case");
}
}
catch (Exception e)
{
// TODO for events, what with that?
// TODO for events and initialize, what with that?
return process.Serializer.SerializeException(e);
}
finally
@ -177,7 +192,6 @@ namespace ReactiveMachine.Compiler
return (TConfiguration)process.Configurations[typeof(TConfiguration)];
}
public ILogger Logger => this;
void ILogger.Log<TState1>(LogLevel logLevel, EventId eventId, TState1 state, Exception exception, Func<TState1, Exception, string> formatter)

Просмотреть файл

@ -5,9 +5,9 @@ using System;
using System.Collections.Generic;
using System.Text;
namespace ReactiveMachine.Compiler
namespace ReactiveMachine.Util
{
internal static class JumpConsistentHash
public static class JumpConsistentHash
{
public static uint Compute(ulong x, uint num_buckets)
{

Просмотреть файл

@ -60,6 +60,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ReactiveMachine.Abstraction
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Counter.Service.OnFunctions", "Applications\Counter.Service.OnFunctions\Counter.Service.OnFunctions.csproj", "{72BBCED5-7A28-4776-997C-652F7C44B9F8}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimpleLoadTest.Service", "Tests\SimpleLoadTest.Service\SimpleLoadTest.Service.csproj", "{59995284-321E-4985-BE7C-A98DAD1A722B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests.OnFunctions", "Tests\Tests.OnFunctions\Tests.OnFunctions.csproj", "{283C895B-07AC-4051-B3C8-C60E5DFF6206}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -252,6 +256,22 @@ Global
{72BBCED5-7A28-4776-997C-652F7C44B9F8}.Release|Any CPU.Build.0 = Release|Any CPU
{72BBCED5-7A28-4776-997C-652F7C44B9F8}.Release|x64.ActiveCfg = Release|Any CPU
{72BBCED5-7A28-4776-997C-652F7C44B9F8}.Release|x64.Build.0 = Release|Any CPU
{59995284-321E-4985-BE7C-A98DAD1A722B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{59995284-321E-4985-BE7C-A98DAD1A722B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{59995284-321E-4985-BE7C-A98DAD1A722B}.Debug|x64.ActiveCfg = Debug|Any CPU
{59995284-321E-4985-BE7C-A98DAD1A722B}.Debug|x64.Build.0 = Debug|Any CPU
{59995284-321E-4985-BE7C-A98DAD1A722B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{59995284-321E-4985-BE7C-A98DAD1A722B}.Release|Any CPU.Build.0 = Release|Any CPU
{59995284-321E-4985-BE7C-A98DAD1A722B}.Release|x64.ActiveCfg = Release|Any CPU
{59995284-321E-4985-BE7C-A98DAD1A722B}.Release|x64.Build.0 = Release|Any CPU
{283C895B-07AC-4051-B3C8-C60E5DFF6206}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{283C895B-07AC-4051-B3C8-C60E5DFF6206}.Debug|Any CPU.Build.0 = Debug|Any CPU
{283C895B-07AC-4051-B3C8-C60E5DFF6206}.Debug|x64.ActiveCfg = Debug|Any CPU
{283C895B-07AC-4051-B3C8-C60E5DFF6206}.Debug|x64.Build.0 = Debug|Any CPU
{283C895B-07AC-4051-B3C8-C60E5DFF6206}.Release|Any CPU.ActiveCfg = Release|Any CPU
{283C895B-07AC-4051-B3C8-C60E5DFF6206}.Release|Any CPU.Build.0 = Release|Any CPU
{283C895B-07AC-4051-B3C8-C60E5DFF6206}.Release|x64.ActiveCfg = Release|Any CPU
{283C895B-07AC-4051-B3C8-C60E5DFF6206}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -278,6 +298,8 @@ Global
{EE027279-0902-4125-9BF7-54F80C6A9A0C} = {41BF1A48-68C2-487A-8FA3-B0BEAFA6189A}
{D4699A32-C989-4DCE-895D-9D2B6D84799A} = {2CED0117-DA88-47CF-878D-6AC0DF18ECB6}
{72BBCED5-7A28-4776-997C-652F7C44B9F8} = {41BF1A48-68C2-487A-8FA3-B0BEAFA6189A}
{59995284-321E-4985-BE7C-A98DAD1A722B} = {6A84548F-C66D-4E6F-B8A7-929C645BA54F}
{283C895B-07AC-4051-B3C8-C60E5DFF6206} = {6A84548F-C66D-4E6F-B8A7-929C645BA54F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {76C173BC-ED80-49D2-96C1-636C1DD366AF}

Просмотреть файл

@ -26,7 +26,7 @@ namespace ReactiveMachine.TelemetryBlobWriter
#else
"Release",
#endif
"ReactiveMachine.Tools.Taskometer.exe");
"ReactiveMachine.Taskometer.exe");
taskoMeter.StartInfo.FileName = executable;
taskoMeter.StartInfo.UseShellExecute = false;

Просмотреть файл

@ -16,6 +16,9 @@ namespace LocalTests
throw new TestFailureException($"expected: {expected} actual: {actual}");
}
}
public static void Fail(string msg = "Assertion Failed")
{
throw new TestFailureException(msg);
}
}
}

Просмотреть файл

@ -10,9 +10,12 @@ using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace LocalTests
namespace LocalTests.BasicExamples
{
[RandomPlacement]
// The snippets in this file were used for preparing documentation and presentations
// Most of them are not executed by tests or samples
[Distribute]
public class CopyBlob : IOrchestration<UnitType>
{
public string From;
@ -35,7 +38,7 @@ namespace LocalTests
}
}
public class ReadBlob : IAtLeastOnceActivity<string>
public class ReadBlob : IActivity<string>
{
public string Path;
public TimeSpan TimeLimit => TimeSpan.FromSeconds(30);
@ -46,7 +49,7 @@ namespace LocalTests
}
}
public class WriteBlob : IAtLeastOnceActivity<string>
public class WriteBlob : IActivity<string>
{
public string Path;
public string Content;
@ -83,7 +86,7 @@ namespace LocalTests
}
}
[RandomPlacement]
[Distribute]
public class CopyBlob2 : IOrchestration<UnitType>, IMultiple<IPathAffinity, string>
{
public string From;

Просмотреть файл

@ -20,6 +20,7 @@ namespace LocalTests.Counter
[DataMember]
public uint CounterId { get; set; }
[CreateIfNotExists]
public UnitType Execute(IUpdateContext<Counter2> context)
{
context.Logger.LogDebug($"IncrementThenRead({CounterId}) Start");

Просмотреть файл

@ -10,7 +10,7 @@ using System.Threading.Tasks;
namespace LocalTests.Counter
{
[DataContract]
public class TestForks : TestTemplate
public class TestForks : TestOrchestration
{
protected override async Task Run(IOrchestrationContext context)
{

Просмотреть файл

@ -10,7 +10,7 @@ using System.Threading.Tasks;
namespace LocalTests.Counter
{
[DataContract]
public class TestRandomActivityCount : TestTemplate
public class TestRandomActivityCount : TestOrchestration
{
protected override async Task Run(IOrchestrationContext context)

Просмотреть файл

@ -12,6 +12,7 @@
<ProjectReference Include="..\..\Hosts\SimpleHost\EmulatorHost.csproj" />
<ProjectReference Include="..\..\ReactiveMachine.Abstractions\ReactiveMachine.Abstractions.csproj" />
<ProjectReference Include="..\..\ReactiveMachine.Compiler\ReactiveMachine.Compiler.csproj" />
<ProjectReference Include="..\SimpleLoadTest.Service\SimpleLoadTest.Service.csproj" />
</ItemGroup>
</Project>

Просмотреть файл

@ -29,8 +29,6 @@ namespace LocalTests.Locks
public void On(ISubscriptionContext<Places> context, IncrementEvent evt)
{
Balance++;
context.Logger.LogInformation($"Increment {context.Key} to {Balance} in response to {evt}");
}
}
@ -51,6 +49,7 @@ namespace LocalTests.Locks
public int NewBalance;
[CreateIfNotExists]
public UnitType Execute(IUpdateContext<State> context)
{
context.State.Balance = NewBalance;

Просмотреть файл

@ -9,7 +9,7 @@ using System.Threading.Tasks;
namespace LocalTests.Locks
{
public class TestEvents : TestTemplate
public class TestEvents : TestOrchestration
{
public IEnumerable<Places> AllPlaces()
{

Некоторые файлы не были показаны из-за слишком большого количества измененных файлов Показать больше