From 25f634a22edd6831ee501bd62d42340b8c007418 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Tue, 9 May 2017 19:26:49 -0700 Subject: [PATCH] Initial commit of Common Runtime for Applications (CRA). --- .gitattributes | 63 ++ .gitignore | 1 + .../CRA.ClientLibrary.csproj | 111 ++++ .../Definitions/ConnectionInfo.cs | 72 +++ .../Definitions/ConnectionInitiator.cs | 18 + .../Definitions/CoralTaskMessageType.cs | 12 + .../Definitions/ErrorCodes.cs | 45 ++ .../Definitions/ObjectWrapper.cs | 8 + .../Main/CRAClientLibrary.cs | 531 ++++++++++++++++ src/CRA.ClientLibrary/Main/CRAWorker.cs | 587 ++++++++++++++++++ .../Processes/DetachedProcess.cs | 340 ++++++++++ .../Processes/IAsyncProcessInputEndpoint.cs | 31 + .../Processes/IAsyncProcessOutputEndpoint.cs | 31 + src/CRA.ClientLibrary/Processes/IProcess.cs | 75 +++ .../Processes/IProcessInputEndpoint.cs | 28 + .../Processes/IProcessOutputEndpoint.cs | 28 + .../Processes/ProcessBase.cs | 322 ++++++++++ .../Properties/AssemblyInfo.cs | 35 ++ .../Tables/ConnectionTable.cs | 128 ++++ .../Tables/ConnectionTableManager.cs | 76 +++ src/CRA.ClientLibrary/Tables/EndpointTable.cs | 100 +++ .../Tables/EndpointTableManager.cs | 105 ++++ src/CRA.ClientLibrary/Tables/ProcessTable.cs | 233 +++++++ .../Tables/ProcessTableManager.cs | 85 +++ .../Utilities/AssemblyResolver.cs | 61 ++ .../Utilities/AssemblyResolverClient.cs | 45 ++ .../Utilities/AssemblyUtils.cs | 280 +++++++++ .../Utilities/ClosureEliminator.cs | 40 ++ .../Utilities/SerializationHelper.cs | 70 +++ .../Utilities/StreamCommunicator.cs | 139 +++++ src/CRA.ClientLibrary/app.config | 15 + src/CRA.ClientLibrary/packages.config | 16 + src/CRA.Worker/App.config | 33 + src/CRA.Worker/CRA.Worker.csproj | 67 ++ src/CRA.Worker/Program.cs | 52 ++ src/CRA.Worker/Properties/AssemblyInfo.cs | 36 ++ src/CRA.Worker/privatesettings.config.example | 3 + src/CRA.sln | 43 ++ src/Samples/ConnectionPair/App.config | 25 + .../ConnectionPair/ConnectionPair.csproj | 65 ++ src/Samples/ConnectionPair/MyAsyncInput.cs | 45 ++ src/Samples/ConnectionPair/MyAsyncOutput.cs | 54 ++ src/Samples/ConnectionPair/MyFirstProcess.cs | 18 + src/Samples/ConnectionPair/MySecondProcess.cs | 18 + src/Samples/ConnectionPair/Program.cs | 24 + .../ConnectionPair/Properties/AssemblyInfo.cs | 35 ++ .../privatesettings.config.example | 3 + 47 files changed, 4252 insertions(+) create mode 100644 .gitattributes create mode 100644 src/CRA.ClientLibrary/CRA.ClientLibrary.csproj create mode 100644 src/CRA.ClientLibrary/Definitions/ConnectionInfo.cs create mode 100644 src/CRA.ClientLibrary/Definitions/ConnectionInitiator.cs create mode 100644 src/CRA.ClientLibrary/Definitions/CoralTaskMessageType.cs create mode 100644 src/CRA.ClientLibrary/Definitions/ErrorCodes.cs create mode 100644 src/CRA.ClientLibrary/Definitions/ObjectWrapper.cs create mode 100644 src/CRA.ClientLibrary/Main/CRAClientLibrary.cs create mode 100644 src/CRA.ClientLibrary/Main/CRAWorker.cs create mode 100644 src/CRA.ClientLibrary/Processes/DetachedProcess.cs create mode 100644 src/CRA.ClientLibrary/Processes/IAsyncProcessInputEndpoint.cs create mode 100644 src/CRA.ClientLibrary/Processes/IAsyncProcessOutputEndpoint.cs create mode 100644 src/CRA.ClientLibrary/Processes/IProcess.cs create mode 100644 src/CRA.ClientLibrary/Processes/IProcessInputEndpoint.cs create mode 100644 src/CRA.ClientLibrary/Processes/IProcessOutputEndpoint.cs create mode 100644 src/CRA.ClientLibrary/Processes/ProcessBase.cs create mode 100644 src/CRA.ClientLibrary/Properties/AssemblyInfo.cs create mode 100644 src/CRA.ClientLibrary/Tables/ConnectionTable.cs create mode 100644 src/CRA.ClientLibrary/Tables/ConnectionTableManager.cs create mode 100644 src/CRA.ClientLibrary/Tables/EndpointTable.cs create mode 100644 src/CRA.ClientLibrary/Tables/EndpointTableManager.cs create mode 100644 src/CRA.ClientLibrary/Tables/ProcessTable.cs create mode 100644 src/CRA.ClientLibrary/Tables/ProcessTableManager.cs create mode 100644 src/CRA.ClientLibrary/Utilities/AssemblyResolver.cs create mode 100644 src/CRA.ClientLibrary/Utilities/AssemblyResolverClient.cs create mode 100644 src/CRA.ClientLibrary/Utilities/AssemblyUtils.cs create mode 100644 src/CRA.ClientLibrary/Utilities/ClosureEliminator.cs create mode 100644 src/CRA.ClientLibrary/Utilities/SerializationHelper.cs create mode 100644 src/CRA.ClientLibrary/Utilities/StreamCommunicator.cs create mode 100644 src/CRA.ClientLibrary/app.config create mode 100644 src/CRA.ClientLibrary/packages.config create mode 100644 src/CRA.Worker/App.config create mode 100644 src/CRA.Worker/CRA.Worker.csproj create mode 100644 src/CRA.Worker/Program.cs create mode 100644 src/CRA.Worker/Properties/AssemblyInfo.cs create mode 100644 src/CRA.Worker/privatesettings.config.example create mode 100644 src/CRA.sln create mode 100644 src/Samples/ConnectionPair/App.config create mode 100644 src/Samples/ConnectionPair/ConnectionPair.csproj create mode 100644 src/Samples/ConnectionPair/MyAsyncInput.cs create mode 100644 src/Samples/ConnectionPair/MyAsyncOutput.cs create mode 100644 src/Samples/ConnectionPair/MyFirstProcess.cs create mode 100644 src/Samples/ConnectionPair/MySecondProcess.cs create mode 100644 src/Samples/ConnectionPair/Program.cs create mode 100644 src/Samples/ConnectionPair/Properties/AssemblyInfo.cs create mode 100644 src/Samples/ConnectionPair/privatesettings.config.example diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..1ff0c42 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,63 @@ +############################################################################### +# Set default behavior to automatically normalize line endings. +############################################################################### +* text=auto + +############################################################################### +# Set default behavior for command prompt diff. +# +# This is need for earlier builds of msysgit that does not have it on by +# default for csharp files. +# Note: This is only used by command line +############################################################################### +#*.cs diff=csharp + +############################################################################### +# Set the merge driver for project and solution files +# +# Merging from the command prompt will add diff markers to the files if there +# are conflicts (Merging from VS is not affected by the settings below, in VS +# the diff markers are never inserted). Diff markers may cause the following +# file extensions to fail to load in VS. An alternative would be to treat +# these files as binary and thus will always conflict and require user +# intervention with every merge. To do so, just uncomment the entries below +############################################################################### +#*.sln merge=binary +#*.csproj merge=binary +#*.vbproj merge=binary +#*.vcxproj merge=binary +#*.vcproj merge=binary +#*.dbproj merge=binary +#*.fsproj merge=binary +#*.lsproj merge=binary +#*.wixproj merge=binary +#*.modelproj merge=binary +#*.sqlproj merge=binary +#*.wwaproj merge=binary + +############################################################################### +# behavior for image files +# +# image files are treated as binary by default. +############################################################################### +#*.jpg binary +#*.png binary +#*.gif binary + +############################################################################### +# diff behavior for common document formats +# +# Convert binary document formats to text before diffing them. This feature +# is only available from the command line. Turn it on by uncommenting the +# entries below. +############################################################################### +#*.doc diff=astextplain +#*.DOC diff=astextplain +#*.docx diff=astextplain +#*.DOCX diff=astextplain +#*.dot diff=astextplain +#*.DOT diff=astextplain +#*.pdf diff=astextplain +#*.PDF diff=astextplain +#*.rtf diff=astextplain +#*.RTF diff=astextplain diff --git a/.gitignore b/.gitignore index f1e3d20..fd3b0eb 100644 --- a/.gitignore +++ b/.gitignore @@ -250,3 +250,4 @@ paket-files/ # JetBrains Rider .idea/ *.sln.iml +privatesettings.config diff --git a/src/CRA.ClientLibrary/CRA.ClientLibrary.csproj b/src/CRA.ClientLibrary/CRA.ClientLibrary.csproj new file mode 100644 index 0000000..f486253 --- /dev/null +++ b/src/CRA.ClientLibrary/CRA.ClientLibrary.csproj @@ -0,0 +1,111 @@ + + + + + Debug + AnyCPU + {EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A} + Library + Properties + CRA.ClientLibrary + CRA.ClientLibrary + v4.5.2 + 512 + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\packages\aqua-core.3.0.0\lib\net45\Aqua.dll + True + + + ..\packages\Microsoft.Azure.KeyVault.Core.1.0.0\lib\net40\Microsoft.Azure.KeyVault.Core.dll + + + ..\packages\Microsoft.Data.Edm.5.8.2\lib\net40\Microsoft.Data.Edm.dll + + + ..\packages\Microsoft.Data.OData.5.8.2\lib\net40\Microsoft.Data.OData.dll + + + ..\packages\Microsoft.Data.Services.Client.5.8.2\lib\net40\Microsoft.Data.Services.Client.dll + + + ..\packages\WindowsAzure.Storage.8.1.1\lib\net45\Microsoft.WindowsAzure.Storage.dll + + + ..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll + + + ..\packages\Remote.Linq.5.3.1\lib\net45\Remote.Linq.dll + + + + + + + ..\packages\System.Spatial.5.8.2\lib\net40\System.Spatial.dll + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Designer + + + Designer + + + + \ No newline at end of file diff --git a/src/CRA.ClientLibrary/Definitions/ConnectionInfo.cs b/src/CRA.ClientLibrary/Definitions/ConnectionInfo.cs new file mode 100644 index 0000000..ba91938 --- /dev/null +++ b/src/CRA.ClientLibrary/Definitions/ConnectionInfo.cs @@ -0,0 +1,72 @@ +namespace CRA.ClientLibrary +{ + /// + /// Describes a connection between two process/endpoint pairs + /// + public class ConnectionInfo + { + /// + /// Connection is from this process + /// + public string FromProcess { get; set; } + /// + /// Connection is from this output endpoint + /// + public string FromEndpoint { get; set; } + /// + /// Connection is to this process + /// + public string ToProcess { get; set; } + /// + /// Connection is to this input endpoint + /// + public string ToEndpoint { get; set; } + + + /// + /// Constructor + /// + /// + /// + /// + /// + public ConnectionInfo(string FromProcess, string FromEndpoint, string ToProcess, string ToEndpoint) + { + this.FromProcess = FromProcess; + this.FromEndpoint = FromEndpoint; + this.ToProcess = ToProcess; + this.ToEndpoint = ToEndpoint; + } + + /// + /// String representation of a CRA conection + /// + /// + public override string ToString() + { + return new { FromProcess = FromProcess, FromEndpoint = FromEndpoint, ToProcess = ToProcess, ToEndpoint = ToEndpoint }.ToString(); + } + + /// + /// Check if two instances are equal + /// + /// + /// + public override bool Equals(object obj) + { + var otherConnectionInfo = (ConnectionInfo)obj; + if (otherConnectionInfo == null) return false; + + return + (FromProcess == otherConnectionInfo.FromProcess) && + (ToProcess == otherConnectionInfo.ToProcess) && + (FromEndpoint == otherConnectionInfo.FromEndpoint) && + (ToEndpoint == otherConnectionInfo.ToEndpoint); + } + + public override int GetHashCode() + { + return FromProcess.GetHashCode() ^ FromEndpoint.GetHashCode() ^ ToProcess.GetHashCode() ^ ToEndpoint.GetHashCode(); + } + } +} diff --git a/src/CRA.ClientLibrary/Definitions/ConnectionInitiator.cs b/src/CRA.ClientLibrary/Definitions/ConnectionInitiator.cs new file mode 100644 index 0000000..0c22a18 --- /dev/null +++ b/src/CRA.ClientLibrary/Definitions/ConnectionInitiator.cs @@ -0,0 +1,18 @@ +namespace CRA.ClientLibrary +{ + /// + /// Direction of data flow + /// + public enum ConnectionInitiator + { + /// + /// Initiate connection from "from" process + /// + FromSide, + + /// + /// Initiate connection from "to" process + /// + ToSide + } +} diff --git a/src/CRA.ClientLibrary/Definitions/CoralTaskMessageType.cs b/src/CRA.ClientLibrary/Definitions/CoralTaskMessageType.cs new file mode 100644 index 0000000..4e7a8ed --- /dev/null +++ b/src/CRA.ClientLibrary/Definitions/CoralTaskMessageType.cs @@ -0,0 +1,12 @@ +namespace CRA.ClientLibrary +{ + internal enum CRATaskMessageType + { + LOAD_PROCESS, + CONNECT_PROCESS_INITIATOR, + CONNECT_PROCESS_RECEIVER, + CONNECT_PROCESS_INITIATOR_REVERSE, + CONNECT_PROCESS_RECEIVER_REVERSE, + }; + +} diff --git a/src/CRA.ClientLibrary/Definitions/ErrorCodes.cs b/src/CRA.ClientLibrary/Definitions/ErrorCodes.cs new file mode 100644 index 0000000..589547a --- /dev/null +++ b/src/CRA.ClientLibrary/Definitions/ErrorCodes.cs @@ -0,0 +1,45 @@ +namespace CRA.ClientLibrary +{ + /// + /// Error codes for CRA method calls + /// + public enum CRAErrorCode : int + { + /// + /// Success + /// + Success, + /// + /// Process not found + /// + ProcessNotFound, + /// + /// Process output endpoint not found + /// + ProcessOutputNotFound, + /// + /// Process input endpoint not found + /// + ProcessInputNotFound, + /// + /// Process already exists + /// + ProcessAlreadyExists, + /// + /// Recovering + /// + ServerRecovering, + /// + /// Race condition adding connection + /// + ConnectionAdditionRace, + /// + /// Process endpoint (input or output) not found + /// + ProcessEndpointNotFound, + /// + /// Failed to establish a connection + /// + ConnectionEstablishFailed + }; +} diff --git a/src/CRA.ClientLibrary/Definitions/ObjectWrapper.cs b/src/CRA.ClientLibrary/Definitions/ObjectWrapper.cs new file mode 100644 index 0000000..96048e6 --- /dev/null +++ b/src/CRA.ClientLibrary/Definitions/ObjectWrapper.cs @@ -0,0 +1,8 @@ +namespace CRA.ClientLibrary +{ + internal struct ObjectWrapper + { + public string type; + public string data; + } +} diff --git a/src/CRA.ClientLibrary/Main/CRAClientLibrary.cs b/src/CRA.ClientLibrary/Main/CRAClientLibrary.cs new file mode 100644 index 0000000..9585d59 --- /dev/null +++ b/src/CRA.ClientLibrary/Main/CRAClientLibrary.cs @@ -0,0 +1,531 @@ +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Blob; +using Microsoft.WindowsAzure.Storage.Table; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Configuration; +using System.IO; +using System.Linq.Expressions; +using System.Net.Sockets; +using System.Text; +using System.Text.RegularExpressions; + +namespace CRA.ClientLibrary +{ + + /// + /// Client library for Common Runtime for Applications (CRA) + /// + public class CRAClientLibrary + { + CRAWorker _localWorker; + + // Azure storage clients + string _storageConnectionString; + CloudStorageAccount _storageAccount; + + CloudBlobClient _blobClient; + CloudTableClient _tableClient; + + CloudTable _processTable; + CloudTable _connectionTable; + + internal ProcessTableManager _processTableManager; + EndpointTableManager _endpointTableManager; + ConnectionTableManager _connectionTableManager; + + Type aquaType = typeof(Aqua.TypeSystem.ConstructorInfo); + + /// + /// Create an instance of the client library for Common Runtime for Applications (CRA) + /// + public CRAClientLibrary() : this("", null) + { + + } + + /// + /// Create an instance of the client library for Common Runtime for Applications (CRA) + /// + /// Optional storage account to use for CRA metadata, if + /// not specified, it will use the appSettings key named StorageConnectionString in app.config + public CRAClientLibrary(string storageConnectionString) : this(storageConnectionString, null) + { + + } + + /// + /// Create an instance of the client library for Common Runtime for Applications (CRA) + /// + /// Optional storage account to use for CRA metadata, if + /// not specified, it will use the appSettings key named StorageConnectionString in app.config + /// Local worker if any + public CRAClientLibrary(string storageConnectionString, CRAWorker localWorker) + { + _localWorker = localWorker; + + if (storageConnectionString == "") + _storageConnectionString = ConfigurationManager.AppSettings.Get("StorageConnectionString"); + else + _storageConnectionString = storageConnectionString; + + _storageAccount = CloudStorageAccount.Parse(_storageConnectionString); + + _blobClient = _storageAccount.CreateCloudBlobClient(); + _tableClient = _storageAccount.CreateCloudTableClient(); + + _processTableManager = new ProcessTableManager(_storageConnectionString); + _endpointTableManager = new EndpointTableManager(_storageConnectionString); + _connectionTableManager = new ConnectionTableManager(_storageConnectionString); + + _processTable = CreateTableIfNotExists("processtableforcra"); + _connectionTable = CreateTableIfNotExists("connectiontableforcra"); + } + + /// + /// Define a process type and register with CRA. + /// + /// Name of the process type + /// Lambda that describes how to instantiate the process, taking in an object as parameter + public CRAErrorCode DefineProcess(string processDefinition, Expression> creator) + { + if (!Regex.IsMatch(processDefinition, @"^(([a-z\d]((-(?=[a-z\d]))|([a-z\d])){2,62})|(\$root))$")) + { + throw new InvalidOperationException("Invalid name for process definition. Names have to be all lowercase, cannot contain special characters."); + } + + CloudBlobContainer container = _blobClient.GetContainerReference(processDefinition); + container.CreateIfNotExists(); + var blockBlob = container.GetBlockBlobReference("binaries"); + CloudBlobStream blobStream = blockBlob.OpenWrite(); + AssemblyUtils.WriteAssembliesToStream(blobStream); + blobStream.Close(); + + // Add metadata + var newRow = new ProcessTable("", processDefinition, processDefinition, "", 0, creator, null); + TableOperation insertOperation = TableOperation.InsertOrReplace(newRow); + _processTable.Execute(insertOperation); + + return CRAErrorCode.Success; + } + + /// + /// Resets the cluster and deletes all knowledge of any CRA instances + /// + public void Reset() + { + _connectionTable.DeleteIfExists(); + _processTable.DeleteIfExists(); + _endpointTableManager.DeleteTable(); + } + + /// + /// Not yet implemented + /// + /// + public void DeployInstance(string instanceName) + { + throw new NotImplementedException(); + } + + + /// + /// Instantiate a process on a CRA instance. + /// + /// Name of the CRA instance on which process is instantiated + /// Name of the process (particular instance) + /// Definition of the process (type) + /// Parameters to be passed to the process in its constructor (serializable object) + /// Status of the command + public CRAErrorCode InstantiateProcess(string instanceName, string processName, string processDefinition, object processParameter) + { + var procDefRow = ProcessTable.GetRowForProcessDefinition(_processTable, processDefinition); + + // Add metadata + var newRow = new ProcessTable(instanceName, processName, processDefinition, "", 0, + procDefRow.ProcessCreateAction, + SerializationHelper.SerializeObject(processParameter)); + TableOperation insertOperation = TableOperation.InsertOrReplace(newRow); + _processTable.Execute(insertOperation); + + CRAErrorCode result = CRAErrorCode.Success; + + ProcessTable instanceRow; + try + { + instanceRow = ProcessTable.GetRowForInstance(_processTable, instanceName); + + // Send request to CRA instance + TcpClient client = new TcpClient(instanceRow.Address, instanceRow.Port); + NetworkStream stream = client.GetStream(); + stream.WriteInteger((int)CRATaskMessageType.LOAD_PROCESS); + stream.WriteByteArray(Encoding.UTF8.GetBytes(processName)); + stream.WriteByteArray(Encoding.UTF8.GetBytes(processDefinition)); + stream.WriteByteArray(Encoding.UTF8.GetBytes(newRow.ProcessParameter)); + result = (CRAErrorCode) stream.ReadInteger(); + if (result != 0) + { + Console.WriteLine("Process was logically loaded. However, we received an error code from the hosting CRA instance: " + result); + } + } + catch + { + Console.WriteLine("The CRA instance appears to be down. Restart it and this process will be instantiated automatically"); + } + return result; + } + + /// + /// Register caller as a process with given name, dummy temp instance + /// + /// + /// + public DetachedProcess RegisterAsProcess(string processName) + { + return new DetachedProcess(processName, "", this); + } + + /// + /// Register caller as a process with given name, given CRA instance name + /// + /// + /// + /// + public DetachedProcess RegisterAsProcess(string processName, string instanceName) + { + return new DetachedProcess(processName, instanceName, this); + } + + /// + /// Register CRA instance name + /// + /// + /// + /// + public void RegisterInstance(string instanceName, string address, int port) + { + _processTableManager.RegisterInstance(instanceName, address, port); + } + + /// + /// Delete CRA instance name + /// + /// + public void DeleteInstance(string instanceName) + { + _processTableManager.DeleteInstance(instanceName); + } + + + /// + /// + /// + /// + /// + public void DeleteProcess(string processName, string instanceName) + { + var entity = new DynamicTableEntity(instanceName, processName); + entity.ETag = "*"; + TableOperation deleteOperation = TableOperation.Delete(entity); + _processTable.Execute(deleteOperation); + } + + /// + /// Add endpoint to the appropriate CRA metadata table + /// + /// + /// + /// + /// + public void AddEndpoint(string processName, string endpointName, bool isInput, bool isAsync) + { + _endpointTableManager.AddEndpoint(processName, endpointName, isInput, isAsync); + } + + /// + /// Delete endpoint + /// + /// + /// + public void DeleteEndpoint(string processName, string endpointName) + { + _endpointTableManager.DeleteEndpoint(processName, endpointName); + } + + /// + /// Load a process on the local instance + /// + /// + /// + /// + /// + /// + /// + public IProcess LoadProcess(string processName, string processDefinition, string processParameter, string instanceName, ConcurrentDictionary table) + { + CloudBlobContainer container = _blobClient.GetContainerReference(processDefinition); + container.CreateIfNotExists(); + var blockBlob = container.GetBlockBlobReference("binaries"); + Stream blobStream = blockBlob.OpenRead(); + AssemblyUtils.LoadAssembliesFromStream(blobStream); + blobStream.Close(); + + var row = ProcessTable.GetRowForProcessDefinition(_processTable, processDefinition); + + // CREATE THE PROCESS + var process = row.GetProcessCreateAction()(); + + // LATCH CALLBACKS TO POPULATE ENDPOINT TABLE + process.OnAddInputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(processName, name, true, false)); + process.OnAddOutputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(processName, name, false, false)); + process.OnAddAsyncInputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(processName, name, true, true)); + process.OnAddAsyncOutputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(processName, name, false, true)); + + //ADD TO TABLE + if (table != null) + { + table.AddOrUpdate(processName, process, (procName, oldProc) => { oldProc.Dispose(); return process; }); + + process.OnDispose(() => + { + // Delete all endpoints of the process + foreach (var key in process.InputEndpoints) + { + _endpointTableManager.DeleteEndpoint(processName, key.Key); + } + foreach (var key in process.AsyncInputEndpoints) + { + _endpointTableManager.DeleteEndpoint(processName, key.Key); + } + foreach (var key in process.OutputEndpoints) + { + _endpointTableManager.DeleteEndpoint(processName, key.Key); + } + foreach (var key in process.AsyncOutputEndpoints) + { + _endpointTableManager.DeleteEndpoint(processName, key.Key); + } + + IProcess old; + if (!table.TryRemove(processName, out old)) + { + Console.WriteLine("Unable to remove process on disposal"); + } + var entity = new DynamicTableEntity(instanceName, processName); + entity.ETag = "*"; + TableOperation deleteOperation = TableOperation.Delete(entity); + _processTable.Execute(deleteOperation); + }); + } + + // INITIALIZE + if ((ProcessBase)process != null) + { + ((ProcessBase)process).SetProcessName(processName); + ((ProcessBase)process).SetClientLibrary(this); + } + + var par = SerializationHelper.DeserializeObject(processParameter); + process.Initialize(par); + + return process; + } + + /// + /// Load all processes for the given instance name. + /// + /// + /// + public ConcurrentDictionary LoadAllProcesses(string thisInstanceName) + { + ConcurrentDictionary result = new ConcurrentDictionary(); + var rows = ProcessTable.GetAllRowsForInstance(_processTable, thisInstanceName); + + foreach (var row in rows) + { + if (row.ProcessName == "") continue; + LoadProcess(row.ProcessName, row.ProcessDefinition, row.ProcessParameter, thisInstanceName, result); + } + + return result; + } + + /// + /// Add connection info to metadata table + /// + /// + /// + /// + /// + public void AddConnectionInfo(string fromProcessName, string fromEndpoint, string toProcessName, string toEndpoint) + { + _connectionTableManager.AddConnection(fromProcessName, fromEndpoint, toProcessName, toEndpoint); + } + + + /// + /// Delete connection info from metadata table + /// + /// + /// + /// + /// + public void DeleteConnectionInfo(string fromProcessName, string fromEndpoint, string toProcessName, string toEndpoint) + { + _connectionTableManager.DeleteConnection(fromProcessName, fromEndpoint, toProcessName, toEndpoint); + } + + /// + /// Connect one CRA process to another, via pre-defined endpoints. We contact the "from" process + /// to initiate the creation of the link. + /// + /// Name of the process from which connection is being made + /// Name of the endpoint on the fromProcess, from which connection is being made + /// Name of the process to which connection is being made + /// Name of the endpoint on the toProcess, to which connection is being made + /// Which process initiates the connection + /// Status of the Connect operation + public CRAErrorCode Connect(string fromProcessName, string fromEndpoint, string toProcessName, string toEndpoint, ConnectionInitiator direction = ConnectionInitiator.FromSide) + { + // Tell from process to establish connection + // Send request to CRA instance + + // Get instance for source process + var _row = direction == ConnectionInitiator.FromSide ? + ProcessTable.GetRowForProcess(_processTable, fromProcessName) : + ProcessTable.GetRowForProcess(_processTable, toProcessName); + + + // Check that process and endpoints are valid and existing + if (!_processTableManager.ExistsProcess(fromProcessName) || !_processTableManager.ExistsProcess(toProcessName)) + { + return CRAErrorCode.ProcessNotFound; + } + + // Make the connection information stable + _connectionTableManager.AddConnection(fromProcessName, fromEndpoint, toProcessName, toEndpoint); + + // We now try best-effort to tell the CRA instance of this connection + + CRAErrorCode result = CRAErrorCode.Success; + + if (_localWorker != null) + { + if (_localWorker.InstanceName == _row.InstanceName) + { + return _localWorker.Connect_InitiatorSide(fromProcessName, fromEndpoint, + toProcessName, toEndpoint, direction == ConnectionInitiator.ToSide); + } + } + + + // Send request to CRA instance + TcpClient client = null; + try + { + + // Get address and port for instance, using row with process = "" + var row = ProcessTable.GetRowForInstance(_processTable, _row.InstanceName); + + client = new TcpClient(row.Address, row.Port); + NetworkStream stream = client.GetStream(); + + if (direction == ConnectionInitiator.FromSide) + stream.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_INITIATOR); + else + stream.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_INITIATOR_REVERSE); + + stream.WriteByteArray(Encoding.UTF8.GetBytes(fromProcessName)); + stream.WriteByteArray(Encoding.UTF8.GetBytes(fromEndpoint)); + stream.WriteByteArray(Encoding.UTF8.GetBytes(toProcessName)); + stream.WriteByteArray(Encoding.UTF8.GetBytes(toEndpoint)); + + result = (CRAErrorCode) stream.ReadInteger(); + if (result != 0) + { + Console.WriteLine("Connection was logically established. However, the client received an error code from the connection-initiating CRA instance: " + result); + } + } + catch + { + Console.WriteLine("The connection-initiating CRA instance appears to be down or could not be found. Restart it and this connection will be completed automatically"); + } + return (CRAErrorCode)result; + } + + /// + /// Get a list of all output endpoint names for a given process + /// + /// + /// + public IEnumerable GetOutputEndpoints(string processName) + { + return _endpointTableManager.GetOutputEndpoints(processName); + } + + /// + /// Get a list of all input endpoint names for a given process + /// + /// + /// + public IEnumerable GetInputEndpoints(string processName) + { + return _endpointTableManager.GetInputEndpoints(processName); + } + + /// + /// Get all outgoing connection from a given process + /// + /// + /// + public IEnumerable GetConnectionsFromProcess(string processName) + { + return _connectionTableManager.GetConnectionsFromProcess(processName); + } + + /// + /// Get all incoming connections to a given process + /// + /// + /// + public IEnumerable GetConnectionsToProcess(string processName) + { + return _connectionTableManager.GetConnectionsToProcess(processName); + } + + + /// + /// Gets a list of all processes registered with CRA + /// + /// + public IEnumerable GetProcessNames() + { + return _processTableManager.GetProcessNames(); + } + + private CloudTable CreateTableIfNotExists(string tableName) + { + CloudTable table = _tableClient.GetTableReference(tableName); + try + { + table.CreateIfNotExists(); + } + catch { } + + return table; + } + + /// + /// Disconnect a CRA connection + /// + /// + /// + /// + /// + public void Disconnect(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput) + { + _connectionTableManager.DeleteConnection(fromProcessName, fromProcessOutput, toProcessName, toProcessInput); + } + } +} diff --git a/src/CRA.ClientLibrary/Main/CRAWorker.cs b/src/CRA.ClientLibrary/Main/CRAWorker.cs new file mode 100644 index 0000000..2b6de1f --- /dev/null +++ b/src/CRA.ClientLibrary/Main/CRAWorker.cs @@ -0,0 +1,587 @@ +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Blob; +using Microsoft.WindowsAzure.Storage.Table; +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace CRA.ClientLibrary +{ + /// + /// Worker library for Common Runtime for Applications (CRA) + /// + public class CRAWorker + { + CRAClientLibrary _craClient; + + string _workerinstanceName; + string _address; + int _port; + + // Azure storage clients + string _storageConnectionString; + CloudStorageAccount _storageAccount; + CloudBlobClient _blobClient; + CloudTableClient _tableClient; + CloudTable _workerInstanceTable; + CloudTable _connectionTable; + + // Timer updateTimer + ConcurrentDictionary _localProcessTable = new ConcurrentDictionary(); + ConcurrentDictionary inConnections = new ConcurrentDictionary(); + ConcurrentDictionary outConnections = new ConcurrentDictionary(); + + /// + /// Instance name + /// + public string InstanceName { get { return _workerinstanceName; } } + + /// + /// Define a new worker instance of Common Runtime for Applications (CRA) + /// + /// Name of the worker instance + /// IP address + /// Port + /// Storage account to store metadata + public CRAWorker(string workerInstanceName, string address, int port, string storageConnectionString) + { + _craClient = new CRAClientLibrary(storageConnectionString, this); + + _workerinstanceName = workerInstanceName; + _address = address; + _port = port; + + _storageConnectionString = storageConnectionString; + _storageAccount = CloudStorageAccount.Parse(_storageConnectionString); + _blobClient = _storageAccount.CreateCloudBlobClient(); + _tableClient = _storageAccount.CreateCloudTableClient(); + _workerInstanceTable = CreateTableIfNotExists("processtableforcra"); + _connectionTable = CreateTableIfNotExists("connectiontableforcra"); + } + + /// + /// Start the CRA worker. This method does not return. + /// + public void Start() + { + // Update process table + _craClient.RegisterInstance(_workerinstanceName, _address, _port); + + // Restore processes on machine - not connections between processes + // This ensures others can establish connections to it as soon as + // as we start the server + RestoreProcesses(); + + Thread serverThread = new Thread(StartServer); + serverThread.Start(); + + // Restore connections to/from the processes on this machine + RestoreConnections(null); + + + + // Wait for server to complete execution + serverThread.Join(); + } + + + private void StartServer() + { + var server = new TcpListener(IPAddress.Parse(_address), _port); + + // Start listening for client requests. + server.Start(); + + while (true) + { + Debug.WriteLine("Waiting for a connection... "); + TcpClient client = server.AcceptTcpClient(); + Debug.WriteLine("Connected!"); + + // Get a stream object for reading and writing + NetworkStream stream = client.GetStream(); + + CRATaskMessageType message = (CRATaskMessageType)stream.ReadInteger(); + + switch (message) + { + case CRATaskMessageType.LOAD_PROCESS: + Task.Run(() => LoadProcess(stream)); + break; + + case CRATaskMessageType.CONNECT_PROCESS_INITIATOR: + Task.Run(() => ConnectProcess_Initiator(stream, false)); + break; + + case CRATaskMessageType.CONNECT_PROCESS_RECEIVER: + Task.Run(() => ConnectProcess_Receiver(stream, false)); + break; + + case CRATaskMessageType.CONNECT_PROCESS_INITIATOR_REVERSE: + Task.Run(() => ConnectProcess_Initiator(stream, true)); + break; + + case CRATaskMessageType.CONNECT_PROCESS_RECEIVER_REVERSE: + Task.Run(() => ConnectProcess_Receiver(stream, true)); + break; + + default: + Console.WriteLine("Unknown message type: " + message); + break; + } + } + } + + private void LoadProcess(object streamObject) + { + var stream = (Stream)streamObject; + + string processName = Encoding.UTF8.GetString(stream.ReadByteArray()); + string processDefinition = Encoding.UTF8.GetString(stream.ReadByteArray()); + string processParam = Encoding.UTF8.GetString(stream.ReadByteArray()); + + _craClient.LoadProcess(processName, processDefinition, processParam, _workerinstanceName, _localProcessTable); + + stream.WriteInteger(0); + stream.Close(); + } + + private void ConnectProcess_Initiator(object streamObject, bool reverse = false) + { + var stream = (Stream)streamObject; + + string fromProcessName = Encoding.UTF8.GetString(stream.ReadByteArray()); + string fromProcessOutput = Encoding.UTF8.GetString(stream.ReadByteArray()); + string toProcessName = Encoding.UTF8.GetString(stream.ReadByteArray()); + string toProcessInput = Encoding.UTF8.GetString(stream.ReadByteArray()); + + Debug.WriteLine("Processing request to initiate connection"); + + if (!reverse) + { + if (!_localProcessTable.ContainsKey(fromProcessName)) + { + stream.WriteInteger((int)CRAErrorCode.ProcessNotFound); + stream.Close(); + return; + } + + if (!_localProcessTable[fromProcessName].OutputEndpoints.ContainsKey(fromProcessOutput) && + !_localProcessTable[fromProcessName].AsyncOutputEndpoints.ContainsKey(fromProcessOutput) + ) + { + stream.WriteInteger((int)CRAErrorCode.ProcessInputNotFound); + stream.Close(); + return; + } + } + else + { + if (!_localProcessTable.ContainsKey(toProcessName)) + { + stream.WriteInteger((int)CRAErrorCode.ProcessNotFound); + stream.Close(); + return; + } + + if (!_localProcessTable[toProcessName].InputEndpoints.ContainsKey(toProcessInput) && + !_localProcessTable[toProcessName].AsyncInputEndpoints.ContainsKey(toProcessInput) + ) + { + stream.WriteInteger((int)CRAErrorCode.ProcessInputNotFound); + stream.Close(); + return; + } + } + + + var result = Connect_InitiatorSide(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse); + + stream.WriteInteger((int)result); + stream.Close(); + } + + internal CRAErrorCode Connect_InitiatorSide(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, bool reverse, bool killIfExists = true, bool killRemote = true) + { + CancellationTokenSource oldSource; + + var conn = reverse ? inConnections : outConnections; + + if (conn.TryGetValue(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, + out oldSource)) + { + if (killIfExists) + { + Debug.WriteLine("Deleting prior connection - it will automatically reconnect"); + oldSource.Cancel(); + } + return CRAErrorCode.Success; + } + + // Need to get the latest address & port + var row = reverse ? ProcessTable.GetRowForProcess(_workerInstanceTable, fromProcessName) : ProcessTable.GetRowForProcess(_workerInstanceTable, toProcessName); + + + + + // Send request to CRA instance + TcpClient client = null; + NetworkStream ns = null; + try + { + var _row = ProcessTable.GetRowForInstanceProcess(_workerInstanceTable, row.InstanceName, ""); + client = new TcpClient(_row.Address, _row.Port); + ns = client.GetStream(); + } + catch + { + return CRAErrorCode.ConnectionEstablishFailed; + } + + if (!reverse) + ns.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_RECEIVER); + else + ns.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_RECEIVER_REVERSE); + + ns.WriteByteArray(Encoding.UTF8.GetBytes(fromProcessName)); + ns.WriteByteArray(Encoding.UTF8.GetBytes(fromProcessOutput)); + ns.WriteByteArray(Encoding.UTF8.GetBytes(toProcessName)); + ns.WriteByteArray(Encoding.UTF8.GetBytes(toProcessInput)); + ns.WriteInteger(killRemote ? 1 : 0); + CRAErrorCode result = (CRAErrorCode) ns.ReadInteger(); + + if (result != 0) + { + Debug.WriteLine("Client received error code: " + result); + } + else + { + CancellationTokenSource source = new CancellationTokenSource(); + + if (!reverse) + { + if (outConnections.TryAdd(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, source)) + { + Task.Run(() => + EgressToStream(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse, ns, source)); + } + else + { + source.Dispose(); + ns.Close(); + Console.WriteLine("Race adding connection - deleting outgoing stream"); + return CRAErrorCode.ConnectionAdditionRace; + } + } + else + { + if (inConnections.TryAdd(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, source)) + { + Task.Run(() => IngressFromStream + (fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse, ns, source)); + } + else + { + source.Dispose(); + ns.Close(); + Debug.WriteLine("Race adding connection - deleting outgoing stream"); + return CRAErrorCode.ConnectionAdditionRace; + } + } + } + return result; + } + + private async Task EgressToStream(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, + bool reverse, Stream ns, CancellationTokenSource source) + { + try + { + if (_localProcessTable[fromProcessName].OutputEndpoints.ContainsKey(fromProcessOutput)) + { + await + Task.Run(() => + _localProcessTable[fromProcessName].OutputEndpoints[fromProcessOutput] + .ToStream(ns, toProcessName, toProcessInput, source.Token), source.Token); + } + else if (_localProcessTable[fromProcessName].AsyncOutputEndpoints.ContainsKey(fromProcessOutput)) + { + await _localProcessTable[fromProcessName].AsyncOutputEndpoints[fromProcessOutput].ToStreamAsync(ns, toProcessName, toProcessInput, source.Token); + } + else + { + Debug.WriteLine("Unable to find output endpoint (on from side)"); + return; + } + + CancellationTokenSource oldSource; + if (outConnections.TryRemove(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, out oldSource)) + { + oldSource.Dispose(); + ns.Dispose(); + _craClient.Disconnect(fromProcessName, fromProcessOutput, toProcessName, toProcessInput); + } + } + catch (Exception e) + { + Debug.WriteLine("Exception (" + e.ToString() + ") in outgoing stream - reconnecting"); + CancellationTokenSource oldSource; + if (outConnections.TryRemove(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, out oldSource)) + { + oldSource.Dispose(); + } + else + { + Debug.WriteLine("Unexpected: caught exception in ToStream but entry absent in outConnections"); + } + + // Retry following while connection not in list + RetryRestoreConnection(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, false); + } + } + + private void ConnectProcess_Receiver(object streamObject, bool reverse = false) + { + var stream = (Stream)streamObject; + + string fromProcessName = Encoding.UTF8.GetString(stream.ReadByteArray()); + string fromProcessOutput = Encoding.UTF8.GetString(stream.ReadByteArray()); + string toProcessName = Encoding.UTF8.GetString(stream.ReadByteArray()); + string toProcessInput = Encoding.UTF8.GetString(stream.ReadByteArray()); + bool killIfExists = stream.ReadInteger() == 1 ? true : false; + + if (!reverse) + { + if (!_localProcessTable.ContainsKey(toProcessName)) + { + stream.WriteInteger((int)CRAErrorCode.ProcessNotFound); + stream.Close(); + return; + } + + if (!_localProcessTable[toProcessName].InputEndpoints.ContainsKey(toProcessInput) && + !_localProcessTable[toProcessName].AsyncInputEndpoints.ContainsKey(toProcessInput) + ) + { + stream.WriteInteger((int)CRAErrorCode.ProcessInputNotFound); + stream.Close(); + return; + } + } + else + { + if (!_localProcessTable.ContainsKey(fromProcessName)) + { + stream.WriteInteger((int)CRAErrorCode.ProcessNotFound); + stream.Close(); + return; + } + + if (!_localProcessTable[fromProcessName].OutputEndpoints.ContainsKey(fromProcessOutput) && + !_localProcessTable[fromProcessName].AsyncOutputEndpoints.ContainsKey(fromProcessOutput) + ) + { + stream.WriteInteger((int)CRAErrorCode.ProcessInputNotFound); + stream.Close(); + return; + } + } + + var result = Connect_ReceiverSide(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, stream, reverse, killIfExists); + + // Do not close and dispose stream because it is being reused for data + if (result != 0) + { + stream.Close(); + } + } + + private int Connect_ReceiverSide(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, Stream stream, bool reverse, bool killIfExists = true) + { + CancellationTokenSource oldSource; + + var conn = reverse ? outConnections : inConnections; + + if (conn.TryGetValue(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, + out oldSource)) + { + if (killIfExists) + { + Debug.WriteLine("Deleting prior connection - it will automatically reconnect"); + oldSource.Cancel(); + } + else + { + Debug.WriteLine("There exists prior connection - not killing"); + } + stream.WriteInteger((int)CRAErrorCode.ServerRecovering); + return (int)CRAErrorCode.ServerRecovering; + } + else + { + stream.WriteInteger(0); + } + + CancellationTokenSource source = new CancellationTokenSource(); + + if (!reverse) + { + if (inConnections.TryAdd(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, source)) + { + Task.Run(() => + IngressFromStream(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse, stream, source)); + return (int)CRAErrorCode.Success; + } + else + { + source.Dispose(); + stream.Close(); + Debug.WriteLine("Race adding connection - deleting incoming stream"); + return (int)CRAErrorCode.ConnectionAdditionRace; + } + } + else + { + if (outConnections.TryAdd(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, source)) + { + Task.Run(() => + EgressToStream(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse, stream, source)); + return (int)CRAErrorCode.Success; + } + else + { + source.Dispose(); + stream.Close(); + Debug.WriteLine("Race adding connection - deleting incoming stream"); + return (int)CRAErrorCode.ConnectionAdditionRace; + } + } + } + + private async Task IngressFromStream(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, bool reverse, Stream ns, CancellationTokenSource source) + { + try + { + if (_localProcessTable[toProcessName].InputEndpoints.ContainsKey(toProcessInput)) + { + await Task.Run( + () => _localProcessTable[toProcessName].InputEndpoints[toProcessInput] + .FromStream(ns, fromProcessName, fromProcessOutput, source.Token), source.Token); + } + else if (_localProcessTable[toProcessName].AsyncInputEndpoints.ContainsKey(toProcessInput)) + { + await _localProcessTable[toProcessName].AsyncInputEndpoints[toProcessInput].FromStreamAsync(ns, fromProcessName, fromProcessOutput, source.Token); + } + else + { + Debug.WriteLine("Unable to find input endpoint (on to side)"); + return; + } + + // Completed FromStream successfully + CancellationTokenSource oldSource; + if (outConnections.TryRemove(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, out oldSource)) + { + oldSource.Dispose(); + ns.Dispose(); + _craClient.Disconnect(fromProcessName, fromProcessOutput, toProcessName, toProcessInput); + } + } + catch (Exception e) + { + Debug.WriteLine("Exception (" + e.ToString() + ") in incoming stream - reconnecting"); + CancellationTokenSource tokenSource; + if (inConnections.TryRemove(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, out tokenSource)) + { + tokenSource.Dispose(); + } + else + { + Debug.WriteLine("Unexpected: caught exception in FromStream but entry absent in inConnections"); + } + + RetryRestoreConnection(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, true); + } + } + + private void RestoreProcesses() + { + var rows = ProcessTable.GetAllRowsForInstance(_workerInstanceTable, _workerinstanceName); + + foreach (var row in rows) + { + if (string.IsNullOrEmpty(row.ProcessName)) continue; + + _craClient.LoadProcess(row.ProcessName, row.ProcessDefinition, row.ProcessParameter, _workerinstanceName, _localProcessTable); + } + } + + private void RestoreConnections(object obj) + { + var rows = ProcessTable.GetAllRowsForInstance(_workerInstanceTable, _workerinstanceName); + + foreach (var _row in rows) + { + if (string.IsNullOrEmpty(_row.ProcessName)) continue; + + // Decide what to do if connection creation fails + var outRows = ConnectionTable.GetAllConnectionsFromProcess(_connectionTable, _row.ProcessName).ToList(); + foreach (var row in outRows) + { + Task.Run(() => RetryRestoreConnection(row.FromProcess, row.FromEndpoint, row.ToProcess, row.ToEndpoint, false)); + } + + var inRows = ConnectionTable.GetAllConnectionsToProcess(_connectionTable, _row.ProcessName).ToList(); + foreach (var row in inRows) + { + Task.Run(() => RetryRestoreConnection(row.FromProcess, row.FromEndpoint, row.ToProcess, row.ToEndpoint, true)); + } + } + } + + private void RetryRestoreConnection(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, bool reverse) + { + var conn = reverse ? inConnections : outConnections; + + bool killRemote = false; + while (!conn.ContainsKey(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput)) + { + if (!ConnectionTable.ContainsConnection(_connectionTable, fromProcessName, fromProcessOutput, toProcessName, toProcessInput)) + break; + + Debug.WriteLine("Connecting " + fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput); + Debug.WriteLine("Connecting with killRemote set to " + (killRemote ? "true" : "false")); + + var result = Connect_InitiatorSide(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse, false, killRemote); + + if (result != 0) + { + if (result == CRAErrorCode.ServerRecovering) + killRemote = true; + Thread.Sleep(5000); + } + else + break; + } + } + + private CloudTable CreateTableIfNotExists(string tableName) + { + CloudTable table = _tableClient.GetTableReference(tableName); + try + { + table.CreateIfNotExists(); + } + catch { } + + return table; + } + } +} diff --git a/src/CRA.ClientLibrary/Processes/DetachedProcess.cs b/src/CRA.ClientLibrary/Processes/DetachedProcess.cs new file mode 100644 index 0000000..a944137 --- /dev/null +++ b/src/CRA.ClientLibrary/Processes/DetachedProcess.cs @@ -0,0 +1,340 @@ +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Net.Sockets; +using System.Text; + +namespace CRA.ClientLibrary +{ + /// + /// All connections to/from this detached process + /// + public class ConnectionData + { + /// + /// Input endpoints + /// + public ConcurrentDictionary InputConnections { get; } + + /// + /// Output endpoints + /// + public ConcurrentDictionary OutputConnections { get; } + + /// + /// + /// + public ConnectionData() + { + InputConnections = new ConcurrentDictionary(); + OutputConnections = new ConcurrentDictionary(); + } + } + + /// + /// Endpoint information for process + /// + public class EndpointData + { + /// + /// Input endpoints + /// + public ConcurrentDictionary InputEndpoints { get; } + + /// + /// Output endpoints + /// + public ConcurrentDictionary OutputEndpoints { get; } + + /// + /// Constructor + /// + public EndpointData() + { + InputEndpoints = new ConcurrentDictionary(); + OutputEndpoints = new ConcurrentDictionary(); + } + } + + /// + /// Process proxy for applications using CRA sideways + /// + public class DetachedProcess : IDisposable + { + /// + /// Connection data + /// + public ConnectionData ConnectionData { get; set; } + + /// + /// Endpoint data + /// + public EndpointData EndpointData { get; set; } + + private CRAClientLibrary _clientLibrary; + private string _processName; + private string _instanceName; + private bool _isEphemeralInstance; + + /// + /// + /// + /// + /// + /// + public DetachedProcess(string processName, string instanceName, CRAClientLibrary clientLibrary) + { + _processName = processName; + _clientLibrary = clientLibrary; + + if (instanceName == "") + { + // _clientLibrary._processTableManager.GetRowForProcess(processName); + _instanceName = RandomString(16); + _isEphemeralInstance = true; + _clientLibrary.RegisterInstance(_instanceName, "", 0); + } + + _clientLibrary._processTableManager.RegisterProcess(_processName, _instanceName); + + EndpointData = new EndpointData(); + ConnectionData = new ConnectionData(); + } + + /// + /// Add input endpoint + /// + /// Endpoint name + public void AddInputEndpoint(string endpointName) + { + _clientLibrary.AddEndpoint(_processName, endpointName, true, false); + EndpointData.InputEndpoints.TryAdd(endpointName, true); + } + + /// + /// Add output endpoint + /// + /// Endpoint name + public void AddOutputEndpoint(string endpointName) + { + _clientLibrary.AddEndpoint(_processName, endpointName, false, false); + EndpointData.OutputEndpoints.TryAdd(endpointName, true); + } + + /// + /// Create connection stream from remote output endpoint to local input endpoint + /// + /// + /// + /// + /// + public Stream FromRemoteOutputEndpointStream(string localInputEndpointName, string remoteProcess, string remoteOutputEndpoint) + { + AddInputEndpoint(localInputEndpointName); + + _clientLibrary.AddConnectionInfo(remoteProcess, remoteOutputEndpoint, _processName, localInputEndpointName); + var stream = Connect_InitiatorSide(remoteProcess, remoteOutputEndpoint, _processName, localInputEndpointName, true); + var conn = new ConnectionInfo(remoteProcess, remoteOutputEndpoint, _processName, localInputEndpointName); + ConnectionData.InputConnections.AddOrUpdate(conn, stream, (c, s1) => { s1?.Dispose(); return stream; }); + return stream; + } + + + /// + /// Create connection stream from local output endpoint to remote input endpoint + /// + /// + /// + /// + /// + public Stream ToRemoteInputEndpointStream(string localOutputEndpointName, string remoteProcess, string remoteInputEndpoint) + { + AddOutputEndpoint(localOutputEndpointName); + _clientLibrary.AddConnectionInfo(_processName, localOutputEndpointName, remoteProcess, remoteInputEndpoint); + var stream = Connect_InitiatorSide(_processName, localOutputEndpointName, remoteProcess, remoteInputEndpoint, false); + var conn = new ConnectionInfo(_processName, localOutputEndpointName, remoteProcess, remoteInputEndpoint); + ConnectionData.OutputConnections.AddOrUpdate(conn, stream, (c, s1) => { s1?.Dispose(); return stream; }); + return stream; + } + + /// + /// Restore information about endpoints of this detached process + /// + /// + public void RestoreEndpointData() + { + foreach (var endpt in _clientLibrary.GetInputEndpoints(_processName)) + { + EndpointData.InputEndpoints.TryAdd(endpt, true); + } + + foreach (var endpt in _clientLibrary.GetOutputEndpoints(_processName)) + { + EndpointData.OutputEndpoints.TryAdd(endpt, true); + } + } + + /// + /// Restore all connections from/to this process, in the CRA connection graph + /// + /// + public void RestoreAllConnections() + { + foreach (var outConn in _clientLibrary.GetConnectionsFromProcess(_processName)) + { + var stream = ToRemoteInputEndpointStream(outConn.FromEndpoint, outConn.ToProcess, outConn.ToEndpoint); + ConnectionData.OutputConnections.AddOrUpdate(outConn, stream, (c, s1) => { s1?.Dispose(); return stream; }); + } + + foreach (var inConn in _clientLibrary.GetConnectionsToProcess(_processName)) + { + var stream = FromRemoteOutputEndpointStream(inConn.ToEndpoint, inConn.FromProcess, inConn.FromEndpoint); + ConnectionData.OutputConnections.AddOrUpdate(inConn, stream, (c, s1) => { s1?.Dispose(); return stream; }); + } + } + + /// + /// Restore a process/instance pair + /// + public void Restore() + { + RestoreEndpointData(); + RestoreAllConnections(); + } + + + /// + /// Restore all connections from/to this process that are set to 'null' locally, in the CRA connection graph + /// + /// + public ConnectionData RestoreNullConnections() + { + foreach (var outConn in ConnectionData.OutputConnections) + { + if (outConn.Value == null) + { + var stream = ToRemoteInputEndpointStream(outConn.Key.FromEndpoint, outConn.Key.ToProcess, outConn.Key.ToEndpoint); + ConnectionData.OutputConnections[outConn.Key] = stream; + } + } + + foreach (var inConn in ConnectionData.InputConnections) + { + if (inConn.Value == null) + { + var stream = FromRemoteOutputEndpointStream(inConn.Key.ToEndpoint, inConn.Key.FromProcess, inConn.Key.FromEndpoint); + ConnectionData.InputConnections[inConn.Key] = stream; + } + } + return ConnectionData; + } + + + /// + /// Dispose the detached process + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + + } + + private void Dispose(bool disposed) + { + if (disposed) + { + if (_isEphemeralInstance) + { + _clientLibrary.DeleteInstance(_instanceName); + } + _clientLibrary.DeleteProcess(_processName, _instanceName); + foreach (var endpt in EndpointData.InputEndpoints.Keys) + { + _clientLibrary.DeleteEndpoint(_processName, endpt); + } + foreach (var endpt in EndpointData.OutputEndpoints.Keys) + { + _clientLibrary.DeleteEndpoint(_processName, endpt); + } + + EndpointData.InputEndpoints.Clear(); + EndpointData.OutputEndpoints.Clear(); + + foreach (var kvp in ConnectionData.InputConnections) + { + _clientLibrary.DeleteConnectionInfo(kvp.Key.FromProcess, kvp.Key.FromEndpoint, kvp.Key.ToProcess, kvp.Key.ToEndpoint); + if (kvp.Value != null) kvp.Value.Dispose(); + } + + foreach (var kvp in ConnectionData.OutputConnections) + { + _clientLibrary.DeleteConnectionInfo(kvp.Key.FromProcess, kvp.Key.FromEndpoint, kvp.Key.ToProcess, kvp.Key.ToEndpoint); + if (kvp.Value != null) kvp.Value.Dispose(); + } + + ConnectionData.InputConnections.Clear(); + ConnectionData.OutputConnections.Clear(); + } + } + + private static Random random = new Random(); + private static string RandomString(int length) + { + const string chars = "abcdefghijklmnopqrstuvwxyz"; + return new string(Enumerable.Repeat(chars, length) + .Select(s => s[random.Next(s.Length)]).ToArray()); + } + + private Stream Connect_InitiatorSide(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, bool reverse) + { + bool killRemote = true; // we have no way of receiving connections + + var _processTableManager = _clientLibrary._processTableManager; + + // Need to get the latest address & port + var row = reverse ? _processTableManager.GetRowForProcess(fromProcessName) : _processTableManager.GetRowForProcess(toProcessName); + + var _row = _processTableManager.GetRowForInstanceProcess(row.InstanceName, ""); + + + // Send request to CRA instance + TcpClient client = null; + NetworkStream ns = null; + try + { + client = new TcpClient(_row.Address, _row.Port); + ns = client.GetStream(); + } + catch + { + return null; + } + + if (!reverse) + ns.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_RECEIVER); + else + ns.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_RECEIVER_REVERSE); + + ns.WriteByteArray(Encoding.UTF8.GetBytes(fromProcessName)); + ns.WriteByteArray(Encoding.UTF8.GetBytes(fromProcessOutput)); + ns.WriteByteArray(Encoding.UTF8.GetBytes(toProcessName)); + ns.WriteByteArray(Encoding.UTF8.GetBytes(toProcessInput)); + ns.WriteInteger(killRemote ? 1 : 0); + CRAErrorCode result = (CRAErrorCode)ns.ReadInteger(); + + if (result != 0) + { + Debug.WriteLine("Client received error code: " + result); + ns.Dispose(); + return null; + } + else + { + return ns; + } + } + } +} diff --git a/src/CRA.ClientLibrary/Processes/IAsyncProcessInputEndpoint.cs b/src/CRA.ClientLibrary/Processes/IAsyncProcessInputEndpoint.cs new file mode 100644 index 0000000..dcd6be0 --- /dev/null +++ b/src/CRA.ClientLibrary/Processes/IAsyncProcessInputEndpoint.cs @@ -0,0 +1,31 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace CRA.ClientLibrary +{ + /// + /// Interface for async input endpoints in CRA + /// + public interface IAsyncProcessInputEndpoint : IDisposable + { + /// + /// Async version of FromStream + /// + /// + /// + /// + /// + /// + Task FromStreamAsync(Stream stream, string otherProcess, string otherEndpoint, CancellationToken token); + + /// + /// Async version of FromOutput + /// + /// + /// + /// + Task FromOutputAsync(IProcessOutputEndpoint endpoint, CancellationToken token); + } +} diff --git a/src/CRA.ClientLibrary/Processes/IAsyncProcessOutputEndpoint.cs b/src/CRA.ClientLibrary/Processes/IAsyncProcessOutputEndpoint.cs new file mode 100644 index 0000000..f86b5c7 --- /dev/null +++ b/src/CRA.ClientLibrary/Processes/IAsyncProcessOutputEndpoint.cs @@ -0,0 +1,31 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace CRA.ClientLibrary +{ + /// + /// Interface for async output endpoints in CRA + /// + public interface IAsyncProcessOutputEndpoint : IDisposable + { + /// + /// Async version of ToStream + /// + /// + /// + /// + /// + /// + Task ToStreamAsync(Stream stream, string otherProcess, string otherEndpoint, CancellationToken token); + + /// + /// Async version of ToInput + /// + /// + /// + /// + Task ToInputAsync(IProcessInputEndpoint endpoint, CancellationToken token); + } +} diff --git a/src/CRA.ClientLibrary/Processes/IProcess.cs b/src/CRA.ClientLibrary/Processes/IProcess.cs new file mode 100644 index 0000000..86b015f --- /dev/null +++ b/src/CRA.ClientLibrary/Processes/IProcess.cs @@ -0,0 +1,75 @@ +using System; +using System.Collections.Concurrent; + +namespace CRA.ClientLibrary +{ + + /// + /// User provided notion of a running process + /// + public interface IProcess : IDisposable + { + /// + /// Ingress points for a process; these are observers + /// + ConcurrentDictionary InputEndpoints { get; } + + /// + /// Egress points for a process; these are observables + /// + ConcurrentDictionary OutputEndpoints { get; } + + /// + /// Ingress points for a process; these are observers + /// + ConcurrentDictionary AsyncInputEndpoints { get; } + + /// + /// Egress points for a process; these are observables + /// + ConcurrentDictionary AsyncOutputEndpoints { get; } + + + /// + /// Callback that process will invoke when a new input is added + /// + /// + void OnAddInputEndpoint(Action addInputCallback); + + /// + /// Callback that process will invoke when a new output is added + /// + /// + void OnAddAsyncOutputEndpoint(Action addOutputCallback); + + /// + /// Callback that process will invoke when a new input is added + /// + /// + void OnAddAsyncInputEndpoint(Action addInputCallback); + + /// + /// Callback that process will invoke when a new output is added + /// + /// + void OnAddOutputEndpoint(Action addOutputCallback); + + /// + /// Callback when process is disposed + /// + void OnDispose(Action disposeCallback); + + /// + /// Gets an instance of the CRA Client Library that the process + /// can use to communicate with the CRA runtime. + /// + /// Instance of CRA Client Library + CRAClientLibrary GetClientLibrary(); + + /// + /// Initialize process with specified params + /// + /// + void Initialize(object processParameter); + } +} diff --git a/src/CRA.ClientLibrary/Processes/IProcessInputEndpoint.cs b/src/CRA.ClientLibrary/Processes/IProcessInputEndpoint.cs new file mode 100644 index 0000000..398888a --- /dev/null +++ b/src/CRA.ClientLibrary/Processes/IProcessInputEndpoint.cs @@ -0,0 +1,28 @@ +using System; +using System.IO; +using System.Threading; + +namespace CRA.ClientLibrary +{ + /// + /// Interface for input endpoints in CRA + /// + public interface IProcessInputEndpoint : IDisposable + { + /// + /// Call to provide a stream for input to read from + /// + /// + /// + /// + /// + void FromStream(Stream stream, string otherProcess, string otherEndpoint, CancellationToken token); + + /// + /// Call to provide an output endpoint for input to read from + /// + /// + /// + void FromOutput(IProcessOutputEndpoint endpoint, CancellationToken token); + } +} diff --git a/src/CRA.ClientLibrary/Processes/IProcessOutputEndpoint.cs b/src/CRA.ClientLibrary/Processes/IProcessOutputEndpoint.cs new file mode 100644 index 0000000..7de1950 --- /dev/null +++ b/src/CRA.ClientLibrary/Processes/IProcessOutputEndpoint.cs @@ -0,0 +1,28 @@ +using System; +using System.IO; +using System.Threading; + +namespace CRA.ClientLibrary +{ + /// + /// Interface for output endpoints in CRA + /// + public interface IProcessOutputEndpoint : IDisposable + { + /// + /// Call to provide a stream for output to write to + /// + /// + /// + /// + /// + void ToStream(Stream stream, string otherProcess, string otherEndpoint, CancellationToken token); + + /// + /// Call to provide an input endpoint for output to write to + /// + /// + /// + void ToInput(IProcessInputEndpoint endpoint, CancellationToken token); + } +} diff --git a/src/CRA.ClientLibrary/Processes/ProcessBase.cs b/src/CRA.ClientLibrary/Processes/ProcessBase.cs new file mode 100644 index 0000000..e478d2b --- /dev/null +++ b/src/CRA.ClientLibrary/Processes/ProcessBase.cs @@ -0,0 +1,322 @@ +using System; +using System.Collections.Concurrent; + +namespace CRA.ClientLibrary +{ + /// + /// Base class for Process abstraction + /// + public abstract class ProcessBase : IProcess + { + private string _processName; + + private ConcurrentDictionary _inputEndpoints = new ConcurrentDictionary(); + private ConcurrentDictionary _outputEndpoints = new ConcurrentDictionary(); + private Action onAddInputEndpoint; + private Action onAddOutputEndpoint; + private Action onDispose; + + private ConcurrentDictionary _asyncInputEndpoints = new ConcurrentDictionary(); + private ConcurrentDictionary _asyncOutputEndpoints = new ConcurrentDictionary(); + private Action onAddAsyncInputEndpoint; + private Action onAddAsyncOutputEndpoint; + private CRAClientLibrary _clientLibrary; + + /// + /// Constructor + /// + protected ProcessBase() + { + onAddInputEndpoint = (key, proc) => _inputEndpoints.AddOrUpdate(key, proc, (str, pr) => proc); + onAddOutputEndpoint = (key, proc) => _outputEndpoints.AddOrUpdate(key, proc, (str, pr) => proc); + + onAddAsyncInputEndpoint = (key, proc) => _asyncInputEndpoints.AddOrUpdate(key, proc, (str, pr) => proc); + onAddAsyncOutputEndpoint = (key, proc) => _asyncOutputEndpoints.AddOrUpdate(key, proc, (str, pr) => proc); + } + + /// + /// Gets an instance of the CRA client library + /// + /// + public CRAClientLibrary GetClientLibrary() + { + return _clientLibrary; + } + + /// + /// Set instance of CRA client library + /// + /// + public void SetClientLibrary(CRAClientLibrary lib) + { + _clientLibrary = lib; + } + + /// + /// Dictionary of output endpoints for the process + /// + public ConcurrentDictionary OutputEndpoints + { + get + { + return _outputEndpoints; + } + } + + /// + /// Dictionary of input endpoints for the process + /// + public ConcurrentDictionary InputEndpoints + { + get + { + return _inputEndpoints; + } + } + + /// + /// Dictionary of async output endpoints for the process + /// + public ConcurrentDictionary AsyncOutputEndpoints + { + get + { + return _asyncOutputEndpoints; + } + } + + /// + /// Dictionary of async input endpoints for the process + /// + public ConcurrentDictionary AsyncInputEndpoints + { + get + { + return _asyncInputEndpoints; + } + } + + /// + /// Connect local output endpoint (ToStream) to remote process' input endpoint (FromStream) + /// + /// Local output endpoint + /// Remote process name + /// Remote input endpoint + public void ConnectLocalOutputEndpoint(string localOutputEndpoint, string remoteProcess, string remoteInputEndpoint) + { + _clientLibrary.Connect(_processName, localOutputEndpoint, remoteProcess, remoteInputEndpoint); + } + + /// + /// Connect local input endpoint (FromStream) to remote process' output endpoint (ToStream) + /// + /// Local input endpoint + /// Remote process name + /// Remote output endpoint + public void ConnectLocalInputEndpoint(string localInputEndpoint, string remoteProcess, string remoteOutputEndpoint) + { + _clientLibrary.Connect(remoteProcess, remoteOutputEndpoint, _processName, localInputEndpoint, ConnectionInitiator.ToSide); + } + + + /// + /// Add callback for when input endpoint is added + /// + /// + public void OnAddInputEndpoint(Action addInputCallback) + { + lock (this) + { + foreach (var key in InputEndpoints.Keys) + { + addInputCallback(key, InputEndpoints[key]); + } + + onAddInputEndpoint += addInputCallback; + } + } + + /// + /// Add callback for when output endpoint is added + /// + /// + public void OnAddOutputEndpoint(Action addOutputCallback) + { + lock (this) + { + foreach (var key in OutputEndpoints.Keys) + { + addOutputCallback(key, OutputEndpoints[key]); + } + + onAddOutputEndpoint += addOutputCallback; + } + } + + /// + /// Add callback for when async input endpoint is added + /// + /// + public void OnAddAsyncInputEndpoint(Action addInputCallback) + { + lock (this) + { + foreach (var key in AsyncInputEndpoints.Keys) + { + addInputCallback(key, AsyncInputEndpoints[key]); + } + + onAddAsyncInputEndpoint += addInputCallback; + } + } + + /// + /// Add callback for when async output endpoint is added + /// + /// + public void OnAddAsyncOutputEndpoint(Action addOutputCallback) + { + lock (this) + { + foreach (var key in AsyncOutputEndpoints.Keys) + { + addOutputCallback(key, AsyncOutputEndpoints[key]); + } + + onAddAsyncOutputEndpoint += addOutputCallback; + } + } + + /// + /// Get the name of the process + /// + /// + public string ProcessName + { + get { return _processName; } + } + + + internal void SetProcessName(string processName) + { + _processName = processName; + } + + /// + /// Callback for dispose + /// + /// + public void OnDispose(Action disposeCallback) + { + lock (this) + { + if (onDispose == null) + onDispose = disposeCallback; + else + onDispose += disposeCallback; + } + } + + /// + /// Process implementor uses this to add input endpoint + /// + /// + /// + protected virtual void AddInputEndpoint(string key, IProcessInputEndpoint input) + { + lock (this) + { + onAddInputEndpoint(key, input); + } + } + + /// + /// Process implementor uses this to add output endpoint + /// + /// + /// + protected virtual void AddOutputEndpoint(string key, IProcessOutputEndpoint input) + { + lock (this) + { + onAddOutputEndpoint(key, input); + } + } + + /// + /// Process implementor uses this to add async input endpoint + /// + /// + /// + protected virtual void AddAsyncInputEndpoint(string key, IAsyncProcessInputEndpoint input) + { + lock (this) + { + onAddAsyncInputEndpoint(key, input); + } + } + + /// + /// Process implementor uses this to add async output endpoint + /// + /// + /// + protected virtual void AddAsyncOutputEndpoint(string key, IAsyncProcessOutputEndpoint input) + { + lock (this) + { + onAddAsyncOutputEndpoint(key, input); + } + } + + /// + /// Initialize + /// + /// + public virtual void Initialize(object processParameter) + { + + } + + /// + /// Dispose the process + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Actual dispose occurs here + /// + /// + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + onDispose?.Invoke(); + + lock (this) + { + foreach (var key in OutputEndpoints.Keys) + { + OutputEndpoints[key].Dispose(); + } + foreach (var key in InputEndpoints.Keys) + { + InputEndpoints[key].Dispose(); + } + foreach (var key in AsyncOutputEndpoints.Keys) + { + AsyncOutputEndpoints[key].Dispose(); + } + foreach (var key in AsyncInputEndpoints.Keys) + { + AsyncInputEndpoints[key].Dispose(); + } + } + } + } + } +} diff --git a/src/CRA.ClientLibrary/Properties/AssemblyInfo.cs b/src/CRA.ClientLibrary/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..2ab9c62 --- /dev/null +++ b/src/CRA.ClientLibrary/Properties/AssemblyInfo.cs @@ -0,0 +1,35 @@ +using System.Reflection; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("CRA.ClientLibrary")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("CRA.ClientLibrary")] +[assembly: AssemblyCopyright("Copyright © 2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("ef23eb6a-e329-496d-9b7a-8cad66ea4e3a")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/CRA.ClientLibrary/Tables/ConnectionTable.cs b/src/CRA.ClientLibrary/Tables/ConnectionTable.cs new file mode 100644 index 0000000..9796e55 --- /dev/null +++ b/src/CRA.ClientLibrary/Tables/ConnectionTable.cs @@ -0,0 +1,128 @@ +using Microsoft.WindowsAzure.Storage.Table; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; + +namespace CRA.ClientLibrary +{ + /// + /// An assignment of one machine to a group + /// + public class ConnectionTable : TableEntity + { + /// + /// Name of the from process + /// + public string FromProcess { get { return this.PartitionKey; } } + + /// + /// Other data related to connection + /// + public string EndpointToProcessEndpoint { get { return this.RowKey; } } + + /// + /// From endpoint + /// + public string FromEndpoint { get { return EndpointToProcessEndpoint.Split(':')[0]; } } + + /// + /// To process + /// + public string ToProcess { get { return EndpointToProcessEndpoint.Split(':')[1]; } } + + /// + /// To endpoint + /// + public string ToEndpoint { get { return EndpointToProcessEndpoint.Split(':')[2]; } } + + /// + /// Connection table + /// + /// + /// + /// + /// + public ConnectionTable(string fromProcess, string fromEndpoint, string toProcess, string toEndpoint) + { + this.PartitionKey = fromProcess; + this.RowKey = fromEndpoint + ":" + toProcess + ":" + toEndpoint; + } + + /// + /// + /// + public ConnectionTable() { } + + /// + /// ToString + /// + /// + public override string ToString() + { + return string.Format(CultureInfo.CurrentCulture, "FromProcess '{0}', FromEndpoint '{1}', ToProcess '{2}', ToEndpoint '{3}'", FromProcess, FromEndpoint, ToProcess, ToEndpoint); + } + + /// + /// Equality + /// + /// + /// + public override bool Equals(object obj) + { + ConnectionTable other = obj as ConnectionTable; + return this.PartitionKey.Equals(other.PartitionKey) && this.RowKey.Equals(other.RowKey); + } + + + /// + /// GetHashCode + /// + /// + public override int GetHashCode() + { + return PartitionKey.GetHashCode() ^ RowKey.GetHashCode(); + } + + /// + /// Returns a list of all visible nodes in all groups + /// + /// + /// + internal static IEnumerable GetAll(CloudTable instanceTable) + { + TableQuery query = new TableQuery(); + return instanceTable.ExecuteQuery(query); + } + + /// + /// Counts all nodes in the cluster regardless of their group + /// + /// + internal static int CountAll(CloudTable instanceTable) + { + return GetAll(instanceTable).Count(); + } + + internal static IEnumerable GetAllConnectionsFromProcess(CloudTable instanceTable, string fromProcess) + { + return GetAll(instanceTable).Where(gn => fromProcess == gn.PartitionKey); + } + + internal static IEnumerable GetAllConnectionsToProcess(CloudTable instanceTable, string toProcess) + { + return GetAll(instanceTable).Where(gn => toProcess == gn.ToProcess); + } + + internal static bool ContainsConnection(CloudTable instanceTable, string fromProcess, string fromEndpoint, string toProcess, string toEndpoint) + { + return ContainsRow(instanceTable, new ConnectionTable(fromProcess, fromEndpoint, toProcess, toEndpoint)); + } + + internal static bool ContainsRow(CloudTable instanceTable, ConnectionTable entity) + { + var temp = GetAll(instanceTable); + + return temp.Where(gn => entity.Equals(gn)).Count() > 0; + } + } +} diff --git a/src/CRA.ClientLibrary/Tables/ConnectionTableManager.cs b/src/CRA.ClientLibrary/Tables/ConnectionTableManager.cs new file mode 100644 index 0000000..18513ae --- /dev/null +++ b/src/CRA.ClientLibrary/Tables/ConnectionTableManager.cs @@ -0,0 +1,76 @@ +using System.Collections.Generic; +using System.Linq; +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Table; + +namespace CRA.ClientLibrary +{ + /// + /// An assignment of one machine to a group + /// + public class ConnectionTableManager + { + private CloudTable _connectionTable; + + internal ConnectionTableManager(string storageConnectionString) + { + var _storageAccount = CloudStorageAccount.Parse(storageConnectionString); + var _tableClient = _storageAccount.CreateCloudTableClient(); + _connectionTable = CreateTableIfNotExists("connectiontableforcra", _tableClient); + } + + internal void DeleteTable() + { + _connectionTable.DeleteIfExists(); + } + + + internal void AddConnection(string fromProcess, string fromOutput, string toConnection, string toInput) + { + // Make the connection information stable + var newRow = new ConnectionTable(fromProcess, fromOutput, toConnection, toInput); + TableOperation insertOperation = TableOperation.InsertOrReplace(newRow); + _connectionTable.Execute(insertOperation); + } + + internal void DeleteConnection(string fromProcess, string fromOutput, string toConnection, string toInput) + { + // Make the connection information stable + var newRow = new ConnectionTable(fromProcess, fromOutput, toConnection, toInput); + newRow.ETag = "*"; + TableOperation deleteOperation = TableOperation.Delete(newRow); + _connectionTable.Execute(deleteOperation); + } + + internal List GetConnectionsFromProcess(string processName) + { + TableQuery query = new TableQuery() + .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, processName)); + + return _connectionTable + .ExecuteQuery(query) + .Select(e => new ConnectionInfo(e.FromProcess, e.FromEndpoint, e.ToProcess, e.ToEndpoint)) + .ToList(); + } + + internal List GetConnectionsToProcess(string processName) + { + return + ConnectionTable.GetAllConnectionsToProcess(_connectionTable, processName) + .Select(e => new ConnectionInfo(e.FromProcess, e.FromEndpoint, e.ToProcess, e.ToEndpoint)) + .ToList(); + } + + private CloudTable CreateTableIfNotExists(string tableName, CloudTableClient _tableClient) + { + CloudTable table = _tableClient.GetTableReference(tableName); + try + { + table.CreateIfNotExists(); + } + catch { } + + return table; + } + } +} diff --git a/src/CRA.ClientLibrary/Tables/EndpointTable.cs b/src/CRA.ClientLibrary/Tables/EndpointTable.cs new file mode 100644 index 0000000..9e0fbec --- /dev/null +++ b/src/CRA.ClientLibrary/Tables/EndpointTable.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using Microsoft.WindowsAzure.Storage.Table; + +namespace CRA.ClientLibrary +{ + /// + /// An assignment of one machine to a group + /// + public class EndpointTable : TableEntity + { + /// + /// The time interval at which workers refresh their membership entry + /// + public static readonly TimeSpan HeartbeatTime = TimeSpan.FromSeconds(10); + + /// + /// Name of the group + /// + public string ProcessName { get { return this.PartitionKey; } } + + /// + /// Endpoint name + /// + public string EndpointName { get { return this.RowKey; } } + + /// + /// Is an input (or output) + /// + public bool IsInput { get; set; } + + /// + /// Is async (or sync) + /// + public bool IsAsync { get; set; } + + + /// + /// Constructor + /// + /// + /// + /// + /// + public EndpointTable(string processName, string endpointName, bool isInput, bool isAsync) + { + this.PartitionKey = processName; + this.RowKey = endpointName; + + this.IsInput = isInput; + this.IsAsync = isAsync; + } + + /// + /// Constructor + /// + public EndpointTable() { } + + /// + /// ToString + /// + /// + public override string ToString() + { + return string.Format(CultureInfo.CurrentCulture, "Process '{0}', Endpoint '{1}'", PartitionKey, RowKey); + } + + /// + /// Equals + /// + /// + /// + public override bool Equals(object obj) + { + EndpointTable other = obj as EndpointTable; + return this.PartitionKey.Equals(other.PartitionKey) && this.RowKey.Equals(other.RowKey); + } + + /// + /// GetHashCode + /// + /// + public override int GetHashCode() + { + return PartitionKey.GetHashCode() ^ RowKey.GetHashCode(); + } + + /// + /// Returns a list of all visible nodes in all groups + /// + /// + /// + internal static IEnumerable GetAll(CloudTable instanceTable) + { + var query = new TableQuery(); + return instanceTable.ExecuteQuery(query); + } + } +} diff --git a/src/CRA.ClientLibrary/Tables/EndpointTableManager.cs b/src/CRA.ClientLibrary/Tables/EndpointTableManager.cs new file mode 100644 index 0000000..9170e0c --- /dev/null +++ b/src/CRA.ClientLibrary/Tables/EndpointTableManager.cs @@ -0,0 +1,105 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Table; + +namespace CRA.ClientLibrary +{ + /// + /// An assignment of one machine to a group + /// + public class EndpointTableManager + { + private CloudTable _endpointTable; + + internal EndpointTableManager(string storageConnectionString) + { + var _storageAccount = CloudStorageAccount.Parse(storageConnectionString); + var _tableClient = _storageAccount.CreateCloudTableClient(); + _endpointTable = CreateTableIfNotExists("endpointtableforcra", _tableClient); + } + + internal void DeleteTable() + { + _endpointTable.DeleteIfExists(); + } + + internal bool ExistsEndpoint(string processName, string endPoint) + { + TableQuery query = new TableQuery() + .Where(TableQuery.CombineFilters( + TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, processName), + TableOperators.And, + TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, endPoint))); + return _endpointTable.ExecuteQuery(query).Any(); + } + + internal void AddEndpoint(string processName, string endpointName, bool isInput, bool isAsync) + { + // Make the connection information stable + var newRow = new EndpointTable(processName, endpointName, isInput, isAsync); + TableOperation insertOperation = TableOperation.InsertOrReplace(newRow); + _endpointTable.Execute(insertOperation); + } + + internal void DeleteEndpoint(string processName, string endpointName) + { + // Make the connection information stable + var newRow = new DynamicTableEntity(processName, endpointName); + newRow.ETag = "*"; + TableOperation deleteOperation = TableOperation.Delete(newRow); + _endpointTable.Execute(deleteOperation); + } + + internal void RemoveEndpoint(string processName, string endpointName) + { + var op = TableOperation.Retrieve(processName, endpointName); + TableResult retrievedResult = _endpointTable.Execute(op); + + // Assign the result to a CustomerEntity. + var deleteEntity = (EndpointTable)retrievedResult.Result; + + // Create the Delete TableOperation. + if (deleteEntity != null) + { + TableOperation deleteOperation = TableOperation.Delete(deleteEntity); + + // Execute the operation. + _endpointTable.Execute(deleteOperation); + } + else + { + Console.WriteLine("Could not retrieve the entity."); + } + } + + internal List GetInputEndpoints(string processName) + { + TableQuery query = new TableQuery() + .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, processName)); + return _endpointTable.ExecuteQuery(query).Where(e => e.IsInput).Select(e => e.EndpointName).ToList(); + } + + internal List GetOutputEndpoints(string processName) + { + TableQuery query = new TableQuery() + .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, processName)); + return _endpointTable.ExecuteQuery(query).Where(e => !e.IsInput).Select(e => e.EndpointName).ToList(); + } + + private CloudTable CreateTableIfNotExists(string tableName, CloudTableClient _tableClient) + { + CloudTable table = _tableClient.GetTableReference(tableName); + try + { + table.CreateIfNotExists(); + } + catch + { + } + + return table; + } + } +} diff --git a/src/CRA.ClientLibrary/Tables/ProcessTable.cs b/src/CRA.ClientLibrary/Tables/ProcessTable.cs new file mode 100644 index 0000000..592b313 --- /dev/null +++ b/src/CRA.ClientLibrary/Tables/ProcessTable.cs @@ -0,0 +1,233 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Linq.Expressions; +using Microsoft.WindowsAzure.Storage.Table; + +namespace CRA.ClientLibrary +{ + /// + /// An assignment of one machine to a group + /// + public class ProcessTable : TableEntity + { + /// + /// The time interval at which workers refresh their membership entry + /// + public static readonly TimeSpan HeartbeatTime = TimeSpan.FromSeconds(10); + + /// + /// Name of the CRA instance + /// + public string InstanceName { get { return this.PartitionKey; } } + + /// + /// Name of process + /// + public string ProcessName { get { return this.RowKey; } } + + /// + /// Definition of process + /// + public string ProcessDefinition { get; set; } + + /// + /// Name of the machine + /// + public string Address { get; set; } + + /// + /// Port number + /// + public int Port { get; set; } + + /// + /// Action to create process + /// + public string ProcessCreateAction { get; set; } + + /// + /// Parameter to process creator + /// + public string ProcessParameter { get; set; } + + /// + /// Constructor + /// + /// + /// + /// + /// + /// + /// + /// + public ProcessTable(string instanceName, string processName, string processDefinition, string address, int port, Expression> processCreateAction, object processParameter) + { + this.PartitionKey = instanceName; + this.RowKey = processName; + + this.ProcessDefinition = processDefinition; + this.Address = address; + this.Port = port; + this.ProcessCreateAction = ""; + + if (processCreateAction != null) + { + var closureEliminator = new ClosureEliminator(); + Expression processedUserLambdaExpression = closureEliminator.Visit( + processCreateAction); + + this.ProcessCreateAction = SerializationHelper.Serialize(processedUserLambdaExpression); + } + + this.ProcessParameter = SerializationHelper.SerializeObject(processParameter); + } + + /// + /// Constructor + /// + /// + /// + /// + /// + /// + /// + /// + public ProcessTable(string instanceName, string processName, string processDefinition, string address, int port, string processCreateAction, string processParameter) + { + this.PartitionKey = instanceName; + this.RowKey = processName; + + this.ProcessDefinition = processDefinition; + this.Address = address; + this.Port = port; + this.ProcessCreateAction = processCreateAction; + this.ProcessParameter = processParameter; + } + + /// + /// Constructor + /// + public ProcessTable() { } + + /// + /// ToString + /// + /// + public override string ToString() + { + return string.Format(CultureInfo.CurrentCulture, "Instance '{0}', Address '{1}', Port '{2}'", this.InstanceName, this.Address, this.Port); + } + + /// + /// Equals + /// + /// + /// + public override bool Equals(object obj) + { + ProcessTable other = obj as ProcessTable; + return this.PartitionKey.Equals(other.PartitionKey) && this.RowKey.Equals(other.RowKey); + } + + /// + /// GetHashCode + /// + /// + public override int GetHashCode() + { + return PartitionKey.GetHashCode() ^ RowKey.GetHashCode(); + } + + internal Func GetProcessCreateAction() + { + var expr = SerializationHelper.Deserialize(ProcessCreateAction); + var actionExpr = AddBox((LambdaExpression) expr); + return actionExpr.Compile(); + } + + internal object GetProcessParam() + { + return SerializationHelper.DeserializeObject(this.ProcessParameter); + } + + + private static Expression> AddBox(LambdaExpression expression) + { + Expression converted = Expression.Convert + (expression.Body, typeof(IProcess)); + return Expression.Lambda> + (converted, expression.Parameters); + } + + /// + /// Returns a list of all visible nodes in all groups + /// + /// + /// + internal static IEnumerable GetAll(CloudTable instanceTable) + { + TableQuery query = new TableQuery(); + return instanceTable.ExecuteQuery(query); + } + + /// + /// Counts all nodes in the cluster regardless of their group + /// + /// + internal static int CountAll(CloudTable instanceTable) + { + return GetAll(instanceTable).Count(); + } + + internal static ProcessTable GetInstanceFromAddress(CloudTable instanceTable, string address, int port) + { + return GetAll(instanceTable).Where(gn => address == gn.Address && port == gn.Port).First(); + } + + internal static ProcessTable GetRowForInstance(CloudTable instanceTable, string instanceName) + { + return GetAll(instanceTable).Where(gn => instanceName == gn.InstanceName && string.IsNullOrEmpty(gn.ProcessName)).First(); + } + internal static IEnumerable GetAllRowsForInstance(CloudTable instanceTable, string instanceName) + { + return GetAll(instanceTable).Where(gn => instanceName == gn.InstanceName); + } + + internal static ProcessTable GetRowForInstanceProcess(CloudTable instanceTable, string instanceName, string processName) + { + return GetAll(instanceTable).Where(gn => instanceName == gn.InstanceName && processName == gn.ProcessName).First(); + } + + internal static ProcessTable GetRowForProcessDefinition(CloudTable instanceTable, string processDefinition) + { + return GetAll(instanceTable).Where(gn => processDefinition == gn.ProcessName && string.IsNullOrEmpty(gn.InstanceName)).First(); + } + + internal static ProcessTable GetRowForProcess(CloudTable instanceTable, string processName) + { + return GetAll(instanceTable).Where(gn => processName == gn.ProcessName && !string.IsNullOrEmpty(gn.InstanceName)).First(); + } + + internal static IEnumerable GetProcesses(CloudTable instanceTable, string instanceName) + { + return GetAll(instanceTable).Where(gn => instanceName == gn.InstanceName && !string.IsNullOrEmpty(gn.ProcessName)); + } + + internal static bool ContainsRow(CloudTable instanceTable, ProcessTable entity) + { + var temp = GetAll(instanceTable); + + return temp.Where(gn => entity.Equals(gn)).Count() > 0; + } + + internal static bool ContainsInstance(CloudTable instanceTable, string instanceName) + { + var temp = GetAll(instanceTable); + + return temp.Where(gn => instanceName == gn.InstanceName).Count() > 0; + } + + } +} diff --git a/src/CRA.ClientLibrary/Tables/ProcessTableManager.cs b/src/CRA.ClientLibrary/Tables/ProcessTableManager.cs new file mode 100644 index 0000000..3a2bce2 --- /dev/null +++ b/src/CRA.ClientLibrary/Tables/ProcessTableManager.cs @@ -0,0 +1,85 @@ +using System.Collections.Generic; +using System.Linq; +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Table; + +namespace CRA.ClientLibrary +{ + /// + /// An assignment of one machine to a group + /// + public class ProcessTableManager + { + private CloudTable _processTable; + + internal ProcessTableManager(string storageConnectionString) + { + var _storageAccount = CloudStorageAccount.Parse(storageConnectionString); + var _tableClient = _storageAccount.CreateCloudTableClient(); + _processTable = CreateTableIfNotExists("processtableforcra", _tableClient); + } + + internal void DeleteTable() + { + _processTable.DeleteIfExists(); + } + + internal bool ExistsProcess(string processName) + { + TableQuery query = new TableQuery() + .Where(TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, processName)); + return _processTable.ExecuteQuery(query).Any(); + } + + internal void RegisterInstance(string instanceName, string address, int port) + { + TableOperation insertOperation = TableOperation.InsertOrReplace(new ProcessTable + (instanceName, "", "", address, port, "", "")); + _processTable.Execute(insertOperation); + } + + internal void RegisterProcess(string processName, string instanceName) + { + TableOperation insertOperation = TableOperation.InsertOrReplace(new ProcessTable + (instanceName, processName, "", "", 0, "", "")); + _processTable.Execute(insertOperation); + } + + internal void DeleteInstance(string instanceName) + { + var newRow = new DynamicTableEntity(instanceName, ""); + newRow.ETag = "*"; + TableOperation deleteOperation = TableOperation.Delete(newRow); + _processTable.Execute(deleteOperation); + } + + internal ProcessTable GetRowForProcess(string processName) + { + return ProcessTable.GetAll(_processTable).Where(gn => processName == gn.ProcessName && !string.IsNullOrEmpty(gn.InstanceName)).First(); + } + + internal ProcessTable GetRowForInstanceProcess(string instanceName, string processName) + { + return ProcessTable.GetAll(_processTable).Where(gn => instanceName == gn.InstanceName && processName == gn.ProcessName).First(); + } + + private static CloudTable CreateTableIfNotExists(string tableName, CloudTableClient _tableClient) + { + CloudTable table = _tableClient.GetTableReference(tableName); + try + { + table.CreateIfNotExists(); + } + catch { } + + return table; + } + + internal List GetProcessNames() + { + TableQuery query = new TableQuery() + .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.NotEqual, "")); + return _processTable.ExecuteQuery(query).Select(e => e.ProcessName).ToList(); + } + } +} diff --git a/src/CRA.ClientLibrary/Utilities/AssemblyResolver.cs b/src/CRA.ClientLibrary/Utilities/AssemblyResolver.cs new file mode 100644 index 0000000..50c1c2b --- /dev/null +++ b/src/CRA.ClientLibrary/Utilities/AssemblyResolver.cs @@ -0,0 +1,61 @@ +using System; +using System.Threading; +using System.Reflection; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics.Contracts; + +#pragma warning disable 420 + +namespace CRA.ClientLibrary +{ + internal static class AssemblyResolver + { + private static readonly ConcurrentDictionary assemblies = + new ConcurrentDictionary(); + + private static volatile int handlerRegistered; + + public static IEnumerable RegisteredAssemblies + { + get + { + return assemblies.Keys; + } + } + + public static bool ContainsAssembly(string name) + { + Contract.Requires(name != null); + return assemblies.ContainsKey(name); + } + + public static byte[] GetAssemblyBytes(string name) + { + Contract.Requires(name != null); + return assemblies[name]; + } + + public static void Register(string name, byte[] assembly) + { + Contract.Requires(name != null); + Contract.Requires(assembly != null); + assemblies.TryAdd(name, assembly); + if (handlerRegistered == 0 && + Interlocked.CompareExchange(ref handlerRegistered, 1, 0) == 0) + { + AppDomain.CurrentDomain.AssemblyResolve += Resolver; + } + } + + private static Assembly Resolver(object sender, ResolveEventArgs arguments) + { + byte[] assemblyBytes; + if (assemblies.TryGetValue(arguments.Name, out assemblyBytes)) + { + return Assembly.Load(assemblyBytes); + } + return null; + } + } +} diff --git a/src/CRA.ClientLibrary/Utilities/AssemblyResolverClient.cs b/src/CRA.ClientLibrary/Utilities/AssemblyResolverClient.cs new file mode 100644 index 0000000..f0415ec --- /dev/null +++ b/src/CRA.ClientLibrary/Utilities/AssemblyResolverClient.cs @@ -0,0 +1,45 @@ +using System.Reflection; +using System.Collections.Generic; +using System.Diagnostics.Contracts; + +namespace CRA.ClientLibrary +{ + internal static class AssemblyResolverClient + { + internal struct ApplicationAssembly + { + public string Name; + + public string FileName; + } + + private static void GetApplicationAssemblies( + AssemblyName current, List assemblies, HashSet exclude) + { + Contract.Requires(current != null); + Contract.Requires(assemblies != null); + Contract.Requires(exclude != null); + + // If this assembly is in the "exclude" set, then done. + var name = current.FullName; + if (exclude.Contains(name)) + { + return; + } + + // Add to "exclude" set so this assembly isn't re-added if it is referenced additional times. + exclude.Add(name); + + // Recursively add any assemblies that "current" references. + var assembly = Assembly.Load(current); + foreach (var referenced in assembly.GetReferencedAssemblies()) + { + GetApplicationAssemblies(referenced, assemblies, exclude); + } + + // Lastly, now that all dependant (referenced) assemblies are added, add the "current" assembly. + assemblies.Add(new ApplicationAssembly { Name = name, FileName = assembly.Location }); + } + + } +} diff --git a/src/CRA.ClientLibrary/Utilities/AssemblyUtils.cs b/src/CRA.ClientLibrary/Utilities/AssemblyUtils.cs new file mode 100644 index 0000000..9e6561a --- /dev/null +++ b/src/CRA.ClientLibrary/Utilities/AssemblyUtils.cs @@ -0,0 +1,280 @@ +using System; +using System.IO; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Collections.Generic; +using System.Diagnostics.Contracts; +using System.Globalization; +using System.Runtime.Serialization; + +namespace CRA.ClientLibrary +{ + internal static class AssemblyUtils + { + internal struct ApplicationAssembly + { + public string Name; + + public string FileName; + } + + [DataContract] + internal struct UserDLLsInfo + { + [DataMember] + public string UserDLLsBuffer { get; set; } + + [DataMember] + public string UserDLLsBufferInfo { get; set; } + } + + public static string[] GetExcludedAssemblies() + { + string[] excludedAssemblies = new[] + { + "mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Core, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "Microsoft.Azure.KeyVault.Core, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "Microsoft.VisualStudio.HostingProcess.Utilities, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "mscorlib, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Configuration, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Xml, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Data.SqlXml, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Security, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Windows.Forms, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Drawing, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "Accessibility, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Deployment, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Runtime.Serialization.Formatters.Soap, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "Microsoft.VisualStudio.HostingProcess.Utilities.Sync, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "Microsoft.VisualStudio.Debugger.Runtime, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "vshost32, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Xml.Linq, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Runtime.Serialization, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.ServiceModel.Internals, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "SMDiagnostics, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Data.DataSetExtensions, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Data, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Transactions, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.EnterpriseServices, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.DirectoryServices, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Runtime.Remoting, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Web, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Web.RegularExpressions, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Design, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Data.OracleClient, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Drawing.Design, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Web.ApplicationServices, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "System.ComponentModel.DataAnnotations, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "System.DirectoryServices.Protocols, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Runtime.Caching, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.ServiceProcess, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Configuration.Install, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Web.Services, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "Microsoft.Build.Utilities.v4.0, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "Microsoft.Build.Framework, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Xaml, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "Microsoft.Build.Tasks.v4.0, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Numerics, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "Microsoft.CSharp, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Dynamic, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Net.Http, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "Microsoft.WindowsAzure.Storage, Version=8.1.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "Microsoft.Data.Services.Client, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "Microsoft.Data.Edm, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "Microsoft.Data.OData, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "System.Spatial, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed", + "Newtonsoft.Json, Version=8.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed", + "Newtonsoft.Json, Version=10.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed", + + "Remote.Linq, Version=5.1.0.0, Culture=neutral, PublicKeyToken=null", + "Aqua, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null", + "Anonymously Hosted DynamicMethods Assembly, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null", + "Remote.Linq, Version=5.3.1.0, Culture=neutral, PublicKeyToken=null", + "Aqua, Version=3.0.0.0, Culture=neutral, PublicKeyToken=null", + "Aqua.TypeSystem.Emit.Types, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null", + "Remote.Linq, Version=5.4.0.0, Culture=neutral, PublicKeyToken=null", + "Aqua, Version=4.0.0.0, Culture=neutral, PublicKeyToken=null", + + "System.ServiceModel, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.IdentityModel, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "Microsoft.Transactions.Bridge, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.IdentityModel.Selectors, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089", + "System.Messaging, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Runtime.DurableInstancing, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "System.ServiceModel.Activation, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "System.ServiceModel.Activities, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "System.Activities, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "Microsoft.VisualBasic.Activities.Compiler, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "Microsoft.VisualBasic, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Management, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "Microsoft.JScript, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a", + "System.Activities.DurableInstancing, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "System.Xaml.Hosting, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35", + "CRA.ClientLibrary, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null", + }; + return excludedAssemblies; + } + + + public static ApplicationAssembly[] GetRelatedApplicationAssemblies( + string assemblyNamePrefix, params string[] excludedAssemblies) + { + Contract.Requires(excludedAssemblies != null); + + var assemblies = new List(); + var exclude = new HashSet(excludedAssemblies); + + var applicationAssemblies = AppDomain.CurrentDomain.GetAssemblies(); + foreach (var applicationAssembly in applicationAssemblies + .Where(v => v.GetName().FullName.StartsWith( + assemblyNamePrefix))) + { + GetApplicationAssemblies(applicationAssembly.GetName(), assemblies, exclude); + } + + return assemblies.ToArray(); + } + + private static void GetApplicationAssemblies(AssemblyName current, + List assemblies, HashSet exclude) + { + Contract.Requires(current != null); + Contract.Requires(assemblies != null); + Contract.Requires(exclude != null); + + var name = current.FullName; + if (exclude.Contains(name)) + { + return; + } + // Console.WriteLine("\"" + name + "\","); + + exclude.Add(name); + + try + { + var assembly = Assembly.Load(current); + foreach (var referenced in assembly.GetReferencedAssemblies()) + { + GetApplicationAssemblies(referenced, assemblies, exclude); + } + + assemblies.Add(new ApplicationAssembly { Name = name, FileName = assembly.Location }); + } + catch (Exception e) + { + Console.WriteLine(e.ToString()); + } + } + + public static Dictionary AssembliesFromString(string assembliesString, + string assembliesStringInfo) + { + Dictionary udfAssemblies = new Dictionary(); + + if (!assembliesStringInfo.Equals("$")) + { + string[] parts = assembliesStringInfo.Split('@'); + + MemoryStream assembliesStream = new MemoryStream( + Convert.FromBase64String(assembliesString)); + BinaryReader binaryReader = new BinaryReader(assembliesStream); + for (int i = 0; i < parts.Length - 1; i = i + 2) + { + int assemblyNameSize = int.Parse(parts[i], CultureInfo.InvariantCulture); + int assemblyBytesSize = int.Parse(parts[i + 1], CultureInfo.InvariantCulture); + string assemblyName = Encoding.UTF8.GetString( + binaryReader.ReadBytes(assemblyNameSize)); + byte[] assemblyBytes = binaryReader.ReadBytes(assemblyBytesSize); + udfAssemblies.Add(assemblyName, assemblyBytes); + } + } + + return udfAssemblies; + } + + public static void WriteAssembliesToStream(Stream stream) + { + var relatedAssemblies = GetRelatedApplicationAssemblies("", + GetExcludedAssemblies()); + + stream.WriteInteger(relatedAssemblies.Length); + + foreach (var assembly in relatedAssemblies) + { + byte[] assemblyNameBytes = Encoding.UTF8.GetBytes(assembly.Name); + stream.WriteByteArray(assemblyNameBytes); + + + FileStream fs = new FileStream(assembly.FileName, FileMode.Open, FileAccess.Read); + + using (BinaryReader br = new BinaryReader(fs)) + { + byte[] assemblyFileBytes = br.ReadBytes(Convert.ToInt32(fs.Length)); + stream.WriteByteArray(assemblyFileBytes); + } + } + } + + public static void LoadAssembliesFromStream(Stream stream) + { + int numAssemblies = stream.ReadInteger(); + + for (int i=0; i userDLLsBuffer = new List(); + userDLLsBuffer.Add((byte)'$'); + userDLLsInfo.UserDLLsBuffer = Convert.ToBase64String(userDLLsBuffer.ToArray()); + userDLLsInfo.UserDLLsBufferInfo = "$"; + } + else + { + List userDLLsBuffer = new List(); + StringBuilder userDLLsBufferInfo = new StringBuilder(); + + var relatedAssemblies = GetRelatedApplicationAssemblies(userLibraryPrefix, + GetExcludedAssemblies()); + foreach (var assembly in relatedAssemblies) + { + byte[] assemblyNameBytes = Encoding.UTF8.GetBytes(assembly.Name); + userDLLsBuffer.AddRange(assemblyNameBytes); + + FileStream fs = new FileStream(assembly.FileName, FileMode.Open, FileAccess.Read); + + using (BinaryReader br = new BinaryReader(fs)) + { + byte[] assemblyFileBytes = br.ReadBytes(Convert.ToInt32(fs.Length)); + userDLLsBuffer.AddRange(assemblyFileBytes); + + userDLLsBufferInfo.Append(assembly.Name.Length); + userDLLsBufferInfo.Append('@'); + userDLLsBufferInfo.Append(assemblyFileBytes.Length); + userDLLsBufferInfo.Append('@'); + } + } + userDLLsInfo.UserDLLsBuffer = Convert.ToBase64String(userDLLsBuffer.ToArray()); + userDLLsInfo.UserDLLsBufferInfo = userDLLsBufferInfo.ToString(); + } + return userDLLsInfo; + } + } +} diff --git a/src/CRA.ClientLibrary/Utilities/ClosureEliminator.cs b/src/CRA.ClientLibrary/Utilities/ClosureEliminator.cs new file mode 100644 index 0000000..db127d5 --- /dev/null +++ b/src/CRA.ClientLibrary/Utilities/ClosureEliminator.cs @@ -0,0 +1,40 @@ +using System.Reflection; +using System.Linq.Expressions; + +namespace CRA.ClientLibrary +{ + internal class ClosureEliminator : ExpressionVisitor + { + protected override Expression VisitMember(MemberExpression node) + { + if ((node.Expression != null) && (node.Expression.NodeType == ExpressionType.Constant)) + { + object target = ((ConstantExpression)node.Expression).Value, value; + switch (node.Member.MemberType) + { + case MemberTypes.Property: + value = ((PropertyInfo)node.Member).GetValue(target, null); + break; + case MemberTypes.Field: + value = ((FieldInfo)node.Member).GetValue(target); + break; + default: + value = target = null; + break; + } + if (target != null) + { + if (value.GetType().IsSubclassOf(typeof(Expression))) + { + return this.Visit(value as Expression); + } + else + { + return Expression.Constant(value, node.Type); + } + } + } + return base.VisitMember(node); + } + } +} diff --git a/src/CRA.ClientLibrary/Utilities/SerializationHelper.cs b/src/CRA.ClientLibrary/Utilities/SerializationHelper.cs new file mode 100644 index 0000000..57b9884 --- /dev/null +++ b/src/CRA.ClientLibrary/Utilities/SerializationHelper.cs @@ -0,0 +1,70 @@ +using System.Linq.Expressions; +using Newtonsoft.Json; +using Remote.Linq; +using Remote.Linq.ExpressionVisitors; +using System; + +namespace CRA.ClientLibrary +{ + internal class SerializationHelper + { + private SerializationHelper() + { + + } + + private static readonly JsonSerializerSettings _serializerSettings + = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Auto, + NullValueHandling = NullValueHandling.Ignore, + TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full + }; + + /// + /// Serializes a LINQ expression. + /// + /// The expression. + /// The serialized expression. + internal static string Serialize(Expression expression) + { + var toSerialize = expression.ToRemoteLinqExpression() + .ReplaceGenericQueryArgumentsByNonGenericArguments(); + return JsonConvert.SerializeObject(toSerialize, _serializerSettings); + } + + /// + /// Deserializes a LINQ expression. + /// + /// The serialized expression. + /// The expression. + internal static Expression Deserialize(string expression) + { + var deserialized = JsonConvert.DeserializeObject( + expression, _serializerSettings); + var ret = deserialized.ReplaceNonGenericQueryArgumentsByGenericArguments() + .ToLinqExpression(); + return ret; + } + + internal static string SerializeObject(object obj) + { + if (obj == null) + { + return JsonConvert.SerializeObject(obj, _serializerSettings); + } + var tmp = new ObjectWrapper + { + type = obj.GetType().AssemblyQualifiedName, + data = JsonConvert.SerializeObject(obj, _serializerSettings) + }; + + return JsonConvert.SerializeObject(tmp, typeof(ObjectWrapper), _serializerSettings); + } + + internal static object DeserializeObject(string obj) + { + if (obj == "null") return null; + var ow = JsonConvert.DeserializeObject(obj, _serializerSettings); + return JsonConvert.DeserializeObject(ow.data, Type.GetType(ow.type), _serializerSettings); + } + } +} diff --git a/src/CRA.ClientLibrary/Utilities/StreamCommunicator.cs b/src/CRA.ClientLibrary/Utilities/StreamCommunicator.cs new file mode 100644 index 0000000..1b4a338 --- /dev/null +++ b/src/CRA.ClientLibrary/Utilities/StreamCommunicator.cs @@ -0,0 +1,139 @@ +using System; +using System.IO; + +namespace CRA.ClientLibrary +{ + /// + /// Stream communication primitives + /// + public static class StreamCommunicator + { + /// + /// Read integer fixed size + /// + /// + /// + public static int ReadIntegerFixed(this Stream stream) + { + var value = new byte[4]; + stream.ReadAllRequiredBytes(value, 0, value.Length); + int intValue = value[0] + | (int)value[1] << 0x8 + | (int)value[2] << 0x10 + | (int)value[3] << 0x18; + return intValue; + } + + /// + /// Write integer fixed size + /// + /// + /// + public static void WriteIntegerFixed(this Stream stream, int value) + { + stream.WriteByte((byte)(value & 0xFF)); + stream.WriteByte((byte)((value >> 0x8) & 0xFF)); + stream.WriteByte((byte)((value >> 0x10) & 0xFF)); + stream.WriteByte((byte)((value >> 0x18) & 0xFF)); + } + + /// + /// Read integer compressed + /// + /// + /// + public static int ReadInteger(this Stream stream) + { + var currentByte = (uint)stream.ReadByte(); + byte read = 1; + uint result = currentByte & 0x7FU; + int shift = 7; + while ((currentByte & 0x80) != 0) + { + currentByte = (uint)stream.ReadByte(); + read++; + result |= (currentByte & 0x7FU) << shift; + shift += 7; + if (read > 5) + { + throw new InvalidOperationException("Invalid integer value in the input stream."); + } + } + return (int)((-(result & 1)) ^ ((result >> 1) & 0x7FFFFFFFU)); + } + + /// + /// Write integer compressed + /// + /// + /// + public static void WriteInteger(this Stream stream, int value) + { + var zigZagEncoded = unchecked((uint)((value << 1) ^ (value >> 31))); + while ((zigZagEncoded & ~0x7F) != 0) + { + stream.WriteByte((byte)((zigZagEncoded | 0x80) & 0xFF)); + zigZagEncoded >>= 7; + } + stream.WriteByte((byte)zigZagEncoded); + } + + /// + /// Write byte array + /// + /// + /// + public static void WriteByteArray(this Stream stream, byte[] value) + { + if (value == null) + { + throw new ArgumentNullException("value"); + } + + stream.WriteInteger(value.Length); + if (value.Length > 0) + { + stream.Write(value, 0, value.Length); + } + } + + /// + /// Read byte array + /// + /// + /// + public static byte[] ReadByteArray(this Stream stream) + { + int arraySize = stream.ReadInteger(); + var array = new byte[arraySize]; + if (arraySize > 0) + { + stream.ReadAllRequiredBytes(array, 0, array.Length); + } + return array; + } + + /// + /// Read all required bytes + /// + /// + /// + /// + /// + /// + internal static int ReadAllRequiredBytes(this Stream stream, byte[] buffer, int offset, int count) + { + int toRead = count; + int currentOffset = offset; + int currentRead; + do + { + currentRead = stream.Read(buffer, currentOffset, toRead); + currentOffset += currentRead; + toRead -= currentRead; + } + while (toRead > 0 && currentRead != 0); + return currentOffset - offset; + } + } +} diff --git a/src/CRA.ClientLibrary/app.config b/src/CRA.ClientLibrary/app.config new file mode 100644 index 0000000..5f60adc --- /dev/null +++ b/src/CRA.ClientLibrary/app.config @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/src/CRA.ClientLibrary/packages.config b/src/CRA.ClientLibrary/packages.config new file mode 100644 index 0000000..453d6c0 --- /dev/null +++ b/src/CRA.ClientLibrary/packages.config @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/CRA.Worker/App.config b/src/CRA.Worker/App.config new file mode 100644 index 0000000..803a94c --- /dev/null +++ b/src/CRA.Worker/App.config @@ -0,0 +1,33 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/CRA.Worker/CRA.Worker.csproj b/src/CRA.Worker/CRA.Worker.csproj new file mode 100644 index 0000000..91db861 --- /dev/null +++ b/src/CRA.Worker/CRA.Worker.csproj @@ -0,0 +1,67 @@ + + + + + Debug + AnyCPU + {85331F0D-B724-45D7-9BB2-AC703E73DE3B} + Exe + CRA.Worker + CRA.Worker + v4.5.2 + 512 + true + + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + false + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + false + + + + + + + + + + + + + + + + + + + Designer + + + + + {ef23eb6a-e329-496d-9b7a-8cad66ea4e3a} + CRA.ClientLibrary + + + + + IF EXIST $(ProjectDir)privatesettings.config copy $(ProjectDir)privatesettings.config $(ProjectDir)$(OutDir) + + \ No newline at end of file diff --git a/src/CRA.Worker/Program.cs b/src/CRA.Worker/Program.cs new file mode 100644 index 0000000..02d1259 --- /dev/null +++ b/src/CRA.Worker/Program.cs @@ -0,0 +1,52 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Configuration; +using System.Diagnostics; +using CRA.ClientLibrary; + +namespace CRA.Worker +{ + class Program + { + static void Main(string[] args) + { + //Console.WriteLine("Press ENTER to start"); + //Console.ReadLine(); + + if (args.Length != 3) + { + Console.WriteLine("Worker for Common Runtime for Applications (CRA)\nUsage: CRA.Worker.exe instancename (e.g., instance1) ipaddress (e.g., 127.0.0.1) port (e.g., 11000)"); + return; + } + + if (args[1] == "null") + { + args[1] = GetLocalIPAddress(); + } + Debug.WriteLine("Worker instance name is: " + args[0]); + Debug.WriteLine("Using IP address: " + args[1] + " and port " + Convert.ToInt32(args[2])); + Debug.WriteLine("Using Azure connection string: " + ConfigurationManager.AppSettings.Get("StorageConnectionString")); + + var worker = new CRAWorker + (args[0], args[1], Convert.ToInt32(args[2]), + ConfigurationManager.AppSettings.Get("StorageConnectionString")); + + + worker.Start(); + } + + private static string GetLocalIPAddress() + { + var host = Dns.GetHostEntry(Dns.GetHostName()); + foreach (var ip in host.AddressList) + { + if (ip.AddressFamily == AddressFamily.InterNetwork) + { + return ip.ToString(); + } + } + throw new Exception("Local IP Address Not Found!"); + } + } +} diff --git a/src/CRA.Worker/Properties/AssemblyInfo.cs b/src/CRA.Worker/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..feac22a --- /dev/null +++ b/src/CRA.Worker/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("CRA.Worker")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("CRA.Worker")] +[assembly: AssemblyCopyright("Copyright © 2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("85331f0d-b724-45d7-9bb2-ac703e73de3b")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/CRA.Worker/privatesettings.config.example b/src/CRA.Worker/privatesettings.config.example new file mode 100644 index 0000000..58050a8 --- /dev/null +++ b/src/CRA.Worker/privatesettings.config.example @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/src/CRA.sln b/src/CRA.sln new file mode 100644 index 0000000..534e045 --- /dev/null +++ b/src/CRA.sln @@ -0,0 +1,43 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.26403.7 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CRA.ClientLibrary", "CRA.ClientLibrary\CRA.ClientLibrary.csproj", "{EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CRA.Worker", "CRA.Worker\CRA.Worker.csproj", "{85331F0D-B724-45D7-9BB2-AC703E73DE3B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConnectionPair", "Samples\ConnectionPair\ConnectionPair.csproj", "{00F97B37-965A-4F20-B5B2-E182D20509C2}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Samples", "Samples", "{EC027445-85E2-4FD3-83F3-224CA6481849}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Core", "Core", "{AF66150E-52D6-402F-BA4D-D3FDDA71C73F}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}.Release|Any CPU.Build.0 = Release|Any CPU + {85331F0D-B724-45D7-9BB2-AC703E73DE3B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {85331F0D-B724-45D7-9BB2-AC703E73DE3B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {85331F0D-B724-45D7-9BB2-AC703E73DE3B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {85331F0D-B724-45D7-9BB2-AC703E73DE3B}.Release|Any CPU.Build.0 = Release|Any CPU + {00F97B37-965A-4F20-B5B2-E182D20509C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {00F97B37-965A-4F20-B5B2-E182D20509C2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {00F97B37-965A-4F20-B5B2-E182D20509C2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {00F97B37-965A-4F20-B5B2-E182D20509C2}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A} = {AF66150E-52D6-402F-BA4D-D3FDDA71C73F} + {85331F0D-B724-45D7-9BB2-AC703E73DE3B} = {AF66150E-52D6-402F-BA4D-D3FDDA71C73F} + {00F97B37-965A-4F20-B5B2-E182D20509C2} = {EC027445-85E2-4FD3-83F3-224CA6481849} + EndGlobalSection +EndGlobal diff --git a/src/Samples/ConnectionPair/App.config b/src/Samples/ConnectionPair/App.config new file mode 100644 index 0000000..20ae585 --- /dev/null +++ b/src/Samples/ConnectionPair/App.config @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Samples/ConnectionPair/ConnectionPair.csproj b/src/Samples/ConnectionPair/ConnectionPair.csproj new file mode 100644 index 0000000..2da90c4 --- /dev/null +++ b/src/Samples/ConnectionPair/ConnectionPair.csproj @@ -0,0 +1,65 @@ + + + + + Debug + AnyCPU + {00F97B37-965A-4F20-B5B2-E182D20509C2} + Exe + ConnectionPair + ConnectionPair + v4.5.2 + 512 + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + + + + + {ef23eb6a-e329-496d-9b7a-8cad66ea4e3a} + CRA.ClientLibrary + + + + + IF EXIST $(ProjectDir)privatesettings.config copy $(ProjectDir)privatesettings.config $(ProjectDir)$(OutDir) + + \ No newline at end of file diff --git a/src/Samples/ConnectionPair/MyAsyncInput.cs b/src/Samples/ConnectionPair/MyAsyncInput.cs new file mode 100644 index 0000000..ea8c821 --- /dev/null +++ b/src/Samples/ConnectionPair/MyAsyncInput.cs @@ -0,0 +1,45 @@ +using System; +using System.Threading.Tasks; +using System.IO; +using System.Threading; +using CRA.ClientLibrary; + +namespace ConnectionPair +{ + public class MyAsyncInput : IAsyncProcessInputEndpoint + { + bool _running = true; + IProcess _process; + + public MyAsyncInput(IProcess process) + { + _process = process; + } + + public void Dispose() + { + Console.WriteLine("Disposing MyInput"); + _running = false; + } + + public async Task FromOutputAsync(IProcessOutputEndpoint p, CancellationToken token) + { + throw new NotImplementedException(); + } + + public async Task FromStreamAsync(Stream s, string otherProcess, string otherEndpoint, CancellationToken token) + { + Console.WriteLine("Receiving data from process: " + otherProcess + ", endpoint: " + otherEndpoint); + + for (int i = 0; i < int.MaxValue; i++) + { + int val = s.ReadInteger(); + Console.WriteLine("Read value: " + val); + token.ThrowIfCancellationRequested(); + + if (!_running) break; + } + } + } + +} diff --git a/src/Samples/ConnectionPair/MyAsyncOutput.cs b/src/Samples/ConnectionPair/MyAsyncOutput.cs new file mode 100644 index 0000000..4aeac38 --- /dev/null +++ b/src/Samples/ConnectionPair/MyAsyncOutput.cs @@ -0,0 +1,54 @@ +using System; +using System.Threading.Tasks; +using System.IO; +using System.Threading; +using CRA.ClientLibrary; + +namespace ConnectionPair +{ + public class MyAsyncOutput : IAsyncProcessOutputEndpoint + { + bool _running = true; + IProcess _process; + + public MyAsyncOutput(IProcess process) + { + _process = process; + } + + public void Dispose() + { + Console.WriteLine("Disposing MyOutput"); + _running = false; + } + + public async Task ToInputAsync(IProcessInputEndpoint p, CancellationToken token) + { + throw new NotImplementedException(); + } + + public async Task ToStreamAsync(Stream stream, string otherProcess, string otherEndpoint, CancellationToken token) + { + Console.WriteLine("Sending data to process: " + otherProcess + ", endpoint: " + otherEndpoint); + + for (int i = 0; i < int.MaxValue; i += 1) + { + for (int j = 0; j < 1; j++) + { + stream.WriteInteger(i + j); + Console.WriteLine("Written value: " + (i + j)); + } + + + for (int j = 0; j < 1; j++) + { + Thread.Sleep(1000); + token.ThrowIfCancellationRequested(); + } + + if (!_running) break; + } + } + } + +} diff --git a/src/Samples/ConnectionPair/MyFirstProcess.cs b/src/Samples/ConnectionPair/MyFirstProcess.cs new file mode 100644 index 0000000..34c9e99 --- /dev/null +++ b/src/Samples/ConnectionPair/MyFirstProcess.cs @@ -0,0 +1,18 @@ +using CRA.ClientLibrary; + +namespace ConnectionPair +{ + public class MyFirstProcess : ProcessBase + { + public MyFirstProcess() : base() + { + } + + public override void Initialize(object param) + { + AddAsyncOutputEndpoint("firstoutput", new MyAsyncOutput(this)); + AddAsyncInputEndpoint("firstinput", new MyAsyncInput(this)); + base.Initialize(param); + } + } +} diff --git a/src/Samples/ConnectionPair/MySecondProcess.cs b/src/Samples/ConnectionPair/MySecondProcess.cs new file mode 100644 index 0000000..07a1935 --- /dev/null +++ b/src/Samples/ConnectionPair/MySecondProcess.cs @@ -0,0 +1,18 @@ +using CRA.ClientLibrary; + +namespace ConnectionPair +{ + public class MySecondProcess : ProcessBase + { + public MySecondProcess() : base() + { + } + public override void Initialize(object param) + { + AddAsyncInputEndpoint("secondinput", new MyAsyncInput(this)); + AddAsyncOutputEndpoint("secondoutput", new MyAsyncOutput(this)); + base.Initialize(param); + } + } + +} diff --git a/src/Samples/ConnectionPair/Program.cs b/src/Samples/ConnectionPair/Program.cs new file mode 100644 index 0000000..fedcf2d --- /dev/null +++ b/src/Samples/ConnectionPair/Program.cs @@ -0,0 +1,24 @@ +using System; +using CRA.ClientLibrary; + +namespace ConnectionPair +{ + class Program + { + static void Main(string[] args) + { + var client = new CRAClientLibrary(); + + client.DefineProcess("myfirstprocess", () => new MyFirstProcess()); + client.DefineProcess("mysecondprocess", () => new MySecondProcess()); + + client.InstantiateProcess("instance1", "myprocess1", "myfirstprocess", null); + client.InstantiateProcess("instance2", "myprocess2", "mysecondprocess", null); + + client.Connect("myprocess1", "firstoutput", "myprocess2", "secondinput"); + client.Connect("myprocess2", "secondoutput", "myprocess1", "firstinput"); + + Console.ReadLine(); + } + } +} diff --git a/src/Samples/ConnectionPair/Properties/AssemblyInfo.cs b/src/Samples/ConnectionPair/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..1f785ad --- /dev/null +++ b/src/Samples/ConnectionPair/Properties/AssemblyInfo.cs @@ -0,0 +1,35 @@ +using System.Reflection; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("ConnectionPair")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("ConnectionPair")] +[assembly: AssemblyCopyright("Copyright © 2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("00f97b37-965a-4f20-b5b2-e182d20509c2")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/Samples/ConnectionPair/privatesettings.config.example b/src/Samples/ConnectionPair/privatesettings.config.example new file mode 100644 index 0000000..58050a8 --- /dev/null +++ b/src/Samples/ConnectionPair/privatesettings.config.example @@ -0,0 +1,3 @@ + + + \ No newline at end of file