Initial commit of Common Runtime for Applications (CRA).

This commit is contained in:
Badrish Chandramouli 2017-05-09 19:26:49 -07:00
Родитель c06d32baf5
Коммит 25f634a22e
47 изменённых файлов: 4252 добавлений и 0 удалений

63
.gitattributes поставляемый Normal file
Просмотреть файл

@ -0,0 +1,63 @@
###############################################################################
# Set default behavior to automatically normalize line endings.
###############################################################################
* text=auto
###############################################################################
# Set default behavior for command prompt diff.
#
# This is need for earlier builds of msysgit that does not have it on by
# default for csharp files.
# Note: This is only used by command line
###############################################################################
#*.cs diff=csharp
###############################################################################
# Set the merge driver for project and solution files
#
# Merging from the command prompt will add diff markers to the files if there
# are conflicts (Merging from VS is not affected by the settings below, in VS
# the diff markers are never inserted). Diff markers may cause the following
# file extensions to fail to load in VS. An alternative would be to treat
# these files as binary and thus will always conflict and require user
# intervention with every merge. To do so, just uncomment the entries below
###############################################################################
#*.sln merge=binary
#*.csproj merge=binary
#*.vbproj merge=binary
#*.vcxproj merge=binary
#*.vcproj merge=binary
#*.dbproj merge=binary
#*.fsproj merge=binary
#*.lsproj merge=binary
#*.wixproj merge=binary
#*.modelproj merge=binary
#*.sqlproj merge=binary
#*.wwaproj merge=binary
###############################################################################
# behavior for image files
#
# image files are treated as binary by default.
###############################################################################
#*.jpg binary
#*.png binary
#*.gif binary
###############################################################################
# diff behavior for common document formats
#
# Convert binary document formats to text before diffing them. This feature
# is only available from the command line. Turn it on by uncommenting the
# entries below.
###############################################################################
#*.doc diff=astextplain
#*.DOC diff=astextplain
#*.docx diff=astextplain
#*.DOCX diff=astextplain
#*.dot diff=astextplain
#*.DOT diff=astextplain
#*.pdf diff=astextplain
#*.PDF diff=astextplain
#*.rtf diff=astextplain
#*.RTF diff=astextplain

1
.gitignore поставляемый
Просмотреть файл

@ -250,3 +250,4 @@ paket-files/
# JetBrains Rider
.idea/
*.sln.iml
privatesettings.config

Просмотреть файл

@ -0,0 +1,111 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>CRA.ClientLibrary</RootNamespace>
<AssemblyName>CRA.ClientLibrary</AssemblyName>
<TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Aqua, Version=3.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\aqua-core.3.0.0\lib\net45\Aqua.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.Azure.KeyVault.Core, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Azure.KeyVault.Core.1.0.0\lib\net40\Microsoft.Azure.KeyVault.Core.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Data.Edm, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Data.Edm.5.8.2\lib\net40\Microsoft.Data.Edm.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Data.OData, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Data.OData.5.8.2\lib\net40\Microsoft.Data.OData.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Data.Services.Client, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Data.Services.Client.5.8.2\lib\net40\Microsoft.Data.Services.Client.dll</HintPath>
</Reference>
<Reference Include="Microsoft.WindowsAzure.Storage, Version=8.1.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\WindowsAzure.Storage.8.1.1\lib\net45\Microsoft.WindowsAzure.Storage.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=10.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="Remote.Linq, Version=5.3.1.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Remote.Linq.5.3.1\lib\net45\Remote.Linq.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Configuration" />
<Reference Include="System.Core" />
<Reference Include="System.Runtime.Serialization" />
<Reference Include="System.Spatial, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\System.Spatial.5.8.2\lib\net40\System.Spatial.dll</HintPath>
</Reference>
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Main\CRAClientLibrary.cs" />
<Compile Include="Definitions\ConnectionInfo.cs" />
<Compile Include="Definitions\ConnectionInitiator.cs" />
<Compile Include="Main\CRAWorker.cs" />
<Compile Include="Definitions\CoralTaskMessageType.cs" />
<Compile Include="Definitions\ErrorCodes.cs" />
<Compile Include="Processes\IAsyncProcessInputEndpoint.cs" />
<Compile Include="Processes\IAsyncProcessOutputEndpoint.cs" />
<Compile Include="Processes\IProcess.cs" />
<Compile Include="Processes\IProcessInputEndpoint.cs" />
<Compile Include="Processes\IProcessOutputEndpoint.cs" />
<Compile Include="Processes\ProcessBase.cs" />
<Compile Include="Processes\DetachedProcess.cs" />
<Compile Include="Definitions\ObjectWrapper.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Tables\ConnectionTable.cs" />
<Compile Include="Tables\ConnectionTableManager.cs" />
<Compile Include="Tables\EndpointTable.cs" />
<Compile Include="Tables\EndpointTableManager.cs" />
<Compile Include="Tables\ProcessTable.cs" />
<Compile Include="Tables\ProcessTableManager.cs" />
<Compile Include="Utilities\AssemblyResolver.cs" />
<Compile Include="Utilities\AssemblyResolverClient.cs" />
<Compile Include="Utilities\AssemblyUtils.cs" />
<Compile Include="Utilities\ClosureEliminator.cs" />
<Compile Include="Utilities\SerializationHelper.cs" />
<Compile Include="Utilities\StreamCommunicator.cs" />
</ItemGroup>
<ItemGroup>
<None Include="app.config">
<SubType>Designer</SubType>
</None>
<None Include="packages.config">
<SubType>Designer</SubType>
</None>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>

Просмотреть файл

@ -0,0 +1,72 @@
namespace CRA.ClientLibrary
{
/// <summary>
/// Describes a connection between two process/endpoint pairs
/// </summary>
public class ConnectionInfo
{
/// <summary>
/// Connection is from this process
/// </summary>
public string FromProcess { get; set; }
/// <summary>
/// Connection is from this output endpoint
/// </summary>
public string FromEndpoint { get; set; }
/// <summary>
/// Connection is to this process
/// </summary>
public string ToProcess { get; set; }
/// <summary>
/// Connection is to this input endpoint
/// </summary>
public string ToEndpoint { get; set; }
/// <summary>
/// Constructor
/// </summary>
/// <param name="FromProcess"></param>
/// <param name="FromEndpoint"></param>
/// <param name="ToProcess"></param>
/// <param name="ToEndpoint"></param>
public ConnectionInfo(string FromProcess, string FromEndpoint, string ToProcess, string ToEndpoint)
{
this.FromProcess = FromProcess;
this.FromEndpoint = FromEndpoint;
this.ToProcess = ToProcess;
this.ToEndpoint = ToEndpoint;
}
/// <summary>
/// String representation of a CRA conection
/// </summary>
/// <returns></returns>
public override string ToString()
{
return new { FromProcess = FromProcess, FromEndpoint = FromEndpoint, ToProcess = ToProcess, ToEndpoint = ToEndpoint }.ToString();
}
/// <summary>
/// Check if two instances are equal
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public override bool Equals(object obj)
{
var otherConnectionInfo = (ConnectionInfo)obj;
if (otherConnectionInfo == null) return false;
return
(FromProcess == otherConnectionInfo.FromProcess) &&
(ToProcess == otherConnectionInfo.ToProcess) &&
(FromEndpoint == otherConnectionInfo.FromEndpoint) &&
(ToEndpoint == otherConnectionInfo.ToEndpoint);
}
public override int GetHashCode()
{
return FromProcess.GetHashCode() ^ FromEndpoint.GetHashCode() ^ ToProcess.GetHashCode() ^ ToEndpoint.GetHashCode();
}
}
}

Просмотреть файл

@ -0,0 +1,18 @@
namespace CRA.ClientLibrary
{
/// <summary>
/// Direction of data flow
/// </summary>
public enum ConnectionInitiator
{
/// <summary>
/// Initiate connection from "from" process
/// </summary>
FromSide,
/// <summary>
/// Initiate connection from "to" process
/// </summary>
ToSide
}
}

Просмотреть файл

@ -0,0 +1,12 @@
namespace CRA.ClientLibrary
{
internal enum CRATaskMessageType
{
LOAD_PROCESS,
CONNECT_PROCESS_INITIATOR,
CONNECT_PROCESS_RECEIVER,
CONNECT_PROCESS_INITIATOR_REVERSE,
CONNECT_PROCESS_RECEIVER_REVERSE,
};
}

Просмотреть файл

@ -0,0 +1,45 @@
namespace CRA.ClientLibrary
{
/// <summary>
/// Error codes for CRA method calls
/// </summary>
public enum CRAErrorCode : int
{
/// <summary>
/// Success
/// </summary>
Success,
/// <summary>
/// Process not found
/// </summary>
ProcessNotFound,
/// <summary>
/// Process output endpoint not found
/// </summary>
ProcessOutputNotFound,
/// <summary>
/// Process input endpoint not found
/// </summary>
ProcessInputNotFound,
/// <summary>
/// Process already exists
/// </summary>
ProcessAlreadyExists,
/// <summary>
/// Recovering
/// </summary>
ServerRecovering,
/// <summary>
/// Race condition adding connection
/// </summary>
ConnectionAdditionRace,
/// <summary>
/// Process endpoint (input or output) not found
/// </summary>
ProcessEndpointNotFound,
/// <summary>
/// Failed to establish a connection
/// </summary>
ConnectionEstablishFailed
};
}

Просмотреть файл

@ -0,0 +1,8 @@
namespace CRA.ClientLibrary
{
internal struct ObjectWrapper
{
public string type;
public string data;
}
}

Просмотреть файл

@ -0,0 +1,531 @@
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Table;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq.Expressions;
using System.Net.Sockets;
using System.Text;
using System.Text.RegularExpressions;
namespace CRA.ClientLibrary
{
/// <summary>
/// Client library for Common Runtime for Applications (CRA)
/// </summary>
public class CRAClientLibrary
{
CRAWorker _localWorker;
// Azure storage clients
string _storageConnectionString;
CloudStorageAccount _storageAccount;
CloudBlobClient _blobClient;
CloudTableClient _tableClient;
CloudTable _processTable;
CloudTable _connectionTable;
internal ProcessTableManager _processTableManager;
EndpointTableManager _endpointTableManager;
ConnectionTableManager _connectionTableManager;
Type aquaType = typeof(Aqua.TypeSystem.ConstructorInfo);
/// <summary>
/// Create an instance of the client library for Common Runtime for Applications (CRA)
/// </summary>
public CRAClientLibrary() : this("", null)
{
}
/// <summary>
/// Create an instance of the client library for Common Runtime for Applications (CRA)
/// </summary>
/// <param name="storageConnectionString">Optional storage account to use for CRA metadata, if
/// not specified, it will use the appSettings key named StorageConnectionString in app.config</param>
public CRAClientLibrary(string storageConnectionString) : this(storageConnectionString, null)
{
}
/// <summary>
/// Create an instance of the client library for Common Runtime for Applications (CRA)
/// </summary>
/// <param name="storageConnectionString">Optional storage account to use for CRA metadata, if
/// not specified, it will use the appSettings key named StorageConnectionString in app.config</param>
/// <param name = "localWorker" >Local worker if any</param>
public CRAClientLibrary(string storageConnectionString, CRAWorker localWorker)
{
_localWorker = localWorker;
if (storageConnectionString == "")
_storageConnectionString = ConfigurationManager.AppSettings.Get("StorageConnectionString");
else
_storageConnectionString = storageConnectionString;
_storageAccount = CloudStorageAccount.Parse(_storageConnectionString);
_blobClient = _storageAccount.CreateCloudBlobClient();
_tableClient = _storageAccount.CreateCloudTableClient();
_processTableManager = new ProcessTableManager(_storageConnectionString);
_endpointTableManager = new EndpointTableManager(_storageConnectionString);
_connectionTableManager = new ConnectionTableManager(_storageConnectionString);
_processTable = CreateTableIfNotExists("processtableforcra");
_connectionTable = CreateTableIfNotExists("connectiontableforcra");
}
/// <summary>
/// Define a process type and register with CRA.
/// </summary>
/// <param name="processDefinition">Name of the process type</param>
/// <param name="creator">Lambda that describes how to instantiate the process, taking in an object as parameter</param>
public CRAErrorCode DefineProcess(string processDefinition, Expression<Func<IProcess>> creator)
{
if (!Regex.IsMatch(processDefinition, @"^(([a-z\d]((-(?=[a-z\d]))|([a-z\d])){2,62})|(\$root))$"))
{
throw new InvalidOperationException("Invalid name for process definition. Names have to be all lowercase, cannot contain special characters.");
}
CloudBlobContainer container = _blobClient.GetContainerReference(processDefinition);
container.CreateIfNotExists();
var blockBlob = container.GetBlockBlobReference("binaries");
CloudBlobStream blobStream = blockBlob.OpenWrite();
AssemblyUtils.WriteAssembliesToStream(blobStream);
blobStream.Close();
// Add metadata
var newRow = new ProcessTable("", processDefinition, processDefinition, "", 0, creator, null);
TableOperation insertOperation = TableOperation.InsertOrReplace(newRow);
_processTable.Execute(insertOperation);
return CRAErrorCode.Success;
}
/// <summary>
/// Resets the cluster and deletes all knowledge of any CRA instances
/// </summary>
public void Reset()
{
_connectionTable.DeleteIfExists();
_processTable.DeleteIfExists();
_endpointTableManager.DeleteTable();
}
/// <summary>
/// Not yet implemented
/// </summary>
/// <param name="instanceName"></param>
public void DeployInstance(string instanceName)
{
throw new NotImplementedException();
}
/// <summary>
/// Instantiate a process on a CRA instance.
/// </summary>
/// <param name="instanceName">Name of the CRA instance on which process is instantiated</param>
/// <param name="processName">Name of the process (particular instance)</param>
/// <param name="processDefinition">Definition of the process (type)</param>
/// <param name="processParameter">Parameters to be passed to the process in its constructor (serializable object)</param>
/// <returns>Status of the command</returns>
public CRAErrorCode InstantiateProcess(string instanceName, string processName, string processDefinition, object processParameter)
{
var procDefRow = ProcessTable.GetRowForProcessDefinition(_processTable, processDefinition);
// Add metadata
var newRow = new ProcessTable(instanceName, processName, processDefinition, "", 0,
procDefRow.ProcessCreateAction,
SerializationHelper.SerializeObject(processParameter));
TableOperation insertOperation = TableOperation.InsertOrReplace(newRow);
_processTable.Execute(insertOperation);
CRAErrorCode result = CRAErrorCode.Success;
ProcessTable instanceRow;
try
{
instanceRow = ProcessTable.GetRowForInstance(_processTable, instanceName);
// Send request to CRA instance
TcpClient client = new TcpClient(instanceRow.Address, instanceRow.Port);
NetworkStream stream = client.GetStream();
stream.WriteInteger((int)CRATaskMessageType.LOAD_PROCESS);
stream.WriteByteArray(Encoding.UTF8.GetBytes(processName));
stream.WriteByteArray(Encoding.UTF8.GetBytes(processDefinition));
stream.WriteByteArray(Encoding.UTF8.GetBytes(newRow.ProcessParameter));
result = (CRAErrorCode) stream.ReadInteger();
if (result != 0)
{
Console.WriteLine("Process was logically loaded. However, we received an error code from the hosting CRA instance: " + result);
}
}
catch
{
Console.WriteLine("The CRA instance appears to be down. Restart it and this process will be instantiated automatically");
}
return result;
}
/// <summary>
/// Register caller as a process with given name, dummy temp instance
/// </summary>
/// <param name="processName"></param>
/// <returns></returns>
public DetachedProcess RegisterAsProcess(string processName)
{
return new DetachedProcess(processName, "", this);
}
/// <summary>
/// Register caller as a process with given name, given CRA instance name
/// </summary>
/// <param name="processName"></param>
/// <param name="instanceName"></param>
/// <returns></returns>
public DetachedProcess RegisterAsProcess(string processName, string instanceName)
{
return new DetachedProcess(processName, instanceName, this);
}
/// <summary>
/// Register CRA instance name
/// </summary>
/// <param name="instanceName"></param>
/// <param name="address"></param>
/// <param name="port"></param>
public void RegisterInstance(string instanceName, string address, int port)
{
_processTableManager.RegisterInstance(instanceName, address, port);
}
/// <summary>
/// Delete CRA instance name
/// </summary>
/// <param name="instanceName"></param>
public void DeleteInstance(string instanceName)
{
_processTableManager.DeleteInstance(instanceName);
}
/// <summary>
///
/// </summary>
/// <param name="processName"></param>
/// <param name="instanceName"></param>
public void DeleteProcess(string processName, string instanceName)
{
var entity = new DynamicTableEntity(instanceName, processName);
entity.ETag = "*";
TableOperation deleteOperation = TableOperation.Delete(entity);
_processTable.Execute(deleteOperation);
}
/// <summary>
/// Add endpoint to the appropriate CRA metadata table
/// </summary>
/// <param name="processName"></param>
/// <param name="endpointName"></param>
/// <param name="isInput"></param>
/// <param name="isAsync"></param>
public void AddEndpoint(string processName, string endpointName, bool isInput, bool isAsync)
{
_endpointTableManager.AddEndpoint(processName, endpointName, isInput, isAsync);
}
/// <summary>
/// Delete endpoint
/// </summary>
/// <param name="processName"></param>
/// <param name="endpointName"></param>
public void DeleteEndpoint(string processName, string endpointName)
{
_endpointTableManager.DeleteEndpoint(processName, endpointName);
}
/// <summary>
/// Load a process on the local instance
/// </summary>
/// <param name="processName"></param>
/// <param name="processDefinition"></param>
/// <param name="processParameter"></param>
/// <param name="instanceName"></param>
/// <param name="table"></param>
/// <returns></returns>
public IProcess LoadProcess(string processName, string processDefinition, string processParameter, string instanceName, ConcurrentDictionary<string, IProcess> table)
{
CloudBlobContainer container = _blobClient.GetContainerReference(processDefinition);
container.CreateIfNotExists();
var blockBlob = container.GetBlockBlobReference("binaries");
Stream blobStream = blockBlob.OpenRead();
AssemblyUtils.LoadAssembliesFromStream(blobStream);
blobStream.Close();
var row = ProcessTable.GetRowForProcessDefinition(_processTable, processDefinition);
// CREATE THE PROCESS
var process = row.GetProcessCreateAction()();
// LATCH CALLBACKS TO POPULATE ENDPOINT TABLE
process.OnAddInputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(processName, name, true, false));
process.OnAddOutputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(processName, name, false, false));
process.OnAddAsyncInputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(processName, name, true, true));
process.OnAddAsyncOutputEndpoint((name, endpt) => _endpointTableManager.AddEndpoint(processName, name, false, true));
//ADD TO TABLE
if (table != null)
{
table.AddOrUpdate(processName, process, (procName, oldProc) => { oldProc.Dispose(); return process; });
process.OnDispose(() =>
{
// Delete all endpoints of the process
foreach (var key in process.InputEndpoints)
{
_endpointTableManager.DeleteEndpoint(processName, key.Key);
}
foreach (var key in process.AsyncInputEndpoints)
{
_endpointTableManager.DeleteEndpoint(processName, key.Key);
}
foreach (var key in process.OutputEndpoints)
{
_endpointTableManager.DeleteEndpoint(processName, key.Key);
}
foreach (var key in process.AsyncOutputEndpoints)
{
_endpointTableManager.DeleteEndpoint(processName, key.Key);
}
IProcess old;
if (!table.TryRemove(processName, out old))
{
Console.WriteLine("Unable to remove process on disposal");
}
var entity = new DynamicTableEntity(instanceName, processName);
entity.ETag = "*";
TableOperation deleteOperation = TableOperation.Delete(entity);
_processTable.Execute(deleteOperation);
});
}
// INITIALIZE
if ((ProcessBase)process != null)
{
((ProcessBase)process).SetProcessName(processName);
((ProcessBase)process).SetClientLibrary(this);
}
var par = SerializationHelper.DeserializeObject(processParameter);
process.Initialize(par);
return process;
}
/// <summary>
/// Load all processes for the given instance name.
/// </summary>
/// <param name="thisInstanceName"></param>
/// <returns></returns>
public ConcurrentDictionary<string, IProcess> LoadAllProcesses(string thisInstanceName)
{
ConcurrentDictionary<string, IProcess> result = new ConcurrentDictionary<string, IProcess>();
var rows = ProcessTable.GetAllRowsForInstance(_processTable, thisInstanceName);
foreach (var row in rows)
{
if (row.ProcessName == "") continue;
LoadProcess(row.ProcessName, row.ProcessDefinition, row.ProcessParameter, thisInstanceName, result);
}
return result;
}
/// <summary>
/// Add connection info to metadata table
/// </summary>
/// <param name="fromProcessName"></param>
/// <param name="fromEndpoint"></param>
/// <param name="toProcessName"></param>
/// <param name="toEndpoint"></param>
public void AddConnectionInfo(string fromProcessName, string fromEndpoint, string toProcessName, string toEndpoint)
{
_connectionTableManager.AddConnection(fromProcessName, fromEndpoint, toProcessName, toEndpoint);
}
/// <summary>
/// Delete connection info from metadata table
/// </summary>
/// <param name="fromProcessName"></param>
/// <param name="fromEndpoint"></param>
/// <param name="toProcessName"></param>
/// <param name="toEndpoint"></param>
public void DeleteConnectionInfo(string fromProcessName, string fromEndpoint, string toProcessName, string toEndpoint)
{
_connectionTableManager.DeleteConnection(fromProcessName, fromEndpoint, toProcessName, toEndpoint);
}
/// <summary>
/// Connect one CRA process to another, via pre-defined endpoints. We contact the "from" process
/// to initiate the creation of the link.
/// </summary>
/// <param name="fromProcessName">Name of the process from which connection is being made</param>
/// <param name="fromEndpoint">Name of the endpoint on the fromProcess, from which connection is being made</param>
/// <param name="toProcessName">Name of the process to which connection is being made</param>
/// <param name="toEndpoint">Name of the endpoint on the toProcess, to which connection is being made</param>
/// <param name="direction">Which process initiates the connection</param>
/// <returns>Status of the Connect operation</returns>
public CRAErrorCode Connect(string fromProcessName, string fromEndpoint, string toProcessName, string toEndpoint, ConnectionInitiator direction = ConnectionInitiator.FromSide)
{
// Tell from process to establish connection
// Send request to CRA instance
// Get instance for source process
var _row = direction == ConnectionInitiator.FromSide ?
ProcessTable.GetRowForProcess(_processTable, fromProcessName) :
ProcessTable.GetRowForProcess(_processTable, toProcessName);
// Check that process and endpoints are valid and existing
if (!_processTableManager.ExistsProcess(fromProcessName) || !_processTableManager.ExistsProcess(toProcessName))
{
return CRAErrorCode.ProcessNotFound;
}
// Make the connection information stable
_connectionTableManager.AddConnection(fromProcessName, fromEndpoint, toProcessName, toEndpoint);
// We now try best-effort to tell the CRA instance of this connection
CRAErrorCode result = CRAErrorCode.Success;
if (_localWorker != null)
{
if (_localWorker.InstanceName == _row.InstanceName)
{
return _localWorker.Connect_InitiatorSide(fromProcessName, fromEndpoint,
toProcessName, toEndpoint, direction == ConnectionInitiator.ToSide);
}
}
// Send request to CRA instance
TcpClient client = null;
try
{
// Get address and port for instance, using row with process = ""
var row = ProcessTable.GetRowForInstance(_processTable, _row.InstanceName);
client = new TcpClient(row.Address, row.Port);
NetworkStream stream = client.GetStream();
if (direction == ConnectionInitiator.FromSide)
stream.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_INITIATOR);
else
stream.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_INITIATOR_REVERSE);
stream.WriteByteArray(Encoding.UTF8.GetBytes(fromProcessName));
stream.WriteByteArray(Encoding.UTF8.GetBytes(fromEndpoint));
stream.WriteByteArray(Encoding.UTF8.GetBytes(toProcessName));
stream.WriteByteArray(Encoding.UTF8.GetBytes(toEndpoint));
result = (CRAErrorCode) stream.ReadInteger();
if (result != 0)
{
Console.WriteLine("Connection was logically established. However, the client received an error code from the connection-initiating CRA instance: " + result);
}
}
catch
{
Console.WriteLine("The connection-initiating CRA instance appears to be down or could not be found. Restart it and this connection will be completed automatically");
}
return (CRAErrorCode)result;
}
/// <summary>
/// Get a list of all output endpoint names for a given process
/// </summary>
/// <param name="processName"></param>
/// <returns></returns>
public IEnumerable<string> GetOutputEndpoints(string processName)
{
return _endpointTableManager.GetOutputEndpoints(processName);
}
/// <summary>
/// Get a list of all input endpoint names for a given process
/// </summary>
/// <param name="processName"></param>
/// <returns></returns>
public IEnumerable<string> GetInputEndpoints(string processName)
{
return _endpointTableManager.GetInputEndpoints(processName);
}
/// <summary>
/// Get all outgoing connection from a given process
/// </summary>
/// <param name="processName"></param>
/// <returns></returns>
public IEnumerable<ConnectionInfo> GetConnectionsFromProcess(string processName)
{
return _connectionTableManager.GetConnectionsFromProcess(processName);
}
/// <summary>
/// Get all incoming connections to a given process
/// </summary>
/// <param name="processName"></param>
/// <returns></returns>
public IEnumerable<ConnectionInfo> GetConnectionsToProcess(string processName)
{
return _connectionTableManager.GetConnectionsToProcess(processName);
}
/// <summary>
/// Gets a list of all processes registered with CRA
/// </summary>
/// <returns></returns>
public IEnumerable<string> GetProcessNames()
{
return _processTableManager.GetProcessNames();
}
private CloudTable CreateTableIfNotExists(string tableName)
{
CloudTable table = _tableClient.GetTableReference(tableName);
try
{
table.CreateIfNotExists();
}
catch { }
return table;
}
/// <summary>
/// Disconnect a CRA connection
/// </summary>
/// <param name="fromProcessName"></param>
/// <param name="fromProcessOutput"></param>
/// <param name="toProcessName"></param>
/// <param name="toProcessInput"></param>
public void Disconnect(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput)
{
_connectionTableManager.DeleteConnection(fromProcessName, fromProcessOutput, toProcessName, toProcessInput);
}
}
}

Просмотреть файл

@ -0,0 +1,587 @@
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Table;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CRA.ClientLibrary
{
/// <summary>
/// Worker library for Common Runtime for Applications (CRA)
/// </summary>
public class CRAWorker
{
CRAClientLibrary _craClient;
string _workerinstanceName;
string _address;
int _port;
// Azure storage clients
string _storageConnectionString;
CloudStorageAccount _storageAccount;
CloudBlobClient _blobClient;
CloudTableClient _tableClient;
CloudTable _workerInstanceTable;
CloudTable _connectionTable;
// Timer updateTimer
ConcurrentDictionary<string, IProcess> _localProcessTable = new ConcurrentDictionary<string, IProcess>();
ConcurrentDictionary<string, CancellationTokenSource> inConnections = new ConcurrentDictionary<string, CancellationTokenSource>();
ConcurrentDictionary<string, CancellationTokenSource> outConnections = new ConcurrentDictionary<string, CancellationTokenSource>();
/// <summary>
/// Instance name
/// </summary>
public string InstanceName { get { return _workerinstanceName; } }
/// <summary>
/// Define a new worker instance of Common Runtime for Applications (CRA)
/// </summary>
/// <param name="workerInstanceName">Name of the worker instance</param>
/// <param name="address">IP address</param>
/// <param name="port">Port</param>
/// <param name="storageConnectionString">Storage account to store metadata</param>
public CRAWorker(string workerInstanceName, string address, int port, string storageConnectionString)
{
_craClient = new CRAClientLibrary(storageConnectionString, this);
_workerinstanceName = workerInstanceName;
_address = address;
_port = port;
_storageConnectionString = storageConnectionString;
_storageAccount = CloudStorageAccount.Parse(_storageConnectionString);
_blobClient = _storageAccount.CreateCloudBlobClient();
_tableClient = _storageAccount.CreateCloudTableClient();
_workerInstanceTable = CreateTableIfNotExists("processtableforcra");
_connectionTable = CreateTableIfNotExists("connectiontableforcra");
}
/// <summary>
/// Start the CRA worker. This method does not return.
/// </summary>
public void Start()
{
// Update process table
_craClient.RegisterInstance(_workerinstanceName, _address, _port);
// Restore processes on machine - not connections between processes
// This ensures others can establish connections to it as soon as
// as we start the server
RestoreProcesses();
Thread serverThread = new Thread(StartServer);
serverThread.Start();
// Restore connections to/from the processes on this machine
RestoreConnections(null);
// Wait for server to complete execution
serverThread.Join();
}
private void StartServer()
{
var server = new TcpListener(IPAddress.Parse(_address), _port);
// Start listening for client requests.
server.Start();
while (true)
{
Debug.WriteLine("Waiting for a connection... ");
TcpClient client = server.AcceptTcpClient();
Debug.WriteLine("Connected!");
// Get a stream object for reading and writing
NetworkStream stream = client.GetStream();
CRATaskMessageType message = (CRATaskMessageType)stream.ReadInteger();
switch (message)
{
case CRATaskMessageType.LOAD_PROCESS:
Task.Run(() => LoadProcess(stream));
break;
case CRATaskMessageType.CONNECT_PROCESS_INITIATOR:
Task.Run(() => ConnectProcess_Initiator(stream, false));
break;
case CRATaskMessageType.CONNECT_PROCESS_RECEIVER:
Task.Run(() => ConnectProcess_Receiver(stream, false));
break;
case CRATaskMessageType.CONNECT_PROCESS_INITIATOR_REVERSE:
Task.Run(() => ConnectProcess_Initiator(stream, true));
break;
case CRATaskMessageType.CONNECT_PROCESS_RECEIVER_REVERSE:
Task.Run(() => ConnectProcess_Receiver(stream, true));
break;
default:
Console.WriteLine("Unknown message type: " + message);
break;
}
}
}
private void LoadProcess(object streamObject)
{
var stream = (Stream)streamObject;
string processName = Encoding.UTF8.GetString(stream.ReadByteArray());
string processDefinition = Encoding.UTF8.GetString(stream.ReadByteArray());
string processParam = Encoding.UTF8.GetString(stream.ReadByteArray());
_craClient.LoadProcess(processName, processDefinition, processParam, _workerinstanceName, _localProcessTable);
stream.WriteInteger(0);
stream.Close();
}
private void ConnectProcess_Initiator(object streamObject, bool reverse = false)
{
var stream = (Stream)streamObject;
string fromProcessName = Encoding.UTF8.GetString(stream.ReadByteArray());
string fromProcessOutput = Encoding.UTF8.GetString(stream.ReadByteArray());
string toProcessName = Encoding.UTF8.GetString(stream.ReadByteArray());
string toProcessInput = Encoding.UTF8.GetString(stream.ReadByteArray());
Debug.WriteLine("Processing request to initiate connection");
if (!reverse)
{
if (!_localProcessTable.ContainsKey(fromProcessName))
{
stream.WriteInteger((int)CRAErrorCode.ProcessNotFound);
stream.Close();
return;
}
if (!_localProcessTable[fromProcessName].OutputEndpoints.ContainsKey(fromProcessOutput) &&
!_localProcessTable[fromProcessName].AsyncOutputEndpoints.ContainsKey(fromProcessOutput)
)
{
stream.WriteInteger((int)CRAErrorCode.ProcessInputNotFound);
stream.Close();
return;
}
}
else
{
if (!_localProcessTable.ContainsKey(toProcessName))
{
stream.WriteInteger((int)CRAErrorCode.ProcessNotFound);
stream.Close();
return;
}
if (!_localProcessTable[toProcessName].InputEndpoints.ContainsKey(toProcessInput) &&
!_localProcessTable[toProcessName].AsyncInputEndpoints.ContainsKey(toProcessInput)
)
{
stream.WriteInteger((int)CRAErrorCode.ProcessInputNotFound);
stream.Close();
return;
}
}
var result = Connect_InitiatorSide(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse);
stream.WriteInteger((int)result);
stream.Close();
}
internal CRAErrorCode Connect_InitiatorSide(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, bool reverse, bool killIfExists = true, bool killRemote = true)
{
CancellationTokenSource oldSource;
var conn = reverse ? inConnections : outConnections;
if (conn.TryGetValue(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput,
out oldSource))
{
if (killIfExists)
{
Debug.WriteLine("Deleting prior connection - it will automatically reconnect");
oldSource.Cancel();
}
return CRAErrorCode.Success;
}
// Need to get the latest address & port
var row = reverse ? ProcessTable.GetRowForProcess(_workerInstanceTable, fromProcessName) : ProcessTable.GetRowForProcess(_workerInstanceTable, toProcessName);
// Send request to CRA instance
TcpClient client = null;
NetworkStream ns = null;
try
{
var _row = ProcessTable.GetRowForInstanceProcess(_workerInstanceTable, row.InstanceName, "");
client = new TcpClient(_row.Address, _row.Port);
ns = client.GetStream();
}
catch
{
return CRAErrorCode.ConnectionEstablishFailed;
}
if (!reverse)
ns.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_RECEIVER);
else
ns.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_RECEIVER_REVERSE);
ns.WriteByteArray(Encoding.UTF8.GetBytes(fromProcessName));
ns.WriteByteArray(Encoding.UTF8.GetBytes(fromProcessOutput));
ns.WriteByteArray(Encoding.UTF8.GetBytes(toProcessName));
ns.WriteByteArray(Encoding.UTF8.GetBytes(toProcessInput));
ns.WriteInteger(killRemote ? 1 : 0);
CRAErrorCode result = (CRAErrorCode) ns.ReadInteger();
if (result != 0)
{
Debug.WriteLine("Client received error code: " + result);
}
else
{
CancellationTokenSource source = new CancellationTokenSource();
if (!reverse)
{
if (outConnections.TryAdd(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, source))
{
Task.Run(() =>
EgressToStream(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse, ns, source));
}
else
{
source.Dispose();
ns.Close();
Console.WriteLine("Race adding connection - deleting outgoing stream");
return CRAErrorCode.ConnectionAdditionRace;
}
}
else
{
if (inConnections.TryAdd(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, source))
{
Task.Run(() => IngressFromStream
(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse, ns, source));
}
else
{
source.Dispose();
ns.Close();
Debug.WriteLine("Race adding connection - deleting outgoing stream");
return CRAErrorCode.ConnectionAdditionRace;
}
}
}
return result;
}
private async Task EgressToStream(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput,
bool reverse, Stream ns, CancellationTokenSource source)
{
try
{
if (_localProcessTable[fromProcessName].OutputEndpoints.ContainsKey(fromProcessOutput))
{
await
Task.Run(() =>
_localProcessTable[fromProcessName].OutputEndpoints[fromProcessOutput]
.ToStream(ns, toProcessName, toProcessInput, source.Token), source.Token);
}
else if (_localProcessTable[fromProcessName].AsyncOutputEndpoints.ContainsKey(fromProcessOutput))
{
await _localProcessTable[fromProcessName].AsyncOutputEndpoints[fromProcessOutput].ToStreamAsync(ns, toProcessName, toProcessInput, source.Token);
}
else
{
Debug.WriteLine("Unable to find output endpoint (on from side)");
return;
}
CancellationTokenSource oldSource;
if (outConnections.TryRemove(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, out oldSource))
{
oldSource.Dispose();
ns.Dispose();
_craClient.Disconnect(fromProcessName, fromProcessOutput, toProcessName, toProcessInput);
}
}
catch (Exception e)
{
Debug.WriteLine("Exception (" + e.ToString() + ") in outgoing stream - reconnecting");
CancellationTokenSource oldSource;
if (outConnections.TryRemove(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, out oldSource))
{
oldSource.Dispose();
}
else
{
Debug.WriteLine("Unexpected: caught exception in ToStream but entry absent in outConnections");
}
// Retry following while connection not in list
RetryRestoreConnection(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, false);
}
}
private void ConnectProcess_Receiver(object streamObject, bool reverse = false)
{
var stream = (Stream)streamObject;
string fromProcessName = Encoding.UTF8.GetString(stream.ReadByteArray());
string fromProcessOutput = Encoding.UTF8.GetString(stream.ReadByteArray());
string toProcessName = Encoding.UTF8.GetString(stream.ReadByteArray());
string toProcessInput = Encoding.UTF8.GetString(stream.ReadByteArray());
bool killIfExists = stream.ReadInteger() == 1 ? true : false;
if (!reverse)
{
if (!_localProcessTable.ContainsKey(toProcessName))
{
stream.WriteInteger((int)CRAErrorCode.ProcessNotFound);
stream.Close();
return;
}
if (!_localProcessTable[toProcessName].InputEndpoints.ContainsKey(toProcessInput) &&
!_localProcessTable[toProcessName].AsyncInputEndpoints.ContainsKey(toProcessInput)
)
{
stream.WriteInteger((int)CRAErrorCode.ProcessInputNotFound);
stream.Close();
return;
}
}
else
{
if (!_localProcessTable.ContainsKey(fromProcessName))
{
stream.WriteInteger((int)CRAErrorCode.ProcessNotFound);
stream.Close();
return;
}
if (!_localProcessTable[fromProcessName].OutputEndpoints.ContainsKey(fromProcessOutput) &&
!_localProcessTable[fromProcessName].AsyncOutputEndpoints.ContainsKey(fromProcessOutput)
)
{
stream.WriteInteger((int)CRAErrorCode.ProcessInputNotFound);
stream.Close();
return;
}
}
var result = Connect_ReceiverSide(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, stream, reverse, killIfExists);
// Do not close and dispose stream because it is being reused for data
if (result != 0)
{
stream.Close();
}
}
private int Connect_ReceiverSide(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, Stream stream, bool reverse, bool killIfExists = true)
{
CancellationTokenSource oldSource;
var conn = reverse ? outConnections : inConnections;
if (conn.TryGetValue(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput,
out oldSource))
{
if (killIfExists)
{
Debug.WriteLine("Deleting prior connection - it will automatically reconnect");
oldSource.Cancel();
}
else
{
Debug.WriteLine("There exists prior connection - not killing");
}
stream.WriteInteger((int)CRAErrorCode.ServerRecovering);
return (int)CRAErrorCode.ServerRecovering;
}
else
{
stream.WriteInteger(0);
}
CancellationTokenSource source = new CancellationTokenSource();
if (!reverse)
{
if (inConnections.TryAdd(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, source))
{
Task.Run(() =>
IngressFromStream(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse, stream, source));
return (int)CRAErrorCode.Success;
}
else
{
source.Dispose();
stream.Close();
Debug.WriteLine("Race adding connection - deleting incoming stream");
return (int)CRAErrorCode.ConnectionAdditionRace;
}
}
else
{
if (outConnections.TryAdd(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, source))
{
Task.Run(() =>
EgressToStream(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse, stream, source));
return (int)CRAErrorCode.Success;
}
else
{
source.Dispose();
stream.Close();
Debug.WriteLine("Race adding connection - deleting incoming stream");
return (int)CRAErrorCode.ConnectionAdditionRace;
}
}
}
private async Task IngressFromStream(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, bool reverse, Stream ns, CancellationTokenSource source)
{
try
{
if (_localProcessTable[toProcessName].InputEndpoints.ContainsKey(toProcessInput))
{
await Task.Run(
() => _localProcessTable[toProcessName].InputEndpoints[toProcessInput]
.FromStream(ns, fromProcessName, fromProcessOutput, source.Token), source.Token);
}
else if (_localProcessTable[toProcessName].AsyncInputEndpoints.ContainsKey(toProcessInput))
{
await _localProcessTable[toProcessName].AsyncInputEndpoints[toProcessInput].FromStreamAsync(ns, fromProcessName, fromProcessOutput, source.Token);
}
else
{
Debug.WriteLine("Unable to find input endpoint (on to side)");
return;
}
// Completed FromStream successfully
CancellationTokenSource oldSource;
if (outConnections.TryRemove(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, out oldSource))
{
oldSource.Dispose();
ns.Dispose();
_craClient.Disconnect(fromProcessName, fromProcessOutput, toProcessName, toProcessInput);
}
}
catch (Exception e)
{
Debug.WriteLine("Exception (" + e.ToString() + ") in incoming stream - reconnecting");
CancellationTokenSource tokenSource;
if (inConnections.TryRemove(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput, out tokenSource))
{
tokenSource.Dispose();
}
else
{
Debug.WriteLine("Unexpected: caught exception in FromStream but entry absent in inConnections");
}
RetryRestoreConnection(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, true);
}
}
private void RestoreProcesses()
{
var rows = ProcessTable.GetAllRowsForInstance(_workerInstanceTable, _workerinstanceName);
foreach (var row in rows)
{
if (string.IsNullOrEmpty(row.ProcessName)) continue;
_craClient.LoadProcess(row.ProcessName, row.ProcessDefinition, row.ProcessParameter, _workerinstanceName, _localProcessTable);
}
}
private void RestoreConnections(object obj)
{
var rows = ProcessTable.GetAllRowsForInstance(_workerInstanceTable, _workerinstanceName);
foreach (var _row in rows)
{
if (string.IsNullOrEmpty(_row.ProcessName)) continue;
// Decide what to do if connection creation fails
var outRows = ConnectionTable.GetAllConnectionsFromProcess(_connectionTable, _row.ProcessName).ToList();
foreach (var row in outRows)
{
Task.Run(() => RetryRestoreConnection(row.FromProcess, row.FromEndpoint, row.ToProcess, row.ToEndpoint, false));
}
var inRows = ConnectionTable.GetAllConnectionsToProcess(_connectionTable, _row.ProcessName).ToList();
foreach (var row in inRows)
{
Task.Run(() => RetryRestoreConnection(row.FromProcess, row.FromEndpoint, row.ToProcess, row.ToEndpoint, true));
}
}
}
private void RetryRestoreConnection(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, bool reverse)
{
var conn = reverse ? inConnections : outConnections;
bool killRemote = false;
while (!conn.ContainsKey(fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput))
{
if (!ConnectionTable.ContainsConnection(_connectionTable, fromProcessName, fromProcessOutput, toProcessName, toProcessInput))
break;
Debug.WriteLine("Connecting " + fromProcessName + ":" + fromProcessOutput + ":" + toProcessName + ":" + toProcessInput);
Debug.WriteLine("Connecting with killRemote set to " + (killRemote ? "true" : "false"));
var result = Connect_InitiatorSide(fromProcessName, fromProcessOutput, toProcessName, toProcessInput, reverse, false, killRemote);
if (result != 0)
{
if (result == CRAErrorCode.ServerRecovering)
killRemote = true;
Thread.Sleep(5000);
}
else
break;
}
}
private CloudTable CreateTableIfNotExists(string tableName)
{
CloudTable table = _tableClient.GetTableReference(tableName);
try
{
table.CreateIfNotExists();
}
catch { }
return table;
}
}
}

Просмотреть файл

@ -0,0 +1,340 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
namespace CRA.ClientLibrary
{
/// <summary>
/// All connections to/from this detached process
/// </summary>
public class ConnectionData
{
/// <summary>
/// Input endpoints
/// </summary>
public ConcurrentDictionary<ConnectionInfo, Stream> InputConnections { get; }
/// <summary>
/// Output endpoints
/// </summary>
public ConcurrentDictionary<ConnectionInfo, Stream> OutputConnections { get; }
/// <summary>
///
/// </summary>
public ConnectionData()
{
InputConnections = new ConcurrentDictionary<ConnectionInfo, Stream>();
OutputConnections = new ConcurrentDictionary<ConnectionInfo, Stream>();
}
}
/// <summary>
/// Endpoint information for process
/// </summary>
public class EndpointData
{
/// <summary>
/// Input endpoints
/// </summary>
public ConcurrentDictionary<string, bool> InputEndpoints { get; }
/// <summary>
/// Output endpoints
/// </summary>
public ConcurrentDictionary<string, bool> OutputEndpoints { get; }
/// <summary>
/// Constructor
/// </summary>
public EndpointData()
{
InputEndpoints = new ConcurrentDictionary<string, bool>();
OutputEndpoints = new ConcurrentDictionary<string, bool>();
}
}
/// <summary>
/// Process proxy for applications using CRA sideways
/// </summary>
public class DetachedProcess : IDisposable
{
/// <summary>
/// Connection data
/// </summary>
public ConnectionData ConnectionData { get; set; }
/// <summary>
/// Endpoint data
/// </summary>
public EndpointData EndpointData { get; set; }
private CRAClientLibrary _clientLibrary;
private string _processName;
private string _instanceName;
private bool _isEphemeralInstance;
/// <summary>
///
/// </summary>
/// <param name="processName"></param>
/// <param name="instanceName"></param>
/// <param name="clientLibrary"></param>
public DetachedProcess(string processName, string instanceName, CRAClientLibrary clientLibrary)
{
_processName = processName;
_clientLibrary = clientLibrary;
if (instanceName == "")
{
// _clientLibrary._processTableManager.GetRowForProcess(processName);
_instanceName = RandomString(16);
_isEphemeralInstance = true;
_clientLibrary.RegisterInstance(_instanceName, "", 0);
}
_clientLibrary._processTableManager.RegisterProcess(_processName, _instanceName);
EndpointData = new EndpointData();
ConnectionData = new ConnectionData();
}
/// <summary>
/// Add input endpoint
/// </summary>
/// <param name="endpointName">Endpoint name</param>
public void AddInputEndpoint(string endpointName)
{
_clientLibrary.AddEndpoint(_processName, endpointName, true, false);
EndpointData.InputEndpoints.TryAdd(endpointName, true);
}
/// <summary>
/// Add output endpoint
/// </summary>
/// <param name="endpointName">Endpoint name</param>
public void AddOutputEndpoint(string endpointName)
{
_clientLibrary.AddEndpoint(_processName, endpointName, false, false);
EndpointData.OutputEndpoints.TryAdd(endpointName, true);
}
/// <summary>
/// Create connection stream from remote output endpoint to local input endpoint
/// </summary>
/// <param name="localInputEndpointName"></param>
/// <param name="remoteProcess"></param>
/// <param name="remoteOutputEndpoint"></param>
/// <returns></returns>
public Stream FromRemoteOutputEndpointStream(string localInputEndpointName, string remoteProcess, string remoteOutputEndpoint)
{
AddInputEndpoint(localInputEndpointName);
_clientLibrary.AddConnectionInfo(remoteProcess, remoteOutputEndpoint, _processName, localInputEndpointName);
var stream = Connect_InitiatorSide(remoteProcess, remoteOutputEndpoint, _processName, localInputEndpointName, true);
var conn = new ConnectionInfo(remoteProcess, remoteOutputEndpoint, _processName, localInputEndpointName);
ConnectionData.InputConnections.AddOrUpdate(conn, stream, (c, s1) => { s1?.Dispose(); return stream; });
return stream;
}
/// <summary>
/// Create connection stream from local output endpoint to remote input endpoint
/// </summary>
/// <param name="localOutputEndpointName"></param>
/// <param name="remoteProcess"></param>
/// <param name="remoteInputEndpoint"></param>
/// <returns></returns>
public Stream ToRemoteInputEndpointStream(string localOutputEndpointName, string remoteProcess, string remoteInputEndpoint)
{
AddOutputEndpoint(localOutputEndpointName);
_clientLibrary.AddConnectionInfo(_processName, localOutputEndpointName, remoteProcess, remoteInputEndpoint);
var stream = Connect_InitiatorSide(_processName, localOutputEndpointName, remoteProcess, remoteInputEndpoint, false);
var conn = new ConnectionInfo(_processName, localOutputEndpointName, remoteProcess, remoteInputEndpoint);
ConnectionData.OutputConnections.AddOrUpdate(conn, stream, (c, s1) => { s1?.Dispose(); return stream; });
return stream;
}
/// <summary>
/// Restore information about endpoints of this detached process
/// </summary>
/// <returns></returns>
public void RestoreEndpointData()
{
foreach (var endpt in _clientLibrary.GetInputEndpoints(_processName))
{
EndpointData.InputEndpoints.TryAdd(endpt, true);
}
foreach (var endpt in _clientLibrary.GetOutputEndpoints(_processName))
{
EndpointData.OutputEndpoints.TryAdd(endpt, true);
}
}
/// <summary>
/// Restore all connections from/to this process, in the CRA connection graph
/// </summary>
/// <returns></returns>
public void RestoreAllConnections()
{
foreach (var outConn in _clientLibrary.GetConnectionsFromProcess(_processName))
{
var stream = ToRemoteInputEndpointStream(outConn.FromEndpoint, outConn.ToProcess, outConn.ToEndpoint);
ConnectionData.OutputConnections.AddOrUpdate(outConn, stream, (c, s1) => { s1?.Dispose(); return stream; });
}
foreach (var inConn in _clientLibrary.GetConnectionsToProcess(_processName))
{
var stream = FromRemoteOutputEndpointStream(inConn.ToEndpoint, inConn.FromProcess, inConn.FromEndpoint);
ConnectionData.OutputConnections.AddOrUpdate(inConn, stream, (c, s1) => { s1?.Dispose(); return stream; });
}
}
/// <summary>
/// Restore a process/instance pair
/// </summary>
public void Restore()
{
RestoreEndpointData();
RestoreAllConnections();
}
/// <summary>
/// Restore all connections from/to this process that are set to 'null' locally, in the CRA connection graph
/// </summary>
/// <returns></returns>
public ConnectionData RestoreNullConnections()
{
foreach (var outConn in ConnectionData.OutputConnections)
{
if (outConn.Value == null)
{
var stream = ToRemoteInputEndpointStream(outConn.Key.FromEndpoint, outConn.Key.ToProcess, outConn.Key.ToEndpoint);
ConnectionData.OutputConnections[outConn.Key] = stream;
}
}
foreach (var inConn in ConnectionData.InputConnections)
{
if (inConn.Value == null)
{
var stream = FromRemoteOutputEndpointStream(inConn.Key.ToEndpoint, inConn.Key.FromProcess, inConn.Key.FromEndpoint);
ConnectionData.InputConnections[inConn.Key] = stream;
}
}
return ConnectionData;
}
/// <summary>
/// Dispose the detached process
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposed)
{
if (disposed)
{
if (_isEphemeralInstance)
{
_clientLibrary.DeleteInstance(_instanceName);
}
_clientLibrary.DeleteProcess(_processName, _instanceName);
foreach (var endpt in EndpointData.InputEndpoints.Keys)
{
_clientLibrary.DeleteEndpoint(_processName, endpt);
}
foreach (var endpt in EndpointData.OutputEndpoints.Keys)
{
_clientLibrary.DeleteEndpoint(_processName, endpt);
}
EndpointData.InputEndpoints.Clear();
EndpointData.OutputEndpoints.Clear();
foreach (var kvp in ConnectionData.InputConnections)
{
_clientLibrary.DeleteConnectionInfo(kvp.Key.FromProcess, kvp.Key.FromEndpoint, kvp.Key.ToProcess, kvp.Key.ToEndpoint);
if (kvp.Value != null) kvp.Value.Dispose();
}
foreach (var kvp in ConnectionData.OutputConnections)
{
_clientLibrary.DeleteConnectionInfo(kvp.Key.FromProcess, kvp.Key.FromEndpoint, kvp.Key.ToProcess, kvp.Key.ToEndpoint);
if (kvp.Value != null) kvp.Value.Dispose();
}
ConnectionData.InputConnections.Clear();
ConnectionData.OutputConnections.Clear();
}
}
private static Random random = new Random();
private static string RandomString(int length)
{
const string chars = "abcdefghijklmnopqrstuvwxyz";
return new string(Enumerable.Repeat(chars, length)
.Select(s => s[random.Next(s.Length)]).ToArray());
}
private Stream Connect_InitiatorSide(string fromProcessName, string fromProcessOutput, string toProcessName, string toProcessInput, bool reverse)
{
bool killRemote = true; // we have no way of receiving connections
var _processTableManager = _clientLibrary._processTableManager;
// Need to get the latest address & port
var row = reverse ? _processTableManager.GetRowForProcess(fromProcessName) : _processTableManager.GetRowForProcess(toProcessName);
var _row = _processTableManager.GetRowForInstanceProcess(row.InstanceName, "");
// Send request to CRA instance
TcpClient client = null;
NetworkStream ns = null;
try
{
client = new TcpClient(_row.Address, _row.Port);
ns = client.GetStream();
}
catch
{
return null;
}
if (!reverse)
ns.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_RECEIVER);
else
ns.WriteInteger((int)CRATaskMessageType.CONNECT_PROCESS_RECEIVER_REVERSE);
ns.WriteByteArray(Encoding.UTF8.GetBytes(fromProcessName));
ns.WriteByteArray(Encoding.UTF8.GetBytes(fromProcessOutput));
ns.WriteByteArray(Encoding.UTF8.GetBytes(toProcessName));
ns.WriteByteArray(Encoding.UTF8.GetBytes(toProcessInput));
ns.WriteInteger(killRemote ? 1 : 0);
CRAErrorCode result = (CRAErrorCode)ns.ReadInteger();
if (result != 0)
{
Debug.WriteLine("Client received error code: " + result);
ns.Dispose();
return null;
}
else
{
return ns;
}
}
}
}

Просмотреть файл

@ -0,0 +1,31 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace CRA.ClientLibrary
{
/// <summary>
/// Interface for async input endpoints in CRA
/// </summary>
public interface IAsyncProcessInputEndpoint : IDisposable
{
/// <summary>
/// Async version of FromStream
/// </summary>
/// <param name="stream"></param>
/// <param name="otherProcess"></param>
/// <param name="otherEndpoint"></param>
/// <param name="token"></param>
/// <returns></returns>
Task FromStreamAsync(Stream stream, string otherProcess, string otherEndpoint, CancellationToken token);
/// <summary>
/// Async version of FromOutput
/// </summary>
/// <param name="endpoint"></param>
/// <param name="token"></param>
/// <returns></returns>
Task FromOutputAsync(IProcessOutputEndpoint endpoint, CancellationToken token);
}
}

Просмотреть файл

@ -0,0 +1,31 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace CRA.ClientLibrary
{
/// <summary>
/// Interface for async output endpoints in CRA
/// </summary>
public interface IAsyncProcessOutputEndpoint : IDisposable
{
/// <summary>
/// Async version of ToStream
/// </summary>
/// <param name="stream"></param>
/// <param name="otherProcess"></param>
/// <param name="otherEndpoint"></param>
/// <param name="token"></param>
/// <returns></returns>
Task ToStreamAsync(Stream stream, string otherProcess, string otherEndpoint, CancellationToken token);
/// <summary>
/// Async version of ToInput
/// </summary>
/// <param name="endpoint"></param>
/// <param name="token"></param>
/// <returns></returns>
Task ToInputAsync(IProcessInputEndpoint endpoint, CancellationToken token);
}
}

Просмотреть файл

@ -0,0 +1,75 @@
using System;
using System.Collections.Concurrent;
namespace CRA.ClientLibrary
{
/// <summary>
/// User provided notion of a running process
/// </summary>
public interface IProcess : IDisposable
{
/// <summary>
/// Ingress points for a process; these are observers
/// </summary>
ConcurrentDictionary<string, IProcessInputEndpoint> InputEndpoints { get; }
/// <summary>
/// Egress points for a process; these are observables
/// </summary>
ConcurrentDictionary<string, IProcessOutputEndpoint> OutputEndpoints { get; }
/// <summary>
/// Ingress points for a process; these are observers
/// </summary>
ConcurrentDictionary<string, IAsyncProcessInputEndpoint> AsyncInputEndpoints { get; }
/// <summary>
/// Egress points for a process; these are observables
/// </summary>
ConcurrentDictionary<string, IAsyncProcessOutputEndpoint> AsyncOutputEndpoints { get; }
/// <summary>
/// Callback that process will invoke when a new input is added
/// </summary>
/// <param name="addInputCallback"></param>
void OnAddInputEndpoint(Action<string, IProcessInputEndpoint> addInputCallback);
/// <summary>
/// Callback that process will invoke when a new output is added
/// </summary>
/// <param name="addOutputCallback"></param>
void OnAddAsyncOutputEndpoint(Action<string, IAsyncProcessOutputEndpoint> addOutputCallback);
/// <summary>
/// Callback that process will invoke when a new input is added
/// </summary>
/// <param name="addInputCallback"></param>
void OnAddAsyncInputEndpoint(Action<string, IAsyncProcessInputEndpoint> addInputCallback);
/// <summary>
/// Callback that process will invoke when a new output is added
/// </summary>
/// <param name="addOutputCallback"></param>
void OnAddOutputEndpoint(Action<string, IProcessOutputEndpoint> addOutputCallback);
/// <summary>
/// Callback when process is disposed
/// </summary>
void OnDispose(Action disposeCallback);
/// <summary>
/// Gets an instance of the CRA Client Library that the process
/// can use to communicate with the CRA runtime.
/// </summary>
/// <returns>Instance of CRA Client Library</returns>
CRAClientLibrary GetClientLibrary();
/// <summary>
/// Initialize process with specified params
/// </summary>
/// <param name="processParameter"></param>
void Initialize(object processParameter);
}
}

Просмотреть файл

@ -0,0 +1,28 @@
using System;
using System.IO;
using System.Threading;
namespace CRA.ClientLibrary
{
/// <summary>
/// Interface for input endpoints in CRA
/// </summary>
public interface IProcessInputEndpoint : IDisposable
{
/// <summary>
/// Call to provide a stream for input to read from
/// </summary>
/// <param name="stream"></param>
/// <param name="otherProcess"></param>
/// <param name="otherEndpoint"></param>
/// <param name="token"></param>
void FromStream(Stream stream, string otherProcess, string otherEndpoint, CancellationToken token);
/// <summary>
/// Call to provide an output endpoint for input to read from
/// </summary>
/// <param name="endpoint"></param>
/// <param name="token"></param>
void FromOutput(IProcessOutputEndpoint endpoint, CancellationToken token);
}
}

Просмотреть файл

@ -0,0 +1,28 @@
using System;
using System.IO;
using System.Threading;
namespace CRA.ClientLibrary
{
/// <summary>
/// Interface for output endpoints in CRA
/// </summary>
public interface IProcessOutputEndpoint : IDisposable
{
/// <summary>
/// Call to provide a stream for output to write to
/// </summary>
/// <param name="stream"></param>
/// <param name="otherProcess"></param>
/// <param name="otherEndpoint"></param>
/// <param name="token"></param>
void ToStream(Stream stream, string otherProcess, string otherEndpoint, CancellationToken token);
/// <summary>
/// Call to provide an input endpoint for output to write to
/// </summary>
/// <param name="endpoint"></param>
/// <param name="token"></param>
void ToInput(IProcessInputEndpoint endpoint, CancellationToken token);
}
}

Просмотреть файл

@ -0,0 +1,322 @@
using System;
using System.Collections.Concurrent;
namespace CRA.ClientLibrary
{
/// <summary>
/// Base class for Process abstraction
/// </summary>
public abstract class ProcessBase : IProcess
{
private string _processName;
private ConcurrentDictionary<string, IProcessInputEndpoint> _inputEndpoints = new ConcurrentDictionary<string, IProcessInputEndpoint>();
private ConcurrentDictionary<string, IProcessOutputEndpoint> _outputEndpoints = new ConcurrentDictionary<string, IProcessOutputEndpoint>();
private Action<string, IProcessInputEndpoint> onAddInputEndpoint;
private Action<string, IProcessOutputEndpoint> onAddOutputEndpoint;
private Action onDispose;
private ConcurrentDictionary<string, IAsyncProcessInputEndpoint> _asyncInputEndpoints = new ConcurrentDictionary<string, IAsyncProcessInputEndpoint>();
private ConcurrentDictionary<string, IAsyncProcessOutputEndpoint> _asyncOutputEndpoints = new ConcurrentDictionary<string, IAsyncProcessOutputEndpoint>();
private Action<string, IAsyncProcessInputEndpoint> onAddAsyncInputEndpoint;
private Action<string, IAsyncProcessOutputEndpoint> onAddAsyncOutputEndpoint;
private CRAClientLibrary _clientLibrary;
/// <summary>
/// Constructor
/// </summary>
protected ProcessBase()
{
onAddInputEndpoint = (key, proc) => _inputEndpoints.AddOrUpdate(key, proc, (str, pr) => proc);
onAddOutputEndpoint = (key, proc) => _outputEndpoints.AddOrUpdate(key, proc, (str, pr) => proc);
onAddAsyncInputEndpoint = (key, proc) => _asyncInputEndpoints.AddOrUpdate(key, proc, (str, pr) => proc);
onAddAsyncOutputEndpoint = (key, proc) => _asyncOutputEndpoints.AddOrUpdate(key, proc, (str, pr) => proc);
}
/// <summary>
/// Gets an instance of the CRA client library
/// </summary>
/// <returns></returns>
public CRAClientLibrary GetClientLibrary()
{
return _clientLibrary;
}
/// <summary>
/// Set instance of CRA client library
/// </summary>
/// <param name="lib"></param>
public void SetClientLibrary(CRAClientLibrary lib)
{
_clientLibrary = lib;
}
/// <summary>
/// Dictionary of output endpoints for the process
/// </summary>
public ConcurrentDictionary<string, IProcessOutputEndpoint> OutputEndpoints
{
get
{
return _outputEndpoints;
}
}
/// <summary>
/// Dictionary of input endpoints for the process
/// </summary>
public ConcurrentDictionary<string, IProcessInputEndpoint> InputEndpoints
{
get
{
return _inputEndpoints;
}
}
/// <summary>
/// Dictionary of async output endpoints for the process
/// </summary>
public ConcurrentDictionary<string, IAsyncProcessOutputEndpoint> AsyncOutputEndpoints
{
get
{
return _asyncOutputEndpoints;
}
}
/// <summary>
/// Dictionary of async input endpoints for the process
/// </summary>
public ConcurrentDictionary<string, IAsyncProcessInputEndpoint> AsyncInputEndpoints
{
get
{
return _asyncInputEndpoints;
}
}
/// <summary>
/// Connect local output endpoint (ToStream) to remote process' input endpoint (FromStream)
/// </summary>
/// <param name="localOutputEndpoint">Local output endpoint</param>
/// <param name="remoteProcess">Remote process name</param>
/// <param name="remoteInputEndpoint">Remote input endpoint</param>
public void ConnectLocalOutputEndpoint(string localOutputEndpoint, string remoteProcess, string remoteInputEndpoint)
{
_clientLibrary.Connect(_processName, localOutputEndpoint, remoteProcess, remoteInputEndpoint);
}
/// <summary>
/// Connect local input endpoint (FromStream) to remote process' output endpoint (ToStream)
/// </summary>
/// <param name="localInputEndpoint">Local input endpoint</param>
/// <param name="remoteProcess">Remote process name</param>
/// <param name="remoteOutputEndpoint">Remote output endpoint</param>
public void ConnectLocalInputEndpoint(string localInputEndpoint, string remoteProcess, string remoteOutputEndpoint)
{
_clientLibrary.Connect(remoteProcess, remoteOutputEndpoint, _processName, localInputEndpoint, ConnectionInitiator.ToSide);
}
/// <summary>
/// Add callback for when input endpoint is added
/// </summary>
/// <param name="addInputCallback"></param>
public void OnAddInputEndpoint(Action<string, IProcessInputEndpoint> addInputCallback)
{
lock (this)
{
foreach (var key in InputEndpoints.Keys)
{
addInputCallback(key, InputEndpoints[key]);
}
onAddInputEndpoint += addInputCallback;
}
}
/// <summary>
/// Add callback for when output endpoint is added
/// </summary>
/// <param name="addOutputCallback"></param>
public void OnAddOutputEndpoint(Action<string, IProcessOutputEndpoint> addOutputCallback)
{
lock (this)
{
foreach (var key in OutputEndpoints.Keys)
{
addOutputCallback(key, OutputEndpoints[key]);
}
onAddOutputEndpoint += addOutputCallback;
}
}
/// <summary>
/// Add callback for when async input endpoint is added
/// </summary>
/// <param name="addInputCallback"></param>
public void OnAddAsyncInputEndpoint(Action<string, IAsyncProcessInputEndpoint> addInputCallback)
{
lock (this)
{
foreach (var key in AsyncInputEndpoints.Keys)
{
addInputCallback(key, AsyncInputEndpoints[key]);
}
onAddAsyncInputEndpoint += addInputCallback;
}
}
/// <summary>
/// Add callback for when async output endpoint is added
/// </summary>
/// <param name="addOutputCallback"></param>
public void OnAddAsyncOutputEndpoint(Action<string, IAsyncProcessOutputEndpoint> addOutputCallback)
{
lock (this)
{
foreach (var key in AsyncOutputEndpoints.Keys)
{
addOutputCallback(key, AsyncOutputEndpoints[key]);
}
onAddAsyncOutputEndpoint += addOutputCallback;
}
}
/// <summary>
/// Get the name of the process
/// </summary>
/// <returns></returns>
public string ProcessName
{
get { return _processName; }
}
internal void SetProcessName(string processName)
{
_processName = processName;
}
/// <summary>
/// Callback for dispose
/// </summary>
/// <param name="disposeCallback"></param>
public void OnDispose(Action disposeCallback)
{
lock (this)
{
if (onDispose == null)
onDispose = disposeCallback;
else
onDispose += disposeCallback;
}
}
/// <summary>
/// Process implementor uses this to add input endpoint
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
protected virtual void AddInputEndpoint(string key, IProcessInputEndpoint input)
{
lock (this)
{
onAddInputEndpoint(key, input);
}
}
/// <summary>
/// Process implementor uses this to add output endpoint
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
protected virtual void AddOutputEndpoint(string key, IProcessOutputEndpoint input)
{
lock (this)
{
onAddOutputEndpoint(key, input);
}
}
/// <summary>
/// Process implementor uses this to add async input endpoint
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
protected virtual void AddAsyncInputEndpoint(string key, IAsyncProcessInputEndpoint input)
{
lock (this)
{
onAddAsyncInputEndpoint(key, input);
}
}
/// <summary>
/// Process implementor uses this to add async output endpoint
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
protected virtual void AddAsyncOutputEndpoint(string key, IAsyncProcessOutputEndpoint input)
{
lock (this)
{
onAddAsyncOutputEndpoint(key, input);
}
}
/// <summary>
/// Initialize
/// </summary>
/// <param name="processParameter"></param>
public virtual void Initialize(object processParameter)
{
}
/// <summary>
/// Dispose the process
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Actual dispose occurs here
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
onDispose?.Invoke();
lock (this)
{
foreach (var key in OutputEndpoints.Keys)
{
OutputEndpoints[key].Dispose();
}
foreach (var key in InputEndpoints.Keys)
{
InputEndpoints[key].Dispose();
}
foreach (var key in AsyncOutputEndpoints.Keys)
{
AsyncOutputEndpoints[key].Dispose();
}
foreach (var key in AsyncInputEndpoints.Keys)
{
AsyncInputEndpoints[key].Dispose();
}
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,35 @@
using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("CRA.ClientLibrary")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("CRA.ClientLibrary")]
[assembly: AssemblyCopyright("Copyright © 2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("ef23eb6a-e329-496d-9b7a-8cad66ea4e3a")]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

Просмотреть файл

@ -0,0 +1,128 @@
using Microsoft.WindowsAzure.Storage.Table;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
namespace CRA.ClientLibrary
{
/// <summary>
/// An assignment of one machine to a group
/// </summary>
public class ConnectionTable : TableEntity
{
/// <summary>
/// Name of the from process
/// </summary>
public string FromProcess { get { return this.PartitionKey; } }
/// <summary>
/// Other data related to connection
/// </summary>
public string EndpointToProcessEndpoint { get { return this.RowKey; } }
/// <summary>
/// From endpoint
/// </summary>
public string FromEndpoint { get { return EndpointToProcessEndpoint.Split(':')[0]; } }
/// <summary>
/// To process
/// </summary>
public string ToProcess { get { return EndpointToProcessEndpoint.Split(':')[1]; } }
/// <summary>
/// To endpoint
/// </summary>
public string ToEndpoint { get { return EndpointToProcessEndpoint.Split(':')[2]; } }
/// <summary>
/// Connection table
/// </summary>
/// <param name="fromProcess"></param>
/// <param name="fromEndpoint"></param>
/// <param name="toProcess"></param>
/// <param name="toEndpoint"></param>
public ConnectionTable(string fromProcess, string fromEndpoint, string toProcess, string toEndpoint)
{
this.PartitionKey = fromProcess;
this.RowKey = fromEndpoint + ":" + toProcess + ":" + toEndpoint;
}
/// <summary>
///
/// </summary>
public ConnectionTable() { }
/// <summary>
/// ToString
/// </summary>
/// <returns></returns>
public override string ToString()
{
return string.Format(CultureInfo.CurrentCulture, "FromProcess '{0}', FromEndpoint '{1}', ToProcess '{2}', ToEndpoint '{3}'", FromProcess, FromEndpoint, ToProcess, ToEndpoint);
}
/// <summary>
/// Equality
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public override bool Equals(object obj)
{
ConnectionTable other = obj as ConnectionTable;
return this.PartitionKey.Equals(other.PartitionKey) && this.RowKey.Equals(other.RowKey);
}
/// <summary>
/// GetHashCode
/// </summary>
/// <returns></returns>
public override int GetHashCode()
{
return PartitionKey.GetHashCode() ^ RowKey.GetHashCode();
}
/// <summary>
/// Returns a list of all visible nodes in all groups
/// </summary>
/// <param name="instanceTable"></param>
/// <returns></returns>
internal static IEnumerable<ConnectionTable> GetAll(CloudTable instanceTable)
{
TableQuery<ConnectionTable> query = new TableQuery<ConnectionTable>();
return instanceTable.ExecuteQuery(query);
}
/// <summary>
/// Counts all nodes in the cluster regardless of their group
/// </summary>
/// <returns></returns>
internal static int CountAll(CloudTable instanceTable)
{
return GetAll(instanceTable).Count();
}
internal static IEnumerable<ConnectionTable> GetAllConnectionsFromProcess(CloudTable instanceTable, string fromProcess)
{
return GetAll(instanceTable).Where(gn => fromProcess == gn.PartitionKey);
}
internal static IEnumerable<ConnectionTable> GetAllConnectionsToProcess(CloudTable instanceTable, string toProcess)
{
return GetAll(instanceTable).Where(gn => toProcess == gn.ToProcess);
}
internal static bool ContainsConnection(CloudTable instanceTable, string fromProcess, string fromEndpoint, string toProcess, string toEndpoint)
{
return ContainsRow(instanceTable, new ConnectionTable(fromProcess, fromEndpoint, toProcess, toEndpoint));
}
internal static bool ContainsRow(CloudTable instanceTable, ConnectionTable entity)
{
var temp = GetAll(instanceTable);
return temp.Where(gn => entity.Equals(gn)).Count() > 0;
}
}
}

Просмотреть файл

@ -0,0 +1,76 @@
using System.Collections.Generic;
using System.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
namespace CRA.ClientLibrary
{
/// <summary>
/// An assignment of one machine to a group
/// </summary>
public class ConnectionTableManager
{
private CloudTable _connectionTable;
internal ConnectionTableManager(string storageConnectionString)
{
var _storageAccount = CloudStorageAccount.Parse(storageConnectionString);
var _tableClient = _storageAccount.CreateCloudTableClient();
_connectionTable = CreateTableIfNotExists("connectiontableforcra", _tableClient);
}
internal void DeleteTable()
{
_connectionTable.DeleteIfExists();
}
internal void AddConnection(string fromProcess, string fromOutput, string toConnection, string toInput)
{
// Make the connection information stable
var newRow = new ConnectionTable(fromProcess, fromOutput, toConnection, toInput);
TableOperation insertOperation = TableOperation.InsertOrReplace(newRow);
_connectionTable.Execute(insertOperation);
}
internal void DeleteConnection(string fromProcess, string fromOutput, string toConnection, string toInput)
{
// Make the connection information stable
var newRow = new ConnectionTable(fromProcess, fromOutput, toConnection, toInput);
newRow.ETag = "*";
TableOperation deleteOperation = TableOperation.Delete(newRow);
_connectionTable.Execute(deleteOperation);
}
internal List<ConnectionInfo> GetConnectionsFromProcess(string processName)
{
TableQuery<ConnectionTable> query = new TableQuery<ConnectionTable>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, processName));
return _connectionTable
.ExecuteQuery(query)
.Select(e => new ConnectionInfo(e.FromProcess, e.FromEndpoint, e.ToProcess, e.ToEndpoint))
.ToList();
}
internal List<ConnectionInfo> GetConnectionsToProcess(string processName)
{
return
ConnectionTable.GetAllConnectionsToProcess(_connectionTable, processName)
.Select(e => new ConnectionInfo(e.FromProcess, e.FromEndpoint, e.ToProcess, e.ToEndpoint))
.ToList();
}
private CloudTable CreateTableIfNotExists(string tableName, CloudTableClient _tableClient)
{
CloudTable table = _tableClient.GetTableReference(tableName);
try
{
table.CreateIfNotExists();
}
catch { }
return table;
}
}
}

Просмотреть файл

@ -0,0 +1,100 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using Microsoft.WindowsAzure.Storage.Table;
namespace CRA.ClientLibrary
{
/// <summary>
/// An assignment of one machine to a group
/// </summary>
public class EndpointTable : TableEntity
{
/// <summary>
/// The time interval at which workers refresh their membership entry
/// </summary>
public static readonly TimeSpan HeartbeatTime = TimeSpan.FromSeconds(10);
/// <summary>
/// Name of the group
/// </summary>
public string ProcessName { get { return this.PartitionKey; } }
/// <summary>
/// Endpoint name
/// </summary>
public string EndpointName { get { return this.RowKey; } }
/// <summary>
/// Is an input (or output)
/// </summary>
public bool IsInput { get; set; }
/// <summary>
/// Is async (or sync)
/// </summary>
public bool IsAsync { get; set; }
/// <summary>
/// Constructor
/// </summary>
/// <param name="processName"></param>
/// <param name="endpointName"></param>
/// <param name="isInput"></param>
/// <param name="isAsync"></param>
public EndpointTable(string processName, string endpointName, bool isInput, bool isAsync)
{
this.PartitionKey = processName;
this.RowKey = endpointName;
this.IsInput = isInput;
this.IsAsync = isAsync;
}
/// <summary>
/// Constructor
/// </summary>
public EndpointTable() { }
/// <summary>
/// ToString
/// </summary>
/// <returns></returns>
public override string ToString()
{
return string.Format(CultureInfo.CurrentCulture, "Process '{0}', Endpoint '{1}'", PartitionKey, RowKey);
}
/// <summary>
/// Equals
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public override bool Equals(object obj)
{
EndpointTable other = obj as EndpointTable;
return this.PartitionKey.Equals(other.PartitionKey) && this.RowKey.Equals(other.RowKey);
}
/// <summary>
/// GetHashCode
/// </summary>
/// <returns></returns>
public override int GetHashCode()
{
return PartitionKey.GetHashCode() ^ RowKey.GetHashCode();
}
/// <summary>
/// Returns a list of all visible nodes in all groups
/// </summary>
/// <param name="instanceTable"></param>
/// <returns></returns>
internal static IEnumerable<EndpointTable> GetAll(CloudTable instanceTable)
{
var query = new TableQuery<EndpointTable>();
return instanceTable.ExecuteQuery(query);
}
}
}

Просмотреть файл

@ -0,0 +1,105 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
namespace CRA.ClientLibrary
{
/// <summary>
/// An assignment of one machine to a group
/// </summary>
public class EndpointTableManager
{
private CloudTable _endpointTable;
internal EndpointTableManager(string storageConnectionString)
{
var _storageAccount = CloudStorageAccount.Parse(storageConnectionString);
var _tableClient = _storageAccount.CreateCloudTableClient();
_endpointTable = CreateTableIfNotExists("endpointtableforcra", _tableClient);
}
internal void DeleteTable()
{
_endpointTable.DeleteIfExists();
}
internal bool ExistsEndpoint(string processName, string endPoint)
{
TableQuery<EndpointTable> query = new TableQuery<EndpointTable>()
.Where(TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, processName),
TableOperators.And,
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, endPoint)));
return _endpointTable.ExecuteQuery(query).Any();
}
internal void AddEndpoint(string processName, string endpointName, bool isInput, bool isAsync)
{
// Make the connection information stable
var newRow = new EndpointTable(processName, endpointName, isInput, isAsync);
TableOperation insertOperation = TableOperation.InsertOrReplace(newRow);
_endpointTable.Execute(insertOperation);
}
internal void DeleteEndpoint(string processName, string endpointName)
{
// Make the connection information stable
var newRow = new DynamicTableEntity(processName, endpointName);
newRow.ETag = "*";
TableOperation deleteOperation = TableOperation.Delete(newRow);
_endpointTable.Execute(deleteOperation);
}
internal void RemoveEndpoint(string processName, string endpointName)
{
var op = TableOperation.Retrieve<EndpointTable>(processName, endpointName);
TableResult retrievedResult = _endpointTable.Execute(op);
// Assign the result to a CustomerEntity.
var deleteEntity = (EndpointTable)retrievedResult.Result;
// Create the Delete TableOperation.
if (deleteEntity != null)
{
TableOperation deleteOperation = TableOperation.Delete(deleteEntity);
// Execute the operation.
_endpointTable.Execute(deleteOperation);
}
else
{
Console.WriteLine("Could not retrieve the entity.");
}
}
internal List<string> GetInputEndpoints(string processName)
{
TableQuery<EndpointTable> query = new TableQuery<EndpointTable>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, processName));
return _endpointTable.ExecuteQuery(query).Where(e => e.IsInput).Select(e => e.EndpointName).ToList();
}
internal List<string> GetOutputEndpoints(string processName)
{
TableQuery<EndpointTable> query = new TableQuery<EndpointTable>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, processName));
return _endpointTable.ExecuteQuery(query).Where(e => !e.IsInput).Select(e => e.EndpointName).ToList();
}
private CloudTable CreateTableIfNotExists(string tableName, CloudTableClient _tableClient)
{
CloudTable table = _tableClient.GetTableReference(tableName);
try
{
table.CreateIfNotExists();
}
catch
{
}
return table;
}
}
}

Просмотреть файл

@ -0,0 +1,233 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Linq.Expressions;
using Microsoft.WindowsAzure.Storage.Table;
namespace CRA.ClientLibrary
{
/// <summary>
/// An assignment of one machine to a group
/// </summary>
public class ProcessTable : TableEntity
{
/// <summary>
/// The time interval at which workers refresh their membership entry
/// </summary>
public static readonly TimeSpan HeartbeatTime = TimeSpan.FromSeconds(10);
/// <summary>
/// Name of the CRA instance
/// </summary>
public string InstanceName { get { return this.PartitionKey; } }
/// <summary>
/// Name of process
/// </summary>
public string ProcessName { get { return this.RowKey; } }
/// <summary>
/// Definition of process
/// </summary>
public string ProcessDefinition { get; set; }
/// <summary>
/// Name of the machine
/// </summary>
public string Address { get; set; }
/// <summary>
/// Port number
/// </summary>
public int Port { get; set; }
/// <summary>
/// Action to create process
/// </summary>
public string ProcessCreateAction { get; set; }
/// <summary>
/// Parameter to process creator
/// </summary>
public string ProcessParameter { get; set; }
/// <summary>
/// Constructor
/// </summary>
/// <param name="instanceName"></param>
/// <param name="processName"></param>
/// <param name="processDefinition"></param>
/// <param name="address"></param>
/// <param name="port"></param>
/// <param name="processCreateAction"></param>
/// <param name="processParameter"></param>
public ProcessTable(string instanceName, string processName, string processDefinition, string address, int port, Expression<Func<IProcess>> processCreateAction, object processParameter)
{
this.PartitionKey = instanceName;
this.RowKey = processName;
this.ProcessDefinition = processDefinition;
this.Address = address;
this.Port = port;
this.ProcessCreateAction = "";
if (processCreateAction != null)
{
var closureEliminator = new ClosureEliminator();
Expression processedUserLambdaExpression = closureEliminator.Visit(
processCreateAction);
this.ProcessCreateAction = SerializationHelper.Serialize(processedUserLambdaExpression);
}
this.ProcessParameter = SerializationHelper.SerializeObject(processParameter);
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="instanceName"></param>
/// <param name="processName"></param>
/// <param name="processDefinition"></param>
/// <param name="address"></param>
/// <param name="port"></param>
/// <param name="processCreateAction"></param>
/// <param name="processParameter"></param>
public ProcessTable(string instanceName, string processName, string processDefinition, string address, int port, string processCreateAction, string processParameter)
{
this.PartitionKey = instanceName;
this.RowKey = processName;
this.ProcessDefinition = processDefinition;
this.Address = address;
this.Port = port;
this.ProcessCreateAction = processCreateAction;
this.ProcessParameter = processParameter;
}
/// <summary>
/// Constructor
/// </summary>
public ProcessTable() { }
/// <summary>
/// ToString
/// </summary>
/// <returns></returns>
public override string ToString()
{
return string.Format(CultureInfo.CurrentCulture, "Instance '{0}', Address '{1}', Port '{2}'", this.InstanceName, this.Address, this.Port);
}
/// <summary>
/// Equals
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public override bool Equals(object obj)
{
ProcessTable other = obj as ProcessTable;
return this.PartitionKey.Equals(other.PartitionKey) && this.RowKey.Equals(other.RowKey);
}
/// <summary>
/// GetHashCode
/// </summary>
/// <returns></returns>
public override int GetHashCode()
{
return PartitionKey.GetHashCode() ^ RowKey.GetHashCode();
}
internal Func<IProcess> GetProcessCreateAction()
{
var expr = SerializationHelper.Deserialize(ProcessCreateAction);
var actionExpr = AddBox((LambdaExpression) expr);
return actionExpr.Compile();
}
internal object GetProcessParam()
{
return SerializationHelper.DeserializeObject(this.ProcessParameter);
}
private static Expression<Func<IProcess>> AddBox(LambdaExpression expression)
{
Expression converted = Expression.Convert
(expression.Body, typeof(IProcess));
return Expression.Lambda<Func<IProcess>>
(converted, expression.Parameters);
}
/// <summary>
/// Returns a list of all visible nodes in all groups
/// </summary>
/// <param name="instanceTable"></param>
/// <returns></returns>
internal static IEnumerable<ProcessTable> GetAll(CloudTable instanceTable)
{
TableQuery<ProcessTable> query = new TableQuery<ProcessTable>();
return instanceTable.ExecuteQuery(query);
}
/// <summary>
/// Counts all nodes in the cluster regardless of their group
/// </summary>
/// <returns></returns>
internal static int CountAll(CloudTable instanceTable)
{
return GetAll(instanceTable).Count();
}
internal static ProcessTable GetInstanceFromAddress(CloudTable instanceTable, string address, int port)
{
return GetAll(instanceTable).Where(gn => address == gn.Address && port == gn.Port).First();
}
internal static ProcessTable GetRowForInstance(CloudTable instanceTable, string instanceName)
{
return GetAll(instanceTable).Where(gn => instanceName == gn.InstanceName && string.IsNullOrEmpty(gn.ProcessName)).First();
}
internal static IEnumerable<ProcessTable> GetAllRowsForInstance(CloudTable instanceTable, string instanceName)
{
return GetAll(instanceTable).Where(gn => instanceName == gn.InstanceName);
}
internal static ProcessTable GetRowForInstanceProcess(CloudTable instanceTable, string instanceName, string processName)
{
return GetAll(instanceTable).Where(gn => instanceName == gn.InstanceName && processName == gn.ProcessName).First();
}
internal static ProcessTable GetRowForProcessDefinition(CloudTable instanceTable, string processDefinition)
{
return GetAll(instanceTable).Where(gn => processDefinition == gn.ProcessName && string.IsNullOrEmpty(gn.InstanceName)).First();
}
internal static ProcessTable GetRowForProcess(CloudTable instanceTable, string processName)
{
return GetAll(instanceTable).Where(gn => processName == gn.ProcessName && !string.IsNullOrEmpty(gn.InstanceName)).First();
}
internal static IEnumerable<ProcessTable> GetProcesses(CloudTable instanceTable, string instanceName)
{
return GetAll(instanceTable).Where(gn => instanceName == gn.InstanceName && !string.IsNullOrEmpty(gn.ProcessName));
}
internal static bool ContainsRow(CloudTable instanceTable, ProcessTable entity)
{
var temp = GetAll(instanceTable);
return temp.Where(gn => entity.Equals(gn)).Count() > 0;
}
internal static bool ContainsInstance(CloudTable instanceTable, string instanceName)
{
var temp = GetAll(instanceTable);
return temp.Where(gn => instanceName == gn.InstanceName).Count() > 0;
}
}
}

Просмотреть файл

@ -0,0 +1,85 @@
using System.Collections.Generic;
using System.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
namespace CRA.ClientLibrary
{
/// <summary>
/// An assignment of one machine to a group
/// </summary>
public class ProcessTableManager
{
private CloudTable _processTable;
internal ProcessTableManager(string storageConnectionString)
{
var _storageAccount = CloudStorageAccount.Parse(storageConnectionString);
var _tableClient = _storageAccount.CreateCloudTableClient();
_processTable = CreateTableIfNotExists("processtableforcra", _tableClient);
}
internal void DeleteTable()
{
_processTable.DeleteIfExists();
}
internal bool ExistsProcess(string processName)
{
TableQuery<ProcessTable> query = new TableQuery<ProcessTable>()
.Where(TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, processName));
return _processTable.ExecuteQuery(query).Any();
}
internal void RegisterInstance(string instanceName, string address, int port)
{
TableOperation insertOperation = TableOperation.InsertOrReplace(new ProcessTable
(instanceName, "", "", address, port, "", ""));
_processTable.Execute(insertOperation);
}
internal void RegisterProcess(string processName, string instanceName)
{
TableOperation insertOperation = TableOperation.InsertOrReplace(new ProcessTable
(instanceName, processName, "", "", 0, "", ""));
_processTable.Execute(insertOperation);
}
internal void DeleteInstance(string instanceName)
{
var newRow = new DynamicTableEntity(instanceName, "");
newRow.ETag = "*";
TableOperation deleteOperation = TableOperation.Delete(newRow);
_processTable.Execute(deleteOperation);
}
internal ProcessTable GetRowForProcess(string processName)
{
return ProcessTable.GetAll(_processTable).Where(gn => processName == gn.ProcessName && !string.IsNullOrEmpty(gn.InstanceName)).First();
}
internal ProcessTable GetRowForInstanceProcess(string instanceName, string processName)
{
return ProcessTable.GetAll(_processTable).Where(gn => instanceName == gn.InstanceName && processName == gn.ProcessName).First();
}
private static CloudTable CreateTableIfNotExists(string tableName, CloudTableClient _tableClient)
{
CloudTable table = _tableClient.GetTableReference(tableName);
try
{
table.CreateIfNotExists();
}
catch { }
return table;
}
internal List<string> GetProcessNames()
{
TableQuery<ProcessTable> query = new TableQuery<ProcessTable>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.NotEqual, ""));
return _processTable.ExecuteQuery(query).Select(e => e.ProcessName).ToList();
}
}
}

Просмотреть файл

@ -0,0 +1,61 @@
using System;
using System.Threading;
using System.Reflection;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
#pragma warning disable 420
namespace CRA.ClientLibrary
{
internal static class AssemblyResolver
{
private static readonly ConcurrentDictionary<string, byte[]> assemblies =
new ConcurrentDictionary<string, byte[]>();
private static volatile int handlerRegistered;
public static IEnumerable<string> RegisteredAssemblies
{
get
{
return assemblies.Keys;
}
}
public static bool ContainsAssembly(string name)
{
Contract.Requires(name != null);
return assemblies.ContainsKey(name);
}
public static byte[] GetAssemblyBytes(string name)
{
Contract.Requires(name != null);
return assemblies[name];
}
public static void Register(string name, byte[] assembly)
{
Contract.Requires(name != null);
Contract.Requires(assembly != null);
assemblies.TryAdd(name, assembly);
if (handlerRegistered == 0 &&
Interlocked.CompareExchange(ref handlerRegistered, 1, 0) == 0)
{
AppDomain.CurrentDomain.AssemblyResolve += Resolver;
}
}
private static Assembly Resolver(object sender, ResolveEventArgs arguments)
{
byte[] assemblyBytes;
if (assemblies.TryGetValue(arguments.Name, out assemblyBytes))
{
return Assembly.Load(assemblyBytes);
}
return null;
}
}
}

Просмотреть файл

@ -0,0 +1,45 @@
using System.Reflection;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
namespace CRA.ClientLibrary
{
internal static class AssemblyResolverClient
{
internal struct ApplicationAssembly
{
public string Name;
public string FileName;
}
private static void GetApplicationAssemblies(
AssemblyName current, List<ApplicationAssembly> assemblies, HashSet<string> exclude)
{
Contract.Requires(current != null);
Contract.Requires(assemblies != null);
Contract.Requires(exclude != null);
// If this assembly is in the "exclude" set, then done.
var name = current.FullName;
if (exclude.Contains(name))
{
return;
}
// Add to "exclude" set so this assembly isn't re-added if it is referenced additional times.
exclude.Add(name);
// Recursively add any assemblies that "current" references.
var assembly = Assembly.Load(current);
foreach (var referenced in assembly.GetReferencedAssemblies())
{
GetApplicationAssemblies(referenced, assemblies, exclude);
}
// Lastly, now that all dependant (referenced) assemblies are added, add the "current" assembly.
assemblies.Add(new ApplicationAssembly { Name = name, FileName = assembly.Location });
}
}
}

Просмотреть файл

@ -0,0 +1,280 @@
using System;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Globalization;
using System.Runtime.Serialization;
namespace CRA.ClientLibrary
{
internal static class AssemblyUtils
{
internal struct ApplicationAssembly
{
public string Name;
public string FileName;
}
[DataContract]
internal struct UserDLLsInfo
{
[DataMember]
public string UserDLLsBuffer { get; set; }
[DataMember]
public string UserDLLsBufferInfo { get; set; }
}
public static string[] GetExcludedAssemblies()
{
string[] excludedAssemblies = new[]
{
"mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Core, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"Microsoft.Azure.KeyVault.Core, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"Microsoft.VisualStudio.HostingProcess.Utilities, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"mscorlib, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Configuration, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Xml, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Data.SqlXml, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Security, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Windows.Forms, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Drawing, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"Accessibility, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Deployment, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Runtime.Serialization.Formatters.Soap, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"Microsoft.VisualStudio.HostingProcess.Utilities.Sync, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"Microsoft.VisualStudio.Debugger.Runtime, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"vshost32, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Xml.Linq, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Runtime.Serialization, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.ServiceModel.Internals, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"SMDiagnostics, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Data.DataSetExtensions, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Data, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Transactions, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.EnterpriseServices, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.DirectoryServices, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Runtime.Remoting, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Web, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Web.RegularExpressions, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Design, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Data.OracleClient, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Drawing.Design, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Web.ApplicationServices, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"System.ComponentModel.DataAnnotations, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"System.DirectoryServices.Protocols, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Runtime.Caching, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.ServiceProcess, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Configuration.Install, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Web.Services, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"Microsoft.Build.Utilities.v4.0, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"Microsoft.Build.Framework, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Xaml, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"Microsoft.Build.Tasks.v4.0, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Numerics, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"Microsoft.CSharp, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Dynamic, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Net.Http, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"Microsoft.WindowsAzure.Storage, Version=8.1.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"Microsoft.Data.Services.Client, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"Microsoft.Data.Edm, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"Microsoft.Data.OData, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"System.Spatial, Version=5.8.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed",
"Newtonsoft.Json, Version=8.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed",
"Newtonsoft.Json, Version=10.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed",
"Remote.Linq, Version=5.1.0.0, Culture=neutral, PublicKeyToken=null",
"Aqua, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null",
"Anonymously Hosted DynamicMethods Assembly, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null",
"Remote.Linq, Version=5.3.1.0, Culture=neutral, PublicKeyToken=null",
"Aqua, Version=3.0.0.0, Culture=neutral, PublicKeyToken=null",
"Aqua.TypeSystem.Emit.Types, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null",
"Remote.Linq, Version=5.4.0.0, Culture=neutral, PublicKeyToken=null",
"Aqua, Version=4.0.0.0, Culture=neutral, PublicKeyToken=null",
"System.ServiceModel, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.IdentityModel, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"Microsoft.Transactions.Bridge, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.IdentityModel.Selectors, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089",
"System.Messaging, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Runtime.DurableInstancing, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"System.ServiceModel.Activation, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"System.ServiceModel.Activities, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"System.Activities, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"Microsoft.VisualBasic.Activities.Compiler, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"Microsoft.VisualBasic, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Management, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"Microsoft.JScript, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a",
"System.Activities.DurableInstancing, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"System.Xaml.Hosting, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35",
"CRA.ClientLibrary, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
};
return excludedAssemblies;
}
public static ApplicationAssembly[] GetRelatedApplicationAssemblies(
string assemblyNamePrefix, params string[] excludedAssemblies)
{
Contract.Requires(excludedAssemblies != null);
var assemblies = new List<ApplicationAssembly>();
var exclude = new HashSet<string>(excludedAssemblies);
var applicationAssemblies = AppDomain.CurrentDomain.GetAssemblies();
foreach (var applicationAssembly in applicationAssemblies
.Where(v => v.GetName().FullName.StartsWith(
assemblyNamePrefix)))
{
GetApplicationAssemblies(applicationAssembly.GetName(), assemblies, exclude);
}
return assemblies.ToArray();
}
private static void GetApplicationAssemblies(AssemblyName current,
List<ApplicationAssembly> assemblies, HashSet<string> exclude)
{
Contract.Requires(current != null);
Contract.Requires(assemblies != null);
Contract.Requires(exclude != null);
var name = current.FullName;
if (exclude.Contains(name))
{
return;
}
// Console.WriteLine("\"" + name + "\",");
exclude.Add(name);
try
{
var assembly = Assembly.Load(current);
foreach (var referenced in assembly.GetReferencedAssemblies())
{
GetApplicationAssemblies(referenced, assemblies, exclude);
}
assemblies.Add(new ApplicationAssembly { Name = name, FileName = assembly.Location });
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
public static Dictionary<string, byte[]> AssembliesFromString(string assembliesString,
string assembliesStringInfo)
{
Dictionary<string, byte[]> udfAssemblies = new Dictionary<string, byte[]>();
if (!assembliesStringInfo.Equals("$"))
{
string[] parts = assembliesStringInfo.Split('@');
MemoryStream assembliesStream = new MemoryStream(
Convert.FromBase64String(assembliesString));
BinaryReader binaryReader = new BinaryReader(assembliesStream);
for (int i = 0; i < parts.Length - 1; i = i + 2)
{
int assemblyNameSize = int.Parse(parts[i], CultureInfo.InvariantCulture);
int assemblyBytesSize = int.Parse(parts[i + 1], CultureInfo.InvariantCulture);
string assemblyName = Encoding.UTF8.GetString(
binaryReader.ReadBytes(assemblyNameSize));
byte[] assemblyBytes = binaryReader.ReadBytes(assemblyBytesSize);
udfAssemblies.Add(assemblyName, assemblyBytes);
}
}
return udfAssemblies;
}
public static void WriteAssembliesToStream(Stream stream)
{
var relatedAssemblies = GetRelatedApplicationAssemblies("",
GetExcludedAssemblies());
stream.WriteInteger(relatedAssemblies.Length);
foreach (var assembly in relatedAssemblies)
{
byte[] assemblyNameBytes = Encoding.UTF8.GetBytes(assembly.Name);
stream.WriteByteArray(assemblyNameBytes);
FileStream fs = new FileStream(assembly.FileName, FileMode.Open, FileAccess.Read);
using (BinaryReader br = new BinaryReader(fs))
{
byte[] assemblyFileBytes = br.ReadBytes(Convert.ToInt32(fs.Length));
stream.WriteByteArray(assemblyFileBytes);
}
}
}
public static void LoadAssembliesFromStream(Stream stream)
{
int numAssemblies = stream.ReadInteger();
for (int i=0; i<numAssemblies; i++)
{
byte[] assemblyNameBytes = stream.ReadByteArray();
string assemblyName = Encoding.UTF8.GetString(assemblyNameBytes);
byte[] assemblyFileBytes = stream.ReadByteArray();
AssemblyResolver.Register(assemblyName, assemblyFileBytes);
}
}
public static UserDLLsInfo BuildUserDLLs(string userLibraryPrefix)
{
UserDLLsInfo userDLLsInfo = new UserDLLsInfo();
if (userLibraryPrefix == null || userLibraryPrefix.Equals("$"))
{
List<byte> userDLLsBuffer = new List<byte>();
userDLLsBuffer.Add((byte)'$');
userDLLsInfo.UserDLLsBuffer = Convert.ToBase64String(userDLLsBuffer.ToArray());
userDLLsInfo.UserDLLsBufferInfo = "$";
}
else
{
List<byte> userDLLsBuffer = new List<byte>();
StringBuilder userDLLsBufferInfo = new StringBuilder();
var relatedAssemblies = GetRelatedApplicationAssemblies(userLibraryPrefix,
GetExcludedAssemblies());
foreach (var assembly in relatedAssemblies)
{
byte[] assemblyNameBytes = Encoding.UTF8.GetBytes(assembly.Name);
userDLLsBuffer.AddRange(assemblyNameBytes);
FileStream fs = new FileStream(assembly.FileName, FileMode.Open, FileAccess.Read);
using (BinaryReader br = new BinaryReader(fs))
{
byte[] assemblyFileBytes = br.ReadBytes(Convert.ToInt32(fs.Length));
userDLLsBuffer.AddRange(assemblyFileBytes);
userDLLsBufferInfo.Append(assembly.Name.Length);
userDLLsBufferInfo.Append('@');
userDLLsBufferInfo.Append(assemblyFileBytes.Length);
userDLLsBufferInfo.Append('@');
}
}
userDLLsInfo.UserDLLsBuffer = Convert.ToBase64String(userDLLsBuffer.ToArray());
userDLLsInfo.UserDLLsBufferInfo = userDLLsBufferInfo.ToString();
}
return userDLLsInfo;
}
}
}

Просмотреть файл

@ -0,0 +1,40 @@
using System.Reflection;
using System.Linq.Expressions;
namespace CRA.ClientLibrary
{
internal class ClosureEliminator : ExpressionVisitor
{
protected override Expression VisitMember(MemberExpression node)
{
if ((node.Expression != null) && (node.Expression.NodeType == ExpressionType.Constant))
{
object target = ((ConstantExpression)node.Expression).Value, value;
switch (node.Member.MemberType)
{
case MemberTypes.Property:
value = ((PropertyInfo)node.Member).GetValue(target, null);
break;
case MemberTypes.Field:
value = ((FieldInfo)node.Member).GetValue(target);
break;
default:
value = target = null;
break;
}
if (target != null)
{
if (value.GetType().IsSubclassOf(typeof(Expression)))
{
return this.Visit(value as Expression);
}
else
{
return Expression.Constant(value, node.Type);
}
}
}
return base.VisitMember(node);
}
}
}

Просмотреть файл

@ -0,0 +1,70 @@
using System.Linq.Expressions;
using Newtonsoft.Json;
using Remote.Linq;
using Remote.Linq.ExpressionVisitors;
using System;
namespace CRA.ClientLibrary
{
internal class SerializationHelper
{
private SerializationHelper()
{
}
private static readonly JsonSerializerSettings _serializerSettings
= new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Auto,
NullValueHandling = NullValueHandling.Ignore,
TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full
};
/// <summary>
/// Serializes a LINQ expression.
/// </summary>
/// <param name="expression">The expression.</param>
/// <returns>The serialized expression.</returns>
internal static string Serialize(Expression expression)
{
var toSerialize = expression.ToRemoteLinqExpression()
.ReplaceGenericQueryArgumentsByNonGenericArguments();
return JsonConvert.SerializeObject(toSerialize, _serializerSettings);
}
/// <summary>
/// Deserializes a LINQ expression.
/// </summary>
/// <param name="expression">The serialized expression.</param>
/// <returns>The expression.</returns>
internal static Expression Deserialize(string expression)
{
var deserialized = JsonConvert.DeserializeObject<Remote.Linq.Expressions.LambdaExpression>(
expression, _serializerSettings);
var ret = deserialized.ReplaceNonGenericQueryArgumentsByGenericArguments()
.ToLinqExpression();
return ret;
}
internal static string SerializeObject(object obj)
{
if (obj == null)
{
return JsonConvert.SerializeObject(obj, _serializerSettings);
}
var tmp = new ObjectWrapper
{
type = obj.GetType().AssemblyQualifiedName,
data = JsonConvert.SerializeObject(obj, _serializerSettings)
};
return JsonConvert.SerializeObject(tmp, typeof(ObjectWrapper), _serializerSettings);
}
internal static object DeserializeObject(string obj)
{
if (obj == "null") return null;
var ow = JsonConvert.DeserializeObject<ObjectWrapper>(obj, _serializerSettings);
return JsonConvert.DeserializeObject(ow.data, Type.GetType(ow.type), _serializerSettings);
}
}
}

Просмотреть файл

@ -0,0 +1,139 @@
using System;
using System.IO;
namespace CRA.ClientLibrary
{
/// <summary>
/// Stream communication primitives
/// </summary>
public static class StreamCommunicator
{
/// <summary>
/// Read integer fixed size
/// </summary>
/// <param name="stream"></param>
/// <returns></returns>
public static int ReadIntegerFixed(this Stream stream)
{
var value = new byte[4];
stream.ReadAllRequiredBytes(value, 0, value.Length);
int intValue = value[0]
| (int)value[1] << 0x8
| (int)value[2] << 0x10
| (int)value[3] << 0x18;
return intValue;
}
/// <summary>
/// Write integer fixed size
/// </summary>
/// <param name="stream"></param>
/// <param name="value"></param>
public static void WriteIntegerFixed(this Stream stream, int value)
{
stream.WriteByte((byte)(value & 0xFF));
stream.WriteByte((byte)((value >> 0x8) & 0xFF));
stream.WriteByte((byte)((value >> 0x10) & 0xFF));
stream.WriteByte((byte)((value >> 0x18) & 0xFF));
}
/// <summary>
/// Read integer compressed
/// </summary>
/// <param name="stream"></param>
/// <returns></returns>
public static int ReadInteger(this Stream stream)
{
var currentByte = (uint)stream.ReadByte();
byte read = 1;
uint result = currentByte & 0x7FU;
int shift = 7;
while ((currentByte & 0x80) != 0)
{
currentByte = (uint)stream.ReadByte();
read++;
result |= (currentByte & 0x7FU) << shift;
shift += 7;
if (read > 5)
{
throw new InvalidOperationException("Invalid integer value in the input stream.");
}
}
return (int)((-(result & 1)) ^ ((result >> 1) & 0x7FFFFFFFU));
}
/// <summary>
/// Write integer compressed
/// </summary>
/// <param name="stream"></param>
/// <param name="value"></param>
public static void WriteInteger(this Stream stream, int value)
{
var zigZagEncoded = unchecked((uint)((value << 1) ^ (value >> 31)));
while ((zigZagEncoded & ~0x7F) != 0)
{
stream.WriteByte((byte)((zigZagEncoded | 0x80) & 0xFF));
zigZagEncoded >>= 7;
}
stream.WriteByte((byte)zigZagEncoded);
}
/// <summary>
/// Write byte array
/// </summary>
/// <param name="stream"></param>
/// <param name="value"></param>
public static void WriteByteArray(this Stream stream, byte[] value)
{
if (value == null)
{
throw new ArgumentNullException("value");
}
stream.WriteInteger(value.Length);
if (value.Length > 0)
{
stream.Write(value, 0, value.Length);
}
}
/// <summary>
/// Read byte array
/// </summary>
/// <param name="stream"></param>
/// <returns></returns>
public static byte[] ReadByteArray(this Stream stream)
{
int arraySize = stream.ReadInteger();
var array = new byte[arraySize];
if (arraySize > 0)
{
stream.ReadAllRequiredBytes(array, 0, array.Length);
}
return array;
}
/// <summary>
/// Read all required bytes
/// </summary>
/// <param name="stream"></param>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="count"></param>
/// <returns></returns>
internal static int ReadAllRequiredBytes(this Stream stream, byte[] buffer, int offset, int count)
{
int toRead = count;
int currentOffset = offset;
int currentRead;
do
{
currentRead = stream.Read(buffer, currentOffset, toRead);
currentOffset += currentRead;
toRead -= currentRead;
}
while (toRead > 0 && currentRead != 0);
return currentOffset - offset;
}
}
}

Просмотреть файл

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-10.0.0.0" newVersion="10.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Microsoft.Azure.KeyVault.Core" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-2.0.0.0" newVersion="2.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
<startup><supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" /></startup></configuration>

Просмотреть файл

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="aqua-core" version="3.0.0" targetFramework="net452" />
<package id="Microsoft.Azure.KeyVault.Core" version="1.0.0" targetFramework="net452" />
<package id="Microsoft.Data.Edm" version="5.8.2" targetFramework="net452" />
<package id="Microsoft.Data.OData" version="5.8.2" targetFramework="net452" />
<package id="Microsoft.Data.Services.Client" version="5.8.2" targetFramework="net452" />
<package id="Newtonsoft.Json" version="10.0.2" targetFramework="net452" />
<package id="Remote.Linq" version="5.3.1" targetFramework="net452" />
<package id="System.ComponentModel.EventBasedAsync" version="4.0.11" targetFramework="net452" />
<package id="System.Dynamic.Runtime" version="4.0.0" targetFramework="net452" />
<package id="System.Linq.Queryable" version="4.0.0" targetFramework="net452" />
<package id="System.Net.Requests" version="4.0.11" targetFramework="net452" />
<package id="System.Spatial" version="5.8.2" targetFramework="net452" />
<package id="WindowsAzure.Storage" version="8.1.1" targetFramework="net452" />
</packages>

33
src/CRA.Worker/App.config Normal file
Просмотреть файл

@ -0,0 +1,33 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<appSettings file="privatesettings.config">
<!--
Enter your Azure storage connection string as below, into your own privatesettings.config (this file is added to .gitignore for privacy reasons)
See privatesettings.config.example for the template
A post-build action copies this file to the output directory
<appSettings>
<add key="StorageConnectionString" value=""/>
</appSettings>
-->
</appSettings>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
</startup>
<system.diagnostics>
<trace autoflush="false" indentsize="4">
<listeners>
<add name="configConsoleListener" type="System.Diagnostics.ConsoleTraceListener" />
</listeners>
</trace>
</system.diagnostics>
<runtime>
<gcServer enabled="true" />
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-10.0.0.0" newVersion="10.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -0,0 +1,67 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{85331F0D-B724-45D7-9BB2-AC703E73DE3B}</ProjectGuid>
<OutputType>Exe</OutputType>
<RootNamespace>CRA.Worker</RootNamespace>
<AssemblyName>CRA.Worker</AssemblyName>
<TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<Prefer32Bit>false</Prefer32Bit>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<Prefer32Bit>false</Prefer32Bit>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Configuration" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="App.config">
<SubType>Designer</SubType>
</None>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CRA.ClientLibrary\CRA.ClientLibrary.csproj">
<Project>{ef23eb6a-e329-496d-9b7a-8cad66ea4e3a}</Project>
<Name>CRA.ClientLibrary</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<PropertyGroup>
<PostBuildEvent>IF EXIST $(ProjectDir)privatesettings.config copy $(ProjectDir)privatesettings.config $(ProjectDir)$(OutDir)</PostBuildEvent>
</PropertyGroup>
</Project>

52
src/CRA.Worker/Program.cs Normal file
Просмотреть файл

@ -0,0 +1,52 @@
using System;
using System.Net;
using System.Net.Sockets;
using System.Configuration;
using System.Diagnostics;
using CRA.ClientLibrary;
namespace CRA.Worker
{
class Program
{
static void Main(string[] args)
{
//Console.WriteLine("Press ENTER to start");
//Console.ReadLine();
if (args.Length != 3)
{
Console.WriteLine("Worker for Common Runtime for Applications (CRA)\nUsage: CRA.Worker.exe instancename (e.g., instance1) ipaddress (e.g., 127.0.0.1) port (e.g., 11000)");
return;
}
if (args[1] == "null")
{
args[1] = GetLocalIPAddress();
}
Debug.WriteLine("Worker instance name is: " + args[0]);
Debug.WriteLine("Using IP address: " + args[1] + " and port " + Convert.ToInt32(args[2]));
Debug.WriteLine("Using Azure connection string: " + ConfigurationManager.AppSettings.Get("StorageConnectionString"));
var worker = new CRAWorker
(args[0], args[1], Convert.ToInt32(args[2]),
ConfigurationManager.AppSettings.Get("StorageConnectionString"));
worker.Start();
}
private static string GetLocalIPAddress()
{
var host = Dns.GetHostEntry(Dns.GetHostName());
foreach (var ip in host.AddressList)
{
if (ip.AddressFamily == AddressFamily.InterNetwork)
{
return ip.ToString();
}
}
throw new Exception("Local IP Address Not Found!");
}
}
}

Просмотреть файл

@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("CRA.Worker")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("CRA.Worker")]
[assembly: AssemblyCopyright("Copyright © 2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("85331f0d-b724-45d7-9bb2-ac703e73de3b")]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

Просмотреть файл

@ -0,0 +1,3 @@
<appSettings>
<add key="StorageConnectionString" value="your key here" />
</appSettings>

43
src/CRA.sln Normal file
Просмотреть файл

@ -0,0 +1,43 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26403.7
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CRA.ClientLibrary", "CRA.ClientLibrary\CRA.ClientLibrary.csproj", "{EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CRA.Worker", "CRA.Worker\CRA.Worker.csproj", "{85331F0D-B724-45D7-9BB2-AC703E73DE3B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConnectionPair", "Samples\ConnectionPair\ConnectionPair.csproj", "{00F97B37-965A-4F20-B5B2-E182D20509C2}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Samples", "Samples", "{EC027445-85E2-4FD3-83F3-224CA6481849}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Core", "Core", "{AF66150E-52D6-402F-BA4D-D3FDDA71C73F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A}.Release|Any CPU.Build.0 = Release|Any CPU
{85331F0D-B724-45D7-9BB2-AC703E73DE3B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{85331F0D-B724-45D7-9BB2-AC703E73DE3B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{85331F0D-B724-45D7-9BB2-AC703E73DE3B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{85331F0D-B724-45D7-9BB2-AC703E73DE3B}.Release|Any CPU.Build.0 = Release|Any CPU
{00F97B37-965A-4F20-B5B2-E182D20509C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{00F97B37-965A-4F20-B5B2-E182D20509C2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{00F97B37-965A-4F20-B5B2-E182D20509C2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{00F97B37-965A-4F20-B5B2-E182D20509C2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{EF23EB6A-E329-496D-9B7A-8CAD66EA4E3A} = {AF66150E-52D6-402F-BA4D-D3FDDA71C73F}
{85331F0D-B724-45D7-9BB2-AC703E73DE3B} = {AF66150E-52D6-402F-BA4D-D3FDDA71C73F}
{00F97B37-965A-4F20-B5B2-E182D20509C2} = {EC027445-85E2-4FD3-83F3-224CA6481849}
EndGlobalSection
EndGlobal

Просмотреть файл

@ -0,0 +1,25 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<appSettings file="privatesettings.config">
<!--
Enter your Azure storage connection string as below, into your own privatesettings.config (this file is added to .gitignore for privacy reasons)
See privatesettings.config.example for the template
A post-build action copies this file to the output directory
<appSettings>
<add key="StorageConnectionString" value=""/>
</appSettings>
-->
</appSettings>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
</startup>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-10.0.0.0" newVersion="10.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -0,0 +1,65 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{00F97B37-965A-4F20-B5B2-E182D20509C2}</ProjectGuid>
<OutputType>Exe</OutputType>
<RootNamespace>ConnectionPair</RootNamespace>
<AssemblyName>ConnectionPair</AssemblyName>
<TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="MyAsyncInput.cs" />
<Compile Include="MyAsyncOutput.cs" />
<Compile Include="MyFirstProcess.cs" />
<Compile Include="MySecondProcess.cs" />
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="App.config" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\CRA.ClientLibrary\CRA.ClientLibrary.csproj">
<Project>{ef23eb6a-e329-496d-9b7a-8cad66ea4e3a}</Project>
<Name>CRA.ClientLibrary</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<PropertyGroup>
<PostBuildEvent>IF EXIST $(ProjectDir)privatesettings.config copy $(ProjectDir)privatesettings.config $(ProjectDir)$(OutDir)</PostBuildEvent>
</PropertyGroup>
</Project>

Просмотреть файл

@ -0,0 +1,45 @@
using System;
using System.Threading.Tasks;
using System.IO;
using System.Threading;
using CRA.ClientLibrary;
namespace ConnectionPair
{
public class MyAsyncInput : IAsyncProcessInputEndpoint
{
bool _running = true;
IProcess _process;
public MyAsyncInput(IProcess process)
{
_process = process;
}
public void Dispose()
{
Console.WriteLine("Disposing MyInput");
_running = false;
}
public async Task FromOutputAsync(IProcessOutputEndpoint p, CancellationToken token)
{
throw new NotImplementedException();
}
public async Task FromStreamAsync(Stream s, string otherProcess, string otherEndpoint, CancellationToken token)
{
Console.WriteLine("Receiving data from process: " + otherProcess + ", endpoint: " + otherEndpoint);
for (int i = 0; i < int.MaxValue; i++)
{
int val = s.ReadInteger();
Console.WriteLine("Read value: " + val);
token.ThrowIfCancellationRequested();
if (!_running) break;
}
}
}
}

Просмотреть файл

@ -0,0 +1,54 @@
using System;
using System.Threading.Tasks;
using System.IO;
using System.Threading;
using CRA.ClientLibrary;
namespace ConnectionPair
{
public class MyAsyncOutput : IAsyncProcessOutputEndpoint
{
bool _running = true;
IProcess _process;
public MyAsyncOutput(IProcess process)
{
_process = process;
}
public void Dispose()
{
Console.WriteLine("Disposing MyOutput");
_running = false;
}
public async Task ToInputAsync(IProcessInputEndpoint p, CancellationToken token)
{
throw new NotImplementedException();
}
public async Task ToStreamAsync(Stream stream, string otherProcess, string otherEndpoint, CancellationToken token)
{
Console.WriteLine("Sending data to process: " + otherProcess + ", endpoint: " + otherEndpoint);
for (int i = 0; i < int.MaxValue; i += 1)
{
for (int j = 0; j < 1; j++)
{
stream.WriteInteger(i + j);
Console.WriteLine("Written value: " + (i + j));
}
for (int j = 0; j < 1; j++)
{
Thread.Sleep(1000);
token.ThrowIfCancellationRequested();
}
if (!_running) break;
}
}
}
}

Просмотреть файл

@ -0,0 +1,18 @@
using CRA.ClientLibrary;
namespace ConnectionPair
{
public class MyFirstProcess : ProcessBase
{
public MyFirstProcess() : base()
{
}
public override void Initialize(object param)
{
AddAsyncOutputEndpoint("firstoutput", new MyAsyncOutput(this));
AddAsyncInputEndpoint("firstinput", new MyAsyncInput(this));
base.Initialize(param);
}
}
}

Просмотреть файл

@ -0,0 +1,18 @@
using CRA.ClientLibrary;
namespace ConnectionPair
{
public class MySecondProcess : ProcessBase
{
public MySecondProcess() : base()
{
}
public override void Initialize(object param)
{
AddAsyncInputEndpoint("secondinput", new MyAsyncInput(this));
AddAsyncOutputEndpoint("secondoutput", new MyAsyncOutput(this));
base.Initialize(param);
}
}
}

Просмотреть файл

@ -0,0 +1,24 @@
using System;
using CRA.ClientLibrary;
namespace ConnectionPair
{
class Program
{
static void Main(string[] args)
{
var client = new CRAClientLibrary();
client.DefineProcess("myfirstprocess", () => new MyFirstProcess());
client.DefineProcess("mysecondprocess", () => new MySecondProcess());
client.InstantiateProcess("instance1", "myprocess1", "myfirstprocess", null);
client.InstantiateProcess("instance2", "myprocess2", "mysecondprocess", null);
client.Connect("myprocess1", "firstoutput", "myprocess2", "secondinput");
client.Connect("myprocess2", "secondoutput", "myprocess1", "firstinput");
Console.ReadLine();
}
}
}

Просмотреть файл

@ -0,0 +1,35 @@
using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("ConnectionPair")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("ConnectionPair")]
[assembly: AssemblyCopyright("Copyright © 2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("00f97b37-965a-4f20-b5b2-e182d20509c2")]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

Просмотреть файл

@ -0,0 +1,3 @@
<appSettings>
<add key="StorageConnectionString" value="your key here" />
</appSettings>