Guan Wang 2016-01-06 10:55:58 +08:00
Parent 61e2b2338e
Commit 9ddb8d1b3c
1 changed file with 278 additions and 41 deletions

View file

@@ -29,16 +29,33 @@ namespace Microsoft.Spark.CSharp.Samples
internal static void RDDSampleSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(0, 100), 4);
Console.WriteLine(rdd.Sample(false, 0.1, 81).Count());
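// Sample(withReplacement: false, fraction: 0.1, seed: 81) draws an expected 10% of the 100 elements; the exact count varies with the seed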
var sample = rdd.Sample(false, 0.1, 81);
Console.WriteLine(sample.Count());
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
var collectedOfOriginal = rdd.Collect();
var collectedOfSample = sample.Collect();
// the sampled RDD should be a subset of the original RDD
CollectionAssert.AreEqual(collectedOfSample, collectedOfOriginal.Intersect(collectedOfSample));
}
}
[Sample]
internal static void RDDRandomSplitSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(0, 500), 1);
var rdds = rdd.RandomSplit(new double[] { 2, 3 }, 17);
Console.WriteLine(rdds[0].Count());
Console.WriteLine(rdds[1].Count());
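// RandomSplit normalizes the weights { 2, 3 } to { 0.4, 0.6 }, so the splits hold roughly 200 and 300 of the 500 elements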
var splits = rdd.RandomSplit(new double[] { 2, 3 }, 17);
var countOfSplit1 = splits[0].Count();
var countOfSplit2 = splits[1].Count();
Console.WriteLine(countOfSplit1);
Console.WriteLine(countOfSplit2);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
// the counts of the two splits should add up to the count of the original RDD
Assert.AreEqual(500, countOfSplit1 + countOfSplit2);
}
}
[Sample]
@@ -46,16 +63,32 @@ namespace Microsoft.Spark.CSharp.Samples
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(0, 10), 2);
Console.WriteLine(rdd.TakeSample(true, 20, 1).Length);
Console.WriteLine(rdd.TakeSample(false, 5, 2).Length);
Console.WriteLine(rdd.TakeSample(false, 15, 3).Length);
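// TakeSample with replacement may return more elements than the RDD holds (20 from 10); without replacement the result is capped at the RDD size (15 -> 10)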
var sample1 = rdd.TakeSample(true, 20, 1);
Console.WriteLine(sample1.Length);
var sample2 = rdd.TakeSample(false, 5, 2);
Console.WriteLine(sample2.Length);
var sample3 = rdd.TakeSample(false, 15, 3);
Console.WriteLine(sample3.Length);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(20, sample1.Length);
Assert.AreEqual(5, sample2.Length);
Assert.AreEqual(10, sample3.Length);
}
}
[Sample]
internal static void RDDUnionSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 1, 2, 3 }, 1);
Console.WriteLine(string.Join(",", rdd.Union(rdd).Collect()));
var union = rdd.Union(rdd).Collect();
Console.WriteLine(string.Join(",", union));
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
CollectionAssert.AreEqual(new[] { 1, 1, 2, 3, 1, 1, 2, 3 }, union);
}
}
[Sample]
@@ -63,23 +96,50 @@ namespace Microsoft.Spark.CSharp.Samples
{
var rdd1 = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 10, 2, 3, 4, 5 }, 1);
var rdd2 = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 6, 2, 3, 7, 8 }, 1);
Console.WriteLine(string.Join(",", rdd1.Intersection(rdd2).Collect()));
var intersected = rdd1.Intersection(rdd2).Collect();
Console.WriteLine(string.Join(",", intersected));
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
CollectionAssert.AreEquivalent(new[] { 1, 2, 3 }, intersected);
}
}
[Sample]
internal static void RDDGlomSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4 }, 2);
foreach (var l in rdd.Glom().Collect())
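// Glom collects the elements of each partition into an array, yielding one array per partition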
var glom = rdd.Glom().Collect();
foreach (var l in glom)
Console.WriteLine(string.Join(",", l));
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
// the length of the Glom result should equal the partition count specified in SparkContext.Parallelize()
Assert.AreEqual(2, glom.Length);
CollectionAssert.AreEquivalent(new int[] { 1, 2, 3, 4 }, glom[0].Union(glom[1]));
}
}
[Sample]
internal static void RDDGroupBySample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 1, 2, 3, 5, 8 }, 1);
foreach (var kv in rdd.GroupBy(x => x % 2).Collect())
var groups = rdd.GroupBy(x => x % 2).Collect();
foreach (var kv in groups)
Console.WriteLine(kv.Key + ", " + string.Join(",", kv.Value));
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(2, groups.Length);
foreach (var kv in groups)
{
// the group with key 1 holds the odd numbers
if (kv.Key == 1) CollectionAssert.AreEquivalent(new[] { 1, 1, 3, 5 }, kv.Value);
// the group with key 0 holds the even numbers
else if (kv.Key == 0) CollectionAssert.AreEquivalent(new[] { 2, 8 }, kv.Value);
}
}
}
[Sample]
@@ -99,56 +159,111 @@ namespace Microsoft.Spark.CSharp.Samples
[Sample]
internal static void RDDReduceSample()
{
Console.WriteLine(SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).Reduce((x, y) => x + y));
var reduced = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).Reduce((x, y) => x + y);
Console.WriteLine(reduced);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(15, reduced);
}
}
[Sample]
internal static void RDDTreeReduceSample()
{
Console.WriteLine(SparkCLRSamples.SparkContext.Parallelize(new int[] { -5, -4, -3, -2, -1, 1, 2, 3, 4 }, 10).TreeReduce((x, y) => x + y));
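// TreeReduce combines per-partition results through a multi-level tree instead of reducing them all on the driver; the sum of the nine values is -5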
var treeReduce = SparkCLRSamples.SparkContext.Parallelize(new int[] { -5, -4, -3, -2, -1, 1, 2, 3, 4 }, 10).TreeReduce((x, y) => x + y);
Console.WriteLine(treeReduce);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(-5, treeReduce);
}
}
[Sample]
internal static void RDDFoldSample()
{
Console.WriteLine(SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).Fold(0, (x, y) => x + y));
var fold = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).Fold(0, (x, y) => x + y);
Console.WriteLine(fold);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(15, fold);
}
}
[Sample]
internal static void RDDAggregateSample()
{
Console.WriteLine(SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4 }, 1).Aggregate(0, (x, y) => x + y, (x, y) => x + y));
var aggregate = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4 }, 1).Aggregate(0, (x, y) => x + y, (x, y) => x + y);
Console.WriteLine(aggregate);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(10, aggregate);
}
}
[Sample]
internal static void RDDTreeAggregateSample()
{
Console.WriteLine(SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4 }, 1).TreeAggregate(0, (x, y) => x + y, (x, y) => x + y));
var treeAggregate = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4 }, 1).TreeAggregate(0, (x, y) => x + y, (x, y) => x + y);
Console.WriteLine(treeAggregate);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(10, treeAggregate);
}
}
[Sample]
internal static void RDDCountByValueSample()
{
foreach (var item in SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 1, 2, 2 }, 2).CountByValue())
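// CountByValue maps each distinct value to the number of times it occurs: 1 appears twice, 2 appears three times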
var countByValue = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 1, 2, 2 }, 2).CountByValue();
foreach (var item in countByValue)
Console.WriteLine(item);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(2, countByValue[1]);
Assert.AreEqual(3, countByValue[2]);
}
}
[Sample]
internal static void RDDTakeSample()
{
Console.WriteLine(string.Join(",", SparkCLRSamples.SparkContext.Parallelize(new int[] { 2, 3, 4, 5, 6 }, 2).Cache().Take(2)));
var taken = SparkCLRSamples.SparkContext.Parallelize(new int[] { 2, 3, 4, 5, 6 }, 2).Cache().Take(2);
Console.WriteLine(string.Join(",", taken));
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
CollectionAssert.AreEquivalent(new[] { 2, 3 }, taken);
}
}
[Sample]
internal static void RDDFirstSample()
{
Console.WriteLine(SparkCLRSamples.SparkContext.Parallelize(new int[] { 2, 3, 4 }, 2).First());
var first = SparkCLRSamples.SparkContext.Parallelize(new int[] { 2, 3, 4 }, 2).First();
Console.WriteLine(first);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(2, first);
}
}
[Sample]
internal static void RDDIsEmptySample()
{
Console.WriteLine(SparkCLRSamples.SparkContext.Parallelize(new int[0], 1).IsEmpty());
var isEmpty = SparkCLRSamples.SparkContext.Parallelize(new int[0], 1).IsEmpty();
Console.WriteLine(isEmpty);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(isEmpty);
}
}
[Sample]
@@ -156,31 +271,62 @@ namespace Microsoft.Spark.CSharp.Samples
{
var x = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4 }, 1);
var y = SparkCLRSamples.SparkContext.Parallelize(new int[] { 3 }, 1);
Console.WriteLine(string.Join(",", x.Subtract(y).Collect()));
var subtract = x.Subtract(y).Collect();
Console.WriteLine(string.Join(",", subtract));
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
CollectionAssert.AreEquivalent(new[] { 1, 2, 4 }, subtract);
}
}
[Sample]
internal static void RDDKeyBySample()
{
foreach (var kv in SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4 }, 1).KeyBy(x => x * x).Collect())
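// KeyBy(x => x * x) pairs each element with the computed key, producing (square, value) pairs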
var keyBy = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4 }, 1).KeyBy(x => x * x).Collect();
foreach (var kv in keyBy)
Console.Write(kv + " ");
Console.WriteLine();
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(keyBy.Contains(new KeyValuePair<int, int>(1, 1)));
Assert.IsTrue(keyBy.Contains(new KeyValuePair<int, int>(4, 2)));
Assert.IsTrue(keyBy.Contains(new KeyValuePair<int, int>(9, 3)));
Assert.IsTrue(keyBy.Contains(new KeyValuePair<int, int>(16, 4)));
}
}
[Sample]
internal static void RDDRepartitionSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4, 5, 6, 7 }, 4);
Console.WriteLine(rdd.Glom().Collect().Length);
Console.WriteLine(rdd.Repartition(2).Glom().Collect().Length);
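// Glom().Collect().Length counts partitions: Repartition(2) shuffles the four initial partitions down to two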
var partitionCountBeforeRepartition = rdd.Glom().Collect().Length;
Console.WriteLine(partitionCountBeforeRepartition);
var partitionCountAfterRepartition = rdd.Repartition(2).Glom().Collect().Length;
Console.WriteLine(partitionCountAfterRepartition);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(4, partitionCountBeforeRepartition);
Assert.AreEqual(2, partitionCountAfterRepartition);
}
}
[Sample]
internal static void RDDCoalesceSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 3);
Console.WriteLine(rdd.Glom().Collect().Length);
Console.WriteLine(rdd.Coalesce(1).Glom().Collect().Length);
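// Coalesce(1) merges partitions without the full shuffle that Repartition performs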
var partitionCountBeforeCoalesce = rdd.Glom().Collect().Length;
Console.WriteLine(partitionCountBeforeCoalesce);
var partitionCountAfterCoalesce = rdd.Coalesce(1).Glom().Collect().Length;
Console.WriteLine(partitionCountAfterCoalesce);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(3, partitionCountBeforeCoalesce);
Assert.AreEqual(1, partitionCountAfterCoalesce);
}
}
[Sample]
@@ -188,29 +334,63 @@ namespace Microsoft.Spark.CSharp.Samples
{
var x = SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(0, 5), 1);
var y = SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(1000, 5), 1);
foreach (var t in x.Zip(y).Collect())
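// Zip pairs elements positionally; it requires both RDDs to have the same number of partitions and the same number of elements per partition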
var zip = x.Zip(y).Collect();
foreach (var t in zip)
Console.WriteLine(t);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
for (int i = 0; i < 5; i++)
{
Assert.IsTrue(zip.Contains(new KeyValuePair<int, int>(i, 1000 + i)));
}
}
}
[Sample]
internal static void RDDZipWithIndexSample()
{
foreach (var t in SparkCLRSamples.SparkContext.Parallelize(new string[] { "a", "b", "c", "d" }, 3).ZipWithIndex().Collect())
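// ZipWithIndex assigns contiguous indices in partition order, so "a" through "d" map to 0 through 3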
var zipWithIndex = SparkCLRSamples.SparkContext.Parallelize(new string[] { "a", "b", "c", "d" }, 3).ZipWithIndex().Collect();
foreach (var t in zipWithIndex)
Console.WriteLine(t);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(zipWithIndex.Contains(new KeyValuePair<string, long>("a", 0)));
Assert.IsTrue(zipWithIndex.Contains(new KeyValuePair<string, long>("b", 1)));
Assert.IsTrue(zipWithIndex.Contains(new KeyValuePair<string, long>("c", 2)));
Assert.IsTrue(zipWithIndex.Contains(new KeyValuePair<string, long>("d", 3)));
}
}
[Sample]
internal static void RDDZipWithUniqueIdSample()
{
foreach (var t in SparkCLRSamples.SparkContext.Parallelize(new string[] { "a", "b", "c", "d", "e" }, 3).ZipWithUniqueId().Collect())
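// ZipWithUniqueId computes each id as index-within-partition * numPartitions + partitionIndex, which yields the non-contiguous ids asserted below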
var zipWithUniqueId = SparkCLRSamples.SparkContext.Parallelize(new string[] { "a", "b", "c", "d", "e" }, 3).ZipWithUniqueId().Collect();
foreach (var t in zipWithUniqueId)
Console.WriteLine(t);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(zipWithUniqueId.Contains(new KeyValuePair<string, long>("a", 0)));
Assert.IsTrue(zipWithUniqueId.Contains(new KeyValuePair<string, long>("b", 1)));
Assert.IsTrue(zipWithUniqueId.Contains(new KeyValuePair<string, long>("c", 4)));
Assert.IsTrue(zipWithUniqueId.Contains(new KeyValuePair<string, long>("d", 2)));
Assert.IsTrue(zipWithUniqueId.Contains(new KeyValuePair<string, long>("e", 5)));
}
}
[Sample]
internal static void RDDSetNameSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(new string[] { "a", "b", "c", "d", "e" }, 3);
Console.WriteLine(rdd.SetName("SampleRDD").Name);
const string name = "SampleRDD";
var rdd = SparkCLRSamples.SparkContext.Parallelize(new string[] { "a", "b", "c", "d", "e" }, 3).SetName(name);
Console.WriteLine(rdd.Name);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(name, rdd.Name);
}
}
[Sample]
@@ -223,7 +403,13 @@ namespace Microsoft.Spark.CSharp.Samples
[Sample]
internal static void RDDToLocalIteratorSample()
{
Console.WriteLine(string.Join(",", SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(0, 10), 1).ToLocalIterator().ToArray()));
var localIteratorResult = SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(0, 10), 1).ToLocalIterator().ToArray();
Console.WriteLine(string.Join(",", localIteratorResult));
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
CollectionAssert.AreEqual(new[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, localIteratorResult);
}
}
[Sample]
@@ -233,48 +419,91 @@ namespace Microsoft.Spark.CSharp.Samples
var path = Path.GetTempFileName();
File.Delete(path);
rdd.SaveAsTextFile(path);
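// SaveAsTextFile writes a directory of part files (one per partition) at the given path, hence the Directory.Exists check below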
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(Directory.Exists(path));
}
}
[Sample]
internal static void RDDCartesianSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2 }, 1);
foreach (var t in rdd.Cartesian(rdd).Collect())
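// Cartesian emits every ordered pair drawn from the two RDDs, 2 x 2 = 4 pairs here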
var cartesian = rdd.Cartesian(rdd).Collect();
foreach (var t in cartesian)
Console.WriteLine(t);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(cartesian.Contains(new Tuple<int, int>(1, 1)));
Assert.IsTrue(cartesian.Contains(new Tuple<int, int>(1, 2)));
Assert.IsTrue(cartesian.Contains(new Tuple<int, int>(2, 1)));
Assert.IsTrue(cartesian.Contains(new Tuple<int, int>(2, 2)));
}
}
[Sample]
internal static void RDDDistinctSample()
{
var m = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 1, 2, 3 }, 1).Distinct(1).Collect();
var distinct = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 1, 2, 3 }, 1).Distinct(1).Collect();
foreach (var v in m)
foreach (var v in distinct)
Console.Write(v + " ");
Console.WriteLine();
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
CollectionAssert.AreEquivalent(new[] { 1, 2, 3 }, distinct);
}
}
[Sample]
internal static void RDDMaxSample()
{
Console.WriteLine(SparkCLRSamples.SparkContext.Parallelize(new double[] { 1.0, 5.0, 43.0, 10.0 }, 2).Max());
var max = SparkCLRSamples.SparkContext.Parallelize(new double[] { 1.0, 5.0, 43.0, 10.0 }, 2).Max();
Console.WriteLine(max);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(43.0, max);
}
}
[Sample]
internal static void RDDMinSample()
{
Console.WriteLine(SparkCLRSamples.SparkContext.Parallelize(new double[] { 2.0, 5.0, 43.0, 10.0 }, 2).Min());
var min = SparkCLRSamples.SparkContext.Parallelize(new double[] { 2.0, 5.0, 43.0, 10.0 }, 2).Min();
Console.WriteLine(min);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(2.0, min);
}
}
[Sample]
internal static void RDDTakeOrderedSample()
{
Console.WriteLine(string.Join(",", SparkCLRSamples.SparkContext.Parallelize(new int[] { 10, 1, 2, 9, 3, 4, 5, 6, 7 }, 2).TakeOrdered(6)));
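// TakeOrdered(6) returns the six smallest elements in ascending order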
var takeOrdered = SparkCLRSamples.SparkContext.Parallelize(new int[] { 10, 1, 2, 9, 3, 4, 5, 6, 7 }, 2).TakeOrdered(6);
Console.WriteLine(string.Join(",", takeOrdered));
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
CollectionAssert.AreEquivalent(new[] { 1, 2, 3, 4, 5, 6 }, takeOrdered);
}
}
[Sample]
internal static void RDDTopSample()
{
Console.WriteLine(string.Join(",", SparkCLRSamples.SparkContext.Parallelize(new int[] { 2, 3, 4, 5, 6 }, 2).Top(3)));
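// Top(3) returns the three largest elements in descending order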
var top = SparkCLRSamples.SparkContext.Parallelize(new int[] { 2, 3, 4, 5, 6 }, 2).Top(3);
Console.WriteLine(string.Join(",", top));
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
CollectionAssert.AreEquivalent(new[] { 6, 5, 4 }, top);
}
}
/// <summary>
@@ -505,6 +734,14 @@ namespace Microsoft.Spark.CSharp.Samples
//var markets = filtered.reduce((left, right) => left + right);
var combinedRddCollectedItemCount = marketsByKey.PartitionBy(2).CombineByKey(() => "", (c, v) => c + v, (c1, c2) => c1 + c2, 2).Collect().Count();
Console.WriteLine("MarketExample: totalMarketsCount {0}, joinedRddCollectedItemCount {1}, filteredRddCollectedItemCount {2}, combinedRddCollectedItemCount {3}", totalMarketsCount, joinedRddCollectedItemCount, filteredRddCollectedItemCount, combinedRddCollectedItemCount);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(281, totalMarketsCount);
Assert.AreEqual(281, joinedRddCollectedItemCount);
Assert.AreEqual(102, filteredRddCollectedItemCount);
Assert.AreEqual(78, combinedRddCollectedItemCount);
}
}
}