From 0260524872656edc97e8018424d3fd6be1b37544 Mon Sep 17 00:00:00 2001
From: dwnichols
Date: Wed, 14 Dec 2016 14:25:15 -0500
Subject: [PATCH 1/2] DStream ReduceByKey will fail with reference types

This fix depends on #605, and is related to #602
---
 .../Streaming/PairDStreamFunctions.cs         | 25 +++++++++++++++-
 .../Microsoft.Spark.CSharp.Adapter.Doc.XML    |  2 +-
 csharp/AdapterTest/DStreamTest.cs             | 29 ++++++++++++++++++-
 3 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/PairDStreamFunctions.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/PairDStreamFunctions.cs
index adaa3bd..4aaf576 100644
--- a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/PairDStreamFunctions.cs
+++ b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/PairDStreamFunctions.cs
@@ -30,7 +30,12 @@ namespace Microsoft.Spark.CSharp.Streaming
         /// </summary>
         public static DStream<Tuple<K, V>> ReduceByKey<K, V>(this DStream<Tuple<K, V>> self, Func<V, V, V> reduceFunc, int numPartitions = 0)
         {
-            return self.CombineByKey(() => default(V), reduceFunc, reduceFunc, numPartitions);
+            var locallyCombined = self.MapPartitionsWithIndex(new GroupByMergeHelper<K, V>(reduceFunc).Execute, true);
+
+            var shuffled = locallyCombined.PartitionBy(numPartitions);
+
+            return shuffled.MapPartitionsWithIndex(new GroupByMergeHelper<K, V>(reduceFunc).Execute, true);
+            //return self.CombineByKey(() => default(V) == null ? (V) Activator.CreateInstance(typeof(V)) : default(V), reduceFunc, reduceFunc, numPartitions);
         }
 
         /// <summary>
@@ -427,6 +432,24 @@ namespace Microsoft.Spark.CSharp.Streaming
     /// for execution, it is necessary to have the type marked [Serializable]. These classes are to work around the limitation
     /// on the serializability of compiler generated types
     /// </summary>
+    [Serializable]
+    internal class GroupByMergeHelper<K, C>
+    {
+        private readonly Func<C, C, C> mergeCombiners;
+        public GroupByMergeHelper(Func<C, C, C> mc)
+        {
+            mergeCombiners = mc;
+        }
+
+        public IEnumerable<Tuple<K, C>> Execute(int pid, IEnumerable<Tuple<K, C>> input)
+        {
+            return input.GroupBy(
+                kvp => kvp.Item1,
+                kvp => kvp.Item2,
+                (k, v) => new Tuple<K, C>(k, v.Aggregate(mergeCombiners))
+            );
+        }
+    }
 
     [Serializable]
     internal class CombineByKeyHelper<K, V, C>
diff --git a/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML b/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML
index 0f4c49c..471cc8a 100644
--- a/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML
+++ b/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML
@@ -8488,7 +8488,7 @@
             the given function on the previous state of the key and the new values of the key.
             </summary>
         </member>
-        <member name="T:Microsoft.Spark.CSharp.Streaming.CombineByKeyHelper`3">
+        <member name="T:Microsoft.Spark.CSharp.Streaming.GroupByMergeHelper`2">
             <summary>
             Following classes are defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating
             private anonymous type that is not marked serializable. Since the delegate has to be serialized and sent to the Spark workers
diff --git a/csharp/AdapterTest/DStreamTest.cs b/csharp/AdapterTest/DStreamTest.cs
index c6a363b..0683892 100644
--- a/csharp/AdapterTest/DStreamTest.cs
+++ b/csharp/AdapterTest/DStreamTest.cs
@@ -243,6 +243,33 @@ namespace AdapterTest
             });
         }
 
+        [Test]
+        public void TestDStreamGroupByKeyAndWindow()
+        {
+            var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
+            Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
+
+            var lines = ssc.TextFileStream(Path.GetTempPath());
+            Assert.IsNotNull(lines.DStreamProxy);
+
+            var words = lines.FlatMap(l => l.Split(' '));
+
+            var pairs = words.Map(w => new Tuple<string, int>(w, 1));
+
+            var doubleCounts = pairs.GroupByKeyAndWindow(1000, 0).FlatMapValues(vs => vs).ReduceByKey((x, y) => x + y);
+            doubleCounts.ForeachRDD((time, rdd) =>
+            {
+                var taken = rdd.Collect();
+                Assert.AreEqual(taken.Length, 9);
+
+                foreach (object record in taken)
+                {
+                    Tuple<string, int> countByWord = (Tuple<string, int>)record;
+                    Assert.AreEqual(countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22, countByWord.Item2);
+                }
+            });
+        }
+
         [Test]
         public void TestDStreamUpdateStateByKey()
         {
@@ -265,7 +292,7 @@ namespace AdapterTest
                 foreach (object record in taken)
                 {
                     Tuple<string, int> countByWord = (Tuple<string, int>)record;
-                    Assert.AreEqual(countByWord.Item2, countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22);
+                    Assert.AreEqual(countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22, countByWord.Item2);
                }
             });

From 9f60e5ec573d0397011ae3485901688b59786fde Mon Sep 17 00:00:00 2001
From: dwnichols
Date: Tue, 31 Jan 2017 12:50:56 -0500
Subject: [PATCH 2/2] Remove unnecessary comment from previous attempt

---
 .../Microsoft.Spark.CSharp/Streaming/PairDStreamFunctions.cs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/PairDStreamFunctions.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/PairDStreamFunctions.cs
index 4aaf576..a5e8e12 100644
--- a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/PairDStreamFunctions.cs
+++ b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/PairDStreamFunctions.cs
@@ -35,7 +35,6 @@ namespace Microsoft.Spark.CSharp.Streaming
             var shuffled = locallyCombined.PartitionBy(numPartitions);
 
             return shuffled.MapPartitionsWithIndex(new GroupByMergeHelper<K, V>(reduceFunc).Execute, true);
-            //return self.CombineByKey(() => default(V) == null ? (V) Activator.CreateInstance(typeof(V)) : default(V), reduceFunc, reduceFunc, numPartitions);
         }
 
         /// <summary>
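
For illustration, a minimal, self-contained LINQ sketch (not part of the patch) of the failure the patch addresses and the merge semantics it adopts. MergeByKey is a hypothetical stand-in that mirrors GroupByMergeHelper.Execute; the old path seeded CombineByKey with default(V), which is null for reference types, so the first merge call became reduceFunc(null, value). Seedless Enumerable.Aggregate, as used in GroupByMergeHelper, folds from the first element of each group and never manufactures a null seed.

    using System;
    using System.Collections.Generic;
    using System.Linq;

    static class ReduceByKeyDemo
    {
        // Hypothetical stand-in mirroring GroupByMergeHelper.Execute: values are
        // grouped per key and folded with reduceFunc; Aggregate without a seed
        // starts from the first element, so reduceFunc never sees default(V).
        static IEnumerable<Tuple<K, V>> MergeByKey<K, V>(
            IEnumerable<Tuple<K, V>> input, Func<V, V, V> reduceFunc)
        {
            return input.GroupBy(
                kvp => kvp.Item1,
                kvp => kvp.Item2,
                (k, vs) => new Tuple<K, V>(k, vs.Aggregate(reduceFunc)));
        }

        static void Main()
        {
            // V is a reference type (string); default(V) would be null here.
            var pairs = new[]
            {
                new Tuple<string, string>("a", "x"),
                new Tuple<string, string>("a", "y"),
                new Tuple<string, string>("b", "z")
            };

            foreach (var kv in MergeByKey(pairs, (l, r) => l + r))
                Console.WriteLine(kv.Item1 + ": " + kv.Item2);   // a: xy, b: z
        }
    }

The two-phase shape of the fix (merge within each partition, PartitionBy, merge again) follows the usual map-side-combine pattern: partial aggregation before the shuffle keeps the shuffled data small, and reapplying the same reduce function after the shuffle is safe because ReduceByKey assumes it is associative.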
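
Patch 2 drops the commented-out fallback that tried Activator.CreateInstance(typeof(V)) when default(V) is null. A short sketch of why that fallback is not a general fix: many reference types, string included, expose no public parameterless constructor, so the call throws rather than producing a usable zero value.

    using System;

    static class SeedProbe
    {
        static void Main()
        {
            // For value types, default(V) is a usable zero seed.
            Console.WriteLine(default(int));   // 0

            // For string, default(V) is null, and Activator.CreateInstance
            // cannot supply a substitute: string has no public parameterless
            // constructor, so the call throws instead of returning "".
            try
            {
                Activator.CreateInstance(typeof(string));
            }
            catch (MissingMethodException)
            {
                Console.WriteLine("no parameterless constructor for string");
            }
        }
    }

Even for types where CreateInstance succeeds, a freshly constructed object is not necessarily an identity element for the caller's reduceFunc, which is why the GroupBy/Aggregate route that avoids a seed altogether is the safer design.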