This commit is contained in:
James Terwilliger 2020-03-19 13:02:47 -07:00
Родитель 77b67bfa8c
Коммит eb2df9b772
21 изменённых файлов: 50 добавлений и 19 удалений

Просмотреть файл

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net46</TargetFrameworks>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<Platforms>x64;AnyCPU</Platforms>
</PropertyGroup>

Просмотреть файл

@ -24,6 +24,7 @@ namespace Microsoft.StreamProcessing.Signal
/// Returns the kind of plan node, which can then be used for type casting.
/// </summary>
public override PlanNodeKind Kind => PlanNodeKind.DigitalFilter;
/// <summary>
/// Returns the filter associated with the active query operator
/// </summary>

Просмотреть файл

@ -25,10 +25,12 @@ namespace Microsoft.StreamProcessing.Signal
/// Returns the kind of plan node, which can then be used for type casting.
/// </summary>
public override PlanNodeKind Kind => PlanNodeKind.FilterIIR;
/// <summary>
/// The period of the uniformly-sampled signal.
/// </summary>
public long Period { get; private set; }
/// <summary>
/// The leakage factor of the leaky integrator.
/// </summary>

Просмотреть файл

@ -186,7 +186,6 @@ namespace Microsoft.StreamProcessing.Signal
// gets transformed into at least two samples: a start sample [a, a + 1) if a < b,
// an end sample [b - 1, b) if a < b - 1, and any additional samples created at
// sampling beats.
long previousBeatTime = currBeatTime - period;
long sampleTime = edgeEndTime - 1;
@ -368,7 +367,6 @@ namespace Microsoft.StreamProcessing.Signal
// We are guaranteed there are no events within (currBeatTime, currBeatTime + period) because
// lastTime must be <= currBeatTime and time is >= currBeatTime + period. Note there could have
// been edges at currBeatTime or edges to still come at currBeatTime + period, however.
LeaveBeat(this.currBeatTime);
this.currBeatTime += period;
ReachBeat(this.currBeatTime);
@ -413,12 +411,14 @@ namespace Microsoft.StreamProcessing.Signal
// Create a sample at edgeStartTime if the event started between two last beats
long edgeStartTime = this.newEdges.Values[index].Start;
// if (previousBeatTime < edgeStartTime && edgeStartTime < beatTime)
{
// Add sample at edgeStartTime
ProcessNewPoint(interpolator, edgeStartTime, ref this.newEdges.Values[index].Key, ref this.newEdges.Values[index].Payload, hash, beatTime);
}
}
// Clear new edges as no longer need to output.
this.newEdges.Clear();
@ -431,7 +431,6 @@ namespace Microsoft.StreamProcessing.Signal
// Each event interval [vs, vo) gets transformed into at least two samples:
// a start sample [vs, vs + 1) if vs < vo, an end sample [vo - 1, vo) if vs < vo - 1,
// and any additional samples created at sampling beats.
long intervalEndTime = this.intervals.Values[index].end;
// Create sample and remove interval if ends before or at (beatTime + 1).

Просмотреть файл

@ -22,10 +22,12 @@ namespace Microsoft.StreamProcessing.Signal
/// Returns the kind of plan node, which can then be used for type casting.
/// </summary>
public override PlanNodeKind Kind => PlanNodeKind.Interpolate;
/// <summary>
/// The offset value in the sample operation.
/// </summary>
public long Offset { get; private set; }
/// <summary>
/// The period value in the sample operation.
/// </summary>

Просмотреть файл

@ -51,7 +51,7 @@ namespace Microsoft.StreamProcessing.Signal
var parameters = exp.Parameters;
var var1 = Expression.Parameter(typeof(T));
var var2 = Expression.Parameter(typeof(T));
var body = ParameterSubstituter.Replace(parameters[0], var1, parameters[2], var2, exp);
var body = exp.ReplaceParametersInBody(var1, parameters[1], var2);
return new TwoPointInterpolationPolicy<T>(
windowSize,
Expression.Lambda<Func<T, long, T, long, long, T>>(
@ -73,7 +73,7 @@ namespace Microsoft.StreamProcessing.Signal
var parameters1 = exp1.Parameters;
var var1 = Expression.Parameter(typeof(T));
var var2 = Expression.Parameter(typeof(T));
var body1 = ParameterSubstituter.Replace(parameters1[0], var1, parameters1[2], var2, exp1);
var body1 = exp1.ReplaceParametersInBody(var1, parameters1[1], var2);
Expression<Func<double, long, double, long, double, long, long, double>> exp2 = (v1, t1, v2, t2, v3, t3, t) =>
t == t1 ? v1 : (t == t2 ? v2 : t == t3 ? v3 :
@ -82,7 +82,7 @@ namespace Microsoft.StreamProcessing.Signal
v3 * ((double)(t - t1) * (t - t2) / (t3 - t1) / (t3 - t2)));
var parameters2 = exp2.Parameters;
var var3 = Expression.Parameter(typeof(T));
var body2 = ParameterSubstituter.Replace(parameters2[0], var1, parameters2[2], var2, parameters2[4], var3, exp2);
var body2 = exp2.ReplaceParametersInBody(var1, parameters2[1], var2, parameters2[3], var3);
return new ThreePointInterpolationPolicy<T>(
windowSize,
@ -299,6 +299,7 @@ namespace Microsoft.StreamProcessing.Signal
this.leftValue = this.middleValue = this.rightValue = value;
this.numActiveSamples = 1;
}
// Check if interpolator has one sample or new sample is too far away from middle point
else if (numActiveSamples == 1 || watermark > this.middleTime)
{

Просмотреть файл

@ -22,10 +22,12 @@ namespace Microsoft.StreamProcessing.Signal
/// Returns the kind of plan node, which can then be used for type casting.
/// </summary>
public override PlanNodeKind Kind => PlanNodeKind.Sample;
/// <summary>
/// The offset value in the sample operation.
/// </summary>
public long Offset { get; private set; }
/// <summary>
/// The period value in the sample operation.
/// </summary>

Просмотреть файл

@ -21,11 +21,13 @@ namespace Microsoft.StreamProcessing.Signal.UDO
/// </summary>
[DataMember]
protected int BaseWindowSize;
/// <summary>
/// For internal use only - do not use externally
/// </summary>
[DataMember]
protected int HistoryWindowSize;
/// <summary>
/// For internal use only - do not use externally
/// </summary>
@ -33,6 +35,7 @@ namespace Microsoft.StreamProcessing.Signal.UDO
protected int Size;
[DataMember]
internal int Capacity;
/// <summary>
/// For internal use only - do not use externally
/// </summary>
@ -89,7 +92,7 @@ namespace Microsoft.StreamProcessing.Signal.UDO
throw new ArgumentException("Array size has to be greater than base size");
}
if (!Utility.IsPowerOfTwo(items.Length))
if (!IsPowerOfTwo(items.Length))
{
throw new ArgumentException("Initial capacity has to be zero or a power of two");
}
@ -103,6 +106,9 @@ namespace Microsoft.StreamProcessing.Signal.UDO
tail = baseSize;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool IsPowerOfTwo(long x) => (x > 0) && ((x & (x - 1)) == 0);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Enqueue(ref T item)
{
@ -238,7 +244,6 @@ namespace Microsoft.StreamProcessing.Signal.UDO
if (time < nextSampleTime) { return; }
// Ensured: windowStartTime <= nextSampleTime = time < windowEndTime
Items[tail] = item;
tail = (tail + 1) & IndexMask;
numberOfActiveItems++;
@ -304,13 +309,11 @@ namespace Microsoft.StreamProcessing.Signal.UDO
// Advance time to ensure:
// 1) time <= nextSampleTime
// 2) windowStartTime <= nextSampleTime < windowEndTime
AdvanceTime(time);
if (time < nextSampleTime) { return; }
// Ensured: windowStartTime <= nextSampleTime = time < windowEndTime
Items[tail] = item;
tail = (tail + 1) & IndexMask;
numberOfActiveItems++;

Просмотреть файл

@ -23,6 +23,7 @@ namespace Microsoft.StreamProcessing.Signal.UDO
/// Returns the kind of plan node, which can then be used for type casting.
/// </summary>
public override PlanNodeKind Kind => PlanNodeKind.WindowedPipeline;
/// <summary>
/// Returns the operator pipeline function represented by this windowed operator
/// </summary>

Просмотреть файл

@ -113,7 +113,6 @@ namespace Microsoft.StreamProcessing.Signal.UDO
{
// First time group is active for this time.
// Create window pipeline and wire together observers and observables.
var resultObserver = this as ISignalWindowObserver<TResult>;
if (this.windowPipelineIsEmpty)
{

Просмотреть файл

@ -23,6 +23,7 @@ namespace Microsoft.StreamProcessing.Signal.UDO
/// Returns the kind of plan node, which can then be used for type casting.
/// </summary>
public override PlanNodeKind Kind => PlanNodeKind.WindowedPipeline;
/// <summary>
/// Returns the operator pipeline function represented by this windowed operator
/// </summary>

Просмотреть файл

@ -149,7 +149,6 @@ namespace Microsoft.StreamProcessing.Signal.UDO
{
// First time group is active for this time.
// Create window pipeline and wire together observers and observables.
var resultObserver = new OverlappingArrayOutputObserver(this, ref this.currentKey, this.currentHash);
ISignalWindowObserver<TSource> firstPipelineObserver;

Просмотреть файл

@ -23,6 +23,7 @@ namespace Microsoft.StreamProcessing.Signal.UDO
/// Returns the kind of plan node, which can then be used for type casting.
/// </summary>
public override PlanNodeKind Kind => PlanNodeKind.WindowedPipeline;
/// <summary>
/// Returns the operator pipeline function represented by this windowed operator
/// </summary>

Просмотреть файл

@ -23,6 +23,7 @@ namespace Microsoft.StreamProcessing.Signal.UDO
/// Returns the kind of plan node, which can then be used for type casting.
/// </summary>
public override PlanNodeKind Kind => PlanNodeKind.WindowedPipeline;
/// <summary>
/// Returns the operator pipeline function represented by this windowed operator
/// </summary>

Просмотреть файл

@ -23,6 +23,7 @@ namespace Microsoft.StreamProcessing.Signal.UDO
/// Returns the kind of plan node, which can then be used for type casting.
/// </summary>
public override PlanNodeKind Kind => PlanNodeKind.WindowedPipeline;
/// <summary>
/// Returns the operator pipeline function represented by this windowed operator
/// </summary>

Просмотреть файл

@ -108,7 +108,6 @@ namespace Microsoft.StreamProcessing.Signal.UDO
// First time group is active for this time.
// Create window pipeline and wire together observers and observables.
var resultObserver = this as ISignalWindowObserver<TResult>;
ISignalWindowObserver<TSource> firstPipelineObserver;

Просмотреть файл

@ -166,7 +166,6 @@ namespace Microsoft.StreamProcessing.Signal.UDO
{
// First time group is active for this time.
// Create window pipeline and wire together observers and observables.
var resultObserver = new OverlappingOutputObserver(this, ref this.currentKey, this.currentHash);
ISignalWindowObserver<TSource> firstPipelineObserver;

Просмотреть файл

@ -23,6 +23,7 @@ namespace Microsoft.StreamProcessing.Signal.UDO
/// Returns the kind of plan node, which can then be used for type casting.
/// </summary>
public override PlanNodeKind Kind => PlanNodeKind.WindowedPipeline;
/// <summary>
/// Returns the operator pipeline function represented by this windowed operator
/// </summary>

Просмотреть файл

@ -8,6 +8,7 @@ using System.Diagnostics.Contracts;
namespace Microsoft.StreamProcessing.Signal
{
// For more details about window functions see https://en.wikipedia.org/wiki/Window_function
/// <summary>
/// Static class that provides helper methods for creating window arrays
/// </summary>

Просмотреть файл

@ -15,6 +15,24 @@ namespace Microsoft.StreamProcessing
{
internal static readonly IDisposable EmptyDisposable = new NullDisposable();
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static bool IsPowerOfTwo(long x) => (x > 0) && ((x & (x - 1)) == 0);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2233:OperationsShouldNotOverflow", MessageId = "x+1", Justification = "Reviewed.")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2233:OperationsShouldNotOverflow", MessageId = "x-1", Justification = "Reviewed.")]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static int Power2Ceil(int x)
{
x--;
x |= x >> 1;
x |= x >> 2;
x |= x >> 4;
x |= x >> 8;
x |= x >> 16;
x++;
return x;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static long SnapToLeftBoundary(this long value, long period, long offset = 0)
=> period <= 1 ? value : value - ((value + period - (offset % period)) % period);

Просмотреть файл

@ -19,9 +19,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Test", "Test", "{42433653-7
Test\Directory.Build.props = Test\Directory.Build.props
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleTesting", "Test\SimpleTesting\SimpleTesting.csproj", "{F88FBCF8-C33B-4089-8998-19E9CF901B99}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimpleTesting", "Test\SimpleTesting\SimpleTesting.csproj", "{F88FBCF8-C33B-4089-8998-19E9CF901B99}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PerformanceTesting", "Test\TrillPerf\PerformanceTesting.csproj", "{5DD350C9-C6A8-4EBF-A192-BAEFD26013FB}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PerformanceTesting", "Test\TrillPerf\PerformanceTesting.csproj", "{5DD350C9-C6A8-4EBF-A192-BAEFD26013FB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.StreamProcessing.Signals", "Core\Microsoft.StreamProcessing.Signals\Microsoft.StreamProcessing.Signals.csproj", "{B1A04639-ACF7-4791-B60A-35D370767823}"
EndProject
@ -57,8 +57,8 @@ Global
{5DD350C9-C6A8-4EBF-A192-BAEFD26013FB}.Debug|x64.Build.0 = Debug|Any CPU
{5DD350C9-C6A8-4EBF-A192-BAEFD26013FB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5DD350C9-C6A8-4EBF-A192-BAEFD26013FB}.Release|Any CPU.Build.0 = Release|Any CPU
{5DD350C9-C6A8-4EBF-A192-BAEFD26013FB}.Release|x64.ActiveCfg = Release|x64
{5DD350C9-C6A8-4EBF-A192-BAEFD26013FB}.Release|x64.Build.0 = Release|x64
{5DD350C9-C6A8-4EBF-A192-BAEFD26013FB}.Release|x64.ActiveCfg = Release|Any CPU
{5DD350C9-C6A8-4EBF-A192-BAEFD26013FB}.Release|x64.Build.0 = Release|Any CPU
{B1A04639-ACF7-4791-B60A-35D370767823}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B1A04639-ACF7-4791-B60A-35D370767823}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B1A04639-ACF7-4791-B60A-35D370767823}.Debug|x64.ActiveCfg = Debug|x64