DStream ReduceByKey will fail with reference types

This commit is contained in:
Kaarthik Sivashanmugam 2017-02-12 01:31:15 -08:00 committed by GitHub
Parents e8c8f4db92 9f60e5ec57
Commit 14d9eb4f46
3 changed files with 52 additions and 3 deletions
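Why the old implementation fails: ReduceByKey previously delegated to CombineByKey with a createCombiner of () => default(V), as the first hunk below shows. For a reference type V, default(V) is null, so the first merge for every key invokes reduceFunc(null, value), and any reducer that dereferences its arguments throws. A minimal standalone sketch of the failure mode (illustrative only, not code from this commit):

using System;

class NullSeedRepro
{
    static void Main()
    {
        // A reducer that dereferences its arguments, e.g. keep the longer string.
        Func<string, string, string> reduceFunc = (a, b) => a.Length >= b.Length ? a : b;

        string seed = default(string);   // null for any reference type
        try
        {
            reduceFunc(seed, "dog");     // throws: a is null on the first merge
        }
        catch (NullReferenceException)
        {
            Console.WriteLine("ReduceByKey over reference types hits the null seed");
        }
    }
}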

View file

@@ -30,7 +30,11 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <returns></returns>
public static DStream<Tuple<K, V>> ReduceByKey<K, V>(this DStream<Tuple<K, V>> self, Func<V, V, V> reduceFunc, int numPartitions = 0)
{
-    return self.CombineByKey(() => default(V), reduceFunc, reduceFunc, numPartitions);
+    var locallyCombined = self.MapPartitionsWithIndex(new GroupByMergeHelper<K, V>(reduceFunc).Execute, true);
+    var shuffled = locallyCombined.PartitionBy(numPartitions);
+    return shuffled.MapPartitionsWithIndex(new GroupByMergeHelper<K, V>(reduceFunc).Execute, true);
}
/// <summary>
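The new implementation never manufactures a seed value: it reduces within each input partition (a map-side combine), shuffles the partial results by key with PartitionBy, and reduces the co-located partials once more. A rough in-memory sketch of the same two-phase pattern using plain LINQ (illustrative only, no Spark involved):

using System;
using System.Collections.Generic;
using System.Linq;

class TwoPhaseReduce
{
    static void Main()
    {
        Func<int, int, int> reduce = (x, y) => x + y;

        // Two "partitions" of (word, count) pairs.
        var partitions = new List<List<Tuple<string, int>>>
        {
            new List<Tuple<string, int>> { Tuple.Create("dog", 1), Tuple.Create("dog", 1) },
            new List<Tuple<string, int>> { Tuple.Create("dog", 1), Tuple.Create("cat", 1) }
        };

        // Phase 1: combine locally within each partition, mirroring the first
        // MapPartitionsWithIndex. Aggregate without a seed starts from the first
        // element, so no default(V)/null ever reaches the reducer.
        var locallyCombined = partitions.SelectMany(p =>
            p.GroupBy(t => t.Item1, t => t.Item2, (k, vs) => Tuple.Create(k, vs.Aggregate(reduce))));

        // Phase 2: once the shuffle has co-located keys, merge the partial
        // results, mirroring the second MapPartitionsWithIndex.
        var merged = locallyCombined.GroupBy(t => t.Item1, t => t.Item2,
            (k, vs) => Tuple.Create(k, vs.Aggregate(reduce)));

        foreach (var t in merged)
            Console.WriteLine(t.Item1 + ": " + t.Item2);   // dog: 3, cat: 1
    }
}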
@@ -427,6 +431,24 @@ namespace Microsoft.Spark.CSharp.Streaming
/// for execution, it is necessary to have the type marked [Serializable]. These classes are to work around the limitation
/// on the serializability of compiler generated types
/// </summary>
+[Serializable]
+internal class GroupByMergeHelper<K, C>
+{
+    private readonly Func<C, C, C> mergeCombiners;
+
+    public GroupByMergeHelper(Func<C, C, C> mc)
+    {
+        mergeCombiners = mc;
+    }
+
+    public IEnumerable<Tuple<K, C>> Execute(int pid, IEnumerable<Tuple<K, C>> input)
+    {
+        return input.GroupBy(
+            kvp => kvp.Item1,
+            kvp => kvp.Item2,
+            (k, v) => new Tuple<K, C>(k, v.Aggregate(mergeCombiners))
+        );
+    }
+}
[Serializable]
internal class CombineByKeyHelper<K, V, C>
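GroupByMergeHelper is declared explicitly, rather than written inline as a lambda, for the reason the summary above gives: a lambda that captures state compiles into a compiler-generated closure class that is not marked [Serializable], so the delegate cannot be shipped to the Spark workers. A small sketch of the difference (illustrative only; BinaryFormatter stands in for the worker-side serialization):

using System;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

class SerializableDelegateDemo
{
    [Serializable]
    class Adder
    {
        public int Add(int x, int y) { return x + y; }
    }

    static void Main()
    {
        var formatter = new BinaryFormatter();

        // Delegate over an instance method of a [Serializable] class: round-trips.
        Func<int, int, int> good = new Adder().Add;
        using (var ms = new MemoryStream())
            formatter.Serialize(ms, good);

        // Capturing lambda: its target is a compiler-generated closure class
        // without [Serializable], so serialization fails.
        int captured = 1;
        Func<int, int, int> bad = (x, y) => x + y + captured;
        try
        {
            using (var ms = new MemoryStream())
                formatter.Serialize(ms, bad);
        }
        catch (SerializationException)
        {
            Console.WriteLine("compiler-generated closure type is not serializable");
        }
    }
}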

View file

@@ -8658,7 +8658,7 @@
the given function on the previous state of the key and the new values of the key.
</summary>
</member>
<member name="T:Microsoft.Spark.CSharp.Streaming.CombineByKeyHelper`3">
<member name="T:Microsoft.Spark.CSharp.Streaming.GroupByMergeHelper`2">
<summary>
Following classes are defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating
private anonymous type that is not marked serializable. Since the delegate has to be serialized and sent to the Spark workers

View file

@@ -243,6 +243,33 @@ namespace AdapterTest
});
}
+[Test]
+public void TestDStreamGroupByKeyAndWindow()
+{
+    var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
+    Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
+
+    var lines = ssc.TextFileStream(Path.GetTempPath());
+    Assert.IsNotNull(lines.DStreamProxy);
+
+    var words = lines.FlatMap(l => l.Split(' '));
+    var pairs = words.Map(w => new Tuple<string, int>(w, 1));
+    var doubleCounts = pairs.GroupByKeyAndWindow(1000, 0).FlatMapValues(vs => vs).ReduceByKey((x, y) => x + y);
+
+    doubleCounts.ForeachRDD((time, rdd) =>
+    {
+        var taken = rdd.Collect();
+        Assert.AreEqual(taken.Length, 9);
+
+        foreach (object record in taken)
+        {
+            Tuple<string, int> countByWord = (Tuple<string, int>)record;
+            Assert.AreEqual(countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22, countByWord.Item2);
+        }
+    });
+}
[Test]
public void TestDStreamUpdateStateByKey()
{
@@ -265,7 +292,7 @@ namespace AdapterTest
foreach (object record in taken)
{
Tuple<string, int> countByWord = (Tuple<string, int>)record;
-    Assert.AreEqual(countByWord.Item2, countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22);
+    Assert.AreEqual(countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22, countByWord.Item2);
}
});