diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledGroupedAfaPipe_SingleEvent.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledGroupedAfaPipe_SingleEvent.cs index 60a917f..84b2da3 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledGroupedAfaPipe_SingleEvent.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledGroupedAfaPipe_SingleEvent.cs @@ -320,10 +320,10 @@ namespace Microsoft.StreamProcessing if (this.IsDeterministic) break; // We are guaranteed to have only one start state } } - else if (batch.vother.col[i] < 0 && !this.IsSyncTimeSimultaneityFree) + else if (batch.vother.col[i] < 0) { long synctime = src_vsync[i]; - if (synctime > this.lastSyncTime) // move time forward + if (!this.IsSyncTimeSimultaneityFree && synctime > this.lastSyncTime) // move time forward { this.seenEvent.Clear(); diff --git a/Sources/Test/SimpleTesting/Streamables/AfaTests.cs b/Sources/Test/SimpleTesting/Streamables/AfaTests.cs index 00eec75..50fa132 100644 --- a/Sources/Test/SimpleTesting/Streamables/AfaTests.cs +++ b/Sources/Test/SimpleTesting/Streamables/AfaTests.cs @@ -844,16 +844,21 @@ namespace SimpleTesting { var source = new StreamEvent>[] { - StreamEvent.CreateStart(0, new Tuple("A", 1)), + StreamEvent.CreateStart(0, new Tuple("A", 1)), StreamEvent.CreateStart(1, new Tuple("A", 2)), StreamEvent.CreateStart(1, new Tuple("B", 2)), - StreamEvent.CreateStart(3, new Tuple("A", 1)), - StreamEvent.CreateStart(4, new Tuple("B", 1)), + StreamEvent.CreateStart(3, new Tuple("A", 1)), + + StreamEvent.CreatePunctuation>(4), + + StreamEvent.CreateStart(4, new Tuple("B", 1)), StreamEvent.CreateStart(4, new Tuple("B", 2)), - StreamEvent.CreateStart(5, new Tuple("B", 1)), - StreamEvent.CreateStart(5, new Tuple("C", 1)), + StreamEvent.CreateStart(5, new Tuple("B", 1)), + StreamEvent.CreateStart(5, new Tuple("C", 1)), StreamEvent.CreateStart(6, new Tuple("B", 2)), StreamEvent.CreateStart(7, new Tuple("A", 2)), + + StreamEvent.CreatePunctuation>(7), }.ToObservable() .ToStreamable() .AlterEventDuration(10); @@ -881,18 +886,21 @@ namespace SimpleTesting var result = afa_compiled .ToStreamEventObservable() - .Where(evt => evt.IsData) .ToEnumerable() .ToArray(); var expected = new StreamEvent>[] { StreamEvent.CreateInterval(1, 11, new Tuple("AB", 2)), + StreamEvent.CreatePunctuation>(4), StreamEvent.CreateInterval(4, 13, new Tuple("AB", 1)), StreamEvent.CreateInterval(4, 10, new Tuple("AAB", 1)), StreamEvent.CreateInterval(4, 11, new Tuple("ABB", 2)), StreamEvent.CreateInterval(5, 13, new Tuple("ABB", 1)), StreamEvent.CreateInterval(5, 10, new Tuple("AABB", 1)), StreamEvent.CreateInterval(6, 11, new Tuple("ABBB", 2)), + StreamEvent.CreatePunctuation>(7), + + StreamEvent.CreatePunctuation>(StreamEvent.InfinitySyncTime), }; Assert.IsTrue(result.SequenceEqual(expected)); }