add null key support for SerializedMode.Pair and sharable backend handler

This commit is contained in:
renyi 2015-11-16 08:54:27 -08:00
Parent e6c5c11bf9
Commit 9996508efa
3 changed files with 7 additions and 5 deletions

View file

@@ -176,7 +176,7 @@ namespace Microsoft.Spark.CSharp.Streaming
         /// <returns>A DStream object</returns>
         public DStream<KeyValuePair<byte[], byte[]>> DirectKafkaStream(List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets)
         {
-            return new DStream<KeyValuePair<byte[], byte[]>>(this.streamingContextProxy.DirectKafkaStream(topics, kafkaParams, fromOffsets), this);
+            return new DStream<KeyValuePair<byte[], byte[]>>(this.streamingContextProxy.DirectKafkaStream(topics, kafkaParams, fromOffsets), this, SerializedMode.Pair);
         }

         /// <summary>
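
Note: the only change above is the third constructor argument, which tags the Kafka stream as SerializedMode.Pair so the C# worker decodes each record as a length-prefixed key followed by a length-prefixed value. A minimal Scala sketch of that framing on the JVM writer side, under the assumption that a null key or value is encoded as a non-positive length distinct from the end-of-data sentinel (PairFraming, NULL_MARKER, and writeField are illustrative names, not the shipped SparkCLR writer):

import java.io.DataOutputStream

// Illustrative sketch only: mirrors what the C# reader in this commit accepts.
object PairFraming {
  val NULL_MARKER = -5          // assumed sentinel for "this field is null"
  val END_OF_DATA_SECTION = -1  // assumed sentinel for "no more records"

  // A field is a 4-byte big-endian length followed by that many bytes;
  // a null field is just a non-positive length with no payload.
  def writeField(out: DataOutputStream, bytes: Array[Byte]): Unit = {
    if (bytes == null) {
      out.writeInt(NULL_MARKER)
    } else {
      out.writeInt(bytes.length)
      out.write(bytes)
    }
  }

  // Each Kafka record in Pair mode is a key field followed by a value field,
  // so a message produced without a key still round-trips as (null, value).
  def writePair(out: DataOutputStream, key: Array[Byte], value: Array[Byte]): Unit = {
    writeField(out, key)
    writeField(out, value)
  }
}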

View file

@@ -282,9 +282,9 @@ namespace Microsoft.Spark.CSharp
                {
                    messageLength = socket.ReadInt();

-                   if (messageLength > 0)
+                   if (messageLength != END_OF_DATA_SECTION && (messageLength > 0 || serializedMode == "Pair"))
                    {
-                       byte[] buffer = socket.ReadBytes(messageLength);
+                       byte[] buffer = messageLength > 0 ? socket.ReadBytes(messageLength) : null;
                        switch (serializedMode)
                        {
                            case "String":
@@ -301,7 +301,7 @@ namespace Microsoft.Spark.CSharp
                            case "Pair":
                                messageLength = socket.ReadInt();
-                               byte[] value = socket.ReadBytes(messageLength);
+                               byte[] value = messageLength > 0 ? socket.ReadBytes(messageLength) : null;

                                yield return new KeyValuePair<byte[], byte[]>(buffer, value);
                                break;
@@ -314,7 +314,7 @@ namespace Microsoft.Spark.CSharp
                        }
                    }
-               } while (messageLength >= 0);
+               } while (messageLength != END_OF_DATA_SECTION);
            }

            private static void Log(string message)
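
Note: the loop-exit change is the essential fix. The old condition, while (messageLength >= 0), treated any negative length as end of stream, so a null key encoded as a negative length would silently truncate the partition; the new condition stops only on the explicit END_OF_DATA_SECTION sentinel. A hedged Scala rendering of the same read loop, reusing the assumed sentinels from the framing sketch above (the shipped reader is the C# code in this file):

import java.io.DataInputStream
import scala.collection.mutable.ArrayBuffer

// Sketch of the worker's Pair read loop; sentinel values are assumptions
// shared with the writer sketch above.
object PairReader {
  val END_OF_DATA_SECTION = -1  // assumed end-of-stream sentinel

  // A non-positive length (other than end-of-data) means a null field.
  private def readField(in: DataInputStream, length: Int): Array[Byte] =
    if (length > 0) {
      val buffer = new Array[Byte](length)
      in.readFully(buffer)
      buffer
    } else null

  def readPairs(in: DataInputStream): Seq[(Array[Byte], Array[Byte])] = {
    val pairs = ArrayBuffer.empty[(Array[Byte], Array[Byte])]
    var keyLength = in.readInt()
    while (keyLength != END_OF_DATA_SECTION) { // old loop stopped on any negative length
      val key = readField(in, keyLength)       // a null key survives here
      val value = readField(in, in.readInt())
      pairs += ((key, value))
      keyLength = in.readInt()
    }
    pairs.toSeq
  }
}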

View file

@@ -7,6 +7,7 @@ import org.apache.spark.util.Utils
 import java.io.{DataOutputStream, ByteArrayOutputStream, DataInputStream, ByteArrayInputStream}
 import java.net.Socket

+import io.netty.channel.ChannelHandler.Sharable
 import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
 import org.apache.spark.api.csharp.SerDe._ //TODO - work with SparkR devs to make this configurable and reuse RBackendHandler
@@ -20,6 +21,7 @@ import scala.collection.mutable.HashMap
  */
 // Since SparkCLR is a package to Spark and not a part of spark-core it mirrors the implementation of
 // selected parts from RBackend with SparkCLR customizations
+@Sharable
 class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHandler[Array[Byte]] {
   override def channelRead0(ctx: ChannelHandlerContext, msg: Array[Byte]): Unit = {
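
Note: Netty only allows a single handler instance to be added to more than one ChannelPipeline when the handler class carries the @Sharable annotation; without it, reusing the one CSharpBackendHandler instance across connections fails when it is added to a second pipeline. The annotation is also a promise that the handler is safe for concurrent use from multiple channels. A minimal sketch of that contract (CountingHandler is a hypothetical example, not part of this commit):

import java.util.concurrent.atomic.AtomicInteger

import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

// Hypothetical handler showing the @Sharable contract: one instance may sit
// in many pipelines at once, so any state it keeps must be thread-safe.
@Sharable
class CountingHandler extends ChannelInboundHandlerAdapter {
  private val messages = new AtomicInteger(0)

  override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
    messages.incrementAndGet() // concurrent channels share this counter
    ctx.fireChannelRead(msg)   // pass the message along the pipeline
  }
}

SimpleChannelInboundHandler, which CSharpBackendHandler extends, keeps no mutable per-channel state of its own, so the annotation is safe here as long as the subclass likewise keeps its state thread-safe.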