Merge pull request #487 from xiongrenyi/DirectReceiver

add validation to tqin's for ReduceByKeyAndWindow
Renyi Xiong 2016-07-02 08:46:56 -07:00 committed by GitHub
Parents 4004001732 9515b44f26
Commit 33b87b4b0f
2 changed files with 89 additions and 25 deletions

View file

@@ -274,10 +274,9 @@ namespace Microsoft.Spark.CSharp.Streaming
// dstream to be transformed by subtracting old RDDs and adding new RDDs based on the window
var reduced = self.ReduceByKey(reduceFunc, numPartitions);
reduced.Cache();
Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc = reduced.Piplinable ? (reduced as TransformedDStream<KeyValuePair<K, V>>).func : null;
var helper = new ReduceByKeyAndWindowHelper<K, V>(reduceFunc, invReduceFunc, numPartitions, filterFunc, prevFunc);
var helper = new ReduceByKeyAndWindowHelper<K, V>(reduceFunc, invReduceFunc, numPartitions, filterFunc);
// function to reduce the new values that entered the window (e.g., adding new counts)
Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> reduceF = helper.Reduce;
@@ -292,17 +291,17 @@ namespace Microsoft.Spark.CSharp.Streaming
Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> invReduceF = helper.InvReduce;
invStream = new MemoryStream();
formatter.Serialize(stream, invReduceF);
formatter.Serialize(invStream, invReduceF);
}
return new DStream<KeyValuePair<K, V>>(
SparkCLREnvironment.SparkCLRProxy.StreamingContextProxy.CreateCSharpReducedWindowedDStream(
reduced.Piplinable ? reduced.prevDStreamProxy : reduced.DStreamProxy,
reduced.DStreamProxy,
stream.ToArray(),
invStream == null ? null : invStream.ToArray(),
windowSeconds,
slideSeconds,
(reduced.Piplinable ? reduced.prevSerializedMode : reduced.serializedMode).ToString()),
reduced.serializedMode.ToString()),
self.streamingContext
);
}
@@ -611,34 +610,28 @@ namespace Microsoft.Spark.CSharp.Streaming
private readonly Func<V, V, V> invReduceFunc;
private readonly int numPartitions;
private readonly Func<KeyValuePair<K, V>, bool> filterFunc;
private readonly Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc;
internal ReduceByKeyAndWindowHelper(Func<V, V, V> reduceF,
Func<V, V, V> invReduceF,
int numPartitions,
Func<KeyValuePair<K, V>, bool> filterF,
Func<double, RDD<dynamic>, RDD<dynamic>> prevF)
Func<KeyValuePair<K, V>, bool> filterF)
{
reduceFunc = reduceF;
invReduceFunc = invReduceF;
this.numPartitions = numPartitions;
filterFunc = filterF;
prevFunc = prevF;
}
internal RDD<dynamic> Reduce(double t, RDD<dynamic> a, RDD<dynamic> b)
{
if (prevFunc != null)
b = prevFunc(t, b);
var r = b.ConvertTo<KeyValuePair<K, V>>().ReduceByKey<K, V>(reduceFunc);
b.partitioner = new Partitioner(numPartitions, null);
var r = b.ConvertTo<KeyValuePair<K, V>>();
if (a != null)
{
if (prevFunc != null)
a = prevFunc(t, a);
r = a.ConvertTo<KeyValuePair<K, V>>().Union(r).ReduceByKey<K, V>(reduceFunc);
a.partitioner = b.partitioner;
r = a.ConvertTo<KeyValuePair<K, V>>().Union(r);
}
r = r.ReduceByKey<K, V>(reduceFunc, numPartitions);
if (filterFunc != null)
r.Filter(filterFunc);
return r.ConvertTo<dynamic>();
@@ -646,13 +639,8 @@ namespace Microsoft.Spark.CSharp.Streaming
internal RDD<dynamic> InvReduce(double t, RDD<dynamic> a, RDD<dynamic> b)
{
if (prevFunc != null)
{
a = prevFunc(t, a);
b = prevFunc(t, b);
}
var rddb = b.ConvertTo<KeyValuePair<K, V>>().ReduceByKey<K, V>(reduceFunc);
a.partitioner = b.partitioner = new Partitioner(numPartitions, null);
var rddb = b.ConvertTo<KeyValuePair<K, V>>().ReduceByKey<K, V>(reduceFunc, numPartitions);
var rdda = a.ConvertTo<KeyValuePair<K, V>>();
var joined = rdda.Join<K, V, V>(rddb, numPartitions);
var r = joined.MapValues<K, Tuple<V, V>, V>(kv => kv.Item2 != null ? invReduceFunc(kv.Item1, kv.Item2) : kv.Item1);
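For readers unfamiliar with the inverse-reduce pattern the Reduce/InvReduce helpers implement, the per-key update computed on each slide is roughly: newWindow = reduce(invReduce(previousWindow, valuesLeavingWindow), valuesEnteringWindow). Below is a minimal standalone sketch of that update using plain dictionaries in place of RDDs; it is an illustration only, not the Mobius implementation, and the class name and toy numbers are invented for the example.

using System;
using System.Collections.Generic;
using System.Linq;

static class WindowedReduceSketch
{
    // Reduce a sequence of (key, value) pairs into a per-key aggregate.
    static Dictionary<int, int> ReduceByKey(
        IEnumerable<KeyValuePair<int, int>> pairs, Func<int, int, int> reduce)
    {
        var result = new Dictionary<int, int>();
        foreach (var kv in pairs)
            result[kv.Key] = result.TryGetValue(kv.Key, out var v) ? reduce(v, kv.Value) : kv.Value;
        return result;
    }

    static void Main()
    {
        Func<int, int, int> reduce = (x, y) => x + y;     // combines values entering the window
        Func<int, int, int> invReduce = (x, y) => x - y;  // removes values leaving the window

        // Window state from the previous slide, plus the batches leaving and entering the window.
        var previousWindow = new Dictionary<int, int> { { 0, 2450 }, { 1, 2500 } };
        var leaving = new[] { new KeyValuePair<int, int>(0, 450), new KeyValuePair<int, int>(1, 500) };
        var entering = new[] { new KeyValuePair<int, int>(0, 450), new KeyValuePair<int, int>(1, 500) };

        // InvReduce step: reduce the leaving values by key, join with the previous window,
        // and apply the inverse function wherever a key has leaving values.
        var reducedLeaving = ReduceByKey(leaving, reduce);
        var afterInv = previousWindow.ToDictionary(
            kv => kv.Key,
            kv => reducedLeaving.TryGetValue(kv.Key, out var old) ? invReduce(kv.Value, old) : kv.Value);

        // Reduce step: union with the entering values and reduce by key again.
        var newWindow = ReduceByKey(afterInv.Concat(entering), reduce);

        foreach (var kv in newWindow)
            Console.WriteLine("Key: {0}, Value: {1}", kv.Key, kv.Value);  // totals are unchanged in this toy case
    }
}

In the DStream code above the same two steps run over RDDs: Reduce unions the previous window state with the new batch and calls ReduceByKey with numPartitions, while InvReduce joins the previous window state with the reduced outgoing batch and applies invReduceFunc through MapValues.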

View file

@@ -215,6 +215,82 @@ namespace Microsoft.Spark.CSharp
ssc.Start();
ssc.AwaitTermination();
}
/// <summary>
/// when windowDuration not >= slideDuration * 5
/// DStreamReduceByKeyAndWindow does windowed reduce once
/// </summary>
[Sample("experimental")]
internal static void DStreamReduceByKeyAndSmallWindowSample()
{
slideDuration = 6;
DStreamReduceByKeyAndWindowSample();
}
/// <summary>
/// when windowDuration >= slideDuration * 5
/// DStreamReduceByKeyAndWindow reduces twice based on previousRDD
/// by first invReduce on old RDDs and then reduce on new RDDs
/// </summary>
[Sample("experimental")]
internal static void DStreamReduceByKeyAndLargeWindowSample()
{
slideDuration = 4;
DStreamReduceByKeyAndWindowSample();
}
private static int slideDuration;
private static void DStreamReduceByKeyAndWindowSample()
{
count = 0;
const int bacthInterval = 2;
const int windowDuration = 26;
const int numPartitions = 2;
var sc = SparkCLRSamples.SparkContext;
var ssc = new StreamingContext(sc, bacthInterval);
// create the RDD
var seedRDD = sc.Parallelize(Enumerable.Range(0, 100), numPartitions);
var numbers = new ConstantInputDStream<int>(seedRDD, ssc);
var pairs = numbers.Map(n => new KeyValuePair<int, int>(n % numPartitions, n));
var reduced = pairs.ReduceByKeyAndWindow(
(int x, int y) => (x + y),
(int x, int y) => (x - y),
windowDuration,
slideDuration,
numPartitions
);
reduced.ForeachRDD((time, rdd) =>
{
count++;
var taken = rdd.Collect();
int partitions = rdd.GetNumPartitions();
Console.WriteLine("-------------------------------------------");
Console.WriteLine("Time: {0}", time);
Console.WriteLine("-------------------------------------------");
Console.WriteLine("Batch: " + count);
Console.WriteLine("Count: " + taken.Length);
Console.WriteLine("Partitions: " + partitions);
Assert.AreEqual(taken.Length, 2);
Assert.AreEqual(partitions, numPartitions);
foreach (object record in taken)
{
KeyValuePair<int, int> sum = (KeyValuePair<int, int>)record;
Console.WriteLine("Key: {0}, Value: {1}", sum.Key, sum.Value);
// when batch count reaches window size, sums of even/odd numbers stay at windowDuration / bacthInterval * 2450 and 2500 respectively
Assert.AreEqual(sum.Value, (count > windowDuration / slideDuration ? windowDuration : count * slideDuration) / bacthInterval * (sum.Key == 0 ? 2450 : 2500));
}
});
ssc.Start();
ssc.AwaitTermination();
}
}
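To see where the expected values in the assertion above come from: every batch of the ConstantInputDStream replays the numbers 0..99, whose even members sum to 2450 and odd members to 2500, so a full 26-second window at a 2-second batch interval holds 13 such per-key contributions. A short standalone check of the formula, assuming the same constants as the sample (this snippet is not part of the test code):

using System;

class ReduceByKeyAndWindowExpectedSums
{
    static void Main()
    {
        const int batchInterval = 2, windowDuration = 26, slideDuration = 4;  // the large-window configuration
        for (int count = 1; count <= 8; count++)
        {
            // seconds of data covered by the window at this output batch
            int secondsInWindow = count > windowDuration / slideDuration ? windowDuration : count * slideDuration;
            int batchesInWindow = secondsInWindow / batchInterval;
            Console.WriteLine("output {0}: key 0 -> {1}, key 1 -> {2}",
                count, batchesInWindow * 2450, batchesInWindow * 2500);
            // once the window is full (count > 6): 13 * 2450 = 31850 and 13 * 2500 = 32500
        }
    }
}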