From dc69f562661d69ec72c6ef9e6827db45922a108a Mon Sep 17 00:00:00 2001 From: Peter <53205062+peterfreiling@users.noreply.github.com> Date: Wed, 21 Oct 2020 12:43:35 -0700 Subject: [PATCH] Fixing bug that dropped punctuations for Grouped Afa SingleEvent pipe when IsSyncTimeSimultaneityFree is true (#142) --- .../Afa/CompiledGroupedAfaPipe_SingleEvent.cs | 4 ++-- .../SimpleTesting/Streamables/AfaTests.cs | 20 +++++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledGroupedAfaPipe_SingleEvent.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledGroupedAfaPipe_SingleEvent.cs index 60a917f..84b2da3 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledGroupedAfaPipe_SingleEvent.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledGroupedAfaPipe_SingleEvent.cs @@ -320,10 +320,10 @@ namespace Microsoft.StreamProcessing if (this.IsDeterministic) break; // We are guaranteed to have only one start state } } - else if (batch.vother.col[i] < 0 && !this.IsSyncTimeSimultaneityFree) + else if (batch.vother.col[i] < 0) { long synctime = src_vsync[i]; - if (synctime > this.lastSyncTime) // move time forward + if (!this.IsSyncTimeSimultaneityFree && synctime > this.lastSyncTime) // move time forward { this.seenEvent.Clear(); diff --git a/Sources/Test/SimpleTesting/Streamables/AfaTests.cs b/Sources/Test/SimpleTesting/Streamables/AfaTests.cs index 00eec75..50fa132 100644 --- a/Sources/Test/SimpleTesting/Streamables/AfaTests.cs +++ b/Sources/Test/SimpleTesting/Streamables/AfaTests.cs @@ -844,16 +844,21 @@ namespace SimpleTesting { var source = new StreamEvent>[] { - StreamEvent.CreateStart(0, new Tuple("A", 1)), + StreamEvent.CreateStart(0, new Tuple("A", 1)), StreamEvent.CreateStart(1, new Tuple("A", 2)), StreamEvent.CreateStart(1, new Tuple("B", 2)), - StreamEvent.CreateStart(3, new Tuple("A", 1)), - StreamEvent.CreateStart(4, new Tuple("B", 1)), + StreamEvent.CreateStart(3, new Tuple("A", 1)), + + StreamEvent.CreatePunctuation>(4), + + StreamEvent.CreateStart(4, new Tuple("B", 1)), StreamEvent.CreateStart(4, new Tuple("B", 2)), - StreamEvent.CreateStart(5, new Tuple("B", 1)), - StreamEvent.CreateStart(5, new Tuple("C", 1)), + StreamEvent.CreateStart(5, new Tuple("B", 1)), + StreamEvent.CreateStart(5, new Tuple("C", 1)), StreamEvent.CreateStart(6, new Tuple("B", 2)), StreamEvent.CreateStart(7, new Tuple("A", 2)), + + StreamEvent.CreatePunctuation>(7), }.ToObservable() .ToStreamable() .AlterEventDuration(10); @@ -881,18 +886,21 @@ namespace SimpleTesting var result = afa_compiled .ToStreamEventObservable() - .Where(evt => evt.IsData) .ToEnumerable() .ToArray(); var expected = new StreamEvent>[] { StreamEvent.CreateInterval(1, 11, new Tuple("AB", 2)), + StreamEvent.CreatePunctuation>(4), StreamEvent.CreateInterval(4, 13, new Tuple("AB", 1)), StreamEvent.CreateInterval(4, 10, new Tuple("AAB", 1)), StreamEvent.CreateInterval(4, 11, new Tuple("ABB", 2)), StreamEvent.CreateInterval(5, 13, new Tuple("ABB", 1)), StreamEvent.CreateInterval(5, 10, new Tuple("AABB", 1)), StreamEvent.CreateInterval(6, 11, new Tuple("ABBB", 2)), + StreamEvent.CreatePunctuation>(7), + + StreamEvent.CreatePunctuation>(StreamEvent.InfinitySyncTime), }; Assert.IsTrue(result.SequenceEqual(expected)); }