This commit is contained in:
tawan0109 2016-07-18 14:26:01 +08:00
Parent 0bf617ae4a bf210f47f0
Commit 8cbdf30ed1
16 changed files with 378 additions and 40 deletions

View file

@@ -149,6 +149,7 @@
<Compile Include="Streaming\Kafka.cs" />
<Compile Include="Streaming\MapWithStateDStream.cs" />
<Compile Include="Streaming\PairDStreamFunctions.cs" />
<Compile Include="Streaming\CSharpInputDStreamUtils.cs" />
<Compile Include="Streaming\StreamingContext.cs" />
<Compile Include="Streaming\TransformedDStream.cs" />
</ItemGroup>

View file

@@ -234,7 +234,7 @@ namespace Microsoft.Spark.CSharp.Network
             var token = HeapAlloc(GetProcessHeap(), 0, chunkSize);
             if (token == IntPtr.Zero)
             {
-                throw new OutOfMemoryException();
+                throw new OutOfMemoryException("Failed to allocate memory by calling HeapAlloc()");
             }

             // register this heap buffer to RIO buffer
@@ -242,7 +242,7 @@ namespace Microsoft.Spark.CSharp.Network
             if (bufferId == IntPtr.Zero)
             {
                 FreeToProcessHeap(token);
-                throw new Exception("Failed to register RIO buffer");
+                throw new Exception(string.Format("Failed to register RIO buffer with error code {0}", Marshal.GetLastWin32Error()));
             }

             try
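A general interop caveat behind the new error message above: Marshal.GetLastWin32Error() is only reliable when the P/Invoke declaration sets SetLastError = true and the code reads the value before any intervening native call. A minimal sketch of that pattern, using CloseHandle purely as an illustrative import (not the RioNative declaration):

using System;
using System.Runtime.InteropServices;

internal static class NativeSketch
{
    // SetLastError = true tells the runtime to capture GetLastError()
    // immediately after this native call returns.
    [DllImport("kernel32.dll", SetLastError = true)]
    internal static extern bool CloseHandle(IntPtr handle);

    internal static void Demo(IntPtr handle)
    {
        if (!CloseHandle(handle))
        {
            // Read the error code before any other interop call overwrites it.
            var error = Marshal.GetLastWin32Error();
            throw new Exception(string.Format("CloseHandle failed with error code {0}", error));
        }
    }
}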

View file

@@ -119,14 +119,22 @@ namespace Microsoft.Spark.CSharp.Network
             }

             // Add a new chunk and allocate a segment from it.
-            var chunk = ByteBufChunk.NewChunk(this, SegmentSize, ChunkSize, isUnsafe);
-            if (!chunk.Allocate(out byteBuf))
+            try
             {
-                logger.LogError("Failed to allocate a ByteBuf from a new ByteBufChunk. {0}", chunk);
+                var chunk = ByteBufChunk.NewChunk(this, SegmentSize, ChunkSize, isUnsafe);
+                if (!chunk.Allocate(out byteBuf))
+                {
+                    logger.LogError("Failed to allocate a ByteBuf from a new ByteBufChunk - isUnsafe [{0}].", isUnsafe);
+                    return null;
+                }
+                qInit.Add(chunk);
+                return byteBuf;
+            }
+            catch (Exception e)
+            {
+                logger.LogException(e);
                 return null;
             }
-            qInit.Add(chunk);
-            return byteBuf;
         }
/// <summary>
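Since Allocate() above now returns null on chunk-allocation failure instead of throwing, call sites need a null check. A hypothetical call-site sketch (the exception message is illustrative):

var sendBuffer = bufPool.Allocate();
if (sendBuffer == null)
{
    // Allocate() has already logged the failure; surface it to the caller here.
    throw new InvalidOperationException("Failed to allocate a ByteBuf from the pool.");
}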

View file

@@ -499,6 +499,13 @@ namespace Microsoft.Spark.CSharp.Network
return;
}
if (status == 0 && byteTransferred == 0)
{
// The remote has gracefully closed the connection
logger.LogDebug("ProcessReceive() with status(0) and byteTransferred(0). The connection has been gracefully closed.");
return;
}
// Posts another receive operation
DoReceive();
}
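The new status == 0 && byteTransferred == 0 branch follows standard socket semantics: a completed receive of zero bytes means the peer closed its end of the connection in an orderly way. For reference, the same rule sketched against the plain .NET Socket API (illustrative, not Mobius code):

using System.Net.Sockets;

static bool TryReceive(Socket socket, byte[] buffer, out int bytesRead)
{
    bytesRead = socket.Receive(buffer);
    // Receive() returning 0 means the remote side closed the connection gracefully.
    return bytesRead > 0;
}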

View file

@@ -2,8 +2,8 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
namespace Microsoft.Spark.CSharp.Network
{
@@ -119,7 +119,7 @@ namespace Microsoft.Spark.CSharp.Network
         /// </returns>
         public override int ReadByte()
         {
-            if (!recvDataCache.IsReadable())
+            if (recvDataCache == null || !recvDataCache.IsReadable())
             {
                 recvDataCache = streamSocket.Receive();
             }
@@ -136,30 +136,48 @@ namespace Microsoft.Spark.CSharp.Network
         /// <returns>Number of bytes we read.</returns>
         public override int Read(byte[] buffer, int offset, int count)
         {
-            int bytesRemaining = count;
-            if (recvDataCache == null || !recvDataCache.IsReadable())
+            try
             {
-                recvDataCache = streamSocket.Receive();
-            }
+                if (recvDataCache == null)
+                {
+                    recvDataCache = streamSocket.Receive();
+                }

-            while (recvDataCache.IsReadable() && bytesRemaining > 0)
-            {
-                var bytesToRead = Math.Min(bytesRemaining, recvDataCache.ReadableBytes);
-                var n = recvDataCache.ReadBytes(buffer, offset + count - bytesRemaining, bytesToRead);
-                if (!recvDataCache.IsReadable())
+                if (recvDataCache == null)
                 {
-                    recvDataCache.Release();
-                    recvDataCache = null;
+                    return 0;
                 }

-                bytesRemaining -= n;
-            }
+                var bytesRemaining = count;
+                while (recvDataCache != null && recvDataCache.IsReadable() && bytesRemaining > 0)
+                {
+                    var bytesToRead = Math.Min(bytesRemaining, recvDataCache.ReadableBytes);
+                    bytesRemaining -= recvDataCache.ReadBytes(buffer, offset + count - bytesRemaining, bytesToRead);
+                    if (recvDataCache.IsReadable()) continue;

+                    recvDataCache.Release();
+                    recvDataCache = null;
+                    if (streamSocket.HasData)
+                    {
+                        recvDataCache = streamSocket.Receive();
+                    }
+                }

-            return count - bytesRemaining;
+                return count - bytesRemaining;
+            }
+            catch (Exception e)
+            {
+                if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException)
+                {
+                    throw;
+                }

+                // some sort of error occurred on the socket call,
+                // set the SocketException as InnerException and throw
+                throw new IOException(string.Format("Unable to read data from the transport connection: {0}.", e.Message), e);
+            }
         }
/// <summary>
@@ -170,14 +188,28 @@ namespace Microsoft.Spark.CSharp.Network
         /// <param name="count">Number of bytes to write.</param>
         public override void Write(byte[] buffer, int offset, int count)
         {
-            var remainingBytes = count;
-            while (0 < remainingBytes)
+            try
             {
-                var sendBuffer = bufPool.Allocate();
-                var sendCount = Math.Min(sendBuffer.WritableBytes, remainingBytes);
-                sendBuffer.WriteBytes(buffer, offset, sendCount);
-                streamSocket.Send(sendBuffer);
-                remainingBytes -= sendCount;
+                var remainingBytes = count;
+                while (0 < remainingBytes)
+                {
+                    var sendBuffer = bufPool.Allocate();
+                    var sendCount = Math.Min(sendBuffer.WritableBytes, remainingBytes);
+                    sendBuffer.WriteBytes(buffer, offset, sendCount);
+                    streamSocket.Send(sendBuffer);
+                    remainingBytes -= sendCount;
+                }
+            }
+            catch (Exception e)
+            {
+                if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException)
+                {
+                    throw;
+                }

+                // some sort of error occurred on the socket call,
+                // set the SocketException as InnerException and throw
+                throw new IOException(string.Format("Unable to write data to the transport connection: {0}.", e.Message), e);
             }
         }
     }
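Because Read() above can legitimately return fewer bytes than requested (and 0 once the connection is closed), callers that need an exact byte count must loop. A hypothetical helper sketch:

using System.IO;

static void ReadFully(Stream stream, byte[] buffer, int offset, int count)
{
    while (count > 0)
    {
        var n = stream.Read(buffer, offset, count);
        if (n == 0)
        {
            throw new EndOfStreamException("Connection closed before the expected bytes arrived.");
        }
        offset += n;
        count -= n;
    }
}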

View file

@@ -32,6 +32,7 @@ namespace Microsoft.Spark.CSharp.Proxy
IDStreamProxy CreateCSharpReducedWindowedDStream(IDStreamProxy jdstream, byte[] func, byte[] invFunc, int windowSeconds, int slideSeconds, string serializationMode);
IDStreamProxy CreateCSharpStateDStream(IDStreamProxy jdstream, byte[] func, string className, string serializationMode, string serializationMode2);
IDStreamProxy CreateConstantInputDStream(IRDDProxy rddProxy);
IDStreamProxy CreateCSharpInputDStream(byte[] func, string serializationMode);
IDStreamProxy EventHubsUnionStream(Dictionary<string, string> eventHubsParams, StorageLevelType storageLevelType);
}

View file

@@ -168,6 +168,23 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
return new DStreamIpcProxy(javaDStreamReference, jvmDStreamReference);
}
public IDStreamProxy CreateCSharpInputDStream(byte[] func, string serializationMode)
{
var jvmDStreamReference = SparkCLRIpcProxy.JvmBridge.CallConstructor(
"org.apache.spark.streaming.api.csharp.CSharpInputDStream",
new object[]
{
jvmStreamingContextReference,
func,
serializationMode
});
var javaDStreamReference =
new JvmObjectReference((String)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "asJavaDStream"));
return new DStreamIpcProxy(javaDStreamReference, jvmDStreamReference);
}
public IDStreamProxy TextFileStream(string directory)
{
var jstream = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaStreamingReference, "textFileStream", new object[] { directory }).ToString());

View file

@@ -1,5 +1,7 @@
 using System;
 using System.Collections.Generic;
+using System.Configuration;
+using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
@@ -11,17 +13,15 @@ namespace Microsoft.Spark.CSharp.Services
     /// </summary>
     public class LoggerServiceFactory
     {
-        private static ILoggerService loggerService = DefaultLoggerService.Instance;
+        private static Lazy<ILoggerService> loggerService = new Lazy<ILoggerService>(() => GetDefaultLogger());

         /// <summary>
         /// Overrides the existing logger with a given logger service instance
         /// </summary>
         /// <param name="loggerServiceOverride">The logger service instance used to override</param>
         public static void SetLoggerService(ILoggerService loggerServiceOverride)
         {
-            loggerService = loggerServiceOverride;
-            var logger = GetLogger(typeof(LoggerServiceFactory));
-            logger.LogInfo("Logger service configured to use {0}", logger.GetType().Name);
+            loggerService = new Lazy<ILoggerService>(() => loggerServiceOverride);
         }
/// <summary>
@@ -31,7 +31,22 @@ namespace Microsoft.Spark.CSharp.Services
         /// <returns>An instance of logger service</returns>
         public static ILoggerService GetLogger(Type type)
         {
-            return loggerService.GetLoggerInstance(type);
+            return loggerService.Value.GetLoggerInstance(type);
         }
        /// <summary>
        /// Uses log4net when an xxx.exe.config file with a log4net section exists; otherwise falls back to the default logger
        /// </summary>
        /// <returns>The logger service instance to use as the default</returns>
private static ILoggerService GetDefaultLogger()
{
if (File.Exists(AppDomain.CurrentDomain.SetupInformation.ConfigurationFile)
&& ConfigurationManager.GetSection("log4net") != null)
{
return Log4NetLoggerService.Instance;
}
return DefaultLoggerService.Instance;
}
}
}
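With the lazy initializer above, the logging backend is chosen on first use: GetLogger() resolves to Log4NetLoggerService when the application's .config file carries a log4net section, and to DefaultLoggerService otherwise. A brief usage sketch (MyApp is a placeholder type):

// Optional: install a specific backend before any logger is materialized.
LoggerServiceFactory.SetLoggerService(Log4NetLoggerService.Instance);

var logger = LoggerServiceFactory.GetLogger(typeof(MyApp));
logger.LogInfo("Logger backend resolved lazily on first use");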

View file

@@ -0,0 +1,131 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Proxy.Ipc;
namespace Microsoft.Spark.CSharp.Streaming
{
    /// <summary>
    /// Utils for the C# input stream.
    /// </summary>
    public class CSharpInputDStreamUtils
    {
        /// <summary>
        /// Creates an input stream whose data injection is controlled by user-provided C# code
        /// </summary>
        /// <param name="ssc">Spark Streaming Context</param>
        /// <param name="func">
        /// function provided by the user to inject data into the DStream;
        /// it should return an RDD for each batch interval
        /// </param>
        /// <returns>A DStream object</returns>
public static DStream<T> CreateStream<T>(StreamingContext ssc, Func<double, RDD<T>> func)
{
Func<double, RDD<dynamic>, RDD<dynamic>> csharpFunc = new CSharpInputDStreamTransformRDDHelper<T>(func).Execute;
var formatter = new BinaryFormatter();
var stream = new MemoryStream();
formatter.Serialize(stream, csharpFunc);
var dstreamProxy = ssc.streamingContextProxy.CreateCSharpInputDStream(stream.ToArray(), SerializedMode.Byte.ToString());
return new DStream<T>(dstreamProxy, ssc, SerializedMode.Byte);
}
        /// <summary>
        /// Creates an input stream whose data injection is controlled by user-provided C# code
        /// </summary>
        /// <param name="ssc">Spark Streaming Context</param>
        /// <param name="numPartitions">number of partitions</param>
        /// <param name="func">
        /// function provided by the user to inject data into the DStream;
        /// it takes two input parameters, time and partitionIndex,
        /// and should return an IEnumerable of the injected data
        /// </param>
        /// <returns>A DStream object</returns>
public static DStream<T> CreateStream<T>(StreamingContext ssc, int numPartitions, Func<double, int, IEnumerable<T>> func)
{
Func<double, RDD<T>> generateRDDFunc = new CSharpInputDStreamGenerateRDDHelper<T>(numPartitions, func).Execute;
return CreateStream<T>(ssc, generateRDDFunc);
}
}
/// <summary>
/// This class is defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating
/// private anonymous type that is not serializable. Since the delegate has to be serialized and sent to the Spark workers
/// for execution, it is necessary to have the type marked [Serializable]. This class is to work around the limitation
/// on the serializability of compiler generated types
/// </summary>
[Serializable]
internal class CSharpInputDStreamTransformRDDHelper<T>
{
private Func<double, RDD<T>> func;
public CSharpInputDStreamTransformRDDHelper(Func<double, RDD<T>> func)
{
this.func = func;
}
internal RDD<dynamic> Execute(double t, RDD<dynamic> rdd)
{
return func(t).ConvertTo<dynamic>();
}
}
/// <summary>
/// This class is defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating
/// private anonymous type that is not serializable. Since the delegate has to be serialized and sent to the Spark workers
/// for execution, it is necessary to have the type marked [Serializable]. This class is to work around the limitation
/// on the serializability of compiler generated types
/// </summary>
[Serializable]
internal class CSharpInputDStreamMapPartitionWithIndexHelper<T>
{
Func<double, int, IEnumerable<T>> func;
double time;
public CSharpInputDStreamMapPartitionWithIndexHelper(double time, Func<double, int, IEnumerable<T>> func)
{
this.time = time;
this.func = func;
}
internal IEnumerable<T> Execute(int partitionIndex, IEnumerable<int> input)
{
return func(time, partitionIndex);
}
}
/// <summary>
/// This class is defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating
/// private anonymous type that is not serializable. Since the delegate has to be serialized and sent to the Spark workers
/// for execution, it is necessary to have the type marked [Serializable]. This class is to work around the limitation
/// on the serializability of compiler generated types
/// </summary>
[Serializable]
internal class CSharpInputDStreamGenerateRDDHelper<T>
{
private Func<double, int, IEnumerable<T>> func;
private int numPartitions;
public CSharpInputDStreamGenerateRDDHelper(int numPartitions, Func<double, int, IEnumerable<T>> func)
{
this.numPartitions = numPartitions;
this.func = func;
}
internal RDD<T> Execute(double t)
{
var sc = SparkContext.GetActiveSparkContext();
int[] array = new int[numPartitions];
var initialRdd = sc.Parallelize(array.AsEnumerable(), numPartitions);
return initialRdd.MapPartitionsWithIndex<T>(new CSharpInputDStreamMapPartitionWithIndexHelper<T>(t, func).Execute, true);
}
}
}
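For reference, a usage sketch of the RDD-returning CreateStream overload (names and data are illustrative; time is the batch time supplied by the streaming engine):

var ssc = new StreamingContext(SparkCLRSamples.SparkContext, 2);
var stream = CSharpInputDStreamUtils.CreateStream<string>(ssc, time =>
{
    // Build one RDD of injected data per batch interval.
    var sc = SparkContext.GetActiveSparkContext();
    var batch = new List<string> { string.Format("injected-at-{0}", time) };
    return sc.Parallelize(batch, 2);
});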

View file

@@ -437,5 +437,34 @@ namespace AdapterTest
Assert.IsNotNull(constantInputDStream);
Assert.AreEqual(ssc, constantInputDStream.streamingContext);
}
[Test]
public void TestCSharpInputDStream()
{
// test create CSharpInputDStream
var sc = new SparkContext("", "");
var ssc = new StreamingContext(sc, 1);
Func<double, int, IEnumerable<string>> func =
(double time, int pid) =>
{
var list = new List<string>() { string.Format("PluggableInputDStream-{0}-{1}", pid, time) };
return list.AsEnumerable();
};
const int numPartitions = 5;
var inputDStream = CSharpInputDStreamUtils.CreateStream<string>(
ssc,
numPartitions,
func);
Assert.IsNotNull(inputDStream);
Assert.AreEqual(ssc, inputDStream.streamingContext);
// test CSharpInputDStreamMapPartitionWithIndexHelper
int[] array = new int[numPartitions];
int partitionIndex = 0;
new CSharpInputDStreamMapPartitionWithIndexHelper<string>(0.0, func).Execute(partitionIndex, array.AsEnumerable());
// test CSharpInputDStreamGenerateRDDHelper
new CSharpInputDStreamGenerateRDDHelper<string>(numPartitions, func).Execute(0.0);
}
}
}

View file

@@ -114,6 +114,11 @@ namespace AdapterTest.Mocks
return new MockDStreamProxy();
}
public IDStreamProxy CreateCSharpInputDStream(byte[] func, string serializationMode)
{
return new MockDStreamProxy();
}
public IDStreamProxy EventHubsUnionStream(Dictionary<string, string> eventHubsParams, StorageLevelType storageLevelType)
{
throw new NotImplementedException();

View file

@@ -291,6 +291,45 @@ namespace Microsoft.Spark.CSharp
ssc.Start();
ssc.AwaitTermination();
}
[Sample("experimental")]
internal static void DStreamCSharpInputSample()
{
const int batchInterval = 2;
const int numPartitions = 5;
var sc = SparkCLRSamples.SparkContext;
var ssc = new StreamingContext(sc, batchInterval);
var inputDStream = CSharpInputDStreamUtils.CreateStream<string>(
ssc,
numPartitions,
(double time, int pid) =>
{
var list = new List<string>() { string.Format("PluggableInputDStream-{0}-{1}", pid, time) };
return list.AsEnumerable();
});
inputDStream.ForeachRDD((time, rdd) =>
{
var taken = rdd.Collect();
int partitions = rdd.GetNumPartitions();
Console.WriteLine("-------------------------------------------");
Console.WriteLine("Time: {0}", time);
Console.WriteLine("-------------------------------------------");
Console.WriteLine("Count: " + taken.Length);
Console.WriteLine("Partitions: " + partitions);
foreach (object record in taken)
{
Console.WriteLine(record);
}
});
ssc.Start();
ssc.AwaitTermination();
}
}

View file

@@ -183,19 +183,21 @@ namespace WorkerTest
         /// <param name="exitCode"></param>
         private void AssertWorker(Process worker, int exitCode = 0, string errorMessage = null)
         {
-            if (!worker.WaitForExit(3000))
+            if (!worker.WaitForExit(30000))
             {
                 Console.WriteLine("Time out for worker.WaitForExit(). Force to kill worker process.");
                 worker.Kill();
             }

             string str;
             lock (syncLock)
             {
                 str = output.ToString();
+                Console.WriteLine("output from server: {0}", str);
             }

+            Assert.IsTrue(errorMessage == null || str.Contains(errorMessage),
+                string.Format("Actual output from worker: {0}{1}", Environment.NewLine, str));
             Assert.IsTrue(worker.HasExited);
             Assert.AreEqual(exitCode, worker.ExitCode);
-            Assert.IsTrue(errorMessage == null || str.Contains(errorMessage));
}

View file

@@ -59,6 +59,10 @@ namespace WorkerTest
{
RioNative.UnloadRio();
}
// Reset Socket wrapper to default
Environment.SetEnvironmentVariable(ConfigurationService.CSharpSocketTypeEnvName, "Normal");
SocketFactory.SocketWrapperType = SocketWrapperType.None;
}
// StringBuilder is not thread-safe, it shouldn't be used concurrently from different threads.
@@ -170,18 +174,21 @@ namespace WorkerTest
         /// <param name="exitCode"></param>
         private void AssertWorker(Process worker, int exitCode = 0, string assertMessage = null)
         {
-            if (!worker.WaitForExit(3000))
+            if (!worker.WaitForExit(6000))
             {
                 Console.WriteLine("Time out for worker.WaitForExit(). Force to kill worker process.");
                 worker.Kill();
             }

-            Assert.IsTrue(worker.HasExited);
-            Assert.AreEqual(exitCode, worker.ExitCode);
             string str;
             lock (syncLock)
             {
                 str = output.ToString();
             }

-            Assert.IsTrue(assertMessage == null || str.Contains(assertMessage));
+            Assert.IsTrue(assertMessage == null || str.Contains(assertMessage),
+                string.Format("Actual output from worker: {0}{1}", Environment.NewLine, str));
+            Assert.IsTrue(worker.HasExited);
+            Assert.AreEqual(exitCode, worker.ExitCode);
         }
/// <summary>

View file

@@ -369,6 +369,30 @@ class CSharpConstantInputDStream(ssc_ : StreamingContext, rdd: RDD[Array[Byte]])
val asJavaDStream: JavaDStream[Array[Byte]] = JavaDStream.fromDStream(this)
}
/**
 * A pluggable input stream whose data injection is controlled by user C# code
 */
class CSharpInputDStream(
ssc_ : StreamingContext,
cSharpFunc: Array[Byte],
serializationMode: String
) extends InputDStream[Array[Byte]](ssc_) {
override def start(): Unit = {}
override def stop(): Unit = {}
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
if (cSharpFunc != null && !cSharpFunc.isEmpty) {
CSharpDStream.callCSharpTransform(List(None), validTime, cSharpFunc, List(serializationMode))
} else {
None
}
}
val asJavaDStream: JavaDStream[Array[Byte]] = JavaDStream.fromDStream(this)
}
case class RddPreComputeRecord[T] (
rddSeqNum: Long,
rdd: RDD[T])

View file

@@ -163,3 +163,23 @@ class CSharpDStreamSuite extends SparkCLRFunSuite with BeforeAndAfterAll with Be
}
}
}
class CSharpInputDStreamSuite extends SparkCLRFunSuite {
test("create CSharpInputDStream") {
val conf = new SparkConf().setAppName("CSharpInputDStreamTest").setMaster("local").set("spark.testing", "true")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, new Duration(1000))
try {
val dStream = new CSharpInputDStream(ssc, null, "None")
assert(null != dStream)
dStream.start()
val rdd = dStream.compute(new Time(0))
assert(rdd.isEmpty)
assert(null != dStream.asJavaDStream)
dStream.stop()
} finally {
sc.stop()
}
}
}