Minor code cleanup/refactoring

This commit is contained in:
James Terwilliger 2019-03-07 23:16:23 -08:00
Родитель 8b8a12254c
Коммит b08662f593
90 изменённых файлов: 470 добавлений и 1229 удалений

Просмотреть файл

@ -3,7 +3,6 @@
// Licensed under the MIT License
// *********************************************************************
using System.ComponentModel;
using System.Globalization;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
@ -151,6 +150,6 @@ namespace Microsoft.StreamProcessing
/// </summary>
/// <returns>A string representation of the ranked event.</returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString() => string.Format(CultureInfo.InvariantCulture, "[Rank={0}, Payload={1}]", this.rank, this.payload);
public override string ToString() => $"[Rank={this.rank}, Payload={this.payload}]";
}
}

Просмотреть файл

@ -41,7 +41,7 @@ namespace Microsoft.StreamProcessing
{
var tm = new TypeMapper(tKey, tPayload, tResult);
this.className = string.Format("GeneratedTemporalEgress_{0}", TemporalEgressSequenceNumber++);
this.className = $"GeneratedTemporalEgress_{TemporalEgressSequenceNumber++}";
this.payloadRepresentation = new ColumnarRepresentation(tPayload);

Просмотреть файл

@ -41,7 +41,7 @@ namespace Microsoft.StreamProcessing
{
var tm = new TypeMapper(tKey, tPayload, tResult);
this.className = string.Format("GeneratedTemporalArrayEgress_{0}", TemporalArrayEgressSequenceNumber++);
this.className = $"GeneratedTemporalArrayEgress_{TemporalArrayEgressSequenceNumber++}";
this.payloadRepresentation = new ColumnarRepresentation(tPayload);

Просмотреть файл

@ -42,11 +42,7 @@ namespace Microsoft.StreamProcessing
int n = value.Count + value.Offset;
// Sanity check
if (n > value.Array.Length) throw new IngressException(
string.Format(
CultureInfo.InvariantCulture,
"Invalid array segment. Offset: " + value.Offset +
"Count: " + value.Count +
"Length: " + value.Array.Length));
$"Invalid array segment. Offset: {value.Offset} Count: {value.Count} Length: {value.Array.Length}");
int offset = value.Offset;
while (offset < n)
@ -109,11 +105,7 @@ namespace Microsoft.StreamProcessing
int n = value.Count + value.Offset;
// Sanity check
if (n > value.Array.Length) throw new IngressException(
string.Format(
CultureInfo.InvariantCulture,
"Invalid array segment. Offset: " + value.Offset +
"Count: " + value.Count +
"Length: " + value.Array.Length));
$"Invalid array segment. Offset: {value.Offset} Count: {value.Count} Length: {value.Array.Length}");
int offset = value.Offset;
while (offset < n)

Просмотреть файл

@ -65,11 +65,7 @@ namespace Microsoft.StreamProcessing
int n = value.Count + value.Offset;
// Sanity check
if (n > value.Array.Length) throw new IngressException(
string.Format(
CultureInfo.InvariantCulture,
"Invalid array segment. Offset: " + value.Offset +
"Count: " + value.Count +
"Length: " + value.Array.Length));
$"Invalid array segment. Offset: {value.Offset} Count: {value.Count} Length: {value.Array.Length}");
int offset = value.Offset;
while (offset < n)

Просмотреть файл

@ -43,9 +43,7 @@ namespace Microsoft.StreamProcessing
/// </summary>
/// <returns>An instance of the punctuation policy</returns>
public static PeriodicPunctuationPolicy None()
{
return new PeriodicPunctuationPolicy(PeriodicPunctuationPolicyType.None, 0);
}
=> new PeriodicPunctuationPolicy(PeriodicPunctuationPolicyType.None, 0);
/// <summary>
/// Inject punctuations every <paramref name="generationPeriod"/> time ticks, rounded down to the previous
@ -63,10 +61,7 @@ namespace Microsoft.StreamProcessing
/// Provides a hash code for the PeriodicPunctuationPolicy object.
/// </summary>
/// <returns>A hash code for the PeriodicPunctuationPolicy object.</returns>
public override int GetHashCode()
{
return this.type.GetHashCode() ^ this.generationPeriod.GetHashCode();
}
public override int GetHashCode() => this.type.GetHashCode() ^ this.generationPeriod.GetHashCode();
/// <summary>
/// Provides a string representation for the PeriodicPunctuationPolicy object.
@ -81,7 +76,7 @@ namespace Microsoft.StreamProcessing
case PeriodicPunctuationPolicyType.Time: kind = "Time"; break;
default: kind = "Unknown(" + this.type.ToString() + ")"; break;
}
return string.Format("PeriodicPunctuationPolicy.{0}({1})", kind, this.generationPeriod.ToString());
return $"PeriodicPunctuationPolicy.{kind}({this.generationPeriod.ToString()})";
}
}
}

Просмотреть файл

@ -381,8 +381,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -746,8 +745,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -1109,8 +1107,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -1377,8 +1374,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -1635,8 +1631,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -1891,8 +1886,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -2217,8 +2211,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -2495,8 +2488,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -2770,8 +2762,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -2960,8 +2951,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -3140,8 +3130,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -3317,8 +3306,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -3614,8 +3602,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -4088,8 +4075,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -4560,8 +4546,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -4918,8 +4903,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -5221,8 +5205,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -5522,8 +5505,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -5908,8 +5890,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -6290,8 +6271,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -6669,8 +6649,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -6944,8 +6923,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -7164,8 +7142,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
@ -7381,8 +7358,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else

Просмотреть файл

@ -543,8 +543,7 @@ namespace Microsoft.StreamProcessing
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else

Просмотреть файл

@ -429,8 +429,7 @@ if (latencyOption == "WithLatency") {
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, ""Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}"", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($""Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current: {current}"");
}
}
else

Просмотреть файл

@ -384,8 +384,7 @@ internal sealed class <#= className #><#= genericParameters #> : <#= partitionSt
{
if (outOfOrder)
{
var outOfOrderMessage = string.Format(System.Globalization.CultureInfo.InvariantCulture, "Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {0}, current:{1}", value.SyncTime, current);
throw new IngressException(outOfOrderMessage);
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current: {current}");
}
}
else

Просмотреть файл

@ -116,7 +116,7 @@ namespace Microsoft.StreamProcessing
FuseModule fuseModule)
{
var template = new TemporalIngressTemplate(
string.Format("GeneratedTemporalIngress_{0}", TemporalIngressSequenceNumber++),
$"GeneratedTemporalIngress_{TemporalIngressSequenceNumber++}",
typeof(TKey), typeof(TSource), fuseModule?.OutputType ?? typeof(TResult));
var tm = new TypeMapper(template.keyType, template.payloadType, template.resultType);

Просмотреть файл

@ -39,7 +39,6 @@
using System;
using System.Diagnostics.Contracts;
using System.Globalization;
using System.Linq.Expressions;
using System.Runtime.Serialization;
using Microsoft.StreamProcessing.Internal;
@ -147,11 +146,7 @@ namespace Microsoft.StreamProcessing
int n = value.Count + value.Offset;
// Sanity check
if (n > value.Array.Length) throw new IngressException(
string.Format(
CultureInfo.InvariantCulture,
"Invalid array segment. Offset: " + value.Offset +
"Count: " + value.Count +
"Length: " + value.Array.Length));
$"Invalid array segment. Offset: {value.Offset} Count: {value.Count} Length: {value.Array.Length}");
int offset = value.Offset;
while (offset < n)
@ -235,11 +230,7 @@ namespace Microsoft.StreamProcessing
int n = value.Count + value.Offset;
// Sanity check
if (n > value.Array.Length) throw new IngressException(
string.Format(
CultureInfo.InvariantCulture,
"Invalid array segment. Offset: " + value.Offset +
"Count: " + value.Count +
"Length: " + value.Array.Length));
$"Invalid array segment. Offset: {value.Offset} Count: {value.Count} Length: {value.Array.Length}");
int offset = value.Offset;
while (offset < n)
@ -306,11 +297,7 @@ namespace Microsoft.StreamProcessing
int n = value.Count + value.Offset;
// Sanity check
if (n > value.Array.Length) throw new IngressException(
string.Format(
CultureInfo.InvariantCulture,
"Invalid array segment. Offset: " + value.Offset +
"Count: " + value.Count +
"Length: " + value.Array.Length));
$"Invalid array segment. Offset: {value.Offset} Count: {value.Count} Length: {value.Array.Length}");
int offset = value.Offset;
while (offset < n)
@ -393,11 +380,7 @@ namespace Microsoft.StreamProcessing
int n = value.Count + value.Offset;
// Sanity check
if (n > value.Array.Length) throw new IngressException(
string.Format(
CultureInfo.InvariantCulture,
"Invalid array segment. Offset: " + value.Offset +
"Count: " + value.Count +
"Length: " + value.Array.Length));
$"Invalid array segment. Offset: {value.Offset} Count: {value.Count} Length: {value.Array.Length}");
int offset = value.Offset;
while (offset < n)

Просмотреть файл

@ -45,7 +45,6 @@
using System;
using System.Diagnostics.Contracts;
using System.Globalization;
using System.Linq.Expressions;
using System.Runtime.Serialization;
using Microsoft.StreamProcessing.Internal;
@ -213,11 +212,7 @@ foreach (string ingressType in new string[] { "StreamEvent", "Interval" })
int n = value.Count + value.Offset;
// Sanity check
if (n > value.Array.Length) throw new IngressException(
string.Format(
CultureInfo.InvariantCulture,
"Invalid array segment. Offset: " + value.Offset +
"Count: " + value.Count +
"Length: " + value.Array.Length));
$"Invalid array segment. Offset: {value.Offset} Count: {value.Count} Length: {value.Array.Length}");
int offset = value.Offset;
while (offset < n)

Просмотреть файл

@ -4,7 +4,6 @@
// *********************************************************************
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq.Expressions;
namespace Microsoft.StreamProcessing
@ -252,7 +251,7 @@ namespace Microsoft.StreamProcessing
public override string ToString()
{
string result = string.Empty;
result += string.Format(CultureInfo.InvariantCulture, "Start State: {0}\n", this.StartState);
result += $"Start State: {this.StartState}\n";
result += "Final States:";
this.finalStates.ForEach(x => result += " " + x);
result += "\n";

Просмотреть файл

@ -649,7 +649,7 @@ using Microsoft.StreamProcessing.Internal.Collections;
private void CreateNewInstance(string s) {
if (payloadIsAnon) {
var fieldArgs = String.Join(",", this.sourceFields.Select(f => string.Format("{0}.{1}", s, f.Name)));
var fieldArgs = String.Join(",", this.sourceFields.Select(f => $"{s}.{f.Name}"));
this.Write(" var payload = (");

Просмотреть файл

@ -704,7 +704,7 @@ public class AList<T>
private void CreateNewInstance(string s) {
#>
<#+ if (payloadIsAnon) {
var fieldArgs = String.Join(",", this.sourceFields.Select(f => string.Format("{0}.{1}", s, f.Name)));
var fieldArgs = String.Join(",", this.sourceFields.Select(f => $"{s}.{f.Name}"));
#>
var payload = (<#= TPayload #>)Activator.CreateInstance(typeof(<#= TPayload #>) <#= fieldArgs #> );
<#+ } else if (this.payloadHasNoFields) { #>

Просмотреть файл

@ -23,8 +23,8 @@ namespace Microsoft.StreamProcessing
{
if (Config.ForceRowBasedExecution)
{
this.sourceBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<{0}, {1}>", this.TKey, this.TPayload);
this.resultBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<{0}, {1}>", this.TKey, this.TRegister);
this.sourceBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<{this.TKey}, {this.TPayload}>";
this.resultBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<{this.TKey}, {this.TPayload}>";
}
else
{

Просмотреть файл

@ -88,15 +88,13 @@ namespace Microsoft.StreamProcessing
var emptyObserver = observer as IStreamObserver<Empty, TRegister>;
if (this.afa.uncompiledAfa.IsDeterministic)
{
if (this.Source.Properties.IsColumnar)
return GetUngroupedDAfaPipe(emptyObserver) as IStreamObserver<TKey, TPayload>;
else
return new CompiledUngroupedDAfaPipe<TPayload, TRegister, TAccumulator>(downcast, observer as IStreamObserver<Empty, TRegister>, this.afa, this.MaxDuration) as IStreamObserver<TKey, TPayload>;
return this.Source.Properties.IsColumnar
? GetUngroupedDAfaPipe(emptyObserver) as IStreamObserver<TKey, TPayload>
: new CompiledUngroupedDAfaPipe<TPayload, TRegister, TAccumulator>(downcast, observer as IStreamObserver<Empty, TRegister>, this.afa, this.MaxDuration) as IStreamObserver<TKey, TPayload>;
}
else if (this.Source.Properties.IsColumnar)
return GetUngroupedAFAPipe(emptyObserver) as IStreamObserver<TKey, TPayload>;
else
return new CompiledUngroupedAfaPipe<TPayload, TRegister, TAccumulator>(downcast, observer as IStreamObserver<Empty, TRegister>, this.afa, this.MaxDuration) as IStreamObserver<TKey, TPayload>;
else return this.Source.Properties.IsColumnar
? GetUngroupedAFAPipe(emptyObserver) as IStreamObserver<TKey, TPayload>
: new CompiledUngroupedAfaPipe<TPayload, TRegister, TAccumulator>(downcast, observer as IStreamObserver<Empty, TRegister>, this.afa, this.MaxDuration) as IStreamObserver<TKey, TPayload>;
}
else
{
@ -157,26 +155,20 @@ namespace Microsoft.StreamProcessing
{
if (typeof(TKey) == typeof(Empty))
{
if (this.afa.uncompiledAfa.IsDeterministic)
return Config.CodegenOptions.CodeGenAfa && CanGenerateUngroupedDAfaPipe();
else
return Config.CodegenOptions.CodeGenAfa && CanGenerateUngroupedAFAPipe();
return this.afa.uncompiledAfa.IsDeterministic
? Config.CodegenOptions.CodeGenAfa && CanGenerateUngroupedDAfaPipe()
: Config.CodegenOptions.CodeGenAfa && CanGenerateUngroupedAFAPipe();
}
else
return Config.CodegenOptions.CodeGenAfa && CanGenerateGroupedAFAPipe();
}
else if (Config.CodegenOptions.CodeGenAfa && CanGenerateAFAMultiEventListPipe())
return true;
else return false;
else return Config.CodegenOptions.CodeGenAfa && CanGenerateAFAMultiEventListPipe();
}
private bool CanGenerateUngroupedAFAPipe()
{
var typeOfTPayload = typeof(TPayload);
var typeOfTRegister = typeof(TRegister);
if (!typeOfTPayload.CanRepresentAsColumnar()) return false;
if (!typeOfTRegister.CanRepresentAsColumnar()) return false;
if (!typeof(TPayload).CanRepresentAsColumnar()) return false;
if (!typeof(TRegister).CanRepresentAsColumnar()) return false;
var lookupKey = CacheKey.Create((object)this.afa);
var downcast = this as AfaStreamable<Empty, TPayload, TRegister, TAccumulator>;
@ -185,6 +177,7 @@ namespace Microsoft.StreamProcessing
this.errorMessages = generatedPipeType.Item2;
return generatedPipeType.Item1 != null;
}
private UnaryPipe<Empty, TPayload, TRegister> GetUngroupedAFAPipe(IStreamObserver<Empty, TRegister> observer)
{
var lookupKey = CacheKey.Create((object)this.afa);
@ -198,13 +191,9 @@ namespace Microsoft.StreamProcessing
private bool CanGenerateGroupedAFAPipe()
{
var typeOfTKey = typeof(TKey);
var typeOfTPayload = typeof(TPayload);
var typeOfTRegister = typeof(TRegister);
if (!typeOfTPayload.CanRepresentAsColumnar()) return false;
if (!typeOfTRegister.CanRepresentAsColumnar()) return false;
if (typeOfTKey.GetPartitionType() != null) return false;
if (!typeof(TPayload).CanRepresentAsColumnar()) return false;
if (!typeof(TRegister).CanRepresentAsColumnar()) return false;
if (typeof(TKey).GetPartitionType() != null) return false;
var lookupKey = CacheKey.Create((object)this.afa);
@ -213,6 +202,7 @@ namespace Microsoft.StreamProcessing
this.errorMessages = generatedPipeType.Item2;
return generatedPipeType.Item1 != null;
}
private UnaryPipe<TKey, TPayload, TRegister> GetGroupedAFAPipe(IStreamObserver<TKey, TRegister> observer)
{
var lookupKey = CacheKey.Create((object)this.afa);
@ -225,11 +215,8 @@ namespace Microsoft.StreamProcessing
private bool CanGenerateUngroupedDAfaPipe()
{
var typeOfTPayload = typeof(TPayload);
var typeOfTRegister = typeof(TRegister);
if (!typeOfTPayload.CanRepresentAsColumnar()) return false;
if (!typeOfTRegister.CanRepresentAsColumnar()) return false;
if (!typeof(TPayload).CanRepresentAsColumnar()) return false;
if (!typeof(TRegister).CanRepresentAsColumnar()) return false;
var lookupKey = CacheKey.Create((object)this.afa);
@ -238,6 +225,7 @@ namespace Microsoft.StreamProcessing
this.errorMessages = generatedPipeType.Item2;
return generatedPipeType.Item1 != null;
}
private UnaryPipe<Empty, TPayload, TRegister> GetUngroupedDAfaPipe(IStreamObserver<Empty, TRegister> observer)
{
var lookupKey = CacheKey.Create((object)this.afa);
@ -250,13 +238,9 @@ namespace Microsoft.StreamProcessing
private bool CanGenerateGroupedAFAEventListPipe()
{
var typeOfTKey = typeof(TKey);
var typeOfTPayload = typeof(TPayload);
var typeOfTRegister = typeof(TRegister);
if (!typeOfTPayload.CanRepresentAsColumnar()) return false;
if (!typeOfTRegister.CanRepresentAsColumnar()) return false;
if (typeOfTKey.GetPartitionType() != null) return false;
if (!typeof(TPayload).CanRepresentAsColumnar()) return false;
if (!typeof(TRegister).CanRepresentAsColumnar()) return false;
if (typeof(TKey).GetPartitionType() != null) return false;
var lookupKey = CacheKey.Create((object)this.afa);
@ -265,6 +249,7 @@ namespace Microsoft.StreamProcessing
this.errorMessages = generatedPipeType.Item2;
return generatedPipeType.Item1 != null;
}
private UnaryPipe<TKey, TPayload, TRegister> GetGroupedAFAEventListPipe(IStreamObserver<TKey, TRegister> observer)
{
var lookupKey = CacheKey.Create((object)this.afa);
@ -277,13 +262,9 @@ namespace Microsoft.StreamProcessing
private bool CanGenerateGroupedAFAMultiEventPipe()
{
var typeOfTKey = typeof(TKey);
var typeOfTPayload = typeof(TPayload);
var typeOfTRegister = typeof(TRegister);
if (!typeOfTPayload.CanRepresentAsColumnar()) return false;
if (!typeOfTRegister.CanRepresentAsColumnar()) return false;
if (typeOfTKey.GetPartitionType() != null) return false;
if (!typeof(TPayload).CanRepresentAsColumnar()) return false;
if (!typeof(TRegister).CanRepresentAsColumnar()) return false;
if (typeof(TKey).GetPartitionType() != null) return false;
var lookupKey = CacheKey.Create((object)this.afa);
@ -292,6 +273,7 @@ namespace Microsoft.StreamProcessing
this.errorMessages = generatedPipeType.Item2;
return generatedPipeType.Item1 != null;
}
private UnaryPipe<TKey, TPayload, TRegister> GetGroupedAFAMultiEventPipe(IStreamObserver<TKey, TRegister> observer)
{
var lookupKey = CacheKey.Create((object)this.afa);
@ -304,13 +286,9 @@ namespace Microsoft.StreamProcessing
private bool CanGenerateAFAMultiEventListPipe()
{
var typeOfTKey = typeof(TKey);
var typeOfTPayload = typeof(TPayload);
var typeOfTRegister = typeof(TRegister);
if (!typeOfTPayload.CanRepresentAsColumnar()) return false;
if (!typeOfTRegister.CanRepresentAsColumnar()) return false;
if (typeOfTKey.GetPartitionType() != null) return false;
if (!typeof(TPayload).CanRepresentAsColumnar()) return false;
if (!typeof(TRegister).CanRepresentAsColumnar()) return false;
if (typeof(TKey).GetPartitionType() != null) return false;
var lookupKey = CacheKey.Create((object)this.afa);
@ -319,6 +297,7 @@ namespace Microsoft.StreamProcessing
this.errorMessages = generatedPipeType.Item2;
return generatedPipeType.Item1 != null;
}
private UnaryPipe<TKey, TPayload, TRegister> GetAFAMultiEventListPipe(IStreamObserver<TKey, TRegister> observer)
{
var lookupKey = CacheKey.Create((object)this.afa);

Просмотреть файл

@ -84,10 +84,7 @@ namespace Microsoft.StreamProcessing
public Func<string, string> Dispose;
public Func<string, string, string, string> SkipToEnd;
public MultiEdgeInfo()
{
this.Type = EdgeType.Multi;
}
public MultiEdgeInfo() => this.Type = EdgeType.Multi;
}
protected static List<int> EpsilonClosure<TPayload, TRegister, TAccumulator>(CompiledAfa<TPayload, TRegister, TAccumulator> afa, int node)
@ -109,14 +106,9 @@ namespace Microsoft.StreamProcessing
SourceNode = targetNodeNumber,
Fence = (ts, ev, reg) => searc.Fence.Inline(ts, ev, reg),
};
if (searc.Transfer == null)
{
edgeInfo.Transfer = null;
}
else
{
edgeInfo.Transfer = (ts, ev, reg) => searc.Transfer.Inline(ts, ev, reg);
}
edgeInfo.Transfer = searc.Transfer == null
? (Func<string, string, string, string>)null
: ((ts, ev, reg) => searc.Transfer.Inline(ts, ev, reg));
return edgeInfo;
}
@ -135,11 +127,9 @@ namespace Microsoft.StreamProcessing
protected void UpdateRegisterValue(EdgeInfo e, string defaultRegisterValue, string ts, string payloadList, string reg)
{
string newRegisterValue;
if (!this.hasRegister || e.Transfer == null)
newRegisterValue = defaultRegisterValue;
else
newRegisterValue = e.Transfer(ts, payloadList, reg);
var newRegisterValue = !this.hasRegister || e.Transfer == null
? defaultRegisterValue
: e.Transfer(ts, payloadList, reg);
WriteLine("{0}var newReg = {1};", this.CurrentIndent, newRegisterValue);
return;
@ -147,11 +137,9 @@ namespace Microsoft.StreamProcessing
protected void UpdateRegisterValue(MultiEdgeInfo e, string defaultRegisterValue, string ts, string acc, string reg)
{
string newRegisterValue;
if (e.Transfer == null)
newRegisterValue = defaultRegisterValue;
else
newRegisterValue = e.Transfer(ts, acc, reg);
var newRegisterValue = e.Transfer == null
? defaultRegisterValue
: e.Transfer(ts, acc, reg);
WriteLine("{0}var newReg = {1};", this.CurrentIndent, newRegisterValue);
return;
@ -170,54 +158,25 @@ namespace Microsoft.StreamProcessing
}
return;
}
protected static string BeginColumnPointerDeclaration(MyFieldInfo f, string batchName)
{
if (f.canBeFixed)
{
return string.Format("fixed ({0}* {2}_{1}_col = {2}.{1}.col) {{", f.TypeName, f.Name, batchName);
}
else if (f.OptimizeString())
{
return string.Format("var {1}_{0}_col = {1}.{0};", f.Name, batchName);
}
else
{
return string.Format("var {1}_{0}_col = {1}.{0}.col;", f.Name, batchName);
}
}
protected static string EndColumnPointerDeclaration(MyFieldInfo f)
{
if (f.canBeFixed)
{
return "}";
}
else
{
return string.Empty;
}
}
=> f.canBeFixed
? string.Format("fixed ({0}* {2}_{1}_col = {2}.{1}.col) {{", f.TypeName, f.Name, batchName)
: f.OptimizeString()
? string.Format("var {1}_{0}_col = {1}.{0};", f.Name, batchName)
: string.Format("var {1}_{0}_col = {1}.{0}.col;", f.Name, batchName);
protected static string EndColumnPointerDeclaration(MyFieldInfo f) => f.canBeFixed ? "}" : string.Empty;
protected static string ColumnPointerFieldDeclaration(MyFieldInfo f, string batchName)
{
if (f.OptimizeString())
{
return string.Format("Microsoft.StreamProcessing.Internal.Collections.Multistring {1}_{0}_col;", f.Name, batchName);
}
else
{
return string.Format("{2} {1}_{0}_col;", f.Name, batchName, f.Type.MakeArrayType().GetCSharpSourceSyntax());
}
}
=> f.OptimizeString()
? string.Format("Microsoft.StreamProcessing.Internal.Collections.Multistring {1}_{0}_col;", f.Name, batchName)
: string.Format("{2} {1}_{0}_col;", f.Name, batchName, f.Type.MakeArrayType().GetCSharpSourceSyntax());
protected static string ColumnPointerFieldAssignment(MyFieldInfo f, string batchName)
{
if (f.OptimizeString())
{
return string.Format("this.{1}_{0}_col = {1}.{0};", f.Name, batchName);
}
else
{
return string.Format("this.{1}_{0}_col = {1}.{0}.col;", f.Name, batchName);
}
}
=> f.OptimizeString()
? string.Format("this.{1}_{0}_col = {1}.{0};", f.Name, batchName)
: string.Format("this.{1}_{0}_col = {1}.{0}.col;", f.Name, batchName);
}

Просмотреть файл

@ -70,7 +70,7 @@ namespace Microsoft.StreamProcessing
/// <typeparam name="TRegister"></typeparam>
/// <typeparam name="TAccumulator"></typeparam>
[EditorBrowsable(EditorBrowsableState.Never)]
public class MultiEventArcInfo<TPayload, TRegister, TAccumulator>
public sealed class MultiEventArcInfo<TPayload, TRegister, TAccumulator>
{
/// <summary>
/// Currently for internal use only - do not use directly

Просмотреть файл

@ -130,10 +130,6 @@ namespace Microsoft.StreamProcessing
/// </summary>
public override ArcType ArcType => ArcType.Epsilon;
public override string ToString()
{
return "EpsilonArc";
}
public override string ToString() => "EpsilonArc";
}
}

Просмотреть файл

@ -251,9 +251,7 @@ namespace Microsoft.StreamProcessing.Internal
/// <param name="previous"></param>
[EditorBrowsable(EditorBrowsableState.Never)]
public override void ProduceQueryPlan(PlanNode previous)
{
this.Observer.ProduceQueryPlan(new AfaPlanNode(
=> this.Observer.ProduceQueryPlan(new AfaPlanNode(
previous, this, typeof(TKey), typeof(TPayload), this.IsGenerated, this.errorMessages));
}
}
}

Просмотреть файл

@ -20,8 +20,8 @@ namespace Microsoft.StreamProcessing
{
if (Config.ForceRowBasedExecution)
{
this.sourceBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<{0}, {1}>", this.TKey, this.TPayload);
this.resultBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<{0}, {1}>", this.TKey, this.TRegister);
this.sourceBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<{this.TKey}, {this.TPayload}>";
this.resultBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<{this.TKey}, {this.TRegister}>";
}
else
{
@ -39,14 +39,18 @@ namespace Microsoft.StreamProcessing
string errorMessages = null;
try
{
var className = string.Format("GeneratedGroupedAfaEventList_{0}", AFASequenceNumber++);
var className = $"GeneratedGroupedAfaEventList_{AFASequenceNumber++}";
var template = new GroupedAfaEventListTemplate(className, typeof(TKey), typeof(TPayload), typeof(TRegister), typeof(TAccumulator))
{
TKey = typeof(TKey).GetCSharpSourceSyntax(),
isFinal = stream.afa.isFinal,
hasOutgoingArcs = stream.afa.hasOutgoingArcs,
startStates = stream.afa.startStates,
AllowOverlappingInstances = stream.afa.uncompiledAfa.AllowOverlappingInstances
AllowOverlappingInstances = stream.afa.uncompiledAfa.AllowOverlappingInstances,
isSyncTimeSimultaneityFree = true, // The handwritten version doesn't make a distinction.
keyEqualityComparer =
(left, right) =>
stream.Properties.KeyEqualityComparer.GetEqualsExpr().Inline(left, right),
};
var d1 = stream.afa.uncompiledAfa.transitionInfo;
@ -100,10 +104,6 @@ namespace Microsoft.StreamProcessing
template.newActivationInfo.Add(Tuple.Create(startState, edgeList2));
}
template.isSyncTimeSimultaneityFree = true; // The handwritten version doesn't make a distinction.
template.keyEqualityComparer =
(left, right) =>
stream.Properties.KeyEqualityComparer.GetEqualsExpr().Inline(left, right);
var expandedCode = template.TransformText();

Просмотреть файл

@ -23,8 +23,8 @@ namespace Microsoft.StreamProcessing
if (Config.ForceRowBasedExecution)
{
this.sourceBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<{0}, {1}>", this.TKey, this.TPayload);
this.resultBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<{0}, {1}>", this.TKey, this.TRegister);
this.sourceBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<{this.TKey}, {this.TPayload}>";
this.resultBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<{this.TKey}, {this.TRegister}>";
}
else
{
@ -42,13 +42,18 @@ namespace Microsoft.StreamProcessing
string errorMessages = null;
try
{
var className = string.Format("GeneratedGroupedAfaMultiEvent_{0}", AFASequenceNumber++);
var template = new GroupedAfaMultiEventTemplate(className, typeof(TKey), typeof(TPayload), typeof(TRegister), typeof(TAccumulator));
template.isFinal = stream.afa.isFinal;
template.hasOutgoingArcs = stream.afa.hasOutgoingArcs;
template.startStates = stream.afa.startStates;
template.AllowOverlappingInstances = stream.afa.uncompiledAfa.AllowOverlappingInstances;
var className = $"GeneratedGroupedAfaMultiEvent_{AFASequenceNumber++}";
var template = new GroupedAfaMultiEventTemplate(className, typeof(TKey), typeof(TPayload), typeof(TRegister), typeof(TAccumulator))
{
isFinal = stream.afa.isFinal,
hasOutgoingArcs = stream.afa.hasOutgoingArcs,
startStates = stream.afa.startStates,
AllowOverlappingInstances = stream.afa.uncompiledAfa.AllowOverlappingInstances,
isSyncTimeSimultaneityFree = true, // The handwritten version doesn't make a distinction.
keyEqualityComparer =
(left, right) =>
stream.Properties.KeyEqualityComparer.GetEqualsExpr().Inline(left, right),
};
var d1 = stream.afa.uncompiledAfa.transitionInfo;
var orderedKeys = d1.Keys.OrderBy(e => e).ToArray();
@ -78,14 +83,11 @@ namespace Microsoft.StreamProcessing
},
Fence = (ts, acc, reg) => multiArc.Fence.Inline(ts, acc, reg),
};
if (multiArc.Transfer == null)
{
multiEdgeInfo.Transfer = null;
}
else
{
multiEdgeInfo.Transfer = (ts, acc, reg) => multiArc.Transfer.Inline(ts, acc, reg);
}
multiEdgeInfo.Transfer = multiArc.Transfer == null
? (Func<string, string, string, string>)null
: ((ts, acc, reg) => multiArc.Transfer.Inline(ts, acc, reg));
if (multiArc.Dispose == null)
{
multiEdgeInfo.Dispose = (acc) => "// no dispose function";
@ -94,14 +96,10 @@ namespace Microsoft.StreamProcessing
{
multiEdgeInfo.Dispose = (acc) => multiArc.Dispose.Inline(acc);
}
if (multiArc.SkipToEnd == null)
{
multiEdgeInfo.SkipToEnd = null;
}
else
{
multiEdgeInfo.SkipToEnd = (ts, ev, acc) => multiArc.SkipToEnd.Inline(ts, ev, acc);
}
multiEdgeInfo.SkipToEnd = multiArc.SkipToEnd == null
? (Func<string, string, string, string>)null
: ((ts, ev, acc) => multiArc.SkipToEnd.Inline(ts, ev, acc));
edgeList.Add(multiEdgeInfo);
}
@ -135,14 +133,11 @@ namespace Microsoft.StreamProcessing
},
Fence = (ts, acc, reg) => multiArc.Fence.Inline(ts, acc, reg),
};
if (multiArc.Transfer == null)
{
multiEdgeInfo.Transfer = null;
}
else
{
multiEdgeInfo.Transfer = (ts, acc, reg) => multiArc.Transfer.Inline(ts, acc, reg);
}
multiEdgeInfo.Transfer = multiArc.Transfer == null
? (Func<string, string, string, string>)null
: ((ts, acc, reg) => multiArc.Transfer.Inline(ts, acc, reg));
if (multiArc.Dispose == null)
{
multiEdgeInfo.Dispose = (acc) => "// no dispose function";
@ -151,14 +146,11 @@ namespace Microsoft.StreamProcessing
{
multiEdgeInfo.Dispose = (acc) => multiArc.Dispose.Inline(acc);
}
if (multiArc.SkipToEnd == null)
{
multiEdgeInfo.SkipToEnd = null;
}
else
{
multiEdgeInfo.SkipToEnd = (ts, ev, acc) => multiArc.SkipToEnd.Inline(ts, ev, acc);
}
multiEdgeInfo.SkipToEnd = multiArc.SkipToEnd == null
? (Func<string, string, string, string>)null
: ((ts, ev, acc) => multiArc.SkipToEnd.Inline(ts, ev, acc));
edgeList2.Add(multiEdgeInfo);
}
}
@ -166,11 +158,6 @@ namespace Microsoft.StreamProcessing
template.startEdgeInfos.Add(Tuple.Create(startState, edgeList2));
}
template.isSyncTimeSimultaneityFree = true; // The handwritten version doesn't make a distinction.
template.keyEqualityComparer =
(left, right) =>
stream.Properties.KeyEqualityComparer.GetEqualsExpr().Inline(left, right);
var expandedCode = template.TransformText();
var assemblyReferences = Transformer.AssemblyReferencesNeededFor(typeof(TKey), typeof(TPayload), typeof(TRegister));

Просмотреть файл

@ -19,8 +19,8 @@ namespace Microsoft.StreamProcessing
{
if (Config.ForceRowBasedExecution)
{
this.sourceBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<{0}, {1}>", this.TKey, this.TPayload);
this.resultBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<{0}, {1}>", this.TKey, this.TRegister);
this.sourceBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<{this.TKey}, {this.TPayload}>";
this.resultBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<{this.TKey}, {this.TRegister}>";
}
else
{
@ -38,17 +38,20 @@ namespace Microsoft.StreamProcessing
string errorMessages = null;
try
{
var className = string.Format("GeneratedGroupedAfa_{0}", AFASequenceNumber++);
var className = $"GeneratedGroupedAfa_{AFASequenceNumber++}";
var template = new GroupedAfaTemplate(className, typeof(TKey), typeof(TPayload), typeof(TRegister), typeof(TAccumulator))
{
TKey = typeof(TKey).GetCSharpSourceSyntax()
TKey = typeof(TKey).GetCSharpSourceSyntax(),
isFinal = stream.afa.isFinal,
hasOutgoingArcs = stream.afa.hasOutgoingArcs,
startStates = stream.afa.startStates,
AllowOverlappingInstances = stream.afa.uncompiledAfa.AllowOverlappingInstances,
isSyncTimeSimultaneityFree = stream.Properties.IsSyncTimeSimultaneityFree,
keyEqualityComparer =
(left, right) =>
stream.Properties.KeyEqualityComparer.GetEqualsExpr().Inline(left, right),
};
template.isFinal = stream.afa.isFinal;
template.hasOutgoingArcs = stream.afa.hasOutgoingArcs;
template.startStates = stream.afa.startStates;
template.AllowOverlappingInstances = stream.afa.uncompiledAfa.AllowOverlappingInstances;
var d1 = stream.afa.uncompiledAfa.transitionInfo;
var orderedKeys = d1.Keys.OrderBy(e => e).ToArray();
for (int i = 0; i < orderedKeys.Length; i++)
@ -88,10 +91,6 @@ namespace Microsoft.StreamProcessing
template.newActivationInfo.Add(Tuple.Create(startState, edgeList2));
}
template.isSyncTimeSimultaneityFree = stream.Properties.IsSyncTimeSimultaneityFree;
template.keyEqualityComparer =
(left, right) =>
stream.Properties.KeyEqualityComparer.GetEqualsExpr().Inline(left, right);
var expandedCode = template.TransformText();

Просмотреть файл

@ -111,7 +111,6 @@ namespace Microsoft.StreamProcessing
return ListElement(condition, aggregatorTemplate.InlineCalls());
}
public IPattern<TKey, TPayload, TRegister, TAccumulator> ListElement(Expression<Func<List<TPayload>, TRegister, bool>> condition, Expression<Func<long, List<TPayload>, TRegister, TRegister>> aggregator = null)
{
Expression<Func<long, List<TPayload>, TRegister, bool>> conditionTemplate = (ts, ev, r) => CallInliner.Call(condition, ev, r);
@ -296,14 +295,13 @@ namespace Microsoft.StreamProcessing
}
result.StartState = 0;
return Concat(x => new PatternMatcher<TKey, TPayload, TRegister, TAccumulator>(this.source, result));
}
#endregion
#region Set Register and Accumulator
public IAbstractPattern<TKey, TPayload, TRegister, TAccumulatorNew> SetAccumulator<TAccumulatorNew>(TAccumulatorNew defaultAccumulator = default)
=> new PatternMatcher<TKey, TPayload, TRegister, TAccumulatorNew>(this.source, null);
=> new PatternMatcher<TKey, TPayload, TRegister, TAccumulatorNew>(this.source);
public IAbstractPattern<TKey, TPayload, TRegisterNew, TAccumulator> SetRegister<TRegisterNew>(TRegisterNew defaultRegister = default)
=> new PatternMatcher<TKey, TPayload, TRegisterNew, TAccumulator>(this.source, null, defaultRegister);
@ -413,10 +411,8 @@ namespace Microsoft.StreamProcessing
result.AddArc(from, to, kvp2.Value);
if (nextPattern.finalStates.Contains(kvp2.Key))
{
if (!result.finalStates.Contains(to))
result.finalStates.Add(to);
}
}
}
}

Просмотреть файл

@ -12,13 +12,13 @@ namespace Microsoft.StreamProcessing
{
internal partial class UngroupedAfaTemplate : AfaTemplate
{
private UngroupedAfaTemplate(string className, Type keyType, Type payloadType, Type registerType, Type accumulatorType)
: base(className, keyType, payloadType, registerType, accumulatorType)
private UngroupedAfaTemplate(string className, Type payloadType, Type registerType, Type accumulatorType)
: base(className, typeof(Empty), payloadType, registerType, accumulatorType)
{
if (Config.ForceRowBasedExecution)
{
this.sourceBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<Microsoft.StreamProcessing.Empty, {0}>", this.TPayload);
this.resultBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<Microsoft.StreamProcessing.Empty, {0}>", this.TRegister);
this.sourceBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<Microsoft.StreamProcessing.Empty, {this.TPayload}>";
this.resultBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<Microsoft.StreamProcessing.Empty, {this.TRegister}>";
}
else
{
@ -36,13 +36,14 @@ namespace Microsoft.StreamProcessing
string errorMessages = null;
try
{
var className = string.Format("GeneratedUngroupedAfa_{0}", AFASequenceNumber++);
var template = new UngroupedAfaTemplate(className, typeof(Empty), typeof(TPayload), typeof(TRegister), typeof(TAccumulator));
template.isFinal = stream.afa.isFinal;
template.hasOutgoingArcs = stream.afa.hasOutgoingArcs;
template.startStates = stream.afa.startStates;
template.AllowOverlappingInstances = stream.afa.uncompiledAfa.AllowOverlappingInstances;
var className = $"GeneratedUngroupedAfa_{AFASequenceNumber++}";
var template = new UngroupedAfaTemplate(className, typeof(TPayload), typeof(TRegister), typeof(TAccumulator))
{
isFinal = stream.afa.isFinal,
hasOutgoingArcs = stream.afa.hasOutgoingArcs,
startStates = stream.afa.startStates,
AllowOverlappingInstances = stream.afa.uncompiledAfa.AllowOverlappingInstances
};
var d1 = stream.afa.uncompiledAfa.transitionInfo;
var orderedKeys = d1.Keys.OrderBy(e => e).ToArray();

Просмотреть файл

@ -12,20 +12,18 @@ namespace Microsoft.StreamProcessing
{
internal partial class UngroupedDAfaTemplate : AfaTemplate
{
private Func<string, string, string> keyEqualityComparer;
private UngroupedDAfaTemplate(string className, Type keyType, Type payloadType, Type registerType, Type accumulatorType)
: base(className, keyType, payloadType, registerType, accumulatorType)
private UngroupedDAfaTemplate(string className, Type payloadType, Type registerType, Type accumulatorType)
: base(className, typeof(Empty), payloadType, registerType, accumulatorType)
{
if (Config.ForceRowBasedExecution)
{
this.sourceBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<{0}, {1}>", this.TKey, this.TPayload);
this.resultBatchTypeName = string.Format("Microsoft.StreamProcessing.StreamMessage<{0}, {1}>", this.TKey, this.TRegister);
this.sourceBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<Microsoft.StreamProcessing.Empty, {this.TPayload}>";
this.resultBatchTypeName = $"Microsoft.StreamProcessing.StreamMessage<Microsoft.StreamProcessing.Empty, {this.TRegister}>";
}
else
{
this.sourceBatchTypeName = Transformer.GetBatchClassName(keyType, payloadType);
this.resultBatchTypeName = Transformer.GetBatchClassName(keyType, registerType);
this.sourceBatchTypeName = Transformer.GetBatchClassName(typeof(Empty), payloadType);
this.resultBatchTypeName = Transformer.GetBatchClassName(typeof(Empty), registerType);
}
}
@ -38,17 +36,17 @@ namespace Microsoft.StreamProcessing
string errorMessages = null;
try
{
var className = string.Format("GeneratedUngroupedDAfa_{0}", AFASequenceNumber++);
var template = new UngroupedDAfaTemplate(className, typeof(TKey), typeof(TPayload), typeof(TRegister), typeof(TAccumulator))
var className = $"GeneratedUngroupedDAfa_{AFASequenceNumber++}";
var template = new UngroupedDAfaTemplate(className, typeof(TPayload), typeof(TRegister), typeof(TAccumulator))
{
TKey = typeof(TKey).GetCSharpSourceSyntax()
TKey = typeof(Empty).GetCSharpSourceSyntax(),
isFinal = stream.afa.isFinal,
hasOutgoingArcs = stream.afa.hasOutgoingArcs,
startStates = stream.afa.startStates,
AllowOverlappingInstances = stream.afa.uncompiledAfa.AllowOverlappingInstances,
isSyncTimeSimultaneityFree = stream.Properties.IsSyncTimeSimultaneityFree,
};
template.isFinal = stream.afa.isFinal;
template.hasOutgoingArcs = stream.afa.hasOutgoingArcs;
template.startStates = stream.afa.startStates;
template.AllowOverlappingInstances = stream.afa.uncompiledAfa.AllowOverlappingInstances;
var d1 = stream.afa.uncompiledAfa.transitionInfo;
var orderedKeys = d1.Keys.OrderBy(e => e).ToArray();
for (int i = 0; i < orderedKeys.Length; i++)
@ -88,10 +86,6 @@ namespace Microsoft.StreamProcessing
template.newActivationInfo.Add(Tuple.Create(startState, edgeList2));
}
template.isSyncTimeSimultaneityFree = stream.Properties.IsSyncTimeSimultaneityFree;
template.keyEqualityComparer =
(left, right) =>
stream.Properties.KeyEqualityComparer.GetEqualsExpr().Inline(left, right);
var expandedCode = template.TransformText();

Просмотреть файл

@ -44,12 +44,10 @@ namespace Microsoft.StreamProcessing
}
public override void ProduceQueryPlan(PlanNode previous)
{
this.Observer.ProduceQueryPlan(new AlterLifetimePlanNode(
=> this.Observer.ProduceQueryPlan(new AlterLifetimePlanNode(
previous, this,
typeof(TKey), typeof(TPayload), this.startTimeSelector, this.startTimeDurationSelector,
true));
}
public unsafe override void OnNext(StreamMessage<TKey, TPayload> batch)
{

Просмотреть файл

@ -4,7 +4,6 @@
// *********************************************************************
using System;
using System.Diagnostics.Contracts;
using System.Globalization;
using Microsoft.StreamProcessing.Internal.Collections;
namespace Microsoft.StreamProcessing
@ -23,7 +22,7 @@ namespace Microsoft.StreamProcessing
// This operator uses the equality method on payloads
if (this.Properties.IsColumnar && !this.Properties.PayloadEqualityComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Type of payload, '{0}', to Beat does not have a valid equality operator for columnar mode.", typeof(TPayload).FullName));
throw new InvalidOperationException($"Type of payload, '{typeof(TPayload).FullName}', to Beat does not have a valid equality operator for columnar mode.");
}
}
@ -46,8 +45,9 @@ namespace Microsoft.StreamProcessing
var p = typeof(TKey).GetPartitionType();
if (p == null)
{
if (this.Source.Properties.IsColumnar) return GetPipe(observer);
else return new BeatPipe<TKey, TPayload>(this, observer);
return this.Source.Properties.IsColumnar
? GetPipe(observer)
: new BeatPipe<TKey, TPayload>(this, observer);
}
var outputType = typeof(PartitionedBeatPipe<,,>).MakeGenericType(typeof(TKey), typeof(TPayload), p);

Просмотреть файл

@ -38,7 +38,7 @@ namespace Microsoft.StreamProcessing
sw.Start();
#endif
var template = new BeatTemplate(
string.Format("GeneratedBeat_{0}", BeatSequenceNumber++),
$"GeneratedBeat_{BeatSequenceNumber++}",
typeof(TKey), typeof(TPayload));
template.ActiveEventType = typeof(TPayload).GetTypeInfo().IsValueType ? template.TPayload : "Active_Event";

Просмотреть файл

@ -4,7 +4,6 @@
// *********************************************************************
using System;
using System.Diagnostics.Contracts;
using System.Globalization;
using Microsoft.StreamProcessing.Internal.Collections;
namespace Microsoft.StreamProcessing
@ -28,7 +27,7 @@ namespace Microsoft.StreamProcessing
// This operator uses the equality method on payloads from the left side
if (left.Properties.IsColumnar && !this.LeftComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Type of payload, '{0}', to Clip does not have a valid equality operator for columnar mode.", typeof(TLeft).FullName));
throw new InvalidOperationException($"Type of payload, '{typeof(TLeft).FullName}', to Clip does not have a valid equality operator for columnar mode.");
}
Initialize();

Просмотреть файл

@ -65,7 +65,7 @@ namespace Microsoft.StreamProcessing
var gps = tm.GenericTypeVariables(keyType, leftType, rightType);
template.genericParameters = gps.BracketedCommaSeparatedString();
template.className = string.Format("GeneratedClip_{0}", ClipSequenceNumber++);
template.className = $"GeneratedClip_{ClipSequenceNumber++}";
var resultRepresentation = new ColumnarRepresentation(leftType);

Просмотреть файл

@ -35,7 +35,7 @@ namespace Microsoft.StreamProcessing
sw.Start();
#endif
var template = new ClipByConstantTemplate(
string.Format("ClipByConstant_{0}", ClipByConstantSequenceNumber++),
$"ClipByConstant_{ClipByConstantSequenceNumber++}",
typeof(TKey), typeof(TPayload));
return template.Generate<TKey, TPayload>(typeof(IStreamable<,>));

Просмотреть файл

@ -31,7 +31,7 @@ namespace Microsoft.StreamProcessing
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(keyType));
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(payloadType));
var generatedClassName = string.Format(CultureInfo.InvariantCulture, "ColumnToRowUnaryPipeGeneratedFrom_{0}_{1}_{2}", keyType.GetValidIdentifier(), payloadType.GetValidIdentifier(), ColumnToRowSequenceNumber++);
var generatedClassName = $"ColumnToRowUnaryPipeGeneratedFrom_{keyType.GetValidIdentifier()}_{payloadType.GetValidIdentifier()}_{ColumnToRowSequenceNumber++}";
var template = new ColumnToRowTemplate(generatedClassName, keyType, payloadType);
var expandedCode = template.TransformText();

Просмотреть файл

@ -21,29 +21,25 @@ namespace Microsoft.StreamProcessing
// This operator uses the equality method on payloads
if (this.Properties.IsColumnar && !this.Properties.PayloadEqualityComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Type of payload, '{0}', to EndEdgeFreeOutputStreamable does not have a valid equality operator for columnar mode.", typeof(TPayload).FullName));
throw new InvalidOperationException($"Type of payload, '{typeof(TPayload).FullName}', to EndEdgeFreeOutputStreamable does not have a valid equality operator for columnar mode.");
}
Initialize();
}
public override sealed IDisposable Subscribe(IStreamObserver<TKey, TPayload> observer)
{
if (this.Source.Properties.IsConstantDuration)
{
return this.Source.Subscribe(observer);
}
return this.Source.Subscribe(CreatePipe(observer));
}
=> this.Source.Properties.IsConstantDuration
? this.Source.Subscribe(observer)
: this.Source.Subscribe(CreatePipe(observer));
internal override IStreamObserver<TKey, TPayload> CreatePipe(IStreamObserver<TKey, TPayload> observer)
{
var part = typeof(TKey).GetPartitionType();
if (part == null)
{
if (this.Source.Properties.IsColumnar) return GetPipe(observer);
else return new EndEdgeFreeOutputPipe<TKey, TPayload>(this, observer);
return this.Source.Properties.IsColumnar
? GetPipe(observer)
: new EndEdgeFreeOutputPipe<TKey, TPayload>(this, observer);
}
var outputType = typeof(PartitionedEndEdgeFreeOutputPipe<,,>).MakeGenericType(

Просмотреть файл

@ -38,7 +38,7 @@ namespace Microsoft.StreamProcessing
Contract.Ensures(Contract.Result<Tuple<Type, string>>() == null || typeof(UnaryPipe<TKey, TPayload, TPayload>).GetTypeInfo().IsAssignableFrom(Contract.Result<Tuple<Type, string>>().Item1));
var template = new EndEdgeFreeOutputTemplate(
string.Format("GeneratedEndEdgeFreeOutput_{0}", EndEdgeFreeOutputTemplateSequenceNumber++),
$"GeneratedEndEdgeFreeOutput_{EndEdgeFreeOutputTemplateSequenceNumber++}",
typeof(TKey), typeof(TPayload));
template.ActiveEventType = typeof(TPayload).GetTypeInfo().IsValueType ? template.TPayload : "Active_Event";

Просмотреть файл

@ -72,7 +72,7 @@ namespace Microsoft.StreamProcessing
template.TKeyTResultGenericParameters = tm.GenericTypeVariables(keyType, resultType).BracketedCommaSeparatedString();
template.genericParameters = tm.GenericTypeVariables(keyType, leftType, rightType, resultType).BracketedCommaSeparatedString();
template.className = string.Format("GeneratedEquiJoin_{0}", EquiJoinSequenceNumber++);
template.className = $"GeneratedEquiJoin_{EquiJoinSequenceNumber++}";
template.leftMessageRepresentation = new ColumnarRepresentation(leftType);
template.rightMessageRepresentation = new ColumnarRepresentation(rightType);
@ -100,7 +100,7 @@ namespace Microsoft.StreamProcessing
if (keyType.IsAnonymousType())
{
template.keyComparerEquals =
(left, right) => string.Format("keyComparerEquals({0}, {1})", left, right);
(left, right) => $"keyComparerEquals({left}, {right})";
}
#endregion
@ -277,7 +277,7 @@ namespace Microsoft.StreamProcessing
template.endPointHeap = "EndPointQueue";
}
else if (stream.Left.Properties.IsConstantDuration && stream.Right.Properties.IsConstantDuration &&
stream.Left.Properties.ConstantDurationLength == stream.Right.Properties.ConstantDurationLength)
stream.Left.Properties.ConstantDurationLength == stream.Right.Properties.ConstantDurationLength)
{
template.endPointHeap = "EndPointQueue";
}

Просмотреть файл

@ -33,12 +33,12 @@ namespace Microsoft.StreamProcessing
// This operator uses the equality method on payloads
if (left.Properties.IsColumnar && !left.Properties.IsStartEdgeOnly && !left.Properties.PayloadEqualityComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Type of left side of join, '{0}', does not have a valid equality operator for columnar mode.", typeof(TLeft).FullName));
throw new InvalidOperationException($"Type of left side of join, '{typeof(TLeft).FullName}', does not have a valid equality operator for columnar mode.");
}
// This operator uses the equality method on payloads
if (right.Properties.IsColumnar && !right.Properties.IsStartEdgeOnly && !right.Properties.PayloadEqualityComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Type of right side of join, '{0}', does not have a valid equality operator for columnar mode.", typeof(TRight).FullName));
throw new InvalidOperationException($"Type of right side of join, '{typeof(TRight).FullName}', does not have a valid equality operator for columnar mode.");
}
if (left.Properties.IsStartEdgeOnly && right.Properties.IsStartEdgeOnly)
@ -140,11 +140,11 @@ namespace Microsoft.StreamProcessing
protected override IBinaryObserver<TKey, TLeft, TRight, TResult> CreatePipe(IStreamObserver<TKey, TResult> observer)
{
var part = typeof(TKey).GetPartitionType();
if (part == null)
if (typeof(TKey).GetPartitionType() == null)
{
if (this.properties.IsColumnar) return GetPipe(observer);
else return this.fallbackGenerator(this, this.Selector, observer);
return this.properties.IsColumnar
? GetPipe(observer)
: this.fallbackGenerator(this, this.Selector, observer);
}
return this.partitionedGenerator(this, this.Selector, observer);
}
@ -166,18 +166,18 @@ namespace Microsoft.StreamProcessing
var lookupKey = CacheKey.Create(this.joinKind, this.Properties.KeyEqualityComparer.GetEqualsExpr().ExpressionToCSharp(), this.Left.Properties.PayloadEqualityComparer.GetEqualsExpr().ExpressionToCSharp(), this.Right.Properties.PayloadEqualityComparer.GetEqualsExpr().ExpressionToCSharp(), this.Selector.ExpressionToCSharp());
var generatedPipeType = cachedPipes.GetOrAdd(lookupKey, this.columnarGenerator);
Func<PlanNode, PlanNode, IBinaryObserver, BinaryPlanNode> planNode = ((PlanNode left, PlanNode right, IBinaryObserver o) =>
Func<PlanNode, PlanNode, IBinaryObserver, BinaryPlanNode> planNode = (PlanNode left, PlanNode right, IBinaryObserver o) =>
{
var node = new JoinPlanNode(
left, right, o,
typeof(TLeft), typeof(TRight), typeof(TLeft), typeof(TKey), this.joinKind, true, generatedPipeType.Item2, false);
left, right, o,
typeof(TLeft), typeof(TRight), typeof(TLeft), typeof(TKey), this.joinKind, true, generatedPipeType.Item2, false);
node.AddJoinExpression("key comparer", this.Properties.KeyEqualityComparer.GetEqualsExpr());
node.AddJoinExpression("left payload comparer", this.Left.Properties.PayloadEqualityComparer.GetEqualsExpr());
node.AddJoinExpression("right payload comparer", this.Right.Properties.PayloadEqualityComparer.GetEqualsExpr());
node.AddJoinExpression("left key comparer", this.Left.Properties.KeyComparer.GetCompareExpr());
node.AddJoinExpression("right key comparer", this.Right.Properties.KeyComparer.GetCompareExpr());
return node;
});
};
var instance = Activator.CreateInstance(generatedPipeType.Item1, this, observer, planNode);
var returnValue = (BinaryPipe<TKey, TLeft, TRight, TResult>)instance;

Просмотреть файл

@ -77,7 +77,7 @@ namespace Microsoft.StreamProcessing
template.TKeyTResultGenericParameters = tm.GenericTypeVariables(keyType, resultType).BracketedCommaSeparatedString();
template.genericParameters = tm.GenericTypeVariables(keyType, leftType, rightType, resultType).BracketedCommaSeparatedString();
template.className = string.Format("GeneratedFixedIntervalEquiJoin_{0}", EquiJoinSequenceNumber++);
template.className = $"GeneratedFixedIntervalEquiJoin_{EquiJoinSequenceNumber++}";
template.leftMessageRepresentation = new ColumnarRepresentation(leftType);
template.rightMessageRepresentation = new ColumnarRepresentation(rightType);
@ -105,7 +105,7 @@ namespace Microsoft.StreamProcessing
if (keyType.IsAnonymousType())
{
template.keyComparerEquals =
(left, right) => string.Format("keyComparerEquals({0}, {1})", left, right);
(left, right) => $"keyComparerEquals({left}, {right})";
}
#endregion

Просмотреть файл

@ -79,7 +79,7 @@ namespace Microsoft.StreamProcessing
template.TRight = rightType.GetCSharpSourceSyntax();
template.TResult = resultType.GetCSharpSourceSyntax(); // BUGBUG: need to get any generic parameters needed
template.className = string.Format("GeneratedIncreasingOrderEquiJoin_{0}", IOOEJSequenceNumber++);
template.className = $"GeneratedIncreasingOrderEquiJoin_{IOOEJSequenceNumber++}";
template.leftMessageRepresentation = new ColumnarRepresentation(leftType);
template.leftFields = template.leftMessageRepresentation.AllFields;

Просмотреть файл

@ -3,7 +3,6 @@
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Globalization;
using Microsoft.StreamProcessing.Internal.Collections;
namespace Microsoft.StreamProcessing
@ -24,7 +23,7 @@ namespace Microsoft.StreamProcessing
// This operator uses the equality method on payloads
if (duration < 0 && this.Properties.IsColumnar && !this.Properties.PayloadEqualityComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Type of payload, '{0}', to ExtendLifetime does not have a valid equality operator for columnar mode.", typeof(TPayload).FullName));
throw new InvalidOperationException($"Type of payload, '{typeof(TPayload).FullName}', to ExtendLifetime does not have a valid equality operator for columnar mode.");
}
Initialize();
@ -38,8 +37,9 @@ namespace Microsoft.StreamProcessing
var t = typeof(TKey).GetPartitionType();
if (t == null)
{
if (this.Source.Properties.IsColumnar) return GetPipe(this, observer);
else return new ExtendLifetimeNegativePipe<TKey, TPayload>(this, observer, -this.duration);
return this.Source.Properties.IsColumnar
? GetPipe(this, observer)
: new ExtendLifetimeNegativePipe<TKey, TPayload>(this, observer, -this.duration);
}
var outputType = typeof(PartitionedExtendLifetimeNegativePipe<,,>).MakeGenericType(
typeof(TKey),
@ -52,8 +52,9 @@ namespace Microsoft.StreamProcessing
var t = typeof(TKey).GetPartitionType();
if (t == null)
{
if (this.Source.Properties.IsColumnar) return GetPipe(this, observer);
else return new ExtendLifetimePipe<TKey, TPayload>(this, observer, this.duration);
return this.Source.Properties.IsColumnar
? GetPipe(this, observer)
: new ExtendLifetimePipe<TKey, TPayload>(this, observer, this.duration);
}
var outputType = typeof(PartitionedExtendLifetimePipe<,,>).MakeGenericType(
typeof(TKey),

Просмотреть файл

@ -56,16 +56,11 @@ namespace Microsoft.StreamProcessing
sw.Start();
#endif
ExtendLifetimeBaseTemplate template;
var className = string.Format("GeneratedExtendLifetime_{0}", ExtendLifetimeSequenceNumber++);
var className = $"GeneratedExtendLifetime_{ExtendLifetimeSequenceNumber++}";
if (duration < 0)
{
template = new ExtendLifetimeNegativeTemplate(className, typeof(TKey), typeof(TPayload));
}
else
{
template = new ExtendLifetimeTemplate(className, typeof(TKey), typeof(TPayload));
}
template = duration < 0
? new ExtendLifetimeNegativeTemplate(className, typeof(TKey), typeof(TPayload))
: (ExtendLifetimeBaseTemplate)new ExtendLifetimeTemplate(className, typeof(TKey), typeof(TPayload));
template.ActiveEventType = typeof(TPayload).GetTypeInfo().IsValueType ? template.TPayload : "Active_Event";
@ -88,7 +83,7 @@ namespace Microsoft.StreamProcessing
template.useCompiledPayloadComparer = useCompiledPayloadComparer;
if (useCompiledPayloadComparer)
{
template.payloadComparer = (left, right) => string.Format("payloadComparer({0}.Payload, {1}.Payload", left, right);
template.payloadComparer = (left, right) => $"payloadComparer({left}.Payload, {right}.Payload";
}
else
{

Просмотреть файл

@ -65,7 +65,7 @@ namespace Microsoft.StreamProcessing
string errorMessages = null;
try
{
string generatedClassName = string.Format("GeneratedGroupStreamable_{0}", GroupStreamableSequenceNumber++);
string generatedClassName = $"GeneratedGroupStreamable_{GroupStreamableSequenceNumber++}";
string transformedKeySelectorAsString;
MyFieldInfo swingingField = default;
@ -132,8 +132,8 @@ namespace Microsoft.StreamProcessing
var transformedPredicate = Extensions.TransformUnaryFunction<TOuterKey, TSource>(keySelector).Body;
template.transformedKeySelectorAsString = transformedPredicate.ExpressionToCSharp();
template.inlinedHashCodeComputation = "hashCodeVector.col[i]";
var fieldName = ((MemberExpression)(keySelector.Body)).Member.Name;
template.vectorHashCodeInitialization = string.Format("resultBatch.hash = {0}{1}_col.GetHashCode(batch.bitvector);", Transformer.ColumnFieldPrefix, fieldName);
var fieldName = ((MemberExpression)keySelector.Body).Member.Name;
template.vectorHashCodeInitialization = $"resultBatch.hash = {Transformer.ColumnFieldPrefix}{fieldName}_col.GetHashCode(batch.bitvector);";
template.swingingHashColumn = true;
}

Просмотреть файл

@ -115,12 +115,10 @@ using Microsoft.StreamProcessing.Aggregates;
"MIT License\r\n// ****************************************************************" +
"*****\r\n");
var resultMessageMemoryPoolGenericParameters = string.Format("<{0}, {1}>", TKey, TResult);
var resultMessageMemoryPoolGenericParameters = $"<{TKey}, {TResult}>";
if (resultType == typeof(int) || resultType == typeof(long) || resultType == typeof(string)) resultMessageMemoryPoolGenericParameters = string.Empty;
getOutputBatch = string.Format("this.pool.Get(out genericOutputbatch); this.batch = ({0}{1})genericOutputbatch;",
Transformer.GetBatchClassName(typeof(Empty), resultType),
UnitTResultGenericParameters);
getOutputBatch = $"this.pool.Get(out genericOutputbatch); this.batch = ({Transformer.GetBatchClassName(typeof(Empty), resultType)}{UnitTResultGenericParameters})genericOutputbatch;";
this.Write("\r\n// TKey: ");

Просмотреть файл

@ -9,12 +9,10 @@
// Licensed under the MIT License
// *********************************************************************
<#
var resultMessageMemoryPoolGenericParameters = string.Format("<{0}, {1}>", TKey, TResult);
var resultMessageMemoryPoolGenericParameters = $"<{TKey}, {TResult}>";
if (resultType == typeof(int) || resultType == typeof(long) || resultType == typeof(string)) resultMessageMemoryPoolGenericParameters = string.Empty;
getOutputBatch = string.Format("this.pool.Get(out genericOutputbatch); this.batch = ({0}{1})genericOutputbatch;",
Transformer.GetBatchClassName(typeof(Empty), resultType),
UnitTResultGenericParameters);
getOutputBatch = $"this.pool.Get(out genericOutputbatch); this.batch = ({Transformer.GetBatchClassName(typeof(Empty), resultType)}{UnitTResultGenericParameters})genericOutputbatch;";
#>

Просмотреть файл

@ -82,7 +82,7 @@ namespace Microsoft.StreamProcessing
template.TResult = resultType.GetCSharpSourceSyntax(); // BUGBUG: need to get any generic parameters needed
template.isUngrouped = (keyType == typeof(Empty));
template.className = string.Format("GeneratedGroupedAggregate_{0}", GroupedAggregateSequenceNumber++);
template.className = $"GeneratedGroupedAggregate_{GroupedAggregateSequenceNumber++}";
var inputMessageRepresentation = new ColumnarRepresentation(inputType);
@ -160,7 +160,7 @@ namespace Microsoft.StreamProcessing
else
{
template.useCompiledAccumulate = true;
template.accumulate = (s1, s2) => string.Format("accumulate({0}, {1}, batch[i]);", s1, s2);
template.accumulate = (s1, s2) => $"accumulate({s1}, {s2}, batch[i]);";
}
}
@ -180,7 +180,7 @@ namespace Microsoft.StreamProcessing
else
{
template.useCompiledDeaccumulate = true;
template.deaccumulate = (s1, s2) => string.Format("deaccumulate({0}, {1}, batch[i]);", s1, s2);
template.deaccumulate = (s1, s2) => $"deaccumulate({s1}, {s2}, batch[i]);";
}
}
@ -200,7 +200,7 @@ namespace Microsoft.StreamProcessing
else
{
template.useCompiledDifference = true;
template.deaccumulate = (s1, s2) => string.Format("difference({0}, {1});", s1, s2);
template.deaccumulate = (s1, s2) => $"difference({s1}, {s2});";
}
}
@ -235,7 +235,7 @@ namespace Microsoft.StreamProcessing
else
{
template.useCompiledComputeResult = true;
template.computeResult = (stateArg) => "computeResult(" + stateArg + ")";
template.computeResult = (stateArg) => $"computeResult({stateArg})";
}
}
#endregion
@ -262,7 +262,7 @@ namespace Microsoft.StreamProcessing
};
var sb = new System.Text.StringBuilder();
sb.AppendLine("{");
sb.AppendLine(string.Format("var {0} = {1};\n", resultSelector.Parameters.ElementAt(1).Name, aggregateResult));
sb.AppendLine($"var {resultSelector.Parameters.ElementAt(1).Name} = {aggregateResult};\n");
foreach (var kv in projectionResult.ComputedFields)
{
var f = kv.Key;

Просмотреть файл

@ -30,7 +30,7 @@ namespace Microsoft.StreamProcessing
// This operator uses the equality method on payloads
if (this.Properties.IsColumnar && !this.Properties.PayloadEqualityComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Type of payload, '{0}', to LASJ does not have a valid equality operator for columnar mode.", typeof(TLeft).FullName));
throw new InvalidOperationException($"Type of payload, '{typeof(TLeft).FullName}', to LASJ does not have a valid equality operator for columnar mode.");
}
}
@ -39,8 +39,9 @@ namespace Microsoft.StreamProcessing
var part = typeof(TKey).GetPartitionType();
if (part == null)
{
if (this.properties.IsColumnar) return GetPipe(observer, this.Left.Properties.IsConstantDuration, this.Right.Properties.IsConstantDuration);
else return new LeftAntiSemiJoinPipe<TKey, TLeft, TRight>(this, observer);
return this.properties.IsColumnar
? GetPipe(observer, this.Left.Properties.IsConstantDuration, this.Right.Properties.IsConstantDuration)
: new LeftAntiSemiJoinPipe<TKey, TLeft, TRight>(this, observer);
}
var outputType = typeof(PartitionedLeftAntiSemiJoinPipe<,,,>).MakeGenericType(

Просмотреть файл

@ -66,7 +66,7 @@ namespace Microsoft.StreamProcessing
var gps = tm.GenericTypeVariables(keyType, leftType, rightType);
template.genericParameters = gps.BracketedCommaSeparatedString();
template.className = string.Format("GeneratedLeftAntiSemiJoin_{0}", LASJSequenceNumber++);
template.className = $"GeneratedLeftAntiSemiJoin_{LASJSequenceNumber++}";
var leftMessageRepresentation = new ColumnarRepresentation(leftType);
var resultRepresentation = new ColumnarRepresentation(leftType);

Просмотреть файл

@ -35,7 +35,7 @@ namespace Microsoft.StreamProcessing
sw.Start();
#endif
var template = new PointAtEndTemplate(
string.Format("GeneratedPointAtEnd_{0}", PointAtEndSequenceNumber++),
$"GeneratedPointAtEnd_{PointAtEndSequenceNumber++}",
typeof(TKey), typeof(TPayload));
return template.Generate<TKey, TPayload>(typeof(IStreamable<,>));

Просмотреть файл

@ -25,7 +25,7 @@ namespace Microsoft.StreamProcessing
sw.Start();
#endif
var template = new QuantizeLifetimeTemplate(
string.Format("GeneratedQuantizeLifetime_{0}", QuantizeLifetimeSequenceNumber++),
$"GeneratedQuantizeLifetime_{QuantizeLifetimeSequenceNumber++}",
typeof(TKey), typeof(TPayload));
return template.Generate<TKey, TPayload>(typeof(IStreamable<,>));

Просмотреть файл

@ -80,7 +80,7 @@ using Microsoft.StreamProcessing.Internal.Collections;
var genericParameters = Transformer.GenericParameterList(genericTypeNames);
var genericParameters2 = string.Format("<{0}, {1}>", TKey, TPayload);
var genericParameters2 = $"<{TKey}, {TPayload}>";
if (payloadType == typeof(int) || payloadType == typeof(long) || payloadType == typeof(string)) genericParameters2 = string.Empty;
this.Write("\r\n[DataContract]\r\ninternal sealed class ");

Просмотреть файл

@ -53,7 +53,7 @@ using <#= this.payloadType.Namespace #>;
var genericParameters = Transformer.GenericParameterList(genericTypeNames);
var genericParameters2 = string.Format("<{0}, {1}>", TKey, TPayload);
var genericParameters2 = $"<{TKey}, {TPayload}>";
if (payloadType == typeof(int) || payloadType == typeof(long) || payloadType == typeof(string)) genericParameters2 = string.Empty;
#>

Просмотреть файл

@ -4,7 +4,6 @@
// *********************************************************************
using System;
using System.Diagnostics.Contracts;
using System.Globalization;
using System.Reflection;
namespace Microsoft.StreamProcessing
@ -16,9 +15,7 @@ namespace Microsoft.StreamProcessing
public RowToColumnTemplate(string className, Type keyType, Type payloadType)
: base(className, keyType, payloadType, payloadType)
{
this.fields = payloadType.GetAnnotatedFields().Item1; // Do we need this special case?
}
=> this.fields = payloadType.GetAnnotatedFields().Item1; // Do we need this special case?
internal static Tuple<Type, string> Generate<TKey, TPayload>(RowToColumnStreamable<TKey, TPayload> stream)
{
@ -29,7 +26,7 @@ namespace Microsoft.StreamProcessing
var keyType = typeof(TKey);
var payloadType = typeof(TPayload);
var generatedClassName = string.Format(CultureInfo.InvariantCulture, "RowToColumnUnaryPipeGeneratedFrom_{0}_{1}_{2}", keyType.GetValidIdentifier(), payloadType.GetValidIdentifier(), RowToColumnSequenceNumber++);
var generatedClassName = $"RowToColumnUnaryPipeGeneratedFrom_{keyType.GetValidIdentifier()}_{payloadType.GetValidIdentifier()}_{RowToColumnSequenceNumber++}";
var template = new RowToColumnTemplate(generatedClassName, keyType, payloadType);
var expandedCode = template.TransformText();

Просмотреть файл

@ -71,16 +71,13 @@ namespace Microsoft.StreamProcessing
var lookupKey = CacheKey.Create(this.Selector.ExpressionToCSharp(), this.HasStartEdge, this.HasKey);
var generatedPipeType = cachedPipes.GetOrAdd(lookupKey, key => SelectTemplate.Generate(this));
Func<PlanNode, IQueryObject, PlanNode> planNode = ((PlanNode p, IQueryObject o) => new SelectPlanNode(p, o, typeof(TKey), typeof(TPayload), typeof(TResult), this.Selector, this.HasKey, this.HasStartEdge, true, generatedPipeType.Item2));
Func<PlanNode, IQueryObject, PlanNode> planNode = (PlanNode p, IQueryObject o) => new SelectPlanNode(p, o, typeof(TKey), typeof(TPayload), typeof(TResult), this.Selector, this.HasKey, this.HasStartEdge, true, generatedPipeType.Item2);
var instance = Activator.CreateInstance(generatedPipeType.Item1, this, observer, planNode);
var returnValue = (UnaryPipe<TKey, TPayload, TResult>)instance;
return returnValue;
}
public override string ToString()
{
return "Select(" + this.Selector.ExpressionToCSharp() + ")";
}
public override string ToString() => "Select(" + this.Selector.ExpressionToCSharp() + ")";
}
}

Просмотреть файл

@ -62,7 +62,7 @@ using Microsoft.StreamProcessing.Internal.Collections;
this.Write("\r\n// Computed Fields: ");
this.Write(this.ToStringHelper.ToStringWithCulture(String.Join(",", this.computedFields.Keys.Select(f => f.OriginalName))));
this.Write("\r\n// Swinging Fields: ");
this.Write(this.ToStringHelper.ToStringWithCulture(String.Join(",", this.swingingFields.Select(tup => string.Format("<{0},{1}>", tup.Item1.OriginalName, tup.Item2.Name)))));
this.Write(this.ToStringHelper.ToStringWithCulture(String.Join(",", this.swingingFields.Select(tup => $"<{tup.Item1.OriginalName},{tup.Item2.Name}>"))));
this.Write("\r\n\r\n[DataContract]\r\ninternal sealed class ");
this.Write(this.ToStringHelper.ToStringWithCulture(className));
this.Write(this.ToStringHelper.ToStringWithCulture(genericParameters));

Просмотреть файл

@ -30,7 +30,7 @@ using <#= this.resultType.Namespace #>;
// Source Fields: <#= String.Join(",", this.fields.Select(f => f.OriginalName)) #>
// Destination Fields: <#= String.Join(",", this.destinationFields.Select(f => f.OriginalName)) #>
// Computed Fields: <#= String.Join(",", this.computedFields.Keys.Select(f => f.OriginalName)) #>
// Swinging Fields: <#= String.Join(",", this.swingingFields.Select(tup => string.Format("<{0},{1}>", tup.Item1.OriginalName, tup.Item2.Name))) #>
// Swinging Fields: <#= String.Join(",", this.swingingFields.Select(tup => $"<{tup.Item1.OriginalName},{tup.Item2.Name}>")) #>
[DataContract]
internal sealed class <#= className #><#= genericParameters #> : UnaryPipe<<#= TKey #>, <#= TPayload #>, <#= TResult #>>

Просмотреть файл

@ -82,7 +82,7 @@ namespace Microsoft.StreamProcessing
template.genericParameters = gps.BracketedCommaSeparatedString();
template.numberOfGenericParameters = gps.Count();
template.TKeyTResultGenericParameters = tm.GenericTypeVariables(keyType, resultType).BracketedCommaSeparatedString();
template.MemoryPoolGenericParameters = string.Format("<{0}, {1}>", template.TKey, template.TResult);
template.MemoryPoolGenericParameters = $"<{template.TKey}, {template.TResult}>";
if (resultType == typeof(int) || resultType == typeof(long) || resultType == typeof(string))
template.MemoryPoolGenericParameters = string.Empty;

Просмотреть файл

@ -56,7 +56,7 @@ namespace Microsoft.StreamProcessing
string errorMessages = null;
try
{
generatedClassName = string.Format("SelectMany_{0}", sequenceNumber++);
generatedClassName = $"SelectMany_{sequenceNumber++}";
var keyType = typeof(TKey);
var payloadType = typeof(TPayload);
var resultType = typeof(TResult);
@ -67,7 +67,7 @@ namespace Microsoft.StreamProcessing
template.genericParameters = gps.BracketedCommaSeparatedString();
template.numberOfGenericParameters = gps.Count();
template.TKeyTResultGenericParameters = tm.GenericTypeVariables(keyType, resultType).BracketedCommaSeparatedString();
template.MemoryPoolGenericParameters = string.Format("<{0}, {1}>", template.TKey, template.TResult);
template.MemoryPoolGenericParameters = $"<{template.TKey}, {template.TResult}>";
if (resultType == typeof(int) || resultType == typeof(long) || resultType == typeof(string))
template.MemoryPoolGenericParameters = string.Empty;

Просмотреть файл

@ -35,7 +35,7 @@ namespace Microsoft.StreamProcessing
sw.Start();
#endif
var template = new SessionWindowTemplate(
string.Format("GeneratedSessionWindow_{0}", SessionWindowSequenceNumber++),
$"GeneratedSessionWindow_{SessionWindowSequenceNumber++}",
typeof(TKey), typeof(TPayload));
return template.Generate<TKey, TPayload>(typeof(IStreamable<,>));

Просмотреть файл

@ -122,8 +122,8 @@ namespace Microsoft.StreamProcessing
keySelector.IsSimpleFieldOrPropertyAccess())
{
template.inlinedHashCodeComputation = "hashCodeVector.col[i]";
var fieldName = ((MemberExpression)(keySelector.Body)).Member.Name;
template.vectorHashCodeInitialization = string.Format(CultureInfo.InvariantCulture, "var hashCodeVector = {0}{1}_col.GetHashCode(batch.bitvector);", Transformer.ColumnFieldPrefix, fieldName);
var fieldName = ((MemberExpression)keySelector.Body).Member.Name;
template.vectorHashCodeInitialization = $"var hashCodeVector = {Transformer.ColumnFieldPrefix}{fieldName}_col.GetHashCode(batch.bitvector);";
}
}
}

Просмотреть файл

@ -101,16 +101,16 @@ namespace Microsoft.StreamProcessing
var getHashcodeExpression = keyComparer.GetGetHashCodeExpr();
template.inlinedKeyComparerEquals =
(left, right) =>
string.Format("({0})", equalsExpression.Inline(left, right));
$"({equalsExpression.Inline(left, right)})";
template.inlinedKeyComparerGetHashCode =
(x) =>
string.Format("({0}/* inlined GetHashCode */)", getHashcodeExpression.Inline(x));
$"({getHashcodeExpression.Inline(x)}/* inlined GetHashCode */)";
if (keyType.IsAnonymousType())
{
template.inlinedKeyComparerEquals =
(left, right) => string.Format("keyComparerEquals({0}, {1})", left, right);
(left, right) => $"keyComparerEquals({left}, {right})";
template.inlinedKeyComparerGetHashCode =
(x) => string.Format("keyComparerGetHashCode({0})", x);
(x) => $"keyComparerGetHashCode({x})";
}
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(equalsExpression));
assemblyReferences.AddRange(Transformer.AssemblyReferencesNeededFor(getHashcodeExpression));

Просмотреть файл

@ -3,7 +3,6 @@
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Globalization;
using Microsoft.StreamProcessing.Internal.Collections;
namespace Microsoft.StreamProcessing
@ -19,7 +18,7 @@ namespace Microsoft.StreamProcessing
// This operator uses the equality method on payloads
if (this.Properties.IsColumnar && !this.Properties.PayloadEqualityComparer.CanUsePayloadEquality())
{
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Type of payload, '{0}', to Stitch does not have a valid equality operator for columnar mode.", typeof(TPayload).FullName));
throw new InvalidOperationException($"Type of payload, '{typeof(TPayload).FullName}', to Stitch does not have a valid equality operator for columnar mode.");
}
Initialize();
@ -30,8 +29,9 @@ namespace Microsoft.StreamProcessing
var part = typeof(TKey).GetPartitionType();
if (part == null)
{
if (this.Source.Properties.IsColumnar) return GetPipe(observer);
else return new StitchPipe<TKey, TPayload>(this, observer);
return this.Source.Properties.IsColumnar
? GetPipe(observer)
: new StitchPipe<TKey, TPayload>(this, observer);
}
var outputType = typeof(PartitionedStitchPipe<,,>).MakeGenericType(

Просмотреть файл

@ -12,11 +12,9 @@ using Microsoft.StreamProcessing.Internal.Collections;
namespace Microsoft.StreamProcessing
{
internal class MultiStringTransformer
internal sealed class MultiStringTransformer
{
private MultiStringTransformer(Type t) {
this.batchType = t;
}
private MultiStringTransformer(Type t) => this.batchType = t;
private readonly Type batchType;
@ -67,21 +65,17 @@ namespace Microsoft.StreamProcessing
return vectorVisitor.IsVectorizable;
}
private class VectorizableVisitor : ExpressionVisitor
private sealed class VectorizableVisitor : ExpressionVisitor
{
public bool IsVectorizable = true;
private readonly Type batchType;
public VectorizableVisitor(Type batchType)
{
this.batchType = batchType;
}
public VectorizableVisitor(Type batchType) => this.batchType = batchType;
public static bool IsMemberOnBatchField(MemberExpression memberBinding, Type batchType)
{
if (memberBinding == null) return false;
return memberBinding.Member.DeclaringType == batchType;
}
=> memberBinding == null
? false
: memberBinding.Member.DeclaringType == batchType;
protected override Expression VisitBinary(BinaryExpression node)
{
@ -97,6 +91,7 @@ namespace Microsoft.StreamProcessing
return node;
}
}
protected override Expression VisitUnary(UnaryExpression node)
{
switch (node.NodeType)
@ -109,6 +104,7 @@ namespace Microsoft.StreamProcessing
return node;
}
}
protected override Expression VisitMethodCall(MethodCallExpression methodCall)
{
var calledMethod = methodCall.Method;
@ -145,17 +141,17 @@ namespace Microsoft.StreamProcessing
var calledMethod = methodCall.Method;
// static bool System.Text.RegularExpressions.Regex.IsMatch(string input, string pattern)
return (calledMethod.DeclaringType.Equals(typeof(System.Text.RegularExpressions.Regex)) &&
return calledMethod.DeclaringType.Equals(typeof(System.Text.RegularExpressions.Regex)) &&
calledMethod.IsStatic &&
calledMethod.Name.Equals("IsMatch") &&
methodCall.Arguments.Count == 2 &&
methodCall.Arguments.All(a => a.Type.Equals(typeof(string))) &&
methodCall.Arguments.All(a => !SearchForMultiStringMethod.ContainsMultiStringCall(batchType, a)) &&
IsMemberOnBatchField(methodCall.Arguments.ElementAt(0) as MemberExpression, batchType));
IsMemberOnBatchField(methodCall.Arguments.ElementAt(0) as MemberExpression, batchType);
}
}
private class Vectorize : ExpressionVisitor
private sealed class Vectorize : ExpressionVisitor
{
private static int counter = 0;
public List<string> vectorStatements = new List<string>();
@ -164,10 +160,7 @@ namespace Microsoft.StreamProcessing
private string resultBV;
private readonly Type batchType;
private Vectorize(Type batchType)
{
this.batchType = batchType;
}
private Vectorize(Type batchType) => this.batchType = batchType;
public static IEnumerable<string> Transform(Type batchType, Expression e)
{
@ -182,15 +175,6 @@ namespace Microsoft.StreamProcessing
return me.vectorStatements;
}
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider",
MessageId = "System.string.Format(System.String,System.Object,System.Object)",
Justification = "This is CLR API substitution, additional intent should not be added on behalf of the user of the API."),
System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider",
MessageId = "System.string.Format(System.String,System.Object)",
Justification = "This is CLR API substitution, additional intent should not be added on behalf of the user of the API."),
System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider",
MessageId = "System.string.Format(System.String,System.Object,System.Object,System.Object)",
Justification="This is CLR API substitution, additional intent should not be added on behalf of the user of the API.")]
protected override Expression VisitBinary(BinaryExpression node)
{
string left_result;
@ -201,20 +185,18 @@ namespace Microsoft.StreamProcessing
left_result = this.resultBV;
var bv_i = "bv" + counter++;
// var bv_i = invert(left_result) | incomingBV;
this.vectorStatements.Add(string.Format(
"var {0} = MultiString.InvertLeftThenOrWithRight({1}, {2}, this.pool.bitvectorPool);",
bv_i, left_result, this.incomingBV));
this.vectorStatements.Add($"var {bv_i} = MultiString.InvertLeftThenOrWithRight({left_result}, {this.incomingBV}, this.pool.bitvectorPool);");
this.incomingBV = bv_i;
Visit(node.Right);
// free bv_i
this.vectorStatements.Add(string.Format("{0}.ReturnClear();", bv_i));
this.vectorStatements.Add($"{bv_i}.ReturnClear();");
// this.resultBV &= left_result;
this.vectorStatements.Add(string.Format("MultiString.AndEquals({0}, {1});", this.resultBV, left_result));
this.vectorStatements.Add($"MultiString.AndEquals({this.resultBV}, {left_result});");
// free left_result
this.vectorStatements.Add(string.Format("{0}.ReturnClear();", left_result));
this.vectorStatements.Add($"{left_result}.ReturnClear();");
break;
case ExpressionType.AndAlso:
@ -224,7 +206,7 @@ namespace Microsoft.StreamProcessing
Visit(node.Right);
// free left_result
this.vectorStatements.Add(string.Format("{0}.ReturnClear();", left_result));
this.vectorStatements.Add($"{left_result}.ReturnClear();");
break;
default:
@ -283,7 +265,7 @@ namespace Microsoft.StreamProcessing
}
else
{
this.resultBV = string.Format("bv{0}", counter++);
this.resultBV = $"bv{counter++}";
s = "var " + this.resultBV + " = ";
}
s += methodCall;
@ -297,10 +279,7 @@ namespace Microsoft.StreamProcessing
private bool found = false;
private readonly Type batchType;
private SearchForMultiStringMethod(Type t)
{
this.batchType = t;
}
private SearchForMultiStringMethod(Type t) => this.batchType = t;
public static bool ContainsMultiStringCall(Type batchType, Expression e)
{
@ -313,8 +292,9 @@ namespace Microsoft.StreamProcessing
{
if (memberBinding == null) return false;
var field = memberBinding.Member as FieldInfo;
if (field == null) return false;
return field.DeclaringType == this.batchType;
return field == null
? false
: field.DeclaringType == this.batchType;
}
protected override Expression VisitMethodCall(MethodCallExpression methodCall)
@ -372,10 +352,7 @@ namespace Microsoft.StreamProcessing
private readonly Type batchType;
public Dictionary<FieldInfo, ParameterExpression> multiStringTable = new Dictionary<FieldInfo, ParameterExpression>();
public WrapperTransformer(Type t)
{
this.batchType = t;
}
public WrapperTransformer(Type t) => this.batchType = t;
/// <summary>
/// Translate calls to method calls on multistrings into calls to MultiString wrapper methods

Просмотреть файл

@ -3,7 +3,6 @@
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Globalization;
using System.IO;
using System.Runtime.Serialization;
using System.Text;
@ -169,7 +168,7 @@ namespace Microsoft.StreamProcessing.Serializer
if (read != array.Length)
{
throw new SerializationException(
string.Format(CultureInfo.InvariantCulture, "Unexpected end of stream: '{0}' bytes missing.", array.Length - read));
$"Unexpected end of stream: '{array.Length - read}' bytes missing.");
}
}

Просмотреть файл

@ -4,7 +4,6 @@
// *********************************************************************
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq.Expressions;
using System.Reflection;
using System.Runtime.Serialization;
@ -20,10 +19,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
var getLength = this.RuntimeType.GetTypeInfo().GetProperty("Length");
if (getLength == null)
{
throw new SerializationException(
string.Format(
CultureInfo.InvariantCulture,
"Runtime type '{0}' is being serialized as array, but does not have 'Length' property.", this.RuntimeType));
throw new SerializationException($"Runtime type '{this.RuntimeType}' is being serialized as array, but does not have 'Length' property.");
}
var body = new List<Expression>();

Просмотреть файл

@ -136,10 +136,6 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
}
private static Exception UnexpectedNullValueException(Type type)
=> new SerializationException(
string.Format(
CultureInfo.InvariantCulture,
"Unexpected null value for the object of type '{0}'. Please check the schema.",
type));
=> new SerializationException($"Unexpected null value for the object of type '{type}'. Please check the schema.");
}
}

Просмотреть файл

@ -4,7 +4,6 @@
// *********************************************************************
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq.Expressions;
using System.Reflection;
using System.Runtime.Serialization;
@ -226,8 +225,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
?? this.RuntimeType.GetMethodByName("Push", typeof(TItem));
if (result == null)
throw new SerializationException(
string.Format(CultureInfo.InvariantCulture, "Collection type '{0}' does not have Add/Enqueue/Push method.", this.RuntimeType));
throw new SerializationException($"Collection type '{this.RuntimeType}' does not have Add/Enqueue/Push method.");
return result;
}
}

Просмотреть файл

@ -4,7 +4,6 @@
// *********************************************************************
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reflection;
using System.Runtime.Serialization;
@ -84,7 +83,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
private ObjectSerializerBase CreateSchema(Type type, uint currentDepth)
{
if (currentDepth == this.settings.MaxItemsInSchemaTree)
throw new SerializationException(string.Format(CultureInfo.InvariantCulture, "Maximum depth of object graph reached."));
throw new SerializationException("Maximum depth of object graph reached.");
var surrogate = this.settings.Surrogate;
if (surrogate != null)
@ -96,8 +95,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
if (type.IsUnsupported())
{
throw new SerializationException(
string.Format(CultureInfo.InvariantCulture, "Type '{0}' is not supported.", type));
throw new SerializationException($"Type '{type}' is not supported.");
}
}
@ -143,7 +141,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
// Others
if (typeInfo.IsClass || typeInfo.IsValueType) return BuildRecordTypeSchema(type, currentDepth);
throw new SerializationException(string.Format(CultureInfo.InvariantCulture, "Type '{0}' is not supported.", type));
throw new SerializationException($"Type '{type}' is not supported.");
}
private ObjectSerializerBase BuildEnumTypeSchema(Type type)
@ -184,8 +182,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
var applicable = this.knownTypes.Where(t => t.CanBeKnownTypeOf(type)).ToList();
if (applicable.Count == 0)
{
throw new SerializationException(
string.Format(CultureInfo.InvariantCulture, "Could not find any matching known type for '{0}'.", type));
throw new SerializationException($"Could not find any matching known type for '{type}'.");
}
if (!type.IsAbstract && !type.IsInterface && !this.knownTypes.Contains(type)) applicable.Add(type);

Просмотреть файл

@ -107,9 +107,7 @@ namespace Microsoft.StreamProcessing.Sharding
}
public void OnNext(StreamMessage<TKey, TPayload> value)
{
this.elements.Add(new QueuedMessage<StreamMessage<TKey, TPayload>> { Kind = MessageKind.DataBatch, Message = value });
}
=> this.elements.Add(new QueuedMessage<StreamMessage<TKey, TPayload>> { Kind = MessageKind.DataBatch, Message = value });
public void Wait()
{
@ -131,8 +129,6 @@ namespace Microsoft.StreamProcessing.Sharding
/// <param name="source">The sharded streamable to cache</param>
/// <returns>A cached sharded streamable</returns>
public static ShardedStreamCache<TKey, TPayload> Cache<TKey, TPayload>(this IShardedStreamable<TKey, TPayload> source)
{
return new ShardedStreamCache<TKey, TPayload>(source);
}
=> new ShardedStreamCache<TKey, TPayload>(source);
}
}
}

Просмотреть файл

@ -103,7 +103,6 @@ namespace Microsoft.StreamProcessing.Sharding
}
}
/// <summary>
/// Static class to provide deserialization services to sharded streamable from and to binary streams
/// </summary>
@ -156,8 +155,6 @@ namespace Microsoft.StreamProcessing.Sharding
/// <param name="async">States whether serialization should be able to be done asynchronously</param>
/// <param name="writePropertiesToStream">Write stream properties to the binary stream</param>
public static void ToBinaryStream<TKey, TPayload>(this IShardedStreamable<TKey, TPayload> source, Stream[] destinations, bool async = false, bool writePropertiesToStream = false)
{
new ShardedStreamSerializer<TKey, TPayload>(source, destinations, async, writePropertiesToStream);
}
=> new ShardedStreamSerializer<TKey, TPayload>(source, destinations, async, writePropertiesToStream);
}
}

Просмотреть файл

@ -25,17 +25,13 @@ namespace Microsoft.StreamProcessing.Sharding
/// Default constructor - do not use directly
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public ShardedStreamable()
{ }
public ShardedStreamable() { }
/// <summary>
/// Constructor to create a new sharded streamable instance from an existing set of streamables
/// </summary>
/// <param name="streamables"></param>
public ShardedStreamable(IStreamable<TKey, TPayload>[] streamables)
{
this.streamables = streamables;
}
public ShardedStreamable(IStreamable<TKey, TPayload>[] streamables) => this.streamables = streamables;
/// <summary>
/// Constructor to create a new sharded streamable instance from an existing set of sharded streamables

Просмотреть файл

@ -40,9 +40,7 @@ namespace Microsoft.StreamProcessing.Sharding
/// <param name="source">The stream to be returned from sharded to strictly a unified stream</param>
/// <returns>A streamable brought together from all shards</returns>
public static IStreamable<Empty, TPayload> Unshard<TPayload>(this IShardedStreamable<Empty, TPayload> source)
{
return new MultiUnionStreamable<Empty, TPayload>(((ShardedStreamable<Empty, TPayload>)source).Streamables, false);
}
=> new MultiUnionStreamable<Empty, TPayload>(((ShardedStreamable<Empty, TPayload>)source).Streamables, false);
/// <summary>
/// Unshard operation on a partitioned stream
@ -52,9 +50,6 @@ namespace Microsoft.StreamProcessing.Sharding
/// <param name="source">The stream to be returned from sharded to strictly a unified stream</param>
/// <returns>A streamable brought together from all shards</returns>
public static IStreamable<PartitionKey<TKey>, TPayload> Unshard<TKey, TPayload>(this IShardedStreamable<PartitionKey<TKey>, TPayload> source)
{
return new MultiUnionStreamable<PartitionKey<TKey>, TPayload>(((ShardedStreamable<PartitionKey<TKey>, TPayload>)source).Streamables, false);
}
=> new MultiUnionStreamable<PartitionKey<TKey>, TPayload>(((ShardedStreamable<PartitionKey<TKey>, TPayload>)source).Streamables, false);
}
}

Просмотреть файл

@ -168,6 +168,19 @@ namespace Microsoft.StreamProcessing
public static IAbstractPattern<TKey, TPayload, Empty, bool> DefinePattern<TKey, TPayload>(this IStreamable<TKey, TPayload> source)
=> new PatternMatcher<TKey, TPayload, Empty, bool>(source);
/// <summary>
/// Define a pattern against which data in the input stream may be matched
/// </summary>
/// <typeparam name="TKey">Key type</typeparam>
/// <typeparam name="TPayload">Payload type</typeparam>
/// <typeparam name="TRegister">Result type (output of matcher is the register at an accepting state of the AFA)</typeparam>
/// <param name="source">Source stream</param>
/// <param name="defaultRegister">Default register value for the automata</param>
/// <returns>The beginning of a builder from which a pattern may be defined</returns>
public static IAbstractPattern<TKey, TPayload, TRegister, bool> DefinePattern<TKey, TPayload, TRegister>(
this IStreamable<TKey, TPayload> source, TRegister defaultRegister)
=> new PatternMatcher<TKey, TPayload, TRegister, bool>(source, null, defaultRegister);
/// <summary>
/// Define a pattern against which data in the input stream may be matched
/// </summary>

Просмотреть файл

@ -5,7 +5,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Globalization;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
@ -159,7 +158,7 @@ namespace Microsoft.StreamProcessing
{
if (this.doMultiStringTransform && IsMultiStringCall(body, out string s))
{
this.multiStringOperations.Add(string.Format(CultureInfo.InvariantCulture, "resultBatch.{0} = {1};", this.resultTypeInformation.PseudoField.Name, s));
this.multiStringOperations.Add($"resultBatch.{this.resultTypeInformation.PseudoField.Name} = {s};");
}
else
{
@ -330,7 +329,7 @@ namespace Microsoft.StreamProcessing
if (this.doMultiStringTransform && IsMultiStringCall(argument, out string s))
{
this.multiStringResultFields.Add(resultField);
this.multiStringOperations.Add(string.Format(CultureInfo.InvariantCulture, "resultBatch.{0} = {1};", resultField.Name, s));
this.multiStringOperations.Add($"resultBatch.{resultField.Name} = {s};");
continue;
}
}
@ -374,7 +373,7 @@ namespace Microsoft.StreamProcessing
if (this.doMultiStringTransform && IsMultiStringCall(node.Expression, out string s))
{
this.multiStringResultFields.Add(destinationColumn);
this.multiStringOperations.Add(string.Format(CultureInfo.InvariantCulture, "resultBatch.{0} = {1};", destinationColumn.Name, s));
this.multiStringOperations.Add($"resultBatch.{destinationColumn.Name} = {s};");
}
else
{
@ -475,7 +474,7 @@ namespace Microsoft.StreamProcessing
}
if (e == null) return false;
var s = string.Format(CultureInfo.InvariantCulture, "sourceBatch.{0}{1}.{2}", Transformer.ColumnFieldPrefix, memberInfo.Name, MapStringArgsToMultiStringArgs(methodCall));
var s = $"sourceBatch.{Transformer.ColumnFieldPrefix}{memberInfo.Name}.{MapStringArgsToMultiStringArgs(methodCall)}";
vectorCall = s;
return true;
}
@ -494,12 +493,7 @@ namespace Microsoft.StreamProcessing
}
if (e == null) return false;
var s = string.Format(
CultureInfo.InvariantCulture,
"{0}{1}_col.{2}(batch.bitvector, false);",
Transformer.ColumnFieldPrefix,
memberInfo.Name,
memberBinding.Member.Name);
var s = $"{Transformer.ColumnFieldPrefix}{memberInfo.Name}_col.{memberBinding.Member.Name}(batch.bitvector, false);";
vectorCall = s;
return true;
}
@ -559,8 +553,8 @@ namespace Microsoft.StreamProcessing
if (n == 1)
{
firstArgsToMultiStringCall = firstArgIsChar
? string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), 0, StringComparison.Ordinal", firstArgAsCSharpString)
: string.Format(CultureInfo.InvariantCulture, "{0}, 0, StringComparison.Ordinal", firstArgAsCSharpString);
? $"{firstArgAsCSharpString}.ToString(), 0, StringComparison.Ordinal"
: $"{firstArgAsCSharpString}, 0, StringComparison.Ordinal";
}
else
{
@ -570,13 +564,13 @@ namespace Microsoft.StreamProcessing
if (methodCall.Arguments.ElementAt(1).Type.Equals(typeof(int)))
{
firstArgsToMultiStringCall = firstArgIsChar
? string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), {1}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString)
: string.Format(CultureInfo.InvariantCulture, "{0}, {1}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString);
? $"{firstArgAsCSharpString}.ToString(), {secondArgAsCSharpString}, StringComparison.Ordinal"
: $"{firstArgAsCSharpString}, {secondArgAsCSharpString}, StringComparison.Ordinal";
}
else
{
// IndexOf/LastIndexOf(string, StringComparison)
firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}, 0, {1}", firstArgAsCSharpString, secondArgAsCSharpString);
firstArgsToMultiStringCall = $"{firstArgAsCSharpString}, 0, {secondArgAsCSharpString}";
}
}
else
@ -585,12 +579,12 @@ namespace Microsoft.StreamProcessing
if (n == 3)
{
if (firstArgIsChar) // IndexOf/LastIndexOf(char, int, int)
firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), {1}, {2}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString);
firstArgsToMultiStringCall = $"{firstArgAsCSharpString}.ToString(), {secondArgAsCSharpString}, {thirdArgAsCSharpString}, StringComparison.Ordinal";
else
{
firstArgsToMultiStringCall = methodCall.Method.GetParameters().ElementAt(2).ParameterType.Equals(typeof(int))
? string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString)
: string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString);
? $"{firstArgAsCSharpString}, {secondArgAsCSharpString}, {thirdArgAsCSharpString}, StringComparison.Ordinal"
: $"{firstArgAsCSharpString}, {secondArgAsCSharpString}, {thirdArgAsCSharpString}";
}
}
else
@ -598,7 +592,7 @@ namespace Microsoft.StreamProcessing
Contract.Assume(n == 4, "meant to be exhaustive");
var fourthArgAsCSharpString = args[3];
// IndexOf/LastIndexOf(string, int, int, StringComparison)
firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}, {3}", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString, fourthArgAsCSharpString);
firstArgsToMultiStringCall = $"{firstArgAsCSharpString}, {secondArgAsCSharpString}, {thirdArgAsCSharpString}, {fourthArgAsCSharpString}";
}
}
}
@ -610,8 +604,8 @@ namespace Microsoft.StreamProcessing
}
var s = methodName.Equals("Substring")
? string.Format(CultureInfo.InvariantCulture, "{0}({1}, sourceBatch.bitvector)", methodToCall, firstArgsToMultiStringCall)
: string.Format(CultureInfo.InvariantCulture, "{0}({1}, sourceBatch.bitvector, false)", methodToCall, firstArgsToMultiStringCall);
? $"{methodToCall}({firstArgsToMultiStringCall}, sourceBatch.bitvector)"
: $"{methodToCall}({firstArgsToMultiStringCall}, sourceBatch.bitvector, false)";
return s;
}

Просмотреть файл

@ -417,25 +417,23 @@ namespace System.Runtime.CompilerServices
internal static string GetBatchClassName(Type keyType, Type payloadType)
{
if (!payloadType.CanRepresentAsColumnar())
return string.Format(CultureInfo.InvariantCulture, "StreamMessage<{0}, {1}>", keyType.GetCSharpSourceSyntax(), payloadType.GetCSharpSourceSyntax());
return $"StreamMessage<{keyType.GetCSharpSourceSyntax()}, {payloadType.GetCSharpSourceSyntax()}>";
var dictionaryKey = CacheKey.Create(keyType, payloadType);
var generatedBatchClassName = batchType2Name.GetOrAdd(
return batchType2Name.GetOrAdd(
dictionaryKey,
key => string.Format(CultureInfo.InvariantCulture, "GeneratedBatch_{0}", BatchClassSequenceNumber++));
return generatedBatchClassName;
key => $"GeneratedBatch_{BatchClassSequenceNumber++}");
}
internal static string GetMemoryPoolClassName(Type keyType, Type payloadType)
{
if (!keyType.KeyTypeNeedsGeneratedMemoryPool() && payloadType.MemoryPoolHasGetMethodFor())
return string.Format(CultureInfo.InvariantCulture, "MemoryPool<{0}, {1}>", keyType.GetCSharpSourceSyntax(), payloadType.GetCSharpSourceSyntax());
return $"MemoryPool<{keyType.GetCSharpSourceSyntax()}, {payloadType.GetCSharpSourceSyntax()}>";
if (!payloadType.CanRepresentAsColumnar())
return string.Format(CultureInfo.InvariantCulture, "MemoryPool<{0}, {1}>", keyType.GetCSharpSourceSyntax(), payloadType.GetCSharpSourceSyntax());
return $"MemoryPool<{keyType.GetCSharpSourceSyntax()}, {payloadType.GetCSharpSourceSyntax()}>";
var generatedMemoryPoolName = string.Format(CultureInfo.InvariantCulture, "MemoryPool_{0}_{1}", keyType.Name.CleanUpIdentifierName(), payloadType.Name.CleanUpIdentifierName());
return generatedMemoryPoolName;
return $"MemoryPool_{keyType.Name.CleanUpIdentifierName()}_{payloadType.Name.CleanUpIdentifierName()}";
}
internal static string GetValidIdentifier(Type t) => t.GetCSharpSourceSyntax().CleanUpIdentifierName();
@ -939,7 +937,7 @@ namespace System.Runtime.CompilerServices
return method == null
? objectToConvert.ToString()
: (string)(method.Invoke(objectToConvert, new object[] { this.formatProviderField }));
: (string)method.Invoke(objectToConvert, new object[] { this.formatProviderField });
}
}

Просмотреть файл

@ -414,41 +414,37 @@ namespace Microsoft.StreamProcessing
}
private static string SchedToStr(IInternalScheduler sch)
{
return sch == null
=> sch == null
? "NULL"
: string.Format(
System.Globalization.CultureInfo.InvariantCulture,
"{0}(Map:{1}, Reduce:{2})", sch.GetType().Name, sch.MapArity, sch.ReduceArity);
}
/// <summary>
/// Provides a string representation of the configuration settings.
/// </summary>
/// <returns>A string representation of the configuration settings.</returns>
public static string Describe()
{
return new
{
MapArity,
ReduceArity,
ForceRowBasedExecution,
DeterministicWithinTimestamp,
ClearColumnsOnReturn,
DisableMemoryPooling,
DataBatchSize,
UseMultiString,
IngressSortingTechnique,
MultiStringTransforms,
Scheduler = SchedToStr(StreamScheduler.scheduler),
GeneratedCodePath,
CodegenOptions.GenerateDebugInfo,
CodegenOptions.BreakIntoCodeGen,
CodegenOptions.DontFallBackToRowBasedExecution,
CodegenOptions.SuperStrictColumnar,
CodegenOptions.CodeGenAfa,
}.ToString();
}
=> new
{
MapArity,
ReduceArity,
ForceRowBasedExecution,
DeterministicWithinTimestamp,
ClearColumnsOnReturn,
DisableMemoryPooling,
DataBatchSize,
UseMultiString,
IngressSortingTechnique,
MultiStringTransforms,
Scheduler = SchedToStr(StreamScheduler.scheduler),
GeneratedCodePath,
CodegenOptions.GenerateDebugInfo,
CodegenOptions.BreakIntoCodeGen,
CodegenOptions.DontFallBackToRowBasedExecution,
CodegenOptions.SuperStrictColumnar,
CodegenOptions.CodeGenAfa,
}.ToString();
}
// ConfigModifier allows to modify multiple Config values at once, guaranteeing that only
@ -639,12 +635,9 @@ namespace Microsoft.StreamProcessing
{
private readonly Action dispose;
public ConfigModifierDisposable(Action dispose) { this.dispose = dispose; }
public ConfigModifierDisposable(Action dispose) => this.dispose = dispose;
public void Dispose()
{
this.dispose();
}
public void Dispose() => this.dispose();
}
private interface IGatedModification
@ -658,18 +651,13 @@ namespace Microsoft.StreamProcessing
private Func<T, T> modifier;
public static GatedModification<T> Create(T newValue, Func<T, T> modifier)
{
return new GatedModification<T>
{
val = newValue,
modifier = modifier
};
}
=> new GatedModification<T>
{
val = newValue,
modifier = modifier
};
public void Modify()
{
this.val = this.modifier(this.val);
}
public void Modify() => this.val = this.modifier(this.val);
}
}
}

Просмотреть файл

@ -115,15 +115,11 @@ namespace Microsoft.StreamProcessing
}
public static LambdaExpression RemoveCastToObject(this LambdaExpression lambda)
{
if (lambda.Body is UnaryExpression body &&
(body.NodeType == ExpressionType.Convert || body.NodeType == ExpressionType.TypeAs) &&
(body.Type == typeof(object)))
{
return Expression.Lambda(body.Operand, lambda.Parameters);
}
return lambda;
}
=> lambda.Body is UnaryExpression body
&& (body.NodeType == ExpressionType.Convert || body.NodeType == ExpressionType.TypeAs)
&& (body.Type == typeof(object))
? Expression.Lambda(body.Operand, lambda.Parameters)
: lambda;
public static Expression ReplaceParametersInBody(this LambdaExpression lambda, Expression exp0)
=> ParameterSubstituter.Replace(lambda.Parameters[0], exp0, lambda.Body);
@ -435,20 +431,20 @@ namespace Microsoft.StreamProcessing
}
public static Expression Replace(ParameterExpression eOld, Expression eNew, Expression expression)
=> (new ParameterSubstituter(new Dictionary<ParameterExpression, Expression> { { eOld, eNew } })).Visit(expression);
=> new ParameterSubstituter(new Dictionary<ParameterExpression, Expression> { { eOld, eNew } }).Visit(expression);
public static Expression Replace(
ParameterExpression eOld, Expression eNew,
ParameterExpression eOld2, Expression eNew2,
Expression expression)
=> (new ParameterSubstituter(new Dictionary<ParameterExpression, Expression> { { eOld, eNew }, { eOld2, eNew2 } })).Visit(expression);
=> new ParameterSubstituter(new Dictionary<ParameterExpression, Expression> { { eOld, eNew }, { eOld2, eNew2 } }).Visit(expression);
public static Expression Replace(
ParameterExpression eOld, Expression eNew,
ParameterExpression eOld2, Expression eNew2,
ParameterExpression eOld3, Expression eNew3,
Expression expression)
=> (new ParameterSubstituter(new Dictionary<ParameterExpression, Expression> { { eOld, eNew }, { eOld2, eNew2 }, { eOld3, eNew3 } })).Visit(expression);
=> new ParameterSubstituter(new Dictionary<ParameterExpression, Expression> { { eOld, eNew }, { eOld2, eNew2 }, { eOld3, eNew3 } }).Visit(expression);
public static Expression<Func<PartitionKey<TPartitionKey>, TPayload, TResult>> FirstParameterAsPartitionKey<TPartitionKey, TPayload, TResult>(
Expression<Func<TPartitionKey, TPayload, TResult>> expression)
@ -521,30 +517,8 @@ namespace Microsoft.StreamProcessing
}
}
// Summary:
// Dispatches the expression to one of the more specialized visit methods in
// this class.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
public override Expression Visit(Expression node) => base.Visit(node);
//
// Summary:
// Visits the children of the System.Linq.Expressions.BinaryExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitBinary(BinaryExpression node)
{
if (!node.NodeType.ToString().Contains("Assign"))
@ -561,17 +535,6 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.BlockExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitBlock(BlockExpression node)
{
this.writer.WriteLine("{");
@ -586,31 +549,12 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.CatchBlock.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override CatchBlock VisitCatchBlock(CatchBlock node) {
this.writer.Write(node.ToString()); return base.VisitCatchBlock(node); }
protected override CatchBlock VisitCatchBlock(CatchBlock node)
{
this.writer.Write(node.ToString());
return base.VisitCatchBlock(node);
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.ConditionalExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitConditional(ConditionalExpression node)
{
if (node.Type != typeof(void)
@ -624,7 +568,8 @@ namespace Microsoft.StreamProcessing
this.writer.Write(" : ");
base.Visit(node.IfFalse);
this.writer.Write(")");
} else
}
else
{
this.writer.Write("if (");
base.Visit(node.Test);
@ -641,17 +586,6 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the System.Linq.Expressions.ConstantExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitConstant(ConstantExpression node)
{
var t = node.Type;
@ -689,25 +623,18 @@ namespace Microsoft.StreamProcessing
{
// A constant of a non-primitive struct type must be a default value for that type
// At least, that is what is assumed here.
this.writer.Write(string.Format(CultureInfo.InvariantCulture, "default({0})", GetTypeName(t)));
this.writer.Write($"default({GetTypeName(t)})");
return null;
}
var isString = (t == typeof(string));
if (isString)
if (v == null) this.writer.Write("null");
else
{
if (v == null)
this.writer.Write("null");
else
if (t == typeof(string))
{
var s = (string)v;
this.writer.Write(ToLiteral(s));
}
}
else
{
if (v == null)
this.writer.Write("null");
else
{
this.writer.Write(v.ToString());
@ -724,34 +651,12 @@ namespace Microsoft.StreamProcessing
return s;
}
//
// Summary:
// Visits the System.Linq.Expressions.DebugInfoExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitDebugInfo(DebugInfoExpression node)
{
this.writer.Write(node.ToString());
return base.VisitDebugInfo(node);
}
//
// Summary:
// Visits the System.Linq.Expressions.DefaultExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitDefault(DefaultExpression node)
{
this.writer.Write(node.ToString());
@ -759,17 +664,6 @@ namespace Microsoft.StreamProcessing
}
#if !DOTNETCORE
//
// Summary:
// Visits the children of the System.Linq.Expressions.DynamicExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitDynamic(DynamicExpression node)
{
this.writer.Write(node.ToString());
@ -777,51 +671,18 @@ namespace Microsoft.StreamProcessing
}
#endif
//
// Summary:
// Visits the children of the System.Linq.Expressions.ElementInit.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override ElementInit VisitElementInit(ElementInit node)
{
this.writer.Write(node.ToString());
return base.VisitElementInit(node);
}
//
// Summary:
// Visits the children of the extension expression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitExtension(Expression node)
{
this.writer.Write(node.ToString());
return base.VisitExtension(node);
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.GotoExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitGoto(GotoExpression node)
{
if (node.Kind == GotoExpressionKind.Break)
@ -832,17 +693,6 @@ namespace Microsoft.StreamProcessing
throw new NotImplementedException();
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.IndexExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitIndex(IndexExpression node)
{
Visit(node.Object);
@ -859,17 +709,6 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.InvocationExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitInvocation(InvocationExpression node)
{
var inlinedLambda = ParameterSubstituter.InlineInvocation(node);
@ -887,55 +726,18 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.LabelExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitLabel(LabelExpression node)
{
this.writer.Write(node.ToString());
return base.VisitLabel(node);
}
//
// Summary:
// Visits the System.Linq.Expressions.LabelTarget.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override LabelTarget VisitLabelTarget(LabelTarget node)
{
this.writer.Write(node.Name);
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.Expression<TDelegate>.
//
// Parameters:
// node:
// The expression to visit.
//
// Type parameters:
// T:
// The type of the delegate.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitLambda<T>(Expression<T> node)
{
this.writer.Write("(");
@ -960,17 +762,6 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.ListInitExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitListInit(ListInitExpression node)
{
Visit(node.NewExpression);
@ -989,17 +780,6 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.LoopExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitLoop(LoopExpression node)
{
this.writer.Write("while (true) {");
@ -1013,17 +793,6 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.MemberExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitMember(MemberExpression node)
{
if (node.Expression != null) Visit(node.Expression);
@ -1034,17 +803,6 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.MemberAssignment.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override MemberAssignment VisitMemberAssignment(MemberAssignment node)
{
this.writer.Write(node.Member.Name);
@ -1053,31 +811,9 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.MemberBinding.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override MemberBinding VisitMemberBinding(MemberBinding node)
=> base.VisitMemberBinding(node); // taken care of by VisitMemberAssignment, but what if there are other subtypes of MemberBinding?
//
// Summary:
// Visits the children of the System.Linq.Expressions.MemberInitExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitMemberInit(MemberInitExpression node)
{
Visit(node.NewExpression);
@ -1094,51 +830,18 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.MemberListBinding.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override MemberListBinding VisitMemberListBinding(MemberListBinding node)
{
this.writer.Write(node.ToString());
return base.VisitMemberListBinding(node);
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.MemberMemberBinding.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override MemberMemberBinding VisitMemberMemberBinding(MemberMemberBinding node)
{
this.writer.Write(node.ToString());
return base.VisitMemberMemberBinding(node);
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.MethodCallExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitMethodCall(MethodCallExpression node)
{
var isIndexer = node.Method.IsSpecialName && node.Method.Name.Equals("get_Item");
@ -1185,17 +888,6 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.NewExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitNew(NewExpression node)
{
this.writer.Write("new ");
@ -1228,17 +920,6 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.NewArrayExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitNewArray(NewArrayExpression node)
{
this.writer.Write("new {0} {{ ", GetTypeName(node.Type));
@ -1250,119 +931,42 @@ namespace Microsoft.StreamProcessing
return null;
}
//
// Summary:
// Visits the System.Linq.Expressions.ParameterExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitParameter(ParameterExpression node)
{
this.writer.Write(node.ToString());
return base.VisitParameter(node);
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.RuntimeVariablesExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitRuntimeVariables(RuntimeVariablesExpression node)
{
this.writer.Write(node.ToString());
return base.VisitRuntimeVariables(node);
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.SwitchExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitSwitch(SwitchExpression node)
{
this.writer.Write(node.ToString());
return base.VisitSwitch(node);
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.SwitchCase.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override SwitchCase VisitSwitchCase(SwitchCase node)
{
this.writer.Write(node.ToString());
return base.VisitSwitchCase(node);
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.TryExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitTry(TryExpression node)
{
this.writer.Write(node.ToString());
return base.VisitTry(node);
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.TypeBinaryExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitTypeBinary(TypeBinaryExpression node)
{
this.writer.Write(node.ToString());
return base.VisitTypeBinary(node);
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.UnaryExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitUnary(UnaryExpression node)
{
this.writer.Write("(");
@ -1793,7 +1397,7 @@ namespace Microsoft.StreamProcessing
protected override Expression VisitParameter(ParameterExpression node)
{
if (this.parameterTableForAtomicTypes.TryGetValue(node, out ParameterInformation parameterInfo))
if (this.parameterTableForAtomicTypes.TryGetValue(node, out var parameterInfo))
{
var arrayAccess = Expression.ArrayAccess(parameterInfo.ArrayVariable, parameterInfo.IndexVariable);
return arrayAccess;
@ -1866,7 +1470,7 @@ namespace Microsoft.StreamProcessing
var m = node.Member;
if (node.Expression is ParameterExpression parameter)
{
if (this.parameterTableForDecomposableTypes.TryGetValue(Tuple.Create(parameter, m.Name), out ParameterInformation parameterInfo))
if (this.parameterTableForDecomposableTypes.TryGetValue(Tuple.Create(parameter, m.Name), out var parameterInfo))
return MakeIndexedAccessExpression(parameterInfo);
if (this.parameterTableForAtomicTypes.TryGetValue(parameter, out parameterInfo))
throw new InvalidOperationException();
@ -2065,17 +1669,7 @@ namespace Microsoft.StreamProcessing
me.Visit(function.Body);
return me.foundInstance;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.MethodCallExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitMethodCall(MethodCallExpression node)
{
if (node.Object != null)
@ -2090,17 +1684,7 @@ namespace Microsoft.StreamProcessing
}
return node;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.BinaryExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitBinary(BinaryExpression node)
{
if (node.Left is ParameterExpression p && this.parameters.Contains(p)) this.foundInstance = true;
@ -2112,17 +1696,7 @@ namespace Microsoft.StreamProcessing
return node;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.UnaryExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitUnary(UnaryExpression node)
{
if (node.Operand is ParameterExpression p && this.parameters.Contains(p)) this.foundInstance = true;
@ -2130,17 +1704,7 @@ namespace Microsoft.StreamProcessing
return node;
}
//
// Summary:
// Visits the children of the System.Linq.Expressions.ConditionalExpression.
//
// Parameters:
// node:
// The expression to visit.
//
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override Expression VisitConditional(ConditionalExpression node)
{
if (node.Test is ParameterExpression p && this.parameters.Contains(p)) this.foundInstance = true;
@ -2156,6 +1720,5 @@ namespace Microsoft.StreamProcessing
return node;
}
}
}

Просмотреть файл

@ -181,9 +181,7 @@ namespace Microsoft.StreamProcessing
nameForIndexVariable = indexVariableName,
typeOfBatchVariable = batchType,
});
var l_prime = ColumnOriented.Transform(f, d);
if (l_prime == null) return null;
return l_prime;
return ColumnOriented.Transform(f, d);
}
public static string AccessExpressionForRowValue(this MyFieldInfo f, string batchVariableName, string indexVariableName)

Просмотреть файл

@ -56,19 +56,19 @@ namespace Microsoft.StreamProcessing
public static bool CanBeKnownTypeOf(this Type type, Type baseType)
{
TypeInfo typeInfo = type.GetTypeInfo();
TypeInfo baseTypeInfo = baseType.GetTypeInfo();
var typeInfo = type.GetTypeInfo();
var baseTypeInfo = baseType.GetTypeInfo();
return !typeInfo.IsAbstract
&& !type.IsUnsupported()
&& (typeInfo.IsSubclassOf(baseType)
|| type == baseType
|| (baseTypeInfo.IsInterface && baseTypeInfo.IsAssignableFrom(type))
|| (baseTypeInfo.IsGenericType && baseTypeInfo.IsInterface && baseType.GenericIsAssignable(type)
&& typeInfo.GetGenericArguments()
.Zip(baseTypeInfo.GetGenericArguments(), (type1, type2) => new Tuple<Type, Type>(type1, type2))
.ToList()
.TrueForAll(tuple => CanBeKnownTypeOf(tuple.Item1, tuple.Item2))));
&& !type.IsUnsupported()
&& (typeInfo.IsSubclassOf(baseType)
|| type == baseType
|| (baseTypeInfo.IsInterface && baseTypeInfo.IsAssignableFrom(type))
|| (baseTypeInfo.IsGenericType && baseTypeInfo.IsInterface && baseType.GenericIsAssignable(type)
&& typeInfo.GetGenericArguments()
.Zip(baseTypeInfo.GetGenericArguments(), (type1, type2) => new Tuple<Type, Type>(type1, type2))
.ToList()
.TrueForAll(tuple => CanBeKnownTypeOf(tuple.Item1, tuple.Item2))));
}
private static bool GenericIsAssignable(this Type type, Type instanceType)
@ -592,7 +592,7 @@ namespace Microsoft.StreamProcessing
// Only instance fields (including field-like events) affect the outcome.
if (field.IsStatic) continue;
Type fieldType = field.FieldType;
var fieldType = field.FieldType;
switch (IsManagedTypeHelper(fieldType))
{
case true:
@ -625,7 +625,6 @@ namespace Microsoft.StreamProcessing
return true;
}
#endregion
/// <summary>
@ -649,14 +648,14 @@ namespace Microsoft.StreamProcessing
|| type == typeof(object)
|| type.GetTypeInfo().ContainsGenericParameters
|| (!type.IsArray
&& !type.GetTypeInfo().IsValueType
&& !type.HasSupportedParameterizedConstructor()
&& !type.HasParameterlessConstructor()
&& type != typeof(string)
&& type != typeof(Uri)
&& !type.GetTypeInfo().IsAbstract
&& !type.GetTypeInfo().IsInterface
&& !(type.GetTypeInfo().IsGenericType && SupportedInterfaces.Contains(type.GetGenericTypeDefinition())));
&& !type.GetTypeInfo().IsValueType
&& !type.HasSupportedParameterizedConstructor()
&& !type.HasParameterlessConstructor()
&& type != typeof(string)
&& type != typeof(Uri)
&& !type.GetTypeInfo().IsAbstract
&& !type.GetTypeInfo().IsInterface
&& !(type.GetTypeInfo().IsGenericType && SupportedInterfaces.Contains(type.GetGenericTypeDefinition())));
private static readonly HashSet<Type> SupportedInterfaces = new HashSet<Type>
{
@ -677,10 +676,7 @@ namespace Microsoft.StreamProcessing
Contract.EndContractBlock();
if (type.IsUnsupported())
{
throw new SerializationException(
string.Format(CultureInfo.InvariantCulture, "Type '{0}' is not supported by the resolver.", type));
}
throw new SerializationException($"Type '{type}' is not supported by the resolver.");
return type;
}

Просмотреть файл

@ -934,10 +934,7 @@ namespace SimpleTesting
return this;
}
public void Dispose()
{
this.observer = null;
}
public void Dispose() => this.observer = null;
internal void Checkpoint()
{
@ -948,15 +945,9 @@ namespace SimpleTesting
}
}
internal void SendEvent(StreamEvent<InputEvent> evt)
{
this.observer.OnNext(evt);
}
internal void SendEvent(StreamEvent<InputEvent> evt) => this.observer.OnNext(evt);
internal void Flush()
{
this.observer.OnCompleted();
}
internal void Flush() => this.observer.OnCompleted();
private IStreamable<Empty, OutputEvent> Query(IObservableIngressStreamable<InputEvent> input)
{
@ -981,14 +972,13 @@ namespace SimpleTesting
string clusterName = (i % 15).ToString();
var evt = new InputEvent() { ClusterName = clusterName, IntValue = 0, };
DateTime start = startTime.AddMilliseconds(i * 111); // we want to cover about one hour with about 50,000 events --> 111 milliseconds
DateTime end = start.AddMinutes(60);
var start = startTime.AddMilliseconds(i * 111); // we want to cover about one hour with about 50,000 events --> 111 milliseconds
var end = start.AddMinutes(60);
bool isPunc = i % 5000 == 0;
if (isPunc)
yield return StreamEvent.CreatePunctuation<InputEvent>(start.Ticks);
else
yield return StreamEvent.CreateInterval(start.Ticks, end.Ticks, evt);
yield return isPunc
? StreamEvent.CreatePunctuation<InputEvent>(start.Ticks)
: StreamEvent.CreateInterval(start.Ticks, end.Ticks, evt);
}
}
@ -1135,10 +1125,9 @@ namespace SimpleTesting
public Expression<Func<SomeClass, int>> GetGetHashCodeExpr()
=> sc => sc.Timestamp.GetHashCode() ^ sc.CycleStart.GetHashCode() ^ sc.BC.GetHashCode();
}
private static long GetDateTimeTicks(int year, int month, int day, int hours, int minutes, int seconds)
{
return new DateTime(year, month, day, hours, minutes, seconds).Ticks;
}
=> new DateTime(year, month, day, hours, minutes, seconds).Ticks;
[TestMethod, TestCategory("Gated")]
public void What_is_wrong()
@ -1249,20 +1238,14 @@ namespace SimpleTesting
{
public DateTime Timestamp;
public Guid SessionID;
public override string ToString()
{
return new { this.Timestamp, this.SessionID }.ToString();
}
public override string ToString() => new { this.Timestamp, this.SessionID }.ToString();
}
public class UlsSessionBootFinishedEvent
{
public DateTime Timestamp;
public Guid SessionID;
public override string ToString()
{
return new { this.Timestamp, this.SessionID }.ToString();
}
public override string ToString() => new { this.Timestamp, this.SessionID }.ToString();
}
[TestMethod]
@ -1283,17 +1266,13 @@ namespace SimpleTesting
var inputStreamStarts =
inputDataStarts
.Select(
e =>
StreamEvent.CreatePoint(e.Timestamp.Ticks, e))
.Select(e => StreamEvent.CreatePoint(e.Timestamp.Ticks, e))
.ToObservable()
.ToStreamable();
var inputStreamFinishes =
inputDataFinishes
.Select(
e =>
StreamEvent.CreatePoint(e.Timestamp.Ticks, e))
.Select(e => StreamEvent.CreatePoint(e.Timestamp.Ticks, e))
.ToObservable()
.ToStreamable();
@ -1404,20 +1383,12 @@ namespace SimpleTesting
if (!(obj is ClassWithAutoProps other)) return false;
return this.IntAutoProp == other.IntAutoProp && this.IntField == other.IntField;
}
public override int GetHashCode()
{
return this.IntAutoProp.GetHashCode() ^ this.IntField.GetHashCode();
}
public override int GetHashCode() => this.IntAutoProp.GetHashCode() ^ this.IntField.GetHashCode();
public Expression<Func<ClassWithAutoProps, ClassWithAutoProps, bool>> GetEqualsExpr()
{
return (c1, c2) => c1.IntAutoProp == c2.IntAutoProp && c1.IntField == c2.IntField;
}
=> (c1, c2) => c1.IntAutoProp == c2.IntAutoProp && c1.IntField == c2.IntField;
public Expression<Func<ClassWithAutoProps, int>> GetGetHashCodeExpr()
{
return c => c.IntAutoProp ^ c.IntField;
}
public Expression<Func<ClassWithAutoProps, int>> GetGetHashCodeExpr() => c => c.IntAutoProp ^ c.IntField;
}
[TestMethod, TestCategory("Gated")]
@ -1538,10 +1509,7 @@ namespace SimpleTesting
public override bool Equals(object obj) => obj is ClassOverridingEquals other && other.x == this.x;
public override int GetHashCode()
{
return this.x.GetHashCode();
}
public override int GetHashCode() => this.x.GetHashCode();
}
/// <summary>
@ -1599,10 +1567,7 @@ namespace SimpleTesting
if (!(obj is Basetype other)) return false;
return other.x == this.x;
}
public override int GetHashCode()
{
return this.x.GetHashCode();
}
public override int GetHashCode() => this.x.GetHashCode();
}
public class Subtype : Basetype
@ -1610,10 +1575,7 @@ namespace SimpleTesting
public int fieldOfSubtype;
public override bool Equals(object obj) => obj is Subtype other && this.fieldOfSubtype == other.fieldOfSubtype && base.Equals(obj);
public override int GetHashCode()
{
return this.fieldOfSubtype.GetHashCode() ^ base.GetHashCode();
}
public override int GetHashCode() => this.fieldOfSubtype.GetHashCode() ^ base.GetHashCode();
}
[TestMethod, TestCategory("Gated")]
@ -1650,7 +1612,6 @@ namespace SimpleTesting
{
if (!a[j].Equals(b[j])) return false;
}
return true;
}
@ -1690,10 +1651,7 @@ namespace SimpleTesting
}
[TestMethod, TestCategory("Gated")]
public void WhereWithNonColumnar()
{
Enumerable.Range(0, 100).Select(i => new NonColumnarClass(i, 'x')).TestWhere(r => r.x > 3);
}
public void WhereWithNonColumnar() => Enumerable.Range(0, 100).Select(i => new NonColumnarClass(i, 'x')).TestWhere(r => r.x > 3);
// Total number of input events
private const int NumEvents = 100;
@ -1701,10 +1659,7 @@ namespace SimpleTesting
.Select(e =>
new MyStruct2 { field1 = e, field2 = new MyString(Convert.ToString("string" + e)), field3 = new NestedStruct { nestedField = e } });
private void TestWhere(Expression<Func<MyStruct2, bool>> predicate)
{
Assert.IsTrue(this.enumerable.TestWhere<MyStruct2>(predicate));
}
private void TestWhere(Expression<Func<MyStruct2, bool>> predicate) => Assert.IsTrue(this.enumerable.TestWhere(predicate));
[TestMethod, TestCategory("Gated")]
public void SelectWhereAnonymousTypeWithColToRow()

Просмотреть файл

@ -350,8 +350,7 @@ namespace SimpleTesting
}.ToObservable().ToStreamable().AlterEventDuration(1000);
var result =
source1
.DefinePattern()
.SetRegister(0) // or .SetRegister<int>()
.DefinePattern(0)
.SingleElement(e => e.Field1 == "A")
.KleeneStar(r => r.SingleElement(e => e.Field1 == "B", (ev, d) => d + ev.Field2))
.SingleElement(e => e.Field1 == "C")
@ -383,8 +382,7 @@ namespace SimpleTesting
}.ToObservable().ToStreamable().AlterEventDuration(1000);
var result =
source1
.DefinePattern()
.SetRegister(10) // or .SetRegister<int>()
.DefinePattern(10)
.SingleElement(e => e.Field1 == "A", (l, p, i) => i + p.Field2)
.SingleElement(e => e.Field1 == "C", (l, p, i) => i + p.Field2)
.Detect()

Просмотреть файл

@ -144,7 +144,6 @@ namespace SimpleTesting
}
}
[TestClass]
public class CheckpointRestoreTestsRow : TestWithConfigSettingsWithoutMemoryLeakDetection
{
@ -156,12 +155,10 @@ namespace SimpleTesting
{ }
private static IStreamable<Empty, int> CreateBasicQuery(IStreamable<Empty, int> input)
{
return input
=> input
.Where(e => e % 2 == 1)
.AlterEventLifetime(e => e + 1, StreamEvent.InfinitySyncTime)
.Select(e => e / 2);
}
[TestMethod, TestCategory("Gated")]
public void BasicIngressDiagnosticRow()
@ -1630,7 +1627,6 @@ namespace SimpleTesting
}
}
[TestClass]
public class CheckpointRestoreTestsRowSmallBatch : TestWithConfigSettingsWithoutMemoryLeakDetection
{
@ -1643,12 +1639,10 @@ namespace SimpleTesting
{ }
private static IStreamable<Empty, int> CreateBasicQuery(IStreamable<Empty, int> input)
{
return input
=> input
.Where(e => e % 2 == 1)
.AlterEventLifetime(e => e + 1, StreamEvent.InfinitySyncTime)
.Select(e => e / 2);
}
[TestMethod, TestCategory("Gated")]
public void BasicIngressDiagnosticRowSmallBatch()
@ -3116,7 +3110,6 @@ namespace SimpleTesting
}
}
[TestClass]
public class CheckpointRestoreTestsColumnar : TestWithConfigSettingsWithoutMemoryLeakDetection
{
@ -3128,12 +3121,10 @@ namespace SimpleTesting
{ }
private static IStreamable<Empty, int> CreateBasicQuery(IStreamable<Empty, int> input)
{
return input
=> input
.Where(e => e % 2 == 1)
.AlterEventLifetime(e => e + 1, StreamEvent.InfinitySyncTime)
.Select(e => e / 2);
}
[TestMethod, TestCategory("Gated")]
public void BasicIngressDiagnosticColumnar()
@ -4602,7 +4593,6 @@ namespace SimpleTesting
}
}
[TestClass]
public class CheckpointRestoreTestsColumnarSmallBatch : TestWithConfigSettingsWithoutMemoryLeakDetection
{
@ -4615,12 +4605,10 @@ namespace SimpleTesting
{ }
private static IStreamable<Empty, int> CreateBasicQuery(IStreamable<Empty, int> input)
{
return input
=> input
.Where(e => e % 2 == 1)
.AlterEventLifetime(e => e + 1, StreamEvent.InfinitySyncTime)
.Select(e => e / 2);
}
[TestMethod, TestCategory("Gated")]
public void BasicIngressDiagnosticColumnarSmallBatch()

Просмотреть файл

@ -168,7 +168,6 @@ foreach (var batch in new [] { string.Empty, "SmallBatch" })
}
}
[TestClass]
public class CheckpointRestoreTests<#= suffix #> : TestWithConfigSettingsWithoutMemoryLeakDetection
{
@ -192,12 +191,10 @@ foreach (var batch in new [] { string.Empty, "SmallBatch" })
{ }
private static IStreamable<Empty, int> CreateBasicQuery(IStreamable<Empty, int> input)
{
return input
=> input
.Where(e => e % 2 == 1)
.AlterEventLifetime(e => e + 1, StreamEvent.InfinitySyncTime)
.Select(e => e / 2);
}
[TestMethod, TestCategory("Gated")]
public void BasicIngressDiagnostic<#= suffix #>()

Просмотреть файл

@ -159,6 +159,6 @@ namespace SimpleTesting.Flush
private int ingressCount = 0;
// Events that have already been validated (kept around for debugging purposes)
private List<StreamEvent<int>> validatedOutput = new List<StreamEvent<int>>();
private readonly List<StreamEvent<int>> validatedOutput = new List<StreamEvent<int>>();
}
}

Просмотреть файл

@ -8,7 +8,6 @@ using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Linq;
using Microsoft.StreamProcessing;
using Microsoft.StreamProcessing.Internal;
using Microsoft.StreamProcessing.Internal.Collections;
using Microsoft.VisualStudio.TestTools.UnitTesting;

Просмотреть файл

@ -157,7 +157,7 @@ namespace SimpleTesting.PartitionedIngressAndEgress
}
// Determines whether the given watermark should reset the punctuation generation period
private bool LowWatermarkResetsPunctuationPeriod(long lowWatermark, long lowWatermarkArrivalTime, int lastCTI,
private static bool LowWatermarkResetsPunctuationPeriod(long lowWatermark, long lowWatermarkArrivalTime, int lastCTI,
long newPunctuationPreSnap, long newPunctuationArrivalTime, bool newPunctuationTriggeredByLowWatermark)
{
// If the watermark timestamp is before the last CTI, it won't reset anything.
@ -311,7 +311,7 @@ namespace SimpleTesting.PartitionedIngressAndEgress
AddEvent(1, 200);
AddEvent(2, 200);
ProcessInput(out List<OutOfOrderPartitionedStreamEvent<int, int>> diagnosticEvents, out List<PartitionedStreamEvent<int, int>> dataEvents);
ProcessInput(out var diagnosticEvents, out var dataEvents);
for (int key = 0; key < 3; key++)
{

Просмотреть файл

@ -17,18 +17,14 @@ namespace SimpleTesting.PartitionedIngressAndEgress.LaggingCleanup
{
public static IStreamable<PartitionKey<int>, double> GroupAggregateAverageTestRecord(
this IStreamable<PartitionKey<int>, LaggingPartitionCleanupTests.TestRecord> source)
{
return source.GroupAggregate(
=> source.GroupAggregate(
testRecord => testRecord.Key,
w => w.Average(testRecord => testRecord.Payload),
(key, testRecord) => testRecord);
}
public static IStreamable<PartitionKey<int>, double> AggregateAverageLong(
this IStreamable<PartitionKey<int>, long> source)
{
return source.Aggregate(w => w.Average(payload => payload));
}
=> source.Aggregate(w => w.Average(payload => payload));
}
// Tests scenarios where some partitions lag behind the low watermark to ensure appropriate and correct cleanup
@ -44,7 +40,6 @@ namespace SimpleTesting.PartitionedIngressAndEgress.LaggingCleanup
HoppingWindowWorker(
ingress => ingress.HoppingWindowLifetime(WindowSize, HopSize).GroupAggregateAverageTestRecord(), this.testRecordCreator);
[TestMethod, TestCategory("Gated")]
public void HoppingWindowSimple() =>
HoppingWindowWorker(
@ -322,13 +317,13 @@ namespace SimpleTesting.PartitionedIngressAndEgress.LaggingCleanup
private Task egress;
private Process process;
private PartitionedStreamEvent<int, TInput> CreateInputInterval<TInput>(int key, long startTime, Func<int, long, TInput> resultCreator) =>
private static PartitionedStreamEvent<int, TInput> CreateInputInterval<TInput>(int key, long startTime, Func<int, long, TInput> resultCreator) =>
PartitionedStreamEvent.CreateInterval(key, startTime, startTime + IntervalLength, resultCreator(key, startTime));
private PartitionedStreamEvent<int, double> CreateOutputStart(int key, long startTime, double average) =>
private static PartitionedStreamEvent<int, double> CreateOutputStart(int key, long startTime, double average) =>
PartitionedStreamEvent.CreateStart(key, startTime, average);
private PartitionedStreamEvent<int, double> CreateOutputEnd(int key, long endTime, long originalStart, double average) =>
private static PartitionedStreamEvent<int, double> CreateOutputEnd(int key, long endTime, long originalStart, double average) =>
PartitionedStreamEvent.CreateEnd(key, endTime, originalStart, average);
private PartitionedStreamEvent<int, double> CreateOutputInterval(int key, long startTime, long endTime, double average) =>
private static PartitionedStreamEvent<int, double> CreateOutputInterval(int key, long startTime, long endTime, double average) =>
PartitionedStreamEvent.CreateInterval(key, startTime, endTime, average);
public class TestRecord

Просмотреть файл

@ -108,15 +108,9 @@ namespace ComponentTesting.Aggregates
return 0;
}
public bool Equals(SumCount other)
{
return this.Sum == other.Sum && this.Count == other.Count;
}
public bool Equals(SumCount other) => this.Sum == other.Sum && this.Count == other.Count;
public override string ToString()
{
return "[Sum=" + this.Sum + ", Count=" + this.Count + "]";
}
public override string ToString() => "[Sum=" + this.Sum + ", Count=" + this.Count + "]";
}
}
}

Просмотреть файл

@ -168,10 +168,7 @@ namespace ComponentTesting.Serializer
}
// Note: for generic types compare with generic type definition.
if (TypesToSkip.Any(e => t.IsGenericType ? t.GetGenericTypeDefinition() == e : t == e))
return true;
return false;
return TypesToSkip.Any(e => t.IsGenericType ? t.GetGenericTypeDefinition() == e : t == e);
}
private static string DescribeGeneric(string name, IEnumerable<Type> args)