Add validations for PairRDDSamples

This commit is contained in:
Guan Wang 2016-01-08 16:08:04 +08:00
Родитель 15ab18b687
Коммит 69ecd357ef
6 изменённых файлов: 256 добавлений и 102 удалений

Просмотреть файл

@ -63,6 +63,7 @@
<Compile Include="Configuration\IConfigurationService.cs" />
<Compile Include="Core\Accumulator.cs" />
<Compile Include="Core\Broadcast.cs" />
<Compile Include="Core\Option.cs" />
<Compile Include="Core\RDDCollector.cs" />
<Compile Include="Core\DoubleRDDFunctions.cs" />
<Compile Include="Core\IRDDCollector.cs" />

Просмотреть файл

@ -0,0 +1,37 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
namespace Microsoft.Spark.CSharp.Core
{
/// <summary>
/// Container for an optional value of type T. When a value is present, <see cref="IsDefined"/>
/// is true and <see cref="GetValue"/> returns it; when absent, <see cref="IsDefined"/> is false
/// and <see cref="GetValue"/> throws.
/// </summary>
/// <typeparam name="T">Type of the optionally-held value.</typeparam>
[Serializable]
public class Option<T>
{
    // Field names are part of the binary-serialization contract ([Serializable]) — do not rename.
    private bool isDefined = false;
    private T value;

    /// <summary>Creates an empty Option; <see cref="IsDefined"/> is false.</summary>
    public Option()
    { }

    /// <summary>Creates an Option holding <paramref name="value"/>; <see cref="IsDefined"/> is true.</summary>
    public Option(T value)
    {
        this.isDefined = true;
        this.value = value;
    }

    /// <summary>True when a value is present.</summary>
    public bool IsDefined
    {
        get { return isDefined; }
    }

    /// <summary>
    /// Returns the contained value.
    /// </summary>
    /// <exception cref="ArgumentException">Thrown when no value is present.
    /// NOTE(review): InvalidOperationException would be more idiomatic here, but the type is
    /// kept as-is so existing callers that catch ArgumentException keep working.</exception>
    public T GetValue()
    {
        if (!isDefined)
        {
            throw new ArgumentException("Value is not defined.");
        }
        return value;
    }
}
}

Просмотреть файл

@ -4,16 +4,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Reflection;
using System.IO;
using System.Security.Cryptography;
using Microsoft.Spark.CSharp.Interop;
namespace Microsoft.Spark.CSharp.Core
{
/// <summary>
@ -156,9 +151,9 @@ namespace Microsoft.Spark.CSharp.Core
}
/// <summary>
/// Return an RDD containing all pairs of elements with matching keys in C{self} and C{other}.
/// Return an RDD containing all pairs of elements with matching keys in this RDD and <paramref name="other"/>.
///
/// Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in C{self} and (k, v2) is in C{other}.
/// Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in this RDD and (k, v2) is in <paramref name="other"/>.
///
/// Performs a hash join across the cluster.
///
@ -189,11 +184,11 @@ namespace Microsoft.Spark.CSharp.Core
}
/// <summary>
/// Perform a left outer join of C{self} and C{other}.
/// Perform a left outer join of this RDD and <paramref name="other"/>.
///
/// For each element (k, v) in C{self}, the resulting RDD will either
/// contain all pairs (k, (v, w)) for w in C{other}, or the pair
/// (k, (v, None)) if no elements in C{other} have key k.
/// For each element (k, v) in this RDD, the resulting RDD will either
/// contain all pairs (k, (v, Option)) for w in <paramref name="other"/>, where Option.IsDefined is TRUE, or the pair
/// (k, (v, Option)) if no elements in <paramref name="other"/> have key k, where Option.IsDefined is FALSE.
///
/// Hash-partitions the resulting RDD into the given number of partitions.
///
@ -203,7 +198,8 @@ namespace Microsoft.Spark.CSharp.Core
/// new[] { new KeyValuePair&lt;string, int>("a", 2) }, 1);
/// var m = l.LeftOuterJoin(r).Collect();
///
/// [('a', (1, 2)), ('b', (4, None))]
/// [('a', (1, 2)), ('b', (4, Option))]
/// * Option.IsDefined = FALSE
/// </summary>
/// <typeparam name="K"></typeparam>
/// <typeparam name="V"></typeparam>
@ -212,22 +208,21 @@ namespace Microsoft.Spark.CSharp.Core
/// <param name="other"></param>
/// <param name="numPartitions"></param>
/// <returns></returns>
public static RDD<KeyValuePair<K, Tuple<V, W>>> LeftOuterJoin<K, V, W>(
public static RDD<KeyValuePair<K, Tuple<V, Option<W>>>> LeftOuterJoin<K, V, W>(
this RDD<KeyValuePair<K, V>> self,
RDD<KeyValuePair<K, W>> other,
int numPartitions = 0)
{
return self.GroupWith(other, numPartitions).FlatMapValues(
input => input.Item1.SelectMany(v => input.Item2.DefaultIfEmpty().Select(w => new Tuple<V, W>(v, w)))
);
input => input.Item1.SelectMany(v => input.Item2.NullIfEmpty().Select(optionW => new Tuple<V, Option<W>>(v, optionW))) );
}
/// <summary>
/// Perform a right outer join of C{self} and C{other}.
/// Perform a right outer join of this RDD and <paramref name="other"/>.
///
/// For each element (k, w) in C{other}, the resulting RDD will either
/// contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
/// if no elements in C{self} have key k.
/// For each element (k, w) in <paramref name="other"/>, the resulting RDD will either
/// contain all pairs (k, (Option, w)) for v in this, where Option.IsDefined is TRUE, or the pair (k, (Option, w))
/// if no elements in this RDD have key k, where Option.IsDefined is FALSE.
///
/// Hash-partitions the resulting RDD into the given number of partitions.
///
@ -237,7 +232,8 @@ namespace Microsoft.Spark.CSharp.Core
/// new[] { new KeyValuePair&lt;string, int>("a", 1), new KeyValuePair&lt;string, int>("b", 4) }, 1);
/// var m = l.RightOuterJoin(r).Collect();
///
/// [('a', (2, 1)), ('b', (None, 4))]
/// [('a', (2, 1)), ('b', (Option, 4))]
/// * Option.IsDefined = FALSE
/// </summary>
/// <typeparam name="K"></typeparam>
/// <typeparam name="V"></typeparam>
@ -246,13 +242,13 @@ namespace Microsoft.Spark.CSharp.Core
/// <param name="other"></param>
/// <param name="numPartitions"></param>
/// <returns></returns>
public static RDD<KeyValuePair<K, Tuple<V, W>>> RightOuterJoin<K, V, W>(
public static RDD<KeyValuePair<K, Tuple<Option<V>, W>>> RightOuterJoin<K, V, W>(
this RDD<KeyValuePair<K, V>> self,
RDD<KeyValuePair<K, W>> other,
int numPartitions = 0)
{
return self.GroupWith(other, numPartitions).FlatMapValues(
input => input.Item1.DefaultIfEmpty().SelectMany(v => input.Item2.Select(w => new Tuple<V, W>(v, w)))
input => input.Item1.NullIfEmpty().SelectMany(v => input.Item2.Select(w => new Tuple<Option<V>, W>(v, w)))
);
}
@ -285,13 +281,13 @@ namespace Microsoft.Spark.CSharp.Core
/// <param name="other"></param>
/// <param name="numPartitions"></param>
/// <returns></returns>
public static RDD<KeyValuePair<K, Tuple<V, W>>> FullOuterJoin<K, V, W>(
public static RDD<KeyValuePair<K, Tuple<Option<V>, Option<W>>>> FullOuterJoin<K, V, W>(
this RDD<KeyValuePair<K, V>> self,
RDD<KeyValuePair<K, W>> other,
int numPartitions = 0)
{
return self.GroupWith(other, numPartitions).FlatMapValues(
input => input.Item1.DefaultIfEmpty().SelectMany(v => input.Item2.DefaultIfEmpty().Select(w => new Tuple<V, W>(v, w)))
input => input.Item1.NullIfEmpty().SelectMany(v => input.Item2.NullIfEmpty().Select(w => new Tuple<Option<V>, Option<W>>(v, w)))
);
}
@ -986,5 +982,10 @@ namespace Microsoft.Spark.CSharp.Core
return input.Key.ToString() == key.ToString();
}
}
/// <summary>
/// Wraps each element of <paramref name="list"/> in a defined <see cref="Option{T}"/>.
/// If the sequence is empty, returns a single-element list containing an undefined Option —
/// used by the outer-join implementations to represent the missing side of a join.
/// </summary>
/// <typeparam name="T">Element type of the source sequence.</typeparam>
/// <param name="list">Source sequence; enumerated exactly once.</param>
/// <returns>List of defined Options, or a one-element list with an undefined Option when empty.</returns>
public static List<Option<T>> NullIfEmpty<T>(this IEnumerable<T> list)
{
    // Materialize the projection once: the original called Any() and then Select() on the
    // same enumerable, enumerating it twice — incorrect for lazy or one-shot sequences.
    var options = list.Select(v => new Option<T>(v)).ToList();
    return options.Count > 0 ? options : new List<Option<T>>() { new Option<T>() };
}
}
}

Просмотреть файл

@ -166,12 +166,12 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <param name="other"></param>
/// <param name="numPartitions"></param>
/// <returns></returns>
public static DStream<KeyValuePair<K, Tuple<V, W>>> LeftOuterJoin<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
public static DStream<KeyValuePair<K, Tuple<V, Option<W>>>> LeftOuterJoin<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<V, W>>>(new LeftOuterJoinHelper<K, V, W>(numPartitions).Execute, other);
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<V, Option<W>>>>(new LeftOuterJoinHelper<K, V, W>(numPartitions).Execute, other);
}
/// <summary>
@ -185,12 +185,12 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <param name="other"></param>
/// <param name="numPartitions"></param>
/// <returns></returns>
public static DStream<KeyValuePair<K, Tuple<V, W>>> RightOuterJoin<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
public static DStream<KeyValuePair<K, Tuple<Option<V>, W>>> RightOuterJoin<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<V, W>>>(new RightOuterJoinHelper<K, V, W>(numPartitions).Execute, other);
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<Option<V>, W>>>(new RightOuterJoinHelper<K, V, W>(numPartitions).Execute, other);
}
/// <summary>
@ -204,12 +204,12 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <param name="other"></param>
/// <param name="numPartitions"></param>
/// <returns></returns>
public static DStream<KeyValuePair<K, Tuple<V, W>>> FullOuterJoin<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
public static DStream<KeyValuePair<K, Tuple<Option<V>, Option<W>>>> FullOuterJoin<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<V, W>>>(new FullOuterJoinHelper<K, V, W>(numPartitions).Execute, other);
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<Option<V>, Option<W>>>>(new FullOuterJoinHelper<K, V, W>(numPartitions).Execute, other);
}
/// <summary>
@ -487,7 +487,7 @@ namespace Microsoft.Spark.CSharp.Streaming
this.numPartitions = numPartitions;
}
internal RDD<KeyValuePair<K, Tuple<V, W>>> Execute(RDD<KeyValuePair<K, V>> l, RDD<KeyValuePair<K, W>> r)
internal RDD<KeyValuePair<K, Tuple<V, Option<W>>>> Execute(RDD<KeyValuePair<K, V>> l, RDD<KeyValuePair<K, W>> r)
{
return l.LeftOuterJoin<K, V, W>(r, numPartitions);
}
@ -502,7 +502,7 @@ namespace Microsoft.Spark.CSharp.Streaming
this.numPartitions = numPartitions;
}
internal RDD<KeyValuePair<K, Tuple<V, W>>> Execute(RDD<KeyValuePair<K, V>> l, RDD<KeyValuePair<K, W>> r)
internal RDD<KeyValuePair<K, Tuple<Option<V>, W>>> Execute(RDD<KeyValuePair<K, V>> l, RDD<KeyValuePair<K, W>> r)
{
return l.RightOuterJoin<K, V, W>(r, numPartitions);
}
@ -517,7 +517,7 @@ namespace Microsoft.Spark.CSharp.Streaming
this.numPartitions = numPartitions;
}
internal RDD<KeyValuePair<K, Tuple<V, W>>> Execute(RDD<KeyValuePair<K, V>> l, RDD<KeyValuePair<K, W>> r)
internal RDD<KeyValuePair<K, Tuple<Option<V>, Option<W>>>> Execute(RDD<KeyValuePair<K, V>> l, RDD<KeyValuePair<K, W>> r)
{
return l.FullOuterJoin<K, V, W>(r, numPartitions);
}

Просмотреть файл

@ -4,12 +4,9 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop;
using NUnit.Framework;
namespace Microsoft.Spark.CSharp.Samples
{
@ -18,34 +15,52 @@ namespace Microsoft.Spark.CSharp.Samples
[Sample]
internal static void PairRDDCollectAsMapSample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<int, int>(1, 2), new KeyValuePair<int, int>(3, 4) }, 1).CollectAsMap();
var map = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<int, int>(1, 2), new KeyValuePair<int, int>(3, 4) }, 1).CollectAsMap();
foreach (var kv in m)
foreach (var kv in map)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(map.ContainsKey(1) && map[1] == 2);
Assert.IsTrue(map.ContainsKey(3) && map[3] == 4);
}
}
[Sample]
internal static void PairRDDKeysSample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<int, int>(1, 2), new KeyValuePair<int, int>(3, 4) }, 1).Keys().Collect();
var keys = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<int, int>(1, 2), new KeyValuePair<int, int>(3, 4) }, 1).Keys().Collect();
Console.WriteLine(m[0]);
Console.WriteLine(m[1]);
Console.WriteLine(keys[0]);
Console.WriteLine(keys[1]);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(1, keys[0]);
Assert.AreEqual(3, keys[1]);
}
}
[Sample]
internal static void PairRDDValuesSample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<int, int>(1, 2), new KeyValuePair<int, int>(3, 4) }, 1).Values().Collect();
var values = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<int, int>(1, 2), new KeyValuePair<int, int>(3, 4) }, 1).Values().Collect();
Console.WriteLine(m[0]);
Console.WriteLine(m[1]);
Console.WriteLine(values[0]);
Console.WriteLine(values[1]);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(2, values[0]);
Assert.AreEqual(4, values[1]);
}
}
[Sample]
internal static void PairRDDReduceByKeySample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(
var reduced = SparkCLRSamples.SparkContext.Parallelize(
new[]
{
new KeyValuePair<string, int>("a", 1),
@ -54,14 +69,20 @@ namespace Microsoft.Spark.CSharp.Samples
}, 2)
.ReduceByKey((x, y) => x + y).Collect();
foreach (var kv in m)
foreach (var kv in reduced)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(reduced.Contains(new KeyValuePair<string, int>("a", 2)));
Assert.IsTrue(reduced.Contains(new KeyValuePair<string, int>("b", 1)));
}
}
[Sample]
internal static void PairRDDReduceByKeyLocallySample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(
var reduced = SparkCLRSamples.SparkContext.Parallelize(
new[]
{
new KeyValuePair<string, int>("a", 1),
@ -70,14 +91,20 @@ namespace Microsoft.Spark.CSharp.Samples
}, 2)
.ReduceByKeyLocally((x, y) => x + y);
foreach (var kv in m)
foreach (var kv in reduced)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(reduced.Contains(new KeyValuePair<string, int>("a", 2)));
Assert.IsTrue(reduced.Contains(new KeyValuePair<string, int>("b", 1)));
}
}
[Sample]
internal static void PairRDDCountByKeySample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(
var countByKey = SparkCLRSamples.SparkContext.Parallelize(
new[]
{
new KeyValuePair<string, int>("a", 1),
@ -86,8 +113,14 @@ namespace Microsoft.Spark.CSharp.Samples
}, 2)
.CountByKey();
foreach (var kv in m)
foreach (var kv in countByKey)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(2, countByKey["a"]);
Assert.AreEqual(1, countByKey["b"]);
}
}
[Sample]
@ -107,10 +140,16 @@ namespace Microsoft.Spark.CSharp.Samples
new KeyValuePair<string, int>("a", 3),
}, 1);
var m = l.Join(r, 2).Collect();
var joined = l.Join(r, 2).Collect();
foreach (var kv in m)
foreach (var kv in joined)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(joined.Contains(new KeyValuePair<string, Tuple<int, int>>("a", new Tuple<int, int>(1, 2))));
Assert.IsTrue(joined.Contains(new KeyValuePair<string, Tuple<int, int>>("a", new Tuple<int, int>(1, 3))));
}
}
[Sample]
@ -129,10 +168,16 @@ namespace Microsoft.Spark.CSharp.Samples
new KeyValuePair<string, int>("a", 2),
}, 1);
var m = l.LeftOuterJoin(r).Collect();
var joined = l.LeftOuterJoin(r).Collect();
foreach (var kv in m)
foreach (var kv in joined)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(joined.Any(kv => kv.Key == "a" && kv.Value.Item1 == 1 && kv.Value.Item2.IsDefined && kv.Value.Item2.GetValue() == 2));
Assert.IsTrue(joined.Any(kv => kv.Key == "b" && kv.Value.Item1 == 4 && !kv.Value.Item2.IsDefined));
}
}
[Sample]
@ -151,10 +196,16 @@ namespace Microsoft.Spark.CSharp.Samples
new KeyValuePair<string, int>("b", 4),
}, 2);
var m = l.RightOuterJoin(r).Collect();
var joined = l.RightOuterJoin(r).Collect();
foreach (var kv in m)
foreach (var kv in joined)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(joined.Any(kv => kv.Key == "a" && kv.Value.Item1.IsDefined && kv.Value.Item1.GetValue() == 2 && kv.Value.Item2 == 1));
Assert.IsTrue(joined.Any(kv => kv.Key == "b" && !kv.Value.Item1.IsDefined && kv.Value.Item2 == 4));
}
}
[Sample]
@ -174,35 +225,50 @@ namespace Microsoft.Spark.CSharp.Samples
new KeyValuePair<string, int>("c", 8),
}, 2);
var m = l.FullOuterJoin(r).Collect();
var joined = l.FullOuterJoin(r).Collect();
foreach (var kv in m)
foreach (var kv in joined)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(joined.Any(kv => kv.Key == "a" && kv.Value.Item1.IsDefined && kv.Value.Item1.GetValue() == 1 &&
kv.Value.Item2.IsDefined && kv.Value.Item2.GetValue() == 2));
Assert.IsTrue(joined.Any(kv => kv.Key == "b" && kv.Value.Item1.IsDefined && kv.Value.Item1.GetValue() == 4 &&
!kv.Value.Item2.IsDefined));
Assert.IsTrue(joined.Any(kv => kv.Key == "c" && !kv.Value.Item1.IsDefined &&
kv.Value.Item2.IsDefined && kv.Value.Item2.GetValue() == 8));
}
}
[Sample]
internal static void PairRDDPartitionBySample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(new[] { 1, 2, 3, 4, 2, 4, 1 }, 1)
var partitionBy = SparkCLRSamples.SparkContext.Parallelize(new[] { 1, 2, 3, 4, 2, 4, 1 }, 1)
.Map(x => new KeyValuePair<int, int>(x, x))
.PartitionBy(3)
.Glom()
.Collect();
foreach (var a in m)
foreach (var partition in partitionBy)
{
foreach (var kv in a)
foreach (var kv in partition)
{
Console.Write(kv + " ");
}
Console.WriteLine();
}
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(3, partitionBy.Length);
}
}
[Sample]
internal static void PairRDDCombineByKeySample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(
var combineByKey = SparkCLRSamples.SparkContext.Parallelize(
new[]
{
new KeyValuePair<string, int>("a", 1),
@ -211,14 +277,20 @@ namespace Microsoft.Spark.CSharp.Samples
}, 2)
.CombineByKey(() => string.Empty, (x, y) => x + y.ToString(CultureInfo.InvariantCulture), (x, y) => x + y).Collect();
foreach (var kv in m)
foreach (var kv in combineByKey)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(combineByKey.Contains(new KeyValuePair<string, string>("a", "11")));
Assert.IsTrue(combineByKey.Contains(new KeyValuePair<string, string>("b", "1")));
}
}
[Sample]
internal static void PairRDDAggregateByKeySample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(
var aggregateByKey = SparkCLRSamples.SparkContext.Parallelize(
new[]
{
new KeyValuePair<string, int>("a", 1),
@ -227,14 +299,20 @@ namespace Microsoft.Spark.CSharp.Samples
}, 2)
.AggregateByKey(() => 0, (x, y) => x + y, (x, y) => x + y).Collect();
foreach (var kv in m)
foreach (var kv in aggregateByKey)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(aggregateByKey.Contains(new KeyValuePair<string, int>("a", 2)));
Assert.IsTrue(aggregateByKey.Contains(new KeyValuePair<string, int>("b", 1)));
}
}
[Sample]
internal static void PairRDDFoldByKeySample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(
var FoldByKey = SparkCLRSamples.SparkContext.Parallelize(
new[]
{
new KeyValuePair<string, int>("a", 1),
@ -243,30 +321,42 @@ namespace Microsoft.Spark.CSharp.Samples
}, 2)
.FoldByKey(() => 0, (x, y) => x + y).Collect();
foreach (var kv in m)
foreach (var kv in FoldByKey)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(FoldByKey.Contains(new KeyValuePair<string, int>("a", 2)));
Assert.IsTrue(FoldByKey.Contains(new KeyValuePair<string, int>("b", 1)));
}
}
[Sample]
internal static void PairRDDGroupByKeySample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(
var groupByKey = SparkCLRSamples.SparkContext.Parallelize(
new[]
{
new KeyValuePair<string, int>("a", 1),
new KeyValuePair<string, int>("b", 1),
new KeyValuePair<string, int>("a", 1)
}, 2)
.GroupByKey().MapValues(l => string.Join(" ", l)).Collect();
.GroupByKey().Collect();
foreach (var kv in m)
Console.WriteLine(kv);
foreach (var kv in groupByKey)
Console.WriteLine(kv.Key + ", " + "(" + string.Join(",", kv.Value) + ")");
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(groupByKey.Any(kv => kv.Key == "a" && kv.Value.Count == 2 && kv.Value[0] == 1 && kv.Value[1] == 1));
Assert.IsTrue(groupByKey.Any(kv => kv.Key == "b" && kv.Value.Count == 1 && kv.Value[0] == 1));
}
}
[Sample]
internal static void PairRDDMapValuesSample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(
var mapValues = SparkCLRSamples.SparkContext.Parallelize(
new[]
{
new KeyValuePair<string, string[]>("a", new[]{"apple", "banana", "lemon"}),
@ -274,14 +364,20 @@ namespace Microsoft.Spark.CSharp.Samples
}, 2)
.MapValues(x => x.Length).Collect();
foreach (var kv in m)
foreach (var kv in mapValues)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(mapValues.Any(kv => kv.Key == "a" && kv.Value == 3));
Assert.IsTrue(mapValues.Any(kv => kv.Key == "b" && kv.Value == 1));
}
}
[Sample]
internal static void PairRDDFlatMapValuesSample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(
var flatMapValues = SparkCLRSamples.SparkContext.Parallelize(
new[]
{
new KeyValuePair<string, string[]>("a", new[]{"x", "y", "z"}),
@ -289,8 +385,17 @@ namespace Microsoft.Spark.CSharp.Samples
}, 2)
.FlatMapValues(x => x).Collect();
foreach (var kv in m)
foreach (var kv in flatMapValues)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(flatMapValues.Any(kv => kv.Key == "a" && kv.Value == "x"));
Assert.IsTrue(flatMapValues.Any(kv => kv.Key == "a" && kv.Value == "y"));
Assert.IsTrue(flatMapValues.Any(kv => kv.Key == "a" && kv.Value == "z"));
Assert.IsTrue(flatMapValues.Any(kv => kv.Key == "b" && kv.Value == "p"));
Assert.IsTrue(flatMapValues.Any(kv => kv.Key == "b" && kv.Value == "r"));
}
}
[Sample]
@ -299,10 +404,16 @@ namespace Microsoft.Spark.CSharp.Samples
var x = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<string, int>("a", 1), new KeyValuePair<string, int>("b", 4)}, 2);
var y = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<string, int>("a", 2)}, 1);
var m = x.GroupWith(y).MapValues(l => string.Join(" ", l.Item1) + " : " + string.Join(" ", l.Item2)).Collect();
var groupWith = x.GroupWith(y).Collect();
foreach (var kv in m)
Console.WriteLine(kv);
foreach (var kv in groupWith)
Console.WriteLine(kv.Key + ", " + "(" + string.Join(",", kv.Value) + ")");
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(groupWith.Any(kv => kv.Key == "a" && kv.Value.Item1[0] == 1 && kv.Value.Item2[0] == 2));
Assert.IsTrue(groupWith.Any(kv => kv.Key == "b" && kv.Value.Item1[0] == 4 && !kv.Value.Item2.Any()));
}
}
[Sample]
@ -312,24 +423,16 @@ namespace Microsoft.Spark.CSharp.Samples
var y = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<string, int>("a", 1), new KeyValuePair<string, int>("b", 4) }, 2);
var z = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<string, int>("a", 2) }, 1);
var m = x.GroupWith(y, z).MapValues(l => string.Join(" ", l.Item1) + " : " + string.Join(" ", l.Item2) + " : " + string.Join(" ", l.Item3)).Collect();
var groupWith = x.GroupWith(y, z).Collect();
foreach (var kv in m)
Console.WriteLine(kv);
}
foreach (var kv in groupWith)
Console.WriteLine(kv.Key + ", " + "(" + string.Join(",", kv.Value) + ")");
[Sample]
internal static void PairRDDGroupWithSample3()
{
var x = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<string, int>("a", 5), new KeyValuePair<string, int>("b", 6) }, 2);
var y = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<string, int>("a", 1), new KeyValuePair<string, int>("b", 4) }, 2);
var z = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<string, int>("a", 2) }, 1);
var w = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<string, int>("b", 42) }, 1);
var m = x.GroupWith(y, z, w).MapValues(l => string.Join(" ", l.Item1) + " : " + string.Join(" ", l.Item2) + " : " + string.Join(" ", l.Item3) + " : " + string.Join(" ", l.Item4)).Collect();
foreach (var kv in m)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(groupWith.Any(kv => kv.Key == "a" && kv.Value.Item1[0] == 5 && kv.Value.Item2[0] == 1 && kv.Value.Item3[0] == 2));
Assert.IsTrue(groupWith.Any(kv => kv.Key == "b" && kv.Value.Item1[0] == 6 && kv.Value.Item2[0] == 4 && !kv.Value.Item3.Any()));
}
}
//TO DO: implement PairRDDFunctions.SampleByKey
@ -349,18 +452,33 @@ namespace Microsoft.Spark.CSharp.Samples
var x = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<string, int?>("a", 1), new KeyValuePair<string, int?>("b", 4), new KeyValuePair<string, int?>("b", 5), new KeyValuePair<string, int?>("a", 2) }, 2);
var y = SparkCLRSamples.SparkContext.Parallelize(new[] { new KeyValuePair<string, int?>("a", 3), new KeyValuePair<string, int?>("c", null) }, 2);
var m = x.SubtractByKey(y).Collect();
var subtractByKey = x.SubtractByKey(y).Collect();
foreach (var kv in m)
foreach (var kv in subtractByKey)
Console.WriteLine(kv);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(2, subtractByKey.Length);
Assert.IsTrue(subtractByKey.Contains(new KeyValuePair<string, int?>("b", 4)));
Assert.IsTrue(subtractByKey.Contains(new KeyValuePair<string, int?>("b", 5)));
}
}
[Sample]
internal static void PairRDDLookupSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(0, 1000).Zip(Enumerable.Range(0, 1000), (x, y) => new KeyValuePair<int, int>(x, y)), 10);
Console.WriteLine(string.Join(",", rdd.Lookup(42)));
Console.WriteLine(string.Join(",", rdd.Lookup(1024)));
var lookup42 = rdd.Lookup(42);
var lookup1024 = rdd.Lookup(1024);
Console.WriteLine(string.Join(",", lookup42));
Console.WriteLine(string.Join(",", lookup1024));
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
CollectionAssert.AreEqual(new[] { 42 }, lookup42);
Assert.AreEqual(0, lookup1024.Length);
}
}
}
}

Просмотреть файл

@ -5,10 +5,7 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop;
using NUnit.Framework;
namespace Microsoft.Spark.CSharp.Samples