Remove Simple In-Memory transport

The Simple In-Memory transport isn't quite ready to be released. Since
we'd like to get the first preview release, we're removing Simple
In-Memory until it has had a change to be refactored into a releasable
state.
This commit is contained in:
Christopher Warrington 2016-05-30 18:17:04 -07:00
Родитель 0b37d6c546
Коммит bb402f3128
23 изменённых файлов: 1 добавлений и 1918 удалений

Просмотреть файл

@ -196,8 +196,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "pingpong", "..\examples\cs\
{92915BD9-8AB1-4E4D-A2AC-95BBF4F82D89} = {92915BD9-8AB1-4E4D-A2AC-95BBF4F82D89}
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "simpleinmem-transport", "src\comm\simpleinmem-transport\simpleinmem-transport.csproj", "{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "logging", "..\examples\cs\comm\logging\logging.csproj", "{5C8132A8-C4B1-45E0-BCA6-379DA23B86D3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "service", "src\comm\service\service.csproj", "{79D2423A-87C8-44A2-89C2-2FA94521747E}"
@ -737,24 +735,6 @@ Global
{92C5B98B-12A4-4995-90B8-A19E96524464}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{92C5B98B-12A4-4995-90B8-A19E96524464}.Release|Win32.ActiveCfg = Release|Any CPU
{92C5B98B-12A4-4995-90B8-A19E96524464}.Release|Win32.Build.0 = Release|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Any CPU.Build.0 = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Win32.ActiveCfg = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Win32.Build.0 = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Any CPU.ActiveCfg = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Any CPU.Build.0 = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Mixed Platforms.ActiveCfg = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Mixed Platforms.Build.0 = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Win32.ActiveCfg = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Win32.Build.0 = Debug|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Any CPU.ActiveCfg = Release|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Any CPU.Build.0 = Release|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Win32.ActiveCfg = Release|Any CPU
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Win32.Build.0 = Release|Any CPU
{5C8132A8-C4B1-45E0-BCA6-379DA23B86D3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5C8132A8-C4B1-45E0-BCA6-379DA23B86D3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5C8132A8-C4B1-45E0-BCA6-379DA23B86D3}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
@ -879,7 +859,6 @@ Global
{F13CB9C0-DB52-45DB-9BFD-05CB26512FC6} = {882B4409-F7A4-496A-AC2B-3E46A448D5D8}
{C687C52C-0A5B-4F10-8CB3-DBAF9A72D042} = {ED161076-6BB8-4A13-83ED-0E9C01461D5E}
{92C5B98B-12A4-4995-90B8-A19E96524464} = {621A2166-EEE0-4A27-88AA-5BE5AC996452}
{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24} = {ED161076-6BB8-4A13-83ED-0E9C01461D5E}
{5C8132A8-C4B1-45E0-BCA6-379DA23B86D3} = {621A2166-EEE0-4A27-88AA-5BE5AC996452}
{79D2423A-87C8-44A2-89C2-2FA94521747E} = {ED161076-6BB8-4A13-83ED-0E9C01461D5E}
{12279366-F646-4FEC-8CAA-B62A8EC477BB} = {621A2166-EEE0-4A27-88AA-5BE5AC996452}

Просмотреть файл

@ -1,85 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem.Processor
{
using Service;
using System;
using System.Threading.Tasks;
using Bond.Comm.Layers;
internal class EventProcessor : QueueProcessor
{
readonly SimpleInMemConnection connection;
readonly ServiceHost serviceHost;
readonly InMemFrameQueueCollection serverqueues;
internal EventProcessor(SimpleInMemConnection connection, ServiceHost host, InMemFrameQueueCollection queues)
{
if (connection == null) throw new ArgumentNullException(nameof(connection));
if (host == null) throw new ArgumentNullException(nameof(host));
if (queues == null) throw new ArgumentNullException(nameof(queues));
this.connection = connection;
serviceHost = host;
serverqueues = queues;
}
override internal void Process()
{
const PayloadType payloadType = PayloadType.Event;
foreach (Guid key in serverqueues.GetKeys())
{
InMemFrameQueue queue = serverqueues.GetQueue(key);
Task.Run(() => ProcessQueue(queue, payloadType));
}
}
private void ProcessQueue(InMemFrameQueue queue, PayloadType payloadType)
{
int queueSize = queue.Count(payloadType);
int batchIndex = 0;
if (queueSize == 0)
{
return;
}
while (batchIndex < PROCESSING_BATCH_SIZE && queueSize > 0)
{
var payload = queue.Dequeue(payloadType);
var headers = payload.headers;
var layerData = payload.layerData;
var message = payload.message;
DispatchEvent(headers, layerData, message);
queueSize = queue.Count(payloadType);
batchIndex++;
}
}
private void DispatchEvent(SimpleInMemHeaders headers, IBonded layerData, IMessage message)
{
var receiveContext = new SimpleInMemReceiveContext(connection);
Error layerError = LayerStackUtils.ProcessOnReceive(serviceHost.ParentTransport.LayerStack,
MessageType.Event, receiveContext, layerData);
if (layerError != null)
{
Log.Error("{0}.{1}: Receiving event {2}/{3} failed due to layer error (Code: {4}, Message: {5}).",
this, nameof(DispatchEvent), headers.conversation_id, headers.method_name,
layerError.error_code, layerError.message);
return;
}
Task.Run(async () =>
{
await serviceHost.DispatchEvent(
headers.method_name, receiveContext, message, connection.ConnectionMetrics);
});
}
}
}

Просмотреть файл

@ -1,202 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem.Processor
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
internal class InMemFrameQueue
{
private Queue<InMemFrame> requests;
private Queue<InMemFrame> responses;
private Queue<InMemFrame> events;
private object requestsLock = new object();
private object responsesLock = new object();
private object eventsLock = new object();
internal InMemFrameQueue()
{
requests = new Queue<InMemFrame>();
responses = new Queue<InMemFrame>();
events = new Queue<InMemFrame>();
}
internal void Clear()
{
lock (requestsLock)
{
Clear(requests);
}
lock (responsesLock)
{
Clear(responses);
}
lock (eventsLock)
{
Clear(events);
}
}
internal void Enqueue(InMemFrame frame)
{
Util.Validate(frame);
switch(frame.headers.payload_type)
{
case PayloadType.Request:
lock (requestsLock)
{
requests.Enqueue(frame);
}
break;
case PayloadType.Response:
lock (responsesLock)
{
responses.Enqueue(frame);
}
break;
case PayloadType.Event:
lock (events)
{
events.Enqueue(frame);
}
break;
default:
var message = LogUtil.FatalAndReturnFormatted("{0}.{1}: Payload type {2} not supported!",
nameof(InMemFrameQueue), nameof(Enqueue), frame.headers.payload_type);
throw new NotImplementedException(message);
}
}
internal InMemFrame Dequeue(PayloadType payloadType)
{
InMemFrame frame;
switch (payloadType)
{
case PayloadType.Request:
lock (requestsLock)
{
frame = requests.Dequeue();
}
break;
case PayloadType.Response:
lock (responsesLock)
{
frame = responses.Dequeue();
}
break;
case PayloadType.Event:
lock (eventsLock)
{
frame = events.Dequeue();
}
break;
default:
var message = LogUtil.FatalAndReturnFormatted("{0}.{1}: Payload type {2} not supported!",
nameof(InMemFrameQueue), nameof(Dequeue), payloadType);
throw new NotImplementedException(message);
}
return frame;
}
internal int Count(PayloadType payloadType)
{
int count;
switch(payloadType)
{
case PayloadType.Request:
count = requests.Count;
break;
case PayloadType.Response:
count = responses.Count;
break;
case PayloadType.Event:
count = events.Count;
break;
default:
var message = LogUtil.FatalAndReturnFormatted("{0}.{1}: Payload type {2} not supported!",
nameof(InMemFrameQueue), nameof(Count), payloadType);
throw new NotImplementedException(message);
}
return count;
}
private void Clear(Queue<InMemFrame> queue)
{
foreach (InMemFrame frame in queue)
{
frame.outstandingRequest.SetCanceled();
}
queue.Clear();
}
}
internal class InMemFrameQueueCollection
{
private ConcurrentDictionary<Guid, InMemFrameQueue> reqresqueue;
internal InMemFrameQueueCollection()
{
reqresqueue = new ConcurrentDictionary<Guid, InMemFrameQueue>();
}
internal void Add(Guid id, InMemFrameQueue queue)
{
if (!reqresqueue.TryAdd(id, queue))
{
var message = LogUtil.FatalAndReturnFormatted(
"{0}.{1}: Guid collison must never happen for client connection Ids: {2}",
nameof(InMemFrameQueueCollection), nameof(Add), id);
throw new InvalidOperationException(message);
}
}
internal ICollection<Guid> GetKeys()
{
return reqresqueue.Keys;
}
internal InMemFrameQueue GetQueue(Guid id)
{
InMemFrameQueue queue;
reqresqueue.TryGetValue(id, out queue);
return queue;
}
internal void ClearAll()
{
ICollection<Guid> keys = GetKeys();
foreach (Guid key in keys)
{
GetQueue(key).Clear();
}
reqresqueue.Clear();
}
}
internal class InMemFrame
{
internal SimpleInMemHeaders headers;
internal IBonded layerData;
internal IMessage message;
internal TaskCompletionSource<IMessage> outstandingRequest;
}
}

Просмотреть файл

@ -1,44 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem.Processor
{
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
internal abstract class QueueProcessor
{
protected const int PROCESSING_BATCH_SIZE = 1000;
private readonly Random randomDelay = new Random(DateTime.UtcNow.Millisecond);
private readonly object processInput = new object();
public void ProcessAsync(CancellationToken t)
{
NewLongRunningTask(o => Process(), t).Post(processInput);
}
internal abstract void Process();
private ITargetBlock<object> NewLongRunningTask(Action<object> process, CancellationToken t)
{
ActionBlock<object> actionBlock = null;
actionBlock = new ActionBlock<object>(async o => {
process(o);
// Delay between 1 to 10 milliseconds before posting itself.
// Need it for task context switching for long running, CPU intensive tasks.
await Task.Delay(TimeSpan.FromMilliseconds(randomDelay.Next(1, 11)), t);
actionBlock.Post(o);
},
new ExecutionDataflowBlockOptions
{
CancellationToken = t
});
return actionBlock;
}
}
}

Просмотреть файл

@ -1,107 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem.Processor
{
using System;
using System.Threading.Tasks;
using Bond.Comm.Layers;
using Bond.Comm.Service;
internal class RequestProcessor : QueueProcessor
{
readonly SimpleInMemConnection connection;
readonly ServiceHost serviceHost;
readonly InMemFrameQueueCollection serverqueues;
internal RequestProcessor(SimpleInMemConnection connection, ServiceHost host, InMemFrameQueueCollection queues)
{
if (connection == null) throw new ArgumentNullException(nameof(connection));
if (host == null) throw new ArgumentNullException(nameof(host));
if (queues == null) throw new ArgumentNullException(nameof(queues));
this.connection = connection;
serviceHost = host;
serverqueues = queues;
}
override internal void Process()
{
const PayloadType payloadType = PayloadType.Request;
foreach (Guid key in serverqueues.GetKeys())
{
InMemFrameQueue queue = serverqueues.GetQueue(key);
Task.Run(() => ProcessQueue(queue, payloadType));
}
}
private void ProcessQueue(InMemFrameQueue queue, PayloadType payloadType)
{
int queueSize = queue.Count(payloadType);
int batchIndex = 0;
if (queueSize == 0)
{
return;
}
while (batchIndex < PROCESSING_BATCH_SIZE && queueSize > 0)
{
var payload = queue.Dequeue(payloadType);
var headers = payload.headers;
var layerData = payload.layerData;
var message = payload.message;
var taskSource = payload.outstandingRequest;
Task.Run(() => DispatchRequest(headers, layerData, message, queue, taskSource));
queueSize = queue.Count(payloadType);
batchIndex++;
}
}
private async void DispatchRequest(SimpleInMemHeaders headers, IBonded layerData, IMessage message, InMemFrameQueue queue,
TaskCompletionSource<IMessage> taskSource)
{
var receiveContext = new SimpleInMemReceiveContext(connection);
Error layerError = LayerStackUtils.ProcessOnReceive(this.serviceHost.ParentTransport.LayerStack,
MessageType.Request, receiveContext, layerData);
IMessage response;
if (layerError == null)
{
response = await serviceHost.DispatchRequest(headers.method_name, receiveContext, message, connection.ConnectionMetrics);
}
else
{
Log.Error("{0}.{1}: Receiving request {2}/{3} failed due to layer error (Code: {4}, Message: {5}).",
this, nameof(DispatchRequest), headers.conversation_id, headers.method_name,
layerError.error_code, layerError.message);
response = Message.FromError(layerError);
}
SendReply(headers.conversation_id, response, queue, taskSource);
}
internal void SendReply(ulong conversationId, IMessage response, InMemFrameQueue queue,
TaskCompletionSource<IMessage> taskSource)
{
var sendContext = new SimpleInMemSendContext(connection);
IBonded layerData;
Error layerError = LayerStackUtils.ProcessOnSend(this.serviceHost.ParentTransport.LayerStack,
MessageType.Response, sendContext, out layerData);
// If there was a layer error, replace the response with the layer error
if (layerError != null)
{
Log.Error("{0}.{1}: Sending reply for request ID {2} failed due to layer error (Code: {3}, Message: {4}).",
this, nameof(SendReply), conversationId, layerError.error_code, layerError.message);
response = Message.FromError(layerError);
}
var payload = Util.NewPayLoad(conversationId, PayloadType.Response, layerData, response, taskSource);
queue.Enqueue(payload);
}
}
}

Просмотреть файл

@ -1,70 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem.Processor
{
using System;
using System.Threading.Tasks;
using Bond.Comm.Layers;
internal class ResponseProcessor : QueueProcessor
{
readonly SimpleInMemConnection connection;
readonly Transport parentTransport;
readonly InMemFrameQueue clientreqresqueue;
internal ResponseProcessor(SimpleInMemConnection connection, Transport parentTransport, InMemFrameQueue queue)
{
if (queue == null) throw new ArgumentNullException(nameof(queue));
if (connection == null) throw new ArgumentNullException(nameof(connection));
this.connection = connection;
this.parentTransport = parentTransport;
clientreqresqueue = queue;
}
override internal void Process()
{
const PayloadType payloadType = PayloadType.Response;
int queueSize = clientreqresqueue.Count(payloadType);
int batchIndex = 0;
if (queueSize == 0)
{
return;
}
while (batchIndex < PROCESSING_BATCH_SIZE && queueSize > 0)
{
var frame = clientreqresqueue.Dequeue(payloadType);
var headers = frame.headers;
var layerData = frame.layerData;
var message = frame.message;
var taskSource = frame.outstandingRequest;
DispatchResponse(headers, layerData, message, taskSource);
queueSize = clientreqresqueue.Count(payloadType);
batchIndex++;
}
}
private void DispatchResponse(SimpleInMemHeaders headers, IBonded layerData, IMessage message,
TaskCompletionSource<IMessage> responseCompletionSource)
{
var receiveContext = new SimpleInMemReceiveContext(connection);
Error layerError = LayerStackUtils.ProcessOnReceive(parentTransport.LayerStack,
MessageType.Response, receiveContext, layerData);
if (layerError != null)
{
Log.Error("{0}.{1}: Receiving response {2}/{3} failed due to layer error (Code: {4}, Message: {5}).",
this, nameof(DispatchResponse), headers.conversation_id, headers.method_name,
layerError.error_code, layerError.message);
message = Message.FromError(layerError);
}
responseCompletionSource.SetResult(message);
}
}
}

Просмотреть файл

@ -1,49 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem.Processor
{
using System.Threading.Tasks;
internal static class Util
{
internal static InMemFrame NewPayLoad(ulong conversationId, PayloadType payloadType, IBonded layerData,
IMessage message, TaskCompletionSource<IMessage> taskSource)
{
var headers = new SimpleInMemHeaders
{
conversation_id = conversationId,
payload_type = payloadType
};
var payload = new InMemFrame
{
headers = headers,
layerData = layerData,
message = message,
outstandingRequest = taskSource
};
Validate(payload);
return payload;
}
internal static InMemFrame Validate(InMemFrame frame)
{
if (frame == null)
{
throw new SimpleInMemProtocolErrorException($"null {nameof(frame)}");
}
else if (frame.headers == null)
{
throw new SimpleInMemProtocolErrorException($"null {nameof(frame.headers)} in frame");
}
else if (frame.message == null)
{
throw new SimpleInMemProtocolErrorException($"null {nameof(frame.message)} in frame");
}
return frame;
}
}
}

Просмотреть файл

@ -1,308 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem
{
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Bond.Comm.Layers;
using Bond.Comm.Service;
using Bond.Comm.SimpleInMem.Processor;
public enum ConnectionType
{
Client,
Server
}
[Flags]
public enum CnxState
{
Created = 0x01,
Connected = 0x02,
SendProtocolError = 0x04,
Disconnecting = 0x08,
Disconnected = 0x10,
}
public class SimpleInMemConnection : Connection, IRequestResponseConnection, IEventConnection
{
private readonly Guid connectionId;
private readonly ConnectionType connectionType;
private readonly ServiceHost serviceHost;
private readonly SimpleInMemListener parentListener;
private InMemFrameQueue communicationQueue;
private InMemFrameQueueCollection serverqueues;
private object connectionsLock = new object();
private long prevConversationId;
private CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
private CnxState state;
private object stateLock = new object();
private HashSet<SimpleInMemConnection> clientConnections;
public ConnectionMetrics ConnectionMetrics { get; } = new ConnectionMetrics();
public SimpleInMemConnection(SimpleInMemTransport parentTransport, SimpleInMemListener parentListener, ConnectionType connectionType) :
this (new ServiceHost(parentTransport), parentListener, connectionType)
{
}
internal SimpleInMemConnection(ServiceHost serviceHost, SimpleInMemListener parentListener, ConnectionType connectionType)
{
if (serviceHost == null) throw new ArgumentNullException(nameof(serviceHost));
if (parentListener == null) throw new ArgumentNullException(nameof(parentListener));
connectionId = Guid.NewGuid();
this.connectionType = connectionType;
this.serviceHost = serviceHost;
this.parentListener = parentListener;
switch(connectionType)
{
case ConnectionType.Client:
communicationQueue = new InMemFrameQueue();
break;
case ConnectionType.Server:
serverqueues = new InMemFrameQueueCollection();
clientConnections = new HashSet<SimpleInMemConnection>();
break;
default:
throw new NotSupportedException(nameof(connectionType));
}
// start at -1 or 0 so the first conversation ID is 1 or 2.
prevConversationId = (connectionType == ConnectionType.Client) ? -1 : 0;
state = CnxState.Created;
ConnectionMetrics.connection_id = connectionId.ToString();
}
public CnxState State
{
get
{
return state;
}
}
public ConnectionType ConnectionType
{
get
{
return connectionType;
}
}
public Guid Id
{
get
{
return connectionId;
}
}
public override string ToString()
{
return $"{nameof(SimpleInMemConnection)}({connectionId})";
}
public override Task StopAsync()
{
bool connected = false;
lock (stateLock)
{
if (connected = ((state & CnxState.Connected) != 0))
{
state = CnxState.Disconnecting;
cancelTokenSource.Cancel();
}
}
if (connected)
{
OnDisconnect();
lock (stateLock)
{
state = CnxState.Disconnected;
}
}
return TaskExt.CompletedTask;
}
public async Task<IMessage<TResponse>> RequestResponseAsync<TRequest, TResponse>(string methodName, IMessage<TRequest> message, CancellationToken ct)
{
EnsureCorrectState(CnxState.Connected);
IMessage response = await SendRequestAsync(methodName, message);
return response.Convert<TResponse>();
}
public Task FireEventAsync<TPayload>(string methodName, IMessage<TPayload> message)
{
EnsureCorrectState(CnxState.Connected);
SendEventAsync(methodName, message);
return TaskExt.CompletedTask;
}
internal InMemFrameQueue CommunicationQueue
{
get
{
return communicationQueue;
}
}
internal void Start()
{
lock (stateLock)
{
EnsureCorrectState(CnxState.Created | CnxState.Disconnected);
if (connectionType == ConnectionType.Client)
{
var responseProcessor = new ResponseProcessor(this, serviceHost.ParentTransport, communicationQueue);
Task.Run(() => responseProcessor.ProcessAsync(cancelTokenSource.Token));
}
else if (connectionType == ConnectionType.Server)
{
var requestProcessor = new RequestProcessor(this, serviceHost, serverqueues);
var eventProcessor = new EventProcessor(this, serviceHost, serverqueues);
Task.Run(() => requestProcessor.ProcessAsync(cancelTokenSource.Token));
Task.Run(() => eventProcessor.ProcessAsync(cancelTokenSource.Token));
}
else
{
var message = LogUtil.FatalAndReturnFormatted("{0}.{1}: Connection type {2} not implemented.",
this, nameof(Start), connectionType);
throw new NotImplementedException(message);
}
state = CnxState.Connected;
}
}
internal void AddClientConnection(SimpleInMemConnection connection)
{
if (connectionType == ConnectionType.Client)
{
var message = LogUtil.FatalAndReturnFormatted(
"{0}.{1}: Client connection does not support adding new request response queue.",
this, nameof(AddClientConnection));
throw new NotSupportedException(message);
}
serverqueues.Add(connection.Id, connection.CommunicationQueue);
lock (connectionsLock)
{
clientConnections.Add(connection);
}
}
private Task<IMessage> SendRequestAsync(string methodName, IMessage request)
{
var conversationId = AllocateNextConversationId();
var sendContext = new SimpleInMemSendContext(this);
IBonded layerData;
Error layerError = LayerStackUtils.ProcessOnSend(this.serviceHost.ParentTransport.LayerStack,
MessageType.Request, sendContext, out layerData);
if (layerError != null)
{
Log.Error("{0}.{1}: Sending request {2}/{3} failed due to layer error (Code: {4}, Message: {5}).",
this, nameof(SendRequestAsync), conversationId, methodName, layerError.error_code, layerError.message);
return Task.FromResult<IMessage>(Message.FromError(layerError));
}
var payload = Util.NewPayLoad(conversationId, PayloadType.Request, layerData, request, new TaskCompletionSource<IMessage>());
payload.headers.method_name = methodName;
communicationQueue.Enqueue(payload);
return payload.outstandingRequest.Task;
}
private void SendEventAsync(string methodName, IMessage message)
{
var conversationId = AllocateNextConversationId();
var sendContext = new SimpleInMemSendContext(this);
IBonded layerData;
Error layerError = LayerStackUtils.ProcessOnSend(this.serviceHost.ParentTransport.LayerStack,
MessageType.Event, sendContext, out layerData);
if (layerError != null)
{
Log.Error("{0}.{1}: Sending event {2}/{3} failed due to layer error (Code: {4}, Message: {5}).",
this, nameof(SendEventAsync), conversationId, methodName, layerError.error_code, layerError.message);
return;
}
var payload = Util.NewPayLoad(conversationId, PayloadType.Event, layerData, message, null);
payload.headers.method_name = methodName;
communicationQueue.Enqueue(payload);
}
private ulong AllocateNextConversationId()
{
// Interlocked.Add() handles overflow by wrapping, not throwing.
var newConversationId = Interlocked.Add(ref prevConversationId, 2);
if (newConversationId < 0)
{
throw new SimpleInMemProtocolErrorException("Exhausted conversation IDs");
}
return unchecked((ulong)newConversationId);
}
private void OnDisconnect()
{
Log.Debug("{0}.{1}: Shutting down.", this, nameof(OnDisconnect));
var args = new DisconnectedEventArgs(this, null);
parentListener.InvokeOnDisconnected(args);
if(connectionType == ConnectionType.Client)
{
DisconnectClient();
}
else
{
DisconnectServer();
}
}
private void DisconnectClient()
{
communicationQueue.Clear();
}
private void DisconnectServer()
{
serverqueues.ClearAll();
lock (connectionsLock)
{
foreach (SimpleInMemConnection connection in clientConnections)
{
connection.StopAsync();
}
clientConnections.Clear();
}
}
private void EnsureCorrectState(CnxState allowedStates, [CallerMemberName] string methodName = "<unknown>")
{
if ((state & allowedStates) == 0)
{
var message = $"Connection (${this}) is not in the correct state for the requested operation (${methodName}). Current state: ${state} Allowed states: ${allowedStates}";
throw new InvalidOperationException(message);
}
}
}
}

Просмотреть файл

@ -1,25 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem
{
public class SimpleInMemSendContext : SendContext
{
public SimpleInMemSendContext(SimpleInMemConnection connection)
{
Connection = connection;
}
public override Connection Connection { get; }
}
public class SimpleInMemReceiveContext : ReceiveContext
{
public SimpleInMemReceiveContext(SimpleInMemConnection connection)
{
Connection = connection;
}
public override Connection Connection { get; }
}
}

Просмотреть файл

@ -1,80 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem
{
using Bond.Comm.Service;
using System.Collections.Generic;
using System.Threading.Tasks;
public class SimpleInMemListener : Listener
{
private ServiceHost serviceHost;
private string address;
private SimpleInMemConnection connection;
private readonly string logname;
public SimpleInMemListener(SimpleInMemTransport parentTransport, string address)
{
this.address = address;
serviceHost = new ServiceHost(parentTransport);
connection = new SimpleInMemConnection(serviceHost, this, ConnectionType.Server);
logname = $"{nameof(SimpleInMemListener)}({address})";
}
public override bool IsRegistered(string serviceMethodName)
{
return serviceHost.IsRegistered(serviceMethodName);
}
public override void AddService<T>(T service)
{
Log.Information("{0}.{1}: Adding {2}.", logname, nameof(AddService), typeof(T).Name);
serviceHost.Register(service);
}
public override void RemoveService<T>(T service)
{
serviceHost.Deregister((IService)service);
}
public override Task StartAsync()
{
connection.Start();
return TaskExt.CompletedTask;
}
public override Task StopAsync()
{
return connection.StopAsync();
}
internal void AddClient(SimpleInMemConnection client)
{
var connectedEventArgs = new ConnectedEventArgs(client);
Error disconnectError = OnConnected(connectedEventArgs);
if (disconnectError != null)
{
Log.Information("{0}.{1}: Rejecting connection {2} because {3}:{4}.",
logname, nameof(AddClient), client.Id, disconnectError.error_code, disconnectError.message);
throw new SimpleInMemProtocolErrorException(
"Connection rejected",
details: disconnectError,
innerException: null);
}
connection.AddClientConnection(client);
}
internal Error InvokeOnConnected(ConnectedEventArgs args)
{
return OnConnected(args);
}
internal void InvokeOnDisconnected(DisconnectedEventArgs args)
{
OnDisconnected(args);
}
}
}

Просмотреть файл

@ -1,32 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem
{
using System;
public class SimpleInMemProtocolErrorException : TransportException
{
public SimpleInMemProtocolErrorException(string message) : base(message)
{
}
public SimpleInMemProtocolErrorException(string message, Exception innerException) : this(
message,
details: null,
innerException: innerException)
{
}
public SimpleInMemProtocolErrorException(
string message,
Error details,
Exception innerException)
: base(message, innerException)
{
Details = details;
}
public Error Details { get; }
}
}

Просмотреть файл

@ -1,18 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace bond.comm.simpleinmem;
enum PayloadType
{
Request = 1;
Response = 2;
Event = 3;
}
struct SimpleInMemHeaders
{
0: uint64 conversation_id;
1: required PayloadType payload_type;
2: string method_name;
}

Просмотреть файл

@ -1,122 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Bond.Comm.SimpleInMem
{
using Service;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class SimpleInMemTransportBuilder : TransportBuilder<SimpleInMemTransport>
{
public override SimpleInMemTransport Construct()
{
return new SimpleInMemTransport(LayerStack);
}
}
public class SimpleInMemTransport : Transport
{
object listenersLock = new object();
IDictionary<string, SimpleInMemListener> listeners = new Dictionary<string, SimpleInMemListener>();
readonly ILayerStack layerStack;
public SimpleInMemTransport(ILayerStack layerStack)
{
this.layerStack = layerStack;
}
public override ILayerStack LayerStack
{
get
{
return this.layerStack;
}
}
public async override Task<Connection> ConnectToAsync(string address, CancellationToken ct)
{
Log.Information("{0}.{1}: Connecting to {2}.",
nameof(SimpleInMemTransport), nameof(ConnectToAsync), address);
SimpleInMemListener listener;
lock (listenersLock)
{
if (!listeners.TryGetValue(address, out listener))
{
var errorFormat = "{0}.{1}: Listener not found for address: {2}";
var message = LogUtil.FatalAndReturnFormatted(errorFormat,
nameof(SimpleInMemTransport), nameof(ConnectToAsync), address);
throw new ArgumentException(message);
}
}
return await Task.Run<Connection>(() =>
{
var connection = new SimpleInMemConnection(this, (SimpleInMemListener)GetListener(address), ConnectionType.Client);
listener.AddClient(connection);
connection.Start();
return connection;
}, ct);
}
public override Listener MakeListener(string address)
{
SimpleInMemListener listener;
if (!listeners.TryGetValue(address, out listener))
{
lock (listenersLock)
{
if (!listeners.TryGetValue(address, out listener))
{
listener = new SimpleInMemListener(this, address);
listeners.Add(address, listener);
}
}
}
return listener;
}
public Listener GetListener(string address)
{
SimpleInMemListener listener;
listeners.TryGetValue(address, out listener);
return listener;
}
public override Task StopAsync()
{
lock (listenersLock)
{
foreach (SimpleInMemListener listener in listeners.Values)
{
listener.StopAsync();
}
listeners.Clear();
}
return TaskExt.CompletedTask;
}
public bool ListenerExists(string address)
{
return listeners.ContainsKey(address);
}
public SimpleInMemListener RemoveListener(string address)
{
SimpleInMemListener listener;
if (listeners.TryGetValue(address, out listener))
{
listeners.Remove(address);
}
return listener;
}
}
}

Просмотреть файл

@ -1,4 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Microsoft.Tpl.Dataflow" version="4.5.24" targetFramework="net45" />
</packages>

Просмотреть файл

@ -1,20 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Reflection;
using System.Resources;
using System.Runtime.CompilerServices;
[assembly: AssemblyTitle("Bond.Comm.SimpleInMem")]
[assembly: AssemblyCompany("Microsoft")]
[assembly: AssemblyProduct("Bond")]
[assembly: AssemblyCopyright("Copyright (C) Microsoft. All rights reserved.")]
[assembly: NeutralResourcesLanguage("en")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
#if (DELAY_SIGN)
[assembly: InternalsVisibleTo("Bond, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b5fc90e7027f67871e773a8fde8938c81dd402ba65b9201d60593e96c492651e889cc13f1415ebb53fac1131ae0bd333c5ee6021672d9718ea31a8aebd0da0072f25d87dba6fc90ffd598ed4da35e44c398c454307e8e33b8426143daec9f596836f97c8f74750e5975c64e2189f45def46b2a2b1247adc3652bf5c308055da9")]
#else
[assembly: InternalsVisibleTo("Bond, PublicKey=00240000048000009400000006020000002400005253413100040000010001000d504ac18b4b149d2f7b0059b482f9b6d44d39059e6a96ff0a2a52678b5cfd8567cc67254132cd2debb5b95f6a1206a15c6f8ddac137c6c3ef4995f28c359acaa683a90995c8f08df7ce0aaa8836d331a344a514c443f112f80bf2ebed40ccb32d7df63c09b0d7bef80aecdc23ec200a458d4f8bafbcdeb9bf5ba111fbbd4787")]
#endif

Просмотреть файл

@ -1,63 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath32)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath32)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<Import Project="$(MSBuildThisFileDirectory)\..\..\..\build\Common.Internal.props" />
<PropertyGroup>
<ProjectGuid>{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Bond.Comm.SimpleInMem</RootNamespace>
<AssemblyName>Bond.Comm.SimpleInMem</AssemblyName>
<BondRedistributable>true</BondRedistributable>
<DontBuildNet40>true</DontBuildNet40>
</PropertyGroup>
<ItemGroup>
<Compile Include="Processor\EventProcessor.cs" />
<Compile Include="Processor\QueueProcessor.cs" />
<Compile Include="Processor\RequestProcessor.cs" />
<Compile Include="Processor\ResponseProcessor.cs" />
<Compile Include="Processor\Util.cs" />
<Compile Include="Processor\InMemFrameQueue.cs" />
<Compile Include="properties\AssemblyInfo.cs" />
<Compile Include="SimpleInMemContexts.cs" />
<Compile Include="SimpleInMemConnection.cs" />
<Compile Include="SimpleInMemListener.cs" />
<Compile Include="SimpleInMemProtocolErrorException.cs" />
<Compile Include="SimpleInMemTransport.cs" />
<BondCodegen Include="SimpleInMemTransport.bond">
<Options>--namespace=bond.comm.simpleinmem=Bond.Comm.SimpleInMem</Options>
</BondCodegen>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\attributes\Attributes.csproj">
<Project>{92915bd9-8ab1-4e4d-a2ac-95bbf4f82d89}</Project>
<Name>Attributes</Name>
</ProjectReference>
<ProjectReference Include="..\..\core\Bond.csproj">
<Project>{43cbba9b-c4bc-4e64-8733-7b72562d2e91}</Project>
<Name>Bond</Name>
</ProjectReference>
<ProjectReference Include="..\interfaces\interfaces.csproj">
<Project>{45efb397-298a-4a32-a178-a2bdf8abbbd9}</Project>
<Name>interfaces</Name>
</ProjectReference>
<ProjectReference Include="..\layers\layers.csproj">
<Project>{5f6cbc77-8fb5-4644-bab5-f8e62792266e}</Project>
<Name>layers</Name>
</ProjectReference>
<ProjectReference Include="..\service\service.csproj">
<Project>{79d2423a-87c8-44a2-89c2-2fa94521747e}</Project>
<Name>service</Name>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<Reference Include="System.Threading.Tasks.Dataflow, Version=4.5.24.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<HintPath>..\..\..\packages\Microsoft.Tpl.Dataflow.4.5.24\lib\portable-net45+win8+wpa81\System.Threading.Tasks.Dataflow.dll</HintPath>
<Private>True</Private>
</Reference>
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildThisFileDirectory)\..\..\..\build\Common.Internal.targets" />
</Project>

Просмотреть файл

@ -1,17 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
import "SimpleStruct.bond"
namespace unittest.simpleinmem
//
// Method declarations for Calculator Service
//
service Calculator
{
Output Add(PairedInput);
Output Subtract(PairedInput);
Output Multiply(PairedInput);
nothing Clear();
};

Просмотреть файл

@ -1,45 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace UnitTest.SimpleInMem
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Bond.Comm;
internal class CalculatorService : CalculatorServiceBase
{
private const UInt16 DelayMilliseconds = 30;
public static ManualResetEvent ClearCalledEvent { get; set; } = new ManualResetEvent(false);
public override async Task<IMessage<Output>> AddAsync(IMessage<PairedInput> request, CancellationToken ct)
{
PairedInput req = request.Convert<PairedInput>().Payload.Deserialize();
var res = new Output { Result = req.First + req.Second };
await Task.Delay(DelayMilliseconds);
return Message.FromPayload(res);
}
public override async Task<IMessage<Output>> SubtractAsync(IMessage<PairedInput> request, CancellationToken ct)
{
PairedInput req = request.Convert<PairedInput>().Payload.Deserialize();
var res = new Output { Result = req.First - req.Second };
await Task.Delay(DelayMilliseconds);
return Message.FromPayload(res);
}
public override Task<IMessage<Output>> MultiplyAsync(IMessage<PairedInput> request, CancellationToken ct)
{
throw new NotImplementedException();
}
public override void ClearAsync(IMessage<Bond.Void> param)
{
ClearCalledEvent.Set();
}
}
}

Просмотреть файл

@ -1,377 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace UnitTest.SimpleInMem
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Bond.Comm;
using Bond.Comm.Layers;
using Bond.Comm.SimpleInMem;
using NUnit.Framework;
using UnitTest.Comm;
using UnitTest.Layers;
[TestFixture]
public class SimpleInMemConnectionTest
{
private const string address = "SimpleInMemTakesAnyRandomConnectionString";
private TransportBuilder<SimpleInMemTransport> transportBuilder;
private SimpleInMemTransport transport;
private SimpleInMemListener listener;
private SimpleInMemConnection[] connections;
[SetUp]
public void Init()
{
transportBuilder = new SimpleInMemTransportBuilder();
}
public async Task DefaultSetup(IService service, int count)
{
transport = transportBuilder.Construct();
listener = (SimpleInMemListener)transport.MakeListener(address);
listener.AddService(service);
await listener.StartAsync();
connections = new SimpleInMemConnection[count];
for (int connectionIndex = 0; connectionIndex < count; connectionIndex++)
{
connections[connectionIndex] = (SimpleInMemConnection)await transport.ConnectToAsync(address, System.Threading.CancellationToken.None);
}
}
[Test]
public async void ConnectionStateCycle()
{
await DefaultSetup(new CalculatorService(), 1);
SimpleInMemConnection localConnection =
(SimpleInMemConnection)await transport.ConnectToAsync(address, System.Threading.CancellationToken.None);
Assert.AreEqual(localConnection.State, CnxState.Connected);
await localConnection.StopAsync();
Assert.AreEqual(localConnection.State, CnxState.Disconnected);
}
[Test]
public async void ConnectionStateCycle_CloseAlreadyClosedConnection()
{
await DefaultSetup(new CalculatorService(), 1);
SimpleInMemConnection localConnection =
(SimpleInMemConnection)await transport.ConnectToAsync(address, System.Threading.CancellationToken.None);
Assert.AreEqual(localConnection.State, CnxState.Connected);
// Ensure that closing an already closed connection is no-op
for (int index = 0; index < 5; index++)
{
await localConnection.StopAsync();
Assert.AreEqual(localConnection.State, CnxState.Disconnected);
}
}
[Test]
public async void ValidSetup()
{
await DefaultSetup(new CalculatorService(), 1);
Assert.AreEqual(connections[0].ConnectionType, ConnectionType.Client);
}
[Test]
public async Task MethodCall()
{
await DefaultSetup(new CalculatorService(), 1);
const int first = 91;
const int second = 23;
int addResult = first + second;
int subResult = first - second;
var calculatorProxy = new CalculatorProxy<SimpleInMemConnection>(connections[0]);
var input = new PairedInput
{
First = first,
Second = second
};
var request = new Message<PairedInput>(input);
IMessage<Output> addResponse = await calculatorProxy.AddAsync(request, System.Threading.CancellationToken.None);
IMessage<Output> subResponse = await calculatorProxy.SubtractAsync(request, System.Threading.CancellationToken.None);
Output addOutput = addResponse.Payload.Deserialize();
Output subOutput = subResponse.Payload.Deserialize();
Assert.AreEqual(addResult, addOutput.Result);
Assert.AreEqual(subResult, subOutput.Result);
}
[Test]
public async void EventCall()
{
await DefaultSetup(new CalculatorService(), 1);
var calculatorProxy = new CalculatorProxy<SimpleInMemConnection>(connections[0]);
calculatorProxy.ClearAsync();
bool wasSignaled = CalculatorService.ClearCalledEvent.WaitOne(30000);
Assert.IsTrue(wasSignaled, "Timed out waiting for event");
}
[Test]
public async Task MultipleClientConnectionsMethodCalls()
{
Stopwatch sw = Stopwatch.StartNew();
await DefaultSetup(new CalculatorService(), 10);
const int first = 91;
const int second = 23;
int expectedAddResult = first + second;
int expectedSubResult = first - second;
Task[] connectionTasks = new Task[connections.Length];
for (int connectionIndex = 0; connectionIndex < connections.Length; connectionIndex++)
{
SimpleInMemConnection conn = connections[connectionIndex];
connectionTasks[connectionIndex] = Task.Run(() =>
{
int taskCount = 25;
Task[] tasks = new Task[taskCount];
for (int taskIndex = 0; taskIndex < taskCount; taskIndex++)
{
tasks[taskIndex] = Task.Run(async () =>
{
var calculatorProxy = new CalculatorProxy<SimpleInMemConnection>(conn);
var input = new PairedInput
{
First = first,
Second = second
};
var request = new Message<PairedInput>(input);
IMessage<Output> addResponse = await calculatorProxy.AddAsync(request, System.Threading.CancellationToken.None);
IMessage<Output> subResponse = await calculatorProxy.SubtractAsync(request, System.Threading.CancellationToken.None);
Output addOutput = addResponse.Payload.Deserialize();
Output subOutput = subResponse.Payload.Deserialize();
Assert.AreEqual(expectedAddResult, addOutput.Result);
Assert.AreEqual(expectedSubResult, subOutput.Result);
});
}
Task.WaitAll(tasks);
});
}
Task.WaitAll(connectionTasks);
sw.Stop();
Console.WriteLine($"{nameof(MultipleClientConnectionsMethodCalls)} test time: {sw.Elapsed.TotalSeconds}");
}
[Test]
public async Task MethodCall_WithServiceError()
{
await DefaultSetup(new CalculatorService(), 1);
const int first = 91;
const int second = 23;
var calculatorProxy = new CalculatorProxy<SimpleInMemConnection>(connections[0]);
var input = new PairedInput
{
First = first,
Second = second
};
var request = new Message<PairedInput>(input);
IMessage<Output> multiplyResponse = await calculatorProxy.MultiplyAsync(request, System.Threading.CancellationToken.None);
Assert.IsTrue(multiplyResponse.IsError);
InternalServerError error = multiplyResponse.Error.Deserialize<InternalServerError>();
Assert.AreEqual((int)ErrorCode.InternalServerError, error.error_code);
Assert.That(error.message, Is.StringContaining(Transport.InternalErrorMessage));
}
[Test]
public async Task MethodCall_WithMethodNotFound()
{
await DefaultSetup(new CalculatorService(), 1);
const int first = 91;
const int second = 23;
const string methodName = "Divide";
var input = new PairedInput
{
First = first,
Second = second
};
var request = new Message<PairedInput>(input);
IMessage<Output> divideResponse = await connections[0].RequestResponseAsync<PairedInput, Output>(methodName, request, new System.Threading.CancellationToken());
Assert.IsTrue(divideResponse.IsError);
Error error = divideResponse.Error.Deserialize<Error>();
Assert.AreEqual((int)ErrorCode.MethodNotFound, error.error_code);
Assert.That(error.message, Is.StringContaining($"Got request for unknown method [{methodName}]."));
}
[Test]
public async Task MethodCall_WithLayerStack()
{
var testList = new List<string>();
var layer1 = new TestLayer_Append("foo", testList);
var layer2 = new TestLayer_Append("bar", testList);
transportBuilder.SetLayerStack(new LayerStack<Dummy>(layer1, layer2));
await DefaultSetup(new CalculatorService(), 1);
const int first = 91;
const int second = 23;
int addResult = first + second;
var calculatorProxy = new CalculatorProxy<SimpleInMemConnection>(connections[0]);
var input = new PairedInput
{
First = first,
Second = second
};
var request = new Message<PairedInput>(input);
IMessage<Output> addResponse = await calculatorProxy.AddAsync(request, System.Threading.CancellationToken.None);
Output addOutput = addResponse.Payload.Deserialize();
Assert.AreEqual(addResult, addOutput.Result);
Assert.AreEqual(8, testList.Count);
Assert.AreEqual(layer1.value, testList[0]);
Assert.AreEqual(testList[0] + layer2.value, testList[1]);
Assert.AreEqual(testList[1] + layer2.value, testList[2]);
Assert.AreEqual(testList[2] + layer1.value, testList[3]);
Assert.AreEqual(layer1.value, testList[4]);
Assert.AreEqual(testList[4] + layer2.value, testList[5]);
Assert.AreEqual(testList[5] + layer2.value, testList[6]);
Assert.AreEqual(testList[6] + layer1.value, testList[7]);
}
[Test]
public async Task MethodCall_ReqRsp_WithLayerStackErrors()
{
var errorLayer = new TestLayer_ReturnErrors();
transportBuilder.SetLayerStack(new LayerStack<Dummy>(errorLayer));
var testService = new DummyTestService();
await DefaultSetup(testService, 1);
var proxy = new DummyTestProxy<SimpleInMemConnection>(connections[0]);
var request = new Dummy { int_value = 100 };
errorLayer.SetState(MessageType.Request, errorOnSend: false, errorOnReceive: true);
IMessage<Dummy> response = await proxy.ReqRspMethodAsync(request);
Assert.IsTrue(response.IsError);
Assert.AreEqual(TestLayer_ReturnErrors.ReceiveError, response.Error.Deserialize().error_code, "Bad error 1");
Assert.AreEqual(0, testService.RequestCount);
Assert.AreEqual(Dummy.Empty.int_value, testService.LastRequestReceived.int_value);
errorLayer.SetState(MessageType.Request, errorOnSend: true, errorOnReceive: false);
request.int_value = 101;
response = await proxy.ReqRspMethodAsync(request);
Assert.IsTrue(response.IsError);
Assert.AreEqual(TestLayer_ReturnErrors.SendError, response.Error.Deserialize().error_code);
Assert.AreEqual(0, testService.RequestCount);
Assert.AreEqual(Dummy.Empty.int_value, testService.LastRequestReceived.int_value);
errorLayer.SetState(MessageType.Response, errorOnSend: true, errorOnReceive: false);
request.int_value = 102;
response = await proxy.ReqRspMethodAsync(request);
Assert.IsTrue(response.IsError);
Assert.AreEqual(TestLayer_ReturnErrors.SendError, response.Error.Deserialize().error_code);
Assert.AreEqual(1, testService.RequestCount);
Assert.AreEqual(request.int_value, testService.LastRequestReceived.int_value);
errorLayer.SetState(MessageType.Response, errorOnSend: false, errorOnReceive: true);
request.int_value = 103;
response = await proxy.ReqRspMethodAsync(request);
Assert.IsTrue(response.IsError);
Assert.AreEqual(TestLayer_ReturnErrors.ReceiveError, response.Error.Deserialize().error_code);
Assert.AreEqual(2, testService.RequestCount);
Assert.AreEqual(request.int_value, testService.LastRequestReceived.int_value);
errorLayer.SetState(MessageType.Event, errorOnSend: true, errorOnReceive: true);
request.int_value = 104;
response = await proxy.ReqRspMethodAsync(request);
Assert.IsFalse(response.IsError);
Assert.AreEqual(105, response.Payload.Deserialize().int_value);
Assert.AreEqual(3, testService.RequestCount);
Assert.AreEqual(request.int_value, testService.LastRequestReceived.int_value);
}
[Test]
public async Task MethodCall_Event_WithLayerStackErrors()
{
var errorLayer = new TestLayer_ReturnErrors();
transportBuilder.SetLayerStack(new LayerStack<Dummy>(errorLayer));
var testService = new DummyTestService();
await DefaultSetup(testService, 1);
var proxy = new DummyTestProxy<SimpleInMemConnection>(connections[0]);
var theEvent = new Dummy { int_value = 100 };
errorLayer.SetState(MessageType.Event, errorOnSend: false, errorOnReceive: true);
ManualResetEventSlim waitForEvent = testService.CreateResetEvent();
proxy.EventMethodAsync(theEvent);
bool wasSignaled = waitForEvent.Wait(TimeSpan.FromSeconds(1));
Assert.IsFalse(wasSignaled, "Event should not fire 1");
testService.ClearResetEvent();
Assert.AreEqual(0, testService.EventCount);
Assert.AreEqual(Dummy.Empty.int_value, testService.LastEventReceived.int_value);
errorLayer.SetState(MessageType.Event, errorOnSend: true, errorOnReceive: false);
theEvent.int_value = 101;
waitForEvent = testService.CreateResetEvent();
proxy.EventMethodAsync(theEvent);
wasSignaled = waitForEvent.Wait(TimeSpan.FromSeconds(1));
Assert.IsFalse(wasSignaled, "Event should not fire 2");
testService.ClearResetEvent();
Assert.AreEqual(0, testService.EventCount);
Assert.AreEqual(Dummy.Empty.int_value, testService.LastEventReceived.int_value);
errorLayer.SetState(MessageType.Event, errorOnSend: false, errorOnReceive: false);
theEvent.int_value = 102;
waitForEvent = testService.CreateResetEvent();
proxy.EventMethodAsync(theEvent);
wasSignaled = waitForEvent.Wait(TimeSpan.FromSeconds(1));
Assert.IsTrue(wasSignaled, "Timed out waiting for event to fire");
Assert.AreEqual(1, testService.EventCount);
Assert.AreEqual(theEvent.int_value, testService.LastEventReceived.int_value);
}
[TearDown]
public async void Cleanup()
{
if (connections != null)
{
for (int connectionIndex = 0; connectionIndex < connections.Length; connectionIndex++)
{
await connections[connectionIndex].StopAsync();
}
}
connections = null;
transport.RemoveListener(address);
}
}
}

Просмотреть файл

@ -1,115 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace UnitTest.SimpleInMem
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Bond.Comm;
using Bond.Comm.SimpleInMem;
using NUnit.Framework;
[TestFixture]
public class SimpleInMemListenerTest
{
private readonly string address = "SimpleInMemTakesAnyRandomConnectionString";
private SimpleInMemTransport transport;
private CalculatorService service;
[SetUp]
public void Init()
{
transport = new SimpleInMemTransportBuilder().Construct();
service = new CalculatorService();
}
[TearDown]
public void Cleanup()
{
transport.RemoveListener(address);
}
[Test]
public void CreateInMemTransportListener()
{
var listener = transport.MakeListener(address);
Assert.IsNotNull(listener);
}
[Test]
public async Task StartStopInMemTransportListener()
{
var listener = transport.MakeListener(address);
Assert.IsNotNull(listener);
await listener.StartAsync();
await listener.StopAsync();
}
[Test]
public void AddRemoveService()
{
SimpleInMemListener listener = (SimpleInMemListener)transport.MakeListener(address);
listener.AddService<CalculatorService>(service);
foreach (var serviceMethod in service.Methods)
{
Assert.True(listener.IsRegistered($"{serviceMethod.MethodName}"));
}
Assert.False(listener.IsRegistered("Divide"));
listener.RemoveService<CalculatorService>(service);
foreach (var serviceMethod in service.Methods)
{
Assert.False(listener.IsRegistered($"{serviceMethod.MethodName}"));
}
Assert.False(listener.IsRegistered("Divide"));
}
[Test]
public async Task ConnectedEvent_HasRightRemoteEndpointDetails()
{
SimpleInMemConnection listenerConnection = null;
var connectedEventDone = new ManualResetEventSlim(initialState: false);
SimpleInMemListener listener = (SimpleInMemListener)transport.MakeListener(address);
listener.Connected += (sender, args) =>
{
Assert.AreSame(listener, sender);
listenerConnection = (SimpleInMemConnection)args.Connection;
connectedEventDone.Set();
};
await listener.StartAsync();
var connection = (SimpleInMemConnection)await transport.ConnectToAsync(address);
bool wasSignaled = connectedEventDone.Wait(TimeSpan.FromSeconds(30));
Assert.IsTrue(wasSignaled, "Timed out waiting for Connected event to complete");
Assert.AreEqual(connection.Id, listenerConnection.Id);
}
[Test]
public async Task ConnectedEvent_SetsDisconnectError_ConnectToThrows()
{
var disconnectError = new Error { error_code = 100, message = "Go away!" };
SimpleInMemListener listener = (SimpleInMemListener)transport.MakeListener(address);
listener.Connected += (sender, args) =>
{
args.DisconnectError = disconnectError;
};
await listener.StartAsync();
try
{
await transport.ConnectToAsync(address);
Assert.Fail("Expected an exception to be thrown, but one wasn't.");
}
catch (SimpleInMemProtocolErrorException ex)
{
Assert.AreSame(disconnectError, ex.Details);
}
}
}
}

Просмотреть файл

@ -1,81 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace UnitTest.SimpleInMem
{
using System;
using System.Threading.Tasks;
using Bond.Comm;
using Bond.Comm.SimpleInMem;
using NUnit.Framework;
using UnitTest.Comm;
[TestFixture]
public class SimpleInMemTransportTest
{
private readonly string address = "SimpleInMemTakesAnyRandomConnectionString";
private SimpleInMemTransport transport;
[SetUp]
public void Init()
{
transport = new SimpleInMemTransportBuilder().Construct();
}
[TearDown]
public void Cleanup()
{
transport.StopAsync();
transport = null;
}
[Test]
public void Builder_Construct_NoArgs_Succeeds()
{
var builder = new SimpleInMemTransportBuilder();
Assert.NotNull(builder.Construct());
}
[Test]
public void StopAsync()
{
Listener newListener = transport.MakeListener(address);
Assert.True(newListener == transport.GetListener(address));
transport.StopAsync();
Assert.False(transport.ListenerExists(address));
transport.MakeListener(address);
Assert.True(transport.ListenerExists(address));
}
[Test]
public void ConnectToAsync_NoListenerRunning()
{
Assert.Throws<ArgumentException>(async () => await transport.ConnectToAsync(address, new System.Threading.CancellationToken()));
}
[Test]
public void MakeListener()
{
bool listenerExists = transport.ListenerExists(address);
Assert.False(listenerExists);
var listener = transport.MakeListener(address);
listenerExists = transport.ListenerExists(address);
Assert.True(listenerExists);
transport.RemoveListener(address);
Assert.Null(transport.GetListener(address));
}
[Test]
public async Task ConnectToAsync()
{
Listener l = transport.MakeListener(address);
Connection conn = await transport.ConnectToAsync(address, new System.Threading.CancellationToken());
Assert.NotNull(conn);
Assert.True(conn is SimpleInMemConnection);
SimpleInMemConnection simpleConn = (SimpleInMemConnection)conn;
Assert.True(simpleConn.ConnectionType == ConnectionType.Client);
transport.RemoveListener(address);
Assert.Null(transport.GetListener(address));
}
}
}

Просмотреть файл

@ -1,15 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace unittest.simpleinmem
struct PairedInput
{
0: int32 First;
1: int32 Second;
}
struct Output
{
0: int32 Result;
}

Просмотреть файл

@ -34,16 +34,6 @@
</BondCodegen>
<Compile Include="Layers\LayerStackTests.cs" />
<Compile Include="properties\AssemblyInfo.cs" />
<Compile Include="SimpleInMem\CalculatorServiceImpl.cs" />
<Compile Include="SimpleInMem\SimpleInMemConnectionTest.cs" />
<Compile Include="SimpleInMem\SimpleInMemListenerTest.cs" />
<Compile Include="SimpleInMem\SimpleInMemTransportTest.cs" />
<BondCodegen Include="SimpleInMem\SimpleStruct.bond">
<Options>--namespace=unittest.simpleinmem=UnitTest.SimpleInMem</Options>
</BondCodegen>
<BondCodegen Include="SimpleInMem\Calculator.bond">
<Options>--namespace=unittest.simpleinmem=UnitTest.SimpleInMem</Options>
</BondCodegen>
<Compile Include="Epoxy\FrameTests.cs" />
<Compile Include="Epoxy\EpoxyConnectionTests.cs" />
<Compile Include="Epoxy\EpoxyListenerTests.cs" />
@ -51,9 +41,6 @@
<Compile Include="Epoxy\EpoxyTransportTests.cs" />
<Compile Include="Epoxy\ResponseMapTests.cs" />
<!-- TODO: edit the .targets to automatically include the comm files -->
<Compile Include="$(IntermediateOutputPath)calculator_interfaces.cs" />
<Compile Include="$(IntermediateOutputPath)calculator_proxies.cs" />
<Compile Include="$(IntermediateOutputPath)calculator_services.cs" />
<Compile Include="$(IntermediateOutputPath)comm-test_interfaces.cs" />
<Compile Include="$(IntermediateOutputPath)comm-test_proxies.cs" />
<Compile Include="$(IntermediateOutputPath)comm-test_services.cs" />
@ -98,10 +85,6 @@
<Project>{5f6cbc77-8fb5-4644-bab5-f8e62792266e}</Project>
<Name>layers</Name>
</ProjectReference>
<ProjectReference Include="..\..\src\comm\simpleinmem-transport\simpleinmem-transport.csproj">
<Project>{54a3432b-99e1-4deb-b4eb-2d6e158ecd24}</Project>
<Name>simpleinmem-transport</Name>
</ProjectReference>
<ProjectReference Include="..\..\src\comm\epoxy-transport\epoxy-transport.csproj">
<Project>{c687c52c-0a5b-4f10-8cb3-dbaf9a72d042}</Project>
<Name>epoxy-transport</Name>
@ -112,4 +95,4 @@
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildThisFileDirectory)\..\..\build\Common.Internal.targets" />
</Project>
</Project>