Add a wrapper for Socket interface
This commit is contained in:
Родитель
5512ccd48e
Коммит
3a16d09187
|
@ -31,6 +31,7 @@ scala/dependency-reduced-pom.xml
|
|||
build/runtime/
|
||||
build/tools/
|
||||
build/examples/
|
||||
build/dependencies/
|
||||
*.log
|
||||
lib/
|
||||
|
||||
|
|
|
@ -83,6 +83,9 @@
|
|||
<Compile Include="Interop\Ipc\JvmObjectReference.cs" />
|
||||
<Compile Include="Interop\Ipc\PayloadHelper.cs" />
|
||||
<Compile Include="Interop\Ipc\SerDe.cs" />
|
||||
<Compile Include="Network\DefaultSocketWrapper.cs" />
|
||||
<Compile Include="Network\ISocketWrapper.cs" />
|
||||
<Compile Include="Network\SocketFactory.cs" />
|
||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||
<Compile Include="Proxy\IDataFrameNaFunctionsProxy.cs" />
|
||||
<Compile Include="Proxy\IDataFrameProxy.cs" />
|
||||
|
|
|
@ -12,6 +12,7 @@ using System.Runtime.Serialization;
|
|||
using System.Runtime.Serialization.Formatters.Binary;
|
||||
|
||||
using Microsoft.Spark.CSharp.Interop.Ipc;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
[assembly: InternalsVisibleTo("CSharpWorker")]
|
||||
|
@ -181,33 +182,33 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
/// A simple TCP server that intercepts shutdown() in order to interrupt
|
||||
/// our continuous polling on the handler.
|
||||
/// </summary>
|
||||
internal class AccumulatorServer : System.Net.Sockets.TcpListener
|
||||
internal class AccumulatorServer
|
||||
{
|
||||
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(AccumulatorServer));
|
||||
private volatile bool serverShutdown;
|
||||
private ISocketWrapper innerSocket;
|
||||
|
||||
internal AccumulatorServer()
|
||||
: base(IPAddress.Loopback, 0)
|
||||
{
|
||||
|
||||
innerSocket = SocketFactory.CreateSocket();
|
||||
}
|
||||
|
||||
internal void Shutdown()
|
||||
{
|
||||
serverShutdown = true;
|
||||
base.Stop();
|
||||
innerSocket.Close();
|
||||
}
|
||||
|
||||
internal int StartUpdateServer()
|
||||
{
|
||||
base.Start();
|
||||
innerSocket.Listen();
|
||||
Task.Run(() =>
|
||||
{
|
||||
try
|
||||
{
|
||||
IFormatter formatter = new BinaryFormatter();
|
||||
using (Socket s = AcceptSocket())
|
||||
using (var ns = new NetworkStream(s))
|
||||
using (var s = innerSocket.Accept())
|
||||
using (var ns = s.GetStream())
|
||||
{
|
||||
while (!serverShutdown)
|
||||
{
|
||||
|
@ -237,7 +238,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
}
|
||||
catch (SocketException e)
|
||||
{
|
||||
if (e.ErrorCode != 10004) // A blocking operation was interrupted by a call to WSACancelBlockingCall - TcpListener.Stop cancelled AccepSocket as expected
|
||||
if (e.ErrorCode != 10004) // A blocking operation was interrupted by a call to WSACancelBlockingCall - ISocketWrapper.Close canceled Accep() as expected
|
||||
throw e;
|
||||
}
|
||||
catch (Exception e)
|
||||
|
@ -247,7 +248,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
}
|
||||
});
|
||||
|
||||
return (base.LocalEndpoint as IPEndPoint).Port;
|
||||
return (innerSocket.LocalEndPoint as IPEndPoint).Port;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,12 +5,12 @@ using System;
|
|||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Reflection;
|
||||
using System.Runtime.Serialization;
|
||||
using System.Runtime.Serialization.Formatters.Binary;
|
||||
using System.Text;
|
||||
using Microsoft.Spark.CSharp.Interop.Ipc;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
using Microsoft.Spark.CSharp.Sql;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Core
|
||||
|
@ -23,10 +23,10 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
public IEnumerable<dynamic> Collect(int port, SerializedMode serializedMode, Type type)
|
||||
{
|
||||
IFormatter formatter = new BinaryFormatter();
|
||||
Socket sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
var sock = SocketFactory.CreateSocket();
|
||||
sock.Connect(IPAddress.Loopback, port);
|
||||
|
||||
using (NetworkStream s = new NetworkStream(sock))
|
||||
using (var s = sock.GetStream())
|
||||
{
|
||||
byte[] buffer;
|
||||
while ((buffer = SerDe.ReadBytes(s)) != null && buffer.Length > 0)
|
||||
|
|
|
@ -5,23 +5,24 @@ using System;
|
|||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Interop.Ipc
|
||||
{
|
||||
/// <summary>
|
||||
/// Implementation of thread safe IPC bridge between JVM and CLR
|
||||
/// throught a concourrent socket connection queue (lightweight synchronisation mechanism)
|
||||
/// Using a concurrent socket connection queue (lightweight synchronization mechanism)
|
||||
/// supporting async JVM calls like StreamingContext.AwaitTermination()
|
||||
/// </summary>
|
||||
[ExcludeFromCodeCoverage] //IPC calls to JVM validated using validation-enabled samples - unit test coverage not reqiured
|
||||
internal class JvmBridge : IJvmBridge
|
||||
{
|
||||
private int portNumber;
|
||||
private readonly ConcurrentQueue<Socket> sockets = new ConcurrentQueue<Socket>();
|
||||
private readonly ConcurrentQueue<ISocketWrapper> sockets = new ConcurrentQueue<ISocketWrapper>();
|
||||
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(JvmBridge));
|
||||
|
||||
public void Initialize(int portNumber)
|
||||
|
@ -29,12 +30,12 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc
|
|||
this.portNumber = portNumber;
|
||||
}
|
||||
|
||||
private Socket GetConnection()
|
||||
private ISocketWrapper GetConnection()
|
||||
{
|
||||
Socket socket;
|
||||
ISocketWrapper socket;
|
||||
if (!sockets.TryDequeue(out socket))
|
||||
{
|
||||
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
socket = SocketFactory.CreateSocket();
|
||||
socket.Connect(IPAddress.Loopback, portNumber);
|
||||
}
|
||||
return socket;
|
||||
|
@ -72,8 +73,8 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc
|
|||
{
|
||||
var overallPayload = PayloadHelper.BuildPayload(isStatic, classNameOrJvmObjectReference, methodName, parameters);
|
||||
|
||||
Socket socket = GetConnection();
|
||||
using (NetworkStream s = new NetworkStream(socket))
|
||||
var socket = GetConnection();
|
||||
using (var s = socket.GetStream())
|
||||
{
|
||||
SerDe.Write(s, overallPayload);
|
||||
|
||||
|
@ -207,7 +208,7 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc
|
|||
return paramsString.ToString();
|
||||
}
|
||||
|
||||
private object ReadCollection(NetworkStream s)
|
||||
private object ReadCollection(Stream s)
|
||||
{
|
||||
object returnValue;
|
||||
var listItemTypeAsChar = Convert.ToChar(s.ReadByte());
|
||||
|
@ -275,13 +276,12 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc
|
|||
|
||||
public void Dispose()
|
||||
{
|
||||
Socket socket;
|
||||
ISocketWrapper socket;
|
||||
while (sockets.TryDequeue(out socket))
|
||||
{
|
||||
if (socket != null)
|
||||
{
|
||||
socket.Dispose();
|
||||
socket = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Network
|
||||
{
|
||||
/// <summary>
|
||||
/// A simple wrapper of System.Net.Sockets.Socket class.
|
||||
/// </summary>
|
||||
public class DefaultSocketWrapper : ISocketWrapper
|
||||
{
|
||||
private readonly Socket innerSocket;
|
||||
|
||||
/// <summary>
|
||||
/// Default constructor that creates a new instance of DefaultSocket class which represents
|
||||
/// a traditional socket (System.Net.Socket.Socket).
|
||||
///
|
||||
/// This socket is bound to Loopback with port 0.
|
||||
/// </summary>
|
||||
public DefaultSocketWrapper()
|
||||
{
|
||||
innerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
var localEndPoint = new IPEndPoint(IPAddress.Loopback, 0);
|
||||
innerSocket.Bind(localEndPoint);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a instance of DefaultSocket class using the specified System.Net.Socket.Socket object.
|
||||
/// </summary>
|
||||
/// <param name="socket">The existing socket</param>
|
||||
private DefaultSocketWrapper(Socket socket)
|
||||
{
|
||||
innerSocket = socket;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Accepts a incoming connection request.
|
||||
/// </summary>
|
||||
/// <returns>A DefaultSocket instance used to send and receive data</returns>
|
||||
public ISocketWrapper Accept()
|
||||
{
|
||||
var socket = innerSocket.Accept();
|
||||
return new DefaultSocketWrapper(socket);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Close the socket connections and releases all associated resources.
|
||||
/// </summary>
|
||||
public void Close()
|
||||
{
|
||||
innerSocket.Close();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Establishes a connection to a remote host that is specified by an IP address and a port number
|
||||
/// </summary>
|
||||
/// <param name="remoteaddr">The IP address of the remote host</param>
|
||||
/// <param name="port">The port number of the remote host</param>
|
||||
public void Connect(IPAddress remoteaddr, int port)
|
||||
{
|
||||
var remoteEndPoint = new IPEndPoint(remoteaddr, port);
|
||||
innerSocket.Connect(remoteEndPoint);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the NetworkStream used to send and receive data.
|
||||
/// </summary>
|
||||
/// <returns>The underlying Stream instance that be used to send and receive data</returns>
|
||||
/// <remarks>
|
||||
/// GetStream returns a NetworkStream that you can use to send and receive data. You must close/dispose
|
||||
/// the NetworkStream by yourself. Closing DefaultSocketWrapper does not release the NetworkStream
|
||||
/// </remarks>
|
||||
public Stream GetStream()
|
||||
{
|
||||
return new NetworkStream(innerSocket);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Starts listening for incoming connections requests
|
||||
/// </summary>
|
||||
/// <param name="backlog">The maximum length of the pending connections queue. </param>
|
||||
public void Listen(int backlog = (int)SocketOptionName.MaxConnections)
|
||||
{
|
||||
innerSocket.Listen(backlog);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Disposes the resources used by this instance of the DefaultSocket class.
|
||||
/// </summary>
|
||||
/// <param name="disposing"></param>
|
||||
protected virtual void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
innerSocket.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Releases all resources used by the current instance of the DefaultSocket class.
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Frees resources used by DefaultSocket class
|
||||
/// </summary>
|
||||
~DefaultSocketWrapper()
|
||||
{
|
||||
Dispose(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the local endpoint.
|
||||
/// </summary>
|
||||
public EndPoint LocalEndPoint
|
||||
{
|
||||
get
|
||||
{
|
||||
return innerSocket.LocalEndPoint;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Network
|
||||
{
|
||||
/// <summary>
|
||||
/// ISocketWrapper interface defines the common methods to operate a socket (traditional socket or
|
||||
/// Windows Registered IO socket)
|
||||
/// </summary>
|
||||
public interface ISocketWrapper : IDisposable
|
||||
{
|
||||
/// <summary>
|
||||
/// Accepts a incoming connection request.
|
||||
/// </summary>
|
||||
/// <returns>A ISocket instance used to send and receive data</returns>
|
||||
ISocketWrapper Accept();
|
||||
|
||||
/// <summary>
|
||||
/// Close the ISocket connections and releases all associated resources.
|
||||
/// </summary>
|
||||
void Close();
|
||||
|
||||
/// <summary>
|
||||
/// Establishes a connection to a remote host that is specified by an IP address and a port number
|
||||
/// </summary>
|
||||
/// <param name="remoteaddr">The IP address of the remote host</param>
|
||||
/// <param name="port">The port number of the remote host</param>
|
||||
void Connect(IPAddress remoteaddr, int port);
|
||||
|
||||
/// <summary>
|
||||
/// Returns a stream used to send and receive data.
|
||||
/// </summary>
|
||||
/// <returns>The underlying Stream instance that be used to send and receive data</returns>
|
||||
Stream GetStream();
|
||||
|
||||
/// <summary>
|
||||
/// Starts listening for incoming connections requests
|
||||
/// </summary>
|
||||
/// <param name="backlog">The maximum length of the pending connections queue. </param>
|
||||
void Listen(int backlog = (int)SocketOptionName.MaxConnections);
|
||||
|
||||
/// <summary>
|
||||
/// Returns the local endpoint.
|
||||
/// </summary>
|
||||
EndPoint LocalEndPoint { get; }
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Network
|
||||
{
|
||||
/// <summary>
|
||||
/// SocketFactory is used to create ISocketWrapper instance based on a configuration and OS version.
|
||||
///
|
||||
/// The ISocket instance can be RioSocket object, if the configuration is set to RioSocket and
|
||||
/// only the application is running on a Windows OS that supports Registered IO socket.
|
||||
/// </summary>
|
||||
public static class SocketFactory
|
||||
{
|
||||
/// <summary>
|
||||
/// Creates a ISocket instance based on the configuration and OS version.
|
||||
/// </summary>
|
||||
/// <returns>
|
||||
/// A RioSocket instance, if the configuration is set to RioSocket and only the application
|
||||
/// is running on a Window OS that supports Registered IO socket. By default, it returns
|
||||
/// DefaultSocket instance which wraps System.Net.Sockets.Socket.
|
||||
/// </returns>
|
||||
public static ISocketWrapper CreateSocket()
|
||||
{
|
||||
return new DefaultSocketWrapper();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -10,12 +10,12 @@ using System.Net;
|
|||
using System.Net.Sockets;
|
||||
using System.Runtime.Serialization;
|
||||
using System.Runtime.Serialization.Formatters.Binary;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Interop.Ipc;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Proxy.Ipc
|
||||
|
@ -274,8 +274,8 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
|
|||
|
||||
try
|
||||
{
|
||||
using (Socket sock = (Socket)socket)
|
||||
using (var s = new NetworkStream(sock))
|
||||
using (var sock = (ISocketWrapper)socket)
|
||||
using (var s = sock.GetStream())
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
|
@ -354,8 +354,8 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
|
|||
|
||||
private int StartCallbackServer()
|
||||
{
|
||||
TcpListener callbackServer = new TcpListener(IPAddress.Loopback, 0);
|
||||
callbackServer.Start();
|
||||
var callbackServer = SocketFactory.CreateSocket();
|
||||
callbackServer.Listen();
|
||||
|
||||
Task.Run(() =>
|
||||
{
|
||||
|
@ -364,7 +364,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
|
|||
ThreadPool.SetMaxThreads(10, 10);
|
||||
while (!callbackSocketShutdown)
|
||||
{
|
||||
Socket sock = callbackServer.AcceptSocket();
|
||||
var sock = callbackServer.Accept();
|
||||
ThreadPool.QueueUserWorkItem(ProcessCallbackRequest, sock);
|
||||
}
|
||||
}
|
||||
|
@ -377,13 +377,13 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
|
|||
finally
|
||||
{
|
||||
if (callbackServer != null)
|
||||
callbackServer.Stop();
|
||||
callbackServer.Close();
|
||||
}
|
||||
});
|
||||
|
||||
int port = (callbackServer.LocalEndpoint as IPEndPoint).Port;
|
||||
int port = (callbackServer.LocalEndPoint as IPEndPoint).Port;
|
||||
logger.LogInfo("Callback server port number is {0}", port);
|
||||
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("SparkCLRHandler", "connectCallback", port); //className and methodName hardcoded in CSharpBackendHandler
|
||||
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("SparkCLRHandler", "connectCallback", port); //className and methodName hard coded in CSharpBackendHandler
|
||||
|
||||
return port;
|
||||
}
|
||||
|
|
|
@ -2672,7 +2672,7 @@
|
|||
<member name="T:Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge">
|
||||
<summary>
|
||||
Implementation of thread safe IPC bridge between JVM and CLR
|
||||
throught a concourrent socket connection queue (lightweight synchronisation mechanism)
|
||||
Using a concurrent socket connection queue (lightweight synchronization mechanism)
|
||||
supporting async JVM calls like StreamingContext.AwaitTermination()
|
||||
</summary>
|
||||
</member>
|
||||
|
@ -2881,6 +2881,139 @@
|
|||
<param name="s">The stream to write</param>
|
||||
<param name="value">The string to write</param>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper">
|
||||
<summary>
|
||||
A simple wrapper of System.Net.Sockets.Socket class.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Network.ISocketWrapper">
|
||||
<summary>
|
||||
ISocketWrapper interface defines the common methods to operate a socket (traditional socket or
|
||||
Windows Registered IO socket)
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.ISocketWrapper.Accept">
|
||||
<summary>
|
||||
Accepts a incoming connection request.
|
||||
</summary>
|
||||
<returns>A ISocket instance used to send and receive data</returns>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.ISocketWrapper.Close">
|
||||
<summary>
|
||||
Close the ISocket connections and releases all associated resources.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.ISocketWrapper.Connect(System.Net.IPAddress,System.Int32)">
|
||||
<summary>
|
||||
Establishes a connection to a remote host that is specified by an IP address and a port number
|
||||
</summary>
|
||||
<param name="remoteaddr">The IP address of the remote host</param>
|
||||
<param name="port">The port number of the remote host</param>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.ISocketWrapper.GetStream">
|
||||
<summary>
|
||||
Returns a stream used to send and receive data.
|
||||
</summary>
|
||||
<returns>The underlying Stream instance that be used to send and receive data</returns>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.ISocketWrapper.Listen(System.Int32)">
|
||||
<summary>
|
||||
Starts listening for incoming connections requests
|
||||
</summary>
|
||||
<param name="backlog">The maximum length of the pending connections queue. </param>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Network.ISocketWrapper.LocalEndPoint">
|
||||
<summary>
|
||||
Returns the local endpoint.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.#ctor">
|
||||
<summary>
|
||||
Default constructor that creates a new instance of DefaultSocket class which represents
|
||||
a traditional socket (System.Net.Socket.Socket).
|
||||
|
||||
This socket is bound to Loopback with port 0.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.#ctor(System.Net.Sockets.Socket)">
|
||||
<summary>
|
||||
Initializes a instance of DefaultSocket class using the specified System.Net.Socket.Socket object.
|
||||
</summary>
|
||||
<param name="socket">The existing socket</param>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.Accept">
|
||||
<summary>
|
||||
Accepts a incoming connection request.
|
||||
</summary>
|
||||
<returns>A DefaultSocket instance used to send and receive data</returns>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.Close">
|
||||
<summary>
|
||||
Close the socket connections and releases all associated resources.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.Connect(System.Net.IPAddress,System.Int32)">
|
||||
<summary>
|
||||
Establishes a connection to a remote host that is specified by an IP address and a port number
|
||||
</summary>
|
||||
<param name="remoteaddr">The IP address of the remote host</param>
|
||||
<param name="port">The port number of the remote host</param>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.GetStream">
|
||||
<summary>
|
||||
Returns the NetworkStream used to send and receive data.
|
||||
</summary>
|
||||
<returns>The underlying Stream instance that be used to send and receive data</returns>
|
||||
<remarks>
|
||||
GetStream returns a NetworkStream that you can use to send and receive data. You must close/dispose
|
||||
the NetworkStream by yourself. Closing DefaultSocketWrapper does not release the NetworkStream
|
||||
</remarks>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.Listen(System.Int32)">
|
||||
<summary>
|
||||
Starts listening for incoming connections requests
|
||||
</summary>
|
||||
<param name="backlog">The maximum length of the pending connections queue. </param>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.Dispose(System.Boolean)">
|
||||
<summary>
|
||||
Disposes the resources used by this instance of the DefaultSocket class.
|
||||
</summary>
|
||||
<param name="disposing"></param>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.Dispose">
|
||||
<summary>
|
||||
Releases all resources used by the current instance of the DefaultSocket class.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.Finalize">
|
||||
<summary>
|
||||
Frees resources used by DefaultSocket class
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Network.DefaultSocketWrapper.LocalEndPoint">
|
||||
<summary>
|
||||
Returns the local endpoint.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Network.SocketFactory">
|
||||
<summary>
|
||||
SocketFactory is used to create ISocketWrapper instance based on a configuration and OS version.
|
||||
|
||||
The ISocket instance can be RioSocket object, if the configuration is set to RioSocket and
|
||||
only the application is running on a Windows OS that supports Registered IO socket.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Network.SocketFactory.CreateSocket">
|
||||
<summary>
|
||||
Creates a ISocket instance based on the configuration and OS version.
|
||||
</summary>
|
||||
<returns>
|
||||
A RioSocket instance, if the configuration is set to RioSocket and only the application
|
||||
is running on a Window OS that supports Registered IO socket. By default, it returns
|
||||
DefaultSocket instance which wraps System.Net.Sockets.Socket.
|
||||
</returns>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Proxy.Ipc.DataFrameIpcProxy.Intersect(Microsoft.Spark.CSharp.Proxy.IDataFrameProxy)">
|
||||
<summary>
|
||||
Call https://github.com/apache/spark/blob/branch-1.4/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala, intersect(other: DataFrame): DataFrame
|
||||
|
@ -6325,7 +6458,7 @@
|
|||
<param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
|
||||
<returns>A DStream object</returns>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.CreateDirectStreamWithRepartition(Microsoft.Spark.CSharp.Streaming.StreamingContext,System.Collections.Generic.List{System.String},System.Collections.Generic.Dictionary{System.String,System.String},System.Collections.Generic.Dictionary{System.String,System.Int64},System.Int32,System.Func{Microsoft.Spark.CSharp.Streaming.OffsetRange,System.Collections.Generic.IEnumerable{System.Collections.Generic.IList{System.Byte[]}}},System.Int32)">
|
||||
<member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.CreateDirectStreamWithRepartition(Microsoft.Spark.CSharp.Streaming.StreamingContext,System.Collections.Generic.List{System.String},System.Collections.Generic.Dictionary{System.String,System.String},System.Collections.Generic.Dictionary{System.String,System.Int64},System.Int32)">
|
||||
<summary>
|
||||
Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
|
||||
|
||||
|
@ -6357,28 +6490,8 @@
|
|||
If numPartitions = 0, repartition using original kafka partition count
|
||||
If numPartitions > 0, repartition using this parameter
|
||||
</param>
|
||||
<param name="readFunc">if defined, will be used to read kafka on C# side</param>
|
||||
<param name="readTimeout">timeout for readFunc if value > 0</param>
|
||||
<returns>A DStream object</returns>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Streaming.DirectKafkaStreamHelper.Execute(System.Int32,System.Collections.Generic.IEnumerable{System.Collections.Generic.KeyValuePair{System.Byte[],System.Byte[]}})">
|
||||
<summary>
|
||||
wrapping user defined kafka read function through OffsetRange interface
|
||||
collecting and passing back dropped offset range due to timeout if defined
|
||||
for JVM side driver to pick up and schedule a tast in next batch
|
||||
</summary>
|
||||
<param name="pid"></param>
|
||||
<param name="input"></param>
|
||||
<returns></returns>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Streaming.OffsetRange.op_Addition(Microsoft.Spark.CSharp.Streaming.OffsetRange,Microsoft.Spark.CSharp.Streaming.OffsetRange)">
|
||||
<summary>
|
||||
Accumulator friendly
|
||||
</summary>
|
||||
<param name="self"></param>
|
||||
<param name="other"></param>
|
||||
<returns></returns>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Streaming.MapWithStateDStream`4">
|
||||
<summary>
|
||||
DStream representing the stream of data generated by `mapWithState` operation on a pair DStream.
|
||||
|
|
|
@ -380,6 +380,23 @@
|
|||
---
|
||||
|
||||
|
||||
###<font color="#68228B">Microsoft.Spark.CSharp.Network.SocketFactory</font>
|
||||
####Summary
|
||||
|
||||
|
||||
SocketFactory is used to create ISocketWrapper instance based on a configuration and OS version.
|
||||
|
||||
The ISocket instance can be RioSocket object, if the configuration is set to RioSocket and
|
||||
only the application is running on a Windows OS that supports Registered IO socket.
|
||||
|
||||
|
||||
####Methods
|
||||
|
||||
<table><tr><th>Name</th><th>Description</th></tr><tr><td><font color="blue">CreateSocket</font></td><td>Creates a ISocket instance based on the configuration and OS version.</td></tr></table>
|
||||
|
||||
---
|
||||
|
||||
|
||||
###<font color="#68228B">Microsoft.Spark.CSharp.Sql.Column</font>
|
||||
####Summary
|
||||
|
||||
|
|
|
@ -1,21 +1,14 @@
|
|||
using System;
|
||||
using System.IO;
|
||||
using System.IO;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Runtime.Serialization.Formatters.Binary;
|
||||
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Interop;
|
||||
using Microsoft.Spark.CSharp.Proxy;
|
||||
using Microsoft.Spark.CSharp.Interop.Ipc;
|
||||
|
||||
using NUnit.Framework;
|
||||
using Moq;
|
||||
using AdapterTest.Mocks;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
|
||||
namespace AdapterTest
|
||||
{
|
||||
|
@ -27,7 +20,7 @@ namespace AdapterTest
|
|||
public class AccumulatorTest
|
||||
{
|
||||
private SparkContext sc;
|
||||
private Socket sock;
|
||||
private ISocketWrapper sock;
|
||||
|
||||
|
||||
[SetUp]
|
||||
|
@ -38,7 +31,7 @@ namespace AdapterTest
|
|||
|
||||
// get accumulator server port and connect to accumuator server
|
||||
int serverPort = (sc.SparkContextProxy as MockSparkContextProxy).AccumulatorServerPort;
|
||||
sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
sock = SocketFactory.CreateSocket();
|
||||
sock.Connect(IPAddress.Loopback, serverPort);
|
||||
}
|
||||
|
||||
|
@ -49,29 +42,31 @@ namespace AdapterTest
|
|||
|
||||
try
|
||||
{
|
||||
using (var s = new NetworkStream(sock))
|
||||
using (var s = sock.GetStream())
|
||||
{
|
||||
int numUpdates = 0;
|
||||
SerDe.Write(s, numUpdates);
|
||||
}
|
||||
|
||||
sock.Close();
|
||||
}
|
||||
catch
|
||||
{
|
||||
// do nothing here
|
||||
}
|
||||
finally
|
||||
{
|
||||
sock.Close();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// test when no errors, accumuator server receives data as expected and exit with 0
|
||||
/// test when no errors, accumulator server receives data as expected and exit with 0
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestAccumuatorSuccess()
|
||||
{
|
||||
Accumulator<int> accumulator = sc.Accumulator<int>(0);
|
||||
|
||||
using (var s = new NetworkStream(sock))
|
||||
using (var s = sock.GetStream())
|
||||
{
|
||||
// write numUpdates
|
||||
int numUpdates = 1;
|
||||
|
@ -102,7 +97,7 @@ namespace AdapterTest
|
|||
[Test]
|
||||
public void TestUndefinedAccumuator()
|
||||
{
|
||||
using (var s = new NetworkStream(sock))
|
||||
using (var s = sock.GetStream())
|
||||
{
|
||||
// write numUpdates
|
||||
int numUpdates = 1;
|
||||
|
|
|
@ -3,21 +3,16 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Runtime.Serialization;
|
||||
using System.Runtime.Serialization.Formatters.Binary;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Proxy;
|
||||
using Microsoft.Spark.CSharp.Proxy.Ipc;
|
||||
using Microsoft.Spark.CSharp.Interop.Ipc;
|
||||
using NUnit.Framework;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
|
||||
namespace AdapterTest.Mocks
|
||||
{
|
||||
|
@ -204,13 +199,13 @@ namespace AdapterTest.Mocks
|
|||
return ms.ToArray();
|
||||
});
|
||||
|
||||
TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
var listener = SocketFactory.CreateSocket();
|
||||
listener.Listen();
|
||||
|
||||
Task.Run(() =>
|
||||
{
|
||||
using (Socket socket = listener.AcceptSocket())
|
||||
using (Stream ns = new NetworkStream(socket))
|
||||
using (var socket = listener.Accept())
|
||||
using (var ns = socket.GetStream())
|
||||
{
|
||||
foreach (var item in result)
|
||||
{
|
||||
|
@ -219,7 +214,7 @@ namespace AdapterTest.Mocks
|
|||
}
|
||||
}
|
||||
});
|
||||
return (listener.LocalEndpoint as IPEndPoint).Port;
|
||||
return (listener.LocalEndPoint as IPEndPoint).Port;
|
||||
}
|
||||
|
||||
public int RunJob(IRDDProxy rdd, IEnumerable<int> partitions)
|
||||
|
|
|
@ -14,6 +14,7 @@ using Microsoft.Spark.CSharp.Interop.Ipc;
|
|||
using Microsoft.Spark.CSharp.Proxy;
|
||||
using AdapterTest.Mocks;
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
using Microsoft.Spark.CSharp.Streaming;
|
||||
using StreamingContext = Microsoft.Spark.CSharp.Streaming.StreamingContext;
|
||||
using Moq;
|
||||
|
@ -60,13 +61,13 @@ namespace AdapterTest
|
|||
_mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<int>())).Returns(_mockStreamingContextProxy.Object);
|
||||
_mockRddProxy.Setup(m => m.CollectAndServe()).Returns(() =>
|
||||
{
|
||||
TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
var listener = SocketFactory.CreateSocket();
|
||||
listener.Listen();
|
||||
|
||||
Task.Run(() =>
|
||||
{
|
||||
using (Socket socket = listener.AcceptSocket())
|
||||
using (Stream ns = new NetworkStream(socket))
|
||||
using (var socket = listener.Accept())
|
||||
using (var ns = socket.GetStream())
|
||||
{
|
||||
foreach (var item in result)
|
||||
{
|
||||
|
@ -78,7 +79,7 @@ namespace AdapterTest
|
|||
}
|
||||
}
|
||||
});
|
||||
return (listener.LocalEndpoint as IPEndPoint).Port;
|
||||
return (listener.LocalEndPoint as IPEndPoint).Port;
|
||||
});
|
||||
_mockRddProxy.Setup(m => m.RDDCollector).Returns(new RDDCollector());
|
||||
|
||||
|
|
|
@ -2,20 +2,14 @@
|
|||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Diagnostics;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Linq;
|
||||
using System.Net.Sockets;
|
||||
using System.Reflection;
|
||||
using System.Runtime.Serialization;
|
||||
using System.Runtime.Serialization.Formatters.Binary;
|
||||
using System.Threading;
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Interop.Ipc;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
|
||||
|
@ -45,11 +39,11 @@ namespace Microsoft.Spark.CSharp
|
|||
try
|
||||
{
|
||||
// start TCP listening server
|
||||
TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
var listener = SocketFactory.CreateSocket();
|
||||
listener.Listen();
|
||||
|
||||
// get the local port and write it back to JVM side
|
||||
IPEndPoint endPoint = (IPEndPoint)listener.LocalEndpoint;
|
||||
IPEndPoint endPoint = (IPEndPoint)listener.LocalEndPoint;
|
||||
int localPort = endPoint.Port;
|
||||
byte[] bytes = SerDe.ToBytes(localPort);
|
||||
Stream outputStream = Console.OpenStandardOutput();
|
||||
|
@ -112,7 +106,7 @@ namespace Microsoft.Spark.CSharp
|
|||
/// Listen to the server socket and accept new TCP connection from JVM side. Then create new TaskRunner instance and
|
||||
/// add it to waitingTaskRunners queue.
|
||||
/// </summary>
|
||||
private void StartDaemonServer(TcpListener listener)
|
||||
private void StartDaemonServer(ISocketWrapper listener)
|
||||
{
|
||||
logger.LogInfo("StartDaemonServer ...");
|
||||
|
||||
|
@ -130,9 +124,9 @@ namespace Microsoft.Spark.CSharp
|
|||
|
||||
while (true)
|
||||
{
|
||||
Socket socket = listener.AcceptSocket();
|
||||
var socket = listener.Accept();
|
||||
logger.LogInfo("Connection accepted for taskRunnerId: {0}", trId);
|
||||
using (NetworkStream s = new NetworkStream(socket))
|
||||
using (var s = socket.GetStream())
|
||||
{
|
||||
SerDe.Write(s, trId); // write taskRunnerId to JVM side
|
||||
s.Flush();
|
||||
|
|
|
@ -2,27 +2,10 @@
|
|||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Net.Sockets;
|
||||
using System.Reflection;
|
||||
using System.Runtime.Serialization;
|
||||
using System.Runtime.Serialization.Formatters.Binary;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Interop.Ipc;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
using Microsoft.Spark.CSharp.Sql;
|
||||
using Razorvine.Pickle;
|
||||
using Razorvine.Pickle.Objects;
|
||||
|
||||
namespace Microsoft.Spark.CSharp
|
||||
{
|
||||
|
@ -46,14 +29,14 @@ namespace Microsoft.Spark.CSharp
|
|||
}
|
||||
|
||||
public int trId; // task runner Id
|
||||
private Socket socket; // socket to communicate with JVM
|
||||
private ISocketWrapper socket; // socket to communicate with JVM
|
||||
|
||||
private volatile bool stop = false;
|
||||
|
||||
// whether the socket can be reused to run multiple Spark tasks
|
||||
private bool socketReuse;
|
||||
|
||||
public TaskRunner(int trId, Socket socket, bool socketReuse)
|
||||
public TaskRunner(int trId, ISocketWrapper socket, bool socketReuse)
|
||||
{
|
||||
this.trId = trId;
|
||||
this.socket = socket;
|
||||
|
@ -68,7 +51,7 @@ namespace Microsoft.Spark.CSharp
|
|||
{
|
||||
while (!stop)
|
||||
{
|
||||
using (NetworkStream networkStream = new NetworkStream(socket))
|
||||
using (var networkStream = socket.GetStream())
|
||||
{
|
||||
byte[] bytes = SerDe.ReadBytes(networkStream, sizeof(int));
|
||||
if (bytes != null)
|
||||
|
|
|
@ -8,13 +8,12 @@ using System.IO;
|
|||
using System.Net;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Net.Sockets;
|
||||
using System.Reflection;
|
||||
using System.Runtime.Serialization;
|
||||
using System.Runtime.Serialization.Formatters.Binary;
|
||||
using System.Threading;
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Interop.Ipc;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
using Microsoft.Spark.CSharp.Sql;
|
||||
using Razorvine.Pickle;
|
||||
|
@ -71,7 +70,7 @@ namespace Microsoft.Spark.CSharp
|
|||
|
||||
int javaPort = int.Parse(Console.ReadLine()); //reading port number written from JVM
|
||||
logger.LogDebug("Port number used to pipe in/out data between JVM and CLR {0}", javaPort);
|
||||
Socket socket = InitializeSocket(javaPort);
|
||||
var socket = InitializeSocket(javaPort);
|
||||
TaskRunner taskRunner = new TaskRunner(0, socket, false);
|
||||
taskRunner.Run();
|
||||
}
|
||||
|
@ -103,14 +102,14 @@ namespace Microsoft.Spark.CSharp
|
|||
}
|
||||
}
|
||||
|
||||
private static Socket InitializeSocket(int javaPort)
|
||||
private static ISocketWrapper InitializeSocket(int javaPort)
|
||||
{
|
||||
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
var socket = SocketFactory.CreateSocket();
|
||||
socket.Connect(IPAddress.Loopback, javaPort);
|
||||
return socket;
|
||||
}
|
||||
|
||||
public static bool ProcessStream(NetworkStream networkStream, int splitIndex)
|
||||
public static bool ProcessStream(Stream networkStream, int splitIndex)
|
||||
{
|
||||
logger.LogInfo(string.Format("Start of stream processing, splitIndex: {0}", splitIndex));
|
||||
bool readComplete = true; // Whether all input data from the socket is read though completely
|
||||
|
@ -195,7 +194,7 @@ namespace Microsoft.Spark.CSharp
|
|||
return readComplete;
|
||||
}
|
||||
|
||||
private static void ProcessIncludesItems(NetworkStream networkStream)
|
||||
private static void ProcessIncludesItems(Stream networkStream)
|
||||
{
|
||||
// fetch names of includes - not used //TODO - complete the impl
|
||||
int numberOfIncludesItems = SerDe.ReadInt(networkStream);
|
||||
|
@ -210,7 +209,7 @@ namespace Microsoft.Spark.CSharp
|
|||
}
|
||||
}
|
||||
|
||||
private static void ProcessBroadcastVariables(NetworkStream networkStream)
|
||||
private static void ProcessBroadcastVariables(Stream networkStream)
|
||||
{
|
||||
// fetch names and values of broadcast variables
|
||||
int numBroadcastVariables = SerDe.ReadInt(networkStream);
|
||||
|
@ -236,7 +235,7 @@ namespace Microsoft.Spark.CSharp
|
|||
}
|
||||
}
|
||||
|
||||
private static IFormatter ProcessCommand(NetworkStream networkStream, int splitIndex, DateTime bootTime)
|
||||
private static IFormatter ProcessCommand(Stream networkStream, int splitIndex, DateTime bootTime)
|
||||
{
|
||||
int lengthOfCommandByteArray = SerDe.ReadInt(networkStream);
|
||||
logger.LogDebug("command length: " + lengthOfCommandByteArray);
|
||||
|
@ -332,7 +331,7 @@ namespace Microsoft.Spark.CSharp
|
|||
return formatter;
|
||||
}
|
||||
|
||||
private static void WriteOutput(NetworkStream networkStream, string serializerMode, dynamic message, IFormatter formatter)
|
||||
private static void WriteOutput(Stream networkStream, string serializerMode, dynamic message, IFormatter formatter)
|
||||
{
|
||||
var buffer = GetSerializedMessage(serializerMode, message, formatter);
|
||||
if (buffer == null)
|
||||
|
@ -390,7 +389,7 @@ namespace Microsoft.Spark.CSharp
|
|||
}
|
||||
|
||||
|
||||
private static int ReadDiagnosticsInfo(NetworkStream networkStream)
|
||||
private static int ReadDiagnosticsInfo(Stream networkStream)
|
||||
{
|
||||
int rddId = SerDe.ReadInt(networkStream);
|
||||
int stageId = SerDe.ReadInt(networkStream);
|
||||
|
@ -399,7 +398,7 @@ namespace Microsoft.Spark.CSharp
|
|||
return stageId;
|
||||
}
|
||||
|
||||
private static void WriteDiagnosticsInfo(NetworkStream networkStream, DateTime bootTime, DateTime initTime)
|
||||
private static void WriteDiagnosticsInfo(Stream networkStream, DateTime bootTime, DateTime initTime)
|
||||
{
|
||||
DateTime finishTime = DateTime.UtcNow;
|
||||
const string format = "MM/dd/yyyy hh:mm:ss.fff tt";
|
||||
|
@ -414,7 +413,7 @@ namespace Microsoft.Spark.CSharp
|
|||
SerDe.Write(networkStream, 0L); //shuffle.DiskBytesSpilled
|
||||
}
|
||||
|
||||
private static void WriteAccumulatorValues(NetworkStream networkStream, IFormatter formatter)
|
||||
private static void WriteAccumulatorValues(Stream networkStream, IFormatter formatter)
|
||||
{
|
||||
SerDe.Write(networkStream, Accumulator.threadLocalAccumulatorRegistry.Count);
|
||||
foreach (var item in Accumulator.threadLocalAccumulatorRegistry)
|
||||
|
|
|
@ -2,28 +2,23 @@
|
|||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Reflection;
|
||||
using System.Runtime.Serialization.Formatters.Binary;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using System.Collections.Specialized;
|
||||
using System.Text;
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Sql;
|
||||
using Microsoft.Spark.CSharp.Interop.Ipc;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
using NUnit.Framework;
|
||||
|
||||
namespace WorkerTest
|
||||
{
|
||||
/// <summary>
|
||||
/// Validates MultiThreadWorker by creating a TcpListener server to
|
||||
/// Validates MultiThreadWorker by creating a ISocketWrapper server to
|
||||
/// simulate interactions between CSharpRDD and CSharpWorker
|
||||
/// </summary>
|
||||
[TestFixture]
|
||||
|
@ -106,9 +101,9 @@ namespace WorkerTest
|
|||
/// Create new socket to simulate interaction between JVM and C#
|
||||
/// </summary>
|
||||
/// <param name="s"></param>
|
||||
private Socket CreateSocket(int serverPort)
|
||||
private ISocketWrapper CreateSocket(int serverPort)
|
||||
{
|
||||
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
var socket =SocketFactory.CreateSocket();
|
||||
socket.Connect(IPAddress.Loopback, serverPort);
|
||||
return socket;
|
||||
}
|
||||
|
@ -198,7 +193,7 @@ namespace WorkerTest
|
|||
Console.WriteLine("serverPort: {0}", serverPort);
|
||||
|
||||
using (var socket = CreateSocket(serverPort))
|
||||
using (var s = new NetworkStream(socket))
|
||||
using (var s = socket.GetStream())
|
||||
{
|
||||
int taskRunnerId = SerDe.ReadInt(s);
|
||||
Console.WriteLine("taskRunnerId: {0}", taskRunnerId);
|
||||
|
@ -245,7 +240,7 @@ namespace WorkerTest
|
|||
Console.WriteLine("serverPort: {0}", serverPort);
|
||||
|
||||
int num = 2;
|
||||
var sockets = new Socket[2];
|
||||
var sockets = new ISocketWrapper[2];
|
||||
var taskRunnerIds = new int[2];
|
||||
|
||||
for (int index = 0; index < num; index++)
|
||||
|
@ -255,7 +250,7 @@ namespace WorkerTest
|
|||
|
||||
for (int index = 0; index < num; index++)
|
||||
{
|
||||
using (var s = new NetworkStream(sockets[index]))
|
||||
using (var s = sockets[index].GetStream())
|
||||
{
|
||||
taskRunnerIds[index] = SerDe.ReadInt(s);
|
||||
|
||||
|
|
|
@ -8,13 +8,13 @@ using System.Diagnostics;
|
|||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Reflection;
|
||||
using System.Runtime.Serialization.Formatters.Binary;
|
||||
using System.Text;
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Sql;
|
||||
using Microsoft.Spark.CSharp.Interop.Ipc;
|
||||
using Microsoft.Spark.CSharp.Network;
|
||||
using NUnit.Framework;
|
||||
using Razorvine.Pickle;
|
||||
using Tests.Common;
|
||||
|
@ -22,7 +22,7 @@ using Tests.Common;
|
|||
namespace WorkerTest
|
||||
{
|
||||
/// <summary>
|
||||
/// Validates CSharpWorker by creating a TcpListener server to
|
||||
/// Validates CSharpWorker by creating a ISocketWrapper server to
|
||||
/// simulate interactions between CSharpRDD and CSharpWorker
|
||||
/// </summary>
|
||||
[TestFixture]
|
||||
|
@ -40,11 +40,11 @@ namespace WorkerTest
|
|||
StringBuilder output = new StringBuilder();
|
||||
private readonly object syncLock = new object();
|
||||
|
||||
private TcpListener CreateServer(out Process worker)
|
||||
private ISocketWrapper CreateServer(out Process worker)
|
||||
{
|
||||
TcpListener tcpListener = new TcpListener(IPAddress.Loopback, 0);
|
||||
tcpListener.Start();
|
||||
int port = (tcpListener.LocalEndpoint as IPEndPoint).Port;
|
||||
var tcpListener = SocketFactory.CreateSocket();
|
||||
tcpListener.Listen();
|
||||
int port = (tcpListener.LocalEndPoint as IPEndPoint).Port;
|
||||
|
||||
var exeLocation = Path.GetDirectoryName(new Uri(Assembly.GetExecutingAssembly().CodeBase).LocalPath) ?? ".";
|
||||
|
||||
|
@ -162,10 +162,10 @@ namespace WorkerTest
|
|||
public void TestWorkerSuccess()
|
||||
{
|
||||
Process worker;
|
||||
TcpListener CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
var CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
|
||||
using (var serverSocket = CSharpRDD_SocketServer.AcceptSocket())
|
||||
using (var s = new NetworkStream(serverSocket))
|
||||
using (var serverSocket = CSharpRDD_SocketServer.Accept())
|
||||
using (var s = serverSocket.GetStream())
|
||||
{
|
||||
WritePayloadHeaderToWorker(s);
|
||||
|
||||
|
@ -190,7 +190,7 @@ namespace WorkerTest
|
|||
|
||||
AssertWorker(worker);
|
||||
|
||||
CSharpRDD_SocketServer.Stop();
|
||||
CSharpRDD_SocketServer.Close();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -200,14 +200,14 @@ namespace WorkerTest
|
|||
public void TestWorkerReadIncomplete()
|
||||
{
|
||||
Process worker;
|
||||
TcpListener CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
var CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
|
||||
const int num = 10;
|
||||
byte[] takeCommand = SparkContext.BuildCommand(new CSharpWorkerFunc((pid, iter) => iter.Take(num)),
|
||||
SerializedMode.String, SerializedMode.String);
|
||||
|
||||
using (var serverSocket = CSharpRDD_SocketServer.AcceptSocket())
|
||||
using (var s = new NetworkStream(serverSocket))
|
||||
using (var serverSocket = CSharpRDD_SocketServer.Accept())
|
||||
using (var s = serverSocket.GetStream())
|
||||
{
|
||||
WritePayloadHeaderToWorker(s);
|
||||
|
||||
|
@ -232,7 +232,7 @@ namespace WorkerTest
|
|||
|
||||
AssertWorker(worker, 0, "not all data is read");
|
||||
|
||||
CSharpRDD_SocketServer.Stop();
|
||||
CSharpRDD_SocketServer.Close();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -242,10 +242,10 @@ namespace WorkerTest
|
|||
public void TestWorkerIncompleteBytes()
|
||||
{
|
||||
Process worker;
|
||||
TcpListener CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
var CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
|
||||
using (var serverSocket = CSharpRDD_SocketServer.AcceptSocket())
|
||||
using (var s = new NetworkStream(serverSocket))
|
||||
using (var serverSocket = CSharpRDD_SocketServer.Accept())
|
||||
using (var s = serverSocket.GetStream())
|
||||
{
|
||||
WritePayloadHeaderToWorker(s);
|
||||
|
||||
|
@ -255,7 +255,7 @@ namespace WorkerTest
|
|||
|
||||
AssertWorker(worker, 0, "System.ArgumentException: Incomplete bytes read: ");
|
||||
|
||||
CSharpRDD_SocketServer.Stop();
|
||||
CSharpRDD_SocketServer.Close();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -265,10 +265,10 @@ namespace WorkerTest
|
|||
public void TestWorkerIncompleteData()
|
||||
{
|
||||
Process worker;
|
||||
TcpListener CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
var CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
|
||||
using (var serverSocket = CSharpRDD_SocketServer.AcceptSocket())
|
||||
using (var s = new NetworkStream(serverSocket))
|
||||
using (var serverSocket = CSharpRDD_SocketServer.Accept())
|
||||
using (var s = serverSocket.GetStream())
|
||||
{
|
||||
WritePayloadHeaderToWorker(s);
|
||||
|
||||
|
@ -289,7 +289,7 @@ namespace WorkerTest
|
|||
|
||||
AssertWorker(worker, 0, "System.NullReferenceException: Object reference not set to an instance of an object.");
|
||||
|
||||
CSharpRDD_SocketServer.Stop();
|
||||
CSharpRDD_SocketServer.Close();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -299,11 +299,11 @@ namespace WorkerTest
|
|||
public void TestWorkerWithRowDeserializedModeAndBytesSerializedMode()
|
||||
{
|
||||
Process worker;
|
||||
TcpListener CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
var CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
|
||||
const int expectedCount = 5;
|
||||
using (var serverSocket = CSharpRDD_SocketServer.AcceptSocket())
|
||||
using (var s = new NetworkStream(serverSocket))
|
||||
using (var serverSocket = CSharpRDD_SocketServer.Accept())
|
||||
using (var s = serverSocket.GetStream())
|
||||
{
|
||||
WritePayloadHeaderToWorker(s);
|
||||
byte[] commandWithRowDeserializeMode =
|
||||
|
@ -342,17 +342,17 @@ namespace WorkerTest
|
|||
}
|
||||
|
||||
AssertWorker(worker);
|
||||
CSharpRDD_SocketServer.Stop();
|
||||
CSharpRDD_SocketServer.Close();
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void TestWorkerWithRawDeserializedModeAndBytesSerializedMode()
|
||||
{
|
||||
Process worker;
|
||||
TcpListener CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
var CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
|
||||
using (var serverSocket = CSharpRDD_SocketServer.AcceptSocket())
|
||||
using (var s = new NetworkStream(serverSocket))
|
||||
using (var serverSocket = CSharpRDD_SocketServer.Accept())
|
||||
using (var s = serverSocket.GetStream())
|
||||
{
|
||||
WritePayloadHeaderToWorker(s);
|
||||
byte[] commandWithRawDeserializeMode = SparkContext.BuildCommand(new CSharpWorkerFunc((pid, iter) => iter), SerializedMode.None, SerializedMode.None);
|
||||
|
@ -383,6 +383,9 @@ namespace WorkerTest
|
|||
Assert.AreEqual(payloadCollection.Length, receivedElementIndex);
|
||||
|
||||
}
|
||||
|
||||
AssertWorker(worker);
|
||||
CSharpRDD_SocketServer.Close();
|
||||
}
|
||||
|
||||
|
||||
|
@ -394,10 +397,10 @@ namespace WorkerTest
|
|||
{
|
||||
const int expectedCount = 100;
|
||||
Process worker;
|
||||
TcpListener CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
var CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
|
||||
using (var serverSocket = CSharpRDD_SocketServer.AcceptSocket())
|
||||
using (var s = new NetworkStream(serverSocket))
|
||||
using (var serverSocket = CSharpRDD_SocketServer.Accept())
|
||||
using (var s = serverSocket.GetStream())
|
||||
{
|
||||
WritePayloadHeaderToWorker(s);
|
||||
byte[] command = SparkContext.BuildCommand(new CSharpWorkerFunc((pid, iter) => iter), SerializedMode.Byte, SerializedMode.Row);
|
||||
|
@ -434,7 +437,7 @@ namespace WorkerTest
|
|||
}
|
||||
|
||||
AssertWorker(worker);
|
||||
CSharpRDD_SocketServer.Stop();
|
||||
CSharpRDD_SocketServer.Close();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -445,10 +448,10 @@ namespace WorkerTest
|
|||
{
|
||||
const int expectedCount = 100;
|
||||
Process worker;
|
||||
TcpListener CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
var CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
|
||||
using (var serverSocket = CSharpRDD_SocketServer.AcceptSocket())
|
||||
using (var s = new NetworkStream(serverSocket))
|
||||
using (var serverSocket = CSharpRDD_SocketServer.Accept())
|
||||
using (var s = serverSocket.GetStream())
|
||||
{
|
||||
WritePayloadHeaderToWorker(s);
|
||||
byte[] command = SparkContext.BuildCommand(
|
||||
|
@ -489,8 +492,7 @@ namespace WorkerTest
|
|||
}
|
||||
|
||||
AssertWorker(worker);
|
||||
|
||||
CSharpRDD_SocketServer.Stop();
|
||||
CSharpRDD_SocketServer.Close();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -500,11 +502,11 @@ namespace WorkerTest
|
|||
public void TestBroadcastVariablesInWorker()
|
||||
{
|
||||
Process worker;
|
||||
TcpListener CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
var CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
string assertMessage;
|
||||
|
||||
using (var serverSocket = CSharpRDD_SocketServer.AcceptSocket())
|
||||
using (var s = new NetworkStream(serverSocket))
|
||||
using (var serverSocket = CSharpRDD_SocketServer.Accept())
|
||||
using (var s = serverSocket.GetStream())
|
||||
{
|
||||
SerDe.Write(s, splitIndex);
|
||||
SerDe.Write(s, ver);
|
||||
|
@ -548,7 +550,7 @@ namespace WorkerTest
|
|||
}
|
||||
|
||||
AssertWorker(worker, 0, assertMessage);
|
||||
CSharpRDD_SocketServer.Stop();
|
||||
CSharpRDD_SocketServer.Close();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -620,10 +622,10 @@ namespace WorkerTest
|
|||
public void TestAccumulatorInWorker()
|
||||
{
|
||||
Process worker;
|
||||
TcpListener CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
var CSharpRDD_SocketServer = CreateServer(out worker);
|
||||
|
||||
using (var serverSocket = CSharpRDD_SocketServer.AcceptSocket())
|
||||
using (var s = new NetworkStream(serverSocket))
|
||||
using (var serverSocket = CSharpRDD_SocketServer.Accept())
|
||||
using (var s = serverSocket.GetStream())
|
||||
{
|
||||
WritePayloadHeaderToWorker(s);
|
||||
const int accumulatorId = 1001;
|
||||
|
@ -663,7 +665,7 @@ namespace WorkerTest
|
|||
}
|
||||
|
||||
AssertWorker(worker);
|
||||
CSharpRDD_SocketServer.Stop();
|
||||
CSharpRDD_SocketServer.Close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче