Removing previous fix in UnionPipe, and instead addressing the problem at ingress

This commit is contained in:
Peter Freiling 2019-03-15 14:19:36 -07:00
Родитель 37c4f77372
Коммит 6141b0ed39
12 изменённых файлов: 753 добавлений и 436 удалений

Просмотреть файл

@ -286,7 +286,12 @@ namespace Microsoft.StreamProcessing.Internal
{
Contract.Requires(value.IsPunctuation);
this.lastPunctuationTime = Math.Max(value.SyncTime, this.lastPunctuationTime);
if (value.SyncTime < this.lastPunctuationTime)
{
throw new InvalidOperationException($"Received out of order punctuation {value.SyncTime}! Previous punctuation time {this.lastPunctuationTime}");
}
this.lastPunctuationTime = value.SyncTime;
var count = this.currentBatch.Count;
this.currentBatch.vsync.col[count] = value.SyncTime;
@ -774,7 +779,12 @@ namespace Microsoft.StreamProcessing.Internal
{
Contract.Requires(value.IsPunctuation);
this.lastPunctuationTime = Math.Max(value.SyncTime, this.lastPunctuationTime);
if (value.SyncTime < this.lastPunctuationTime)
{
throw new InvalidOperationException($"Received out of order punctuation {value.SyncTime}! Previous punctuation time {this.lastPunctuationTime}");
}
this.lastPunctuationTime = value.SyncTime;
var count = this.currentBatch.Count;
this.currentBatch.vsync.col[count] = value.SyncTime;
@ -1321,7 +1331,12 @@ namespace Microsoft.StreamProcessing.Internal
if (this.punctuationPolicyType == PeriodicPunctuationPolicyType.Time)
{
this.lastPunctuationTime[value.PartitionKey] = Math.Max(value.SyncTime, this.lastPunctuationTime[value.PartitionKey]);
if (value.SyncTime < this.lastPunctuationTime[value.PartitionKey])
{
throw new InvalidOperationException($"Received out of order punctuation {value.SyncTime}! Previous punctuation time {this.lastPunctuationTime[value.PartitionKey]}");
}
this.lastPunctuationTime[value.PartitionKey] = value.SyncTime;
}
var count = this.currentBatch.Count;
@ -1860,7 +1875,12 @@ namespace Microsoft.StreamProcessing.Internal
if (this.punctuationPolicyType == PeriodicPunctuationPolicyType.Time)
{
this.lastPunctuationTime[value.PartitionKey] = Math.Max(value.SyncTime, this.lastPunctuationTime[value.PartitionKey]);
if (value.SyncTime < this.lastPunctuationTime[value.PartitionKey])
{
throw new InvalidOperationException($"Received out of order punctuation {value.SyncTime}! Previous punctuation time {this.lastPunctuationTime[value.PartitionKey]}");
}
this.lastPunctuationTime[value.PartitionKey] = value.SyncTime;
}
var count = this.currentBatch.Count;

Просмотреть файл

@ -406,7 +406,12 @@ foreach (bool disordered in new[] { true, false })
if (this.punctuationPolicyType == PeriodicPunctuationPolicyType.Time)
{
this.lastPunctuationTime[value.PartitionKey] = Math.Max(value.SyncTime, this.lastPunctuationTime[value.PartitionKey]);
if (value.SyncTime < this.lastPunctuationTime[value.PartitionKey])
{
throw new InvalidOperationException($"Received out of order punctuation {value.SyncTime}! Previous punctuation time {this.lastPunctuationTime[value.PartitionKey]}");
}
this.lastPunctuationTime[value.PartitionKey] = value.SyncTime;
}
var count = this.currentBatch.Count;
@ -431,7 +436,12 @@ foreach (bool disordered in new[] { true, false })
{
Contract.Requires(value.IsPunctuation);
this.lastPunctuationTime = Math.Max(value.SyncTime, this.lastPunctuationTime);
if (value.SyncTime < this.lastPunctuationTime)
{
throw new InvalidOperationException($"Received out of order punctuation {value.SyncTime}! Previous punctuation time {this.lastPunctuationTime}");
}
this.lastPunctuationTime = value.SyncTime;
var count = this.currentBatch.Count;
this.currentBatch.vsync.col[count] = value.SyncTime;

Просмотреть файл

@ -367,7 +367,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -513,6 +517,11 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
// Note that currentTime only reflects events already processed, and excludes events in the reorder buffer.
syncTime = Math.Max(syncTime, this.currentTime);
// Process events queued for reorderLatency up to the Punctuation syncTime
if (this.priorityQueueSorter != null)
{
@ -731,7 +740,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -872,6 +885,11 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
// Note that currentTime only reflects events already processed, and excludes events in the reorder buffer.
syncTime = Math.Max(syncTime, this.currentTime);
// Process events queued for reorderLatency up to the Punctuation syncTime
if (this.priorityQueueSorter != null)
{
@ -1093,7 +1111,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -1239,6 +1261,11 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
// Note that currentTime only reflects events already processed, and excludes events in the reorder buffer.
syncTime = Math.Max(syncTime, this.currentTime);
// Process events queued for reorderLatency up to the Punctuation syncTime
if (this.priorityQueueSorter != null)
{
@ -1360,7 +1387,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -1506,6 +1537,10 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
syncTime = Math.Max(syncTime, this.currentTime);
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
this.currentTime = Math.Max(syncTime, this.currentTime);
@ -1617,7 +1652,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -1758,6 +1797,10 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
syncTime = Math.Max(syncTime, this.currentTime);
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
this.currentTime = Math.Max(syncTime, this.currentTime);
@ -1872,7 +1915,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -2018,6 +2065,10 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
syncTime = Math.Max(syncTime, this.currentTime);
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
this.currentTime = Math.Max(syncTime, this.currentTime);
@ -2198,7 +2249,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -2255,6 +2310,11 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
// Note that currentTime only reflects events already processed, and excludes events in the reorder buffer.
syncTime = Math.Max(syncTime, this.currentTime);
// Process events queued for reorderLatency up to the Punctuation syncTime
if (this.priorityQueueSorter != null)
{
@ -2475,7 +2535,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -2527,6 +2591,11 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
// Note that currentTime only reflects events already processed, and excludes events in the reorder buffer.
syncTime = Math.Max(syncTime, this.currentTime);
// Process events queued for reorderLatency up to the Punctuation syncTime
if (this.priorityQueueSorter != null)
{
@ -2749,7 +2818,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -2806,6 +2879,11 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
// Note that currentTime only reflects events already processed, and excludes events in the reorder buffer.
syncTime = Math.Max(syncTime, this.currentTime);
// Process events queued for reorderLatency up to the Punctuation syncTime
if (this.priorityQueueSorter != null)
{
@ -2938,7 +3016,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -2995,6 +3077,10 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
syncTime = Math.Max(syncTime, this.currentTime);
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
this.currentTime = Math.Max(syncTime, this.currentTime);
@ -3117,7 +3203,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -3169,6 +3259,10 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
syncTime = Math.Max(syncTime, this.currentTime);
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
this.currentTime = Math.Max(syncTime, this.currentTime);
@ -3293,7 +3387,11 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -3350,6 +3448,10 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lastPunctuationTime) return;
// Update the Punctuation to be at least the currentTime, so the Punctuation
// is not before the preceding data event.
syncTime = Math.Max(syncTime, this.currentTime);
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
this.currentTime = Math.Max(syncTime, this.currentTime);
@ -3441,8 +3543,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -3580,9 +3685,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -3736,6 +3846,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Process events queued for reorderLatency up to the LowWatermark syncTime
if (this.priorityQueueSorter != null)
{
@ -3914,8 +4025,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -4053,9 +4167,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -4204,6 +4323,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Process events queued for reorderLatency up to the LowWatermark syncTime
if (this.priorityQueueSorter != null)
{
@ -4385,8 +4505,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -4524,9 +4647,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -4680,6 +4808,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Process events queued for reorderLatency up to the LowWatermark syncTime
if (this.priorityQueueSorter != null)
{
@ -4843,8 +4972,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -4881,9 +5013,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -5037,6 +5174,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
if (this.lowWatermark < syncTime)
@ -5145,8 +5283,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -5183,9 +5324,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -5334,6 +5480,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
if (this.lowWatermark < syncTime)
@ -5445,8 +5592,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -5483,9 +5633,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -5639,6 +5794,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
if (this.lowWatermark < syncTime)
@ -5751,8 +5907,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -5875,9 +6034,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -5935,6 +6099,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Process events queued for reorderLatency up to the LowWatermark syncTime
if (this.priorityQueueSorter != null)
{
@ -6132,8 +6297,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -6256,9 +6424,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -6311,6 +6484,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Process events queued for reorderLatency up to the LowWatermark syncTime
if (this.priorityQueueSorter != null)
{
@ -6510,8 +6684,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -6634,9 +6811,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -6694,6 +6876,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Process events queued for reorderLatency up to the LowWatermark syncTime
if (this.priorityQueueSorter != null)
{
@ -6876,8 +7059,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -6908,9 +7094,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -6968,6 +7159,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
if (this.lowWatermark < syncTime)
@ -7095,8 +7287,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -7127,9 +7322,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -7182,6 +7382,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
if (this.lowWatermark < syncTime)
@ -7311,8 +7512,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -7343,9 +7547,14 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -7403,6 +7612,7 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= this.lowWatermark) return;
// Update cached global times
this.highWatermark = Math.Max(syncTime, this.highWatermark);
if (this.lowWatermark < syncTime)

Просмотреть файл

@ -308,8 +308,11 @@ namespace Microsoft.StreamProcessing
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark) >= this.lowWatermarkGenerationPeriod)
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
@ -501,9 +504,13 @@ namespace Microsoft.StreamProcessing
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
<# if (!partitioned)
{ #>
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime);
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -515,7 +522,8 @@ namespace Microsoft.StreamProcessing
{ #>
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark));
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
@ -701,6 +709,17 @@ namespace Microsoft.StreamProcessing
{
if (syncTime <= <#= partitioned ? "this.lowWatermark" : "this.lastPunctuationTime" #>) return;
<# if (!partitioned)
{ #>
// Update the <#= globalPunctuation #> to be at least the currentTime, so the <#= globalPunctuation #>
// is not before the preceding data event.
<# if (latencyOption == "WithLatency")
{ #>
// Note that currentTime only reflects events already processed, and excludes events in the reorder buffer.
<# } #>
syncTime = Math.Max(syncTime, this.currentTime);
<# } #>
<# if (latencyOption == "WithLatency")
{ #>
// Process events queued for reorderLatency up to the <#= globalPunctuation #> syncTime

Просмотреть файл

@ -26,7 +26,7 @@ namespace Microsoft.StreamProcessing
public override string TransformText()
{
this.Write("// *********************************************************************\r\n// Copy" +
"right (C) Microsoft Corporation. All rights reserved.\r\n// Licensed under the MI" +
"right (c) Microsoft Corporation. All rights reserved.\r\n// Licensed under the MI" +
"T License\r\n// ******************************************************************" +
"***\r\n/*\\\r\n * Spec:\r\n *\r\n * Apply punctuation policy first. This means t" +
"hat even dropped events\r\n * count towards the number of events seen since the" +
@ -58,7 +58,7 @@ namespace Microsoft.StreamProcessing
bool partitioned = (partitionString == "Partitioned");
string baseStructure = partitionString + "StreamEvent<" + adjustedGenericArgs + ">";
string globalPunctuation = partitioned ? "LowWatermark" : "Punctuation";
string highWatermark = partitioned ? "partitionHighWatermarks[value.PartitionKey]" : "highWatermark";
string highWatermark = partitioned ? "this.partitionHighWatermarks[value.PartitionKey]" : "this.highWatermark";
string keyType = !partitioned ? "Microsoft.StreamProcessing.Empty" : "PartitionKey<TKey>";
string streamEventFromValue = fusionOption == "Disordered" ? ("new " + partitionString + "StreamEvent<" + genericArguments + ">(" + (!partitioned ? string.Empty : "value.PartitionKey, ") + "value.SyncTime, value.OtherTime, default)") : "value";
@ -196,33 +196,33 @@ namespace Microsoft.StreamProcessing
{
if (partitioned)
{
this.Write(@" // Check to see if we need to generate a low watermark due to PeriodicLowWatermarkPolicy
if (lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > lowWatermarkTimestampLag)
{
var newLowWatermark = value.SyncTime - lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - lowWatermark) >= lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)lowWatermarkGenerationPeriod);
GenerateAndProcessLowWatermark(newLowWatermarkSnapped);
}
}
long moveFrom;
if (!currentTime.TryGetValue(value.PartitionKey, out moveFrom)) moveFrom = lowWatermark;
if (!partitionHighWatermarks.ContainsKey(value.PartitionKey))
{
partitionHighWatermarks.Add(value.PartitionKey, lowWatermark);
if (highWatermarkToPartitionsMap.TryGetValue(lowWatermark, out HashSet<TKey> keySet)) keySet.Add(value.PartitionKey);
else highWatermarkToPartitionsMap.Add(lowWatermark, new HashSet<TKey> { value.PartitionKey });
}
");
this.Write(" // Check to see if we need to generate a low watermark due to Periodi" +
"cLowWatermarkPolicy\r\n if (this.lowWatermarkPolicyType == PeriodicLowW" +
"atermarkPolicyType.Time &&\r\n value.SyncTime > this.lowWatermarkTi" +
"mestampLag)\r\n {\r\n // Generated lowWatermark is always " +
"quantized to the last lowWatermarkGenerationPeriod boundary, but\r\n " +
" // since the last lowWatermark time may have been explicitly ingressed, it is " +
"not necessarily quantized,\r\n // so use the quantized value when c" +
"omputing the delta.\r\n var newLowWatermark = value.SyncTime - this" +
".lowWatermarkTimestampLag;\r\n if ((ulong)(newLowWatermark - this.l" +
"owWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this" +
".lowWatermarkGenerationPeriod)\r\n {\r\n // SyncTi" +
"me is sufficiently high to generate a new watermark, but first snap it to the ne" +
"arest generationPeriod boundary\r\n var newLowWatermarkSnapped " +
"= newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);\r\n" +
" GenerateAndProcessLowWatermark(newLowWatermarkSnapped);\r\n " +
" }\r\n }\r\n\r\n long moveFrom;\r\n if (!this.curre" +
"ntTime.TryGetValue(value.PartitionKey, out moveFrom)) moveFrom = this.lowWaterma" +
"rk;\r\n if (!this.partitionHighWatermarks.ContainsKey(value.PartitionKey))\r" +
"\n {\r\n this.partitionHighWatermarks.Add(value.PartitionKey, thi" +
"s.lowWatermark);\r\n\r\n if (this.highWatermarkToPartitionsMap.TryGetValu" +
"e(this.lowWatermark, out HashSet<TKey> keySet)) keySet.Add(value.PartitionKey);\r" +
"\n else this.highWatermarkToPartitionsMap.Add(this.lowWatermark, new H" +
"ashSet<TKey> { value.PartitionKey });\r\n }\r\n");
}
else
{
this.Write(" long moveFrom = currentTime;\r\n");
this.Write(" long moveFrom = this.currentTime;\r\n");
}
this.Write(@" long moveTo = moveFrom;
@ -246,32 +246,34 @@ namespace Microsoft.StreamProcessing
this.Write(" = value.SyncTime;\r\n\r\n");
if (partitioned)
{
this.Write(@" var oldSet = highWatermarkToPartitionsMap[oldTime];
if (oldSet.Count <= 1) highWatermarkToPartitionsMap.Remove(oldTime);
this.Write(@" var oldSet = this.highWatermarkToPartitionsMap[oldTime];
if (oldSet.Count <= 1) this.highWatermarkToPartitionsMap.Remove(oldTime);
else oldSet.Remove(value.PartitionKey);
if (highWatermarkToPartitionsMap.TryGetValue(value.SyncTime, out HashSet<TKey> set)) set.Add(value.PartitionKey);
else highWatermarkToPartitionsMap.Add(value.SyncTime, new HashSet<TKey> { value.PartitionKey });
if (this.highWatermarkToPartitionsMap.TryGetValue(value.SyncTime, out HashSet<TKey> set)) set.Add(value.PartitionKey);
else this.highWatermarkToPartitionsMap.Add(value.SyncTime, new HashSet<TKey> { value.PartitionKey });
");
}
this.Write(" moveTo = value.SyncTime - reorderLatency;\r\n if (moveTo < S" +
"treamEvent.MinSyncTime) moveTo = StreamEvent.MinSyncTime;\r\n if (moveT" +
"o < moveFrom) moveTo = moveFrom;\r\n }\r\n");
this.Write(" moveTo = value.SyncTime - this.reorderLatency;\r\n if (moveT" +
"o < StreamEvent.MinSyncTime) moveTo = StreamEvent.MinSyncTime;\r\n if (" +
"moveTo < moveFrom) moveTo = moveFrom;\r\n }\r\n");
if (ingressType == "StreamEvent")
{
PopIndent();
this.Write(" }\r\n");
}
this.Write("\r\n if (moveTo > moveFrom)\r\n {\r\n if (priorityQueueSorter " +
"!= null)\r\n {\r\n ");
this.Write("\r\n if (moveTo > moveFrom)\r\n {\r\n if (this.priorityQueueSo" +
"rter != null)\r\n {\r\n ");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent<");
this.Write(this.ToStringHelper.ToStringWithCulture(adjustedGenericArgs));
this.Write("> resultEvent;\r\n\r\n while ((!priorityQueueSorter.IsEmpty()) && prio" +
"rityQueueSorter.Peek().SyncTime <= moveTo)\r\n {\r\n " +
" resultEvent = priorityQueueSorter.Dequeue();\r\n Process(ref" +
" resultEvent");
this.Write(@"> resultEvent;
while ((!this.priorityQueueSorter.IsEmpty()) && this.priorityQueueSorter.Peek().SyncTime <= moveTo)
{
resultEvent = this.priorityQueueSorter.Dequeue();
Process(ref resultEvent");
if (partitioned) {
this.Write(", updateCurrentTime : false");
}
@ -279,7 +281,7 @@ if (partitioned) {
" // Extract and process data in-order from impatience, until timestamp of m" +
"oveTo\r\n PooledElasticCircularBuffer<");
this.Write(this.ToStringHelper.ToStringWithCulture(baseStructure));
this.Write("> streamEvents = impatienceSorter.DequeueUntil(");
this.Write("> streamEvents = this.impatienceSorter.DequeueUntil(");
this.Write(this.ToStringHelper.ToStringWithCulture(!partitioned ? string.Empty : "value.PartitionKey, "));
this.Write(" moveTo, out bool recheck);\r\n\r\n if (streamEvents != null)\r\n " +
" {\r\n ");
@ -293,7 +295,7 @@ if (partitioned) {
this.Write(", updateCurrentTime : false");
}
this.Write(");\r\n }\r\n }\r\n\r\n if (!recheck && (" +
"streamEvents != null))\r\n impatienceSorter.Return(");
"streamEvents != null))\r\n this.impatienceSorter.Return(");
this.Write(this.ToStringHelper.ToStringWithCulture(!partitioned ? string.Empty : "value.PartitionKey, "));
this.Write(@" streamEvents);
}
@ -306,20 +308,21 @@ if (partitioned) {
}
// Enqueue value into impatience
if (priorityQueueSorter != null) priorityQueueSorter.Enqueue(value);
else impatienceSorter.Enqueue(ref value);
if (this.priorityQueueSorter != null) this.priorityQueueSorter.Enqueue(value);
else this.impatienceSorter.Enqueue(ref value);
// Move currentTime forward
");
if (!partitioned)
{
this.Write(" currentTime = moveTo;\r\n");
this.Write(" this.currentTime = moveTo;\r\n");
}
else
{
this.Write(" if (!currentTime.TryGetValue(value.PartitionKey, out long oldCurrentTime)" +
")\r\n currentTime.Add(value.PartitionKey, moveTo);\r\n else if (ol" +
"dCurrentTime < moveTo)\r\n currentTime[value.PartitionKey] = moveTo;\r\n");
this.Write(" if (!this.currentTime.TryGetValue(value.PartitionKey, out long oldCurrent" +
"Time))\r\n this.currentTime.Add(value.PartitionKey, moveTo);\r\n e" +
"lse if (oldCurrentTime < moveTo)\r\n this.currentTime[value.PartitionKe" +
"y] = moveTo;\r\n");
}
this.Write(" }\r\n\r\n private void Process(ref ");
this.Write(this.ToStringHelper.ToStringWithCulture(baseStructure));
@ -349,19 +352,19 @@ if (partitioned) {
}
if (!partitioned)
{
this.Write(" long current = currentTime;\r\n");
this.Write(" long current = this.currentTime;\r\n");
}
else
{
this.Write(@" // Update global high water mark if necessary
highWatermark = Math.Max(highWatermark, value.SyncTime);
this.highWatermark = Math.Max(this.highWatermark, value.SyncTime);
if (punctuationPolicyType == PeriodicPunctuationPolicyType.Time && !lastPunctuationTime.ContainsKey(value.PartitionKey))
lastPunctuationTime.Add(value.PartitionKey, lowWatermark);
if (this.punctuationPolicyType == PeriodicPunctuationPolicyType.Time && !this.lastPunctuationTime.ContainsKey(value.PartitionKey))
this.lastPunctuationTime.Add(value.PartitionKey, this.lowWatermark);
// Retrieve current time for this partition, updating currentTime if necessary
long current;
if (!currentTime.TryGetValue(value.PartitionKey, out current))
if (!this.currentTime.TryGetValue(value.PartitionKey, out current))
{
");
if (latencyOption == "WithLatency")
@ -370,32 +373,37 @@ if (partitioned) {
"t\r\n if (!updateCurrentTime) throw new IngressException(\"Partition exp" +
"ected to have a valid currentTime!\");\r\n");
}
this.Write(" current = lowWatermark;\r\n }\r\n else if (");
this.Write(" current = this.lowWatermark;\r\n }\r\n else if (");
if (latencyOption == "WithLatency") {
this.Write("updateCurrentTime && ");
}
this.Write("current < lowWatermark)\r\n {\r\n current = lowWatermark;\r\n " +
" currentTime[value.PartitionKey] = lowWatermark;\r\n }\r\n");
this.Write("current < this.lowWatermark)\r\n {\r\n current = this.lowWatermark;" +
"\r\n this.currentTime[value.PartitionKey] = this.lowWatermark;\r\n " +
" }\r\n");
}
this.Write("\r\n var outOfOrder = value.SyncTime < current;\r\n if (punctuationPoli" +
"cyType == PeriodicPunctuationPolicyType.Time)\r\n {\r\n");
this.Write("\r\n var outOfOrder = value.SyncTime < current;\r\n if (this.punctuatio" +
"nPolicyType == PeriodicPunctuationPolicyType.Time)\r\n {\r\n");
if (ingressType == "StreamEvent" && partitioned)
{
this.Write(" // Track punctuation\r\n if (value.IsPunctuation && value.Sy" +
"ncTime > lastPunctuationTime[value.PartitionKey])\r\n lastPunctuati" +
"onTime[value.PartitionKey] = value.SyncTime;\r\n\r\n");
"ncTime > this.lastPunctuationTime[value.PartitionKey])\r\n this.las" +
"tPunctuationTime[value.PartitionKey] = value.SyncTime;\r\n\r\n");
}
this.Write(@" // out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
");
if (!partitioned)
{
this.Write(@" ulong delta = (ulong)(value.SyncTime - lastPunctuationTime);
if (!outOfOrder && delta >= punctuationGenerationPeriod)
this.Write(@" ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
var punctuationTimeSnapped = value.SyncTime.SnapToLeftBoundary((long)punctuationGenerationPeriod);
var punctuationTimeSnapped = value.SyncTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod);
OnPunctuation(StreamEvent.CreatePunctuation<");
this.Write(this.ToStringHelper.ToStringWithCulture(fusionOption == "Disordered" ? TResult : TPayload));
this.Write(">(punctuationTimeSnapped));\r\n }\r\n");
@ -404,11 +412,12 @@ if (latencyOption == "WithLatency") {
{
this.Write(@" // We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(lastPunctuationTime[value.PartitionKey], lowWatermark));
if (!outOfOrder && punctuationGenerationPeriod > 0 && delta >= punctuationGenerationPeriod)
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
var punctuationTimeSnapped = value.SyncTime.SnapToLeftBoundary((long)punctuationGenerationPeriod);
var punctuationTimeSnapped = value.SyncTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod);
OnPunctuation(PartitionedStreamEvent.CreatePunctuation<TKey, ");
this.Write(this.ToStringHelper.ToStringWithCulture(fusionOption == "Disordered" ? TResult : TPayload));
this.Write(">(value.PartitionKey, punctuationTimeSnapped));\r\n }\r\n");
@ -425,7 +434,7 @@ if (latencyOption == "WithLatency") {
PushIndent(" ");
}
}
this.Write(@" if (disorderPolicyType == DisorderPolicyType.Throw)
this.Write(@" if (this.disorderPolicyType == DisorderPolicyType.Throw)
{
if (outOfOrder)
{
@ -448,10 +457,10 @@ if (latencyOption == "WithLatency") {
if (outOfOrder)
{
key = Tuple.Create(value.SyncTime, value.Payload);
if (!startEventInformation.TryGetValue(key, out q))
if (!this.startEventInformation.TryGetValue(key, out q))
{
q = new ElasticCircularBuffer<AdjustInfo>();
startEventInformation.Add(key, q);
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
}
@ -466,15 +475,15 @@ if (latencyOption == "WithLatency") {
}
}
if (disorderPolicyType == DisorderPolicyType.Drop)
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
diagnosticOutput?.OnNext(OutOfOrder");
this.diagnosticOutput?.OnNext(OutOfOrder");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent.Create(");
this.Write(this.ToStringHelper.ToStringWithCulture(streamEventFromValue));
this.Write(", new long?()));\r\n return; // drop\r\n " +
" }\r\n else\r\n {\r\n " +
" diagnosticOutput?.OnNext(OutOfOrder");
" this.diagnosticOutput?.OnNext(OutOfOrder");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent.Create(");
this.Write(this.ToStringHelper.ToStringWithCulture(streamEventFromValue));
@ -490,8 +499,8 @@ if (latencyOption == "WithLatency") {
"amEventKind.Interval:\r\n");
}
this.Write(" if (outOfOrder)\r\n {\r\n " +
" if (disorderPolicyType == DisorderPolicyType.Drop)\r\n {" +
"\r\n diagnosticOutput?.OnNext(OutOfOrder");
" if (this.disorderPolicyType == DisorderPolicyType.Drop)\r\n " +
" {\r\n this.diagnosticOutput?.OnNext(OutOfOrder");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent.Create(");
this.Write(this.ToStringHelper.ToStringWithCulture(streamEventFromValue));
@ -502,13 +511,13 @@ if (latencyOption == "WithLatency") {
{
if (current >= value.OtherTime)
{
diagnosticOutput?.OnNext(OutOfOrder");
this.diagnosticOutput?.OnNext(OutOfOrder");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent.Create(");
this.Write(this.ToStringHelper.ToStringWithCulture(streamEventFromValue));
this.Write(", new long?()));\r\n return; // drop\r\n " +
" }\r\n\r\n diagnosticOutput?.OnNext(OutOfOrd" +
"er");
" }\r\n\r\n this.diagnosticOutput?.OnNext(Out" +
"OfOrder");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent.Create(");
this.Write(this.ToStringHelper.ToStringWithCulture(streamEventFromValue));
@ -528,7 +537,7 @@ if (latencyOption == "WithLatency") {
case StreamEventKind.End:
// it may not be out of order, but did we drop/adjust the corresponding start event?
key = Tuple.Create(value.OtherTime, value.Payload);
if (startEventInformation.TryGetValue(key, out q))
if (this.startEventInformation.TryGetValue(key, out q))
{
Contract.Assume(!q.IsEmpty());
var firstElement = q.PeekFirst();
@ -536,19 +545,19 @@ if (latencyOption == "WithLatency") {
if (firstElement.numberOfOccurrences == 0)
{
q.Dequeue(); // throw away returned value
if (q.Count == 0) startEventInformation.Remove(key);
if (q.Count == 0) this.startEventInformation.Remove(key);
}
var adjustedTime = firstElement.modifiedStartTime;
if (disorderPolicyType == DisorderPolicyType.Drop)
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
diagnosticOutput?.OnNext(OutOfOrder");
this.diagnosticOutput?.OnNext(OutOfOrder");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent.Create(");
this.Write(this.ToStringHelper.ToStringWithCulture(streamEventFromValue));
this.Write(", new long?()));\r\n return; // drop\r\n " +
" }\r\n else\r\n {\r\n " +
" diagnosticOutput?.OnNext(OutOfOrder");
" this.diagnosticOutput?.OnNext(OutOfOrder");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent.Create(");
this.Write(this.ToStringHelper.ToStringWithCulture(streamEventFromValue));
@ -564,15 +573,15 @@ if (latencyOption == "WithLatency") {
}
else if (outOfOrder)
{
if (disorderPolicyType == DisorderPolicyType.Drop)
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
diagnosticOutput?.OnNext(OutOfOrder");
this.diagnosticOutput?.OnNext(OutOfOrder");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent.Create(");
this.Write(this.ToStringHelper.ToStringWithCulture(streamEventFromValue));
this.Write(", new long?()));\r\n return; // drop\r\n " +
" }\r\n else\r\n {\r\n " +
" diagnosticOutput?.OnNext(OutOfOrder");
" this.diagnosticOutput?.OnNext(OutOfOrder");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent.Create(");
this.Write(this.ToStringHelper.ToStringWithCulture(streamEventFromValue));
@ -610,37 +619,55 @@ if (latencyOption == "WithLatency") {
this.Write("\r\n");
if (!partitioned)
{
this.Write(" if (currentTime < value.SyncTime) currentTime = value.SyncTime;\r\n");
this.Write(" if (this.currentTime < value.SyncTime) this.currentTime = value.SyncTime;" +
"\r\n");
}
else
{
this.Write(" if (!currentTime.TryGetValue(value.PartitionKey, out long oldCurrentTime)" +
" || oldCurrentTime < value.SyncTime)\r\n currentTime[value.PartitionKey" +
"] = value.SyncTime;\r\n");
this.Write(" if (!this.currentTime.TryGetValue(value.PartitionKey, out long oldCurrent" +
"Time) || oldCurrentTime < value.SyncTime)\r\n this.currentTime[value.Pa" +
"rtitionKey] = value.SyncTime;\r\n");
}
this.Write(" }\r\n\r\n private void GenerateAndProcess");
this.Write(this.ToStringHelper.ToStringWithCulture(globalPunctuation));
this.Write("(long syncTime)\r\n {\r\n if (syncTime <= ");
this.Write(this.ToStringHelper.ToStringWithCulture(partitioned ? "lowWatermark" : "lastPunctuationTime"));
this.Write(this.ToStringHelper.ToStringWithCulture(partitioned ? "this.lowWatermark" : "this.lastPunctuationTime"));
this.Write(") return;\r\n\r\n");
if (!partitioned)
{
this.Write(" \r\n // Update the ");
this.Write(this.ToStringHelper.ToStringWithCulture(globalPunctuation));
this.Write(" to be at least the currentTime, so the ");
this.Write(this.ToStringHelper.ToStringWithCulture(globalPunctuation));
this.Write("\r\n // is not before the preceding data event.\r\n");
if (latencyOption == "WithLatency")
{
this.Write(" // Note that currentTime only reflects events already processed, and " +
"excludes events in the reorder buffer.\r\n");
}
this.Write(" syncTime = Math.Max(syncTime, this.currentTime);\r\n");
}
this.Write("\r\n");
if (latencyOption == "WithLatency")
{
this.Write(" // Process events queued for reorderLatency up to the ");
this.Write(this.ToStringHelper.ToStringWithCulture(globalPunctuation));
this.Write(" syncTime\r\n if (priorityQueueSorter != null)\r\n {\r\n ");
this.Write(" syncTime\r\n if (this.priorityQueueSorter != null)\r\n {\r\n " +
"");
this.Write(this.ToStringHelper.ToStringWithCulture(baseStructure));
this.Write(" resultEvent;\r\n while ((!priorityQueueSorter.IsEmpty()) && priorityQue" +
"ueSorter.Peek().SyncTime <= syncTime)\r\n {\r\n resultEven" +
"t = priorityQueueSorter.Dequeue();\r\n Process(ref resultEvent");
this.Write(" resultEvent;\r\n while ((!this.priorityQueueSorter.IsEmpty()) && this.p" +
"riorityQueueSorter.Peek().SyncTime <= syncTime)\r\n {\r\n " +
"resultEvent = this.priorityQueueSorter.Dequeue();\r\n Process(ref r" +
"esultEvent");
if (partitioned) {
this.Write(", updateCurrentTime : false");
}
this.Write(");\r\n }\r\n }\r\n else\r\n {\r\n");
if (partitioned)
{
this.Write(" bool recheck;\r\n var PartitionedstreamEvents = impatienceSo" +
"rter.DequeueUntil(syncTime);\r\n\r\n int index = FastDictionary<TKey, Tup" +
"le<bool, PooledElasticCircularBuffer<PartitionedStreamEvent<TKey, ");
this.Write(" bool recheck;\r\n var PartitionedstreamEvents = this.impatie" +
"nceSorter.DequeueUntil(syncTime);\r\n\r\n int index = FastDictionary<TKey" +
", Tuple<bool, PooledElasticCircularBuffer<PartitionedStreamEvent<TKey, ");
this.Write(this.ToStringHelper.ToStringWithCulture(TPayload));
this.Write(">>>>.IteratorStart;\r\n while(");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
@ -652,8 +679,8 @@ if (partitioned) {
}
else
{
this.Write(" var streamEvents = impatienceSorter.DequeueUntil(syncTime, out bool r" +
"echeck);\r\n");
this.Write(" var streamEvents = this.impatienceSorter.DequeueUntil(syncTime, out b" +
"ool recheck);\r\n");
}
this.Write(" if (streamEvents != null)\r\n {\r\n ");
this.Write(this.ToStringHelper.ToStringWithCulture(baseStructure));
@ -664,7 +691,8 @@ if (partitioned) {
if (partitioned) {
this.Write(", updateCurrentTime : false");
}
this.Write(");\r\n }\r\n if (!recheck) impatienceSorter.Return(");
this.Write(");\r\n }\r\n if (!recheck) this.impatienceSorter.Return" +
"(");
this.Write(this.ToStringHelper.ToStringWithCulture(!partitioned ? string.Empty : "entry.key , "));
this.Write(" streamEvents);\r\n }\r\n");
if (partitioned)
@ -674,20 +702,20 @@ if (partitioned) {
}
this.Write(" }\r\n\r\n");
}
this.Write(" // Update cached global times\r\n highWatermark = Math.Max(syncTime," +
" highWatermark);\r\n");
this.Write(" // Update cached global times\r\n this.highWatermark = Math.Max(sync" +
"Time, this.highWatermark);\r\n");
if (partitioned)
{
this.Write(@" if (lowWatermark < syncTime)
this.Write(@" if (this.lowWatermark < syncTime)
{
lowWatermark = syncTime;
this.lowWatermark = syncTime;
// Gather keys whose high watermarks are before the new low watermark
var expiredWatermarkKVPs = new List<KeyValuePair<long, HashSet<TKey>>>();
foreach (var keyValuePair in highWatermarkToPartitionsMap)
foreach (var keyValuePair in this.highWatermarkToPartitionsMap)
{
// Since highWatermarkToPartitionsMap is sorted, we can stop as soon as we reach the threshold
if (keyValuePair.Key >= lowWatermark) break;
if (keyValuePair.Key >= this.lowWatermark) break;
expiredWatermarkKVPs.Add(keyValuePair);
}
@ -696,14 +724,14 @@ if (partitioned) {
foreach (var expiredWatermarkKVP in expiredWatermarkKVPs)
{
var expiredWatermark = expiredWatermarkKVP.Key;
highWatermarkToPartitionsMap.Remove(expiredWatermark);
this.highWatermarkToPartitionsMap.Remove(expiredWatermark);
var expiredKeys = expiredWatermarkKVP.Value;
foreach (var expiredKey in expiredKeys)
{
lastPunctuationTime.Remove(expiredKey);
partitionHighWatermarks.Remove(expiredKey);
currentTime.Remove(expiredKey);
this.lastPunctuationTime.Remove(expiredKey);
this.partitionHighWatermarks.Remove(expiredKey);
this.currentTime.Remove(expiredKey);
}
}
}
@ -711,44 +739,51 @@ if (partitioned) {
}
else
{
this.Write(" currentTime = Math.Max(syncTime, currentTime);\r\n lastPunctuationTi" +
"me = Math.Max(syncTime, lastPunctuationTime);\r\n");
this.Write(" this.currentTime = Math.Max(syncTime, this.currentTime);\r\n this.la" +
"stPunctuationTime = Math.Max(syncTime, this.lastPunctuationTime);\r\n");
}
this.Write("\r\n // Add ");
this.Write(this.ToStringHelper.ToStringWithCulture(globalPunctuation));
this.Write(" to batch\r\n var count = currentBatch.Count;\r\n currentBatch.vsync.co" +
"l[count] = syncTime;\r\n currentBatch.vother.col[count] = ");
this.Write(" to batch\r\n var count = this.currentBatch.Count;\r\n this.currentBatc" +
"h.vsync.col[count] = syncTime;\r\n this.currentBatch.vother.col[count] = ");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("StreamEvent.");
this.Write(this.ToStringHelper.ToStringWithCulture(globalPunctuation));
this.Write(@"OtherTime;
currentBatch.bitvector.col[count >> 6] |= (1L << (count & 0x3f));
currentBatch.key.col[count] = default;
currentBatch[count] = default;
currentBatch.hash.col[count] = 0;
currentBatch.Count = count + 1;
this.currentBatch.bitvector.col[count >> 6] |= (1L << (count & 0x3f));
this.currentBatch.key.col[count] = default;
this.currentBatch[count] = default;
this.currentBatch.hash.col[count] = 0;
this.currentBatch.Count = count + 1;
// Flush if necessary
if (flushPolicy == ");
if (this.flushPolicy == ");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("FlushPolicy.FlushOn");
this.Write(this.ToStringHelper.ToStringWithCulture(globalPunctuation));
this.Write(" ||\r\n (flushPolicy == ");
this.Write(" ||\r\n (this.flushPolicy == ");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("FlushPolicy.FlushOnBatchBoundary && currentBatch.Count == Config.DataBatchSize))\r" +
"\n {\r\n OnFlush();\r\n }\r\n else if (currentBatch.Cou" +
"nt == Config.DataBatchSize)\r\n {\r\n FlushContents();\r\n }\r" +
"\n }\r\n\r\n");
this.Write(@"FlushPolicy.FlushOnBatchBoundary && this.currentBatch.Count == Config.DataBatchSize))
{
OnFlush();
}
else if (this.currentBatch.Count == Config.DataBatchSize)
{
FlushContents();
}
}
");
if (partitioned && latencyOption == "WithLatency")
{
this.Write(@" protected override void UpdatePointers()
{
foreach (var kvp in partitionHighWatermarks)
foreach (var kvp in this.partitionHighWatermarks)
{
if (highWatermarkToPartitionsMap.TryGetValue(kvp.Value, out HashSet<TKey> set))
if (this.highWatermarkToPartitionsMap.TryGetValue(kvp.Value, out HashSet<TKey> set))
set.Add(kvp.Key);
else
highWatermarkToPartitionsMap.Add(kvp.Value, new HashSet<TKey> { kvp.Key });
this.highWatermarkToPartitionsMap.Add(kvp.Value, new HashSet<TKey> { kvp.Key });
}
}
@ -758,7 +793,7 @@ if (partitioned) {
"erateAndProcess");
this.Write(this.ToStringHelper.ToStringWithCulture(globalPunctuation));
this.Write("(punctuationTime);\r\n // Flush, but if we just flushed due to the punctuati" +
"on generated above\r\n if (flushPolicy != ");
"on generated above\r\n if (this.flushPolicy != ");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));
this.Write("FlushPolicy.FlushOn");
this.Write(this.ToStringHelper.ToStringWithCulture(globalPunctuation));
@ -777,16 +812,17 @@ this.Write(" generatedBatch = (");
this.Write(this.ToStringHelper.ToStringWithCulture(GeneratedBatchName));
this.Write(") this.currentBatch;\r\n var count = currentBatch.Count;\r\n currentBatch.vsync" +
".col[count] = value.SyncTime;\r\n currentBatch.vother.col[count] = ");
this.Write(") this.currentBatch;\r\n var count = this.currentBatch.Count;\r\n this.currentB" +
"atch.vsync.col[count] = value.SyncTime;\r\n this.currentBatch.vother.col[count]" +
" = ");
this.Write(this.ToStringHelper.ToStringWithCulture(generatedEndTimeVariable));
this.Write(";\r\n currentBatch.key.col[count] = ");
this.Write(";\r\n this.currentBatch.key.col[count] = ");
this.Write(this.ToStringHelper.ToStringWithCulture(emptyOrPartition));
this.Write(";\r\n currentBatch.hash.col[count] = ");
this.Write(";\r\n this.currentBatch.hash.col[count] = ");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString == "Partitioned" ? "GetHashCode(value.PartitionKey)" : "0"));
@ -858,8 +894,8 @@ this.Write(";\r\n");
this.Write(" }\r\n");
}
this.Write("\r\n currentBatch.Count++;\r\n if (currentBatch.Count == Config.DataBatchSize)\r" +
"\n {\r\n if (flushPolicy == ");
this.Write("\r\n this.currentBatch.Count++;\r\n if (this.currentBatch.Count == Config.DataB" +
"atchSize)\r\n {\r\n if (this.flushPolicy == ");
this.Write(this.ToStringHelper.ToStringWithCulture(partitionString));

Просмотреть файл

@ -4,7 +4,7 @@
<#@ import namespace="System.Text" #>
<#@ import namespace="System.Collections.Generic" #>
// *********************************************************************
// Copyright (C) Microsoft Corporation. All rights reserved.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
/*\
@ -55,7 +55,7 @@ using Microsoft.StreamProcessing.Internal.Collections;
<# bool partitioned = (partitionString == "Partitioned");
string baseStructure = partitionString + "StreamEvent<" + adjustedGenericArgs + ">";
string globalPunctuation = partitioned ? "LowWatermark" : "Punctuation";
string highWatermark = partitioned ? "partitionHighWatermarks[value.PartitionKey]" : "highWatermark";
string highWatermark = partitioned ? "this.partitionHighWatermarks[value.PartitionKey]" : "this.highWatermark";
string keyType = !partitioned ? "Microsoft.StreamProcessing.Empty" : "PartitionKey<TKey>";
string streamEventFromValue = fusionOption == "Disordered" ? ("new " + partitionString + "StreamEvent<" + genericArguments + ">(" + (!partitioned ? string.Empty : "value.PartitionKey, ") + "value.SyncTime, value.OtherTime, default)") : "value";
#>
@ -153,31 +153,34 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
if (partitioned)
{ #>
// Check to see if we need to generate a low watermark due to PeriodicLowWatermarkPolicy
if (lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > lowWatermarkTimestampLag)
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
var newLowWatermark = value.SyncTime - lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - lowWatermark) >= lowWatermarkGenerationPeriod)
// Generated lowWatermark is always quantized to the last lowWatermarkGenerationPeriod boundary, but
// since the last lowWatermark time may have been explicitly ingressed, it is not necessarily quantized,
// so use the quantized value when computing the delta.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
if ((ulong)(newLowWatermark - this.lowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod)) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)lowWatermarkGenerationPeriod);
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
GenerateAndProcessLowWatermark(newLowWatermarkSnapped);
}
}
long moveFrom;
if (!currentTime.TryGetValue(value.PartitionKey, out moveFrom)) moveFrom = lowWatermark;
if (!partitionHighWatermarks.ContainsKey(value.PartitionKey))
if (!this.currentTime.TryGetValue(value.PartitionKey, out moveFrom)) moveFrom = this.lowWatermark;
if (!this.partitionHighWatermarks.ContainsKey(value.PartitionKey))
{
partitionHighWatermarks.Add(value.PartitionKey, lowWatermark);
this.partitionHighWatermarks.Add(value.PartitionKey, this.lowWatermark);
if (highWatermarkToPartitionsMap.TryGetValue(lowWatermark, out HashSet<TKey> keySet)) keySet.Add(value.PartitionKey);
else highWatermarkToPartitionsMap.Add(lowWatermark, new HashSet<TKey> { value.PartitionKey });
if (this.highWatermarkToPartitionsMap.TryGetValue(this.lowWatermark, out HashSet<TKey> keySet)) keySet.Add(value.PartitionKey);
else this.highWatermarkToPartitionsMap.Add(this.lowWatermark, new HashSet<TKey> { value.PartitionKey });
}
<# }
else
{ #>
long moveFrom = currentTime;
long moveFrom = this.currentTime;
<# } #>
long moveTo = moveFrom;
@ -201,15 +204,15 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
<# if (partitioned)
{ #>
var oldSet = highWatermarkToPartitionsMap[oldTime];
if (oldSet.Count <= 1) highWatermarkToPartitionsMap.Remove(oldTime);
var oldSet = this.highWatermarkToPartitionsMap[oldTime];
if (oldSet.Count <= 1) this.highWatermarkToPartitionsMap.Remove(oldTime);
else oldSet.Remove(value.PartitionKey);
if (highWatermarkToPartitionsMap.TryGetValue(value.SyncTime, out HashSet<TKey> set)) set.Add(value.PartitionKey);
else highWatermarkToPartitionsMap.Add(value.SyncTime, new HashSet<TKey> { value.PartitionKey });
if (this.highWatermarkToPartitionsMap.TryGetValue(value.SyncTime, out HashSet<TKey> set)) set.Add(value.PartitionKey);
else this.highWatermarkToPartitionsMap.Add(value.SyncTime, new HashSet<TKey> { value.PartitionKey });
<# } #>
moveTo = value.SyncTime - reorderLatency;
moveTo = value.SyncTime - this.reorderLatency;
if (moveTo < StreamEvent.MinSyncTime) moveTo = StreamEvent.MinSyncTime;
if (moveTo < moveFrom) moveTo = moveFrom;
}
@ -221,20 +224,20 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
if (moveTo > moveFrom)
{
if (priorityQueueSorter != null)
if (this.priorityQueueSorter != null)
{
<#= partitionString #>StreamEvent<<#= adjustedGenericArgs #>> resultEvent;
while ((!priorityQueueSorter.IsEmpty()) && priorityQueueSorter.Peek().SyncTime <= moveTo)
while ((!this.priorityQueueSorter.IsEmpty()) && this.priorityQueueSorter.Peek().SyncTime <= moveTo)
{
resultEvent = priorityQueueSorter.Dequeue();
resultEvent = this.priorityQueueSorter.Dequeue();
Process(ref resultEvent<#if (partitioned) {#>, updateCurrentTime : false<#}#>);
}
}
else
{
// Extract and process data in-order from impatience, until timestamp of moveTo
PooledElasticCircularBuffer<<#= baseStructure #>> streamEvents = impatienceSorter.DequeueUntil(<#= !partitioned ? string.Empty : "value.PartitionKey, " #> moveTo, out bool recheck);
PooledElasticCircularBuffer<<#= baseStructure #>> streamEvents = this.impatienceSorter.DequeueUntil(<#= !partitioned ? string.Empty : "value.PartitionKey, " #> moveTo, out bool recheck);
if (streamEvents != null)
{
@ -247,7 +250,7 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
}
if (!recheck && (streamEvents != null))
impatienceSorter.Return(<#= !partitioned ? string.Empty : "value.PartitionKey, " #> streamEvents);
this.impatienceSorter.Return(<#= !partitioned ? string.Empty : "value.PartitionKey, " #> streamEvents);
}
}
@ -258,20 +261,20 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
}
// Enqueue value into impatience
if (priorityQueueSorter != null) priorityQueueSorter.Enqueue(value);
else impatienceSorter.Enqueue(ref value);
if (this.priorityQueueSorter != null) this.priorityQueueSorter.Enqueue(value);
else this.impatienceSorter.Enqueue(ref value);
// Move currentTime forward
<# if (!partitioned)
{ #>
currentTime = moveTo;
this.currentTime = moveTo;
<# }
else
{ #>
if (!currentTime.TryGetValue(value.PartitionKey, out long oldCurrentTime))
currentTime.Add(value.PartitionKey, moveTo);
if (!this.currentTime.TryGetValue(value.PartitionKey, out long oldCurrentTime))
this.currentTime.Add(value.PartitionKey, moveTo);
else if (oldCurrentTime < moveTo)
currentTime[value.PartitionKey] = moveTo;
this.currentTime[value.PartitionKey] = moveTo;
<# } #>
}
@ -301,54 +304,58 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
<# }
if (!partitioned)
{ #>
long current = currentTime;
long current = this.currentTime;
<# }
else
{ #>
// Update global high water mark if necessary
highWatermark = Math.Max(highWatermark, value.SyncTime);
this.highWatermark = Math.Max(this.highWatermark, value.SyncTime);
if (punctuationPolicyType == PeriodicPunctuationPolicyType.Time && !lastPunctuationTime.ContainsKey(value.PartitionKey))
lastPunctuationTime.Add(value.PartitionKey, lowWatermark);
if (this.punctuationPolicyType == PeriodicPunctuationPolicyType.Time && !this.lastPunctuationTime.ContainsKey(value.PartitionKey))
this.lastPunctuationTime.Add(value.PartitionKey, this.lowWatermark);
// Retrieve current time for this partition, updating currentTime if necessary
long current;
if (!currentTime.TryGetValue(value.PartitionKey, out current))
if (!this.currentTime.TryGetValue(value.PartitionKey, out current))
{
<# if (latencyOption == "WithLatency")
{#>
// We should always have a currentTime entry if we are not updating it
if (!updateCurrentTime) throw new IngressException("Partition expected to have a valid currentTime!");
<# }#>
current = lowWatermark;
current = this.lowWatermark;
}
else if (<#if (latencyOption == "WithLatency") {#>updateCurrentTime && <#}#>current < lowWatermark)
else if (<#if (latencyOption == "WithLatency") {#>updateCurrentTime && <#}#>current < this.lowWatermark)
{
current = lowWatermark;
currentTime[value.PartitionKey] = lowWatermark;
current = this.lowWatermark;
this.currentTime[value.PartitionKey] = this.lowWatermark;
}
<# } #>
var outOfOrder = value.SyncTime < current;
if (punctuationPolicyType == PeriodicPunctuationPolicyType.Time)
if (this.punctuationPolicyType == PeriodicPunctuationPolicyType.Time)
{
<# if (ingressType == "StreamEvent" && partitioned)
{ #>
// Track punctuation
if (value.IsPunctuation && value.SyncTime > lastPunctuationTime[value.PartitionKey])
lastPunctuationTime[value.PartitionKey] = value.SyncTime;
if (value.IsPunctuation && value.SyncTime > this.lastPunctuationTime[value.PartitionKey])
this.lastPunctuationTime[value.PartitionKey] = value.SyncTime;
<# } #>
// out of order events shouldn't count, and if the disorder policy adjusts their sync time, then it
// will be made equal to a timestamp already seen earlier in the sequence and this would have triggered
// (if necessary) when that timestamp was seen.
// Generated punctuation is always quantized to the last generationPeriod boundary, but since the last
// punctuation time may have been explicitly ingressed, it is not necessarily quantized, so use the
// quantized value when computing the delta, otherwise, we could produce a punctuation that is before
// the previous data event.
<# if (!partitioned)
{ #>
ulong delta = (ulong)(value.SyncTime - lastPunctuationTime);
if (!outOfOrder && delta >= punctuationGenerationPeriod)
ulong delta = (ulong)(value.SyncTime - this.lastPunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
var punctuationTimeSnapped = value.SyncTime.SnapToLeftBoundary((long)punctuationGenerationPeriod);
var punctuationTimeSnapped = value.SyncTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod);
OnPunctuation(StreamEvent.CreatePunctuation<<#= fusionOption == "Disordered" ? TResult : TPayload #>>(punctuationTimeSnapped));
}
<# }
@ -356,11 +363,12 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
{ #>
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
ulong delta = (ulong)(value.SyncTime - Math.Max(lastPunctuationTime[value.PartitionKey], lowWatermark));
if (!outOfOrder && punctuationGenerationPeriod > 0 && delta >= punctuationGenerationPeriod)
var baselinePunctuationTime = Math.Max(this.lastPunctuationTime[value.PartitionKey], this.lowWatermark);
var delta = (ulong)(value.SyncTime - baselinePunctuationTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod));
if (!outOfOrder && this.punctuationGenerationPeriod > 0 && delta >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
var punctuationTimeSnapped = value.SyncTime.SnapToLeftBoundary((long)punctuationGenerationPeriod);
var punctuationTimeSnapped = value.SyncTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod);
OnPunctuation(PartitionedStreamEvent.CreatePunctuation<TKey, <#= fusionOption == "Disordered" ? TResult : TPayload #>>(value.PartitionKey, punctuationTimeSnapped));
}
<# } #>
@ -380,7 +388,7 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
<# PushIndent(" ");
}
} #>
if (disorderPolicyType == DisorderPolicyType.Throw)
if (this.disorderPolicyType == DisorderPolicyType.Throw)
{
if (outOfOrder)
{
@ -400,10 +408,10 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
if (outOfOrder)
{
key = Tuple.Create(value.SyncTime, value.Payload);
if (!startEventInformation.TryGetValue(key, out q))
if (!this.startEventInformation.TryGetValue(key, out q))
{
q = new ElasticCircularBuffer<AdjustInfo>();
startEventInformation.Add(key, q);
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
}
@ -418,14 +426,14 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
}
}
if (disorderPolicyType == DisorderPolicyType.Drop)
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?()));
this.diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?()));
return; // drop
}
else
{
diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?(current - value.SyncTime)));
this.diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?(current - value.SyncTime)));
value = new <#= partitionString #>StreamEvent<<#= adjustedGenericArgs #>>(<#= !partitioned ? string.Empty : "value.PartitionKey, " #>current, StreamEvent.InfinitySyncTime, value.Payload);
}
}
@ -435,20 +443,20 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
<# } #>
if (outOfOrder)
{
if (disorderPolicyType == DisorderPolicyType.Drop)
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?()));
this.diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?()));
return; // drop
}
else
{
if (current >= value.OtherTime)
{
diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?()));
this.diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?()));
return; // drop
}
diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?(current - value.SyncTime)));
this.diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?(current - value.SyncTime)));
value = new <#= partitionString #>StreamEvent<<#= adjustedGenericArgs #>>(<#= !partitioned ? string.Empty : "value.PartitionKey, " #>current, value.OtherTime, value.Payload);
}
}
@ -459,7 +467,7 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
case StreamEventKind.End:
// it may not be out of order, but did we drop/adjust the corresponding start event?
key = Tuple.Create(value.OtherTime, value.Payload);
if (startEventInformation.TryGetValue(key, out q))
if (this.startEventInformation.TryGetValue(key, out q))
{
Contract.Assume(!q.IsEmpty());
var firstElement = q.PeekFirst();
@ -467,31 +475,31 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
if (firstElement.numberOfOccurrences == 0)
{
q.Dequeue(); // throw away returned value
if (q.Count == 0) startEventInformation.Remove(key);
if (q.Count == 0) this.startEventInformation.Remove(key);
}
var adjustedTime = firstElement.modifiedStartTime;
if (disorderPolicyType == DisorderPolicyType.Drop)
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?()));
this.diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?()));
return; // drop
}
else
{
diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?(current - value.SyncTime)));
this.diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?(current - value.SyncTime)));
value = new <#= partitionString #>StreamEvent<<#= adjustedGenericArgs #>>(<#= !partitioned ? string.Empty : ("value.PartitionKey, ") #>outOfOrder ? current : value.SyncTime, adjustedTime, value.Payload);
}
}
else if (outOfOrder)
{
if (disorderPolicyType == DisorderPolicyType.Drop)
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?()));
this.diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?()));
return; // drop
}
else
{
diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?(current - value.SyncTime)));
this.diagnosticOutput?.OnNext(OutOfOrder<#= partitionString #>StreamEvent.Create(<#= streamEventFromValue #>, new long?(current - value.SyncTime)));
value = new <#= partitionString #>StreamEvent<<#= adjustedGenericArgs #>>(<#= !partitioned ? string.Empty : ("value.PartitionKey, ") #>current, value.OtherTime, value.Payload);
}
}
@ -517,28 +525,39 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
<# if (!partitioned)
{ #>
if (currentTime < value.SyncTime) currentTime = value.SyncTime;
if (this.currentTime < value.SyncTime) this.currentTime = value.SyncTime;
<# }
else
{ #>
if (!currentTime.TryGetValue(value.PartitionKey, out long oldCurrentTime) || oldCurrentTime < value.SyncTime)
currentTime[value.PartitionKey] = value.SyncTime;
if (!this.currentTime.TryGetValue(value.PartitionKey, out long oldCurrentTime) || oldCurrentTime < value.SyncTime)
this.currentTime[value.PartitionKey] = value.SyncTime;
<# } #>
}
private void GenerateAndProcess<#= globalPunctuation #>(long syncTime)
{
if (syncTime <= <#= partitioned ? "lowWatermark" : "lastPunctuationTime" #>) return;
if (syncTime <= <#= partitioned ? "this.lowWatermark" : "this.lastPunctuationTime" #>) return;
<# if (!partitioned)
{ #>
// Update the <#= globalPunctuation #> to be at least the currentTime, so the <#= globalPunctuation #>
// is not before the preceding data event.
<# if (latencyOption == "WithLatency")
{ #>
// Note that currentTime only reflects events already processed, and excludes events in the reorder buffer.
<# } #>
syncTime = Math.Max(syncTime, this.currentTime);
<# } #>
<# if (latencyOption == "WithLatency")
{ #>
// Process events queued for reorderLatency up to the <#= globalPunctuation #> syncTime
if (priorityQueueSorter != null)
if (this.priorityQueueSorter != null)
{
<#= baseStructure #> resultEvent;
while ((!priorityQueueSorter.IsEmpty()) && priorityQueueSorter.Peek().SyncTime <= syncTime)
while ((!this.priorityQueueSorter.IsEmpty()) && this.priorityQueueSorter.Peek().SyncTime <= syncTime)
{
resultEvent = priorityQueueSorter.Dequeue();
resultEvent = this.priorityQueueSorter.Dequeue();
Process(ref resultEvent<#if (partitioned) {#>, updateCurrentTime : false<#}#>);
}
}
@ -547,7 +566,7 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
<# if (partitioned)
{ #>
bool recheck;
var PartitionedstreamEvents = impatienceSorter.DequeueUntil(syncTime);
var PartitionedstreamEvents = this.impatienceSorter.DequeueUntil(syncTime);
int index = FastDictionary<TKey, Tuple<bool, PooledElasticCircularBuffer<PartitionedStreamEvent<TKey, <#= TPayload #>>>>>.IteratorStart;
while(<#= partitionString #>streamEvents.Iterate(ref index))
@ -559,7 +578,7 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
}
else
{ #>
var streamEvents = impatienceSorter.DequeueUntil(syncTime, out bool recheck);
var streamEvents = this.impatienceSorter.DequeueUntil(syncTime, out bool recheck);
<# } #>
if (streamEvents != null)
{
@ -569,7 +588,7 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
resultEvent = streamEvents.Dequeue();
Process(ref resultEvent<#if (partitioned) {#>, updateCurrentTime : false<#}#>);
}
if (!recheck) impatienceSorter.Return(<#= !partitioned ? string.Empty : "entry.key , " #> streamEvents);
if (!recheck) this.impatienceSorter.Return(<#= !partitioned ? string.Empty : "entry.key , " #> streamEvents);
}
<# if (partitioned)
{
@ -580,19 +599,19 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
<# } #>
// Update cached global times
highWatermark = Math.Max(syncTime, highWatermark);
this.highWatermark = Math.Max(syncTime, this.highWatermark);
<# if (partitioned)
{ #>
if (lowWatermark < syncTime)
if (this.lowWatermark < syncTime)
{
lowWatermark = syncTime;
this.lowWatermark = syncTime;
// Gather keys whose high watermarks are before the new low watermark
var expiredWatermarkKVPs = new List<KeyValuePair<long, HashSet<TKey>>>();
foreach (var keyValuePair in highWatermarkToPartitionsMap)
foreach (var keyValuePair in this.highWatermarkToPartitionsMap)
{
// Since highWatermarkToPartitionsMap is sorted, we can stop as soon as we reach the threshold
if (keyValuePair.Key >= lowWatermark) break;
if (keyValuePair.Key >= this.lowWatermark) break;
expiredWatermarkKVPs.Add(keyValuePair);
}
@ -601,41 +620,41 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
foreach (var expiredWatermarkKVP in expiredWatermarkKVPs)
{
var expiredWatermark = expiredWatermarkKVP.Key;
highWatermarkToPartitionsMap.Remove(expiredWatermark);
this.highWatermarkToPartitionsMap.Remove(expiredWatermark);
var expiredKeys = expiredWatermarkKVP.Value;
foreach (var expiredKey in expiredKeys)
{
lastPunctuationTime.Remove(expiredKey);
partitionHighWatermarks.Remove(expiredKey);
currentTime.Remove(expiredKey);
this.lastPunctuationTime.Remove(expiredKey);
this.partitionHighWatermarks.Remove(expiredKey);
this.currentTime.Remove(expiredKey);
}
}
}
<# }
else
{ #>
currentTime = Math.Max(syncTime, currentTime);
lastPunctuationTime = Math.Max(syncTime, lastPunctuationTime);
this.currentTime = Math.Max(syncTime, this.currentTime);
this.lastPunctuationTime = Math.Max(syncTime, this.lastPunctuationTime);
<# } #>
// Add <#= globalPunctuation #> to batch
var count = currentBatch.Count;
currentBatch.vsync.col[count] = syncTime;
currentBatch.vother.col[count] = <#= partitionString #>StreamEvent.<#= globalPunctuation #>OtherTime;
currentBatch.bitvector.col[count >> 6] |= (1L << (count & 0x3f));
currentBatch.key.col[count] = default;
currentBatch[count] = default;
currentBatch.hash.col[count] = 0;
currentBatch.Count = count + 1;
var count = this.currentBatch.Count;
this.currentBatch.vsync.col[count] = syncTime;
this.currentBatch.vother.col[count] = <#= partitionString #>StreamEvent.<#= globalPunctuation #>OtherTime;
this.currentBatch.bitvector.col[count >> 6] |= (1L << (count & 0x3f));
this.currentBatch.key.col[count] = default;
this.currentBatch[count] = default;
this.currentBatch.hash.col[count] = 0;
this.currentBatch.Count = count + 1;
// Flush if necessary
if (flushPolicy == <#= partitionString #>FlushPolicy.FlushOn<#= globalPunctuation #> ||
(flushPolicy == <#= partitionString #>FlushPolicy.FlushOnBatchBoundary && currentBatch.Count == Config.DataBatchSize))
if (this.flushPolicy == <#= partitionString #>FlushPolicy.FlushOn<#= globalPunctuation #> ||
(this.flushPolicy == <#= partitionString #>FlushPolicy.FlushOnBatchBoundary && this.currentBatch.Count == Config.DataBatchSize))
{
OnFlush();
}
else if (currentBatch.Count == Config.DataBatchSize)
else if (this.currentBatch.Count == Config.DataBatchSize)
{
FlushContents();
}
@ -645,12 +664,12 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
{ #>
protected override void UpdatePointers()
{
foreach (var kvp in partitionHighWatermarks)
foreach (var kvp in this.partitionHighWatermarks)
{
if (highWatermarkToPartitionsMap.TryGetValue(kvp.Value, out HashSet<TKey> set))
if (this.highWatermarkToPartitionsMap.TryGetValue(kvp.Value, out HashSet<TKey> set))
set.Add(kvp.Key);
else
highWatermarkToPartitionsMap.Add(kvp.Value, new HashSet<TKey> { kvp.Key });
this.highWatermarkToPartitionsMap.Add(kvp.Value, new HashSet<TKey> { kvp.Key });
}
}
@ -659,7 +678,7 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
{
GenerateAndProcess<#= globalPunctuation #>(punctuationTime);
// Flush, but if we just flushed due to the punctuation generated above
if (flushPolicy != <#= partitionString #>FlushPolicy.FlushOn<#= globalPunctuation #>)
if (this.flushPolicy != <#= partitionString #>FlushPolicy.FlushOn<#= globalPunctuation #>)
OnFlush();
}
}
@ -670,11 +689,11 @@ private void AddToGeneratedBatch()
#>
{
<#= GeneratedBatchName #> generatedBatch = (<#= GeneratedBatchName #>) this.currentBatch;
var count = currentBatch.Count;
currentBatch.vsync.col[count] = value.SyncTime;
currentBatch.vother.col[count] = <#= generatedEndTimeVariable #>;
currentBatch.key.col[count] = <#= emptyOrPartition #>;
currentBatch.hash.col[count] = <#= partitionString == "Partitioned" ? "GetHashCode(value.PartitionKey)" : "0" #>;
var count = this.currentBatch.Count;
this.currentBatch.vsync.col[count] = value.SyncTime;
this.currentBatch.vother.col[count] = <#= generatedEndTimeVariable #>;
this.currentBatch.key.col[count] = <#= emptyOrPartition #>;
this.currentBatch.hash.col[count] = <#= partitionString == "Partitioned" ? "GetHashCode(value.PartitionKey)" : "0" #>;
<#+ if (resultRepresentation.noFields)
{ #>
generatedBatch.payload.col[count] = <#= valueString #>;
@ -709,10 +728,10 @@ private void AddToGeneratedBatch()
}
<#+ } #>
currentBatch.Count++;
if (currentBatch.Count == Config.DataBatchSize)
this.currentBatch.Count++;
if (this.currentBatch.Count == Config.DataBatchSize)
{
if (flushPolicy == <#= partitionString #>FlushPolicy.FlushOnBatchBoundary) OnFlush();
if (this.flushPolicy == <#= partitionString #>FlushPolicy.FlushOnBatchBoundary) OnFlush();
else FlushContents();
}
}

Просмотреть файл

@ -58,7 +58,7 @@ namespace Microsoft.StreamProcessing
}
this.nextLeftTime = leftBatch.vsync.col[leftBatch.iter];
if (first) lastLeftTime = MaxBatchSyncTime(leftBatch);
if (first) lastLeftTime = leftBatch.vsync.col[leftBatch.Count - 1];
first = (rightBatch.iter == 0);
if (!GoToVisibleRow(rightBatch))
@ -70,7 +70,7 @@ namespace Microsoft.StreamProcessing
}
this.nextRightTime = rightBatch.vsync.col[rightBatch.iter];
if (first) lastRightTime = MaxBatchSyncTime(rightBatch);
if (first) lastRightTime = rightBatch.vsync.col[rightBatch.Count - 1];
if ((lastLeftTime != -1) && (lastRightTime != -1))
{
@ -133,8 +133,7 @@ namespace Microsoft.StreamProcessing
isBatchFree = true;
if (batch.iter == 0)
{
long maxLeftTime = MaxBatchSyncTime(batch);
if (maxLeftTime <= nextRightTime)
if (batch.vsync.col[batch.Count - 1] <= this.nextRightTime)
{
OutputBatch(batch);
isBatchDone = true;
@ -171,8 +170,7 @@ namespace Microsoft.StreamProcessing
isBatchFree = true;
if (batch.iter == 0)
{
long maxRightTime = MaxBatchSyncTime(batch);
if (Config.DeterministicWithinTimestamp ? (maxRightTime < nextLeftTime) : (maxRightTime <= nextLeftTime))
if (Config.DeterministicWithinTimestamp ? (batch.vsync.col[batch.Count - 1] < this.nextLeftTime) : (batch.vsync.col[batch.Count - 1] <= this.nextLeftTime))
{
OutputBatch(batch);
isBatchDone = true;
@ -212,31 +210,6 @@ namespace Microsoft.StreamProcessing
return (batch.iter != batch.Count);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static long MaxBatchSyncTime(StreamMessage<TKey, TPayload> batch)
{
// Punctuations are not necessarily monotonically increasing with respect to data events, so we cannot
// simply retrieve the last event.
// Iterate from end of batch until we hit a data event, then take the maximum of the events we have seen.
long max = -1;
for (int i = batch.Count - 1; i >= 0; i--)
{
// Skip deleted data events
if ((batch.bitvector.col[i >> 6] & (1L << (i & 0x3f))) != 0 && batch.vother.col[i] >= 0)
{
continue;
}
max = Math.Max(max, batch.vsync.col[i]);
if (batch.vother.col[i] != StreamEvent.PunctuationOtherTime)
{
break;
}
}
return max;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OutputCurrentTuple(StreamMessage<TKey, TPayload> batch)
{

Просмотреть файл

@ -130,34 +130,34 @@ using Microsoft.StreamProcessing.Internal.Collections;
" bool first = (leftBatch.iter == 0);\r\n if (!GoToVisibleRow(leftBatch" +
"))\r\n {\r\n leftBatchDone = true;\r\n rightBatchDone = f" +
"alse;\r\n return;\r\n }\r\n\r\n this.nextLeftTime = leftBatch.v" +
"sync.col[leftBatch.iter];\r\n if (first) lastLeftTime = MaxBatchSyncTime(le" +
"ftBatch);\r\n\r\n first = (rightBatch.iter == 0);\r\n if (!GoToVisibleRo" +
"w(rightBatch))\r\n {\r\n leftBatchDone = false;\r\n right" +
"BatchDone = true;\r\n\r\n return;\r\n }\r\n\r\n this.nextRightTim" +
"e = rightBatch.vsync.col[rightBatch.iter];\r\n if (first) lastRightTime = M" +
"axBatchSyncTime(rightBatch);\r\n\r\n if ((lastLeftTime != -1) && (lastRightTi" +
"me != -1))\r\n {\r\n leftBatchDone = rightBatchDone = false;\r\n " +
" if (lastLeftTime <= this.nextRightTime)\r\n {\r\n " +
"OutputBatch(leftBatch);\r\n leftBatchDone = true;\r\n " +
"leftBatchFree = false;\r\n }\r\n\r\n if (Config.DeterministicWit" +
"hinTimestamp ? (lastRightTime < this.nextLeftTime) : (lastRightTime <= this.next" +
"LeftTime))\r\n {\r\n OutputBatch(rightBatch);\r\n " +
" rightBatchDone = true;\r\n rightBatchFree = false;\r\n " +
" }\r\n\r\n if (leftBatchDone || rightBatchDone) return;\r\n }\r\n\r\n " +
" while (true)\r\n {\r\n if (this.nextLeftTime <= this.nextRig" +
"htTime)\r\n {\r\n OutputCurrentTuple(leftBatch);\r\n\r\n " +
" leftBatch.iter++;\r\n\r\n if (!GoToVisibleRow(leftBatch))\r\n" +
" {\r\n leftBatchDone = true;\r\n " +
" rightBatchDone = false;\r\n return;\r\n }\r\n\r\n " +
" this.nextLeftTime = leftBatch.vsync.col[leftBatch.iter];\r\n " +
" }\r\n else\r\n {\r\n OutputCurrentTuple(rightB" +
"atch);\r\n\r\n rightBatch.iter++;\r\n\r\n if (!GoToVisible" +
"Row(rightBatch))\r\n {\r\n leftBatchDone = false;\r" +
"\n rightBatchDone = true;\r\n return;\r\n " +
" }\r\n\r\n this.nextRightTime = rightBatch.vsync.col[rightBa" +
"tch.iter];\r\n }\r\n }\r\n }\r\n\r\n [MethodImpl(MethodImplOptions" +
".AggressiveInlining)]\r\n protected override void ProcessLeftBatch(StreamMessag" +
"e<");
"sync.col[leftBatch.iter];\r\n if (first) lastLeftTime = leftBatch.vsync.col" +
"[leftBatch.Count - 1];\r\n\r\n first = (rightBatch.iter == 0);\r\n if (!" +
"GoToVisibleRow(rightBatch))\r\n {\r\n leftBatchDone = false;\r\n " +
" rightBatchDone = true;\r\n\r\n return;\r\n }\r\n\r\n this" +
".nextRightTime = rightBatch.vsync.col[rightBatch.iter];\r\n if (first) last" +
"RightTime = rightBatch.vsync.col[rightBatch.Count - 1];\r\n\r\n if ((lastLeft" +
"Time != -1) && (lastRightTime != -1))\r\n {\r\n leftBatchDone = ri" +
"ghtBatchDone = false;\r\n if (lastLeftTime <= this.nextRightTime)\r\n " +
" {\r\n OutputBatch(leftBatch);\r\n leftBatchDon" +
"e = true;\r\n leftBatchFree = false;\r\n }\r\n\r\n " +
"if (Config.DeterministicWithinTimestamp ? (lastRightTime < this.nextLeftTime) : " +
"(lastRightTime <= this.nextLeftTime))\r\n {\r\n OutputBatc" +
"h(rightBatch);\r\n rightBatchDone = true;\r\n rightBat" +
"chFree = false;\r\n }\r\n\r\n if (leftBatchDone || rightBatchDon" +
"e) return;\r\n }\r\n\r\n while (true)\r\n {\r\n if (this.n" +
"extLeftTime <= this.nextRightTime)\r\n {\r\n OutputCurrent" +
"Tuple(leftBatch);\r\n\r\n leftBatch.iter++;\r\n\r\n if (!G" +
"oToVisibleRow(leftBatch))\r\n {\r\n leftBatchDone " +
"= true;\r\n rightBatchDone = false;\r\n return" +
";\r\n }\r\n\r\n this.nextLeftTime = leftBatch.vsync.col[" +
"leftBatch.iter];\r\n }\r\n else\r\n {\r\n " +
" OutputCurrentTuple(rightBatch);\r\n\r\n rightBatch.iter++;\r\n\r\n " +
" if (!GoToVisibleRow(rightBatch))\r\n {\r\n " +
" leftBatchDone = false;\r\n rightBatchDone = true;\r\n " +
" return;\r\n }\r\n\r\n this.nextRightTime = r" +
"ightBatch.vsync.col[rightBatch.iter];\r\n }\r\n }\r\n }\r\n\r\n [M" +
"ethodImpl(MethodImplOptions.AggressiveInlining)]\r\n protected override void Pr" +
"ocessLeftBatch(StreamMessage<");
this.Write(this.ToStringHelper.ToStringWithCulture(TKey));
this.Write(", ");
this.Write(this.ToStringHelper.ToStringWithCulture(TPayload));
@ -166,8 +166,7 @@ using Microsoft.StreamProcessing.Internal.Collections;
isBatchFree = true;
if (batch.iter == 0)
{
long maxLeftTime = MaxBatchSyncTime(batch);
if (maxLeftTime <= nextRightTime)
if (batch.vsync.col[batch.Count - 1] <= this.nextRightTime)
{
OutputBatch(batch);
isBatchDone = true;
@ -208,8 +207,7 @@ using Microsoft.StreamProcessing.Internal.Collections;
isBatchFree = true;
if (batch.iter == 0)
{
long maxRightTime = MaxBatchSyncTime(batch);
if (Config.DeterministicWithinTimestamp ? (maxRightTime < nextLeftTime) : (maxRightTime <= nextLeftTime))
if (Config.DeterministicWithinTimestamp ? (batch.vsync.col[batch.Count - 1] < this.nextLeftTime) : (batch.vsync.col[batch.Count - 1] <= this.nextLeftTime))
{
OutputBatch(batch);
isBatchDone = true;
@ -253,35 +251,6 @@ using Microsoft.StreamProcessing.Internal.Collections;
return batch.iter != batch.Count;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static long MaxBatchSyncTime(StreamMessage<");
this.Write(this.ToStringHelper.ToStringWithCulture(TKey));
this.Write(", ");
this.Write(this.ToStringHelper.ToStringWithCulture(TPayload));
this.Write(@"> batch)
{
// Punctuations are not necessarily monotonically increasing with respect to data events, so we cannot
// simply retrieve the last event.
// Iterate from end of batch until we hit a data event, then take the maximum of the events we have seen.
long max = -1;
for (int i = batch.Count - 1; i >= 0; i--)
{
// Skip deleted data events
if ((batch.bitvector.col[i >> 6] & (1L << (i & 0x3f))) != 0 && batch.vother.col[i] >= 0)
{
continue;
}
max = Math.Max(max, batch.vsync.col[i]);
if (batch.vother.col[i] != StreamEvent.PunctuationOtherTime)
{
break;
}
}
return max;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OutputCurrentTuple(StreamMessage<");
this.Write(this.ToStringHelper.ToStringWithCulture(TKey));

Просмотреть файл

@ -84,7 +84,7 @@ internal sealed class <#= className #><#= genericParameters #> : BinaryPipe<<#=
}
this.nextLeftTime = leftBatch.vsync.col[leftBatch.iter];
if (first) lastLeftTime = MaxBatchSyncTime(leftBatch);
if (first) lastLeftTime = leftBatch.vsync.col[leftBatch.Count - 1];
first = (rightBatch.iter == 0);
if (!GoToVisibleRow(rightBatch))
@ -96,7 +96,7 @@ internal sealed class <#= className #><#= genericParameters #> : BinaryPipe<<#=
}
this.nextRightTime = rightBatch.vsync.col[rightBatch.iter];
if (first) lastRightTime = MaxBatchSyncTime(rightBatch);
if (first) lastRightTime = rightBatch.vsync.col[rightBatch.Count - 1];
if ((lastLeftTime != -1) && (lastRightTime != -1))
{
@ -159,8 +159,7 @@ internal sealed class <#= className #><#= genericParameters #> : BinaryPipe<<#=
isBatchFree = true;
if (batch.iter == 0)
{
long maxLeftTime = MaxBatchSyncTime(batch);
if (maxLeftTime <= nextRightTime)
if (batch.vsync.col[batch.Count - 1] <= this.nextRightTime)
{
OutputBatch(batch);
isBatchDone = true;
@ -197,8 +196,7 @@ internal sealed class <#= className #><#= genericParameters #> : BinaryPipe<<#=
isBatchFree = true;
if (batch.iter == 0)
{
long maxRightTime = MaxBatchSyncTime(batch);
if (Config.DeterministicWithinTimestamp ? (maxRightTime < nextLeftTime) : (maxRightTime <= nextLeftTime))
if (Config.DeterministicWithinTimestamp ? (batch.vsync.col[batch.Count - 1] < this.nextLeftTime) : (batch.vsync.col[batch.Count - 1] <= this.nextLeftTime))
{
OutputBatch(batch);
isBatchDone = true;
@ -238,31 +236,6 @@ internal sealed class <#= className #><#= genericParameters #> : BinaryPipe<<#=
return batch.iter != batch.Count;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static long MaxBatchSyncTime(StreamMessage<<#= TKey #>, <#= TPayload #>> batch)
{
// Punctuations are not necessarily monotonically increasing with respect to data events, so we cannot
// simply retrieve the last event.
// Iterate from end of batch until we hit a data event, then take the maximum of the events we have seen.
long max = -1;
for (int i = batch.Count - 1; i >= 0; i--)
{
// Skip deleted data events
if ((batch.bitvector.col[i >> 6] & (1L << (i & 0x3f))) != 0 && batch.vother.col[i] >= 0)
{
continue;
}
max = Math.Max(max, batch.vsync.col[i]);
if (batch.vother.col[i] != StreamEvent.PunctuationOtherTime)
{
break;
}
}
return max;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OutputCurrentTuple(StreamMessage<<#= TKey #>, <#= TPayload #>> batch)
{

Просмотреть файл

@ -1,5 +1,5 @@
// *********************************************************************
// Copyright (C) Microsoft Corporation. All rights reserved.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
using System;

Просмотреть файл

@ -20,6 +20,29 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace SimpleTesting
{
public static class Helpers
{
public static void RunTwiceForRowAndColumnar(Action action)
{
var savedForceRowBasedExecution = Config.ForceRowBasedExecution;
var savedRowFallback = Config.CodegenOptions.DontFallBackToRowBasedExecution;
try
{
foreach (var rowBased in new bool[] { true, false })
{
Config.ForceRowBasedExecution = rowBased;
Config.CodegenOptions.DontFallBackToRowBasedExecution = !rowBased;
action();
}
}
finally
{
Config.ForceRowBasedExecution = savedForceRowBasedExecution;
Config.CodegenOptions.DontFallBackToRowBasedExecution = savedRowFallback;
}
}
}
[TestClass]
public class AdHoc : TestWithConfigSettingsAndMemoryLeakDetection
{
@ -585,6 +608,68 @@ namespace SimpleTesting
}
}
}
[TestMethod, TestCategory("Gated")]
public void ExplicitAndGeneratedPunctuations()
{
Helpers.RunTwiceForRowAndColumnar(() =>
{
var input = new StreamEvent<int>[]
{
StreamEvent.CreateInterval(90, 91, 0),
StreamEvent.CreatePunctuation<int>(80),
StreamEvent.CreateInterval(110, 111, 0),
StreamEvent.CreateInterval(195, 200, 0),
}.ToObservable().ToStreamable(periodicPunctuationPolicy: PeriodicPunctuationPolicy.Time(100));
var output = new List<StreamEvent<int>>();
input.ToStreamEventObservable()
.ForEachAsync(x => output.Add(x))
.Wait();
var expected = new List<StreamEvent<int>>
{
StreamEvent.CreateInterval(90, 91, 0),
StreamEvent.CreatePunctuation<int>(90), // Explicitly ingressed punctuation is adjusted to currentTime
StreamEvent.CreatePunctuation<int>(100), // Generated punctuation based on quantized previous punctuation
StreamEvent.CreateInterval(110, 111, 0),
StreamEvent.CreateInterval(195, 200, 0),
StreamEvent.CreatePunctuation<int>(StreamEvent.InfinitySyncTime)
};
Assert.IsTrue(expected.SequenceEqual(output));
});
}
[TestMethod, TestCategory("Gated")]
public void ExplicitAndGeneratedLowWatermarks()
{
var input = new PartitionedStreamEvent<int, int>[]
{
PartitionedStreamEvent.CreateInterval(0, 90, 91, 0),
PartitionedStreamEvent.CreateLowWatermark<int, int>(80),
PartitionedStreamEvent.CreateInterval(0, 110, 111, 0),
PartitionedStreamEvent.CreateInterval(0, 195, 200, 0),
}.ToObservable()
.ToStreamable(periodicLowWatermarkPolicy: PeriodicLowWatermarkPolicy.Time(generationPeriod: 100, lowWatermarkTimestampLag: 0));
var output = new List<PartitionedStreamEvent<int, int>>();
input.ToStreamEventObservable()
.ForEachAsync(x => output.Add(x))
.Wait();
var expected = new List<PartitionedStreamEvent<int, int>>
{
PartitionedStreamEvent.CreateInterval(0, 90, 91, 0),
PartitionedStreamEvent.CreateLowWatermark<int, int>(80), // Explicitly ingressed low watermark
PartitionedStreamEvent.CreateLowWatermark<int, int>(100), // Generated punctuation based on quantized previous lowWatermark
PartitionedStreamEvent.CreateInterval(0, 110, 111, 0),
PartitionedStreamEvent.CreateInterval(0, 195, 200, 0),
PartitionedStreamEvent.CreateLowWatermark<int, int>(PartitionedStreamEvent.InfinitySyncTime)
};
Assert.IsTrue(expected.SequenceEqual(output));
}
}
[TestClass]
@ -1442,34 +1527,37 @@ namespace SimpleTesting
[TestMethod, TestCategory("Gated")]
public void UnionWithBatchEndingInPastPunctuation()
{
var leftInput = new StreamEvent<int>[]
Helpers.RunTwiceForRowAndColumnar(() =>
{
StreamEvent.CreateInterval(100, 101, 0),
StreamEvent.CreatePunctuation<int>(50)
}.ToObservable().ToStreamable();
var rightInput = new StreamEvent<int>[]
{
StreamEvent.CreateInterval(50, 51, 0),
StreamEvent.CreatePunctuation<int>(200)
}.ToObservable().ToStreamable();
var leftInput = new StreamEvent<int>[]
{
StreamEvent.CreateInterval(100, 101, 0),
StreamEvent.CreatePunctuation<int>(50)
}.ToObservable().ToStreamable();
var rightInput = new StreamEvent<int>[]
{
StreamEvent.CreateInterval(50, 51, 0),
StreamEvent.CreatePunctuation<int>(200)
}.ToObservable().ToStreamable();
var query = leftInput.Union(rightInput);
var query = leftInput.Union(rightInput);
var output = new List<StreamEvent<int>>();
query.ToStreamEventObservable()
.ForEachAsync(x => output.Add(x))
.Wait();
var output = new List<StreamEvent<int>>();
query.ToStreamEventObservable()
.ForEachAsync(x => output.Add(x))
.Wait();
var expected = new List<StreamEvent<int>>
{
StreamEvent.CreateInterval(50, 51, 0),
StreamEvent.CreateInterval(100, 101, 0),
StreamEvent.CreatePunctuation<int>(50),
StreamEvent.CreatePunctuation<int>(200),
StreamEvent.CreatePunctuation<int>(StreamEvent.InfinitySyncTime)
};
var expected = new List<StreamEvent<int>>
{
StreamEvent.CreateInterval(50, 51, 0),
StreamEvent.CreateInterval(100, 101, 0),
StreamEvent.CreatePunctuation<int>(100),
StreamEvent.CreatePunctuation<int>(200),
StreamEvent.CreatePunctuation<int>(StreamEvent.InfinitySyncTime)
};
Assert.IsTrue(expected.SequenceEqual(output));
Assert.IsTrue(expected.SequenceEqual(output));
});
}
}

Просмотреть файл

@ -108,7 +108,7 @@ namespace SimpleTesting.PartitionedIngressAndEgress
if (interruptingWatermarks.Any())
{
last = (int)interruptingWatermarks.Last().LowWatermark.StartTime;
last = (int)interruptingWatermarks.Last().LowWatermark.StartTime.SnapToLeftBoundary((long)this.punctuationPolicy.generationPeriod);
if (t - last < (uint)this.punctuationPolicy.generationPeriod)
{
continue;