restrict V type of foldByKey in order to retain ClassManifest; added foldByKey to Java API and test

This commit is contained in:
Mark Hamstra 2013-03-14 13:58:37 -07:00
Родитель b1422cbdd5
Коммит 16a4ca4537
3 изменённых файлов: 31 добавлений и 3 удалений

Просмотреть файл

@ -91,8 +91,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
/**
* Merge the values for each key using an associative function and a neutral "zero value".
*/
def foldByKey[V1 >: V](zeroValue: V1)(op: (V1, V1) => V1): RDD[(K, V1)] = {
groupByKey.mapValues(seq => seq.fold(zeroValue)(op))
def foldByKey(zeroValue: V)(op: (V, V) => V): RDD[(K, V)] = {
groupByKey.mapValues(seq => seq.fold[V](zeroValue)(op))
}
/**

Просмотреть файл

@ -160,6 +160,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
: PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
/**
* Merge the values for each key using an associative function and a neutral "zero value".
*/
def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue)(func))
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a

Просмотреть файл

@ -196,6 +196,28 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(33, sum);
}
@Test
public void foldByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
new Tuple2<Integer, Integer>(2, 1),
new Tuple2<Integer, Integer>(2, 1),
new Tuple2<Integer, Integer>(1, 1),
new Tuple2<Integer, Integer>(3, 2),
new Tuple2<Integer, Integer>(3, 1)
);
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0,
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) {
return a + b;
}
});
Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
}
@Test
public void reduceByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(