Merge pull request #90 from microsoft/FixPartitionedStartEdgeEquiJoinPipe

Fix PartitionedStartEdgeEquiJoinPipe
This commit is contained in:
Peter Freiling 2019-05-17 14:27:42 -07:00 коммит произвёл GitHub
Родитель 5375d45960 2bea1212c2
Коммит 207593f9e1
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 85 добавлений и 10 удалений

Просмотреть файл

@ -274,6 +274,8 @@ namespace Microsoft.StreamProcessing
{
OutputPunctuation(leftEntry.Sync, ref leftEntry.Key, leftEntry.Hash);
}
leftWorking.Dequeue();
}
else
{
@ -304,6 +306,8 @@ namespace Microsoft.StreamProcessing
{
OutputPunctuation(rightEntry.Sync, ref rightEntry.Key, rightEntry.Hash);
}
rightWorking.Dequeue();
}
}
else if (hasLeftBatch)
@ -338,6 +342,8 @@ namespace Microsoft.StreamProcessing
OutputPunctuation(leftEntry.Sync, ref leftEntry.Key, leftEntry.Hash);
return;
}
leftWorking.Dequeue();
}
else if (hasRightBatch)
{
@ -370,6 +376,8 @@ namespace Microsoft.StreamProcessing
{
OutputPunctuation(rightEntry.Sync, ref rightEntry.Key, rightEntry.Hash);
}
rightWorking.Dequeue();
}
else
{
@ -390,17 +398,13 @@ namespace Microsoft.StreamProcessing
this.emitCTI = false;
foreach (var p in this.cleanKeys)
{
this.seenKeys.Remove(p);
this.leftQueue.Lookup(p, out int index);
var l = this.leftQueue.entries[index];
var r = this.rightQueue.entries[index];
if (l.value.Count == 0 && r.value.Count == 0)
{
this.seenKeys.Remove(p);
l.value.Dispose();
this.leftQueue.Remove(p);
r.value.Dispose();
this.rightQueue.Remove(p);
}
this.leftQueue.entries[index].value.Dispose();
this.leftQueue.Remove(p);
this.rightQueue.entries[index].value.Dispose();
this.rightQueue.Remove(p);
}
this.cleanKeys.Clear();
@ -431,6 +435,11 @@ namespace Microsoft.StreamProcessing
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OutputStartEdge(long start, ref TKey key, ref TLeft leftPayload, ref TRight rightPayload, int hash)
{
if (start < this.lastCTI)
{
throw new StreamProcessingOutOfOrderException("Outputting an event out of order!");
}
int index = this.output.Count++;
this.output.vsync.col[index] = start;
this.output.vother.col[index] = StreamEvent.InfinitySyncTime;
@ -444,6 +453,11 @@ namespace Microsoft.StreamProcessing
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OutputPunctuation(long start, ref TKey key, int hash)
{
if (start < this.lastCTI)
{
throw new StreamProcessingOutOfOrderException("Outputting an event out of order!");
}
int index = this.output.Count++;
this.output.vsync.col[index] = start;
this.output.vother.col[index] = long.MinValue;

Просмотреть файл

@ -2,6 +2,9 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using Microsoft.StreamProcessing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
@ -16,6 +19,7 @@ namespace SimpleTesting
JoinIntervalsTest();
JoinEdgeIntervalTest();
JoinEdgesTest();
PartitionedStartEdgeJoinTest();
}
private static void JoinIntervalsTest()
@ -154,5 +158,62 @@ namespace SimpleTesting
Assert.IsTrue(outputStream.IsEquivalentTo(correct));
}
private static void PartitionedStartEdgeJoinTest()
{
var input1 = new[]
{
PartitionedStreamEvent.CreateStart("Partition1", 100, "A1"),
PartitionedStreamEvent.CreateStart("Partition1", 101, "B1"),
PartitionedStreamEvent.CreateStart("Partition1", 102, "C1"),
PartitionedStreamEvent.CreateStart("Partition2", 100, "A1"),
PartitionedStreamEvent.CreateStart("Partition2", 101, "B1"),
PartitionedStreamEvent.CreateStart("Partition2", 102, "C1"),
};
var input2 = new[]
{
PartitionedStreamEvent.CreateStart("Partition1", 105, "A2"),
PartitionedStreamEvent.CreateStart("Partition1", 106, "B2"),
PartitionedStreamEvent.CreateStart("Partition1", 107, "D2"),
PartitionedStreamEvent.CreateStart("Partition2", 108, "A2"),
PartitionedStreamEvent.CreateStart("Partition2", 109, "D2"),
PartitionedStreamEvent.CreateStart("Partition2", 110, "C2"),
};
// Set properties to start-edge only
var inputStream1 = input1.ToObservable().ToStreamable();
inputStream1.Properties.IsConstantDuration = true;
inputStream1.Properties.ConstantDurationLength = StreamEvent.InfinitySyncTime;
var inputStream2 = input2.ToObservable().ToStreamable();
inputStream2.Properties.IsConstantDuration = true;
inputStream2.Properties.ConstantDurationLength = StreamEvent.InfinitySyncTime;
var output = new List<PartitionedStreamEvent<string, string>>();
inputStream1
.Join(
inputStream2,
l => (l != null ? l[0].ToString() : null),
r => (r != null ? r[0].ToString() : null),
(l, r) => l + "," + r)
.ToStreamEventObservable()
.ForEachAsync(e => output.Add(e))
.Wait();
var correct = new[]
{
PartitionedStreamEvent.CreateStart("Partition1", 105, "A1,A2"),
PartitionedStreamEvent.CreateStart("Partition1", 106, "B1,B2"),
PartitionedStreamEvent.CreateStart("Partition2", 108, "A1,A2"),
PartitionedStreamEvent.CreateStart("Partition2", 110, "C1,C2"),
PartitionedStreamEvent.CreateLowWatermark<string, string>(StreamEvent.InfinitySyncTime)
};
Assert.IsTrue(output.SequenceEqual(correct));
}
}
}