Fix ReduceByKey failure when the value type is a reference type

This commit is contained in:
Attila Szucs 2016-12-04 07:49:00 +01:00 коммит произвёл Kaarthik Sivashanmugam
Родитель 12c46c8ed1
Коммит 88f76af038
2 изменённых файлов: 38 добавлений и 1 удалений

Просмотреть файл

@ -97,7 +97,11 @@ namespace Microsoft.Spark.CSharp.Core
/// <returns></returns> /// <returns></returns>
public static RDD<KeyValuePair<K, V>> ReduceByKey<K, V>(this RDD<KeyValuePair<K, V>> self, Func<V, V, V> reduceFunc, int numPartitions = 0) public static RDD<KeyValuePair<K, V>> ReduceByKey<K, V>(this RDD<KeyValuePair<K, V>> self, Func<V, V, V> reduceFunc, int numPartitions = 0)
{ {
return CombineByKey(self, () => default(V), reduceFunc, reduceFunc, numPartitions); var locallyCombined = self.MapPartitionsWithIndex(new GroupByMergeHelper<K, V>(reduceFunc).Execute, true);
var shuffled = locallyCombined.PartitionBy(numPartitions);
return shuffled.MapPartitionsWithIndex(new GroupByMergeHelper<K, V>(reduceFunc).Execute, true);
} }
/// <summary> /// <summary>

Просмотреть файл

@ -105,6 +105,39 @@ namespace AdapterTest
} }
} }
[Serializable]
private class IntWrapper
{
public IntWrapper(int value)
{
Value = value;
}
public int Value { get; }
}
[Test]
public void TestPairRddReduceByKeyWithObjects()
{
// The ReduceByKey method below fails with NPE if ReduceByKey
// calls CombineByKey with () => default(V) as seed generator
var sums = pairs
.MapValues(value => new IntWrapper(value))
.ReduceByKey((x, y) => new IntWrapper(x.Value + y.Value));
var result = sums
.CollectAsMap()
.Select(pair => new KeyValuePair<string, int>(pair.Key, pair.Value.Value))
.ToList();
var expectedResult = pairs
.ReduceByKey((x, y) => x + y)
.CollectAsMap()
.ToList();
Assert.That(result, Is.EquivalentTo(expectedResult));
}
[Test] [Test]
public void TestPairRddFoldByKey() public void TestPairRddFoldByKey()
{ {