Merge pull request #18 from Microsoft/IssuesFoundDuringSampleCleanup

Fixing issues uncovered during Sample cleanup
This commit is contained in:
James Terwilliger 2018-12-13 17:45:17 -08:00 коммит произвёл GitHub
Родитель 74415d9be0 a662194b28
Коммит ae058e7a7a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
11 изменённых файлов: 251 добавлений и 67 удалений

Просмотреть файл

@ -111,6 +111,23 @@ namespace Microsoft.StreamProcessing
queue.Add(payload);
}
public override void OnCompleted()
{
OnFlush();
base.OnCompleted();
}
public override void OnFlush()
{
if (this.populationCount > 0)
{
this.observer.OnNext(new ArraySegment<TPayload>(this.array, 0, this.arrayLength));
this.populationCount = 0;
this.array = this.generator();
this.arrayLength = this.array.Length;
}
}
public override int CurrentlyBufferedOutputCount => this.populationCount;
public override int CurrentlyBufferedInputCount => this.toDelete.Values.Select(o => o.Count).Sum();

Просмотреть файл

@ -76,24 +76,24 @@ namespace Microsoft.StreamProcessing.Internal
public void OnError(Exception error)
{
if (this is IDisposable disposable) disposable.Dispose();
observer.OnError(error);
this.observer.OnError(error);
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public void OnCompleted()
public virtual void OnCompleted()
{
if (this is IDisposable disposable) disposable.Dispose();
observer.OnCompleted();
this.observer.OnCompleted();
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public void OnFlush() { }
public virtual void OnFlush() { }
/// <summary>
/// Currently for internal use only - do not use directly.

Просмотреть файл

@ -60,6 +60,23 @@ namespace Microsoft.StreamProcessing
batch.Free();
}
public override void OnCompleted()
{
OnFlush();
base.OnCompleted();
}
public override void OnFlush()
{
if (this.populationCount > 0)
{
this.observer.OnNext(new ArraySegment<StreamEvent<TPayload>>(this.array, 0, this.populationCount));
this.populationCount = 0;
this.array = this.generator();
this.arrayLength = this.array.Length;
}
}
public override int CurrentlyBufferedOutputCount => this.populationCount;
public override int CurrentlyBufferedInputCount => 0;
@ -118,6 +135,23 @@ namespace Microsoft.StreamProcessing
batch.Free();
}
public override void OnCompleted()
{
OnFlush();
base.OnCompleted();
}
public override void OnFlush()
{
if (this.populationCount > 0)
{
this.observer.OnNext(new ArraySegment<TResult>(this.array, 0, this.populationCount));
this.populationCount = 0;
this.array = this.generator();
this.arrayLength = this.array.Length;
}
}
public override int CurrentlyBufferedOutputCount => this.populationCount;
public override int CurrentlyBufferedInputCount => 0;
@ -174,6 +208,23 @@ namespace Microsoft.StreamProcessing
batch.Free();
}
public override void OnCompleted()
{
OnFlush();
base.OnCompleted();
}
public override void OnFlush()
{
if (this.populationCount > 0)
{
this.observer.OnNext(new ArraySegment<TResult>(this.array, 0, this.populationCount));
this.populationCount = 0;
this.array = this.generator();
this.arrayLength = this.array.Length;
}
}
public override int CurrentlyBufferedOutputCount => this.populationCount;
public override int CurrentlyBufferedInputCount => 0;
@ -232,6 +283,23 @@ namespace Microsoft.StreamProcessing
batch.Free();
}
public override void OnCompleted()
{
OnFlush();
base.OnCompleted();
}
public override void OnFlush()
{
if (this.populationCount > 0)
{
this.observer.OnNext(new ArraySegment<PartitionedStreamEvent<TKey, TPayload>>(this.array, 0, this.populationCount));
this.populationCount = 0;
this.array = this.generator();
this.arrayLength = this.array.Length;
}
}
public override int CurrentlyBufferedOutputCount => this.populationCount;
public override int CurrentlyBufferedInputCount => 0;
@ -291,6 +359,23 @@ namespace Microsoft.StreamProcessing
batch.Free();
}
public override void OnCompleted()
{
OnFlush();
base.OnCompleted();
}
public override void OnFlush()
{
if (this.populationCount > 0)
{
this.observer.OnNext(new ArraySegment<TResult>(this.array, 0, this.populationCount));
this.populationCount = 0;
this.array = this.generator();
this.arrayLength = this.array.Length;
}
}
public override int CurrentlyBufferedOutputCount => this.populationCount;
public override int CurrentlyBufferedInputCount => 0;
@ -348,6 +433,23 @@ namespace Microsoft.StreamProcessing
batch.Free();
}
public override void OnCompleted()
{
OnFlush();
base.OnCompleted();
}
public override void OnFlush()
{
if (this.populationCount > 0)
{
this.observer.OnNext(new ArraySegment<TResult>(this.array, 0, this.populationCount));
this.populationCount = 0;
this.array = this.generator();
this.arrayLength = this.array.Length;
}
}
public override int CurrentlyBufferedOutputCount => this.populationCount;
public override int CurrentlyBufferedInputCount => 0;

Просмотреть файл

@ -131,6 +131,23 @@ namespace Microsoft.StreamProcessing
batch.Free();
}
public override void OnCompleted()
{
OnFlush();
base.OnCompleted();
}
public override void OnFlush()
{
if (this.populationCount > 0)
{
this.observer.OnNext(new ArraySegment<<#= egress #>>(this.array, 0, this.populationCount));
this.populationCount = 0;
this.array = this.generator();
this.arrayLength = this.array.Length;
}
}
public override int CurrentlyBufferedOutputCount => this.populationCount;
public override int CurrentlyBufferedInputCount => 0;

Просмотреть файл

@ -40,9 +40,9 @@ internal sealed class ");
this.Write(this.ToStringHelper.ToStringWithCulture(inputKey));
this.Write(", ");
this.Write(this.ToStringHelper.ToStringWithCulture(TPayload));
this.Write(">, IDisposable\r\n{\r\n private readonly IObserver<ArraySegment<");
this.Write(", ArraySegment<");
this.Write(this.ToStringHelper.ToStringWithCulture(egress));
this.Write(">> observer;\r\n private readonly Func<");
this.Write(">\r\n{\r\n private readonly Func<");
this.Write(this.ToStringHelper.ToStringWithCulture(egress));
this.Write("[]> generator;\r\n [DataMember]\r\n private ");
this.Write(this.ToStringHelper.ToStringWithCulture(egress));
@ -59,7 +59,7 @@ internal sealed class ");
this.Write("[]> generator,\r\n IObserver<ArraySegment<");
this.Write(this.ToStringHelper.ToStringWithCulture(egress));
this.Write(@">> observer)
: base()
: base(observer, null)
{
this.generator = generator;
this.array = this.generator();
@ -67,13 +67,7 @@ internal sealed class ");
this.observer = observer;
}
public void Dispose()
{
if (this.populationCount > 0) this.observer.OnNext(new ArraySegment<");
this.Write(this.ToStringHelper.ToStringWithCulture(egress));
this.Write(">(this.array, 0, this.populationCount));\r\n this.populationCount = 0;\r\n " +
" this.array = null;\r\n this.arrayLength = 0;\r\n }\r\n\r\n public overr" +
"ide void OnNext(StreamMessage<");
public override void OnNext(StreamMessage<");
this.Write(this.ToStringHelper.ToStringWithCulture(inputKey));
this.Write(", ");
this.Write(this.ToStringHelper.ToStringWithCulture(TPayload));
@ -144,6 +138,25 @@ internal sealed class ");
batch.Free();
}
public override void OnCompleted()
{
OnFlush();
base.OnCompleted();
}
public override void OnFlush()
{
if (this.populationCount > 0)
{
this.observer.OnNext(new ArraySegment<");
this.Write(this.ToStringHelper.ToStringWithCulture(egress));
this.Write(@">(this.array, 0, this.populationCount));
this.populationCount = 0;
this.array = this.generator();
this.arrayLength = this.array.Length;
}
}
public override int CurrentlyBufferedOutputCount => this.populationCount;
public override int CurrentlyBufferedInputCount => 0;

Просмотреть файл

@ -11,9 +11,8 @@ using System.Runtime.Serialization;
using Microsoft.StreamProcessing.Internal;
[DataContract]
internal sealed class <#= partitionString #><#= ingressType #>EgressPipe : EgressBoundary<<#= inputKey #>, <#= TPayload #>>, IDisposable
internal sealed class <#= partitionString #><#= ingressType #>EgressPipe : EgressBoundary<<#= inputKey #>, <#= TPayload #>, ArraySegment<<#= egress #>>
{
private readonly IObserver<ArraySegment<<#= egress #>>> observer;
private readonly Func<<#= egress #>[]> generator;
[DataMember]
private <#= egress #>[] array;
@ -28,7 +27,7 @@ internal sealed class <#= partitionString #><#= ingressType #>EgressPipe : Egres
public <#= partitionString #><#= ingressType #>EgressPipe(
Func<<#= egress #>[]> generator,
IObserver<ArraySegment<<#= egress #>>> observer)
: base()
: base(observer, null)
{
this.generator = generator;
this.array = this.generator();
@ -36,14 +35,6 @@ internal sealed class <#= partitionString #><#= ingressType #>EgressPipe : Egres
this.observer = observer;
}
public void Dispose()
{
if (this.populationCount > 0) this.observer.OnNext(new ArraySegment<<#= egress #>>(this.array, 0, this.populationCount));
this.populationCount = 0;
this.array = null;
this.arrayLength = 0;
}
public override void OnNext(StreamMessage<<#= inputKey #>, <#= TPayload #>> batch)
{
<# if (!string.IsNullOrEmpty(partitionString))
@ -96,6 +87,23 @@ internal sealed class <#= partitionString #><#= ingressType #>EgressPipe : Egres
batch.Free();
}
public override void OnCompleted()
{
OnFlush();
base.OnCompleted();
}
public override void OnFlush()
{
if (this.populationCount > 0)
{
this.observer.OnNext(new ArraySegment<<#= egress #>>(this.array, 0, this.populationCount));
this.populationCount = 0;
this.array = this.generator();
this.arrayLength = this.array.Length;
}
}
public override int CurrentlyBufferedOutputCount => this.populationCount;
public override int CurrentlyBufferedInputCount => 0;

Просмотреть файл

@ -144,15 +144,15 @@ namespace Microsoft.StreamProcessing
switch (this.Kind)
{
case StreamEventKind.Start:
return string.Format(CultureInfo.InvariantCulture, "[Start: {0},{1}]", TimeAsString(this.SyncTime), this.Payload.ToString());
return $"[{this.Kind}: {TimeAsString(this.SyncTime)},{this.Payload}]";
case StreamEventKind.End:
return string.Format(CultureInfo.InvariantCulture, "[End: {0},{1},{2}]", TimeAsString(this.SyncTime), TimeAsString(this.OtherTime), this.Payload.ToString());
return $"[{this.Kind}: {TimeAsString(this.SyncTime)},{TimeAsString(this.OtherTime)},{this.Payload}]";
case StreamEventKind.Interval:
return string.Format(CultureInfo.InvariantCulture, "[Interval: {0}-{1},{2}]", TimeAsString(this.SyncTime), TimeAsString(this.OtherTime), this.Payload.ToString());
return $"[{this.Kind}: {TimeAsString(this.SyncTime)}-{TimeAsString(this.OtherTime)},{this.Payload}]";
case StreamEventKind.Punctuation:
return string.Format(CultureInfo.InvariantCulture, "[Punc: {0}]", TimeAsString(this.SyncTime));
return $"[{this.Kind}: {TimeAsString(this.SyncTime)}]";
case StreamEventKind.LowWatermark:
return string.Format(CultureInfo.InvariantCulture, "[Low Watermark: {0}]", TimeAsString(this.SyncTime));
return $"[{this.Kind}: {TimeAsString(this.SyncTime)}]";
}
return string.Empty;
}
@ -312,19 +312,18 @@ namespace Microsoft.StreamProcessing
/// <returns>A string representing the event for display</returns>
public override string ToString()
{
string fragment = "(" + this.PartitionKey.ToString() + ")";
switch (this.Kind)
{
case StreamEventKind.Start:
return string.Format(CultureInfo.InvariantCulture, fragment + "[Start: {0},{1}]", TimeAsString(this.SyncTime), this.Payload.ToString());
return $"({this.PartitionKey})[{this.Kind}: {TimeAsString(this.SyncTime)},{this.Payload}]";
case StreamEventKind.End:
return string.Format(CultureInfo.InvariantCulture, fragment + "[End: {0},{1},{2}]", TimeAsString(this.SyncTime), TimeAsString(this.OtherTime), this.Payload.ToString());
return $"({this.PartitionKey})[{this.Kind}: {TimeAsString(this.SyncTime)},{TimeAsString(this.OtherTime)},{this.Payload}]";
case StreamEventKind.Interval:
return string.Format(CultureInfo.InvariantCulture, fragment + "[Interval: {0}-{1},{2}]", TimeAsString(this.SyncTime), TimeAsString(this.OtherTime), this.Payload.ToString());
return $"({this.PartitionKey})[{this.Kind}: {TimeAsString(this.SyncTime)}-{TimeAsString(this.OtherTime)},{this.Payload}]";
case StreamEventKind.Punctuation:
return string.Format(CultureInfo.InvariantCulture, fragment + "[Punc: {0}]", TimeAsString(this.SyncTime));
return $"({this.PartitionKey})[{this.Kind}: {TimeAsString(this.SyncTime)}]";
case StreamEventKind.LowWatermark:
return string.Format(CultureInfo.InvariantCulture, "[Low Watermark: {0}]", TimeAsString(this.SyncTime));
return $"[{this.Kind}: {TimeAsString(this.SyncTime)}]";
}
return string.Empty;
}

Просмотреть файл

@ -21,7 +21,9 @@ namespace Microsoft.StreamProcessing
string[] keyParameter = new string[] { string.Empty, "TKey key, " };
string[] keyArgument = new string[] { string.Empty, "key, " };
string[] toStringFragment = new string[] { string.Empty, "fragment + " };
for (int i = 0; i < classNames.Length; i++) {
for (int i = 0; i < classNames.Length; i++)
{
bool partitioned = (i == 1);
#>
internal class <#= classNames[i] #>SyncTimeComparer<<#= genericParameters[i] #>> : IComparer<<#= classNames[i] #><<#= genericParameters[i] #>>>
@ -33,20 +35,24 @@ namespace Microsoft.StreamProcessing
}
/// <summary>
/// Represents <#= (i == 1) ? "a partitioned" : "an unpartitioned" #> Stream event
/// Represents <#= partitioned ? "a partitioned" : "an unpartitioned" #> Stream event
/// </summary>
<# if (i == 1) { #> /// <typeparam name="TKey">Type of payload for the event</typeparam>
<# } #> /// <typeparam name="TPayload">Type of payload for the event</typeparam>
<# if (partitioned)
{ #>
/// <typeparam name="TKey">Type of payload for the event</typeparam>
<# } #>
/// <typeparam name="TPayload">Type of payload for the event</typeparam>
[DataContract]
public struct <#= classNames[i] #><<#= genericParameters[i] #>>
{
<# if (i == 1) { #>
<# if (partitioned)
{ #>
/// <summary>
/// Partition key for the event
/// </summary>
[DataMember]
public TKey PartitionKey;
<# } #>
<# } #>
/// <summary>
/// Start-time for the event
@ -109,7 +115,7 @@ namespace Microsoft.StreamProcessing
/// </summary>
public bool IsPunctuation => this.OtherTime == StreamEvent.PunctuationOtherTime;
<# if (i == 1)
<# if (partitioned)
{ #>
/// <summary>
/// Check if the event is a punctuation
@ -155,17 +161,19 @@ namespace Microsoft.StreamProcessing
/// <summary>
/// Creates a new stream event with the given temporal parameters.
/// </summary>
<# if (i == 1) { #>
<# if (partitioned)
{ #>
/// <param name="key">The partition key value to which this event belongs.</param>
<# } #>
<# } #>
/// <param name="syncTime">The sync time for this event. This value corresponds to the start time for a start edge or interval and the end time for an end edge.</param>
/// <param name="otherTime">The other associated time for this events. For intervals, this value is the end time. For an end edge, this value identifies when the value started.</param>
/// <param name="payload">The actual event associated with these temporal parameters.</param>
public <#= classNames[i] #>(<#= keyParameter[i] #>long syncTime, long otherTime, TPayload payload)
{
<# if (i == 1) { #>
<# if (partitioned)
{ #>
this.PartitionKey = key;
<# } #>
<# } #>
this.SyncTime = syncTime;
this.OtherTime = otherTime;
this.Payload = payload;
@ -177,21 +185,18 @@ namespace Microsoft.StreamProcessing
/// <returns>A string representing the event for display</returns>
public override string ToString()
{
<# if (i == 1) { #>
string fragment = "(" + this.PartitionKey.ToString() + ")";
<# } #>
switch (this.Kind)
{
case StreamEventKind.Start:
return string.Format(CultureInfo.InvariantCulture, <#= toStringFragment[i] #>"[Start: {0},{1}]", TimeAsString(this.SyncTime), this.Payload.ToString());
return $"<#= partitioned ? "({this.PartitionKey})" : "" #>[{this.Kind}: {TimeAsString(this.SyncTime)},{this.Payload}]";
case StreamEventKind.End:
return string.Format(CultureInfo.InvariantCulture, <#= toStringFragment[i] #>"[End: {0},{1},{2}]", TimeAsString(this.SyncTime), TimeAsString(this.OtherTime), this.Payload.ToString());
return $"<#= partitioned ? "({this.PartitionKey})" : "" #>[{this.Kind}: {TimeAsString(this.SyncTime)},{TimeAsString(this.OtherTime)},{this.Payload}]";
case StreamEventKind.Interval:
return string.Format(CultureInfo.InvariantCulture, <#= toStringFragment[i] #>"[Interval: {0}-{1},{2}]", TimeAsString(this.SyncTime), TimeAsString(this.OtherTime), this.Payload.ToString());
return $"<#= partitioned ? "({this.PartitionKey})" : "" #>[{this.Kind}: {TimeAsString(this.SyncTime)}-{TimeAsString(this.OtherTime)},{this.Payload}]";
case StreamEventKind.Punctuation:
return string.Format(CultureInfo.InvariantCulture, <#= toStringFragment[i] #>"[Punc: {0}]", TimeAsString(this.SyncTime));
return $"<#= partitioned ? "({this.PartitionKey})" : "" #>[{this.Kind}: {TimeAsString(this.SyncTime)}]";
case StreamEventKind.LowWatermark:
return string.Format(CultureInfo.InvariantCulture, "[Low Watermark: {0}]", TimeAsString(this.SyncTime));
return $"[{this.Kind}: {TimeAsString(this.SyncTime)}]";
}
return string.Empty;
}

Просмотреть файл

@ -39,8 +39,15 @@ namespace Microsoft.StreamProcessing
public unsafe void OnNext(StreamMessage<Empty, TPayload> batch)
{
this.l1Pool.Get(out StreamMessage<PartitionKey<TPartitionKey>, TPayload> outputBatch);
outputBatch.vsync = batch.vsync;
outputBatch.vother = batch.vother;
if (this.partitionLag > 0)
{
outputBatch.vsync = batch.vsync.MakeWritable(this.l1Pool.longPool);
}
else
{
outputBatch.vsync = batch.vsync;
}
outputBatch.vother = batch.vother.MakeWritable(this.l1Pool.longPool);
outputBatch.payload = batch.payload;
outputBatch.hash = batch.hash.MakeWritable(this.l1Pool.intPool);
outputBatch.bitvector = batch.bitvector;
@ -56,6 +63,17 @@ namespace Microsoft.StreamProcessing
{
for (int i = 0; i < count; i++)
{
if ((batch.bitvector.col[i >> 6] & (1L << (i & 0x3f))) != 0 &&
batch.vother.col[i] == StreamEvent.PunctuationOtherTime)
{
if (this.partitionLag > 0)
{
outputBatch.vsync.col[i] = batch.vsync.col[i] - this.partitionLag;
}
outputBatch.vother.col[i] = PartitionedStreamEvent.LowWatermarkOtherTime;
continue;
}
var key = this.keySelectorFunc(destPayload[i]);
destKey[i] = new PartitionKey<TPartitionKey> { Key = key };
destHash[i] = key.GetHashCode();

Просмотреть файл

@ -219,7 +219,7 @@ namespace Microsoft.StreamProcessing
o => Expression.Bind(newType.GetTypeInfo().GetMember(o).Single(), Expression.PropertyOrField(inputParameter, o)));
var newFieldAssignments = newColumnFormulas.Select(
o => Expression.Bind(newType.GetTypeInfo().GetMember(o.Key).Single(), o.Value.ReplaceParametersInBody(inputParameter)));
o => Expression.Bind(newType.GetTypeInfo().GetMember(o.Key).Single(), o.Value.RemoveCastToObject().ReplaceParametersInBody(inputParameter)));
var member = Expression.MemberInit(newExpression, commonFieldAssignments.Concat(newFieldAssignments).ToArray());
var lambda = Expression.Lambda<Func<TOld, TNew>>(member, new ParameterExpression[] { inputParameter });

Просмотреть файл

@ -115,6 +115,17 @@ namespace Microsoft.StreamProcessing
return s;
}
public static LambdaExpression RemoveCastToObject(this LambdaExpression lambda)
{
if (lambda.Body is UnaryExpression body &&
(body.NodeType == ExpressionType.Convert || body.NodeType == ExpressionType.TypeAs) &&
(body.Type == typeof(object)))
{
return Expression.Lambda(body.Operand, lambda.Parameters);
}
return lambda;
}
public static Expression ReplaceParametersInBody(this LambdaExpression lambda, Expression exp0)
=> ParameterSubstituter.Replace(lambda.Parameters[0], exp0, lambda.Body);
@ -222,10 +233,7 @@ namespace Microsoft.StreamProcessing
private readonly Dictionary<ParameterExpression, int> parameterMap = new Dictionary<ParameterExpression, int>();
private int uniqueParameterNumber;
private EqualityComparer(Expression e1, Expression e2)
{
this.uniqueParameterNumber = 0;
}
private EqualityComparer(Expression e1, Expression e2) => this.uniqueParameterNumber = 0;
public static bool IsEqual(Expression e1, Expression e2)
{
@ -488,10 +496,7 @@ namespace Microsoft.StreamProcessing
ExpressionType.Block
};
public ConvertToCSharp(TextWriter writer)
{
this.writer = writer ?? throw new ArgumentNullException(nameof(writer));
}
public ConvertToCSharp(TextWriter writer) => this.writer = writer ?? throw new ArgumentNullException(nameof(writer));
private new void Visit(ReadOnlyCollection<Expression> es)
{
@ -770,7 +775,7 @@ namespace Microsoft.StreamProcessing
// returns the original expression.
protected override Expression VisitDynamic(DynamicExpression node)
{
writer.Write(node.ToString());
this.writer.Write(node.ToString());
return base.VisitDynamic(node);
}
#endif