Optimize DStream transformations by removing the extra CSharpRDD that the current implementation, ported from Python, introduces

This commit is contained in:
renyi 2015-12-11 10:01:02 -08:00
Parent 0bc83339a1
Commit 4162263823
8 changed files with 77 additions and 22 deletions
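
In short: when a TransformedDStream is pipelinable (neither cached nor checkpointed), its per-batch function is now composed into the downstream operator's serialized closure, and the JVM-side stream is wired directly to the parent's parent, removing one CSharpRDD round-trip per batch. A minimal sketch of the composition idea, using plain arrays in place of Mobius's RDD<dynamic> (all names below are illustrative, not the actual Mobius API):

    using System;
    using System.Linq;

    class PipelineSketch
    {
        static void Main()
        {
            // Stand-ins for two chained per-batch transforms; in Mobius each
            // of these would have become its own CSharpRDD on the JVM side.
            Func<double, int[], int[]> parentFunc = (t, batch) => batch.Select(x => x * 2).ToArray();
            Func<double, int[], int[]> childFunc = (t, batch) => batch.Where(x => x > 2).ToArray();

            // After this commit, a pipelinable parent's func rides along inside
            // the child's closure, so only one CSharpRDD is created per batch.
            Func<double, int[], int[]> composed = (t, batch) => childFunc(t, parentFunc(t, batch));

            Console.WriteLine(string.Join(",", composed(0.0, new[] { 1, 2, 3 }))); // 4,6
        }
    }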

View file

@@ -22,7 +22,7 @@ namespace Microsoft.Spark.CSharp.Proxy
         IDStreamProxy CreateCSharpDStream(IDStreamProxy jdstream, byte[] func, string deserializer);
         IDStreamProxy CreateCSharpTransformed2DStream(IDStreamProxy jdstream, IDStreamProxy jother, byte[] func, string deserializer, string deserializerOther);
         IDStreamProxy CreateCSharpReducedWindowedDStream(IDStreamProxy jdstream, byte[] func, byte[] invFunc, int windowSeconds, int slideSeconds, string deserializer);
-        IDStreamProxy CreateCSharpStateDStream(IDStreamProxy jdstream, byte[] func, string deserializer);
+        IDStreamProxy CreateCSharpStateDStream(IDStreamProxy jdstream, byte[] func, string deserializer, string deserializer2);
         bool CheckpointExists(string checkpointPath);
         IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs);
         IStreamingContextProxy CreateStreamingContext(string checkpointPath);
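
The new deserializer2 parameter exists because CSharpStateDStream now juggles two serialization modes: the first describes the accumulated state produced by the C# function, the second describes the incoming batch, which may come from a pipelined parent's parent and therefore use that stream's mode (see the Scala hunk at the end of this commit). A hedged sketch of how a caller picks the two modes, with stand-in values:

    using System;

    class ModeSketch
    {
        static void Main()
        {
            // Hypothetical values; mirrors the UpdateStateByKey call site below.
            bool piplinable = true;              // parent is an uncached TransformedDStream
            string serializedMode = "Byte";      // this stream's own mode (the state side)
            string prevSerializedMode = "Pair";  // grandparent's mode (the input side)

            string deserializer = serializedMode;
            string deserializer2 = piplinable ? prevSerializedMode : serializedMode;
            Console.WriteLine($"state: {deserializer}, input: {deserializer2}"); // state: Byte, input: Pair
        }
    }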

View file

@@ -119,10 +119,10 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
             return new DStreamIpcProxy(javaDStreamReference, jvmDStreamReference);
         }

-        public IDStreamProxy CreateCSharpStateDStream(IDStreamProxy jdstream, byte[] func, string deserializer)
+        public IDStreamProxy CreateCSharpStateDStream(IDStreamProxy jdstream, byte[] func, string deserializer, string deserializer2)
         {
             var jvmDStreamReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.api.csharp.CSharpStateDStream",
-                new object[] { (jdstream as DStreamIpcProxy).jvmDStreamReference, func, deserializer });
+                new object[] { (jdstream as DStreamIpcProxy).jvmDStreamReference, func, deserializer, deserializer2 });
             var javaDStreamReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "asJavaDStream"));
             return new DStreamIpcProxy(javaDStreamReference, jvmDStreamReference);

View file

@@ -51,6 +51,14 @@ namespace Microsoft.Spark.CSharp.Streaming
         internal bool isCheckpointed;

         internal virtual IDStreamProxy DStreamProxy { get { return dstreamProxy; } }

+        internal bool Piplinable
+        {
+            get
+            {
+                return this is TransformedDStream<T> && !isCached && !isCheckpointed;
+            }
+        }
+
         /// <summary>
         /// Return the slideDuration in seconds of this DStream
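
The new Piplinable flag (spelling as in the source) gates the whole optimization: only an uncached, uncheckpointed TransformedDStream may be collapsed into its child, because Cache() and Checkpoint() require the intermediate RDD to actually materialize. A toy model of the rule (hypothetical classes, not the Mobius types):

    using System;

    class DStreamSketch
    {
        public bool IsCached;
        public bool IsCheckpointed;
        public virtual bool Piplinable { get { return false; } }
    }

    class TransformedDStreamSketch : DStreamSketch
    {
        // Mirrors the property above: a transformed stream is pipelinable
        // only while nothing forces its RDD to materialize.
        public override bool Piplinable { get { return !IsCached && !IsCheckpointed; } }
    }

    class Demo
    {
        static void Main()
        {
            var d = new TransformedDStreamSketch();
            Console.WriteLine(d.Piplinable); // True: may be folded into its child
            d.IsCached = true;               // what Cache() effectively does
            Console.WriteLine(d.Piplinable); // False: must stay a separate stage
        }
    }
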
@@ -311,18 +319,21 @@ namespace Microsoft.Spark.CSharp.Streaming
         /// <returns></returns>
         public DStream<V> TransformWith<U, V>(Func<double, RDD<T>, RDD<U>, RDD<V>> f, DStream<U> other, bool keepSerializer = false)
         {
-            Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> func = new TransformWithDynamicHelper<T, U, V>(f).Execute;
+            Func<double, RDD<dynamic>, RDD<dynamic>> prevF = this.Piplinable ? (this as TransformedDStream<T>).func : null;
+            Func<double, RDD<dynamic>, RDD<dynamic>> otherF = other.Piplinable ? (other as TransformedDStream<U>).func : null;
+            Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> func = new TransformWithDynamicHelper<T, U, V>(f, prevF, otherF).Execute;

             var formatter = new BinaryFormatter();
             var stream = new MemoryStream();
             formatter.Serialize(stream, func);

             return new DStream<V>(SparkCLREnvironment.SparkCLRProxy.CreateCSharpTransformed2DStream(
-                DStreamProxy,
-                other.DStreamProxy,
-                stream.ToArray(),
-                serializedMode.ToString(),
-                other.serializedMode.ToString()),
+                this.Piplinable ? prevDStreamProxy : DStreamProxy,
+                other.Piplinable ? other.prevDStreamProxy : other.DStreamProxy,
+                stream.ToArray(),
+                (this.Piplinable ? prevSerializedMode : serializedMode).ToString(),
+                (other.Piplinable ? other.prevSerializedMode : other.serializedMode).ToString()),
                 streamingContext,
                 keepSerializer ? serializedMode : SerializedMode.Byte);
         }
@@ -536,13 +547,24 @@ namespace Microsoft.Spark.CSharp.Streaming
     internal class TransformWithDynamicHelper<T, U, V>
     {
         private readonly Func<double, RDD<T>, RDD<U>, RDD<V>> func;
-        internal TransformWithDynamicHelper(Func<double, RDD<T>, RDD<U>, RDD<V>> f)
+        private readonly Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc;
+        private readonly Func<double, RDD<dynamic>, RDD<dynamic>> otherFunc;
+
+        internal TransformWithDynamicHelper(Func<double, RDD<T>, RDD<U>, RDD<V>> f, Func<double, RDD<dynamic>, RDD<dynamic>> prevF, Func<double, RDD<dynamic>, RDD<dynamic>> otherF)
         {
             func = f;
+            prevFunc = prevF;
+            otherFunc = otherF;
         }

         internal RDD<dynamic> Execute(double t, RDD<dynamic> rdd1, RDD<dynamic> rdd2)
         {
+            if (prevFunc != null)
+                rdd1 = prevFunc(t, rdd1);
+
+            if (otherFunc != null)
+                rdd2 = otherFunc(t, rdd2);
+
             return func(t, rdd1.ConvertTo<T>(), rdd2.ConvertTo<U>()).ConvertTo<dynamic>();
         }
     }
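
Because the JVM now hands Execute the grandparents' batches, the captured parent closures must run first to recreate the inputs the user function expects; each side replays only its own skipped stage. A self-contained sketch of that flow (ints standing in for RDD<dynamic>):

    using System;

    class ExecuteSketch
    {
        static void Main()
        {
            Func<double, int, int> prevFunc = (t, x) => x + 1;    // this side's folded-in transform
            Func<double, int, int> otherFunc = null;              // other side was not pipelinable
            Func<double, int, int, int> userFunc = (t, a, b) => a * b;

            double t0 = 0.0;
            int rdd1 = 3, rdd2 = 5;
            if (prevFunc != null) rdd1 = prevFunc(t0, rdd1);      // replay the skipped stage
            if (otherFunc != null) rdd2 = otherFunc(t0, rdd2);    // nothing to replay here
            Console.WriteLine(userFunc(t0, rdd1, rdd2));          // 20
        }
    }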

View file

@@ -275,7 +275,9 @@ namespace Microsoft.Spark.CSharp.Streaming
             // dstream to be transformed by subtracting old RDDs and adding new RDDs based on the window
             var reduced = self.ReduceByKey(reduceFunc, numPartitions);

-            var helper = new ReduceByKeyAndWindowHelper<K, V>(reduceFunc, invReduceFunc, numPartitions, filterFunc);
+            Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc = reduced.Piplinable ? (reduced as TransformedDStream<KeyValuePair<K, V>>).func : null;
+
+            var helper = new ReduceByKeyAndWindowHelper<K, V>(reduceFunc, invReduceFunc, numPartitions, filterFunc, prevFunc);

             // function to reduce the new values that entered the window (e.g., adding new counts)
             Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> reduceF = helper.Reduce;
@@ -295,11 +297,12 @@ namespace Microsoft.Spark.CSharp.Streaming
             return new DStream<KeyValuePair<K, V>>(
                 SparkCLREnvironment.SparkCLRProxy.CreateCSharpReducedWindowedDStream(
-                    reduced.DStreamProxy, stream.ToArray(),
+                    reduced.Piplinable ? reduced.prevDStreamProxy : reduced.DStreamProxy,
+                    stream.ToArray(),
                     invStream == null ? null : invStream.ToArray(),
                     windowSeconds,
                     slideSeconds,
-                    null),
+                    (reduced.Piplinable ? reduced.prevSerializedMode : reduced.serializedMode).ToString()),
                 self.streamingContext
             );
         }
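
Two things change at this call site: the JVM stream is wired past a pipelinable `reduced` to its parent, and the final argument, previously null, now carries the serialization mode of whichever stream actually feeds the window. A sketch of the rewiring pattern with hypothetical stand-in types:

    using System;

    class WireSketch
    {
        class StreamInfo
        {
            public string Proxy;
            public string Mode;
            public string PrevProxy;
            public string PrevMode;
            public bool Piplinable;
        }

        static void Main()
        {
            var reduced = new StreamInfo
            {
                Proxy = "reducedProxy", Mode = "Byte",
                PrevProxy = "parentProxy", PrevMode = "Pair",
                Piplinable = true
            };

            // Same selection the call site above performs.
            var wiredProxy = reduced.Piplinable ? reduced.PrevProxy : reduced.Proxy;
            var wiredMode = reduced.Piplinable ? reduced.PrevMode : reduced.Mode;
            Console.WriteLine($"{wiredProxy} ({wiredMode})"); // parentProxy (Pair)
        }
    }
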
@@ -322,16 +325,19 @@ namespace Microsoft.Spark.CSharp.Streaming
             if (numPartitions <= 0)
                 numPartitions = self.streamingContext.SparkContext.DefaultParallelism;

-            Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> func = new UpdateStateByKeyHelper<K, V, S>(updateFunc, numPartitions).Execute;
+            Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc = self.Piplinable ? (self as TransformedDStream<KeyValuePair<K, V>>).func : null;
+
+            Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> func = new UpdateStateByKeyHelper<K, V, S>(updateFunc, prevFunc, numPartitions).Execute;

             var formatter = new BinaryFormatter();
             var stream = new MemoryStream();
             formatter.Serialize(stream, func);

             return new DStream<KeyValuePair<K, S>>(SparkCLREnvironment.SparkCLRProxy.CreateCSharpStateDStream(
-                self.DStreamProxy,
+                self.Piplinable ? self.prevDStreamProxy : self.DStreamProxy,
                 stream.ToArray(),
-                self.serializedMode.ToString()),
+                self.serializedMode.ToString(),
+                (self.Piplinable ? self.prevSerializedMode : self.serializedMode).ToString()),
                 self.streamingContext);
         }
     }
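
UpdateStateByKey gets the same treatment: the helper captures the parent's func, while the proxy and the new second mode string (matching deserializer2 from the interface hunk) describe the pipelined input. The state function's contract is unchanged; as a reminder, here is a toy, dictionary-based version of what the serialized closure computes per batch (illustrative, not the Mobius API):

    using System;
    using System.Collections.Generic;
    using System.Linq;

    class StateSketch
    {
        static void Main()
        {
            // new state = updateFunc(values seen this batch, previous state)
            Func<IEnumerable<int>, int, int> updateFunc = (values, state) => state + values.Sum();

            var state = new Dictionary<string, int>();
            var batch = new[] { Tuple.Create("a", 1), Tuple.Create("a", 2), Tuple.Create("b", 5) };

            foreach (var group in batch.GroupBy(kv => kv.Item1, kv => kv.Item2))
            {
                int previous;
                state.TryGetValue(group.Key, out previous);
                state[group.Key] = updateFunc(group, previous);
            }

            Console.WriteLine(string.Join(" ", state.Select(kv => kv.Key + "=" + kv.Value))); // a=3 b=5
        }
    }
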
@@ -506,20 +512,32 @@ namespace Microsoft.Spark.CSharp.Streaming
         private readonly Func<V, V, V> invReduceFunc;
         private readonly int numPartitions;
         private readonly Func<KeyValuePair<K, V>, bool> filterFunc;
+        private readonly Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc;

-        internal ReduceByKeyAndWindowHelper(Func<V, V, V> reduceF, Func<V, V, V> invReduceF, int numPartitions, Func<KeyValuePair<K, V>, bool> filterF)
+        internal ReduceByKeyAndWindowHelper(Func<V, V, V> reduceF,
+            Func<V, V, V> invReduceF,
+            int numPartitions,
+            Func<KeyValuePair<K, V>, bool> filterF,
+            Func<double, RDD<dynamic>, RDD<dynamic>> prevF)
         {
             reduceFunc = reduceF;
             invReduceFunc = invReduceF;
             this.numPartitions = numPartitions;
             filterFunc = filterF;
+            prevFunc = prevF;
         }

         internal RDD<dynamic> Reduce(double t, RDD<dynamic> a, RDD<dynamic> b)
         {
+            if (prevFunc != null)
+                b = prevFunc(t, b);
+
             var r = b.ConvertTo<KeyValuePair<K, V>>().ReduceByKey<K, V>(reduceFunc);
             if (a != null)
             {
+                if (prevFunc != null)
+                    a = prevFunc(t, a);
+
                 r = a.ConvertTo<KeyValuePair<K, V>>().Union(r).ReduceByKey<K, V>(reduceFunc);
             }
             if (filterFunc != null)
@@ -529,6 +547,12 @@ namespace Microsoft.Spark.CSharp.Streaming
         internal RDD<dynamic> InvReduce(double t, RDD<dynamic> a, RDD<dynamic> b)
         {
+            if (prevFunc != null)
+            {
+                a = prevFunc(t, a);
+                b = prevFunc(t, b);
+            }
+
             var rddb = b.ConvertTo<KeyValuePair<K, V>>().ReduceByKey<K, V>(reduceFunc);
             var rdda = a.ConvertTo<KeyValuePair<K, V>>();
             var joined = rdda.Join<K, V, V>(rddb, numPartitions);
@@ -541,10 +565,12 @@ namespace Microsoft.Spark.CSharp.Streaming
     internal class UpdateStateByKeyHelper<K, V, S>
     {
         private readonly Func<IEnumerable<V>, S, S> func;
+        private readonly Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc;
         private readonly int numPartitions;

-        internal UpdateStateByKeyHelper(Func<IEnumerable<V>, S, S> f, int numPartitions)
+        internal UpdateStateByKeyHelper(Func<IEnumerable<V>, S, S> f, Func<double, RDD<dynamic>, RDD<dynamic>> prevF, int numPartitions)
         {
             func = f;
+            prevFunc = prevF;
             this.numPartitions = numPartitions;
         }
@@ -553,6 +579,9 @@ namespace Microsoft.Spark.CSharp.Streaming
             RDD<KeyValuePair<K, S>> state = null;
             RDD<KeyValuePair<K, Tuple<List<V>, S>>> g = null;

+            if (prevFunc != null)
+                valuesRDD = prevFunc(t, valuesRDD);
+
             var values = valuesRDD.ConvertTo<KeyValuePair<K, V>>();

             if (stateRDD == null)
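
All three helpers share the same replay guard: apply the captured prevFunc to every raw input before doing their own work. InvReduce must replay it on both a and b, otherwise one side of the window subtraction would see pipelined data and the other would not. A compact illustration of the guard:

    using System;

    class ReplaySketch
    {
        static int[] Replay(Func<double, int[], int[]> prevFunc, double t, int[] batch)
        {
            // No-op when the parent was not pipelinable.
            return prevFunc != null ? prevFunc(t, batch) : batch;
        }

        static void Main()
        {
            Func<double, int[], int[]> prevFunc = (t, xs) => Array.ConvertAll(xs, x => x * 10);
            var a = Replay(prevFunc, 0.0, new[] { 1 });
            var b = Replay(prevFunc, 0.0, new[] { 2 });
            Console.WriteLine(a[0] + " " + b[0]); // 10 20 -- both sides replayed consistently
        }
    }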

View file

@@ -26,7 +26,7 @@ namespace Microsoft.Spark.CSharp.Streaming
     [Serializable]
     internal class TransformedDStream<U> : DStream<U>
     {
-        protected Func<double, RDD<dynamic>, RDD<dynamic>> func;
+        internal Func<double, RDD<dynamic>, RDD<dynamic>> func;
         private Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc;

         internal void Init<T>(DStream<T> prev, Func<double, RDD<dynamic>, RDD<dynamic>> f)
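
func moves from protected to internal because the new call sites (TransformWith and the pair-DStream helpers) read it from unrelated classes in the same assembly, where protected is not visible. A minimal illustration of the difference (hypothetical classes):

    class TransformedSketch
    {
        // Were this `protected`, only subclasses could read it;
        // `internal` opens it to any class in the same assembly.
        internal int Payload = 42;
    }

    class UnrelatedCallSite
    {
        static int Read(TransformedSketch t) { return t.Payload; }

        static void Main()
        {
            System.Console.WriteLine(Read(new TransformedSketch())); // 42
        }
    }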

View file

@@ -253,6 +253,9 @@ namespace AdapterTest
                 }
             });

+            // disable pipelining into UpdateStateByKey, which relies on checkpointing that the mock proxy doesn't support
+            pairs.Cache();
+
             var state = pairs.UpdateStateByKey<string, int, int>((v, s) => s + (v as List<int>).Count);
             state.ForeachRDD((time, rdd) =>
             {
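
The test caches pairs on purpose: caching flips Piplinable to false, so UpdateStateByKey wires to pairs itself instead of reaching for a parent the mock proxy cannot checkpoint. The same toy rule from earlier, reduced to its essence:

    using System;

    class CacheSketch
    {
        static void Main()
        {
            bool isCached = false, isCheckpointed = false;
            Func<bool> piplinable = () => !isCached && !isCheckpointed;

            Console.WriteLine(piplinable()); // True: would pipeline into UpdateStateByKey
            isCached = true;                 // what pairs.Cache() causes
            Console.WriteLine(piplinable()); // False: mock-friendly, no pipelining
        }
    }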

View file

@@ -87,7 +87,7 @@ namespace AdapterTest.Mocks
         }

-        public IDStreamProxy CreateCSharpStateDStream(IDStreamProxy jdstream, byte[] func, string deserializer)
+        public IDStreamProxy CreateCSharpStateDStream(IDStreamProxy jdstream, byte[] func, string deserializer, string deserializer2)
         {
             Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> f = (Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>>)formatter.Deserialize(new MemoryStream(func));
             RDD<dynamic> rdd = f(DateTime.UtcNow.Ticks,

View file

@@ -234,7 +234,8 @@ class CSharpReducedWindowedDStream(
 class CSharpStateDStream(
     parent: DStream[Array[Byte]],
     reduceFunc: Array[Byte],
-    deserializer: String)
+    deserializer: String,
+    deserializer2: String)
   extends DStream[Array[Byte]](parent.ssc) {

   super.persist(StorageLevel.MEMORY_ONLY)
@@ -250,7 +251,7 @@ class CSharpStateDStream(
     val rdd = parent.getOrCompute(validTime)
     if (rdd.isDefined) {
       CSharpDStream.callCSharpTransform(List(lastState, rdd), validTime, reduceFunc,
-        List(deserializer, deserializer))
+        List(deserializer, deserializer2))
     } else {
       lastState
     }
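
Previously the Scala side decoded both callback inputs with the same mode, List(deserializer, deserializer), which breaks once the batch comes from a pipelined parent whose mode differs from the state's. A toy view of the two-mode callback contract (illustrative; the real call goes through CSharpDStream.callCSharpTransform):

    using System;

    class CallbackSketch
    {
        static void Main()
        {
            // Matches List(lastState, rdd) paired with List(deserializer, deserializer2).
            var inputs = new[]
            {
                new { Payload = "lastState", Mode = "Byte" }, // produced by the C# state func
                new { Payload = "newBatch", Mode = "Pair" }   // from the (possibly pipelined) parent
            };

            foreach (var input in inputs)
                Console.WriteLine(input.Payload + " decoded with mode " + input.Mode);
        }
    }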