This commit is contained in:
tawan0109 2016-08-12 11:30:41 +08:00
Родитель a0c72f5fa1
Коммит 61fef193d3
4 изменённых файлов: 96 добавлений и 7 удалений

Просмотреть файл

@ -46,7 +46,7 @@ namespace Microsoft.Spark.CSharp.Core
var i = elementCount;
if (i >= queue.Length)
{
if (GE(queue[0], e)) // compare it with root of the heap
if (GT(queue[0], e)) // compare it with root of the heap
{
queue[0] = e;
SiftDownHeapRoot();
@ -77,7 +77,7 @@ namespace Microsoft.Spark.CSharp.Core
var child = (k << 1) + 1;
var c = queue[child];
var right = child + 1;
if (right < elementCount && GE(queue[right], c))
if (right < elementCount && GT(queue[right], c))
{
c = queue[child = right];
}
@ -112,10 +112,16 @@ namespace Microsoft.Spark.CSharp.Core
queue[k] = x;
}
// helper method for comparision
private bool GT(T a, T b)
{
return comparer.Compare(a, b) > 0;
}
// great or equal, helper method for comparision
private bool GE(T a, T b)
{
return comparer.Compare(a, b) > 0;
return comparer.Compare(a, b) >= 0;
}
public IEnumerator<T> GetEnumerator()

Просмотреть файл

@ -1157,8 +1157,8 @@ namespace Microsoft.Spark.CSharp.Core
internal static T[] TakeOrdered<T>(this RDD<T> self, int num, bool ascending, Func<T, dynamic> keyFunc = null) where T : IComparable<T>
{
var helper = new TakeOrderedHelper<T>(num, keyFunc, ascending);
return self.MapPartitionsWithIndex(helper.Execute)
.Reduce(helper.Execute2)
return self.MapPartitionsWithIndex(helper.TakeOrderedInPartition)
.Reduce(helper.MergeTwoPriorityQueues)
.OrderBy(x => keyFunc == null ? x : keyFunc(x))
.ToArray();
}
@ -1483,7 +1483,8 @@ namespace Microsoft.Spark.CSharp.Core
this.keyFunc = keyFunc;
this.ascending = ascending;
}
internal IEnumerable<PriorityQueue<T>> Execute(int pid, IEnumerable<T> input)
internal IEnumerable<PriorityQueue<T>> TakeOrderedInPartition(int pid, IEnumerable<T> input)
{
Comparer<T> comparer;
@ -1519,7 +1520,7 @@ namespace Microsoft.Spark.CSharp.Core
return new[] { priorityQueue };
}
internal PriorityQueue<T> Execute2(PriorityQueue<T> queue1, PriorityQueue<T> queue2)
internal PriorityQueue<T> MergeTwoPriorityQueues(PriorityQueue<T> queue1, PriorityQueue<T> queue2)
{
foreach (var e in queue1)
{

Просмотреть файл

@ -84,6 +84,7 @@
<Compile Include="Mocks\MockRDDCollector.cs" />
<Compile Include="Mocks\MockRow.cs" />
<Compile Include="PayloadHelperTest.cs" />
<Compile Include="PriorityQueueTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RowTest.cs" />
<Compile Include="SocketStreamTest.cs" />

Просмотреть файл

@ -0,0 +1,81 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Spark.CSharp.Core;
using NUnit.Framework;
namespace AdapterTest
{
/// <summary>
/// Validates functionality of PriorityQueue
/// </summary>
[TestFixture]
class PriorityQueueTest
{
internal List<int> GetRandomList(int size)
{
var rand = new Random(DateTime.Now.Millisecond);
return Enumerable.Range(0, size).Select(i => rand.Next()).ToList();
}
[Test]
public void Test()
{
// build a random list
var randoms = GetRandomList(60);
// create a priority queue
var size = 6;
var queue = new PriorityQueue<int>(size, Comparer<int>.Create((x, y) => x - y));
// feed numbers to queue
randoms.ForEach(queue.Offer);
randoms.Sort();
// verify
var expected = randoms.Take(size);
Assert.AreEqual(expected.ToArray(), queue.OrderBy(x => x).ToArray());
}
[Test]
public void TestDuplication()
{
var numbers = new [] { -1, -1, -1, 1, 1, 2, 3, 9, 20, 15, 11 }.ToList();
var queue = new PriorityQueue<int>(1, Comparer<int>.Create((x, y) => x - y));
numbers.ForEach(queue.Offer);
Assert.AreEqual(-1, queue.ToArray()[0]);
queue = new PriorityQueue<int>(3, Comparer<int>.Create((x, y) => x - y));
numbers.ForEach(queue.Offer);
queue.ToList().ForEach(x => Assert.AreEqual(-1, x));
queue = new PriorityQueue<int>(4, Comparer<int>.Create((x, y) => x - y));
numbers.ForEach(queue.Offer);
Assert.AreEqual(1, queue.OrderBy(x => x).Last());
queue = new PriorityQueue<int>(100, Comparer<int>.Create((x, y) => x - y));
numbers.ForEach(queue.Offer);
Assert.AreEqual(20, queue.OrderBy(x => x).Last());
}
[Test]
public void TestReverseOrder()
{
var randoms = GetRandomList(100);
var queue = new PriorityQueue<int>(3, Comparer<int>.Create((x, y) => y - x));
randoms.ForEach(queue.Offer);
var expected = randoms.OrderByDescending(x => x).Take(3);
Assert.AreEqual(expected.ToArray(), queue.OrderByDescending(x => x).ToArray());
}
[Test]
public void TestNull()
{
Assert.Throws<NullReferenceException>(() => new PriorityQueue<string>(5, Comparer<string>.Create((x, y) => String.Compare(x, y, StringComparison.Ordinal))).Offer(null));
}
}
}