dwnichols 2016-12-13 16:24:59 -05:00
Parent 66fc5123d0
Commit a36b16009e
6 changed files with 26 additions and 26 deletions

View file

@@ -18,7 +18,7 @@ namespace Microsoft.Spark.CSharp.Core
     {
         /// <summary>
-        /// Sorts this RDD, which is assumed to consist of KeyValuePair pairs.
+        /// Sorts this RDD, which is assumed to consist of Tuple pairs.
         /// </summary>
         /// <typeparam name="K"></typeparam>
         /// <typeparam name="V"></typeparam>
@@ -32,7 +32,7 @@ namespace Microsoft.Spark.CSharp.Core
             return SortByKey<K, V, K>(self, ascending, numPartitions, new DefaultSortKeyFuncHelper<K>().Execute);
         }
         /// <summary>
-        /// Sorts this RDD, which is assumed to consist of KeyValuePairs. If key is type of string, case is sensitive.
+        /// Sorts this RDD, which is assumed to consist of Tuples. If Item1 is type of string, case is sensitive.
         /// </summary>
         /// <typeparam name="K"></typeparam>
         /// <typeparam name="V"></typeparam>
@@ -40,7 +40,7 @@ namespace Microsoft.Spark.CSharp.Core
         /// <param name="self"></param>
         /// <param name="ascending"></param>
         /// <param name="numPartitions">Number of partitions. Each partition of the sorted RDD contains a sorted range of the elements.</param>
-        /// <param name="keyFunc">RDD will sort by keyFunc(key) for every key in KeyValuePair. Must not be null.</param>
+        /// <param name="keyFunc">RDD will sort by keyFunc(key) for every Item1 in Tuple. Must not be null.</param>
         /// <returns></returns>
         public static RDD<Tuple<K, V>> SortByKey<K, V, U>(this RDD<Tuple<K, V>> self,
             bool ascending, int? numPartitions, Func<K, U> keyFunc)
@@ -103,13 +103,13 @@ namespace Microsoft.Spark.CSharp.Core
         /// <param name="partitionFunc"></param>
         /// <param name="ascending"></param>
         /// <returns></returns>
-        public static RDD<KeyValuePair<K, V>> repartitionAndSortWithinPartitions<K, V>(
-            this RDD<KeyValuePair<K, V>> self,
+        public static RDD<Tuple<K, V>> repartitionAndSortWithinPartitions<K, V>(
+            this RDD<Tuple<K, V>> self,
             int? numPartitions = null,
             Func<K, int> partitionFunc = null,
             bool ascending = true)
         {
-            return self.MapPartitionsWithIndex<KeyValuePair<K, V>>((pid, iter) => ascending ? iter.OrderBy(kv => kv.Key) : iter.OrderByDescending(kv => kv.Key));
+            return self.MapPartitionsWithIndex<Tuple<K, V>>((pid, iter) => ascending ? iter.OrderBy(kv => kv.Item1) : iter.OrderByDescending(kv => kv.Item1));
         }
         [Serializable]
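The new MapPartitionsWithIndex body above is just an in-memory LINQ sort on Item1. A runnable, Spark-free sketch of that per-partition ordering (PartitionSortSketch and its data are illustrative, not part of Mobius):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class PartitionSortSketch
{
    // Mirrors the lambda passed to MapPartitionsWithIndex in the diff:
    // order each partition's tuples by Item1, ascending or descending.
    static IEnumerable<Tuple<K, V>> SortPartition<K, V>(
        IEnumerable<Tuple<K, V>> partition, bool ascending)
    {
        return ascending
            ? partition.OrderBy(kv => kv.Item1)
            : partition.OrderByDescending(kv => kv.Item1);
    }

    static void Main()
    {
        var partition = new[]
        {
            Tuple.Create("lazy", 92),
            Tuple.Create("The", 92),
            Tuple.Create("dog", 92),
        };
        foreach (var kv in SortPartition(partition, ascending: true))
            Console.WriteLine("{0}: {1}", kv.Item1, kv.Item2);
        // Default string ordering is culture- and case-sensitive,
        // matching the "case is sensitive" remark in the XML docs above.
    }
}
```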

View file

@@ -241,7 +241,7 @@ namespace Microsoft.Spark.CSharp.Core
     ///
     /// Do
     /// {{{
-    /// RDD&lt;KeyValuePair&lt;string, string>> rdd = sparkContext.WholeTextFiles("hdfs://a-hdfs-path")
+    /// RDD&lt;Tuple&lt;string, string>> rdd = sparkContext.WholeTextFiles("hdfs://a-hdfs-path")
     /// }}}
     ///
     /// then `rdd` contains
@@ -259,9 +259,9 @@ namespace Microsoft.Spark.CSharp.Core
     /// <param name="filePath"></param>
     /// <param name="minPartitions"></param>
     /// <returns></returns>
-    public RDD<KeyValuePair<byte[], byte[]>> WholeTextFiles(string filePath, int? minPartitions = null)
+    public RDD<Tuple<byte[], byte[]>> WholeTextFiles(string filePath, int? minPartitions = null)
     {
-        return new RDD<KeyValuePair<byte[], byte[]>>(SparkContextProxy.WholeTextFiles(filePath, minPartitions ?? DefaultMinPartitions), this, SerializedMode.Pair);
+        return new RDD<Tuple<byte[], byte[]>>(SparkContextProxy.WholeTextFiles(filePath, minPartitions ?? DefaultMinPartitions), this, SerializedMode.Pair);
     }
     /// <summary>
@@ -279,7 +279,7 @@ namespace Microsoft.Spark.CSharp.Core
     /// }}}
     ///
     /// Do
-    /// RDD&lt;KeyValuePair&lt;string, byte[]>>"/> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+    /// RDD&lt;Tuple&lt;string, byte[]>>"/> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
     ///
     /// then `rdd` contains
     /// {{{
@@ -296,9 +296,9 @@ namespace Microsoft.Spark.CSharp.Core
     /// <param name="filePath"></param>
     /// <param name="minPartitions"></param>
     /// <returns></returns>
-    public RDD<KeyValuePair<byte[], byte[]>> BinaryFiles(string filePath, int? minPartitions)
+    public RDD<Tuple<byte[], byte[]>> BinaryFiles(string filePath, int? minPartitions)
     {
-        return new RDD<KeyValuePair<byte[], byte[]>>(SparkContextProxy.BinaryFiles(filePath, minPartitions ?? DefaultMinPartitions), this, SerializedMode.Pair);
+        return new RDD<Tuple<byte[], byte[]>>(SparkContextProxy.BinaryFiles(filePath, minPartitions ?? DefaultMinPartitions), this, SerializedMode.Pair);
     }
     /// <summary>
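After this change, WholeTextFiles and BinaryFiles both yield RDD&lt;Tuple&lt;byte[], byte[]>> in Pair mode, with Item1 carrying the file name and Item2 the contents. A hedged, self-contained sketch of decoding one such pair (the byte arrays are fabricated stand-ins for what SparkContextProxy would return):

```csharp
using System;
using System.Text;

class WholeTextFilesPairSketch
{
    static void Main()
    {
        // Stand-in for one element of RDD<Tuple<byte[], byte[]>>:
        // Item1 is the file path, Item2 is the file content.
        var pair = Tuple.Create(
            Encoding.UTF8.GetBytes("hdfs://a-hdfs-path/part-00000"),
            Encoding.UTF8.GetBytes("file content"));

        string path = Encoding.UTF8.GetString(pair.Item1);
        string content = Encoding.UTF8.GetString(pair.Item2);
        Console.WriteLine("{0} => {1}", path, content);
    }
}
```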

View file

@@ -372,7 +372,7 @@ namespace AdapterTest
     SparkContext sc = new SparkContext(sparkContextProxy.Object, null);
     // Act
-    RDD<KeyValuePair<byte[], byte[]>> rdd = sc.WholeTextFiles(filePath, null);
+    RDD<Tuple<byte[], byte[]>> rdd = sc.WholeTextFiles(filePath, null);
     // Assert
     Assert.IsNotNull(rdd);
@@ -394,7 +394,7 @@ namespace AdapterTest
     SparkContext sc = new SparkContext(sparkContextProxy.Object, null);
     // Act
-    RDD<KeyValuePair<byte[], byte[]>> rdd = sc.BinaryFiles(filePath, null);
+    RDD<Tuple<byte[], byte[]>> rdd = sc.BinaryFiles(filePath, null);
     // Assert
     Assert.IsNotNull(rdd);
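The adapter tests above only assert non-null; a Spark-free NUnit sketch of the cast-and-assert pattern the migration now requires everywhere (types and values here are illustrative, not part of this commit):

```csharp
using System;
using NUnit.Framework;

[TestFixture]
public class TuplePairAssertSketch
{
    [Test]
    public void PairRoundTripsAsTuple()
    {
        // Pairs now surface as Tuple, not KeyValuePair.
        object record = Tuple.Create(new byte[] { 1 }, new byte[] { 2 });
        Assert.IsInstanceOf<Tuple<byte[], byte[]>>(record);
        var pair = (Tuple<byte[], byte[]>)record;
        Assert.AreEqual(1, pair.Item1[0]);
        Assert.AreEqual(2, pair.Item2[0]);
    }
}
```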

View file

@@ -99,8 +99,8 @@ namespace Microsoft.Spark.CSharp
     {
         Console.WriteLine(record);
-        var countByWord = (KeyValuePair<string, int>)record;
-        Assert.AreEqual(countByWord.Value, countByWord.Key == "The" || countByWord.Key == "lazy" || countByWord.Key == "dog" ? 92 : 88);
+        var countByWord = (Tuple<string, int>)record;
+        Assert.AreEqual(countByWord.Item2, countByWord.Item1 == "The" || countByWord.Item1 == "lazy" || countByWord.Item1 == "dog" ? 92 : 88);
     }
     Console.WriteLine();
@@ -283,10 +283,10 @@ namespace Microsoft.Spark.CSharp
     foreach (object record in taken)
     {
-        KeyValuePair<int, int> sum = (KeyValuePair<int, int>)record;
-        Console.WriteLine("Key: {0}, Value: {1}", sum.Key, sum.Value);
+        Tuple<int, int> sum = (Tuple<int, int>)record;
+        Console.WriteLine("Key: {0}, Value: {1}", sum.Item1, sum.Item2);
         // when batch count reaches window size, sum of even/odd number stay at windowDuration / slideDuration * (2450, 2500) respectively
-        Assert.AreEqual(sum.Value, (count > windowDuration / slideDuration ? windowDuration : count * slideDuration) / (bacthIntervalMs / 1000) * (sum.Key == 0 ? 2450 : 2500));
+        Assert.AreEqual(sum.Item2, (count > windowDuration / slideDuration ? windowDuration : count * slideDuration) / (bacthIntervalMs / 1000) * (sum.Item1 == 0 ? 2450 : 2500));
     }
     });
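One subtlety in these samples: KeyValuePair is a struct, so the old cast from object was an unboxing, while Tuple is a class, so the new cast is a reference downcast (and `as` becomes available for safe probing). A minimal illustration:

```csharp
using System;
using System.Collections.Generic;

class CastSketch
{
    static void Main()
    {
        // Before: KeyValuePair<string, int> is a struct, so casting
        // from object was an unboxing operation.
        object boxed = new KeyValuePair<string, int>("dog", 92);
        var kvp = (KeyValuePair<string, int>)boxed; // unbox
        Console.WriteLine("{0}: {1}", kvp.Key, kvp.Value);

        // After: Tuple<string, int> is a class, so the cast is a
        // reference downcast and `as` can probe it safely.
        object record = Tuple.Create("dog", 92);
        var countByWord = record as Tuple<string, int>;
        if (countByWord != null)
            Console.WriteLine("{0}: {1}", countByWord.Item1, countByWord.Item2);
    }
}
```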

View file

@@ -533,7 +533,7 @@ namespace Microsoft.Spark.CSharp
     .GetField("value", BindingFlags.NonPublic | BindingFlags.Instance)
     .GetValue(item.Value);
     logger.LogDebug("({0}, {1})", item.Key, value);
-    formatter.Serialize(ms, new KeyValuePair<int, dynamic>(item.Key, value));
+    formatter.Serialize(ms, new Tuple<int, dynamic>(item.Key, value));
     byte[] buffer = ms.ToArray();
     SerDe.Write(networkStream, buffer.Length);
     SerDe.Write(networkStream, buffer);
@@ -649,7 +649,7 @@ namespace Microsoft.Spark.CSharp
     }
     watch.Stop();
-    yield return new KeyValuePair<byte[], byte[]>(pairKey, pairValue);
+    yield return new Tuple<byte[], byte[]>(pairKey, pairValue);
     break;
     }
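On the worker side, each accumulator update now crosses the wire as a length-prefixed BinaryFormatter payload whose root object is a Tuple&lt;int, dynamic>. A minimal write-side sketch, with BinaryWriter standing in for SerDe.Write (framing and endianness of the real protocol may differ):

```csharp
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

class AccumulatorWriteSketch
{
    // Serialize one (accumulatorId, value) update the way the worker
    // loop above does, then frame it with a length prefix.
    static void WriteUpdate(Stream networkStream, int accumulatorId, dynamic value)
    {
        var formatter = new BinaryFormatter();
        var ms = new MemoryStream();
        formatter.Serialize(ms, new Tuple<int, dynamic>(accumulatorId, value));
        byte[] buffer = ms.ToArray();

        var writer = new BinaryWriter(networkStream);
        writer.Write(buffer.Length); // stand-in for SerDe.Write(stream, length)
        writer.Write(buffer);        // stand-in for SerDe.Write(stream, buffer)
        writer.Flush();
    }

    static void Main()
    {
        var wire = new MemoryStream();
        WriteUpdate(wire, accumulatorId: 1, value: 42);
        Console.WriteLine("framed {0} bytes", wire.Length);
    }
}
```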

View file

@@ -573,7 +573,7 @@ namespace WorkerTest
     {
         WritePayloadHeaderToWorker(s);
         byte[] command = SparkContext.BuildCommand(
-            new CSharpWorkerFunc((pid, iter) => iter.Cast<KeyValuePair<byte[], byte[]>>().Select(pair => pair.Key)),
+            new CSharpWorkerFunc((pid, iter) => iter.Cast<Tuple<byte[], byte[]>>().Select(pair => pair.Item1)),
             SerializedMode.Pair, SerializedMode.None);
         SerDe.Write(s, command.Length);
@@ -713,7 +713,7 @@ namespace WorkerTest
     /// <summary>
     /// read accumulator
     /// </summary>
-    private IEnumerable<KeyValuePair<int, dynamic>> ReadAccumulator(Stream s, int expectedCount = 0)
+    private IEnumerable<Tuple<int, dynamic>> ReadAccumulator(Stream s, int expectedCount = 0)
     {
         int count = 0;
         var formatter = new BinaryFormatter();
@@ -723,7 +723,7 @@ namespace WorkerTest
     if (length > 0)
     {
         var ms = new MemoryStream(SerDe.ReadBytes(s, length));
-        yield return (KeyValuePair<int, dynamic>)formatter.Deserialize(ms);
+        yield return (Tuple<int, dynamic>)formatter.Deserialize(ms);
         if (expectedCount > 0 && ++count >= expectedCount)
         {
@@ -780,8 +780,8 @@ namespace WorkerTest
     int accumulatorsCount = SerDe.ReadInt(s);
     Assert.IsTrue(accumulatorsCount == 1);
     var accumulatorFromWorker = ReadAccumulator(s, accumulatorsCount).First();
-    Assert.AreEqual(accumulatorId, accumulatorFromWorker.Key);
-    Assert.AreEqual(expectedCount, accumulatorFromWorker.Value);
+    Assert.AreEqual(accumulatorId, accumulatorFromWorker.Item1);
+    Assert.AreEqual(expectedCount, accumulatorFromWorker.Item2);
     SerDe.ReadInt(s);
     }
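ReadAccumulator inverts that framing: read a length, deserialize one Tuple&lt;int, dynamic>, and stop once the expected count arrives. A self-contained round-trip sketch, with BinaryReader/BinaryWriter standing in for SerDe:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

static class AccumulatorReadSketch
{
    // Mirrors the shape of ReadAccumulator above: length prefix,
    // BinaryFormatter payload, yield until expectedCount is reached.
    // Assumes a seekable stream; the real test loops on SerDe.ReadInt.
    static IEnumerable<Tuple<int, dynamic>> ReadAccumulator(Stream s, int expectedCount = 0)
    {
        var formatter = new BinaryFormatter();
        var reader = new BinaryReader(s);
        int count = 0;
        while (s.Position < s.Length)
        {
            int length = reader.ReadInt32();
            if (length <= 0) yield break;
            var ms = new MemoryStream(reader.ReadBytes(length));
            yield return (Tuple<int, dynamic>)formatter.Deserialize(ms);
            if (expectedCount > 0 && ++count >= expectedCount) yield break;
        }
    }

    static void Main()
    {
        // Frame one update by hand, then read it back.
        var formatter = new BinaryFormatter();
        var payload = new MemoryStream();
        formatter.Serialize(payload, new Tuple<int, dynamic>(1, 42));
        var wire = new MemoryStream();
        var writer = new BinaryWriter(wire);
        writer.Write((int)payload.Length);
        writer.Write(payload.ToArray());
        wire.Position = 0;

        foreach (var update in ReadAccumulator(wire, expectedCount: 1))
            Console.WriteLine("accumulator {0} = {1}", update.Item1, (object)update.Item2);
    }
}
```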