This commit is contained in:
James Terwilliger 2019-03-06 14:10:25 -08:00
Родитель 07f317796b
Коммит 0f8fc99686
56 изменённых файлов: 507 добавлений и 802 удалений

Просмотреть файл

@ -96,14 +96,14 @@ namespace Microsoft.StreamProcessing
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[EditorBrowsable(EditorBrowsableState.Never)]
public bool IsFull() => (((this.tail + 1) & this.capacityMask) == this.head);
public bool IsFull() => ((this.tail + 1) & this.capacityMask) == this.head;
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[EditorBrowsable(EditorBrowsableState.Never)]
public bool IsEmpty() => (this.head == this.tail);
public bool IsEmpty() => this.head == this.tail;
/// <summary>
/// Currently for internal use only - do not use directly.
@ -134,10 +134,9 @@ namespace Microsoft.StreamProcessing
[EditorBrowsable(EditorBrowsableState.Never)]
public sealed class ElasticCircularBuffer<T> : IEnumerable<T>
{
private LinkedList<CircularBuffer<T>> buffers;
private readonly LinkedList<CircularBuffer<T>> buffers = new LinkedList<CircularBuffer<T>>();
private LinkedListNode<CircularBuffer<T>> head;
private LinkedListNode<CircularBuffer<T>> tail;
private int count;
/// <summary>
/// Currently for internal use only - do not use directly.
@ -145,11 +144,10 @@ namespace Microsoft.StreamProcessing
[EditorBrowsable(EditorBrowsableState.Never)]
public ElasticCircularBuffer()
{
this.buffers = new LinkedList<CircularBuffer<T>>();
var node = new LinkedListNode<CircularBuffer<T>>(new CircularBuffer<T>());
this.buffers.AddFirst(node);
this.tail = this.head = node;
this.count = 0;
this.Count = 0;
}
/// <summary>
@ -174,7 +172,7 @@ namespace Microsoft.StreamProcessing
}
this.tail.Value.Enqueue(ref value);
this.count++;
this.Count++;
}
/// <summary>
@ -201,7 +199,7 @@ namespace Microsoft.StreamProcessing
if (this.head == null) this.head = this.buffers.First;
}
this.count--;
this.Count--;
return this.head.Value.Dequeue();
}
@ -241,27 +239,25 @@ namespace Microsoft.StreamProcessing
/// </summary>
/// <returns></returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public bool IsEmpty() => (this.head.Value.IsEmpty() && (this.head == this.tail));
private IEnumerator<T> Iterate()
{
foreach (CircularBuffer<T> buffer in this.buffers)
foreach (T item in buffer.Iterate())
yield return item;
}
public bool IsEmpty() => this.head.Value.IsEmpty() && (this.head == this.tail);
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public int Count => this.count;
public int Count { get; private set; }
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
/// <returns></returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public IEnumerator<T> GetEnumerator() => Iterate();
public IEnumerator<T> GetEnumerator()
{
foreach (var buffer in this.buffers)
foreach (var item in buffer.Iterate())
yield return item;
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
}

Просмотреть файл

@ -108,7 +108,7 @@ namespace Microsoft.StreamProcessing.Internal
}
else
{
pool.Get(out ColumnBatch<T> result);
pool.Get(out var result);
System.Array.Copy(this.col, result.col, this.col.Length);
result.UsedLength = this.UsedLength;
Interlocked.Decrement(ref this.RefCount);

Просмотреть файл

@ -54,21 +54,13 @@ namespace Microsoft.StreamProcessing.Internal.Collections
[EditorBrowsable(EditorBrowsableState.Never)]
public class ColumnPool<T> : ColumnPoolBase
{
private readonly ConcurrentQueue<ColumnBatch<T>> queue;
private readonly ConcurrentQueue<ColumnBatch<T>> queue = new ConcurrentQueue<ColumnBatch<T>>();
private long createdObjects;
private readonly int size;
internal ColumnPool()
{
this.queue = new ConcurrentQueue<ColumnBatch<T>>();
this.size = Config.DataBatchSize;
}
internal ColumnPool() => this.size = Config.DataBatchSize;
internal ColumnPool(int size)
{
this.queue = new ConcurrentQueue<ColumnBatch<T>>();
this.size = size;
}
internal ColumnPool(int size) => this.size = size;
/// <summary>
/// Currently for internal use only - do not use directly.
@ -124,7 +116,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections
[EditorBrowsable(EditorBrowsableState.Never)]
public override void Free(bool reset = false)
{
while (this.queue.TryDequeue(out ColumnBatch<T> result))
while (this.queue.TryDequeue(out var result))
{
result.pool = null;
result.col = null;

Просмотреть файл

@ -16,7 +16,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections
[EditorBrowsable(EditorBrowsableState.Never)]
public sealed class DataStructurePool<T> : IDisposable where T : new()
{
private ConcurrentQueue<T> queue;
private readonly ConcurrentQueue<T> queue;
private readonly Func<T> creator;
/// <summary>

Просмотреть файл

@ -64,42 +64,37 @@ namespace Microsoft.StreamProcessing.Internal.Collections
public unsafe void Insert(long time, int value)
{
// If out of space in the stack, then grow.
if (this.count == this.capacity)
{
Grow();
}
if (this.count == this.capacity) Grow();
fixed (long* timeArray = this.times)
fixed (int* valueArray = this.values)
{
fixed (int* valueArray = this.values)
// Find location for new element in heap, default to end.
int insertPos = this.count;
this.count++;
// Loop while position 'pos' still has a parent.
while (insertPos > 0)
{
// Find location for new element in heap, default to end.
int insertPos = this.count;
this.count++;
// Loop while position 'pos' still has a parent.
while (insertPos > 0)
// Determine if position 'insertPos' would be consistent with its parent.
int parentPos = (insertPos - 1) >> 1;
long parentTime = *(timeArray + parentPos);
if (parentTime <= time)
{
// Determine if position 'insertPos' would be consistent with its parent.
int parentPos = (insertPos - 1) >> 1;
long parentTime = *(timeArray + parentPos);
if (parentTime <= time)
{
// Parent is <= time, so heap would be consistent and we are done.
break;
}
// Heap is not consistent, so move insertion point to location of
// parent and move parent to current 'insertPos'.
*(timeArray + insertPos) = parentTime;
*(valueArray + insertPos) = *(valueArray + parentPos);
insertPos = parentPos;
// Parent is <= time, so heap would be consistent and we are done.
break;
}
// Insert element into heap.
*(timeArray + insertPos) = time;
*(valueArray + insertPos) = value;
// Heap is not consistent, so move insertion point to location of
// parent and move parent to current 'insertPos'.
*(timeArray + insertPos) = parentTime;
*(valueArray + insertPos) = *(valueArray + parentPos);
insertPos = parentPos;
}
// Insert element into heap.
*(timeArray + insertPos) = time;
*(valueArray + insertPos) = value;
}
}

Просмотреть файл

@ -247,6 +247,33 @@ namespace Microsoft.StreamProcessing.Internal.Collections
}
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
/// <param name="index"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[EditorBrowsable(EditorBrowsableState.Never)]
public unsafe bool Iterate(ref int index)
{
fixed (int* hashAndNextArray = this.next)
{
int* hashNextPtr = hashAndNextArray + index;
while (index < this.initialized)
{
index++;
hashNextPtr++;
long currHashNext = *hashNextPtr;
int currNext = (int)currHashNext;
if (currNext >= 0) return true;
}
index = 0;
return false;
}
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
@ -373,64 +400,5 @@ namespace Microsoft.StreamProcessing.Internal.Collections
this.list.count--;
}
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public struct VisibleTraverser
{
private readonly FastLinkedList<T> list;
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public int currIndex;
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
/// <param name="list"></param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[EditorBrowsable(EditorBrowsableState.Never)]
public VisibleTraverser(FastLinkedList<T> list)
{
this.list = list;
this.currIndex = 0;
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
/// <param name="index"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[EditorBrowsable(EditorBrowsableState.Never)]
public unsafe bool Next(out int index)
{
fixed (int* hashAndNextArray = this.list.next)
{
int* hashNextPtr = hashAndNextArray + this.currIndex;
int initialized = this.list.initialized;
while (this.currIndex < initialized)
{
this.currIndex++;
hashNextPtr++;
long currHashNext = *hashNextPtr;
int currNext = (int)currHashNext;
if (currNext >= 0)
{
index = this.currIndex;
return true;
}
}
index = 0;
return false;
}
}
}
}
}

Просмотреть файл

@ -62,26 +62,6 @@ namespace Microsoft.StreamProcessing.Internal.Collections
get => this.count;
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public bool IsEmpty
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => this.count == 0;
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public int Capacity
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => this.capacity;
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
@ -108,53 +88,9 @@ namespace Microsoft.StreamProcessing.Internal.Collections
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[EditorBrowsable(EditorBrowsableState.Never)]
public int Push(T value)
{
int index = Push();
this.values[index] = value;
return index;
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[EditorBrowsable(EditorBrowsableState.Never)]
public int Push(ref T value)
{
int index = Push();
this.values[index] = value;
return index;
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[EditorBrowsable(EditorBrowsableState.Never)]
public int Pop()
{
Contract.Assume(this.count > 0);
return --this.count;
}
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[EditorBrowsable(EditorBrowsableState.Never)]
public void Clear()
{
this.count = 0;
}
public void Clear() => this.count = 0;
private void Grow()
{

Просмотреть файл

@ -18,8 +18,8 @@ namespace Microsoft.StreamProcessing
{
private static SafeConcurrentDictionary<ColumnPoolBase> doublingArrayPools = new SafeConcurrentDictionary<ColumnPoolBase>();
private static SafeConcurrentDictionary<ColumnPoolBase> columnPools = new SafeConcurrentDictionary<ColumnPoolBase>();
private static SafeConcurrentDictionary<ColumnPoolBase> charArrayPools = new SafeConcurrentDictionary<ColumnPoolBase>();
private static SafeConcurrentDictionary<ColumnPoolBase> bitvectorPools = new SafeConcurrentDictionary<ColumnPoolBase>();
private static SafeConcurrentDictionary<CharArrayPool> charArrayPools = new SafeConcurrentDictionary<CharArrayPool>();
private static SafeConcurrentDictionary<ColumnPool<long>> bitvectorPools = new SafeConcurrentDictionary<ColumnPool<long>>();
private static SafeConcurrentDictionary<ColumnPoolBase> eventBatchPools = new SafeConcurrentDictionary<ColumnPoolBase>();
private static SafeConcurrentDictionary<object> memoryPools = new SafeConcurrentDictionary<object>();
@ -46,10 +46,10 @@ namespace Microsoft.StreamProcessing
key => new ColumnPool<T>(size < 0 ? Config.DataBatchSize : size));
internal static CharArrayPool GetCharArrayPool()
=> (CharArrayPool)charArrayPools.GetOrAdd(CacheKey.Create(), key => new CharArrayPool());
=> charArrayPools.GetOrAdd(CacheKey.Create(), key => new CharArrayPool());
internal static ColumnPool<long> GetBVPool(int size)
=> (ColumnPool<long>)bitvectorPools.GetOrAdd(CacheKey.Create(size), key => new ColumnPool<long>(size));
=> bitvectorPools.GetOrAdd(CacheKey.Create(size), key => new ColumnPool<long>(size));
internal static StreamMessagePool<TKey, TPayload> GetStreamMessagePool<TKey, TPayload>(MemoryPool<TKey, TPayload> memoryPool, bool isColumnar)
=> (StreamMessagePool<TKey, TPayload>)eventBatchPools.GetOrAdd(
@ -85,7 +85,7 @@ namespace Microsoft.StreamProcessing
}
var lookupKey = CacheKey.Create(typeOfTKey, typeOfTPayload);
Type generatedMemoryPool = cachedMemoryPools.GetOrAdd(lookupKey, key => Transformer.GenerateMemoryPoolClass<TKey, TPayload>());
var generatedMemoryPool = cachedMemoryPools.GetOrAdd(lookupKey, key => Transformer.GenerateMemoryPoolClass<TKey, TPayload>());
return (MemoryPool<TKey, TPayload>)memoryPools.GetOrAdd(cacheKey, t => Activator.CreateInstance(generatedMemoryPool));
}

Просмотреть файл

@ -85,7 +85,6 @@ namespace Microsoft.StreamProcessing.Internal.Collections
public int hash;
}
internal static class HashHelpers
{
public static readonly int[] primes = new int[] {
@ -99,11 +98,9 @@ namespace Microsoft.StreamProcessing.Internal.Collections
public static int ExpandPrime(int oldSize)
{
int min = 2 * oldSize;
if ((min > 0x7feffffd) && (oldSize < 0x7feffffd))
{
return 0x7feffffd;
}
return GetPrime(min);
return (min > 0x7feffffd) && (oldSize < 0x7feffffd)
? 0x7feffffd
: GetPrime(min);
}
public static int GetPrime(int min)
@ -124,7 +121,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections
return min;
}
public static bool IsPrime(int candidate)
private static bool IsPrime(int candidate)
{
if ((candidate & 1) == 0)
{

Просмотреть файл

@ -54,10 +54,10 @@ namespace Microsoft.StreamProcessing.Internal.Collections
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsFull() => (((this.tail + 1) & this.DefaultCapacity) == this.head);
public bool IsFull() => ((this.tail + 1) & this.DefaultCapacity) == this.head;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsEmpty() => (this.head == this.tail);
public bool IsEmpty() => this.head == this.tail;
public IEnumerable<T> Iterate()
{
@ -88,7 +88,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections
public sealed class PooledElasticCircularBuffer<T> : IEnumerable<T>, IDisposable
{
private const int Capacity = 0xff;
private LinkedList<PooledCircularBuffer<T>> buffers;
private readonly LinkedList<PooledCircularBuffer<T>> buffers = new LinkedList<PooledCircularBuffer<T>>();
private LinkedListNode<PooledCircularBuffer<T>> head;
private LinkedListNode<PooledCircularBuffer<T>> tail;
private readonly ColumnPool<T> pool;
@ -100,7 +100,6 @@ namespace Microsoft.StreamProcessing.Internal.Collections
public PooledElasticCircularBuffer()
{
this.pool = MemoryManager.GetColumnPool<T>(Capacity + 1);
this.buffers = new LinkedList<PooledCircularBuffer<T>>();
var node = new LinkedListNode<PooledCircularBuffer<T>>(new PooledCircularBuffer<T>(Capacity, this.pool));
this.buffers.AddFirst(node);
this.tail = this.head = node;
@ -233,14 +232,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections
/// </summary>
/// <returns></returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public bool IsEmpty() => (this.head.Value.IsEmpty() && (this.head == this.tail));
private IEnumerable<T> Iterate()
{
foreach (PooledCircularBuffer<T> buffer in this.buffers)
foreach (T item in buffer.Iterate())
yield return item;
}
public bool IsEmpty() => this.head.Value.IsEmpty() && (this.head == this.tail);
/// <summary>
/// Currently for internal use only - do not use directly.
@ -253,7 +245,12 @@ namespace Microsoft.StreamProcessing.Internal.Collections
/// </summary>
/// <returns></returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public IEnumerator<T> GetEnumerator() => Iterate().GetEnumerator();
public IEnumerator<T> GetEnumerator()
{
foreach (var buffer in this.buffers)
foreach (var item in buffer.Iterate())
yield return item;
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();

Просмотреть файл

@ -33,10 +33,7 @@ namespace Microsoft.StreamProcessing
/// </summary>
/// <param name="comp">The comparer to be used on elements added to the queue.</param>
[EditorBrowsable(EditorBrowsableState.Never)]
public PriorityQueue(IComparer<T> comp)
{
this.comp = comp;
}
public PriorityQueue(IComparer<T> comp) => this.comp = comp;
/// <summary>
/// Add a new item to the priority queue.
@ -63,7 +60,7 @@ namespace Microsoft.StreamProcessing
/// </summary>
/// <returns>True if the priority queue is empty.</returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public bool IsEmpty() => (this.data.Count == 0);
public bool IsEmpty() => this.data.Count == 0;
/// <summary>
/// Dequeue an element from the priority queue.
@ -74,7 +71,7 @@ namespace Microsoft.StreamProcessing
{
// assumes pq is not empty; up to calling code
int li = this.data.Count - 1; // last index (before removal)
T frontItem = this.data[0]; // fetch the front
var frontItem = this.data[0]; // fetch the front
this.data[0] = this.data[li];
this.data.RemoveAt(li);
@ -88,7 +85,7 @@ namespace Microsoft.StreamProcessing
if (rc <= li && this.comp.Compare(this.data[rc], this.data[ci]) < 0) // if there is a rc (ci + 1), and it is smaller than left child, use the rc instead
ci = rc;
if (this.comp.Compare(this.data[pi], this.data[ci]) <= 0) break; // parent is smaller than (or equal to) smallest child so done
T tmp = this.data[pi];
var tmp = this.data[pi];
this.data[pi] = this.data[ci];
this.data[ci] = tmp; // swap parent and child
pi = ci;
@ -105,11 +102,7 @@ namespace Microsoft.StreamProcessing
/// </summary>
/// <returns>The item next to be dequeued.</returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public T Peek()
{
T frontItem = this.data[0];
return frontItem;
}
public T Peek() => this.data[0];
/// <summary>
/// Returns the number of elements in the priority queue.

Просмотреть файл

@ -12,7 +12,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections
{
/// <summary>
/// A dictionary that supports concurrency with similar interface to .NET's ConcurrentDictionary.
/// However, this dictionary changes the implementation of AddOrUpdate and GetOrAdd functions to
/// However, this dictionary changes the implementation and GetOrAdd functions to
/// guarantee atomicity per-key for factory lambdas.
/// </summary>
/// <typeparam name="TValue">Type of values in the dictionary</typeparam>

Просмотреть файл

@ -484,27 +484,25 @@ namespace Microsoft.StreamProcessing
var localOffset = offset;
fixed (long* vsync = this.vsync.col)
fixed (long* vother = this.vother.col)
{
fixed (long* vother = this.vother.col)
while ((count < Config.DataBatchSize) && (localOffset < n))
{
while ((count < Config.DataBatchSize) && (localOffset < n))
var partition = partitionExtractor(largeBatch.Array[localOffset]);
var start = startEdgeExtractor(largeBatch.Array[localOffset]);
if (currentTime.ContainsKey(partition) && start < currentTime[partition])
{
var partition = partitionExtractor(largeBatch.Array[localOffset]);
var start = startEdgeExtractor(largeBatch.Array[localOffset]);
if (currentTime.ContainsKey(partition) && start < currentTime[partition])
{
throw new IngressException("Out-of-order event encountered during ingress, under a disorder policy of Throw");
}
currentTime[partition] = start;
vsync[count] = start;
vother[count] = endEdgeExtractor(largeBatch.Array[localOffset]);
this.payload.col[count] = largeBatch.Array[localOffset];
var p = partitionConstructor(partition);
this.key.col[count] = p;
this.hash.col[count] = HashCode(p);
localOffset++;
count++;
throw new IngressException("Out-of-order event encountered during ingress, under a disorder policy of Throw");
}
currentTime[partition] = start;
vsync[count] = start;
vother[count] = endEdgeExtractor(largeBatch.Array[localOffset]);
this.payload.col[count] = largeBatch.Array[localOffset];
var p = partitionConstructor(partition);
this.key.col[count] = p;
this.hash.col[count] = HashCode(p);
localOffset++;
count++;
}
}
@ -533,18 +531,16 @@ namespace Microsoft.StreamProcessing
var localOffset = offset;
fixed (long* vsync = this.vsync.col)
fixed (long* vother = this.vother.col)
{
fixed (long* vother = this.vother.col)
while ((count < Config.DataBatchSize) && (localOffset < n))
{
while ((count < Config.DataBatchSize) && (localOffset < n))
{
currentTime = DateTimeOffset.UtcNow.Ticks;
vsync[count] = currentTime;
vother[count] = StreamEvent.InfinitySyncTime;
this.payload.col[count] = largeBatch.Array[localOffset];
localOffset++;
count++;
}
currentTime = DateTimeOffset.UtcNow.Ticks;
vsync[count] = currentTime;
vother[count] = StreamEvent.InfinitySyncTime;
this.payload.col[count] = largeBatch.Array[localOffset];
localOffset++;
count++;
}
}
@ -582,26 +578,24 @@ namespace Microsoft.StreamProcessing
encounteredPunctuation = false; // let's be optimistic!
fixed (long* vsync = this.vsync.col)
fixed (long* vother = this.vother.col)
{
fixed (long* vother = this.vother.col)
while ((count < Config.DataBatchSize) && (localOffset < n))
{
while ((count < Config.DataBatchSize) && (localOffset < n))
{
currentTime = currentSync;
vsync[count] = currentTime;
vother[count] = StreamEvent.InfinitySyncTime;
this.payload.col[count] = largeBatch.Array[localOffset];
localOffset++;
count++;
eventCount++;
currentTime = currentSync;
vsync[count] = currentTime;
vother[count] = StreamEvent.InfinitySyncTime;
this.payload.col[count] = largeBatch.Array[localOffset];
localOffset++;
count++;
eventCount++;
if (eventCount == eventsPerSample)
{
eventCount = 0;
currentSync++;
encounteredPunctuation = true;
break;
}
if (eventCount == eventsPerSample)
{
eventCount = 0;
currentSync++;
encounteredPunctuation = true;
break;
}
}
}
@ -645,7 +639,7 @@ namespace Microsoft.StreamProcessing
if (this.key != null) this.key.col[this.Count] = default;
if (this.payload != null) this.payload.col[this.Count] = default;
this.hash.col[this.Count] = 0;
this.bitvector.col[this.Count >> 6] |= (1L << (this.Count & 0x3f));
this.bitvector.col[this.Count >> 6] |= 1L << (this.Count & 0x3f);
this.Count++;
return this.Count == this.vsync.col.Length;
@ -864,10 +858,7 @@ namespace Microsoft.StreamProcessing
this.Count = value.Count;
this.iter = value.iter;
if (swing)
{
return;
}
if (swing) return;
value.vsync.IncrementRefCount(1);
value.vother.IncrementRefCount(1);
@ -954,7 +945,7 @@ namespace Microsoft.StreamProcessing
}
}
return (endIndex - startIndex + 1) - hammingWeight;
return endIndex - startIndex + 1 - hammingWeight;
}
}
}

Просмотреть файл

@ -31,9 +31,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections
}
public override string GetStatusReport()
{
return string.Format(CultureInfo.InvariantCulture, "[{0}] Objects Created - {1,5} - Queue Size - {2,5}\t<{3},{4}>", this.createdObjects == this.batchQueue.Count ? " " : "X", this.createdObjects, this.batchQueue.Count, typeof(TKey).GetCSharpSourceSyntax(), typeof(TPayload).GetCSharpSourceSyntax());
}
=> string.Format(CultureInfo.InvariantCulture, "[{0}] Objects Created - {1,5} - Queue Size - {2,5}\t<{3},{4}>", this.createdObjects == this.batchQueue.Count ? " " : "X", this.createdObjects, this.batchQueue.Count, typeof(TKey).GetCSharpSourceSyntax(), typeof(TPayload).GetCSharpSourceSyntax());
public override ColumnPoolBase Leaked => this.createdObjects != this.batchQueue.Count ? this : null;
@ -72,4 +70,4 @@ namespace Microsoft.StreamProcessing.Internal.Collections
}
}
}
#endif
#endif

Просмотреть файл

@ -56,10 +56,8 @@ namespace Microsoft.StreamProcessing.Internal
/// <param name="previous"></param>
[EditorBrowsable(EditorBrowsableState.Never)]
public virtual void ProduceQueryPlan(PlanNode previous)
{
this.activeProcess.RegisterQueryPlan(this.identifier, new EgressPlanNode(
=> this.activeProcess.RegisterQueryPlan(this.identifier, new EgressPlanNode(
previous, this, typeof(TKey), typeof(TPayload), false, null));
}
/// <summary>
/// Currently for internal use only - do not use directly.

Просмотреть файл

@ -105,12 +105,12 @@ namespace Microsoft.StreamProcessing
{
var template = new TemporalEgressTemplate(typeof(Empty), typeof(TPayload), typeof(TResult), string.Empty, "StartEdge", startEdgeObservable.source.Properties.IsColumnar);
if (startEdgeObservable.constructor != null)
template.startEdgeFunction = ((x, y) => startEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
template.startEdgeFunction = (x, y) => startEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ startEdgeObservable.constructor.Parameters[0], x },
{ startEdgeObservable.constructor.Parameters[1], y },
}));
});
var keyType = typeof(Empty);
var expandedCode = template.TransformText();
@ -139,13 +139,13 @@ namespace Microsoft.StreamProcessing
{
var template = new TemporalEgressTemplate(typeof(Empty), typeof(TPayload), typeof(TResult), string.Empty, "Interval", intervalObservable.source.Properties.IsColumnar);
if (intervalObservable.constructor != null)
template.intervalFunction = ((x, y, z) => intervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
template.intervalFunction = (x, y, z) => intervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ intervalObservable.constructor.Parameters[0], x },
{ intervalObservable.constructor.Parameters[1], y },
{ intervalObservable.constructor.Parameters[2], z },
}));
});
var keyType = typeof(Empty);
var expandedCode = template.TransformText();
@ -202,13 +202,13 @@ namespace Microsoft.StreamProcessing
{
var template = new TemporalEgressTemplate(typeof(TKey), typeof(TPayload), typeof(TResult), "Partitioned", "StartEdge", partitionedStartEdgeObservable.source.Properties.IsColumnar);
if (partitionedStartEdgeObservable.constructor != null)
template.startEdgeFunction = ((x, y) => partitionedStartEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
template.startEdgeFunction = (x, y) => partitionedStartEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ partitionedStartEdgeObservable.constructor.Parameters[0], "colkey[i].Key" },
{ partitionedStartEdgeObservable.constructor.Parameters[0], x },
{ partitionedStartEdgeObservable.constructor.Parameters[1], y },
}));
});
var keyType = typeof(PartitionKey<>).MakeGenericType(typeof(TKey));
var expandedCode = template.TransformText();
@ -237,14 +237,14 @@ namespace Microsoft.StreamProcessing
{
var template = new TemporalEgressTemplate(typeof(TKey), typeof(TPayload), typeof(TResult), "Partitioned", "Interval", partitionedIntervalObservable.source.Properties.IsColumnar);
if (partitionedIntervalObservable.constructor != null)
template.intervalFunction = ((x, y, z) => partitionedIntervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
template.intervalFunction = (x, y, z) => partitionedIntervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ partitionedIntervalObservable.constructor.Parameters[0], "colkey[i].Key" },
{ partitionedIntervalObservable.constructor.Parameters[0], x },
{ partitionedIntervalObservable.constructor.Parameters[1], y },
{ partitionedIntervalObservable.constructor.Parameters[2], z },
}));
});
var keyType = typeof(PartitionKey<>).MakeGenericType(typeof(TKey));
var expandedCode = template.TransformText();

Просмотреть файл

@ -120,12 +120,12 @@ namespace Microsoft.StreamProcessing
{
var template = new TemporalArrayEgressTemplate(typeof(Empty), typeof(TPayload), typeof(TResult), string.Empty, "StartEdge", startEdgeObservable.source.Properties.IsColumnar);
if (startEdgeObservable.constructor != null)
template.startEdgeFunction = ((x, y) => startEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
template.startEdgeFunction = (x, y) => startEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ startEdgeObservable.constructor.Parameters[0], x },
{ startEdgeObservable.constructor.Parameters[1], y },
}));
});
var keyType = typeof(Empty);
var expandedCode = template.TransformText();
@ -165,13 +165,13 @@ namespace Microsoft.StreamProcessing
{
var template = new TemporalArrayEgressTemplate(typeof(Empty), typeof(TPayload), typeof(TResult), string.Empty, "Interval", intervalObservable.source.Properties.IsColumnar);
if (intervalObservable.constructor != null)
template.intervalFunction = ((x, y, z) => intervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
template.intervalFunction = (x, y, z) => intervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ intervalObservable.constructor.Parameters[0], x },
{ intervalObservable.constructor.Parameters[1], y },
{ intervalObservable.constructor.Parameters[2], z },
}));
});
var keyType = typeof(Empty);
var expandedCode = template.TransformText();
@ -250,13 +250,13 @@ namespace Microsoft.StreamProcessing
{
var template = new TemporalArrayEgressTemplate(typeof(TKey), typeof(TPayload), typeof(TResult), "Partitioned", "StartEdge", partitionedStartEdgeObservable.source.Properties.IsColumnar);
if (partitionedStartEdgeObservable.constructor != null)
template.startEdgeFunction = ((x, y) => partitionedStartEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
template.startEdgeFunction = (x, y) => partitionedStartEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ partitionedStartEdgeObservable.constructor.Parameters[0], "colkey[i].Key" },
{ partitionedStartEdgeObservable.constructor.Parameters[0], x },
{ partitionedStartEdgeObservable.constructor.Parameters[1], y },
}));
});
var keyType = typeof(PartitionKey<>).MakeGenericType(typeof(TKey));
var expandedCode = template.TransformText();
@ -296,14 +296,14 @@ namespace Microsoft.StreamProcessing
{
var template = new TemporalArrayEgressTemplate(typeof(TKey), typeof(TPayload), typeof(TResult), "Partitioned", "Interval", partitionedIntervalObservable.source.Properties.IsColumnar);
if (partitionedIntervalObservable.constructor != null)
template.intervalFunction = ((x, y, z) => partitionedIntervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
template.intervalFunction = (x, y, z) => partitionedIntervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ partitionedIntervalObservable.constructor.Parameters[0], "colkey[i].Key" },
{ partitionedIntervalObservable.constructor.Parameters[0], x },
{ partitionedIntervalObservable.constructor.Parameters[1], y },
{ partitionedIntervalObservable.constructor.Parameters[2], z },
}));
});
var keyType = typeof(PartitionKey<>).MakeGenericType(typeof(TKey));
var expandedCode = template.TransformText();

Просмотреть файл

@ -54,19 +54,13 @@ namespace Microsoft.StreamProcessing
public override IDisposable Subscribe(IStreamObserver<Empty, TPayload> observer)
{
Contract.EnsuresOnThrow<IngressException>(true);
IIngressStreamObserver subscription;
if (this.timelinePolicy.timelineEnum == TimelineEnum.WallClock)
{
subscription = new MonotonicSubscriptionWallClock<TPayload>(this.observable, this.IngressSiteIdentifier,
var subscription = this.timelinePolicy.timelineEnum == TimelineEnum.WallClock
? new MonotonicSubscriptionWallClock<TPayload>(this.observable, this.IngressSiteIdentifier,
this,
observer, this.flushPolicy, this.punctuationPolicy, this.onCompletedPolicy, this.timelinePolicy)
: (IIngressStreamObserver)new MonotonicSubscriptionSequence<TPayload>(this.observable, this.IngressSiteIdentifier,
this,
observer, this.flushPolicy, this.punctuationPolicy, this.onCompletedPolicy, this.timelinePolicy);
}
else
{
subscription = new MonotonicSubscriptionSequence<TPayload>(this.observable, this.IngressSiteIdentifier,
this,
observer, this.flushPolicy, this.punctuationPolicy, this.onCompletedPolicy, this.timelinePolicy);
}
if (this.delayed)
{

Просмотреть файл

@ -48,19 +48,13 @@ namespace Microsoft.StreamProcessing
public override IDisposable Subscribe(IStreamObserver<Empty, TPayload> observer)
{
Contract.EnsuresOnThrow<IngressException>(true);
IIngressStreamObserver subscription;
if (this.timelinePolicy.timelineEnum == TimelineEnum.WallClock)
{
subscription = new MonotonicArraySubscriptionWallClock<TPayload>(this.observable, this.IngressSiteIdentifier,
var subscription = this.timelinePolicy.timelineEnum == TimelineEnum.WallClock
? new MonotonicArraySubscriptionWallClock<TPayload>(this.observable, this.IngressSiteIdentifier,
this,
observer, this.onCompletedPolicy, this.timelinePolicy)
: (IIngressStreamObserver)new MonotonicArraySubscriptionSequence<TPayload>(this.observable, this.IngressSiteIdentifier,
this,
observer, this.onCompletedPolicy, this.timelinePolicy);
}
else
{
subscription = new MonotonicArraySubscriptionSequence<TPayload>(this.observable, this.IngressSiteIdentifier,
this,
observer, this.onCompletedPolicy, this.timelinePolicy);
}
if (this.delayed)
{
@ -76,5 +70,4 @@ namespace Microsoft.StreamProcessing
public string IngressSiteIdentifier { get; }
}
}

Просмотреть файл

@ -45,10 +45,8 @@ namespace Microsoft.StreamProcessing
string latencyOption,
string diagnosticOption,
FuseModule fuseModule)
{
return Generate<Empty, TSource, TSource>("Interval", string.Empty, null, startEdgeExpression, endEdgeExpression,
=> Generate<Empty, TSource, TSource>("Interval", string.Empty, null, startEdgeExpression, endEdgeExpression,
latencyOption, diagnosticOption, fuseModule);
}
internal static Tuple<Type, string> GenerateFused<TSource, TResult>(
Expression<Func<TSource, long>> startEdgeExpression,
@ -56,10 +54,8 @@ namespace Microsoft.StreamProcessing
string latencyOption,
string diagnosticOption,
FuseModule fuseModule)
{
return Generate<Empty, TSource, TResult>("Interval", string.Empty, null, startEdgeExpression, endEdgeExpression,
=> Generate<Empty, TSource, TResult>("Interval", string.Empty, null, startEdgeExpression, endEdgeExpression,
latencyOption, diagnosticOption, fuseModule);
}
internal static Tuple<Type, string> Generate<TKey, TSource>(
Expression<Func<TSource, TKey>> partitionExpression,
@ -68,10 +64,8 @@ namespace Microsoft.StreamProcessing
string latencyOption,
string diagnosticOption,
FuseModule fuseModule)
{
return Generate<TKey, TSource, TSource>("Interval", "Partitioned", partitionExpression, startEdgeExpression, endEdgeExpression,
=> Generate<TKey, TSource, TSource>("Interval", "Partitioned", partitionExpression, startEdgeExpression, endEdgeExpression,
latencyOption, diagnosticOption, fuseModule);
}
internal static Tuple<Type, string> GenerateFused<TKey, TSource, TResult>(
Expression<Func<TSource, TKey>> partitionExpression,
@ -80,46 +74,36 @@ namespace Microsoft.StreamProcessing
string latencyOption,
string diagnosticOption,
FuseModule fuseModule)
{
return Generate<TKey, TSource, TResult>("Interval", "Partitioned", partitionExpression, startEdgeExpression, endEdgeExpression,
=> Generate<TKey, TSource, TResult>("Interval", "Partitioned", partitionExpression, startEdgeExpression, endEdgeExpression,
latencyOption, diagnosticOption, fuseModule);
}
internal static Tuple<Type, string> Generate<TSource>(
string latencyOption,
string diagnosticOption,
FuseModule fuseModule)
{
return Generate<Empty, TSource, TSource>("StreamEvent", string.Empty, null, null, null,
=> Generate<Empty, TSource, TSource>("StreamEvent", string.Empty, null, null, null,
latencyOption, diagnosticOption, fuseModule);
}
internal static Tuple<Type, string> GenerateFused<TSource, TResult>(
string latencyOption,
string diagnosticOption,
FuseModule fuseModule)
{
return Generate<Empty, TSource, TResult>("StreamEvent", string.Empty, null, null, null,
=> Generate<Empty, TSource, TResult>("StreamEvent", string.Empty, null, null, null,
latencyOption, diagnosticOption, fuseModule);
}
internal static Tuple<Type, string> Generate<TKey, TSource>(
string latencyOption,
string diagnosticOption,
FuseModule fuseModule)
{
return Generate<TKey, TSource, TSource>("StreamEvent", "Partitioned", null, null, null,
=> Generate<TKey, TSource, TSource>("StreamEvent", "Partitioned", null, null, null,
latencyOption, diagnosticOption, fuseModule);
}
internal static Tuple<Type, string> GenerateFused<TKey, TSource, TResult>(
string latencyOption,
string diagnosticOption,
FuseModule fuseModule)
{
return Generate<TKey, TSource, TResult>("StreamEvent", "Partitioned", null, null, null,
=> Generate<TKey, TSource, TResult>("StreamEvent", "Partitioned", null, null, null,
latencyOption, diagnosticOption, fuseModule);
}
private static Tuple<Type, string> Generate<TKey, TSource, TResult>(
string ingressType,
@ -158,25 +142,25 @@ namespace Microsoft.StreamProcessing
template.ingressType = ingressType;
template.latencyOption = latencyOption;
template.partitionFunction = (x => partitionExpression == null ?
template.partitionFunction = x => partitionExpression == null ?
string.Empty : partitionExpression.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ partitionExpression.Parameters.Single(), x }
}));
template.startEdgeFunction = (x => startEdgeExpression == null ?
});
template.startEdgeFunction = x => startEdgeExpression == null ?
string.Empty : startEdgeExpression.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ startEdgeExpression.Parameters.Single(), x }
}));
template.endEdgeFunction = (x => endEdgeExpression == null ?
});
template.endEdgeFunction = x => endEdgeExpression == null ?
"StreamEvent.InfinitySyncTime" :
endEdgeExpression.Body.ExpressionToCSharpStringWithParameterSubstitution(
new Dictionary<ParameterExpression, string>
{
{ endEdgeExpression.Parameters.Single(), x }
}));
});
template.fusionOption = fuseModule.IsEmpty ? "Simple" :
(Config.AllowFloatingReorderPolicy ? "Disordered" : "Fused");

Просмотреть файл

@ -24,9 +24,7 @@ namespace Microsoft.StreamProcessing
/// <param name="defaultAccumulator"></param>
/// <returns></returns>
public static Afa<TInput, TRegister, TAccumulator> Create<TInput, TRegister, TAccumulator>(TRegister defaultRegister = default, TAccumulator defaultAccumulator = default)
{
return new Afa<TInput, TRegister, TAccumulator>(defaultRegister, defaultAccumulator);
}
=> new Afa<TInput, TRegister, TAccumulator>(defaultRegister, defaultAccumulator);
/// <summary>
///
@ -36,9 +34,7 @@ namespace Microsoft.StreamProcessing
/// <param name="defaultRegister"></param>
/// <returns></returns>
public static Afa<TInput, TRegister> Create<TInput, TRegister>(TRegister defaultRegister = default)
{
return new Afa<TInput, TRegister>(defaultRegister);
}
=> new Afa<TInput, TRegister>(defaultRegister);
}
@ -107,9 +103,7 @@ namespace Microsoft.StreamProcessing
/// <param name="fence">An added condition that must be met for the transition to occur</param>
/// <param name="transfer">An expression to mutate the register value when the transition occurs</param>
public void AddSingleElementArc(int fromState, int toState, Expression<Func<long, TInput, TRegister, bool>> fence, Expression<Func<long, TInput, TRegister, TRegister>> transfer = null)
{
AddArc(fromState, toState, new SingleElementArc<TInput, TRegister> { Fence = fence, Transfer = transfer });
}
=> AddArc(fromState, toState, new SingleElementArc<TInput, TRegister> { Fence = fence, Transfer = transfer });
/// <summary>
/// Adds a transition to the AFA triggered by a list of concurrent elements
@ -119,9 +113,7 @@ namespace Microsoft.StreamProcessing
/// <param name="fence">An added condition that must be met for the transition to occur</param>
/// <param name="transfer">An expression to mutate the register value when the transition occurs</param>
public void AddListElementArc(int fromState, int toState, Expression<Func<long, List<TInput>, TRegister, bool>> fence, Expression<Func<long, List<TInput>, TRegister, TRegister>> transfer = null)
{
AddArc(fromState, toState, new ListElementArc<TInput, TRegister> { Fence = fence, Transfer = transfer });
}
=> AddArc(fromState, toState, new ListElementArc<TInput, TRegister> { Fence = fence, Transfer = transfer });
/// <summary>
/// Adds an epsilon (no action) arc to the AFA
@ -129,9 +121,7 @@ namespace Microsoft.StreamProcessing
/// <param name="fromState">Starting state of the transition</param>
/// <param name="toState">Ending state of the transition</param>
public void AddEpsilonElementArc(int fromState, int toState)
{
AddArc(fromState, toState, new EpsilonArc<TInput, TRegister>());
}
=> AddArc(fromState, toState, new EpsilonArc<TInput, TRegister>());
/// <summary>
/// Adds a transition that handles multiple elements (events) at a given timestamp
@ -151,12 +141,15 @@ namespace Microsoft.StreamProcessing
Expression<Func<long, TAccumulator, TRegister, bool>> fence = null,
Expression<Func<long, TAccumulator, TRegister, TRegister>> transfer = null,
Expression<Action<TAccumulator>> dispose = null)
{
AddArc(fromState, toState, new MultiElementArc<TInput, TRegister, TAccumulator>
=> AddArc(fromState, toState, new MultiElementArc<TInput, TRegister, TAccumulator>
{
Initialize = initialize, Accumulate = accumulate, SkipToEnd = skipToEnd, Fence = fence, Transfer = transfer, Dispose = dispose
Initialize = initialize,
Accumulate = accumulate,
SkipToEnd = skipToEnd,
Fence = fence,
Transfer = transfer,
Dispose = dispose
});
}
/// <summary>
/// Adds an arc to the AFA.
@ -249,10 +242,7 @@ namespace Microsoft.StreamProcessing
/// Set default value of register.
/// </summary>
/// <param name="register"></param>
public void SetDefaultRegister(TRegister register)
{
this.DefaultRegister = register;
}
public void SetDefaultRegister(TRegister register) => this.DefaultRegister = register;
internal CompiledAfa<TInput, TRegister, TAccumulator> Compile() => new CompiledAfa<TInput, TRegister, TAccumulator>(this);

Просмотреть файл

@ -45,7 +45,7 @@ namespace Microsoft.StreamProcessing
var stack = new Stack<int>();
var activeFindTraverser = new FastLinkedList<GroupedActiveState<Empty, TRegister>>.ListTraverser(this.activeStates);
var tentativeFindTraverser = new FastLinkedList<OutputEvent<Empty, TRegister>>.ListTraverser(this.tentativeOutput);
var tentativeVisibleTraverser = new FastLinkedList<OutputEvent<Empty, TRegister>>.VisibleTraverser(this.tentativeOutput);
var tentativeOutputIndex = 0;
var count = batch.Count;
@ -74,11 +74,11 @@ namespace Microsoft.StreamProcessing
if (this.tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out int index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = this.tentativeOutput.Values[index];
var elem = this.tentativeOutput.Values[tentativeOutputIndex];
dest_vsync[this.iter] = this.lastSyncTime;
dest_vother[this.iter] = elem.other;
@ -148,10 +148,9 @@ namespace Microsoft.StreamProcessing
if (arcinfo.Fence(synctime, batch[i], state.register))
{
TRegister newReg;
if (arcinfo.Transfer == null) newReg = state.register;
else newReg = arcinfo.Transfer(synctime, batch[i], state.register);
var newReg = arcinfo.Transfer == null
? state.register
: arcinfo.Transfer(synctime, batch[i], state.register);
int ns = arcinfo.toState;
while (true)
{
@ -228,10 +227,9 @@ namespace Microsoft.StreamProcessing
var arcinfo = startStateMap[cnt];
if (arcinfo.Fence(synctime, batch[i], this.defaultRegister))
{
TRegister newReg;
if (arcinfo.Transfer == null) newReg = this.defaultRegister;
else newReg = arcinfo.Transfer(synctime, batch[i], this.defaultRegister);
var newReg = arcinfo.Transfer == null
? this.defaultRegister
: arcinfo.Transfer(synctime, batch[i], this.defaultRegister);
int ns = arcinfo.toState;
while (true)
{
@ -300,11 +298,11 @@ namespace Microsoft.StreamProcessing
if (this.tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out int index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = this.tentativeOutput.Values[index];
var elem = this.tentativeOutput.Values[tentativeOutputIndex];
this.batch.vsync.col[this.iter] = this.lastSyncTime;
this.batch.vother.col[this.iter] = elem.other;
@ -312,10 +310,7 @@ namespace Microsoft.StreamProcessing
this.batch.hash.col[this.iter] = 0;
this.iter++;
if (this.iter == Config.DataBatchSize)
{
FlushContents();
}
if (this.iter == Config.DataBatchSize) FlushContents();
}
this.tentativeOutput.Clear(); // Clear the tentative output list

Просмотреть файл

@ -50,7 +50,7 @@ namespace Microsoft.StreamProcessing
public override unsafe void OnNext(StreamMessage<Empty, TPayload> batch)
{
var tentativeFindTraverser = new FastLinkedList<OutputEvent<Empty, TRegister>>.ListTraverser(this.tentativeOutput);
var tentativeVisibleTraverser = new FastLinkedList<OutputEvent<Empty, TRegister>>.VisibleTraverser(this.tentativeOutput);
var tentativeOutputIndex = 0;
var count = batch.Count;
@ -79,11 +79,11 @@ namespace Microsoft.StreamProcessing
if (this.tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out int index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = this.tentativeOutput.Values[index];
var elem = this.tentativeOutput.Values[tentativeOutputIndex];
dest_vsync[this.iter] = this.lastSyncTime;
dest_vother[this.iter] = elem.other;
@ -205,10 +205,9 @@ namespace Microsoft.StreamProcessing
var arcinfo = startStateMap[cnt];
if (arcinfo.Fence(synctime, batch[i], this.defaultRegister))
{
if (arcinfo.Transfer != null)
this.activeState_register = arcinfo.Transfer(synctime, batch[i], this.defaultRegister);
else
this.activeState_register = this.defaultRegister;
this.activeState_register = arcinfo.Transfer != null
? arcinfo.Transfer(synctime, batch[i], this.defaultRegister)
: this.defaultRegister;
this.activeState_state = arcinfo.toState;
if (this.isFinal[this.activeState_state])
@ -264,11 +263,11 @@ namespace Microsoft.StreamProcessing
if (this.tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out int index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = this.tentativeOutput.Values[index];
var elem = this.tentativeOutput.Values[tentativeOutputIndex];
this.batch.vsync.col[this.iter] = this.lastSyncTime;
this.batch.vother.col[this.iter] = elem.other;
@ -276,10 +275,7 @@ namespace Microsoft.StreamProcessing
this.batch.hash.col[this.iter] = 0;
this.iter++;
if (this.iter == Config.DataBatchSize)
{
FlushContents();
}
if (this.iter == Config.DataBatchSize) FlushContents();
}
this.tentativeOutput.Clear(); // Clear the tentative output list
@ -296,9 +292,6 @@ namespace Microsoft.StreamProcessing
batch.Free();
}
protected override void UpdatePointers()
{
this.startState = this.startStates[0];
}
protected override void UpdatePointers() => this.startState = this.startStates[0];
}
}

Просмотреть файл

@ -91,10 +91,8 @@ using Microsoft.StreamProcessing.Internal.Collections;
this.Write(">>.ListTraverser(activeStates);\r\n var tentativeFindTraverser = new FastLin" +
"kedList<OutputEvent<Microsoft.StreamProcessing.Empty, ");
this.Write(this.ToStringHelper.ToStringWithCulture(TRegister));
this.Write(">>.ListTraverser(tentativeOutput);\r\n var tentativeVisibleTraverser = new F" +
"astLinkedList<OutputEvent<Microsoft.StreamProcessing.Empty, ");
this.Write(this.ToStringHelper.ToStringWithCulture(TRegister));
this.Write(">>.VisibleTraverser(tentativeOutput);\r\n\r\n ");
this.Write(">>.ListTraverser(tentativeOutput);\r\n var tentativeOutputIndex = 0;\r\n\r\n " +
" ");
this.Write(this.ToStringHelper.ToStringWithCulture(sourceBatchTypeName));
this.Write(" sourceBatch = batch as ");
this.Write(this.ToStringHelper.ToStringWithCulture(sourceBatchTypeName));
@ -147,11 +145,11 @@ using Microsoft.StreamProcessing.Internal.Collections;
if (tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = tentativeOutput.Values[index];
var elem = tentativeOutput.Values[tentativeOutputIndex];
dest_vsync[iter] = lastSyncTime;
dest_vother[iter] = elem.other;
@ -314,13 +312,13 @@ using Microsoft.StreamProcessing.Internal.Collections;
int index, hash;
seenEvent = 0;
if (tentativeOutput.Count > 0)
if (this.tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = tentativeOutput.Values[index];
var elem = this.tentativeOutput.Values[tentativeOutputIndex];
this.batch.vsync.col[iter] = lastSyncTime;
this.batch.vother.col[iter] = elem.other;

Просмотреть файл

@ -60,7 +60,7 @@ public sealed class <#= className #> : CompiledAfaPipeBase<Microsoft.StreamProce
Stack<int> stack = new Stack<int>();
var activeFindTraverser = new FastLinkedList<GroupedActiveState<Microsoft.StreamProcessing.Empty, <#= TRegister #>>>.ListTraverser(activeStates);
var tentativeFindTraverser = new FastLinkedList<OutputEvent<Microsoft.StreamProcessing.Empty, <#= TRegister #>>>.ListTraverser(tentativeOutput);
var tentativeVisibleTraverser = new FastLinkedList<OutputEvent<Microsoft.StreamProcessing.Empty, <#= TRegister #>>>.VisibleTraverser(tentativeOutput);
var tentativeOutputIndex = 0;
<#= sourceBatchTypeName #> sourceBatch = batch as <#= sourceBatchTypeName #>;
<#= resultBatchTypeName #> resultBatch = this.batch as <#= resultBatchTypeName #>;
@ -103,11 +103,11 @@ public sealed class <#= className #> : CompiledAfaPipeBase<Microsoft.StreamProce
if (tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = tentativeOutput.Values[index];
var elem = tentativeOutput.Values[tentativeOutputIndex];
dest_vsync[iter] = lastSyncTime;
dest_vother[iter] = elem.other;
@ -261,13 +261,13 @@ public sealed class <#= className #> : CompiledAfaPipeBase<Microsoft.StreamProce
int index, hash;
seenEvent = 0;
if (tentativeOutput.Count > 0)
if (this.tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = tentativeOutput.Values[index];
var elem = this.tentativeOutput.Values[tentativeOutputIndex];
this.batch.vsync.col[iter] = lastSyncTime;
this.batch.vother.col[iter] = elem.other;

Просмотреть файл

@ -93,10 +93,8 @@ using Microsoft.StreamProcessing.Internal.Collections;
" var tentativeFindTraverser = new FastLinkedList<OutputEvent<Microsoft.StreamPr" +
"ocessing.Empty, ");
this.Write(this.ToStringHelper.ToStringWithCulture(TRegister));
this.Write(">>.ListTraverser(tentativeOutput);\r\n var tentativeVisibleTraverser = n" +
"ew FastLinkedList<OutputEvent<Microsoft.StreamProcessing.Empty, ");
this.Write(this.ToStringHelper.ToStringWithCulture(TRegister));
this.Write(">>.VisibleTraverser(tentativeOutput);\r\n\r\n ");
this.Write(">>.ListTraverser(tentativeOutput);\r\n var tentativeOutputIndex = 0;\r\n\r\n" +
" ");
this.Write(this.ToStringHelper.ToStringWithCulture(sourceBatchTypeName));
this.Write(" sourceBatch = batch as ");
this.Write(this.ToStringHelper.ToStringWithCulture(sourceBatchTypeName));
@ -147,13 +145,13 @@ using Microsoft.StreamProcessing.Internal.Collections;
{
seenEvent = 0;
if (tentativeOutput.Count > 0)
if (this.tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = tentativeOutput.Values[index];
var elem = this.tentativeOutput.Values[tentativeOutputIndex];
dest_vsync[iter] = lastSyncTime;
dest_vother[iter] = elem.other;
@ -302,13 +300,13 @@ using Microsoft.StreamProcessing.Internal.Collections;
int index, hash;
seenEvent = 0;
if (tentativeOutput.Count > 0)
if (this.tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = tentativeOutput.Values[index];
var elem = this.tentativeOutput.Values[tentativeOutputIndex];
this.batch.vsync.col[iter] = lastSyncTime;
this.batch.vother.col[iter] = elem.other;

Просмотреть файл

@ -64,7 +64,7 @@ using Microsoft.StreamProcessing.Internal.Collections;
{
Stack<int> stack = new Stack<int>();
var tentativeFindTraverser = new FastLinkedList<OutputEvent<Microsoft.StreamProcessing.Empty, <#= TRegister #>>>.ListTraverser(tentativeOutput);
var tentativeVisibleTraverser = new FastLinkedList<OutputEvent<Microsoft.StreamProcessing.Empty, <#= TRegister #>>>.VisibleTraverser(tentativeOutput);
var tentativeOutputIndex = 0;
<#= sourceBatchTypeName #> sourceBatch = batch as <#= sourceBatchTypeName #>;
<#= resultBatchTypeName #> resultBatch = this.batch as <#= resultBatchTypeName #>;
@ -105,13 +105,13 @@ using Microsoft.StreamProcessing.Internal.Collections;
{
seenEvent = 0;
if (tentativeOutput.Count > 0)
if (this.tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = tentativeOutput.Values[index];
var elem = this.tentativeOutput.Values[tentativeOutputIndex];
dest_vsync[iter] = lastSyncTime;
dest_vother[iter] = elem.other;
@ -253,13 +253,13 @@ using Microsoft.StreamProcessing.Internal.Collections;
int index, hash;
seenEvent = 0;
if (tentativeOutput.Count > 0)
if (this.tentativeOutput.Count > 0)
{
tentativeVisibleTraverser.currIndex = 0;
tentativeOutputIndex = 0;
while (tentativeVisibleTraverser.Next(out index))
while (this.tentativeOutput.Iterate(ref tentativeOutputIndex))
{
var elem = tentativeOutput.Values[index];
var elem = this.tentativeOutput.Values[tentativeOutputIndex];
this.batch.vsync.col[iter] = lastSyncTime;
this.batch.vother.col[iter] = elem.other;

Просмотреть файл

@ -613,5 +613,4 @@ namespace Microsoft.StreamProcessing
}
}
}
}

Просмотреть файл

@ -312,9 +312,6 @@ namespace Microsoft.StreamProcessing
}
}
internal void RegisterQueryPlan(string identifier, PlanNode node)
{
this.queryPlans.Add(identifier, node);
}
internal void RegisterQueryPlan(string identifier, PlanNode node) => this.queryPlans.Add(identifier, node);
}
}

Просмотреть файл

@ -24,13 +24,13 @@ namespace Microsoft.StreamProcessing.Serializer
if (size == Config.DataBatchSize)
{
var pool = (ColumnPool<T>)columnPools.GetOrAdd(Tuple.Create(typeof(T), Config.DataBatchSize), t => MemoryManager.GetColumnPool<T>(t.Item2));
pool.Get(out ColumnBatch<T> result);
pool.Get(out var result);
return result;
}
else if (typeof(T) == typeof(long) && size == (1 + (Config.DataBatchSize >> 6))) // bitvector
{
var pool = (ColumnPool<T>)bitVectorPools.GetOrAdd(size, t => MemoryManager.GetBVPool(size)); // TODO: Push magic incantation "1 + (Config.DataBatchSize >> 6)" into method call
pool.Get(out ColumnBatch<T> result);
pool.Get(out var result);
return result;
}
return new ColumnBatch<T>(size);
@ -41,13 +41,13 @@ namespace Microsoft.StreamProcessing.Serializer
if (size == Config.DataBatchSize)
{
var pool = (ColumnPool<T>)columnPools.GetOrAdd(Tuple.Create(typeof(T), Config.DataBatchSize), t => MemoryManager.GetColumnPool<T>(t.Item2));
pool.Get(out ColumnBatch<T> result);
pool.Get(out var result);
return result.col;
}
else if (typeof(T) == typeof(long) && size == (1 + (Config.DataBatchSize >> 6))) // bitvector
{
var pool = (ColumnPool<T>)bitVectorPools.GetOrAdd(size, t => MemoryManager.GetBVPool(size)); // TODO: Push magic incantation "1 + (Config.DataBatchSize >> 6)" into method call
pool.Get(out ColumnBatch<T> result);
pool.Get(out var result);
return result.col;
}
return new T[size];

Просмотреть файл

@ -225,8 +225,8 @@ namespace Microsoft.StreamProcessing.Serializer
Buffer.BlockCopy(buffer, 0, blob, 0, buffer.Length);
}
private static DoublingArrayPool<byte> bytePool;
private static CharArrayPool charArrayPool;
private static readonly Lazy<DoublingArrayPool<byte>> bytePool = new Lazy<DoublingArrayPool<byte>>(MemoryManager.GetDoublingArrayPool<byte>);
private static readonly Lazy<CharArrayPool> charArrayPool = new Lazy<CharArrayPool>(MemoryManager.GetCharArrayPool);
public CharArrayWrapper Decode_CharArrayWrapper()
{
@ -234,12 +234,11 @@ namespace Microsoft.StreamProcessing.Serializer
int usedLength = DecodeInt();
int length = DecodeInt();
if (charArrayPool == null) charArrayPool = MemoryManager.GetCharArrayPool();
CharArrayWrapper result;
if (usedLength == 0)
charArrayPool.Get(out result, 1);
charArrayPool.Value.Get(out result, 1);
else
charArrayPool.Get(out result, usedLength);
charArrayPool.Value.Get(out result, usedLength);
if (header == 0)
{
@ -249,19 +248,17 @@ namespace Microsoft.StreamProcessing.Serializer
}
else
{
if (bytePool == null) bytePool = MemoryManager.GetDoublingArrayPool<byte>();
var encodedSize = DecodeInt();
byte[] buffer;
if (encodedSize > 0)
bytePool.Get(out buffer, encodedSize);
bytePool.Value.Get(out buffer, encodedSize);
else
bytePool.Get(out buffer, 1);
bytePool.Value.Get(out buffer, 1);
this.stream.ReadAllRequiredBytes(buffer, 0, encodedSize);
Encoding.UTF8.GetChars(buffer, 0, encodedSize, result.charArray.content, 0);
bytePool.Return(buffer);
bytePool.Value.Return(buffer);
}
return result;
@ -278,8 +275,6 @@ namespace Microsoft.StreamProcessing.Serializer
public ColumnBatch<string> Decode_ColumnBatch_String()
{
if (bytePool == null) bytePool = MemoryManager.GetDoublingArrayPool<byte>();
var usedLength = ReadIntFixed();
if (usedLength == 0)
{

Просмотреть файл

@ -162,7 +162,7 @@ namespace Microsoft.StreamProcessing.Serializer
this.stream.Write(buffer, 0, buffer.Length);
}
private static DoublingArrayPool<byte> bytePool;
private static readonly Lazy<DoublingArrayPool<byte>> bytePool = new Lazy<DoublingArrayPool<byte>>(MemoryManager.GetDoublingArrayPool<byte>);
public unsafe void Encode(CharArrayWrapper value)
{
@ -176,21 +176,19 @@ namespace Microsoft.StreamProcessing.Serializer
if (Config.SerializationCompressionLevel.HasFlag(SerializationCompressionLevel.CharArrayToUTF8))
{
if (bytePool == null) bytePool = MemoryManager.GetDoublingArrayPool<byte>();
byte[] result;
int encodedSize = Encoding.UTF8.GetByteCount(value.charArray.content, 0, value.UsedLength);
Encode(encodedSize);
if (encodedSize > 0)
bytePool.Get(out result, encodedSize);
bytePool.Value.Get(out result, encodedSize);
else
bytePool.Get(out result, 1);
bytePool.Value.Get(out result, 1);
Encoding.UTF8.GetBytes(value.charArray.content, 0, value.UsedLength, result, 0);
this.stream.Write(result, 0, encodedSize);
bytePool.Return(result);
bytePool.Value.Return(result);
}
else
{
@ -218,9 +216,8 @@ namespace Microsoft.StreamProcessing.Serializer
public unsafe void Encode(ColumnBatch<string> value)
{
if (bytePool == null || charPool == null || intPool == null || shortPool == null || batchSize != Config.DataBatchSize)
if (charPool == null || intPool == null || shortPool == null || batchSize != Config.DataBatchSize)
{
bytePool = MemoryManager.GetDoublingArrayPool<byte>();
charPool = MemoryManager.GetCharArrayPool();
intPool = MemoryManager.GetColumnPool<int>();
shortPool = MemoryManager.GetColumnPool<short>();
@ -245,7 +242,7 @@ namespace Microsoft.StreamProcessing.Serializer
if (maxLength <= short.MaxValue)
{
shortPool.Get(out ColumnBatch<short> lengths);
shortPool.Get(out var lengths);
lengths.UsedLength = value.UsedLength;
CharArrayWrapper caw;

Просмотреть файл

@ -17,7 +17,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
protected override Expression BuildSerializerSafe(Expression encoder, Expression value)
{
PropertyInfo getLength = this.RuntimeType.GetTypeInfo().GetProperty("Length");
var getLength = this.RuntimeType.GetTypeInfo().GetProperty("Length");
if (getLength == null)
{
throw new SerializationException(
@ -28,12 +28,12 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
var body = new List<Expression>();
ParameterExpression arrayLength = Expression.Variable(typeof(int), "arrayLength");
var arrayLength = Expression.Variable(typeof(int), "arrayLength");
body.Add(Expression.Assign(arrayLength, Expression.Property(value, getLength)));
body.Add(EncodeArrayChunkMethod.ReplaceParametersInBody(encoder, arrayLength));
LabelTarget label = Expression.Label();
ParameterExpression counter = Expression.Variable(typeof(int), "counter");
var label = Expression.Label();
var counter = Expression.Variable(typeof(int), "counter");
body.Add(Expression.Assign(counter, ConstantZero));
body.Add(
@ -53,22 +53,22 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
protected override Expression BuildDeserializerSafe(Expression decoder)
{
Type arrayType = this.RuntimeType;
var arrayType = this.RuntimeType;
MethodInfo resize = typeof(Array).GetTypeInfo().GetMethod("Resize").MakeGenericMethod(arrayType.GetElementType());
var resize = typeof(Array).GetTypeInfo().GetMethod("Resize").MakeGenericMethod(arrayType.GetElementType());
var body = new List<Expression>();
ParameterExpression result = Expression.Variable(arrayType, "result");
var result = Expression.Variable(arrayType, "result");
body.Add(Expression.Assign(result, Expression.NewArrayBounds(arrayType.GetElementType(), ConstantZero)));
ParameterExpression index = Expression.Variable(typeof(int), "index");
var index = Expression.Variable(typeof(int), "index");
body.Add(Expression.Assign(index, ConstantZero));
ParameterExpression chunkSize = Expression.Variable(typeof(int), "chunkSize");
ParameterExpression counter = Expression.Variable(typeof(int), "counter");
var chunkSize = Expression.Variable(typeof(int), "chunkSize");
var counter = Expression.Variable(typeof(int), "counter");
LabelTarget chunkReadLoop = Expression.Label();
LabelTarget arrayReadLoop = Expression.Label();
var chunkReadLoop = Expression.Label();
var arrayReadLoop = Expression.Label();
body.Add(
Expression.Loop(

Просмотреть файл

@ -66,15 +66,15 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
private Expression<Func<BinaryDecoder, T>> GenerateCachedDeserializer()
{
ParameterExpression decoderParam = Expression.Parameter(typeof(BinaryDecoder), "decoder");
ParameterExpression instance = Expression.Variable(this.RuntimeType, "instance");
var decoderParam = Expression.Parameter(typeof(BinaryDecoder), "decoder");
var instance = Expression.Variable(this.RuntimeType, "instance");
var body = new List<Expression>();
if (this.RuntimeType.HasSupportedParameterizedConstructor())
{
// Cannot create an object beforehand. Have to call a constructor with parameters.
var properties = this.fields.Select(f => f.Schema.BuildDeserializer(decoderParam));
ConstructorInfo ctor = this.RuntimeType.GetTypeInfo()
var ctor = this.RuntimeType.GetTypeInfo()
.GetConstructors()
.Single(c => c.GetParameters().Select(p => p.ParameterType).SequenceEqual(this.fields.Select(f => f.Schema.RuntimeType)));
body.Add(Expression.Assign(instance, Expression.New(ctor, properties)));
@ -88,15 +88,15 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
}
body.Add(instance);
BlockExpression result = Expression.Block(new[] { instance }, body);
var result = Expression.Block(new[] { instance }, body);
return Expression.Lambda<Func<BinaryDecoder, T>>(result, decoderParam);
}
private Action<T, BinaryEncoder> GenerateCachedSerializer()
{
ParameterExpression instanceParam = Expression.Parameter(this.RuntimeType, "instance");
ParameterExpression encoderParam = Expression.Parameter(typeof(BinaryEncoder), "encoder");
Expression block = SerializeFields(encoderParam, instanceParam);
var instanceParam = Expression.Parameter(this.RuntimeType, "instance");
var encoderParam = Expression.Parameter(typeof(BinaryEncoder), "encoder");
var block = SerializeFields(encoderParam, instanceParam);
var lambda = Expression.Lambda<Action<T, BinaryEncoder>>(block, instanceParam, encoderParam);
return lambda.Compile();
}
@ -131,17 +131,15 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
public static Expression ThrowUnexpectedNullCheckExpression(Type type)
{
MethodInfo exceptionMethodInfo = typeof(ClassSerializer).GetMethod("UnexpectedNullValueException", BindingFlags.NonPublic | BindingFlags.Static);
var exceptionMethodInfo = typeof(ClassSerializer).GetMethod("UnexpectedNullValueException", BindingFlags.NonPublic | BindingFlags.Static);
return Expression.Throw(Expression.Call(exceptionMethodInfo, Expression.Constant(type)));
}
private static Exception UnexpectedNullValueException(Type type)
{
return new SerializationException(
=> new SerializationException(
string.Format(
CultureInfo.InvariantCulture,
"Unexpected null value for the object of type '{0}'. Please check the schema.",
type));
}
}
}

Просмотреть файл

@ -30,7 +30,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
private static Expression<Action<List<TItem>, TItem>> ListAddExpression = (c, i) => c.Add(i);
private static Expression<Action<ICollection<TItem>, TItem>> CollectionAddExpression = (c, i) => c.Add(i);
private ObjectSerializerBase itemSchema;
private readonly ObjectSerializerBase itemSchema;
public EnumerableSerializer(ObjectSerializerBase item) : base(typeof(TCollection)) => this.itemSchema = item;
@ -155,23 +155,21 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
}
protected override Expression BuildDeserializerSafe(Expression decoder)
{
if (typeof(ICollection<TItem>).GetTypeInfo().IsAssignableFrom(typeof(TCollection).GetTypeInfo()))
return BuildDeserializerForCollection(decoder);
return BuildDeserializerForEnumerable(decoder);
}
=> typeof(ICollection<TItem>).GetTypeInfo().IsAssignableFrom(typeof(TCollection).GetTypeInfo())
? BuildDeserializerForCollection(decoder)
: BuildDeserializerForEnumerable(decoder);
private Expression BuildDeserializerForEnumerable(Expression decoder)
{
MethodInfo addElement = GetAddMethod();
var addElement = GetAddMethod();
var result = Expression.Variable(typeof(TCollection), "result");
var index = Expression.Variable(typeof(int), "index");
var chunkSize = Expression.Variable(typeof(int), "chunkSize");
var counter = Expression.Variable(typeof(int), "counter");
LabelTarget chunkLoop = Expression.Label();
LabelTarget enumerableLoop = Expression.Label();
var chunkLoop = Expression.Label();
var enumerableLoop = Expression.Label();
return Expression.Block(
new[] { result, index, chunkSize, counter },
@ -196,11 +194,11 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
private Expression BuildDeserializerForCollection(Expression decoder)
{
ParameterExpression result = Expression.Variable(typeof(TCollection));
ParameterExpression currentNumberOfElements = Expression.Variable(typeof(int));
var result = Expression.Variable(typeof(TCollection));
var currentNumberOfElements = Expression.Variable(typeof(int));
ParameterExpression counter = Expression.Variable(typeof(int));
LabelTarget internalLoopLabel = Expression.Label();
var counter = Expression.Variable(typeof(int));
var internalLoopLabel = Expression.Label();
// For types that have a count property, we encode as a single chunk and can thus decode in a single pass.
var body = Expression.Block(
@ -223,14 +221,13 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
private MethodInfo GetAddMethod()
{
Type type = this.RuntimeType;
MethodInfo result = type.GetMethodByName("Add", typeof(TItem))
?? type.GetMethodByName("Enqueue", typeof(TItem))
?? type.GetMethodByName("Push", typeof(TItem));
var result = this.RuntimeType.GetMethodByName("Add", typeof(TItem))
?? this.RuntimeType.GetMethodByName("Enqueue", typeof(TItem))
?? this.RuntimeType.GetMethodByName("Push", typeof(TItem));
if (result == null)
throw new SerializationException(
string.Format(CultureInfo.InvariantCulture, "Collection type '{0}' does not have Add/Enqueue/Push method.", type));
string.Format(CultureInfo.InvariantCulture, "Collection type '{0}' does not have Add/Enqueue/Push method.", this.RuntimeType));
return result;
}
}

Просмотреть файл

@ -33,13 +33,13 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
if (currentRank == maxRank)
return this.ItemSchema.BuildSerializer(encoder, Expression.ArrayIndex(value, indexes));
MethodInfo getLength = this.RuntimeType.GetTypeInfo().GetMethod("GetLength");
ParameterExpression length = Expression.Variable(typeof(int), "length");
var getLength = this.RuntimeType.GetTypeInfo().GetMethod("GetLength");
var length = Expression.Variable(typeof(int), "length");
body.Add(Expression.Assign(length, Expression.Call(value, getLength, new Expression[] { Expression.Constant(currentRank) })));
body.Add(EncodeArrayChunkMethod.ReplaceParametersInBody(encoder, length));
LabelTarget label = Expression.Label();
ParameterExpression counter = Expression.Variable(typeof(int), "counter");
var label = Expression.Label();
var counter = Expression.Variable(typeof(int), "counter");
body.Add(Expression.Assign(counter, ConstantZero));
indexes.Add(counter);
@ -62,10 +62,10 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
{
var body = new List<Expression>();
Type type = this.RuntimeType;
Type jaggedType = GenerateJaggedType(this.RuntimeType);
var type = this.RuntimeType;
var jaggedType = GenerateJaggedType(this.RuntimeType);
ParameterExpression deserialized = Expression.Variable(jaggedType, "deserialized");
var deserialized = Expression.Variable(jaggedType, "deserialized");
body.Add(Expression.Assign(deserialized, GenerateBuildJaggedDeserializer(decoder, jaggedType, 0, type.GetArrayRank())));
var lengths = new List<Expression>();
@ -76,7 +76,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
currentObject = Expression.Property(currentObject, "Item", new Expression[] { ConstantZero });
}
ParameterExpression result = Expression.Variable(type, "result");
var result = Expression.Variable(type, "result");
body.Add(Expression.Assign(result, Expression.NewArrayBounds(type.GetElementType(), lengths)));
body.Add(GenerateCopy(new List<Expression>(), result, deserialized, 0, type.GetArrayRank()));
body.Add(result);
@ -86,8 +86,8 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
private static Type GenerateJaggedType(Type sourceType)
{
int rank = sourceType.GetArrayRank();
Type elementType = sourceType.GetElementType();
Type result = elementType;
var elementType = sourceType.GetElementType();
var result = elementType;
for (int i = 0; i < rank; i++)
{
result = typeof(List<>).MakeGenericType(result);
@ -103,18 +103,18 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
return this.ItemSchema.BuildDeserializer(decoder);
}
ParameterExpression result = Expression.Variable(valueType, "result");
ParameterExpression index = Expression.Variable(typeof(int), "index");
ParameterExpression chunkSize = Expression.Variable(typeof(int), "chunkSize");
var result = Expression.Variable(valueType, "result");
var index = Expression.Variable(typeof(int), "index");
var chunkSize = Expression.Variable(typeof(int), "chunkSize");
body.Add(Expression.Assign(result, Expression.New(valueType)));
body.Add(Expression.Assign(index, ConstantZero));
LabelTarget internalLoopLabel = Expression.Label();
ParameterExpression counter = Expression.Variable(typeof(int));
var internalLoopLabel = Expression.Label();
var counter = Expression.Variable(typeof(int));
body.Add(Expression.Assign(counter, ConstantZero));
LabelTarget allRead = Expression.Label();
var allRead = Expression.Label();
body.Add(
Expression.Loop(
@ -155,11 +155,11 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
return Expression.Assign(Expression.ArrayAccess(destination, indexes), source);
}
ParameterExpression length = Expression.Variable(typeof(int), "length");
var length = Expression.Variable(typeof(int), "length");
body.Add(Expression.Assign(length, Expression.Property(source, "Count")));
LabelTarget label = Expression.Label();
ParameterExpression counter = Expression.Variable(typeof(int), "counter");
var label = Expression.Label();
var counter = Expression.Variable(typeof(int), "counter");
body.Add(Expression.Assign(counter, ConstantZero));
indexes.Add(counter);

Просмотреть файл

@ -33,7 +33,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
var tmp = Expression.Variable(this.Schema.RuntimeType, Guid.NewGuid().ToString());
var assignment = Expression.Assign(tmp, member);
Expression serialized = this.Schema.BuildSerializer(encoder, tmp);
var serialized = this.Schema.BuildSerializer(encoder, tmp);
return Expression.Block(new[] { tmp }, new[] { assignment, serialized });
}
@ -57,9 +57,8 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
}
private Expression GetMember(Expression @object)
{
if (this.MemberInfo.isField) return Expression.Field(@object, this.MemberInfo.DeclaringType, this.MemberInfo.OriginalName);
else return Expression.Property(@object, this.MemberInfo.DeclaringType, this.MemberInfo.OriginalName);
}
=> this.MemberInfo.isField
? Expression.Field(@object, this.MemberInfo.DeclaringType, this.MemberInfo.OriginalName)
: Expression.Property(@object, this.MemberInfo.DeclaringType, this.MemberInfo.OriginalName);
}
}

Просмотреть файл

@ -89,7 +89,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
var surrogate = this.settings.Surrogate;
if (surrogate != null)
{
if (surrogate.IsSupportedType(type, out MethodInfo serialize, out MethodInfo deserialize))
if (surrogate.IsSupportedType(type, out var serialize, out var deserialize))
{
return new SurrogateSerializer(this.settings, type, serialize, deserialize);
}
@ -121,8 +121,8 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
private ObjectSerializerBase CreateNotNullableSchema(Type type, uint currentDepth)
{
if (RuntimeTypeToSerializer.TryGetValue(type, out Func<ObjectSerializerBase> p)) return p();
if (this.seenTypes.TryGetValue(type, out ObjectSerializerBase schema)) return schema;
if (RuntimeTypeToSerializer.TryGetValue(type, out var p)) return p();
if (this.seenTypes.TryGetValue(type, out var schema)) return schema;
var typeInfo = type.GetTypeInfo();
if (typeInfo.IsEnum) return BuildEnumTypeSchema(type);
@ -170,7 +170,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
var members = type.ResolveMembers();
foreach (var info in members)
{
ObjectSerializerBase fieldSchema = CreateSchema(info.Type, currentDepth + 1);
var fieldSchema = CreateSchema(info.Type, currentDepth + 1);
var recordField = new RecordFieldSerializer(fieldSchema, info);
record.AddField(recordField);

Просмотреть файл

@ -23,16 +23,17 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
protected override Expression BuildSerializerSafe(Expression encoder, Expression value)
{
var surrogate = Expression.Constant(this.Surrogate);
Expression stream = Expression.Field(encoder, "stream");
var stream = Expression.Field(encoder, "stream");
return Expression.Call(surrogate, this.serialize, new[] { value, stream });
}
protected override Expression BuildDeserializerSafe(Expression decoder)
{
var surrogate = Expression.Constant(this.Surrogate);
Expression stream = Expression.Field(decoder, "stream");
var stream = Expression.Field(decoder, "stream");
return Expression.Call(surrogate, this.deserialize, new[] { stream });
}
private ISurrogate Surrogate { get; }
}
}

Просмотреть файл

@ -69,9 +69,9 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers
protected override Expression BuildDeserializerSafe(Expression decoder)
{
ParameterExpression resultParameter = Expression.Variable(this.RuntimeType, "result");
ParameterExpression unionTypeParameter = Expression.Variable(typeof(int), "unionType");
BinaryExpression assignUnionType = Expression.Assign(
var resultParameter = Expression.Variable(this.RuntimeType, "result");
var unionTypeParameter = Expression.Variable(typeof(int), "unionType");
var assignUnionType = Expression.Assign(
unionTypeParameter,
decoderExpression.ReplaceParametersInBody(decoder));

Просмотреть файл

@ -12,40 +12,37 @@ namespace Microsoft.StreamProcessing.Serializer
internal sealed class StateSerializer<T>
{
private readonly ObjectSerializerBase schema;
private Action<BinaryEncoder, T> serialize;
private Func<BinaryDecoder, T> deserialize;
private readonly Lazy<Action<BinaryEncoder, T>> serialize;
private readonly Lazy<Func<BinaryDecoder, T>> deserialize;
internal StateSerializer(ObjectSerializerBase schema) => this.schema = schema ?? throw new ArgumentNullException(nameof(schema));
public void Serialize(Stream stream, T obj)
internal StateSerializer(ObjectSerializerBase schema)
{
if (this.serialize == null) GenerateSerializer();
this.serialize(new BinaryEncoder(stream), obj);
this.schema = schema ?? throw new ArgumentNullException(nameof(schema));
this.serialize = new Lazy<Action<BinaryEncoder, T>>(GenerateSerializer);
this.deserialize = new Lazy<Func<BinaryDecoder, T>>(GenerateDeserializer);
}
public T Deserialize(Stream stream)
{
if (this.deserialize == null) GenerateDeserializer();
return this.deserialize(new BinaryDecoder(stream));
}
public void Serialize(Stream stream, T obj) => this.serialize.Value(new BinaryEncoder(stream), obj);
private void GenerateSerializer()
public T Deserialize(Stream stream) => this.deserialize.Value(new BinaryDecoder(stream));
private Action<BinaryEncoder, T> GenerateSerializer()
{
var instance = Expression.Parameter(typeof(T), "instance");
var encoder = Expression.Parameter(typeof(BinaryEncoder), "encoder");
var result = this.schema.BuildSerializer(encoder, instance);
var lambda = Expression.Lambda<Action<BinaryEncoder, T>>(result, encoder, instance);
this.serialize = lambda.Compile();
return lambda.Compile();
}
private void GenerateDeserializer()
private Func<BinaryDecoder, T> GenerateDeserializer()
{
var decoder = Expression.Parameter(typeof(BinaryDecoder), "decoder");
var result = this.schema.BuildDeserializer(decoder);
var lambda = Expression.Lambda<Func<BinaryDecoder, T>>(result, decoder);
this.deserialize = lambda.Compile();
return lambda.Compile();
}
}
}

Просмотреть файл

@ -62,8 +62,7 @@ namespace Microsoft.StreamProcessing
{
public StreamableContract(StreamProperties<TKey, TPayload> properties)
: base(properties)
{
}
{ }
public override IDisposable Subscribe(IStreamObserver<TKey, TPayload> observer)
{

Просмотреть файл

@ -27,7 +27,7 @@ namespace Microsoft.StreamProcessing
public override IDisposable Subscribe(IStreamObserver<TKey, TResult> observer)
{
IStreamObserver<TKey, TSource> pipe = CreatePipe(observer);
var pipe = CreatePipe(observer);
return this.Source.Subscribe(pipe);
}

Просмотреть файл

@ -92,7 +92,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections
{
if (queue != null)
{
while (queue.TryDequeue(out CharArrayWrapper result))
while (queue.TryDequeue(out var result))
{
result = null;
Interlocked.Decrement(ref this.createdObjects);

Просмотреть файл

@ -41,15 +41,13 @@ namespace Microsoft.StreamProcessing
else
{
Interlocked.Decrement(ref this.RefCount);
pool.Get(out CharArrayWrapper result, this.UsedLength);
pool.Get(out var result, this.UsedLength);
if (copyData)
{
fixed (char* dest = result.charArray.content)
fixed (char* src = this.charArray.content)
{
fixed (char* src = this.charArray.content)
{
MyStringBuilder.Wstrcpy(dest, src, this.UsedLength);
}
MyStringBuilder.Wstrcpy(dest, src, this.UsedLength);
}
}
return result;

Просмотреть файл

@ -47,10 +47,10 @@ namespace Microsoft.StreamProcessing.Internal.Collections
[DataMember]
private MyStringBuilder msb;
private CharArrayPool charArrayPool;
private ColumnPool<int> intPool;
private ColumnPool<short> shortPool;
private ColumnPool<long> bitvectorPool;
private readonly CharArrayPool charArrayPool;
private readonly ColumnPool<int> intPool;
private readonly ColumnPool<short> shortPool;
private readonly ColumnPool<long> bitvectorPool;
/// <summary>
/// Currently for internal use only - do not use directly.
@ -263,17 +263,6 @@ namespace Microsoft.StreamProcessing.Internal.Collections
#endregion
internal void AssignPools(CharArrayPool caPool, ColumnPool<int> intPool, ColumnPool<short> shortPool, ColumnPool<long> bitvectorPool)
{
Contract.Requires(this.State == MultiStringState.Unsealed);
Contract.Ensures(this.State == MultiStringState.Unsealed);
this.charArrayPool = caPool;
this.intPool = intPool;
this.shortPool = shortPool;
this.bitvectorPool = bitvectorPool;
}
// Clone the multi-string shell only
/// <summary>
/// Currently for internal use only - do not use directly.

Просмотреть файл

@ -263,7 +263,7 @@ namespace Microsoft.StreamProcessing
private MyStringBuilder FindChunkForIndex(int index)
{
MyStringBuilder chunkPrevious = this;
var chunkPrevious = this;
while (chunkPrevious.m_ChunkOffset > index)
{
chunkPrevious = chunkPrevious.m_ChunkPrevious;
@ -291,7 +291,7 @@ namespace Microsoft.StreamProcessing
if (this.Length == 0) return resultCaw;
MyStringBuilder chunkPrevious = this;
var chunkPrevious = this;
fixed (char* str2 = result)
{
char* charPtr = str2;
@ -379,7 +379,7 @@ namespace Microsoft.StreamProcessing
}
else
{
MyStringBuilder builder = FindChunkForIndex(value);
var builder = FindChunkForIndex(value);
if (builder != this)
{
int num3 = capacity - builder.m_ChunkOffset;

Просмотреть файл

@ -43,7 +43,7 @@ namespace Microsoft.StreamProcessing
if (Config.ForceRowBasedExecution) return new StreamMessage<TKey, TPayload>(pool);
if (!typeof(TPayload).CanRepresentAsColumnar()) return new StreamMessage<TKey, TPayload>(pool);
Type generatedBatchType = GetStreamMessageType<TKey, TPayload>();
var generatedBatchType = GetStreamMessageType<TKey, TPayload>();
var instance = Activator.CreateInstance(generatedBatchType, pool);
var returnValue = (StreamMessage<TKey, TPayload>)instance;

Просмотреть файл

@ -192,12 +192,12 @@ namespace Microsoft.StreamProcessing
this.error = true;
return;
}
if (!this.parameterInformation.TryGetValue(parameter, out SelectParameterInformation spi))
if (!this.parameterInformation.TryGetValue(parameter, out var selectParameter))
{
this.error = true;
return;
}
var columnarField = spi.parameterRepresentation.Fields[fieldOrAutoProp.Name];
var columnarField = selectParameter.parameterRepresentation.Fields[fieldOrAutoProp.Name];
if (this.resultTypeInformation.noFields)
{
// Then e.f is of type R (the result type) where R is a primitive type or some type that doesn't get decomposed.
@ -232,12 +232,12 @@ namespace Microsoft.StreamProcessing
private ParameterExpression GetIndexVariable(ParameterExpression parameter)
{
if (!this.parameterInformation.TryGetValue(parameter, out SelectParameterInformation spi))
if (!this.parameterInformation.TryGetValue(parameter, out var selectParameter))
{
// then this parameter must be a parameter of an inner lambda or a parameter not being substituted for
return parameter;
}
var indexVariable = Expression.Variable(typeof(int), spi.IndexVariableName);
var indexVariable = Expression.Variable(typeof(int), selectParameter.IndexVariableName);
return indexVariable;
}
@ -273,7 +273,7 @@ namespace Microsoft.StreamProcessing
{
Contract.Assume(parameter.Type == this.resultTypeInformation.RepresentationFor);
if (!this.parameterInformation.TryGetValue(parameter, out SelectParameterInformation spi))
if (!this.parameterInformation.TryGetValue(parameter, out var selectParameter))
{
if (!hasStartEdge && !this.resultTypeInformation.noFields)
{
@ -288,7 +288,7 @@ namespace Microsoft.StreamProcessing
}
foreach (var resultField in this.resultTypeInformation.AllFields)
{
var matchingField = spi.parameterRepresentation.AllFields.First(f => f.OriginalName == resultField.OriginalName);
var matchingField = selectParameter.parameterRepresentation.AllFields.First(f => f.OriginalName == resultField.OriginalName);
if (this.noSwingingFields)
{
var a = GetBatchColumnIndexer(parameter, matchingField);
@ -391,9 +391,9 @@ namespace Microsoft.StreamProcessing
{
if (node.Expression is ParameterExpression parameter)
{
if (this.parameterInformation.TryGetValue(parameter, out SelectParameterInformation spi))
if (this.parameterInformation.TryGetValue(parameter, out var selectParameter))
{
var columnarField = spi.parameterRepresentation.Fields[member.Name];
var columnarField = selectParameter.parameterRepresentation.Fields[member.Name];
var a = GetBatchColumnIndexer(parameter, columnarField);
return a;
}
@ -428,11 +428,11 @@ namespace Microsoft.StreamProcessing
protected override Expression VisitParameter(ParameterExpression node)
{
if (this.parameterInformation.TryGetValue(node, out SelectParameterInformation spi))
if (this.parameterInformation.TryGetValue(node, out var selectParameter))
{
if (spi.parameterRepresentation.noFields)
if (selectParameter.parameterRepresentation.noFields)
{
var columnarField = spi.parameterRepresentation.PseudoField;
var columnarField = selectParameter.parameterRepresentation.PseudoField;
var a = GetBatchColumnIndexer(node, columnarField);
return a;
}
@ -558,10 +558,9 @@ namespace Microsoft.StreamProcessing
var n = methodCall.Arguments.Count;
if (n == 1)
{
if (firstArgIsChar) // IndexOf/LastIndexOf(char)
firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), 0, StringComparison.Ordinal", firstArgAsCSharpString);
else // IndexOf/LastIndexOf(string)
firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}, 0, StringComparison.Ordinal", firstArgAsCSharpString);
firstArgsToMultiStringCall = firstArgIsChar
? string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), 0, StringComparison.Ordinal", firstArgAsCSharpString)
: string.Format(CultureInfo.InvariantCulture, "{0}, 0, StringComparison.Ordinal", firstArgAsCSharpString);
}
else
{
@ -570,10 +569,9 @@ namespace Microsoft.StreamProcessing
{
if (methodCall.Arguments.ElementAt(1).Type.Equals(typeof(int)))
{
if (firstArgIsChar) // IndexOf/LastIndexOf(char, int)
firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), {1}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString);
else // IndexOf/LastIndexOf(string, int)
firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}, {1}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString);
firstArgsToMultiStringCall = firstArgIsChar
? string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), {1}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString)
: string.Format(CultureInfo.InvariantCulture, "{0}, {1}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString);
}
else
{
@ -590,10 +588,9 @@ namespace Microsoft.StreamProcessing
firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), {1}, {2}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString);
else
{
if (methodCall.Method.GetParameters().ElementAt(2).ParameterType.Equals(typeof(int))) // IndexOf/LastIndexOf(string, int, int)
firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString);
else // IndexOf/LastIndexOf(string, int, StringComparison),
firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString);
firstArgsToMultiStringCall = methodCall.Method.GetParameters().ElementAt(2).ParameterType.Equals(typeof(int))
? string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString)
: string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString);
}
}
else
@ -612,11 +609,9 @@ namespace Microsoft.StreamProcessing
break;
}
string s = null;
if (methodName.Equals("Substring"))
s = string.Format(CultureInfo.InvariantCulture, "{0}({1}, sourceBatch.bitvector)", methodToCall, firstArgsToMultiStringCall);
else
s = string.Format(CultureInfo.InvariantCulture, "{0}({1}, sourceBatch.bitvector, false)", methodToCall, firstArgsToMultiStringCall);
var s = methodName.Equals("Substring")
? string.Format(CultureInfo.InvariantCulture, "{0}({1}, sourceBatch.bitvector)", methodToCall, firstArgsToMultiStringCall)
: string.Format(CultureInfo.InvariantCulture, "{0}({1}, sourceBatch.bitvector, false)", methodToCall, firstArgsToMultiStringCall);
return s;
}

Просмотреть файл

@ -35,55 +35,46 @@ namespace Microsoft.StreamProcessing
public static Assembly SystemCoreDll = typeof(BinaryExpression).GetTypeInfo().Assembly;
#if DOTNETCORE
internal static IEnumerable<PortableExecutableReference> NetCoreAssemblyReferences
internal static IEnumerable<PortableExecutableReference> GetNetCoreAssemblyReferences()
{
get
var allAvailableAssemblies = RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
? ((string)AppContext.GetData("TRUSTED_PLATFORM_ASSEMBLIES")).Split(';')
: ((string)AppContext.GetData("TRUSTED_PLATFORM_ASSEMBLIES")).Split(':');
// From: http://source.roslyn.io/#Microsoft.CodeAnalysis.Scripting/ScriptOptions.cs,40
// These references are resolved lazily. Keep in sync with list in core csi.rsp.
var files = new[]
{
if (netCoreAssemblyReferences == null)
{
string[] allAvailableAssemblies;
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
allAvailableAssemblies = ((string)AppContext.GetData("TRUSTED_PLATFORM_ASSEMBLIES")).Split(';');
else
allAvailableAssemblies = ((string)AppContext.GetData("TRUSTED_PLATFORM_ASSEMBLIES")).Split(':');
// From: http://source.roslyn.io/#Microsoft.CodeAnalysis.Scripting/ScriptOptions.cs,40
// These references are resolved lazily. Keep in sync with list in core csi.rsp.
var files = new[]
{
"System.Collections",
"System.Collections.Concurrent",
"System.Console",
"System.Diagnostics.Debug",
"System.Diagnostics.Process",
"System.Diagnostics.StackTrace",
"System.Globalization",
"System.IO",
"System.IO.FileSystem",
"System.IO.FileSystem.Primitives",
"System.Reflection",
"System.Reflection.Extensions",
"System.Reflection.Primitives",
"System.Runtime",
"System.Runtime.Extensions",
"System.Runtime.InteropServices",
"System.Text.Encoding",
"System.Text.Encoding.CodePages",
"System.Text.Encoding.Extensions",
"System.Text.RegularExpressions",
"System.Threading",
"System.Threading.Tasks",
"System.Threading.Tasks.Parallel",
"System.Threading.Thread",
};
var filteredPaths = allAvailableAssemblies.Where(p => files.Concat(new string[] { "mscorlib", "netstandard", "System.Private.CoreLib", "System.Runtime.Serialization.Primitives", }).Any(f => Path.GetFileNameWithoutExtension(p).Equals(f)));
netCoreAssemblyReferences = filteredPaths.Select(p => MetadataReference.CreateFromFile(p));
}
return netCoreAssemblyReferences;
}
"System.Collections",
"System.Collections.Concurrent",
"System.Console",
"System.Diagnostics.Debug",
"System.Diagnostics.Process",
"System.Diagnostics.StackTrace",
"System.Globalization",
"System.IO",
"System.IO.FileSystem",
"System.IO.FileSystem.Primitives",
"System.Reflection",
"System.Reflection.Extensions",
"System.Reflection.Primitives",
"System.Runtime",
"System.Runtime.Extensions",
"System.Runtime.InteropServices",
"System.Text.Encoding",
"System.Text.Encoding.CodePages",
"System.Text.Encoding.Extensions",
"System.Text.RegularExpressions",
"System.Threading",
"System.Threading.Tasks",
"System.Threading.Tasks.Parallel",
"System.Threading.Thread",
};
var filteredPaths = allAvailableAssemblies.Where(p => files.Concat(new string[] { "mscorlib", "netstandard", "System.Private.CoreLib", "System.Runtime.Serialization.Primitives", }).Any(f => Path.GetFileNameWithoutExtension(p).Equals(f)));
return filteredPaths.Select(p => MetadataReference.CreateFromFile(p));
}
private static IEnumerable<PortableExecutableReference> netCoreAssemblyReferences;
private static readonly Lazy<IEnumerable<PortableExecutableReference>> netCoreAssemblyReferences
= new Lazy<IEnumerable<PortableExecutableReference>>(GetNetCoreAssemblyReferences);
#endif
/// <summary>
@ -103,9 +94,8 @@ namespace Microsoft.StreamProcessing
#if DEBUG
includeDebugInfo = (Config.CodegenOptions.BreakIntoCodeGen & Config.CodegenOptions.DebugFlags.Operators) != 0;
#endif
if (includeDebugInfo)
{
return string.Format(
return includeDebugInfo
? string.Format(
CultureInfo.InvariantCulture,
@"
static {0}() {{
@ -113,9 +103,8 @@ namespace Microsoft.StreamProcessing
System.Diagnostics.Debugger.Break();
else
System.Diagnostics.Debugger.Launch();
}}", className);
}
else return string.Empty;
}}", className)
: string.Empty;
}
/// <summary>
@ -240,7 +229,7 @@ namespace Microsoft.StreamProcessing
.Concat(uniqueReferences.Where(reference => metadataReferenceCache.ContainsKey(reference)).Select(reference => metadataReferenceCache[reference]));
#if DOTNETCORE
refs = refs.Concat(NetCoreAssemblyReferences);
refs = refs.Concat(netCoreAssemblyReferences.Value);
#endif
var options = new CSharpCompilationOptions(
@ -255,7 +244,6 @@ namespace Microsoft.StreamProcessing
var ignoreAccessibility = binderFlagsType.GetTypeInfo().GetField("IgnoreAccessibility", BindingFlags.Static | BindingFlags.Public);
topLevelBinderFlagsProperty.SetValue(options, (uint)ignoreCorLibraryDuplicatedTypesMember.GetValue(null) | (uint)ignoreAccessibility.GetValue(null));
var compilation = CSharpCompilation.Create(assemblyName, trees, refs, options);
var a = EmitCompilationAndLoadAssembly(compilation, includeDebugInfo, out errorMessages);
@ -426,6 +414,7 @@ namespace System.Runtime.CompilerServices
private static int BatchClassSequenceNumber = 0;
private static SafeConcurrentDictionary<string> batchType2Name = new SafeConcurrentDictionary<string>();
internal static string GetBatchClassName(Type keyType, Type payloadType)
{
if (!payloadType.CanRepresentAsColumnar())
@ -704,10 +693,7 @@ namespace System.Runtime.CompilerServices
this.DeclaringType = t.DeclaringType;
}
public override string ToString()
{
return "|" + this.TypeName + ", " + this.Name + "|";
}
public override string ToString() => "|" + this.TypeName + ", " + this.Name + "|";
}
internal partial class SafeBatchTemplate
@ -810,53 +796,23 @@ namespace System.Runtime.CompilerServices
{
public abstract string TransformText();
#region Fields
private StringBuilder generationEnvironmentField;
private List<int> indentLengthsField;
private bool endsWithNewline;
private IDictionary<string, object> sessionField;
#endregion
#region Properties
/// <summary>
/// The string builder that generation-time code is using to assemble generated output
/// </summary>
protected StringBuilder GenerationEnvironment
{
get
{
if ((this.generationEnvironmentField == null)) this.generationEnvironmentField = new StringBuilder();
return this.generationEnvironmentField;
}
set
{
this.generationEnvironmentField = value;
}
}
protected StringBuilder GenerationEnvironment { get; } = new StringBuilder();
/// <summary>
/// A list of the lengths of each indent that was added with PushIndent
/// </summary>
private List<int> IndentLengths
{
get
{
if (this.indentLengthsField == null) this.indentLengthsField = new List<int>();
return this.indentLengthsField;
}
}
private List<int> IndentLengths { get; } = new List<int>();
/// <summary>
/// Gets the current indent we use when adding lines to the output
/// </summary>
public string CurrentIndent { get; private set; } = string.Empty;
/// <summary>
/// Current transformation session
/// </summary>
public virtual IDictionary<string, object> Session
{
get => this.sessionField;
set => this.sessionField = value;
}
#endregion
#region Transform-time helpers
/// <summary>
@ -868,8 +824,7 @@ namespace System.Runtime.CompilerServices
// If we're starting off, or if the previous text ended with a newline,
// we have to append the current indent first.
if (((this.GenerationEnvironment.Length == 0)
|| this.endsWithNewline))
if ((this.GenerationEnvironment.Length == 0) || this.endsWithNewline)
{
this.GenerationEnvironment.Append(this.CurrentIndent);
this.endsWithNewline = false;
@ -881,7 +836,7 @@ namespace System.Runtime.CompilerServices
}
// This is an optimization. If the current indent is "", then we don't have to do any
// of the more complex stuff further down.
if ((this.CurrentIndent.Length == 0))
if (this.CurrentIndent.Length == 0)
{
this.GenerationEnvironment.Append(textToAppend);
return;
@ -933,18 +888,19 @@ namespace System.Runtime.CompilerServices
public string PopIndent()
{
string returnValue = string.Empty;
if ((this.IndentLengths.Count > 0))
if (this.IndentLengths.Count > 0)
{
int indentLength = this.IndentLengths[(this.IndentLengths.Count - 1)];
this.IndentLengths.RemoveAt((this.IndentLengths.Count - 1));
if ((indentLength > 0))
int indentLength = this.IndentLengths[this.IndentLengths.Count - 1];
this.IndentLengths.RemoveAt(this.IndentLengths.Count - 1);
if (indentLength > 0)
{
returnValue = this.CurrentIndent.Substring((this.CurrentIndent.Length - indentLength));
this.CurrentIndent = this.CurrentIndent.Remove((this.CurrentIndent.Length - indentLength));
returnValue = this.CurrentIndent.Substring(this.CurrentIndent.Length - indentLength);
this.CurrentIndent = this.CurrentIndent.Remove(this.CurrentIndent.Length - indentLength);
}
}
return returnValue;
}
/// <summary>
/// Remove any indentation
/// </summary>
@ -958,7 +914,7 @@ namespace System.Runtime.CompilerServices
/// <summary>
/// Utility class to produce culture-oriented representation of an object as a string.
/// </summary>
public class ToStringInstanceHelper
public sealed class ToStringInstanceHelper
{
private IFormatProvider formatProviderField = CultureInfo.InvariantCulture;
/// <summary>
@ -969,7 +925,7 @@ namespace System.Runtime.CompilerServices
get => this.formatProviderField;
set
{
if ((value != null)) this.formatProviderField = value;
if (value != null) this.formatProviderField = value;
}
}
/// <summary>
@ -977,15 +933,14 @@ namespace System.Runtime.CompilerServices
/// </summary>
public string ToStringWithCulture(object objectToConvert)
{
if ((objectToConvert == null)) throw new ArgumentNullException(nameof(objectToConvert));
if (objectToConvert == null) throw new ArgumentNullException(nameof(objectToConvert));
Type t = objectToConvert.GetType();
MethodInfo method = t.GetTypeInfo().GetMethod("ToString", new Type[] { typeof(IFormatProvider) });
var t = objectToConvert.GetType();
var method = t.GetTypeInfo().GetMethod("ToString", new Type[] { typeof(IFormatProvider) });
if ((method == null))
return objectToConvert.ToString();
else
return ((string)(method.Invoke(objectToConvert, new object[] { this.formatProviderField })));
return method == null
? objectToConvert.ToString()
: (string)(method.Invoke(objectToConvert, new object[] { this.formatProviderField }));
}
}

Просмотреть файл

@ -45,7 +45,7 @@ namespace Microsoft.StreamProcessing
public static bool TryGetCachedComparer<T>(out IComparerExpression<T> comparer)
{
Type t = typeof(T);
var t = typeof(T);
comparer = null;
if (typeComparerCache.TryGetValue(t, out object temp))
{
@ -84,7 +84,7 @@ namespace Microsoft.StreamProcessing
{
get
{
Type type = typeof(T);
var type = typeof(T);
lock (sentinel)
{

Просмотреть файл

@ -63,10 +63,8 @@ namespace Microsoft.StreamProcessing
=> EqualityComparer.IsEqual(source.GetCompareExpr(), other.GetCompareExpr());
public static bool ExpressionEquals<T>(this IEqualityComparerExpression<T> source, IEqualityComparerExpression<T> other)
{
return EqualityComparer.IsEqual(source.GetEqualsExpr(), other.GetEqualsExpr())
&& EqualityComparer.IsEqual(source.GetGetHashCodeExpr(), other.GetGetHashCodeExpr());
}
=> EqualityComparer.IsEqual(source.GetEqualsExpr(), other.GetEqualsExpr())
&& EqualityComparer.IsEqual(source.GetGetHashCodeExpr(), other.GetGetHashCodeExpr());
/// <summary>
/// Performs a special kind of equality test on IEqualityComparer&lt;T&gt; in which case, both the Equals function and GetHashCode function are checked
@ -91,10 +89,8 @@ namespace Microsoft.StreamProcessing
private static dynamic TryIsEqualIECE(dynamic obj1, dynamic obj2) => IsEqualIECE(obj1, obj2);
private static bool IsEqualIECE<T>(IEqualityComparerExpression<T> o1, IEqualityComparerExpression<T> o2)
{
return EqualityComparer.IsEqual(o1.GetEqualsExpr(), o2.GetEqualsExpr()) &&
EqualityComparer.IsEqual(o1.GetGetHashCodeExpr(), o2.GetGetHashCodeExpr());
}
=> EqualityComparer.IsEqual(o1.GetEqualsExpr(), o2.GetEqualsExpr())
&& EqualityComparer.IsEqual(o1.GetGetHashCodeExpr(), o2.GetGetHashCodeExpr());
// catch-all for other types
private static bool IsEqualIECE(object o1, object o2) => false;

Просмотреть файл

@ -82,12 +82,12 @@ namespace Microsoft.StreamProcessing
public static bool TryGetCachedGetHashCodeFunction<T>(out Func<T, int> getHashCodeFunction)
{
var t = typeof(T);
getHashCodeFunction = null;
if (getHashCodeCache.TryGetValue(t, out object temp))
{
getHashCodeFunction = (Func<T, int>)temp;
return true;
}
getHashCodeFunction = null;
return false;
}

Просмотреть файл

@ -111,8 +111,7 @@ namespace Microsoft.StreamProcessing
var stringBuilder = new StringBuilder();
var visitor = new ConvertToCSharpButWithStringParameters(new StringWriter(stringBuilder, CultureInfo.InvariantCulture), map);
visitor.Visit(e);
var s = stringBuilder.ToString();
return s;
return stringBuilder.ToString();
}
public static LambdaExpression RemoveCastToObject(this LambdaExpression lambda)
@ -157,10 +156,9 @@ namespace Microsoft.StreamProcessing
}
protected override Expression VisitParameter(ParameterExpression node)
{
if (this.arguments.TryGetValue(node, out Expression replacement) && replacement != null) return replacement;
return node;
}
=> this.arguments.TryGetValue(node, out var replacement) && replacement != null
? replacement
: node;
}
internal sealed class ConstantExpressionFinder : ExpressionVisitor
@ -181,12 +179,14 @@ namespace Microsoft.StreamProcessing
me.Visit(function.Body);
return me.isConstant;
}
protected override Expression VisitConstant(ConstantExpression node)
{
var t = node.Type;
if (!t.GetTypeInfo().IsPrimitive) this.isConstant = false;
return base.VisitConstant(node);
}
protected override Expression VisitMember(MemberExpression node)
{
if (!(node.Expression is MemberExpression me)) // if it is a member expression, then let visitor recurse down to the left-most branch
@ -313,8 +313,9 @@ namespace Microsoft.StreamProcessing
if (e1 is NewExpression new1)
{
var new2 = e2 as NewExpression;
if (new1.Constructor == null) return new2.Constructor == null;
return new1.Constructor.Equals(new2.Constructor) && Equals(new1.Arguments, new2.Arguments);
return new1.Constructor == null
? new2.Constructor == null
: new1.Constructor.Equals(new2.Constructor) && Equals(new1.Arguments, new2.Arguments);
}
if (e1 is NewArrayExpression newarr1)
{
@ -475,10 +476,9 @@ namespace Microsoft.StreamProcessing
=> this.arguments = new Dictionary<ParameterExpression, Expression>(arguments);
protected override Expression VisitParameter(ParameterExpression node)
{
if (this.arguments.TryGetValue(node, out Expression replacement)) return replacement;
return base.VisitParameter(node);
}
=> this.arguments.TryGetValue(node, out var replacement)
? replacement
: base.VisitParameter(node);
}
/// <summary>
@ -532,7 +532,7 @@ namespace Microsoft.StreamProcessing
// Returns:
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
public override Expression Visit(Expression node) { return base.Visit(node); }
public override Expression Visit(Expression node) => base.Visit(node);
//
// Summary:
@ -1065,9 +1065,7 @@ namespace Microsoft.StreamProcessing
// The modified expression, if it or any subexpression was modified; otherwise,
// returns the original expression.
protected override MemberBinding VisitMemberBinding(MemberBinding node)
{
return base.VisitMemberBinding(node); // taken care of by VisitMemberAssignment, but what if there are other subtypes of MemberBinding?
}
=> base.VisitMemberBinding(node); // taken care of by VisitMemberAssignment, but what if there are other subtypes of MemberBinding?
//
// Summary:
@ -2027,10 +2025,9 @@ namespace Microsoft.StreamProcessing
}
protected override Expression VisitMember(MemberExpression node)
{
if (node.Member.Name == "Key" && node.Expression == parameter) return newParameter;
return base.VisitMember(node);
}
=> node.Member.Name == "Key" && node.Expression == parameter
? newParameter
: base.VisitMember(node);
protected override Expression VisitParameter(ParameterExpression node)
{

Просмотреть файл

@ -26,10 +26,9 @@ namespace Microsoft.StreamProcessing
{
Contract.Requires(types != null);
var i = types.Count(t => t.IsAnonymousType());
if (i > 0)
return s + "`" + i.ToString(CultureInfo.InvariantCulture);
else
return s;
return i > 0
? s + "`" + i.ToString(CultureInfo.InvariantCulture)
: s;
}
public static bool OptimizeString(this MyFieldInfo field) => Config.UseMultiString && field.Type == typeof(string);

Просмотреть файл

@ -113,8 +113,9 @@ namespace Microsoft.StreamProcessing
public static Tuple<IEnumerable<MyFieldInfo>, bool> GetAnnotatedFields(this Type t)
{
var fields = t.ResolveMembers();
if (fields.Any()) return Tuple.Create(fields, false);
else return Tuple.Create(new MyFieldInfo(t).Yield(), true);
return fields.Any()
? Tuple.Create(fields, false)
: Tuple.Create(new MyFieldInfo(t).Yield(), true);
}
private static readonly Dictionary<string, string> OperatorNameLookup;
@ -158,9 +159,9 @@ namespace Microsoft.StreamProcessing
var genericArgs = types
.Distinct()
.Where(g => IsAnonymousType(g));
if (!genericArgs.Any())
return type;
return type.MakeGenericType(genericArgs.ToArray());
return !genericArgs.Any()
? type
: type.MakeGenericType(genericArgs.ToArray());
}
public static bool IsAnonymousTypeName(this Type type)
@ -315,8 +316,9 @@ namespace Microsoft.StreamProcessing
return outerKeyType.KeyTypeNeedsGeneratedMemoryPool() || innerKeyType.KeyTypeNeedsGeneratedMemoryPool();
}
}
if (keyType == typeof(Empty)) return false;
return keyType.NeedGeneratedMemoryPool();
return keyType == typeof(Empty)
? false
: keyType.NeedGeneratedMemoryPool();
}
/// <summary>
@ -531,28 +533,19 @@ namespace Microsoft.StreamProcessing
}
public static bool ImplementsIEqualityComparerExpression(this Type t)
{
return t
.GetTypeInfo()
=> t.GetTypeInfo()
.GetInterfaces()
.Any(i => i.Namespace.Equals("Microsoft.StreamProcessing") && i.Name.Equals("IEqualityComparerExpression`1") && i.GetTypeInfo().GetGenericArguments().Length == 1 && i.GetTypeInfo().GetGenericArguments()[0] == t);
}
public static bool ImplementsIEqualityComparer(this Type t)
{
return t
.GetTypeInfo()
=> t.GetTypeInfo()
.GetInterfaces()
.Any(i => i.Namespace.Equals("System.Collections.Generic") && i.Name.Equals("IEqualityComparer`1") && i.GetTypeInfo().GetGenericArguments().Length == 1 && i.GetTypeInfo().GetGenericArguments()[0] == t);
}
public static bool ImplementsIEquatable(this Type t)
{
return t
.GetTypeInfo()
=> t.GetTypeInfo()
.GetInterfaces()
.Any(i => i.Namespace.Equals("System") && i.Name.Equals("IEquatable`1") && i.GetTypeInfo().GetGenericArguments().Length == 1 && i.GetTypeInfo().GetGenericArguments()[0] == t);
}
#region Borrowed from Roslyn
@ -651,21 +644,19 @@ namespace Microsoft.StreamProcessing
/// <c>true</c> if the type is unsupported; otherwise, <c>false</c>.
/// </returns>
public static bool IsUnsupported(this Type type)
{
return type == typeof(IntPtr)
|| type == typeof(UIntPtr)
|| type == typeof(object)
|| type.GetTypeInfo().ContainsGenericParameters
|| (!type.IsArray
&& !type.GetTypeInfo().IsValueType
&& !type.HasSupportedParameterizedConstructor()
&& !type.HasParameterlessConstructor()
&& type != typeof(string)
&& type != typeof(Uri)
&& !type.GetTypeInfo().IsAbstract
&& !type.GetTypeInfo().IsInterface
&& !(type.GetTypeInfo().IsGenericType && SupportedInterfaces.Contains(type.GetGenericTypeDefinition())));
}
=> type == typeof(IntPtr)
|| type == typeof(UIntPtr)
|| type == typeof(object)
|| type.GetTypeInfo().ContainsGenericParameters
|| (!type.IsArray
&& !type.GetTypeInfo().IsValueType
&& !type.HasSupportedParameterizedConstructor()
&& !type.HasParameterlessConstructor()
&& type != typeof(string)
&& type != typeof(Uri)
&& !type.GetTypeInfo().IsAbstract
&& !type.GetTypeInfo().IsInterface
&& !(type.GetTypeInfo().IsGenericType && SupportedInterfaces.Contains(type.GetGenericTypeDefinition())));
private static readonly HashSet<Type> SupportedInterfaces = new HashSet<Type>
{