Merge pull request #73 from Microsoft/Provider

Add XML documentation, and fix an issue in EquiJoin code generation
This commit is contained in:
James Terwilliger 2019-05-01 10:00:44 -07:00 коммит произвёл GitHub
Родитель 4bd6a5c146 a31556f533
Коммит c3f4d48dfa
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
17 изменённых файлов: 122 добавлений и 115 удалений

Просмотреть файл

@ -6,19 +6,19 @@
namespace Microsoft.StreamProcessing.Provider
{
/// <summary>
///
/// A streaming analogue to the framework interface IGrouping
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TPayload"></typeparam>
/// <typeparam name="TKey">The type of the grouping key</typeparam>
/// <typeparam name="TPayload">The type of the payload being grouped</typeparam>
public interface IGroupedWindow<out TKey, out TPayload>
{
/// <summary>
///
/// A concrete reference to the key of the group
/// </summary>
TKey Key { get; }
/// <summary>
///
/// A reference to the payload data contained in the group
/// </summary>
IWindow<TPayload> Window { get; }
}

Просмотреть файл

@ -2,23 +2,23 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Linq.Expressions;
namespace Microsoft.StreamProcessing.Provider
{
/// <summary>
///
/// The core interface of the streaming provider API, representing a temporal or time-series stream of data
/// </summary>
/// <typeparam name="TPayload">The type of the payload of the underlying stream query.</typeparam>
public interface IQStreamable<out TPayload>
{
/// <summary>
///
/// The expression representing the full query so far in the pipeline.
/// </summary>
Expression Expression { get; }
/// <summary>
///
/// The assigned provider whose job it is to evaluate the query once it is constructed.
/// </summary>
IQStreamableProvider Provider { get; }
}

Просмотреть файл

@ -7,24 +7,24 @@ using System.Linq.Expressions;
namespace Microsoft.StreamProcessing.Provider
{
/// <summary>
///
/// Provides the base interface for the implementation of a stream provider.
/// </summary>
public interface IQStreamableProvider
{
/// <summary>
///
/// This method represents a link in the chain for building new IQStreamable queries.
/// </summary>
/// <typeparam name="TElement"></typeparam>
/// <param name="expression"></param>
/// <returns></returns>
/// <typeparam name="TElement">The type of the underlying payload.</typeparam>
/// <param name="expression">The expression representing the streaming query that has been created.</param>
/// <returns>A new IQStreamable object from which the query can continue to be built or evaluated.</returns>
IQStreamable<TElement> CreateQuery<TElement>(Expression expression);
/// <summary>
///
/// The method that is called when it is time to evaluate the constructed query.
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="expression"></param>
/// <returns></returns>
/// <typeparam name="TResult">The type of the result of the streaming query.</typeparam>
/// <param name="expression">The expression to be evaluated and run.</param>
/// <returns>The result of evaluating the query expression.</returns>
TResult Execute<TResult>(Expression expression);
}
}

Просмотреть файл

@ -6,13 +6,13 @@
namespace Microsoft.StreamProcessing.Provider
{
/// <summary>
///
/// A window of data in a grouped window, similar to the group in an IGrouping.
/// </summary>
/// <typeparam name="TPayload"></typeparam>
/// <typeparam name="TPayload">The type of the underlying payload in the window of an IGroupedWindow.</typeparam>
public interface IWindow<out TPayload>
{
/// <summary>
///
/// Take the count of the number of elements in the current window.
/// </summary>
int Count();
}

Просмотреть файл

@ -7,9 +7,9 @@ using System.Linq.Expressions;
namespace Microsoft.StreamProcessing.Provider
{
/// <summary>
///
/// A concrete implementation of the IQStreamable interface for building streaming query expressions.
/// </summary>
/// <typeparam name="TPayload"></typeparam>
/// <typeparam name="TPayload">The type of the payload of the underlying stream query.</typeparam>
public sealed class QStreamable<TPayload> : IQStreamable<TPayload>
{
internal QStreamable(Expression expression, IQStreamableProvider provider)
@ -19,12 +19,12 @@ namespace Microsoft.StreamProcessing.Provider
}
/// <summary>
///
/// The expression representing the full query so far in the pipeline.
/// </summary>
public Expression Expression { get; }
/// <summary>
///
/// The assigned provider whose job it is to evaluate the query once it is constructed.
/// </summary>
public IQStreamableProvider Provider { get; }
}

Просмотреть файл

@ -8,25 +8,25 @@ using System.Linq.Expressions;
namespace Microsoft.StreamProcessing.Provider
{
/// <summary>
///
/// Provides the base implementation for a Stream provider.
/// </summary>
public abstract class QStreamableProviderBase : IQStreamableProvider
{
/// <summary>
///
/// This method represents a link in the chain for building new IQStreamable queries.
/// </summary>
/// <typeparam name="TElement"></typeparam>
/// <param name="expression"></param>
/// <returns></returns>
/// <typeparam name="TElement">The type of the underlying payload.</typeparam>
/// <param name="expression">The expression representing the streaming query that has been created.</param>
/// <returns>A new IQStreamable object from which the query can continue to be built or evaluated.</returns>
public IQStreamable<TElement> CreateQuery<TElement>(Expression expression)
=> new QStreamable<TElement>(expression, this);
/// <summary>
///
/// The method that is called when it is time to evaluate the constructed query.
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="expression"></param>
/// <returns></returns>
/// <typeparam name="TResult">The type of the result of the streaming query.</typeparam>
/// <param name="expression">The expression to be evaluated and run.</param>
/// <returns>The result of evaluating the query expression.</returns>
public TResult Execute<TResult>(Expression expression) => throw new NotImplementedException();
}
}

Просмотреть файл

@ -9,7 +9,7 @@ using System.Reflection;
namespace Microsoft.StreamProcessing.Provider
{
/// <summary>
///
/// The extension methods over interface IQStreamable
/// </summary>
public static class QStreamableStatic
{

Просмотреть файл

@ -8,7 +8,7 @@ using System.Reflection;
namespace Microsoft.StreamProcessing.Provider
{
/// <summary>
///
/// An augmented expression visitor that requires implementors to provide transformations for the method API for IQStreamable
/// </summary>
public abstract class QStreamableVisitor : ExpressionVisitor
{
@ -19,10 +19,10 @@ namespace Microsoft.StreamProcessing.Provider
private static readonly MethodInfo WhereMethod = typeof(QStreamableStatic).GetMethod("Where");
/// <summary>
///
/// Visits the children of the System.Linq.Expressions.MethodCallExpression.
/// </summary>
/// <param name="node"></param>
/// <returns></returns>
/// <param name="node">The expression to visit.</param>
/// <returns>The modified expression, if it or any subexpression was modified; otherwise, returns the original expression.</returns>
protected sealed override Expression VisitMethodCall(MethodCallExpression node)
{
var method = node.Method;
@ -39,45 +39,45 @@ namespace Microsoft.StreamProcessing.Provider
}
/// <summary>
///
/// An overrideable method for handling any method that is not part of the IQStreamable API.
/// </summary>
/// <param name="node"></param>
/// <returns></returns>
protected virtual Expression VisitNonStreamingMethodCall(MethodCallExpression node) => node;
/// <param name="node">The expression to visit.</param>
/// <returns>The modified expression, if it or any subexpression was modified; otherwise, returns the original expression.</returns>
protected virtual Expression VisitNonStreamingMethodCall(MethodCallExpression node) => base.VisitMethodCall(node);
/// <summary>
///
/// Visits an IQStreamable Alter Duration call.
/// </summary>
/// <param name="node"></param>
/// <returns></returns>
/// <param name="node">The expression to visit.</param>
/// <returns>The modified expression, if it or any subexpression was modified; otherwise, returns the original expression.</returns>
protected virtual Expression VisitAlterDurationCall(MethodCallExpression node) => node;
/// <summary>
///
/// Visits an IQStreamable Quantize Lifetime call.
/// </summary>
/// <param name="node"></param>
/// <returns></returns>
/// <param name="node">The expression to visit.</param>
/// <returns>The modified expression, if it or any subexpression was modified; otherwise, returns the original expression.</returns>
protected virtual Expression VisitQuantizeLifetimeCall(MethodCallExpression node) => node;
/// <summary>
///
/// Visits an IQStreamable Select call.
/// </summary>
/// <param name="node"></param>
/// <returns></returns>
/// <param name="node">The expression to visit.</param>
/// <returns>The modified expression, if it or any subexpression was modified; otherwise, returns the original expression.</returns>
protected virtual Expression VisitSelectCall(MethodCallExpression node) => node;
/// <summary>
///
/// Visits an IQStreamable Where call.
/// </summary>
/// <param name="node"></param>
/// <returns></returns>
/// <param name="node">The expression to visit.</param>
/// <returns>The modified expression, if it or any subexpression was modified; otherwise, returns the original expression.</returns>
protected virtual Expression VisitWhereCall(MethodCallExpression node) => node;
/// <summary>
///
/// </summary>
/// <param name="node"></param>
/// <returns></returns>
/// <param name="node">The expression to visit.</param>
/// <returns>The modified expression, if it or any subexpression was modified; otherwise, returns the original expression.</returns>
protected virtual Expression VisitGroupByCall(MethodCallExpression node) => node;
}
}

Просмотреть файл

@ -112,9 +112,10 @@ namespace Microsoft.StreamProcessing
#region LeftBatchSelector
{
var leftBatchIndexVariable = selector.Parameters.GenerateFreshVariableName("i");
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>()
{
Tuple.Create(selector.Parameters[0], new SelectParameterInformation() { BatchName = "leftBatch", BatchType = leftMessageType, IndexVariableName = "i", parameterRepresentation = template.leftMessageRepresentation, }),
Tuple.Create(selector.Parameters[0], new SelectParameterInformation() { BatchName = "leftBatch", BatchType = leftMessageType, IndexVariableName = leftBatchIndexVariable, parameterRepresentation = template.leftMessageRepresentation, }),
};
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultMessageRepresentation, true);
if (projectionResult.Error)
@ -127,7 +128,7 @@ namespace Microsoft.StreamProcessing
var d = new Dictionary<ParameterExpression, string>
{
{ Expression.Variable(leftMessageType, "leftBatch"), leftBatch },
{ Expression.Variable(typeof(int), "i"), leftIndex },
{ Expression.Variable(typeof(int), leftBatchIndexVariable), leftIndex },
{ selector.Parameters[1], rightEvent }
};
var sb = new System.Text.StringBuilder();
@ -158,9 +159,10 @@ namespace Microsoft.StreamProcessing
#endregion
#region RightBatchSelector
{
var rightBatchIndexVariable = selector.Parameters.GenerateFreshVariableName("j");
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>()
{
Tuple.Create(selector.Parameters[1], new SelectParameterInformation() { BatchName = "rightBatch", BatchType = rightMessageType, IndexVariableName = "j", parameterRepresentation = template.rightMessageRepresentation, }),
Tuple.Create(selector.Parameters[1], new SelectParameterInformation() { BatchName = "rightBatch", BatchType = rightMessageType, IndexVariableName = rightBatchIndexVariable, parameterRepresentation = template.rightMessageRepresentation, }),
};
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultMessageRepresentation, true);
if (projectionResult.Error)
@ -174,7 +176,7 @@ namespace Microsoft.StreamProcessing
{
{ selector.Parameters[0], leftEvent },
{ Expression.Variable(rightMessageType, "rightBatch"), rightBatch },
{ Expression.Variable(typeof(int), "j"), rightIndex }
{ Expression.Variable(typeof(int), rightBatchIndexVariable), rightIndex }
};
var sb = new System.Text.StringBuilder();
sb.AppendLine("{");

Просмотреть файл

@ -32,12 +32,18 @@ namespace Microsoft.StreamProcessing
// This operator uses the equality method on payloads
if (left.Properties.IsColumnar && !left.Properties.IsStartEdgeOnly && !left.Properties.PayloadEqualityComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException($"Type of left side of join, '{typeof(TLeft).FullName}', does not have a valid equality operator for columnar mode.");
this.errorMessages = $"The left input payload type, '{typeof(TLeft).FullName}', to Equijoin does not implement the interface {nameof(IEqualityComparerExpression<TLeft>)}. This interface is needed for code generation of this operator for columnar mode. Furthermore, the equality expression in the interface can only refer to input variables if used in field or property references.";
if (Config.CodegenOptions.DontFallBackToRowBasedExecution)
throw new StreamProcessingException(this.errorMessages);
else this.Left = this.Left.ColumnToRow();
}
// This operator uses the equality method on payloads
if (right.Properties.IsColumnar && !right.Properties.IsStartEdgeOnly && !right.Properties.PayloadEqualityComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException($"Type of right side of join, '{typeof(TRight).FullName}', does not have a valid equality operator for columnar mode.");
this.errorMessages = $"The right input payload type, '{typeof(TRight).FullName}', to Equijoin does not implement the interface {nameof(IEqualityComparerExpression<TRight>)}. This interface is needed for code generation of this operator for columnar mode. Furthermore, the equality expression in the interface can only refer to input variables if used in field or property references.";
if (Config.CodegenOptions.DontFallBackToRowBasedExecution)
throw new StreamProcessingException(this.errorMessages);
else this.Right = this.Right.ColumnToRow();
}
if (left.Properties.IsStartEdgeOnly && right.Properties.IsStartEdgeOnly)
@ -138,15 +144,11 @@ namespace Microsoft.StreamProcessing
}
protected override IBinaryObserver<TKey, TLeft, TRight, TResult> CreatePipe(IStreamObserver<TKey, TResult> observer)
{
if (typeof(TKey).GetPartitionType() == null)
{
return this.properties.IsColumnar
=> typeof(TKey).GetPartitionType() != null
? this.partitionedGenerator(this, this.Selector, observer)
: this.properties.IsColumnar
? GetPipe(observer)
: this.fallbackGenerator(this, this.Selector, observer);
}
return this.partitionedGenerator(this, this.Selector, observer);
}
protected override bool CanGenerateColumnar()
{

Просмотреть файл

@ -117,9 +117,10 @@ namespace Microsoft.StreamProcessing
#region LeftBatchSelector
{
var leftBatchIndexVariable = selector.Parameters.GenerateFreshVariableName("i");
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>()
{
Tuple.Create(selector.Parameters[0], new SelectParameterInformation() { BatchName = "leftBatch", BatchType = leftMessageType, IndexVariableName = "i", parameterRepresentation = template.leftMessageRepresentation, }),
Tuple.Create(selector.Parameters[0], new SelectParameterInformation() { BatchName = "leftBatch", BatchType = leftMessageType, IndexVariableName = leftBatchIndexVariable, parameterRepresentation = template.leftMessageRepresentation, }),
};
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultMessageRepresentation, true);
if (projectionResult.Error)
@ -132,7 +133,7 @@ namespace Microsoft.StreamProcessing
var d = new Dictionary<ParameterExpression, string>
{
{ Expression.Variable(leftMessageType, "leftBatch"), leftBatch },
{ Expression.Variable(typeof(int), "i"), leftIndex },
{ Expression.Variable(typeof(int), leftBatchIndexVariable), leftIndex },
{ selector.Parameters[1], rightEvent }
};
var sb = new System.Text.StringBuilder();
@ -163,9 +164,10 @@ namespace Microsoft.StreamProcessing
#endregion
#region RightBatchSelector
{
var rightBatchIndexVariable = selector.Parameters.GenerateFreshVariableName("j");
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>()
{
Tuple.Create(selector.Parameters[1], new SelectParameterInformation() { BatchName = "rightBatch", BatchType = rightMessageType, IndexVariableName = "j", parameterRepresentation = template.rightMessageRepresentation, }),
Tuple.Create(selector.Parameters[1], new SelectParameterInformation() { BatchName = "rightBatch", BatchType = rightMessageType, IndexVariableName = rightBatchIndexVariable, parameterRepresentation = template.rightMessageRepresentation, }),
};
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultMessageRepresentation, true);
if (projectionResult.Error)
@ -179,7 +181,7 @@ namespace Microsoft.StreamProcessing
{
{ selector.Parameters[0], leftEvent },
{ Expression.Variable(rightMessageType, "rightBatch"), rightBatch },
{ Expression.Variable(typeof(int), "j"), rightIndex }
{ Expression.Variable(typeof(int), rightBatchIndexVariable), rightIndex }
};
var sb = new System.Text.StringBuilder();
sb.AppendLine("{");

Просмотреть файл

@ -93,9 +93,10 @@ namespace Microsoft.StreamProcessing
return null;
#region LeftBatchSelector
{
var leftBatchIndexVariable = selector.Parameters.GenerateFreshVariableName("i");
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>()
{
Tuple.Create(selector.Parameters[0], new SelectParameterInformation() { BatchName = "leftBatch", BatchType = leftMessageType, IndexVariableName = "i", parameterRepresentation = template.leftMessageRepresentation, }),
Tuple.Create(selector.Parameters[0], new SelectParameterInformation() { BatchName = "leftBatch", BatchType = leftMessageType, IndexVariableName = leftBatchIndexVariable, parameterRepresentation = template.leftMessageRepresentation, }),
};
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultRepresentation, true);
if (projectionResult.Error)
@ -108,7 +109,7 @@ namespace Microsoft.StreamProcessing
var d = new Dictionary<ParameterExpression, string>
{
{ Expression.Variable(leftMessageType, "leftBatch"), leftBatch },
{ Expression.Variable(typeof(int), "i"), leftIndex },
{ Expression.Variable(typeof(int), leftBatchIndexVariable), leftIndex },
{ selector.Parameters[1], rightEvent }
};
var sb = new System.Text.StringBuilder();
@ -139,9 +140,10 @@ namespace Microsoft.StreamProcessing
#endregion
#region RightBatchSelector
{
var rightBatchIndexVariable = selector.Parameters.GenerateFreshVariableName("j");
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>()
{
Tuple.Create(selector.Parameters[1], new SelectParameterInformation() { BatchName = "rightBatch", BatchType = rightMessageType, IndexVariableName = "j", parameterRepresentation = template.rightMessageRepresentation, }),
Tuple.Create(selector.Parameters[1], new SelectParameterInformation() { BatchName = "rightBatch", BatchType = rightMessageType, IndexVariableName = rightBatchIndexVariable, parameterRepresentation = template.rightMessageRepresentation, }),
};
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultRepresentation, true);
if (projectionResult.Error)
@ -155,7 +157,7 @@ namespace Microsoft.StreamProcessing
{
{ selector.Parameters[0], leftEvent },
{ Expression.Variable(rightMessageType, "rightBatch"), rightBatch },
{ Expression.Variable(typeof(int), "j"), rightIndex }
{ Expression.Variable(typeof(int), rightBatchIndexVariable), rightIndex }
};
var sb = new System.Text.StringBuilder();
sb.AppendLine("{");

Просмотреть файл

@ -75,9 +75,10 @@ namespace Microsoft.StreamProcessing
var rightMessageType = StreamMessageManager.GetStreamMessageType<TKey, TRight>();
#region LeftBatchSelector
{
var leftBatchIndexVariable = selector.Parameters.GenerateFreshVariableName("i");
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>()
{
Tuple.Create(selector.Parameters[0], new SelectParameterInformation() { BatchName = "leftBatch", BatchType = leftMessageType, IndexVariableName = "i", parameterRepresentation = template.leftMessageRepresentation, }),
Tuple.Create(selector.Parameters[0], new SelectParameterInformation() { BatchName = "leftBatch", BatchType = leftMessageType, IndexVariableName = leftBatchIndexVariable, parameterRepresentation = template.leftMessageRepresentation, }),
};
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultRepresentation, true);
if (projectionResult.Error)
@ -90,7 +91,7 @@ namespace Microsoft.StreamProcessing
var d = new Dictionary<ParameterExpression, string>
{
{ Expression.Variable(leftMessageType, "leftBatch"), leftBatch },
{ Expression.Variable(typeof(int), "i"), leftIndex },
{ Expression.Variable(typeof(int), leftBatchIndexVariable), leftIndex },
{ selector.Parameters[1], rightEvent }
};
var sb = new System.Text.StringBuilder();
@ -121,9 +122,10 @@ namespace Microsoft.StreamProcessing
#endregion
#region RightBatchSelector
{
var rightBatchIndexVariable = selector.Parameters.GenerateFreshVariableName("j");
var parameterSubsitutions = new List<Tuple<ParameterExpression, SelectParameterInformation>>()
{
Tuple.Create(selector.Parameters[1], new SelectParameterInformation() { BatchName = "rightBatch", BatchType = rightMessageType, IndexVariableName = "j", parameterRepresentation = template.rightMessageRepresentation, }),
Tuple.Create(selector.Parameters[1], new SelectParameterInformation() { BatchName = "rightBatch", BatchType = rightMessageType, IndexVariableName = rightBatchIndexVariable, parameterRepresentation = template.rightMessageRepresentation, }),
};
var projectionResult = SelectTransformer.Transform(selector, parameterSubsitutions, resultRepresentation, true);
if (projectionResult.Error)
@ -137,7 +139,7 @@ namespace Microsoft.StreamProcessing
{
{ selector.Parameters[0], leftEvent },
{ Expression.Variable(rightMessageType, "rightBatch"), rightBatch },
{ Expression.Variable(typeof(int), "j"), rightIndex }
{ Expression.Variable(typeof(int), rightBatchIndexVariable), rightIndex }
};
var sb = new System.Text.StringBuilder();
sb.AppendLine("{");

Просмотреть файл

@ -4,7 +4,6 @@
// *********************************************************************
using System;
using System.Diagnostics.Contracts;
using System.Globalization;
using Microsoft.StreamProcessing.Internal.Collections;
namespace Microsoft.StreamProcessing
@ -25,13 +24,16 @@ namespace Microsoft.StreamProcessing
this.LeftComparer = left.Properties.PayloadEqualityComparer;
Initialize();
// This operator uses the equality method on payloads
if (this.Properties.IsColumnar && !this.Properties.PayloadEqualityComparer.CanUsePayloadEquality())
if (left.Properties.IsColumnar && !this.LeftComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException($"Type of payload, '{typeof(TLeft).FullName}', to LASJ does not have a valid equality operator for columnar mode.");
this.errorMessages = $"The payload type, '{typeof(TLeft).FullName}', to Left Antisemijoin does not implement the interface {nameof(IEqualityComparerExpression<TLeft>)}. This interface is needed for code generation of this operator for columnar mode. Furthermore, the equality expression in the interface can only refer to input variables if used in field or property references.";
if (Config.CodegenOptions.DontFallBackToRowBasedExecution)
throw new StreamProcessingException(this.errorMessages);
else this.Left = this.Left.ColumnToRow();
}
Initialize();
}
protected override IBinaryObserver<TKey, TLeft, TRight, TLeft> CreatePipe(IStreamObserver<TKey, TLeft> observer)

Просмотреть файл

@ -182,7 +182,7 @@ namespace Microsoft.StreamProcessing
/// <summary>
/// Equality comparers for possible key selectors (selector -> IECE)
/// </summary>
private Dictionary<Expression, object> EqualityComparerSelectorMap;
private readonly Dictionary<Expression, object> EqualityComparerSelectorMap;
/// <summary>
/// Selectors that identify what the data is sorted by (could be more than one)
@ -506,19 +506,15 @@ namespace Microsoft.StreamProcessing
/// Clone
/// </summary>
internal StreamProperties<TKey, TPayload> Clone()
{
return new StreamProperties<TKey, TPayload>
=> new StreamProperties<TKey, TPayload>
(this.IsColumnar, this.IsConstantDuration, this.ConstantDurationLength, this.IsConstantHop, this.ConstantHopLength, this.ConstantHopOffset, this.IsIntervalFree, this.IsSyncTimeSimultaneityFree, this.IsSnapshotSorted, this.IsEventOverlappingFree, this.KeyEqualityComparer, this.PayloadEqualityComparer, this.KeyComparer, this.PayloadComparer, this.EqualityComparerSelectorMap.Clone(), this.SortSelectorMap.Clone(), this.QueryContainer);
}
/// <summary>
/// Clone
/// </summary>
internal StreamProperties<TKey, TPayload> CloneDelayed()
{
return new StreamProperties<TKey, TPayload>
=> new StreamProperties<TKey, TPayload>
(this.isColumnar, this.IsConstantDuration, this.ConstantDurationLength, this.IsConstantHop, this.ConstantHopLength, this.ConstantHopOffset, this.IsIntervalFree, this.IsSyncTimeSimultaneityFree, this.IsSnapshotSorted, this.IsEventOverlappingFree, this.KeyEqualityComparer, this.PayloadEqualityComparer, this.KeyComparer, this.PayloadComparer, this.EqualityComparerSelectorMap.Clone(), this.SortSelectorMap.Clone(), this.QueryContainer);
}
/// <summary>
/// Clone
@ -526,12 +522,10 @@ namespace Microsoft.StreamProcessing
internal StreamProperties<TNewKey, TPayload> CloneToNewKeyType<TNewKey>(
IEqualityComparerExpression<TNewKey> newKeyEqualityComparer,
IComparerExpression<TNewKey> newKeyComparer)
{
return new StreamProperties<TNewKey, TPayload>
=> new StreamProperties<TNewKey, TPayload>
(this.IsColumnar, this.IsConstantDuration, this.ConstantDurationLength, this.IsConstantHop, this.ConstantHopLength, this.ConstantHopOffset, this.IsIntervalFree, this.IsSyncTimeSimultaneityFree, this.IsSnapshotSorted, this.IsEventOverlappingFree,
newKeyEqualityComparer, this.PayloadEqualityComparer,
newKeyComparer, this.PayloadComparer, this.EqualityComparerSelectorMap.Clone(), this.SortSelectorMap.Clone(), this.QueryContainer);
}
/// <summary>
/// Clone
@ -539,20 +533,15 @@ namespace Microsoft.StreamProcessing
internal StreamProperties<TKey, TNewPayload> CloneToNewPayloadType<TNewPayload>(
IEqualityComparerExpression<TNewPayload> newPayloadEqualityComparer,
IComparerExpression<TNewPayload> newPayloadComparer)
{
return new StreamProperties<TKey, TNewPayload>
=> new StreamProperties<TKey, TNewPayload>
(this.IsColumnar, this.IsConstantDuration, this.ConstantDurationLength, this.IsConstantHop, this.ConstantHopLength, this.ConstantHopOffset, this.IsIntervalFree, this.IsSyncTimeSimultaneityFree, this.IsSnapshotSorted, this.IsEventOverlappingFree, this.KeyEqualityComparer,
newPayloadEqualityComparer, this.KeyComparer,
newPayloadComparer, this.EqualityComparerSelectorMap.Clone(), this.SortSelectorMap.Clone(), this.QueryContainer);
}
/// <summary>
/// Where
/// </summary>
internal StreamProperties<TKey, TPayload> Where(Expression<Func<TPayload, bool>> predicate)
{
return this;
}
internal StreamProperties<TKey, TPayload> Where(Expression<Func<TPayload, bool>> predicate) => this;
/// <summary>
/// Select
@ -875,9 +864,7 @@ namespace Microsoft.StreamProcessing
public NullStreamable(StreamProperties<TKey, TPayload> properties) : base(properties) { }
public override IDisposable Subscribe(IStreamObserver<TKey, TPayload> observer)
{
throw new InvalidOperationException("Only for use in property derivation");
}
=> throw new InvalidOperationException("Only for use in property derivation");
}
internal sealed class VerifyPropertiesStreamable<TKey, TPayload> : UnaryStreamable<TKey, TPayload, TPayload>
@ -885,9 +872,7 @@ namespace Microsoft.StreamProcessing
public VerifyPropertiesStreamable(IStreamable<TKey, TPayload> source) : base(source, source.Properties) { }
internal override IStreamObserver<TKey, TPayload> CreatePipe(IStreamObserver<TKey, TPayload> observer)
{
return new VerifyPropertiesPipe<TKey, TPayload>(this.properties, observer);
}
=> new VerifyPropertiesPipe<TKey, TPayload>(this.properties, observer);
protected override bool CanGenerateColumnar() => true;
}
@ -967,6 +952,7 @@ namespace Microsoft.StreamProcessing
this.ConstantDurationValidator(s, o);
this.ConstantHopValidator(s, o);
this.IntervalFreeValidator(s, o);
this.SyncTimeValidator(s, k);
this.SyncTimeSimultaneityFreeValidator(s, o, k);
}
@ -1058,10 +1044,7 @@ namespace Microsoft.StreamProcessing
key.ToString()));
}
public override void ProduceQueryPlan(PlanNode previous)
{
throw new NotImplementedException();
}
public override void ProduceQueryPlan(PlanNode previous) => throw new NotImplementedException();
public override int CurrentlyBufferedOutputCount => 0;

Просмотреть файл

@ -44,6 +44,15 @@ namespace Microsoft.StreamProcessing
yield return element;
}
internal static string GenerateFreshVariableName(this IEnumerable<ParameterExpression> parameters, string baseName)
{
var names = new HashSet<string>(parameters.Select(p => p.Name));
var suffix = 0;
var testName = baseName;
while (names.Contains(testName)) testName = baseName + suffix++;
return testName;
}
/// <summary>
/// Tries to get the first element in the given sorted dictionary.
/// </summary>

Просмотреть файл

@ -1686,6 +1686,9 @@ namespace SimpleTesting
[TestClass]
public class LeftComparerPayload_WithCodegen : TestWithConfigSettingsAndMemoryLeakDetection
{
public LeftComparerPayload_WithCodegen()
: base(new ConfigModifier().DontFallBackToRowBasedExecution(true)) { }
public class ClassOverridingEquals
{
public int x;
@ -1729,7 +1732,7 @@ namespace SimpleTesting
.ToEnumerable()
.ToArray();
}
catch (Exception)
catch (StreamProcessingException)
{
exceptionHappened = true;
}