Merge pull request #382 from jayjaywg/TakeOrdered

Add 'keyFunc' parameter in RDD.TakeOrdered()
This commit is contained in:
guwang 2016-04-13 09:17:03 +08:00
Родитель 6fb9c42851 fbe265462f
Коммит ea6597a69c
4 изменённых файлов: 31 добавлений и 5 удалений

Просмотреть файл

@ -1147,10 +1147,12 @@ namespace Microsoft.Spark.CSharp.Core
/// <typeparam name="T">Element type of the RDD; constrained to <see cref="IComparable{T}"/>.</typeparam>
/// <param name="self">The RDD whose elements are ordered and taken.</param>
/// <param name="num">Maximum number of elements to return.</param>
/// <param name="keyFunc">
/// Optional function producing the sort key for an element. When null, elements are
/// ordered by themselves using the default comparer for <typeparamref name="T"/>.
/// </param>
/// <returns>At most <paramref name="num"/> elements, in ascending order of their sort key.</returns>
public static T[] TakeOrdered<T>(this RDD<T> self, int num, Func<T, dynamic> keyFunc = null) where T : IComparable<T>
{
    // TakeOrderedHelper.Execute sorts its input and keeps the first `num` items; it is handed to
    // MapPartitionsWithIndex so that only pre-trimmed candidates are collected.
    var candidates = self.MapPartitionsWithIndex<T>(new TakeOrderedHelper<T>(num, keyFunc).Execute).Collect();

    // Choose the selector once instead of inside the lambda. Mixing `x` and `keyFunc(x)` in one
    // conditional expression types the key as dynamic, which forces the non-generic object comparer
    // and throws for a T that implements only IComparable<T> even when no keyFunc was supplied.
    var ordered = keyFunc == null
        ? candidates.OrderBy(x => x)
        : candidates.OrderBy(keyFunc);

    return ordered.Take(num).ToArray();
}
/// <summary> /// <summary>
@ -1464,13 +1466,15 @@ namespace Microsoft.Spark.CSharp.Core
/// <summary>
/// Helper used by TakeOrdered: sorts a sequence of elements and keeps only the first
/// <c>num</c> of them, ordering either by the elements themselves or by a caller-supplied key.
/// </summary>
/// <typeparam name="T">Element type of the sequence being trimmed.</typeparam>
internal class TakeOrderedHelper<T>
{
    private readonly int num;
    private readonly Func<T, dynamic> keyFunc;

    /// <summary>
    /// Creates a helper that keeps at most <paramref name="num"/> elements.
    /// </summary>
    /// <param name="num">Maximum number of elements to keep.</param>
    /// <param name="keyFunc">Optional sort-key selector; null means "order by the element itself".</param>
    internal TakeOrderedHelper(int num, Func<T, dynamic> keyFunc)
    {
        this.num = num;
        this.keyFunc = keyFunc;
    }

    /// <summary>
    /// Orders <paramref name="input"/> ascending and returns its first <c>num</c> elements.
    /// </summary>
    /// <param name="pid">Partition index supplied by the caller; not used by this method.</param>
    /// <param name="input">Elements to order.</param>
    /// <returns>A lazily evaluated sequence of at most <c>num</c> elements in ascending key order.</returns>
    internal IEnumerable<T> Execute(int pid, IEnumerable<T> input)
    {
        // Decide the selector up front. Using `keyFunc == null ? x : keyFunc(x)` as a single selector
        // makes the key type dynamic, so the sort falls back to the non-generic object comparer and
        // fails for element types that only implement IComparable<T>.
        if (keyFunc == null)
        {
            return input.OrderBy(x => x).Take(num);
        }

        return input.OrderBy(keyFunc).Take(num);
    }
}
[Serializable] [Serializable]

Просмотреть файл

@ -1,4 +1,7 @@
using Microsoft.Spark.CSharp.Core; // Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Proxy; using Microsoft.Spark.CSharp.Proxy;
namespace Microsoft.Spark.CSharp.Sql namespace Microsoft.Spark.CSharp.Sql

Просмотреть файл

@ -4,6 +4,7 @@ using AdapterTest.Mocks;
using Microsoft.Spark.CSharp.Core; using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop.Ipc; using Microsoft.Spark.CSharp.Interop.Ipc;
using NUnit.Framework; using NUnit.Framework;
using System.Linq;
namespace AdapterTest namespace AdapterTest
{ {
@ -39,6 +40,12 @@ namespace AdapterTest
Assert.AreEqual(2, taken.Length); Assert.AreEqual(2, taken.Length);
Assert.AreEqual("brown", taken[0]); Assert.AreEqual("brown", taken[0]);
Assert.AreEqual("dog", taken[1]); Assert.AreEqual("dog", taken[1]);
taken = words.Distinct().TakeOrdered(2, x => new string(x.ToCharArray().Reverse().ToArray()));
Array.Sort(taken, StringComparer.Ordinal);
Assert.AreEqual(2, taken.Length);
Assert.AreEqual("The", taken[0]);
Assert.AreEqual("the", taken[1]);
} }
[Test] [Test]

Просмотреть файл

@ -496,6 +496,18 @@ namespace Microsoft.Spark.CSharp.Samples
} }
} }
/// <summary>
/// Sample for TakeOrdered with a key function: negating each value as the sort key
/// selects the six largest numbers from the input.
/// </summary>
[Sample]
internal static void RDDTakeOrderedSample2()
{
    var numbers = new int[] { 10, 1, 2, 9, 3, 4, 5, 6, 7 };
    var rdd = SparkCLRSamples.SparkContext.Parallelize(numbers, 2);
    var takeOrdered = rdd.TakeOrdered(6, x => -x);

    Console.WriteLine(string.Join(",", takeOrdered));

    if (!SparkCLRSamples.Configuration.IsValidationEnabled)
    {
        return;
    }

    // Order-insensitive check: only the set of returned values is validated.
    var expected = new[] { 10, 9, 7, 6, 5, 4 };
    CollectionAssert.AreEquivalent(expected, takeOrdered);
}
[Sample] [Sample]
internal static void RDDTopSample() internal static void RDDTopSample()
{ {