DStream ReduceByKey will fail with reference types
This fix depends on #605, and is related to #602
This commit is contained in:
Родитель
ed73082895
Коммит
0260524872
|
@ -30,7 +30,12 @@ namespace Microsoft.Spark.CSharp.Streaming
|
|||
/// <returns></returns>
|
||||
public static DStream<Tuple<K, V>> ReduceByKey<K, V>(this DStream<Tuple<K, V>> self, Func<V, V, V> reduceFunc, int numPartitions = 0)
|
||||
{
|
||||
return self.CombineByKey(() => default(V), reduceFunc, reduceFunc, numPartitions);
|
||||
var locallyCombined = self.MapPartitionsWithIndex(new GroupByMergeHelper<K, V>(reduceFunc).Execute, true);
|
||||
|
||||
var shuffled = locallyCombined.PartitionBy(numPartitions);
|
||||
|
||||
return shuffled.MapPartitionsWithIndex(new GroupByMergeHelper<K, V>(reduceFunc).Execute, true);
|
||||
//return self.CombineByKey(() => default(V) == null ? (V) Activator.CreateInstance(typeof(V)) : default(V), reduceFunc, reduceFunc, numPartitions);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -427,6 +432,24 @@ namespace Microsoft.Spark.CSharp.Streaming
|
|||
/// for execution, it is necessary to have the type marked [Serializable]. These classes are to work around the limitation
|
||||
/// on the serializability of compiler generated types
|
||||
/// </summary>
|
||||
[Serializable]
|
||||
internal class GroupByMergeHelper<K, C>
|
||||
{
|
||||
private readonly Func<C, C, C> mergeCombiners;
|
||||
public GroupByMergeHelper(Func<C, C, C> mc)
|
||||
{
|
||||
mergeCombiners = mc;
|
||||
}
|
||||
|
||||
public IEnumerable<Tuple<K, C>> Execute(int pid, IEnumerable<Tuple<K, C>> input)
|
||||
{
|
||||
return input.GroupBy(
|
||||
kvp => kvp.Item1,
|
||||
kvp => kvp.Item2,
|
||||
(k, v) => new Tuple<K, C>(k, v.Aggregate(mergeCombiners))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
[Serializable]
|
||||
internal class CombineByKeyHelper<K, V, C>
|
||||
|
|
|
@ -8488,7 +8488,7 @@
|
|||
the given function on the previous state of the key and the new values of the key.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Streaming.CombineByKeyHelper`3">
|
||||
<member name="T:Microsoft.Spark.CSharp.Streaming.GroupByMergeHelper`2">
|
||||
<summary>
|
||||
Following classes are defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating
|
||||
private anonymous type that is not marked serializable. Since the delegate has to be serialized and sent to the Spark workers
|
||||
|
|
|
@ -243,6 +243,33 @@ namespace AdapterTest
|
|||
});
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void TestDStreamGroupByKeyAndWindow()
|
||||
{
|
||||
var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
|
||||
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
|
||||
|
||||
var lines = ssc.TextFileStream(Path.GetTempPath());
|
||||
Assert.IsNotNull(lines.DStreamProxy);
|
||||
|
||||
var words = lines.FlatMap(l => l.Split(' '));
|
||||
|
||||
var pairs = words.Map(w => new Tuple<string, int>(w, 1));
|
||||
|
||||
var doubleCounts = pairs.GroupByKeyAndWindow(1000, 0).FlatMapValues(vs => vs).ReduceByKey((x, y) => x + y);
|
||||
doubleCounts.ForeachRDD((time, rdd) =>
|
||||
{
|
||||
var taken = rdd.Collect();
|
||||
Assert.AreEqual(taken.Length, 9);
|
||||
|
||||
foreach (object record in taken)
|
||||
{
|
||||
Tuple<string, int> countByWord = (Tuple<string, int>) record;
|
||||
Assert.AreEqual(countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22, countByWord.Item2);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void TestDStreamUpdateStateByKey()
|
||||
{
|
||||
|
@ -265,7 +292,7 @@ namespace AdapterTest
|
|||
foreach (object record in taken)
|
||||
{
|
||||
Tuple<string, int> countByWord = (Tuple<string, int>)record;
|
||||
Assert.AreEqual(countByWord.Item2, countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22);
|
||||
Assert.AreEqual(countByWord.Item1 == "The" || countByWord.Item1 == "dog" || countByWord.Item1 == "lazy" ? 2 * 23 : 2 * 22, countByWord.Item2);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче