From bb402f312868e4335203fbd4ded25498781d0e7d Mon Sep 17 00:00:00 2001 From: Christopher Warrington Date: Mon, 30 May 2016 18:17:04 -0700 Subject: [PATCH] Remove Simple In-Memory transport The Simple In-Memory transport isn't quite ready to be released. Since we'd like to get the first preview release, we're removing Simple In-Memory until it has had a change to be refactored into a releasable state. --- cs/cs.sln | 21 - .../Processor/EventProcessor.cs | 85 ---- .../Processor/InMemFrameQueue.cs | 202 ---------- .../Processor/QueueProcessor.cs | 44 -- .../Processor/RequestProcessor.cs | 107 ----- .../Processor/ResponseProcessor.cs | 70 ---- .../simpleinmem-transport/Processor/Util.cs | 49 --- .../SimpleInMemConnection.cs | 308 -------------- .../SimpleInMemContexts.cs | 25 -- .../SimpleInMemListener.cs | 80 ---- .../SimpleInMemProtocolErrorException.cs | 32 -- .../SimpleInMemTransport.bond | 18 - .../SimpleInMemTransport.cs | 122 ------ .../simpleinmem-transport/packages.config | 4 - .../properties/AssemblyInfo.cs | 20 - .../simpleinmem-transport.csproj | 63 --- cs/test/comm/SimpleInMem/Calculator.bond | 17 - .../comm/SimpleInMem/CalculatorServiceImpl.cs | 45 --- .../SimpleInMem/SimpleInMemConnectionTest.cs | 377 ------------------ .../SimpleInMem/SimpleInMemListenerTest.cs | 115 ------ .../SimpleInMem/SimpleInMemTransportTest.cs | 81 ---- cs/test/comm/SimpleInMem/SimpleStruct.bond | 15 - cs/test/comm/comm.csproj | 19 +- 23 files changed, 1 insertion(+), 1918 deletions(-) delete mode 100644 cs/src/comm/simpleinmem-transport/Processor/EventProcessor.cs delete mode 100644 cs/src/comm/simpleinmem-transport/Processor/InMemFrameQueue.cs delete mode 100644 cs/src/comm/simpleinmem-transport/Processor/QueueProcessor.cs delete mode 100644 cs/src/comm/simpleinmem-transport/Processor/RequestProcessor.cs delete mode 100644 cs/src/comm/simpleinmem-transport/Processor/ResponseProcessor.cs delete mode 100644 cs/src/comm/simpleinmem-transport/Processor/Util.cs delete mode 100644 cs/src/comm/simpleinmem-transport/SimpleInMemConnection.cs delete mode 100644 cs/src/comm/simpleinmem-transport/SimpleInMemContexts.cs delete mode 100644 cs/src/comm/simpleinmem-transport/SimpleInMemListener.cs delete mode 100644 cs/src/comm/simpleinmem-transport/SimpleInMemProtocolErrorException.cs delete mode 100644 cs/src/comm/simpleinmem-transport/SimpleInMemTransport.bond delete mode 100644 cs/src/comm/simpleinmem-transport/SimpleInMemTransport.cs delete mode 100644 cs/src/comm/simpleinmem-transport/packages.config delete mode 100644 cs/src/comm/simpleinmem-transport/properties/AssemblyInfo.cs delete mode 100644 cs/src/comm/simpleinmem-transport/simpleinmem-transport.csproj delete mode 100644 cs/test/comm/SimpleInMem/Calculator.bond delete mode 100644 cs/test/comm/SimpleInMem/CalculatorServiceImpl.cs delete mode 100644 cs/test/comm/SimpleInMem/SimpleInMemConnectionTest.cs delete mode 100644 cs/test/comm/SimpleInMem/SimpleInMemListenerTest.cs delete mode 100644 cs/test/comm/SimpleInMem/SimpleInMemTransportTest.cs delete mode 100644 cs/test/comm/SimpleInMem/SimpleStruct.bond diff --git a/cs/cs.sln b/cs/cs.sln index b594a315..de555260 100644 --- a/cs/cs.sln +++ b/cs/cs.sln @@ -196,8 +196,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "pingpong", "..\examples\cs\ {92915BD9-8AB1-4E4D-A2AC-95BBF4F82D89} = {92915BD9-8AB1-4E4D-A2AC-95BBF4F82D89} EndProjectSection EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "simpleinmem-transport", "src\comm\simpleinmem-transport\simpleinmem-transport.csproj", "{54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "logging", "..\examples\cs\comm\logging\logging.csproj", "{5C8132A8-C4B1-45E0-BCA6-379DA23B86D3}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "service", "src\comm\service\service.csproj", "{79D2423A-87C8-44A2-89C2-2FA94521747E}" @@ -737,24 +735,6 @@ Global {92C5B98B-12A4-4995-90B8-A19E96524464}.Release|Mixed Platforms.Build.0 = Release|Any CPU {92C5B98B-12A4-4995-90B8-A19E96524464}.Release|Win32.ActiveCfg = Release|Any CPU {92C5B98B-12A4-4995-90B8-A19E96524464}.Release|Win32.Build.0 = Release|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Any CPU.Build.0 = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Win32.ActiveCfg = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Debug|Win32.Build.0 = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Any CPU.ActiveCfg = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Any CPU.Build.0 = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Mixed Platforms.ActiveCfg = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Mixed Platforms.Build.0 = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Win32.ActiveCfg = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Fields|Win32.Build.0 = Debug|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Any CPU.ActiveCfg = Release|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Any CPU.Build.0 = Release|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Mixed Platforms.Build.0 = Release|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Win32.ActiveCfg = Release|Any CPU - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24}.Release|Win32.Build.0 = Release|Any CPU {5C8132A8-C4B1-45E0-BCA6-379DA23B86D3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {5C8132A8-C4B1-45E0-BCA6-379DA23B86D3}.Debug|Any CPU.Build.0 = Debug|Any CPU {5C8132A8-C4B1-45E0-BCA6-379DA23B86D3}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU @@ -879,7 +859,6 @@ Global {F13CB9C0-DB52-45DB-9BFD-05CB26512FC6} = {882B4409-F7A4-496A-AC2B-3E46A448D5D8} {C687C52C-0A5B-4F10-8CB3-DBAF9A72D042} = {ED161076-6BB8-4A13-83ED-0E9C01461D5E} {92C5B98B-12A4-4995-90B8-A19E96524464} = {621A2166-EEE0-4A27-88AA-5BE5AC996452} - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24} = {ED161076-6BB8-4A13-83ED-0E9C01461D5E} {5C8132A8-C4B1-45E0-BCA6-379DA23B86D3} = {621A2166-EEE0-4A27-88AA-5BE5AC996452} {79D2423A-87C8-44A2-89C2-2FA94521747E} = {ED161076-6BB8-4A13-83ED-0E9C01461D5E} {12279366-F646-4FEC-8CAA-B62A8EC477BB} = {621A2166-EEE0-4A27-88AA-5BE5AC996452} diff --git a/cs/src/comm/simpleinmem-transport/Processor/EventProcessor.cs b/cs/src/comm/simpleinmem-transport/Processor/EventProcessor.cs deleted file mode 100644 index dabd44ae..00000000 --- a/cs/src/comm/simpleinmem-transport/Processor/EventProcessor.cs +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem.Processor -{ - using Service; - using System; - using System.Threading.Tasks; - using Bond.Comm.Layers; - - internal class EventProcessor : QueueProcessor - { - readonly SimpleInMemConnection connection; - readonly ServiceHost serviceHost; - readonly InMemFrameQueueCollection serverqueues; - - internal EventProcessor(SimpleInMemConnection connection, ServiceHost host, InMemFrameQueueCollection queues) - { - if (connection == null) throw new ArgumentNullException(nameof(connection)); - if (host == null) throw new ArgumentNullException(nameof(host)); - if (queues == null) throw new ArgumentNullException(nameof(queues)); - - this.connection = connection; - serviceHost = host; - serverqueues = queues; - } - - override internal void Process() - { - const PayloadType payloadType = PayloadType.Event; - - foreach (Guid key in serverqueues.GetKeys()) - { - InMemFrameQueue queue = serverqueues.GetQueue(key); - Task.Run(() => ProcessQueue(queue, payloadType)); - } - } - - private void ProcessQueue(InMemFrameQueue queue, PayloadType payloadType) - { - int queueSize = queue.Count(payloadType); - - int batchIndex = 0; - - if (queueSize == 0) - { - return; - } - - while (batchIndex < PROCESSING_BATCH_SIZE && queueSize > 0) - { - var payload = queue.Dequeue(payloadType); - var headers = payload.headers; - var layerData = payload.layerData; - var message = payload.message; - - DispatchEvent(headers, layerData, message); - queueSize = queue.Count(payloadType); - batchIndex++; - } - } - - private void DispatchEvent(SimpleInMemHeaders headers, IBonded layerData, IMessage message) - { - var receiveContext = new SimpleInMemReceiveContext(connection); - - Error layerError = LayerStackUtils.ProcessOnReceive(serviceHost.ParentTransport.LayerStack, - MessageType.Event, receiveContext, layerData); - - if (layerError != null) - { - Log.Error("{0}.{1}: Receiving event {2}/{3} failed due to layer error (Code: {4}, Message: {5}).", - this, nameof(DispatchEvent), headers.conversation_id, headers.method_name, - layerError.error_code, layerError.message); - return; - } - - Task.Run(async () => - { - await serviceHost.DispatchEvent( - headers.method_name, receiveContext, message, connection.ConnectionMetrics); - }); - } - } -} diff --git a/cs/src/comm/simpleinmem-transport/Processor/InMemFrameQueue.cs b/cs/src/comm/simpleinmem-transport/Processor/InMemFrameQueue.cs deleted file mode 100644 index a4171eab..00000000 --- a/cs/src/comm/simpleinmem-transport/Processor/InMemFrameQueue.cs +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem.Processor -{ - using System; - using System.Collections.Concurrent; - using System.Collections.Generic; - using System.Threading.Tasks; - - internal class InMemFrameQueue - { - private Queue requests; - private Queue responses; - private Queue events; - private object requestsLock = new object(); - private object responsesLock = new object(); - private object eventsLock = new object(); - - internal InMemFrameQueue() - { - requests = new Queue(); - responses = new Queue(); - events = new Queue(); - } - - internal void Clear() - { - lock (requestsLock) - { - Clear(requests); - } - - lock (responsesLock) - { - Clear(responses); - } - - lock (eventsLock) - { - Clear(events); - } - } - - internal void Enqueue(InMemFrame frame) - { - Util.Validate(frame); - - switch(frame.headers.payload_type) - { - case PayloadType.Request: - lock (requestsLock) - { - requests.Enqueue(frame); - } - break; - - case PayloadType.Response: - lock (responsesLock) - { - responses.Enqueue(frame); - } - break; - - case PayloadType.Event: - lock (events) - { - events.Enqueue(frame); - } - break; - - default: - var message = LogUtil.FatalAndReturnFormatted("{0}.{1}: Payload type {2} not supported!", - nameof(InMemFrameQueue), nameof(Enqueue), frame.headers.payload_type); - throw new NotImplementedException(message); - } - - } - - internal InMemFrame Dequeue(PayloadType payloadType) - { - InMemFrame frame; - switch (payloadType) - { - case PayloadType.Request: - lock (requestsLock) - { - frame = requests.Dequeue(); - } - break; - - case PayloadType.Response: - lock (responsesLock) - { - frame = responses.Dequeue(); - } - break; - - case PayloadType.Event: - lock (eventsLock) - { - frame = events.Dequeue(); - } - break; - - default: - var message = LogUtil.FatalAndReturnFormatted("{0}.{1}: Payload type {2} not supported!", - nameof(InMemFrameQueue), nameof(Dequeue), payloadType); - throw new NotImplementedException(message); - } - - return frame; - } - - internal int Count(PayloadType payloadType) - { - int count; - switch(payloadType) - { - case PayloadType.Request: - count = requests.Count; - break; - - case PayloadType.Response: - count = responses.Count; - break; - - case PayloadType.Event: - count = events.Count; - break; - - default: - var message = LogUtil.FatalAndReturnFormatted("{0}.{1}: Payload type {2} not supported!", - nameof(InMemFrameQueue), nameof(Count), payloadType); - throw new NotImplementedException(message); - } - - return count; - } - - private void Clear(Queue queue) - { - foreach (InMemFrame frame in queue) - { - frame.outstandingRequest.SetCanceled(); - } - queue.Clear(); - } - } - - internal class InMemFrameQueueCollection - { - private ConcurrentDictionary reqresqueue; - - internal InMemFrameQueueCollection() - { - reqresqueue = new ConcurrentDictionary(); - } - - internal void Add(Guid id, InMemFrameQueue queue) - { - if (!reqresqueue.TryAdd(id, queue)) - { - var message = LogUtil.FatalAndReturnFormatted( - "{0}.{1}: Guid collison must never happen for client connection Ids: {2}", - nameof(InMemFrameQueueCollection), nameof(Add), id); - throw new InvalidOperationException(message); - } - } - - internal ICollection GetKeys() - { - return reqresqueue.Keys; - } - - internal InMemFrameQueue GetQueue(Guid id) - { - InMemFrameQueue queue; - reqresqueue.TryGetValue(id, out queue); - return queue; - } - - internal void ClearAll() - { - ICollection keys = GetKeys(); - - foreach (Guid key in keys) - { - GetQueue(key).Clear(); - } - reqresqueue.Clear(); - } - } - - internal class InMemFrame - { - internal SimpleInMemHeaders headers; - internal IBonded layerData; - internal IMessage message; - internal TaskCompletionSource outstandingRequest; - } -} diff --git a/cs/src/comm/simpleinmem-transport/Processor/QueueProcessor.cs b/cs/src/comm/simpleinmem-transport/Processor/QueueProcessor.cs deleted file mode 100644 index 08f3258f..00000000 --- a/cs/src/comm/simpleinmem-transport/Processor/QueueProcessor.cs +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem.Processor -{ - using System; - using System.Threading; - using System.Threading.Tasks; - using System.Threading.Tasks.Dataflow; - - internal abstract class QueueProcessor - { - protected const int PROCESSING_BATCH_SIZE = 1000; - - private readonly Random randomDelay = new Random(DateTime.UtcNow.Millisecond); - private readonly object processInput = new object(); - - public void ProcessAsync(CancellationToken t) - { - NewLongRunningTask(o => Process(), t).Post(processInput); - } - - internal abstract void Process(); - - private ITargetBlock NewLongRunningTask(Action process, CancellationToken t) - { - ActionBlock actionBlock = null; - - actionBlock = new ActionBlock(async o => { - process(o); - // Delay between 1 to 10 milliseconds before posting itself. - // Need it for task context switching for long running, CPU intensive tasks. - await Task.Delay(TimeSpan.FromMilliseconds(randomDelay.Next(1, 11)), t); - actionBlock.Post(o); - }, - new ExecutionDataflowBlockOptions - { - CancellationToken = t - }); - - return actionBlock; - } - } -} diff --git a/cs/src/comm/simpleinmem-transport/Processor/RequestProcessor.cs b/cs/src/comm/simpleinmem-transport/Processor/RequestProcessor.cs deleted file mode 100644 index 7315f491..00000000 --- a/cs/src/comm/simpleinmem-transport/Processor/RequestProcessor.cs +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem.Processor -{ - using System; - using System.Threading.Tasks; - using Bond.Comm.Layers; - using Bond.Comm.Service; - - internal class RequestProcessor : QueueProcessor - { - readonly SimpleInMemConnection connection; - readonly ServiceHost serviceHost; - readonly InMemFrameQueueCollection serverqueues; - - internal RequestProcessor(SimpleInMemConnection connection, ServiceHost host, InMemFrameQueueCollection queues) - { - if (connection == null) throw new ArgumentNullException(nameof(connection)); - if (host == null) throw new ArgumentNullException(nameof(host)); - if (queues == null) throw new ArgumentNullException(nameof(queues)); - - this.connection = connection; - serviceHost = host; - serverqueues = queues; - } - - override internal void Process() - { - const PayloadType payloadType = PayloadType.Request; - - foreach (Guid key in serverqueues.GetKeys()) - { - InMemFrameQueue queue = serverqueues.GetQueue(key); - Task.Run(() => ProcessQueue(queue, payloadType)); - } - } - - private void ProcessQueue(InMemFrameQueue queue, PayloadType payloadType) - { - int queueSize = queue.Count(payloadType); - int batchIndex = 0; - - if (queueSize == 0) - { - return; - } - - while (batchIndex < PROCESSING_BATCH_SIZE && queueSize > 0) - { - var payload = queue.Dequeue(payloadType); - var headers = payload.headers; - var layerData = payload.layerData; - var message = payload.message; - var taskSource = payload.outstandingRequest; - - Task.Run(() => DispatchRequest(headers, layerData, message, queue, taskSource)); - queueSize = queue.Count(payloadType); - batchIndex++; - } - } - - private async void DispatchRequest(SimpleInMemHeaders headers, IBonded layerData, IMessage message, InMemFrameQueue queue, - TaskCompletionSource taskSource) - { - var receiveContext = new SimpleInMemReceiveContext(connection); - - Error layerError = LayerStackUtils.ProcessOnReceive(this.serviceHost.ParentTransport.LayerStack, - MessageType.Request, receiveContext, layerData); - - IMessage response; - - if (layerError == null) - { - response = await serviceHost.DispatchRequest(headers.method_name, receiveContext, message, connection.ConnectionMetrics); - } - else - { - Log.Error("{0}.{1}: Receiving request {2}/{3} failed due to layer error (Code: {4}, Message: {5}).", - this, nameof(DispatchRequest), headers.conversation_id, headers.method_name, - layerError.error_code, layerError.message); - response = Message.FromError(layerError); - } - SendReply(headers.conversation_id, response, queue, taskSource); - } - - internal void SendReply(ulong conversationId, IMessage response, InMemFrameQueue queue, - TaskCompletionSource taskSource) - { - var sendContext = new SimpleInMemSendContext(connection); - IBonded layerData; - Error layerError = LayerStackUtils.ProcessOnSend(this.serviceHost.ParentTransport.LayerStack, - MessageType.Response, sendContext, out layerData); - - // If there was a layer error, replace the response with the layer error - if (layerError != null) - { - Log.Error("{0}.{1}: Sending reply for request ID {2} failed due to layer error (Code: {3}, Message: {4}).", - this, nameof(SendReply), conversationId, layerError.error_code, layerError.message); - response = Message.FromError(layerError); - } - - var payload = Util.NewPayLoad(conversationId, PayloadType.Response, layerData, response, taskSource); - queue.Enqueue(payload); - } - } -} diff --git a/cs/src/comm/simpleinmem-transport/Processor/ResponseProcessor.cs b/cs/src/comm/simpleinmem-transport/Processor/ResponseProcessor.cs deleted file mode 100644 index 2e5861be..00000000 --- a/cs/src/comm/simpleinmem-transport/Processor/ResponseProcessor.cs +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem.Processor -{ - using System; - using System.Threading.Tasks; - using Bond.Comm.Layers; - - internal class ResponseProcessor : QueueProcessor - { - readonly SimpleInMemConnection connection; - readonly Transport parentTransport; - readonly InMemFrameQueue clientreqresqueue; - - internal ResponseProcessor(SimpleInMemConnection connection, Transport parentTransport, InMemFrameQueue queue) - { - if (queue == null) throw new ArgumentNullException(nameof(queue)); - if (connection == null) throw new ArgumentNullException(nameof(connection)); - - this.connection = connection; - this.parentTransport = parentTransport; - clientreqresqueue = queue; - } - - override internal void Process() - { - const PayloadType payloadType = PayloadType.Response; - int queueSize = clientreqresqueue.Count(payloadType); - int batchIndex = 0; - - if (queueSize == 0) - { - return; - } - - while (batchIndex < PROCESSING_BATCH_SIZE && queueSize > 0) - { - var frame = clientreqresqueue.Dequeue(payloadType); - var headers = frame.headers; - var layerData = frame.layerData; - var message = frame.message; - var taskSource = frame.outstandingRequest; - - DispatchResponse(headers, layerData, message, taskSource); - queueSize = clientreqresqueue.Count(payloadType); - batchIndex++; - } - } - - private void DispatchResponse(SimpleInMemHeaders headers, IBonded layerData, IMessage message, - TaskCompletionSource responseCompletionSource) - { - var receiveContext = new SimpleInMemReceiveContext(connection); - - Error layerError = LayerStackUtils.ProcessOnReceive(parentTransport.LayerStack, - MessageType.Response, receiveContext, layerData); - - if (layerError != null) - { - Log.Error("{0}.{1}: Receiving response {2}/{3} failed due to layer error (Code: {4}, Message: {5}).", - this, nameof(DispatchResponse), headers.conversation_id, headers.method_name, - layerError.error_code, layerError.message); - message = Message.FromError(layerError); - } - - responseCompletionSource.SetResult(message); - } - } -} diff --git a/cs/src/comm/simpleinmem-transport/Processor/Util.cs b/cs/src/comm/simpleinmem-transport/Processor/Util.cs deleted file mode 100644 index fb9f8953..00000000 --- a/cs/src/comm/simpleinmem-transport/Processor/Util.cs +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem.Processor -{ - using System.Threading.Tasks; - - internal static class Util - { - internal static InMemFrame NewPayLoad(ulong conversationId, PayloadType payloadType, IBonded layerData, - IMessage message, TaskCompletionSource taskSource) - { - var headers = new SimpleInMemHeaders - { - conversation_id = conversationId, - payload_type = payloadType - }; - - var payload = new InMemFrame - { - headers = headers, - layerData = layerData, - message = message, - outstandingRequest = taskSource - }; - - Validate(payload); - return payload; - } - - internal static InMemFrame Validate(InMemFrame frame) - { - if (frame == null) - { - throw new SimpleInMemProtocolErrorException($"null {nameof(frame)}"); - } - else if (frame.headers == null) - { - throw new SimpleInMemProtocolErrorException($"null {nameof(frame.headers)} in frame"); - } - else if (frame.message == null) - { - throw new SimpleInMemProtocolErrorException($"null {nameof(frame.message)} in frame"); - } - - return frame; - } - } -} diff --git a/cs/src/comm/simpleinmem-transport/SimpleInMemConnection.cs b/cs/src/comm/simpleinmem-transport/SimpleInMemConnection.cs deleted file mode 100644 index e55668cc..00000000 --- a/cs/src/comm/simpleinmem-transport/SimpleInMemConnection.cs +++ /dev/null @@ -1,308 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem -{ - using System; - using System.Collections.Generic; - using System.Runtime.CompilerServices; - using System.Threading; - using System.Threading.Tasks; - using Bond.Comm.Layers; - using Bond.Comm.Service; - using Bond.Comm.SimpleInMem.Processor; - - public enum ConnectionType - { - Client, - Server - } - - [Flags] - public enum CnxState - { - Created = 0x01, - Connected = 0x02, - SendProtocolError = 0x04, - Disconnecting = 0x08, - Disconnected = 0x10, - } - - public class SimpleInMemConnection : Connection, IRequestResponseConnection, IEventConnection - { - private readonly Guid connectionId; - private readonly ConnectionType connectionType; - private readonly ServiceHost serviceHost; - private readonly SimpleInMemListener parentListener; - private InMemFrameQueue communicationQueue; - private InMemFrameQueueCollection serverqueues; - private object connectionsLock = new object(); - private long prevConversationId; - private CancellationTokenSource cancelTokenSource = new CancellationTokenSource(); - private CnxState state; - private object stateLock = new object(); - private HashSet clientConnections; - public ConnectionMetrics ConnectionMetrics { get; } = new ConnectionMetrics(); - - public SimpleInMemConnection(SimpleInMemTransport parentTransport, SimpleInMemListener parentListener, ConnectionType connectionType) : - this (new ServiceHost(parentTransport), parentListener, connectionType) - { - } - - internal SimpleInMemConnection(ServiceHost serviceHost, SimpleInMemListener parentListener, ConnectionType connectionType) - { - if (serviceHost == null) throw new ArgumentNullException(nameof(serviceHost)); - if (parentListener == null) throw new ArgumentNullException(nameof(parentListener)); - - connectionId = Guid.NewGuid(); - this.connectionType = connectionType; - this.serviceHost = serviceHost; - this.parentListener = parentListener; - - switch(connectionType) - { - case ConnectionType.Client: - communicationQueue = new InMemFrameQueue(); - break; - - case ConnectionType.Server: - serverqueues = new InMemFrameQueueCollection(); - clientConnections = new HashSet(); - break; - - default: - throw new NotSupportedException(nameof(connectionType)); - } - - // start at -1 or 0 so the first conversation ID is 1 or 2. - prevConversationId = (connectionType == ConnectionType.Client) ? -1 : 0; - - state = CnxState.Created; - - ConnectionMetrics.connection_id = connectionId.ToString(); - } - - public CnxState State - { - get - { - return state; - } - } - - public ConnectionType ConnectionType - { - get - { - return connectionType; - } - } - - public Guid Id - { - get - { - return connectionId; - } - } - - public override string ToString() - { - return $"{nameof(SimpleInMemConnection)}({connectionId})"; - } - - public override Task StopAsync() - { - bool connected = false; - lock (stateLock) - { - if (connected = ((state & CnxState.Connected) != 0)) - { - state = CnxState.Disconnecting; - cancelTokenSource.Cancel(); - } - } - - if (connected) - { - OnDisconnect(); - lock (stateLock) - { - state = CnxState.Disconnected; - } - } - - return TaskExt.CompletedTask; - } - - public async Task> RequestResponseAsync(string methodName, IMessage message, CancellationToken ct) - { - EnsureCorrectState(CnxState.Connected); - IMessage response = await SendRequestAsync(methodName, message); - return response.Convert(); - } - - public Task FireEventAsync(string methodName, IMessage message) - { - EnsureCorrectState(CnxState.Connected); - SendEventAsync(methodName, message); - return TaskExt.CompletedTask; - } - - internal InMemFrameQueue CommunicationQueue - { - get - { - return communicationQueue; - } - } - - internal void Start() - { - lock (stateLock) - { - EnsureCorrectState(CnxState.Created | CnxState.Disconnected); - - if (connectionType == ConnectionType.Client) - { - var responseProcessor = new ResponseProcessor(this, serviceHost.ParentTransport, communicationQueue); - Task.Run(() => responseProcessor.ProcessAsync(cancelTokenSource.Token)); - } - else if (connectionType == ConnectionType.Server) - { - var requestProcessor = new RequestProcessor(this, serviceHost, serverqueues); - var eventProcessor = new EventProcessor(this, serviceHost, serverqueues); - Task.Run(() => requestProcessor.ProcessAsync(cancelTokenSource.Token)); - Task.Run(() => eventProcessor.ProcessAsync(cancelTokenSource.Token)); - } - else - { - var message = LogUtil.FatalAndReturnFormatted("{0}.{1}: Connection type {2} not implemented.", - this, nameof(Start), connectionType); - throw new NotImplementedException(message); - } - - state = CnxState.Connected; - } - } - - internal void AddClientConnection(SimpleInMemConnection connection) - { - if (connectionType == ConnectionType.Client) - { - var message = LogUtil.FatalAndReturnFormatted( - "{0}.{1}: Client connection does not support adding new request response queue.", - this, nameof(AddClientConnection)); - throw new NotSupportedException(message); - } - - serverqueues.Add(connection.Id, connection.CommunicationQueue); - lock (connectionsLock) - { - clientConnections.Add(connection); - } - } - - private Task SendRequestAsync(string methodName, IMessage request) - { - var conversationId = AllocateNextConversationId(); - - var sendContext = new SimpleInMemSendContext(this); - IBonded layerData; - Error layerError = LayerStackUtils.ProcessOnSend(this.serviceHost.ParentTransport.LayerStack, - MessageType.Request, sendContext, out layerData); - - if (layerError != null) - { - Log.Error("{0}.{1}: Sending request {2}/{3} failed due to layer error (Code: {4}, Message: {5}).", - this, nameof(SendRequestAsync), conversationId, methodName, layerError.error_code, layerError.message); - return Task.FromResult(Message.FromError(layerError)); - } - - var payload = Util.NewPayLoad(conversationId, PayloadType.Request, layerData, request, new TaskCompletionSource()); - payload.headers.method_name = methodName; - communicationQueue.Enqueue(payload); - - return payload.outstandingRequest.Task; - } - - private void SendEventAsync(string methodName, IMessage message) - { - var conversationId = AllocateNextConversationId(); - - var sendContext = new SimpleInMemSendContext(this); - IBonded layerData; - Error layerError = LayerStackUtils.ProcessOnSend(this.serviceHost.ParentTransport.LayerStack, - MessageType.Event, sendContext, out layerData); - - if (layerError != null) - { - Log.Error("{0}.{1}: Sending event {2}/{3} failed due to layer error (Code: {4}, Message: {5}).", - this, nameof(SendEventAsync), conversationId, methodName, layerError.error_code, layerError.message); - return; - } - - var payload = Util.NewPayLoad(conversationId, PayloadType.Event, layerData, message, null); - payload.headers.method_name = methodName; - communicationQueue.Enqueue(payload); - } - - private ulong AllocateNextConversationId() - { - // Interlocked.Add() handles overflow by wrapping, not throwing. - var newConversationId = Interlocked.Add(ref prevConversationId, 2); - if (newConversationId < 0) - { - throw new SimpleInMemProtocolErrorException("Exhausted conversation IDs"); - } - return unchecked((ulong)newConversationId); - } - - private void OnDisconnect() - { - Log.Debug("{0}.{1}: Shutting down.", this, nameof(OnDisconnect)); - - var args = new DisconnectedEventArgs(this, null); - parentListener.InvokeOnDisconnected(args); - - if(connectionType == ConnectionType.Client) - { - DisconnectClient(); - } - else - { - DisconnectServer(); - } - } - - private void DisconnectClient() - { - communicationQueue.Clear(); - } - - private void DisconnectServer() - { - serverqueues.ClearAll(); - - lock (connectionsLock) - { - foreach (SimpleInMemConnection connection in clientConnections) - { - connection.StopAsync(); - } - - clientConnections.Clear(); - } - } - - private void EnsureCorrectState(CnxState allowedStates, [CallerMemberName] string methodName = "") - { - - if ((state & allowedStates) == 0) - { - var message = $"Connection (${this}) is not in the correct state for the requested operation (${methodName}). Current state: ${state} Allowed states: ${allowedStates}"; - throw new InvalidOperationException(message); - } - } - } -} diff --git a/cs/src/comm/simpleinmem-transport/SimpleInMemContexts.cs b/cs/src/comm/simpleinmem-transport/SimpleInMemContexts.cs deleted file mode 100644 index fa2adfe2..00000000 --- a/cs/src/comm/simpleinmem-transport/SimpleInMemContexts.cs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem -{ - public class SimpleInMemSendContext : SendContext - { - public SimpleInMemSendContext(SimpleInMemConnection connection) - { - Connection = connection; - } - - public override Connection Connection { get; } - } - - public class SimpleInMemReceiveContext : ReceiveContext - { - public SimpleInMemReceiveContext(SimpleInMemConnection connection) - { - Connection = connection; - } - - public override Connection Connection { get; } - } -} diff --git a/cs/src/comm/simpleinmem-transport/SimpleInMemListener.cs b/cs/src/comm/simpleinmem-transport/SimpleInMemListener.cs deleted file mode 100644 index 03caa59b..00000000 --- a/cs/src/comm/simpleinmem-transport/SimpleInMemListener.cs +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem -{ - using Bond.Comm.Service; - using System.Collections.Generic; - using System.Threading.Tasks; - - public class SimpleInMemListener : Listener - { - private ServiceHost serviceHost; - private string address; - private SimpleInMemConnection connection; - private readonly string logname; - - public SimpleInMemListener(SimpleInMemTransport parentTransport, string address) - { - this.address = address; - serviceHost = new ServiceHost(parentTransport); - connection = new SimpleInMemConnection(serviceHost, this, ConnectionType.Server); - logname = $"{nameof(SimpleInMemListener)}({address})"; - } - - public override bool IsRegistered(string serviceMethodName) - { - return serviceHost.IsRegistered(serviceMethodName); - } - - public override void AddService(T service) - { - Log.Information("{0}.{1}: Adding {2}.", logname, nameof(AddService), typeof(T).Name); - serviceHost.Register(service); - } - - public override void RemoveService(T service) - { - serviceHost.Deregister((IService)service); - } - - public override Task StartAsync() - { - connection.Start(); - return TaskExt.CompletedTask; - } - - public override Task StopAsync() - { - return connection.StopAsync(); - } - - internal void AddClient(SimpleInMemConnection client) - { - var connectedEventArgs = new ConnectedEventArgs(client); - Error disconnectError = OnConnected(connectedEventArgs); - - if (disconnectError != null) - { - Log.Information("{0}.{1}: Rejecting connection {2} because {3}:{4}.", - logname, nameof(AddClient), client.Id, disconnectError.error_code, disconnectError.message); - throw new SimpleInMemProtocolErrorException( - "Connection rejected", - details: disconnectError, - innerException: null); - } - - connection.AddClientConnection(client); - } - - internal Error InvokeOnConnected(ConnectedEventArgs args) - { - return OnConnected(args); - } - - internal void InvokeOnDisconnected(DisconnectedEventArgs args) - { - OnDisconnected(args); - } - } -} diff --git a/cs/src/comm/simpleinmem-transport/SimpleInMemProtocolErrorException.cs b/cs/src/comm/simpleinmem-transport/SimpleInMemProtocolErrorException.cs deleted file mode 100644 index e0073cdc..00000000 --- a/cs/src/comm/simpleinmem-transport/SimpleInMemProtocolErrorException.cs +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem -{ - using System; - - public class SimpleInMemProtocolErrorException : TransportException - { - public SimpleInMemProtocolErrorException(string message) : base(message) - { - } - - public SimpleInMemProtocolErrorException(string message, Exception innerException) : this( - message, - details: null, - innerException: innerException) - { - } - - public SimpleInMemProtocolErrorException( - string message, - Error details, - Exception innerException) - : base(message, innerException) - { - Details = details; - } - - public Error Details { get; } - } -} diff --git a/cs/src/comm/simpleinmem-transport/SimpleInMemTransport.bond b/cs/src/comm/simpleinmem-transport/SimpleInMemTransport.bond deleted file mode 100644 index 69d27369..00000000 --- a/cs/src/comm/simpleinmem-transport/SimpleInMemTransport.bond +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace bond.comm.simpleinmem; - -enum PayloadType -{ - Request = 1; - Response = 2; - Event = 3; -} - -struct SimpleInMemHeaders -{ - 0: uint64 conversation_id; - 1: required PayloadType payload_type; - 2: string method_name; -} diff --git a/cs/src/comm/simpleinmem-transport/SimpleInMemTransport.cs b/cs/src/comm/simpleinmem-transport/SimpleInMemTransport.cs deleted file mode 100644 index 2b9662ac..00000000 --- a/cs/src/comm/simpleinmem-transport/SimpleInMemTransport.cs +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Bond.Comm.SimpleInMem -{ - using Service; - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - - public class SimpleInMemTransportBuilder : TransportBuilder - { - public override SimpleInMemTransport Construct() - { - return new SimpleInMemTransport(LayerStack); - } - } - - public class SimpleInMemTransport : Transport - { - object listenersLock = new object(); - IDictionary listeners = new Dictionary(); - readonly ILayerStack layerStack; - - public SimpleInMemTransport(ILayerStack layerStack) - { - this.layerStack = layerStack; - } - - public override ILayerStack LayerStack - { - get - { - return this.layerStack; - } - } - - public async override Task ConnectToAsync(string address, CancellationToken ct) - { - Log.Information("{0}.{1}: Connecting to {2}.", - nameof(SimpleInMemTransport), nameof(ConnectToAsync), address); - SimpleInMemListener listener; - - lock (listenersLock) - { - if (!listeners.TryGetValue(address, out listener)) - { - var errorFormat = "{0}.{1}: Listener not found for address: {2}"; - var message = LogUtil.FatalAndReturnFormatted(errorFormat, - nameof(SimpleInMemTransport), nameof(ConnectToAsync), address); - throw new ArgumentException(message); - } - } - - return await Task.Run(() => - { - var connection = new SimpleInMemConnection(this, (SimpleInMemListener)GetListener(address), ConnectionType.Client); - listener.AddClient(connection); - connection.Start(); - return connection; - }, ct); - } - - public override Listener MakeListener(string address) - { - SimpleInMemListener listener; - - if (!listeners.TryGetValue(address, out listener)) - { - lock (listenersLock) - { - if (!listeners.TryGetValue(address, out listener)) - { - listener = new SimpleInMemListener(this, address); - listeners.Add(address, listener); - } - } - } - - return listener; - } - - public Listener GetListener(string address) - { - SimpleInMemListener listener; - listeners.TryGetValue(address, out listener); - return listener; - } - - public override Task StopAsync() - { - lock (listenersLock) - { - foreach (SimpleInMemListener listener in listeners.Values) - { - listener.StopAsync(); - } - listeners.Clear(); - } - - return TaskExt.CompletedTask; - } - - public bool ListenerExists(string address) - { - return listeners.ContainsKey(address); - } - - public SimpleInMemListener RemoveListener(string address) - { - SimpleInMemListener listener; - - if (listeners.TryGetValue(address, out listener)) - { - listeners.Remove(address); - } - - return listener; - } - } -} diff --git a/cs/src/comm/simpleinmem-transport/packages.config b/cs/src/comm/simpleinmem-transport/packages.config deleted file mode 100644 index 9bbce38e..00000000 --- a/cs/src/comm/simpleinmem-transport/packages.config +++ /dev/null @@ -1,4 +0,0 @@ - - - - \ No newline at end of file diff --git a/cs/src/comm/simpleinmem-transport/properties/AssemblyInfo.cs b/cs/src/comm/simpleinmem-transport/properties/AssemblyInfo.cs deleted file mode 100644 index 6b2338bc..00000000 --- a/cs/src/comm/simpleinmem-transport/properties/AssemblyInfo.cs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System.Reflection; -using System.Resources; -using System.Runtime.CompilerServices; - -[assembly: AssemblyTitle("Bond.Comm.SimpleInMem")] -[assembly: AssemblyCompany("Microsoft")] -[assembly: AssemblyProduct("Bond")] -[assembly: AssemblyCopyright("Copyright (C) Microsoft. All rights reserved.")] -[assembly: NeutralResourcesLanguage("en")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] - -#if (DELAY_SIGN) -[assembly: InternalsVisibleTo("Bond, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b5fc90e7027f67871e773a8fde8938c81dd402ba65b9201d60593e96c492651e889cc13f1415ebb53fac1131ae0bd333c5ee6021672d9718ea31a8aebd0da0072f25d87dba6fc90ffd598ed4da35e44c398c454307e8e33b8426143daec9f596836f97c8f74750e5975c64e2189f45def46b2a2b1247adc3652bf5c308055da9")] -#else -[assembly: InternalsVisibleTo("Bond, PublicKey=00240000048000009400000006020000002400005253413100040000010001000d504ac18b4b149d2f7b0059b482f9b6d44d39059e6a96ff0a2a52678b5cfd8567cc67254132cd2debb5b95f6a1206a15c6f8ddac137c6c3ef4995f28c359acaa683a90995c8f08df7ce0aaa8836d331a344a514c443f112f80bf2ebed40ccb32d7df63c09b0d7bef80aecdc23ec200a458d4f8bafbcdeb9bf5ba111fbbd4787")] -#endif diff --git a/cs/src/comm/simpleinmem-transport/simpleinmem-transport.csproj b/cs/src/comm/simpleinmem-transport/simpleinmem-transport.csproj deleted file mode 100644 index 50224f6b..00000000 --- a/cs/src/comm/simpleinmem-transport/simpleinmem-transport.csproj +++ /dev/null @@ -1,63 +0,0 @@ - - - - - - {54A3432B-99E1-4DEB-B4EB-2D6E158ECD24} - Library - Properties - Bond.Comm.SimpleInMem - Bond.Comm.SimpleInMem - true - true - - - - - - - - - - - - - - - - --namespace=bond.comm.simpleinmem=Bond.Comm.SimpleInMem - - - - - {92915bd9-8ab1-4e4d-a2ac-95bbf4f82d89} - Attributes - - - {43cbba9b-c4bc-4e64-8733-7b72562d2e91} - Bond - - - {45efb397-298a-4a32-a178-a2bdf8abbbd9} - interfaces - - - {5f6cbc77-8fb5-4644-bab5-f8e62792266e} - layers - - - {79d2423a-87c8-44a2-89c2-2fa94521747e} - service - - - - - ..\..\..\packages\Microsoft.Tpl.Dataflow.4.5.24\lib\portable-net45+win8+wpa81\System.Threading.Tasks.Dataflow.dll - True - - - - - - - \ No newline at end of file diff --git a/cs/test/comm/SimpleInMem/Calculator.bond b/cs/test/comm/SimpleInMem/Calculator.bond deleted file mode 100644 index f700b792..00000000 --- a/cs/test/comm/SimpleInMem/Calculator.bond +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -import "SimpleStruct.bond" - -namespace unittest.simpleinmem - -// -// Method declarations for Calculator Service -// -service Calculator -{ - Output Add(PairedInput); - Output Subtract(PairedInput); - Output Multiply(PairedInput); - nothing Clear(); -}; \ No newline at end of file diff --git a/cs/test/comm/SimpleInMem/CalculatorServiceImpl.cs b/cs/test/comm/SimpleInMem/CalculatorServiceImpl.cs deleted file mode 100644 index 54ff0070..00000000 --- a/cs/test/comm/SimpleInMem/CalculatorServiceImpl.cs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace UnitTest.SimpleInMem -{ - using System; - using System.Threading; - using System.Threading.Tasks; - using Bond.Comm; - - internal class CalculatorService : CalculatorServiceBase - { - private const UInt16 DelayMilliseconds = 30; - - public static ManualResetEvent ClearCalledEvent { get; set; } = new ManualResetEvent(false); - - public override async Task> AddAsync(IMessage request, CancellationToken ct) - { - PairedInput req = request.Convert().Payload.Deserialize(); - var res = new Output { Result = req.First + req.Second }; - await Task.Delay(DelayMilliseconds); - - return Message.FromPayload(res); - } - - public override async Task> SubtractAsync(IMessage request, CancellationToken ct) - { - PairedInput req = request.Convert().Payload.Deserialize(); - var res = new Output { Result = req.First - req.Second }; - await Task.Delay(DelayMilliseconds); - - return Message.FromPayload(res); - } - - public override Task> MultiplyAsync(IMessage request, CancellationToken ct) - { - throw new NotImplementedException(); - } - - public override void ClearAsync(IMessage param) - { - ClearCalledEvent.Set(); - } - } -} \ No newline at end of file diff --git a/cs/test/comm/SimpleInMem/SimpleInMemConnectionTest.cs b/cs/test/comm/SimpleInMem/SimpleInMemConnectionTest.cs deleted file mode 100644 index 47a5e5ec..00000000 --- a/cs/test/comm/SimpleInMem/SimpleInMemConnectionTest.cs +++ /dev/null @@ -1,377 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace UnitTest.SimpleInMem -{ - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Threading; - using System.Threading.Tasks; - using Bond.Comm; - using Bond.Comm.Layers; - using Bond.Comm.SimpleInMem; - using NUnit.Framework; - using UnitTest.Comm; - using UnitTest.Layers; - - [TestFixture] - public class SimpleInMemConnectionTest - { - private const string address = "SimpleInMemTakesAnyRandomConnectionString"; - private TransportBuilder transportBuilder; - private SimpleInMemTransport transport; - private SimpleInMemListener listener; - private SimpleInMemConnection[] connections; - - [SetUp] - public void Init() - { - transportBuilder = new SimpleInMemTransportBuilder(); - } - - public async Task DefaultSetup(IService service, int count) - { - transport = transportBuilder.Construct(); - listener = (SimpleInMemListener)transport.MakeListener(address); - listener.AddService(service); - await listener.StartAsync(); - - connections = new SimpleInMemConnection[count]; - - for (int connectionIndex = 0; connectionIndex < count; connectionIndex++) - { - connections[connectionIndex] = (SimpleInMemConnection)await transport.ConnectToAsync(address, System.Threading.CancellationToken.None); - } - } - - [Test] - public async void ConnectionStateCycle() - { - await DefaultSetup(new CalculatorService(), 1); - - SimpleInMemConnection localConnection = - (SimpleInMemConnection)await transport.ConnectToAsync(address, System.Threading.CancellationToken.None); - Assert.AreEqual(localConnection.State, CnxState.Connected); - - await localConnection.StopAsync(); - Assert.AreEqual(localConnection.State, CnxState.Disconnected); - } - - [Test] - public async void ConnectionStateCycle_CloseAlreadyClosedConnection() - { - await DefaultSetup(new CalculatorService(), 1); - - SimpleInMemConnection localConnection = - (SimpleInMemConnection)await transport.ConnectToAsync(address, System.Threading.CancellationToken.None); - Assert.AreEqual(localConnection.State, CnxState.Connected); - - // Ensure that closing an already closed connection is no-op - for (int index = 0; index < 5; index++) - { - await localConnection.StopAsync(); - Assert.AreEqual(localConnection.State, CnxState.Disconnected); - } - } - - [Test] - public async void ValidSetup() - { - await DefaultSetup(new CalculatorService(), 1); - Assert.AreEqual(connections[0].ConnectionType, ConnectionType.Client); - } - - [Test] - public async Task MethodCall() - { - await DefaultSetup(new CalculatorService(), 1); - - const int first = 91; - const int second = 23; - int addResult = first + second; - int subResult = first - second; - - var calculatorProxy = new CalculatorProxy(connections[0]); - - var input = new PairedInput - { - First = first, - Second = second - }; - var request = new Message(input); - IMessage addResponse = await calculatorProxy.AddAsync(request, System.Threading.CancellationToken.None); - IMessage subResponse = await calculatorProxy.SubtractAsync(request, System.Threading.CancellationToken.None); - Output addOutput = addResponse.Payload.Deserialize(); - Output subOutput = subResponse.Payload.Deserialize(); - Assert.AreEqual(addResult, addOutput.Result); - Assert.AreEqual(subResult, subOutput.Result); - } - - [Test] - public async void EventCall() - { - await DefaultSetup(new CalculatorService(), 1); - var calculatorProxy = new CalculatorProxy(connections[0]); - - calculatorProxy.ClearAsync(); - - bool wasSignaled = CalculatorService.ClearCalledEvent.WaitOne(30000); - Assert.IsTrue(wasSignaled, "Timed out waiting for event"); - } - - [Test] - public async Task MultipleClientConnectionsMethodCalls() - { - Stopwatch sw = Stopwatch.StartNew(); - - await DefaultSetup(new CalculatorService(), 10); - - const int first = 91; - const int second = 23; - int expectedAddResult = first + second; - int expectedSubResult = first - second; - Task[] connectionTasks = new Task[connections.Length]; - - for (int connectionIndex = 0; connectionIndex < connections.Length; connectionIndex++) - { - SimpleInMemConnection conn = connections[connectionIndex]; - connectionTasks[connectionIndex] = Task.Run(() => - { - int taskCount = 25; - Task[] tasks = new Task[taskCount]; - - for (int taskIndex = 0; taskIndex < taskCount; taskIndex++) - { - tasks[taskIndex] = Task.Run(async () => - { - var calculatorProxy = new CalculatorProxy(conn); - var input = new PairedInput - { - First = first, - Second = second - }; - - var request = new Message(input); - IMessage addResponse = await calculatorProxy.AddAsync(request, System.Threading.CancellationToken.None); - IMessage subResponse = await calculatorProxy.SubtractAsync(request, System.Threading.CancellationToken.None); - Output addOutput = addResponse.Payload.Deserialize(); - Output subOutput = subResponse.Payload.Deserialize(); - Assert.AreEqual(expectedAddResult, addOutput.Result); - Assert.AreEqual(expectedSubResult, subOutput.Result); - }); - } - - Task.WaitAll(tasks); - }); - } - - Task.WaitAll(connectionTasks); - sw.Stop(); - Console.WriteLine($"{nameof(MultipleClientConnectionsMethodCalls)} test time: {sw.Elapsed.TotalSeconds}"); - } - - [Test] - public async Task MethodCall_WithServiceError() - { - await DefaultSetup(new CalculatorService(), 1); - - const int first = 91; - const int second = 23; - - var calculatorProxy = new CalculatorProxy(connections[0]); - - var input = new PairedInput - { - First = first, - Second = second - }; - var request = new Message(input); - IMessage multiplyResponse = await calculatorProxy.MultiplyAsync(request, System.Threading.CancellationToken.None); - Assert.IsTrue(multiplyResponse.IsError); - InternalServerError error = multiplyResponse.Error.Deserialize(); - Assert.AreEqual((int)ErrorCode.InternalServerError, error.error_code); - Assert.That(error.message, Is.StringContaining(Transport.InternalErrorMessage)); - } - - [Test] - public async Task MethodCall_WithMethodNotFound() - { - await DefaultSetup(new CalculatorService(), 1); - - const int first = 91; - const int second = 23; - const string methodName = "Divide"; - - var input = new PairedInput - { - First = first, - Second = second - }; - var request = new Message(input); - IMessage divideResponse = await connections[0].RequestResponseAsync(methodName, request, new System.Threading.CancellationToken()); - Assert.IsTrue(divideResponse.IsError); - Error error = divideResponse.Error.Deserialize(); - Assert.AreEqual((int)ErrorCode.MethodNotFound, error.error_code); - Assert.That(error.message, Is.StringContaining($"Got request for unknown method [{methodName}].")); - } - - [Test] - public async Task MethodCall_WithLayerStack() - { - var testList = new List(); - var layer1 = new TestLayer_Append("foo", testList); - var layer2 = new TestLayer_Append("bar", testList); - - transportBuilder.SetLayerStack(new LayerStack(layer1, layer2)); - await DefaultSetup(new CalculatorService(), 1); - - const int first = 91; - const int second = 23; - int addResult = first + second; - - var calculatorProxy = new CalculatorProxy(connections[0]); - - var input = new PairedInput - { - First = first, - Second = second - }; - var request = new Message(input); - IMessage addResponse = await calculatorProxy.AddAsync(request, System.Threading.CancellationToken.None); - Output addOutput = addResponse.Payload.Deserialize(); - Assert.AreEqual(addResult, addOutput.Result); - - Assert.AreEqual(8, testList.Count); - Assert.AreEqual(layer1.value, testList[0]); - Assert.AreEqual(testList[0] + layer2.value, testList[1]); - Assert.AreEqual(testList[1] + layer2.value, testList[2]); - Assert.AreEqual(testList[2] + layer1.value, testList[3]); - Assert.AreEqual(layer1.value, testList[4]); - Assert.AreEqual(testList[4] + layer2.value, testList[5]); - Assert.AreEqual(testList[5] + layer2.value, testList[6]); - Assert.AreEqual(testList[6] + layer1.value, testList[7]); - } - - [Test] - public async Task MethodCall_ReqRsp_WithLayerStackErrors() - { - var errorLayer = new TestLayer_ReturnErrors(); - transportBuilder.SetLayerStack(new LayerStack(errorLayer)); - var testService = new DummyTestService(); - await DefaultSetup(testService, 1); - - var proxy = new DummyTestProxy(connections[0]); - var request = new Dummy { int_value = 100 }; - - errorLayer.SetState(MessageType.Request, errorOnSend: false, errorOnReceive: true); - IMessage response = await proxy.ReqRspMethodAsync(request); - - Assert.IsTrue(response.IsError); - Assert.AreEqual(TestLayer_ReturnErrors.ReceiveError, response.Error.Deserialize().error_code, "Bad error 1"); - - Assert.AreEqual(0, testService.RequestCount); - Assert.AreEqual(Dummy.Empty.int_value, testService.LastRequestReceived.int_value); - - errorLayer.SetState(MessageType.Request, errorOnSend: true, errorOnReceive: false); - request.int_value = 101; - response = await proxy.ReqRspMethodAsync(request); - - Assert.IsTrue(response.IsError); - Assert.AreEqual(TestLayer_ReturnErrors.SendError, response.Error.Deserialize().error_code); - - Assert.AreEqual(0, testService.RequestCount); - Assert.AreEqual(Dummy.Empty.int_value, testService.LastRequestReceived.int_value); - - errorLayer.SetState(MessageType.Response, errorOnSend: true, errorOnReceive: false); - request.int_value = 102; - response = await proxy.ReqRspMethodAsync(request); - - Assert.IsTrue(response.IsError); - Assert.AreEqual(TestLayer_ReturnErrors.SendError, response.Error.Deserialize().error_code); - - Assert.AreEqual(1, testService.RequestCount); - Assert.AreEqual(request.int_value, testService.LastRequestReceived.int_value); - - errorLayer.SetState(MessageType.Response, errorOnSend: false, errorOnReceive: true); - request.int_value = 103; - response = await proxy.ReqRspMethodAsync(request); - - Assert.IsTrue(response.IsError); - Assert.AreEqual(TestLayer_ReturnErrors.ReceiveError, response.Error.Deserialize().error_code); - - Assert.AreEqual(2, testService.RequestCount); - Assert.AreEqual(request.int_value, testService.LastRequestReceived.int_value); - - errorLayer.SetState(MessageType.Event, errorOnSend: true, errorOnReceive: true); - request.int_value = 104; - response = await proxy.ReqRspMethodAsync(request); - - Assert.IsFalse(response.IsError); - Assert.AreEqual(105, response.Payload.Deserialize().int_value); - - Assert.AreEqual(3, testService.RequestCount); - Assert.AreEqual(request.int_value, testService.LastRequestReceived.int_value); - } - - [Test] - public async Task MethodCall_Event_WithLayerStackErrors() - { - var errorLayer = new TestLayer_ReturnErrors(); - transportBuilder.SetLayerStack(new LayerStack(errorLayer)); - var testService = new DummyTestService(); - await DefaultSetup(testService, 1); - - var proxy = new DummyTestProxy(connections[0]); - var theEvent = new Dummy { int_value = 100 }; - - errorLayer.SetState(MessageType.Event, errorOnSend: false, errorOnReceive: true); - - ManualResetEventSlim waitForEvent = testService.CreateResetEvent(); - proxy.EventMethodAsync(theEvent); - bool wasSignaled = waitForEvent.Wait(TimeSpan.FromSeconds(1)); - Assert.IsFalse(wasSignaled, "Event should not fire 1"); - testService.ClearResetEvent(); - - Assert.AreEqual(0, testService.EventCount); - Assert.AreEqual(Dummy.Empty.int_value, testService.LastEventReceived.int_value); - - errorLayer.SetState(MessageType.Event, errorOnSend: true, errorOnReceive: false); - theEvent.int_value = 101; - - waitForEvent = testService.CreateResetEvent(); - proxy.EventMethodAsync(theEvent); - wasSignaled = waitForEvent.Wait(TimeSpan.FromSeconds(1)); - Assert.IsFalse(wasSignaled, "Event should not fire 2"); - testService.ClearResetEvent(); - - Assert.AreEqual(0, testService.EventCount); - Assert.AreEqual(Dummy.Empty.int_value, testService.LastEventReceived.int_value); - - errorLayer.SetState(MessageType.Event, errorOnSend: false, errorOnReceive: false); - theEvent.int_value = 102; - - waitForEvent = testService.CreateResetEvent(); - proxy.EventMethodAsync(theEvent); - wasSignaled = waitForEvent.Wait(TimeSpan.FromSeconds(1)); - Assert.IsTrue(wasSignaled, "Timed out waiting for event to fire"); - - Assert.AreEqual(1, testService.EventCount); - Assert.AreEqual(theEvent.int_value, testService.LastEventReceived.int_value); - } - - [TearDown] - public async void Cleanup() - { - if (connections != null) - { - for (int connectionIndex = 0; connectionIndex < connections.Length; connectionIndex++) - { - await connections[connectionIndex].StopAsync(); - } - } - connections = null; - transport.RemoveListener(address); - } - } -} diff --git a/cs/test/comm/SimpleInMem/SimpleInMemListenerTest.cs b/cs/test/comm/SimpleInMem/SimpleInMemListenerTest.cs deleted file mode 100644 index 8794936a..00000000 --- a/cs/test/comm/SimpleInMem/SimpleInMemListenerTest.cs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace UnitTest.SimpleInMem -{ - using System; - using System.Threading; - using System.Threading.Tasks; - using Bond.Comm; - using Bond.Comm.SimpleInMem; - using NUnit.Framework; - - [TestFixture] - public class SimpleInMemListenerTest - { - private readonly string address = "SimpleInMemTakesAnyRandomConnectionString"; - private SimpleInMemTransport transport; - private CalculatorService service; - - [SetUp] - public void Init() - { - transport = new SimpleInMemTransportBuilder().Construct(); - service = new CalculatorService(); - } - - [TearDown] - public void Cleanup() - { - transport.RemoveListener(address); - } - - [Test] - public void CreateInMemTransportListener() - { - var listener = transport.MakeListener(address); - Assert.IsNotNull(listener); - } - - [Test] - public async Task StartStopInMemTransportListener() - { - var listener = transport.MakeListener(address); - Assert.IsNotNull(listener); - await listener.StartAsync(); - await listener.StopAsync(); - } - - [Test] - public void AddRemoveService() - { - SimpleInMemListener listener = (SimpleInMemListener)transport.MakeListener(address); - listener.AddService(service); - - foreach (var serviceMethod in service.Methods) - { - Assert.True(listener.IsRegistered($"{serviceMethod.MethodName}")); - } - - Assert.False(listener.IsRegistered("Divide")); - listener.RemoveService(service); - - foreach (var serviceMethod in service.Methods) - { - Assert.False(listener.IsRegistered($"{serviceMethod.MethodName}")); - } - Assert.False(listener.IsRegistered("Divide")); - } - - [Test] - public async Task ConnectedEvent_HasRightRemoteEndpointDetails() - { - SimpleInMemConnection listenerConnection = null; - var connectedEventDone = new ManualResetEventSlim(initialState: false); - - SimpleInMemListener listener = (SimpleInMemListener)transport.MakeListener(address); - listener.Connected += (sender, args) => - { - Assert.AreSame(listener, sender); - listenerConnection = (SimpleInMemConnection)args.Connection; - connectedEventDone.Set(); - }; - - await listener.StartAsync(); - var connection = (SimpleInMemConnection)await transport.ConnectToAsync(address); - bool wasSignaled = connectedEventDone.Wait(TimeSpan.FromSeconds(30)); - Assert.IsTrue(wasSignaled, "Timed out waiting for Connected event to complete"); - - Assert.AreEqual(connection.Id, listenerConnection.Id); - } - - [Test] - public async Task ConnectedEvent_SetsDisconnectError_ConnectToThrows() - { - var disconnectError = new Error { error_code = 100, message = "Go away!" }; - - SimpleInMemListener listener = (SimpleInMemListener)transport.MakeListener(address); - listener.Connected += (sender, args) => - { - args.DisconnectError = disconnectError; - }; - - await listener.StartAsync(); - try - { - await transport.ConnectToAsync(address); - Assert.Fail("Expected an exception to be thrown, but one wasn't."); - } - catch (SimpleInMemProtocolErrorException ex) - { - Assert.AreSame(disconnectError, ex.Details); - } - } - } -} diff --git a/cs/test/comm/SimpleInMem/SimpleInMemTransportTest.cs b/cs/test/comm/SimpleInMem/SimpleInMemTransportTest.cs deleted file mode 100644 index bf0e7eeb..00000000 --- a/cs/test/comm/SimpleInMem/SimpleInMemTransportTest.cs +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace UnitTest.SimpleInMem -{ - using System; - using System.Threading.Tasks; - using Bond.Comm; - using Bond.Comm.SimpleInMem; - using NUnit.Framework; - using UnitTest.Comm; - - [TestFixture] - public class SimpleInMemTransportTest - { - private readonly string address = "SimpleInMemTakesAnyRandomConnectionString"; - private SimpleInMemTransport transport; - - [SetUp] - public void Init() - { - transport = new SimpleInMemTransportBuilder().Construct(); - } - - [TearDown] - public void Cleanup() - { - transport.StopAsync(); - transport = null; - } - - [Test] - public void Builder_Construct_NoArgs_Succeeds() - { - var builder = new SimpleInMemTransportBuilder(); - Assert.NotNull(builder.Construct()); - } - - [Test] - public void StopAsync() - { - Listener newListener = transport.MakeListener(address); - Assert.True(newListener == transport.GetListener(address)); - transport.StopAsync(); - Assert.False(transport.ListenerExists(address)); - transport.MakeListener(address); - Assert.True(transport.ListenerExists(address)); - } - - [Test] - public void ConnectToAsync_NoListenerRunning() - { - Assert.Throws(async () => await transport.ConnectToAsync(address, new System.Threading.CancellationToken())); - } - - [Test] - public void MakeListener() - { - bool listenerExists = transport.ListenerExists(address); - Assert.False(listenerExists); - var listener = transport.MakeListener(address); - listenerExists = transport.ListenerExists(address); - Assert.True(listenerExists); - transport.RemoveListener(address); - Assert.Null(transport.GetListener(address)); - } - - [Test] - public async Task ConnectToAsync() - { - Listener l = transport.MakeListener(address); - Connection conn = await transport.ConnectToAsync(address, new System.Threading.CancellationToken()); - Assert.NotNull(conn); - Assert.True(conn is SimpleInMemConnection); - SimpleInMemConnection simpleConn = (SimpleInMemConnection)conn; - Assert.True(simpleConn.ConnectionType == ConnectionType.Client); - transport.RemoveListener(address); - Assert.Null(transport.GetListener(address)); - } - } -} diff --git a/cs/test/comm/SimpleInMem/SimpleStruct.bond b/cs/test/comm/SimpleInMem/SimpleStruct.bond deleted file mode 100644 index fe2ab7c7..00000000 --- a/cs/test/comm/SimpleInMem/SimpleStruct.bond +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace unittest.simpleinmem - -struct PairedInput -{ - 0: int32 First; - 1: int32 Second; -} - -struct Output -{ - 0: int32 Result; -} \ No newline at end of file diff --git a/cs/test/comm/comm.csproj b/cs/test/comm/comm.csproj index e7cf9abc..7ce34287 100644 --- a/cs/test/comm/comm.csproj +++ b/cs/test/comm/comm.csproj @@ -34,16 +34,6 @@ - - - - - - --namespace=unittest.simpleinmem=UnitTest.SimpleInMem - - - --namespace=unittest.simpleinmem=UnitTest.SimpleInMem - @@ -51,9 +41,6 @@ - - - @@ -98,10 +85,6 @@ {5f6cbc77-8fb5-4644-bab5-f8e62792266e} layers - - {54a3432b-99e1-4deb-b4eb-2d6e158ecd24} - simpleinmem-transport - {c687c52c-0a5b-4f10-8cb3-dbaf9a72d042} epoxy-transport @@ -112,4 +95,4 @@ - + \ No newline at end of file