diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs index 9aec0b0..33ebe80 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs @@ -96,14 +96,14 @@ namespace Microsoft.StreamProcessing /// [MethodImpl(MethodImplOptions.AggressiveInlining)] [EditorBrowsable(EditorBrowsableState.Never)] - public bool IsFull() => (((this.tail + 1) & this.capacityMask) == this.head); + public bool IsFull() => ((this.tail + 1) & this.capacityMask) == this.head; /// /// Currently for internal use only - do not use directly. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] [EditorBrowsable(EditorBrowsableState.Never)] - public bool IsEmpty() => (this.head == this.tail); + public bool IsEmpty() => this.head == this.tail; /// /// Currently for internal use only - do not use directly. @@ -134,10 +134,9 @@ namespace Microsoft.StreamProcessing [EditorBrowsable(EditorBrowsableState.Never)] public sealed class ElasticCircularBuffer : IEnumerable { - private LinkedList> buffers; + private readonly LinkedList> buffers = new LinkedList>(); private LinkedListNode> head; private LinkedListNode> tail; - private int count; /// /// Currently for internal use only - do not use directly. @@ -145,11 +144,10 @@ namespace Microsoft.StreamProcessing [EditorBrowsable(EditorBrowsableState.Never)] public ElasticCircularBuffer() { - this.buffers = new LinkedList>(); var node = new LinkedListNode>(new CircularBuffer()); this.buffers.AddFirst(node); this.tail = this.head = node; - this.count = 0; + this.Count = 0; } /// @@ -174,7 +172,7 @@ namespace Microsoft.StreamProcessing } this.tail.Value.Enqueue(ref value); - this.count++; + this.Count++; } /// @@ -201,7 +199,7 @@ namespace Microsoft.StreamProcessing if (this.head == null) this.head = this.buffers.First; } - this.count--; + this.Count--; return this.head.Value.Dequeue(); } @@ -241,27 +239,25 @@ namespace Microsoft.StreamProcessing /// /// [EditorBrowsable(EditorBrowsableState.Never)] - public bool IsEmpty() => (this.head.Value.IsEmpty() && (this.head == this.tail)); - - private IEnumerator Iterate() - { - foreach (CircularBuffer buffer in this.buffers) - foreach (T item in buffer.Iterate()) - yield return item; - } + public bool IsEmpty() => this.head.Value.IsEmpty() && (this.head == this.tail); /// /// Currently for internal use only - do not use directly. /// [EditorBrowsable(EditorBrowsableState.Never)] - public int Count => this.count; + public int Count { get; private set; } /// /// Currently for internal use only - do not use directly. /// /// [EditorBrowsable(EditorBrowsableState.Never)] - public IEnumerator GetEnumerator() => Iterate(); + public IEnumerator GetEnumerator() + { + foreach (var buffer in this.buffers) + foreach (var item in buffer.Iterate()) + yield return item; + } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator(); } diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/ColumnBatch.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/ColumnBatch.cs index 0f08894..3e2d5b1 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/ColumnBatch.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/ColumnBatch.cs @@ -108,7 +108,7 @@ namespace Microsoft.StreamProcessing.Internal } else { - pool.Get(out ColumnBatch result); + pool.Get(out var result); System.Array.Copy(this.col, result.col, this.col.Length); result.UsedLength = this.UsedLength; Interlocked.Decrement(ref this.RefCount); diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/ColumnPool.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/ColumnPool.cs index 3721936..0e22b27 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/ColumnPool.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/ColumnPool.cs @@ -54,21 +54,13 @@ namespace Microsoft.StreamProcessing.Internal.Collections [EditorBrowsable(EditorBrowsableState.Never)] public class ColumnPool : ColumnPoolBase { - private readonly ConcurrentQueue> queue; + private readonly ConcurrentQueue> queue = new ConcurrentQueue>(); private long createdObjects; private readonly int size; - internal ColumnPool() - { - this.queue = new ConcurrentQueue>(); - this.size = Config.DataBatchSize; - } + internal ColumnPool() => this.size = Config.DataBatchSize; - internal ColumnPool(int size) - { - this.queue = new ConcurrentQueue>(); - this.size = size; - } + internal ColumnPool(int size) => this.size = size; /// /// Currently for internal use only - do not use directly. @@ -124,7 +116,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections [EditorBrowsable(EditorBrowsableState.Never)] public override void Free(bool reset = false) { - while (this.queue.TryDequeue(out ColumnBatch result)) + while (this.queue.TryDequeue(out var result)) { result.pool = null; result.col = null; diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/DataStructurePool.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/DataStructurePool.cs index 56b376c..59d5389 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/DataStructurePool.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/DataStructurePool.cs @@ -16,7 +16,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections [EditorBrowsable(EditorBrowsableState.Never)] public sealed class DataStructurePool : IDisposable where T : new() { - private ConcurrentQueue queue; + private readonly ConcurrentQueue queue; private readonly Func creator; /// diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/EndPointHeap.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/EndPointHeap.cs index 40d5c36..76aed4d 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/EndPointHeap.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/EndPointHeap.cs @@ -64,42 +64,37 @@ namespace Microsoft.StreamProcessing.Internal.Collections public unsafe void Insert(long time, int value) { // If out of space in the stack, then grow. - if (this.count == this.capacity) - { - Grow(); - } + if (this.count == this.capacity) Grow(); fixed (long* timeArray = this.times) + fixed (int* valueArray = this.values) { - fixed (int* valueArray = this.values) + // Find location for new element in heap, default to end. + int insertPos = this.count; + this.count++; + + // Loop while position 'pos' still has a parent. + while (insertPos > 0) { - // Find location for new element in heap, default to end. - int insertPos = this.count; - this.count++; - - // Loop while position 'pos' still has a parent. - while (insertPos > 0) + // Determine if position 'insertPos' would be consistent with its parent. + int parentPos = (insertPos - 1) >> 1; + long parentTime = *(timeArray + parentPos); + if (parentTime <= time) { - // Determine if position 'insertPos' would be consistent with its parent. - int parentPos = (insertPos - 1) >> 1; - long parentTime = *(timeArray + parentPos); - if (parentTime <= time) - { - // Parent is <= time, so heap would be consistent and we are done. - break; - } - - // Heap is not consistent, so move insertion point to location of - // parent and move parent to current 'insertPos'. - *(timeArray + insertPos) = parentTime; - *(valueArray + insertPos) = *(valueArray + parentPos); - insertPos = parentPos; + // Parent is <= time, so heap would be consistent and we are done. + break; } - // Insert element into heap. - *(timeArray + insertPos) = time; - *(valueArray + insertPos) = value; + // Heap is not consistent, so move insertion point to location of + // parent and move parent to current 'insertPos'. + *(timeArray + insertPos) = parentTime; + *(valueArray + insertPos) = *(valueArray + parentPos); + insertPos = parentPos; } + + // Insert element into heap. + *(timeArray + insertPos) = time; + *(valueArray + insertPos) = value; } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/FastLinkedList.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/FastLinkedList.cs index 23baf36..48be384 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/FastLinkedList.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/FastLinkedList.cs @@ -247,6 +247,33 @@ namespace Microsoft.StreamProcessing.Internal.Collections } } + /// + /// Currently for internal use only - do not use directly. + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + [EditorBrowsable(EditorBrowsableState.Never)] + public unsafe bool Iterate(ref int index) + { + fixed (int* hashAndNextArray = this.next) + { + int* hashNextPtr = hashAndNextArray + index; + while (index < this.initialized) + { + index++; + hashNextPtr++; + + long currHashNext = *hashNextPtr; + int currNext = (int)currHashNext; + if (currNext >= 0) return true; + } + + index = 0; + return false; + } + } + /// /// Currently for internal use only - do not use directly. /// @@ -373,64 +400,5 @@ namespace Microsoft.StreamProcessing.Internal.Collections this.list.count--; } } - - /// - /// Currently for internal use only - do not use directly. - /// - [EditorBrowsable(EditorBrowsableState.Never)] - public struct VisibleTraverser - { - private readonly FastLinkedList list; - - /// - /// Currently for internal use only - do not use directly. - /// - [EditorBrowsable(EditorBrowsableState.Never)] - public int currIndex; - - /// - /// Currently for internal use only - do not use directly. - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - [EditorBrowsable(EditorBrowsableState.Never)] - public VisibleTraverser(FastLinkedList list) - { - this.list = list; - this.currIndex = 0; - } - - /// - /// Currently for internal use only - do not use directly. - /// - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - [EditorBrowsable(EditorBrowsableState.Never)] - public unsafe bool Next(out int index) - { - fixed (int* hashAndNextArray = this.list.next) - { - int* hashNextPtr = hashAndNextArray + this.currIndex; - int initialized = this.list.initialized; - while (this.currIndex < initialized) - { - this.currIndex++; - hashNextPtr++; - - long currHashNext = *hashNextPtr; - int currNext = (int)currHashNext; - if (currNext >= 0) - { - index = this.currIndex; - return true; - } - } - - index = 0; - return false; - } - } - } } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/FastStack.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/FastStack.cs index 6a0b535..4c9e5a4 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/FastStack.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/FastStack.cs @@ -62,26 +62,6 @@ namespace Microsoft.StreamProcessing.Internal.Collections get => this.count; } - /// - /// Currently for internal use only - do not use directly. - /// - [EditorBrowsable(EditorBrowsableState.Never)] - public bool IsEmpty - { - [MethodImpl(MethodImplOptions.AggressiveInlining)] - get => this.count == 0; - } - - /// - /// Currently for internal use only - do not use directly. - /// - [EditorBrowsable(EditorBrowsableState.Never)] - public int Capacity - { - [MethodImpl(MethodImplOptions.AggressiveInlining)] - get => this.capacity; - } - /// /// Currently for internal use only - do not use directly. /// @@ -108,53 +88,9 @@ namespace Microsoft.StreamProcessing.Internal.Collections /// /// Currently for internal use only - do not use directly. /// - /// - /// [MethodImpl(MethodImplOptions.AggressiveInlining)] [EditorBrowsable(EditorBrowsableState.Never)] - public int Push(T value) - { - int index = Push(); - this.values[index] = value; - return index; - } - - /// - /// Currently for internal use only - do not use directly. - /// - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - [EditorBrowsable(EditorBrowsableState.Never)] - public int Push(ref T value) - { - int index = Push(); - this.values[index] = value; - return index; - } - - /// - /// Currently for internal use only - do not use directly. - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - [EditorBrowsable(EditorBrowsableState.Never)] - public int Pop() - { - Contract.Assume(this.count > 0); - - return --this.count; - } - - /// - /// Currently for internal use only - do not use directly. - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - [EditorBrowsable(EditorBrowsableState.Never)] - public void Clear() - { - this.count = 0; - } + public void Clear() => this.count = 0; private void Grow() { diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/GlobalColumnPoolManager.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/GlobalColumnPoolManager.cs index b6bfffb..62b36a5 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/GlobalColumnPoolManager.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/GlobalColumnPoolManager.cs @@ -18,8 +18,8 @@ namespace Microsoft.StreamProcessing { private static SafeConcurrentDictionary doublingArrayPools = new SafeConcurrentDictionary(); private static SafeConcurrentDictionary columnPools = new SafeConcurrentDictionary(); - private static SafeConcurrentDictionary charArrayPools = new SafeConcurrentDictionary(); - private static SafeConcurrentDictionary bitvectorPools = new SafeConcurrentDictionary(); + private static SafeConcurrentDictionary charArrayPools = new SafeConcurrentDictionary(); + private static SafeConcurrentDictionary> bitvectorPools = new SafeConcurrentDictionary>(); private static SafeConcurrentDictionary eventBatchPools = new SafeConcurrentDictionary(); private static SafeConcurrentDictionary memoryPools = new SafeConcurrentDictionary(); @@ -46,10 +46,10 @@ namespace Microsoft.StreamProcessing key => new ColumnPool(size < 0 ? Config.DataBatchSize : size)); internal static CharArrayPool GetCharArrayPool() - => (CharArrayPool)charArrayPools.GetOrAdd(CacheKey.Create(), key => new CharArrayPool()); + => charArrayPools.GetOrAdd(CacheKey.Create(), key => new CharArrayPool()); internal static ColumnPool GetBVPool(int size) - => (ColumnPool)bitvectorPools.GetOrAdd(CacheKey.Create(size), key => new ColumnPool(size)); + => bitvectorPools.GetOrAdd(CacheKey.Create(size), key => new ColumnPool(size)); internal static StreamMessagePool GetStreamMessagePool(MemoryPool memoryPool, bool isColumnar) => (StreamMessagePool)eventBatchPools.GetOrAdd( @@ -85,7 +85,7 @@ namespace Microsoft.StreamProcessing } var lookupKey = CacheKey.Create(typeOfTKey, typeOfTPayload); - Type generatedMemoryPool = cachedMemoryPools.GetOrAdd(lookupKey, key => Transformer.GenerateMemoryPoolClass()); + var generatedMemoryPool = cachedMemoryPools.GetOrAdd(lookupKey, key => Transformer.GenerateMemoryPoolClass()); return (MemoryPool)memoryPools.GetOrAdd(cacheKey, t => Activator.CreateInstance(generatedMemoryPool)); } diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/HashHelpers.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/HashHelpers.cs index 5c78b9f..2e03a60 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/HashHelpers.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/HashHelpers.cs @@ -85,7 +85,6 @@ namespace Microsoft.StreamProcessing.Internal.Collections public int hash; } - internal static class HashHelpers { public static readonly int[] primes = new int[] { @@ -99,11 +98,9 @@ namespace Microsoft.StreamProcessing.Internal.Collections public static int ExpandPrime(int oldSize) { int min = 2 * oldSize; - if ((min > 0x7feffffd) && (oldSize < 0x7feffffd)) - { - return 0x7feffffd; - } - return GetPrime(min); + return (min > 0x7feffffd) && (oldSize < 0x7feffffd) + ? 0x7feffffd + : GetPrime(min); } public static int GetPrime(int min) @@ -124,7 +121,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections return min; } - public static bool IsPrime(int candidate) + private static bool IsPrime(int candidate) { if ((candidate & 1) == 0) { diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/PooledCircularBuffer.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/PooledCircularBuffer.cs index ebae07a..02039eb 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/PooledCircularBuffer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/PooledCircularBuffer.cs @@ -54,10 +54,10 @@ namespace Microsoft.StreamProcessing.Internal.Collections } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool IsFull() => (((this.tail + 1) & this.DefaultCapacity) == this.head); + public bool IsFull() => ((this.tail + 1) & this.DefaultCapacity) == this.head; [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool IsEmpty() => (this.head == this.tail); + public bool IsEmpty() => this.head == this.tail; public IEnumerable Iterate() { @@ -88,7 +88,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections public sealed class PooledElasticCircularBuffer : IEnumerable, IDisposable { private const int Capacity = 0xff; - private LinkedList> buffers; + private readonly LinkedList> buffers = new LinkedList>(); private LinkedListNode> head; private LinkedListNode> tail; private readonly ColumnPool pool; @@ -100,7 +100,6 @@ namespace Microsoft.StreamProcessing.Internal.Collections public PooledElasticCircularBuffer() { this.pool = MemoryManager.GetColumnPool(Capacity + 1); - this.buffers = new LinkedList>(); var node = new LinkedListNode>(new PooledCircularBuffer(Capacity, this.pool)); this.buffers.AddFirst(node); this.tail = this.head = node; @@ -233,14 +232,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections /// /// [EditorBrowsable(EditorBrowsableState.Never)] - public bool IsEmpty() => (this.head.Value.IsEmpty() && (this.head == this.tail)); - - private IEnumerable Iterate() - { - foreach (PooledCircularBuffer buffer in this.buffers) - foreach (T item in buffer.Iterate()) - yield return item; - } + public bool IsEmpty() => this.head.Value.IsEmpty() && (this.head == this.tail); /// /// Currently for internal use only - do not use directly. @@ -253,7 +245,12 @@ namespace Microsoft.StreamProcessing.Internal.Collections /// /// [EditorBrowsable(EditorBrowsableState.Never)] - public IEnumerator GetEnumerator() => Iterate().GetEnumerator(); + public IEnumerator GetEnumerator() + { + foreach (var buffer in this.buffers) + foreach (var item in buffer.Iterate()) + yield return item; + } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator(); diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/PriorityQueue.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/PriorityQueue.cs index 9d0f709..944f34d 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/PriorityQueue.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/PriorityQueue.cs @@ -33,10 +33,7 @@ namespace Microsoft.StreamProcessing /// /// The comparer to be used on elements added to the queue. [EditorBrowsable(EditorBrowsableState.Never)] - public PriorityQueue(IComparer comp) - { - this.comp = comp; - } + public PriorityQueue(IComparer comp) => this.comp = comp; /// /// Add a new item to the priority queue. @@ -63,7 +60,7 @@ namespace Microsoft.StreamProcessing /// /// True if the priority queue is empty. [EditorBrowsable(EditorBrowsableState.Never)] - public bool IsEmpty() => (this.data.Count == 0); + public bool IsEmpty() => this.data.Count == 0; /// /// Dequeue an element from the priority queue. @@ -74,7 +71,7 @@ namespace Microsoft.StreamProcessing { // assumes pq is not empty; up to calling code int li = this.data.Count - 1; // last index (before removal) - T frontItem = this.data[0]; // fetch the front + var frontItem = this.data[0]; // fetch the front this.data[0] = this.data[li]; this.data.RemoveAt(li); @@ -88,7 +85,7 @@ namespace Microsoft.StreamProcessing if (rc <= li && this.comp.Compare(this.data[rc], this.data[ci]) < 0) // if there is a rc (ci + 1), and it is smaller than left child, use the rc instead ci = rc; if (this.comp.Compare(this.data[pi], this.data[ci]) <= 0) break; // parent is smaller than (or equal to) smallest child so done - T tmp = this.data[pi]; + var tmp = this.data[pi]; this.data[pi] = this.data[ci]; this.data[ci] = tmp; // swap parent and child pi = ci; @@ -105,11 +102,7 @@ namespace Microsoft.StreamProcessing /// /// The item next to be dequeued. [EditorBrowsable(EditorBrowsableState.Never)] - public T Peek() - { - T frontItem = this.data[0]; - return frontItem; - } + public T Peek() => this.data[0]; /// /// Returns the number of elements in the priority queue. diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/SafeConcurrentDictionary.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/SafeConcurrentDictionary.cs index ac05e42..9b04f72 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/SafeConcurrentDictionary.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/SafeConcurrentDictionary.cs @@ -12,7 +12,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections { /// /// A dictionary that supports concurrency with similar interface to .NET's ConcurrentDictionary. - /// However, this dictionary changes the implementation of AddOrUpdate and GetOrAdd functions to + /// However, this dictionary changes the implementation and GetOrAdd functions to /// guarantee atomicity per-key for factory lambdas. /// /// Type of values in the dictionary diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/StreamMessage.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/StreamMessage.cs index 3b8e146..deaad53 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/StreamMessage.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/StreamMessage.cs @@ -484,27 +484,25 @@ namespace Microsoft.StreamProcessing var localOffset = offset; fixed (long* vsync = this.vsync.col) + fixed (long* vother = this.vother.col) { - fixed (long* vother = this.vother.col) + while ((count < Config.DataBatchSize) && (localOffset < n)) { - while ((count < Config.DataBatchSize) && (localOffset < n)) + var partition = partitionExtractor(largeBatch.Array[localOffset]); + var start = startEdgeExtractor(largeBatch.Array[localOffset]); + if (currentTime.ContainsKey(partition) && start < currentTime[partition]) { - var partition = partitionExtractor(largeBatch.Array[localOffset]); - var start = startEdgeExtractor(largeBatch.Array[localOffset]); - if (currentTime.ContainsKey(partition) && start < currentTime[partition]) - { - throw new IngressException("Out-of-order event encountered during ingress, under a disorder policy of Throw"); - } - currentTime[partition] = start; - vsync[count] = start; - vother[count] = endEdgeExtractor(largeBatch.Array[localOffset]); - this.payload.col[count] = largeBatch.Array[localOffset]; - var p = partitionConstructor(partition); - this.key.col[count] = p; - this.hash.col[count] = HashCode(p); - localOffset++; - count++; + throw new IngressException("Out-of-order event encountered during ingress, under a disorder policy of Throw"); } + currentTime[partition] = start; + vsync[count] = start; + vother[count] = endEdgeExtractor(largeBatch.Array[localOffset]); + this.payload.col[count] = largeBatch.Array[localOffset]; + var p = partitionConstructor(partition); + this.key.col[count] = p; + this.hash.col[count] = HashCode(p); + localOffset++; + count++; } } @@ -533,18 +531,16 @@ namespace Microsoft.StreamProcessing var localOffset = offset; fixed (long* vsync = this.vsync.col) + fixed (long* vother = this.vother.col) { - fixed (long* vother = this.vother.col) + while ((count < Config.DataBatchSize) && (localOffset < n)) { - while ((count < Config.DataBatchSize) && (localOffset < n)) - { - currentTime = DateTimeOffset.UtcNow.Ticks; - vsync[count] = currentTime; - vother[count] = StreamEvent.InfinitySyncTime; - this.payload.col[count] = largeBatch.Array[localOffset]; - localOffset++; - count++; - } + currentTime = DateTimeOffset.UtcNow.Ticks; + vsync[count] = currentTime; + vother[count] = StreamEvent.InfinitySyncTime; + this.payload.col[count] = largeBatch.Array[localOffset]; + localOffset++; + count++; } } @@ -582,26 +578,24 @@ namespace Microsoft.StreamProcessing encounteredPunctuation = false; // let's be optimistic! fixed (long* vsync = this.vsync.col) + fixed (long* vother = this.vother.col) { - fixed (long* vother = this.vother.col) + while ((count < Config.DataBatchSize) && (localOffset < n)) { - while ((count < Config.DataBatchSize) && (localOffset < n)) - { - currentTime = currentSync; - vsync[count] = currentTime; - vother[count] = StreamEvent.InfinitySyncTime; - this.payload.col[count] = largeBatch.Array[localOffset]; - localOffset++; - count++; - eventCount++; + currentTime = currentSync; + vsync[count] = currentTime; + vother[count] = StreamEvent.InfinitySyncTime; + this.payload.col[count] = largeBatch.Array[localOffset]; + localOffset++; + count++; + eventCount++; - if (eventCount == eventsPerSample) - { - eventCount = 0; - currentSync++; - encounteredPunctuation = true; - break; - } + if (eventCount == eventsPerSample) + { + eventCount = 0; + currentSync++; + encounteredPunctuation = true; + break; } } } @@ -645,7 +639,7 @@ namespace Microsoft.StreamProcessing if (this.key != null) this.key.col[this.Count] = default; if (this.payload != null) this.payload.col[this.Count] = default; this.hash.col[this.Count] = 0; - this.bitvector.col[this.Count >> 6] |= (1L << (this.Count & 0x3f)); + this.bitvector.col[this.Count >> 6] |= 1L << (this.Count & 0x3f); this.Count++; return this.Count == this.vsync.col.Length; @@ -864,10 +858,7 @@ namespace Microsoft.StreamProcessing this.Count = value.Count; this.iter = value.iter; - if (swing) - { - return; - } + if (swing) return; value.vsync.IncrementRefCount(1); value.vother.IncrementRefCount(1); @@ -954,7 +945,7 @@ namespace Microsoft.StreamProcessing } } - return (endIndex - startIndex + 1) - hammingWeight; + return endIndex - startIndex + 1 - hammingWeight; } } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/StreamMessagePoolColumnar.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/StreamMessagePoolColumnar.cs index db6372a..dcfcea1 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/StreamMessagePoolColumnar.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/StreamMessagePoolColumnar.cs @@ -31,9 +31,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections } public override string GetStatusReport() - { - return string.Format(CultureInfo.InvariantCulture, "[{0}] Objects Created - {1,5} - Queue Size - {2,5}\t<{3},{4}>", this.createdObjects == this.batchQueue.Count ? " " : "X", this.createdObjects, this.batchQueue.Count, typeof(TKey).GetCSharpSourceSyntax(), typeof(TPayload).GetCSharpSourceSyntax()); - } + => string.Format(CultureInfo.InvariantCulture, "[{0}] Objects Created - {1,5} - Queue Size - {2,5}\t<{3},{4}>", this.createdObjects == this.batchQueue.Count ? " " : "X", this.createdObjects, this.batchQueue.Count, typeof(TKey).GetCSharpSourceSyntax(), typeof(TPayload).GetCSharpSourceSyntax()); public override ColumnPoolBase Leaked => this.createdObjects != this.batchQueue.Count ? this : null; @@ -72,4 +70,4 @@ namespace Microsoft.StreamProcessing.Internal.Collections } } } -#endif +#endif \ No newline at end of file diff --git a/Sources/Core/Microsoft.StreamProcessing/Egress/EgressBoundary.cs b/Sources/Core/Microsoft.StreamProcessing/Egress/EgressBoundary.cs index a9e2903..fed2410 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Egress/EgressBoundary.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Egress/EgressBoundary.cs @@ -56,10 +56,8 @@ namespace Microsoft.StreamProcessing.Internal /// [EditorBrowsable(EditorBrowsableState.Never)] public virtual void ProduceQueryPlan(PlanNode previous) - { - this.activeProcess.RegisterQueryPlan(this.identifier, new EgressPlanNode( + => this.activeProcess.RegisterQueryPlan(this.identifier, new EgressPlanNode( previous, this, typeof(TKey), typeof(TPayload), false, null)); - } /// /// Currently for internal use only - do not use directly. diff --git a/Sources/Core/Microsoft.StreamProcessing/Egress/Temporal/TemporalEgressTransformer.cs b/Sources/Core/Microsoft.StreamProcessing/Egress/Temporal/TemporalEgressTransformer.cs index afe5caa..c380ce6 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Egress/Temporal/TemporalEgressTransformer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Egress/Temporal/TemporalEgressTransformer.cs @@ -105,12 +105,12 @@ namespace Microsoft.StreamProcessing { var template = new TemporalEgressTemplate(typeof(Empty), typeof(TPayload), typeof(TResult), string.Empty, "StartEdge", startEdgeObservable.source.Properties.IsColumnar); if (startEdgeObservable.constructor != null) - template.startEdgeFunction = ((x, y) => startEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( + template.startEdgeFunction = (x, y) => startEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { startEdgeObservable.constructor.Parameters[0], x }, { startEdgeObservable.constructor.Parameters[1], y }, - })); + }); var keyType = typeof(Empty); var expandedCode = template.TransformText(); @@ -139,13 +139,13 @@ namespace Microsoft.StreamProcessing { var template = new TemporalEgressTemplate(typeof(Empty), typeof(TPayload), typeof(TResult), string.Empty, "Interval", intervalObservable.source.Properties.IsColumnar); if (intervalObservable.constructor != null) - template.intervalFunction = ((x, y, z) => intervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( + template.intervalFunction = (x, y, z) => intervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { intervalObservable.constructor.Parameters[0], x }, { intervalObservable.constructor.Parameters[1], y }, { intervalObservable.constructor.Parameters[2], z }, - })); + }); var keyType = typeof(Empty); var expandedCode = template.TransformText(); @@ -202,13 +202,13 @@ namespace Microsoft.StreamProcessing { var template = new TemporalEgressTemplate(typeof(TKey), typeof(TPayload), typeof(TResult), "Partitioned", "StartEdge", partitionedStartEdgeObservable.source.Properties.IsColumnar); if (partitionedStartEdgeObservable.constructor != null) - template.startEdgeFunction = ((x, y) => partitionedStartEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( + template.startEdgeFunction = (x, y) => partitionedStartEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { partitionedStartEdgeObservable.constructor.Parameters[0], "colkey[i].Key" }, { partitionedStartEdgeObservable.constructor.Parameters[0], x }, { partitionedStartEdgeObservable.constructor.Parameters[1], y }, - })); + }); var keyType = typeof(PartitionKey<>).MakeGenericType(typeof(TKey)); var expandedCode = template.TransformText(); @@ -237,14 +237,14 @@ namespace Microsoft.StreamProcessing { var template = new TemporalEgressTemplate(typeof(TKey), typeof(TPayload), typeof(TResult), "Partitioned", "Interval", partitionedIntervalObservable.source.Properties.IsColumnar); if (partitionedIntervalObservable.constructor != null) - template.intervalFunction = ((x, y, z) => partitionedIntervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( + template.intervalFunction = (x, y, z) => partitionedIntervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { partitionedIntervalObservable.constructor.Parameters[0], "colkey[i].Key" }, { partitionedIntervalObservable.constructor.Parameters[0], x }, { partitionedIntervalObservable.constructor.Parameters[1], y }, { partitionedIntervalObservable.constructor.Parameters[2], z }, - })); + }); var keyType = typeof(PartitionKey<>).MakeGenericType(typeof(TKey)); var expandedCode = template.TransformText(); diff --git a/Sources/Core/Microsoft.StreamProcessing/Egress/TemporalArray/TemporalArrayEgressTransformer.cs b/Sources/Core/Microsoft.StreamProcessing/Egress/TemporalArray/TemporalArrayEgressTransformer.cs index 8485d3e..1a2b19f 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Egress/TemporalArray/TemporalArrayEgressTransformer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Egress/TemporalArray/TemporalArrayEgressTransformer.cs @@ -120,12 +120,12 @@ namespace Microsoft.StreamProcessing { var template = new TemporalArrayEgressTemplate(typeof(Empty), typeof(TPayload), typeof(TResult), string.Empty, "StartEdge", startEdgeObservable.source.Properties.IsColumnar); if (startEdgeObservable.constructor != null) - template.startEdgeFunction = ((x, y) => startEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( + template.startEdgeFunction = (x, y) => startEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { startEdgeObservable.constructor.Parameters[0], x }, { startEdgeObservable.constructor.Parameters[1], y }, - })); + }); var keyType = typeof(Empty); var expandedCode = template.TransformText(); @@ -165,13 +165,13 @@ namespace Microsoft.StreamProcessing { var template = new TemporalArrayEgressTemplate(typeof(Empty), typeof(TPayload), typeof(TResult), string.Empty, "Interval", intervalObservable.source.Properties.IsColumnar); if (intervalObservable.constructor != null) - template.intervalFunction = ((x, y, z) => intervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( + template.intervalFunction = (x, y, z) => intervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { intervalObservable.constructor.Parameters[0], x }, { intervalObservable.constructor.Parameters[1], y }, { intervalObservable.constructor.Parameters[2], z }, - })); + }); var keyType = typeof(Empty); var expandedCode = template.TransformText(); @@ -250,13 +250,13 @@ namespace Microsoft.StreamProcessing { var template = new TemporalArrayEgressTemplate(typeof(TKey), typeof(TPayload), typeof(TResult), "Partitioned", "StartEdge", partitionedStartEdgeObservable.source.Properties.IsColumnar); if (partitionedStartEdgeObservable.constructor != null) - template.startEdgeFunction = ((x, y) => partitionedStartEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( + template.startEdgeFunction = (x, y) => partitionedStartEdgeObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { partitionedStartEdgeObservable.constructor.Parameters[0], "colkey[i].Key" }, { partitionedStartEdgeObservable.constructor.Parameters[0], x }, { partitionedStartEdgeObservable.constructor.Parameters[1], y }, - })); + }); var keyType = typeof(PartitionKey<>).MakeGenericType(typeof(TKey)); var expandedCode = template.TransformText(); @@ -296,14 +296,14 @@ namespace Microsoft.StreamProcessing { var template = new TemporalArrayEgressTemplate(typeof(TKey), typeof(TPayload), typeof(TResult), "Partitioned", "Interval", partitionedIntervalObservable.source.Properties.IsColumnar); if (partitionedIntervalObservable.constructor != null) - template.intervalFunction = ((x, y, z) => partitionedIntervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( + template.intervalFunction = (x, y, z) => partitionedIntervalObservable.constructor.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { partitionedIntervalObservable.constructor.Parameters[0], "colkey[i].Key" }, { partitionedIntervalObservable.constructor.Parameters[0], x }, { partitionedIntervalObservable.constructor.Parameters[1], y }, { partitionedIntervalObservable.constructor.Parameters[2], z }, - })); + }); var keyType = typeof(PartitionKey<>).MakeGenericType(typeof(TKey)); var expandedCode = template.TransformText(); diff --git a/Sources/Core/Microsoft.StreamProcessing/Ingress/Atemporal/AtemporalIngressStreamable.cs b/Sources/Core/Microsoft.StreamProcessing/Ingress/Atemporal/AtemporalIngressStreamable.cs index 922bac6..5545a42 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Ingress/Atemporal/AtemporalIngressStreamable.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Ingress/Atemporal/AtemporalIngressStreamable.cs @@ -54,19 +54,13 @@ namespace Microsoft.StreamProcessing public override IDisposable Subscribe(IStreamObserver observer) { Contract.EnsuresOnThrow(true); - IIngressStreamObserver subscription; - if (this.timelinePolicy.timelineEnum == TimelineEnum.WallClock) - { - subscription = new MonotonicSubscriptionWallClock(this.observable, this.IngressSiteIdentifier, + var subscription = this.timelinePolicy.timelineEnum == TimelineEnum.WallClock + ? new MonotonicSubscriptionWallClock(this.observable, this.IngressSiteIdentifier, + this, + observer, this.flushPolicy, this.punctuationPolicy, this.onCompletedPolicy, this.timelinePolicy) + : (IIngressStreamObserver)new MonotonicSubscriptionSequence(this.observable, this.IngressSiteIdentifier, this, observer, this.flushPolicy, this.punctuationPolicy, this.onCompletedPolicy, this.timelinePolicy); - } - else - { - subscription = new MonotonicSubscriptionSequence(this.observable, this.IngressSiteIdentifier, - this, - observer, this.flushPolicy, this.punctuationPolicy, this.onCompletedPolicy, this.timelinePolicy); - } if (this.delayed) { diff --git a/Sources/Core/Microsoft.StreamProcessing/Ingress/AtemporalArray/AtemporalArrayIngressStreamable.cs b/Sources/Core/Microsoft.StreamProcessing/Ingress/AtemporalArray/AtemporalArrayIngressStreamable.cs index ebfdebc..362490d 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Ingress/AtemporalArray/AtemporalArrayIngressStreamable.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Ingress/AtemporalArray/AtemporalArrayIngressStreamable.cs @@ -48,19 +48,13 @@ namespace Microsoft.StreamProcessing public override IDisposable Subscribe(IStreamObserver observer) { Contract.EnsuresOnThrow(true); - IIngressStreamObserver subscription; - if (this.timelinePolicy.timelineEnum == TimelineEnum.WallClock) - { - subscription = new MonotonicArraySubscriptionWallClock(this.observable, this.IngressSiteIdentifier, + var subscription = this.timelinePolicy.timelineEnum == TimelineEnum.WallClock + ? new MonotonicArraySubscriptionWallClock(this.observable, this.IngressSiteIdentifier, + this, + observer, this.onCompletedPolicy, this.timelinePolicy) + : (IIngressStreamObserver)new MonotonicArraySubscriptionSequence(this.observable, this.IngressSiteIdentifier, this, observer, this.onCompletedPolicy, this.timelinePolicy); - } - else - { - subscription = new MonotonicArraySubscriptionSequence(this.observable, this.IngressSiteIdentifier, - this, - observer, this.onCompletedPolicy, this.timelinePolicy); - } if (this.delayed) { @@ -76,5 +70,4 @@ namespace Microsoft.StreamProcessing public string IngressSiteIdentifier { get; } } - } \ No newline at end of file diff --git a/Sources/Core/Microsoft.StreamProcessing/Ingress/Temporal/TemporalIngressTransformer.cs b/Sources/Core/Microsoft.StreamProcessing/Ingress/Temporal/TemporalIngressTransformer.cs index 9ad9a91..9c89d9d 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Ingress/Temporal/TemporalIngressTransformer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Ingress/Temporal/TemporalIngressTransformer.cs @@ -45,10 +45,8 @@ namespace Microsoft.StreamProcessing string latencyOption, string diagnosticOption, FuseModule fuseModule) - { - return Generate("Interval", string.Empty, null, startEdgeExpression, endEdgeExpression, + => Generate("Interval", string.Empty, null, startEdgeExpression, endEdgeExpression, latencyOption, diagnosticOption, fuseModule); - } internal static Tuple GenerateFused( Expression> startEdgeExpression, @@ -56,10 +54,8 @@ namespace Microsoft.StreamProcessing string latencyOption, string diagnosticOption, FuseModule fuseModule) - { - return Generate("Interval", string.Empty, null, startEdgeExpression, endEdgeExpression, + => Generate("Interval", string.Empty, null, startEdgeExpression, endEdgeExpression, latencyOption, diagnosticOption, fuseModule); - } internal static Tuple Generate( Expression> partitionExpression, @@ -68,10 +64,8 @@ namespace Microsoft.StreamProcessing string latencyOption, string diagnosticOption, FuseModule fuseModule) - { - return Generate("Interval", "Partitioned", partitionExpression, startEdgeExpression, endEdgeExpression, + => Generate("Interval", "Partitioned", partitionExpression, startEdgeExpression, endEdgeExpression, latencyOption, diagnosticOption, fuseModule); - } internal static Tuple GenerateFused( Expression> partitionExpression, @@ -80,46 +74,36 @@ namespace Microsoft.StreamProcessing string latencyOption, string diagnosticOption, FuseModule fuseModule) - { - return Generate("Interval", "Partitioned", partitionExpression, startEdgeExpression, endEdgeExpression, + => Generate("Interval", "Partitioned", partitionExpression, startEdgeExpression, endEdgeExpression, latencyOption, diagnosticOption, fuseModule); - } internal static Tuple Generate( string latencyOption, string diagnosticOption, FuseModule fuseModule) - { - return Generate("StreamEvent", string.Empty, null, null, null, + => Generate("StreamEvent", string.Empty, null, null, null, latencyOption, diagnosticOption, fuseModule); - } internal static Tuple GenerateFused( string latencyOption, string diagnosticOption, FuseModule fuseModule) - { - return Generate("StreamEvent", string.Empty, null, null, null, + => Generate("StreamEvent", string.Empty, null, null, null, latencyOption, diagnosticOption, fuseModule); - } internal static Tuple Generate( string latencyOption, string diagnosticOption, FuseModule fuseModule) - { - return Generate("StreamEvent", "Partitioned", null, null, null, + => Generate("StreamEvent", "Partitioned", null, null, null, latencyOption, diagnosticOption, fuseModule); - } internal static Tuple GenerateFused( string latencyOption, string diagnosticOption, FuseModule fuseModule) - { - return Generate("StreamEvent", "Partitioned", null, null, null, + => Generate("StreamEvent", "Partitioned", null, null, null, latencyOption, diagnosticOption, fuseModule); - } private static Tuple Generate( string ingressType, @@ -158,25 +142,25 @@ namespace Microsoft.StreamProcessing template.ingressType = ingressType; template.latencyOption = latencyOption; - template.partitionFunction = (x => partitionExpression == null ? + template.partitionFunction = x => partitionExpression == null ? string.Empty : partitionExpression.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { partitionExpression.Parameters.Single(), x } - })); - template.startEdgeFunction = (x => startEdgeExpression == null ? + }); + template.startEdgeFunction = x => startEdgeExpression == null ? string.Empty : startEdgeExpression.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { startEdgeExpression.Parameters.Single(), x } - })); - template.endEdgeFunction = (x => endEdgeExpression == null ? + }); + template.endEdgeFunction = x => endEdgeExpression == null ? "StreamEvent.InfinitySyncTime" : endEdgeExpression.Body.ExpressionToCSharpStringWithParameterSubstitution( new Dictionary { { endEdgeExpression.Parameters.Single(), x } - })); + }); template.fusionOption = fuseModule.IsEmpty ? "Simple" : (Config.AllowFloatingReorderPolicy ? "Disordered" : "Fused"); diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaDescriptor.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaDescriptor.cs index 540c5b4..4e57f29 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaDescriptor.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaDescriptor.cs @@ -24,9 +24,7 @@ namespace Microsoft.StreamProcessing /// /// public static Afa Create(TRegister defaultRegister = default, TAccumulator defaultAccumulator = default) - { - return new Afa(defaultRegister, defaultAccumulator); - } + => new Afa(defaultRegister, defaultAccumulator); /// /// @@ -36,9 +34,7 @@ namespace Microsoft.StreamProcessing /// /// public static Afa Create(TRegister defaultRegister = default) - { - return new Afa(defaultRegister); - } + => new Afa(defaultRegister); } @@ -107,9 +103,7 @@ namespace Microsoft.StreamProcessing /// An added condition that must be met for the transition to occur /// An expression to mutate the register value when the transition occurs public void AddSingleElementArc(int fromState, int toState, Expression> fence, Expression> transfer = null) - { - AddArc(fromState, toState, new SingleElementArc { Fence = fence, Transfer = transfer }); - } + => AddArc(fromState, toState, new SingleElementArc { Fence = fence, Transfer = transfer }); /// /// Adds a transition to the AFA triggered by a list of concurrent elements @@ -119,9 +113,7 @@ namespace Microsoft.StreamProcessing /// An added condition that must be met for the transition to occur /// An expression to mutate the register value when the transition occurs public void AddListElementArc(int fromState, int toState, Expression, TRegister, bool>> fence, Expression, TRegister, TRegister>> transfer = null) - { - AddArc(fromState, toState, new ListElementArc { Fence = fence, Transfer = transfer }); - } + => AddArc(fromState, toState, new ListElementArc { Fence = fence, Transfer = transfer }); /// /// Adds an epsilon (no action) arc to the AFA @@ -129,9 +121,7 @@ namespace Microsoft.StreamProcessing /// Starting state of the transition /// Ending state of the transition public void AddEpsilonElementArc(int fromState, int toState) - { - AddArc(fromState, toState, new EpsilonArc()); - } + => AddArc(fromState, toState, new EpsilonArc()); /// /// Adds a transition that handles multiple elements (events) at a given timestamp @@ -151,12 +141,15 @@ namespace Microsoft.StreamProcessing Expression> fence = null, Expression> transfer = null, Expression> dispose = null) - { - AddArc(fromState, toState, new MultiElementArc + => AddArc(fromState, toState, new MultiElementArc { - Initialize = initialize, Accumulate = accumulate, SkipToEnd = skipToEnd, Fence = fence, Transfer = transfer, Dispose = dispose + Initialize = initialize, + Accumulate = accumulate, + SkipToEnd = skipToEnd, + Fence = fence, + Transfer = transfer, + Dispose = dispose }); - } /// /// Adds an arc to the AFA. @@ -249,10 +242,7 @@ namespace Microsoft.StreamProcessing /// Set default value of register. /// /// - public void SetDefaultRegister(TRegister register) - { - this.DefaultRegister = register; - } + public void SetDefaultRegister(TRegister register) => this.DefaultRegister = register; internal CompiledAfa Compile() => new CompiledAfa(this); diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_SingleEvent.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_SingleEvent.cs index ccb4a02..172d239 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_SingleEvent.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_SingleEvent.cs @@ -45,7 +45,7 @@ namespace Microsoft.StreamProcessing var stack = new Stack(); var activeFindTraverser = new FastLinkedList>.ListTraverser(this.activeStates); var tentativeFindTraverser = new FastLinkedList>.ListTraverser(this.tentativeOutput); - var tentativeVisibleTraverser = new FastLinkedList>.VisibleTraverser(this.tentativeOutput); + var tentativeOutputIndex = 0; var count = batch.Count; @@ -74,11 +74,11 @@ namespace Microsoft.StreamProcessing if (this.tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out int index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = this.tentativeOutput.Values[index]; + var elem = this.tentativeOutput.Values[tentativeOutputIndex]; dest_vsync[this.iter] = this.lastSyncTime; dest_vother[this.iter] = elem.other; @@ -148,10 +148,9 @@ namespace Microsoft.StreamProcessing if (arcinfo.Fence(synctime, batch[i], state.register)) { - TRegister newReg; - if (arcinfo.Transfer == null) newReg = state.register; - else newReg = arcinfo.Transfer(synctime, batch[i], state.register); - + var newReg = arcinfo.Transfer == null + ? state.register + : arcinfo.Transfer(synctime, batch[i], state.register); int ns = arcinfo.toState; while (true) { @@ -228,10 +227,9 @@ namespace Microsoft.StreamProcessing var arcinfo = startStateMap[cnt]; if (arcinfo.Fence(synctime, batch[i], this.defaultRegister)) { - TRegister newReg; - if (arcinfo.Transfer == null) newReg = this.defaultRegister; - else newReg = arcinfo.Transfer(synctime, batch[i], this.defaultRegister); - + var newReg = arcinfo.Transfer == null + ? this.defaultRegister + : arcinfo.Transfer(synctime, batch[i], this.defaultRegister); int ns = arcinfo.toState; while (true) { @@ -300,11 +298,11 @@ namespace Microsoft.StreamProcessing if (this.tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out int index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = this.tentativeOutput.Values[index]; + var elem = this.tentativeOutput.Values[tentativeOutputIndex]; this.batch.vsync.col[this.iter] = this.lastSyncTime; this.batch.vother.col[this.iter] = elem.other; @@ -312,10 +310,7 @@ namespace Microsoft.StreamProcessing this.batch.hash.col[this.iter] = 0; this.iter++; - if (this.iter == Config.DataBatchSize) - { - FlushContents(); - } + if (this.iter == Config.DataBatchSize) FlushContents(); } this.tentativeOutput.Clear(); // Clear the tentative output list diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedDAfaPipe_SingleEvent.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedDAfaPipe_SingleEvent.cs index 745fc55..b33ee03 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedDAfaPipe_SingleEvent.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedDAfaPipe_SingleEvent.cs @@ -50,7 +50,7 @@ namespace Microsoft.StreamProcessing public override unsafe void OnNext(StreamMessage batch) { var tentativeFindTraverser = new FastLinkedList>.ListTraverser(this.tentativeOutput); - var tentativeVisibleTraverser = new FastLinkedList>.VisibleTraverser(this.tentativeOutput); + var tentativeOutputIndex = 0; var count = batch.Count; @@ -79,11 +79,11 @@ namespace Microsoft.StreamProcessing if (this.tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out int index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = this.tentativeOutput.Values[index]; + var elem = this.tentativeOutput.Values[tentativeOutputIndex]; dest_vsync[this.iter] = this.lastSyncTime; dest_vother[this.iter] = elem.other; @@ -205,10 +205,9 @@ namespace Microsoft.StreamProcessing var arcinfo = startStateMap[cnt]; if (arcinfo.Fence(synctime, batch[i], this.defaultRegister)) { - if (arcinfo.Transfer != null) - this.activeState_register = arcinfo.Transfer(synctime, batch[i], this.defaultRegister); - else - this.activeState_register = this.defaultRegister; + this.activeState_register = arcinfo.Transfer != null + ? arcinfo.Transfer(synctime, batch[i], this.defaultRegister) + : this.defaultRegister; this.activeState_state = arcinfo.toState; if (this.isFinal[this.activeState_state]) @@ -264,11 +263,11 @@ namespace Microsoft.StreamProcessing if (this.tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out int index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = this.tentativeOutput.Values[index]; + var elem = this.tentativeOutput.Values[tentativeOutputIndex]; this.batch.vsync.col[this.iter] = this.lastSyncTime; this.batch.vother.col[this.iter] = elem.other; @@ -276,10 +275,7 @@ namespace Microsoft.StreamProcessing this.batch.hash.col[this.iter] = 0; this.iter++; - if (this.iter == Config.DataBatchSize) - { - FlushContents(); - } + if (this.iter == Config.DataBatchSize) FlushContents(); } this.tentativeOutput.Clear(); // Clear the tentative output list @@ -296,9 +292,6 @@ namespace Microsoft.StreamProcessing batch.Free(); } - protected override void UpdatePointers() - { - this.startState = this.startStates[0]; - } + protected override void UpdatePointers() => this.startState = this.startStates[0]; } } \ No newline at end of file diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedAfaTemplate.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedAfaTemplate.cs index 864371a..4531251 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedAfaTemplate.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedAfaTemplate.cs @@ -91,10 +91,8 @@ using Microsoft.StreamProcessing.Internal.Collections; this.Write(">>.ListTraverser(activeStates);\r\n var tentativeFindTraverser = new FastLin" + "kedList>.ListTraverser(tentativeOutput);\r\n var tentativeVisibleTraverser = new F" + - "astLinkedList>.VisibleTraverser(tentativeOutput);\r\n\r\n "); + this.Write(">>.ListTraverser(tentativeOutput);\r\n var tentativeOutputIndex = 0;\r\n\r\n " + + " "); this.Write(this.ToStringHelper.ToStringWithCulture(sourceBatchTypeName)); this.Write(" sourceBatch = batch as "); this.Write(this.ToStringHelper.ToStringWithCulture(sourceBatchTypeName)); @@ -147,11 +145,11 @@ using Microsoft.StreamProcessing.Internal.Collections; if (tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = tentativeOutput.Values[index]; + var elem = tentativeOutput.Values[tentativeOutputIndex]; dest_vsync[iter] = lastSyncTime; dest_vother[iter] = elem.other; @@ -314,13 +312,13 @@ using Microsoft.StreamProcessing.Internal.Collections; int index, hash; seenEvent = 0; - if (tentativeOutput.Count > 0) + if (this.tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = tentativeOutput.Values[index]; + var elem = this.tentativeOutput.Values[tentativeOutputIndex]; this.batch.vsync.col[iter] = lastSyncTime; this.batch.vother.col[iter] = elem.other; diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedAfaTemplate.tt b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedAfaTemplate.tt index 64dfe5b..7e663a3 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedAfaTemplate.tt +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedAfaTemplate.tt @@ -60,7 +60,7 @@ public sealed class <#= className #> : CompiledAfaPipeBase stack = new Stack(); var activeFindTraverser = new FastLinkedList>>.ListTraverser(activeStates); var tentativeFindTraverser = new FastLinkedList>>.ListTraverser(tentativeOutput); - var tentativeVisibleTraverser = new FastLinkedList>>.VisibleTraverser(tentativeOutput); + var tentativeOutputIndex = 0; <#= sourceBatchTypeName #> sourceBatch = batch as <#= sourceBatchTypeName #>; <#= resultBatchTypeName #> resultBatch = this.batch as <#= resultBatchTypeName #>; @@ -103,11 +103,11 @@ public sealed class <#= className #> : CompiledAfaPipeBase 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = tentativeOutput.Values[index]; + var elem = tentativeOutput.Values[tentativeOutputIndex]; dest_vsync[iter] = lastSyncTime; dest_vother[iter] = elem.other; @@ -261,13 +261,13 @@ public sealed class <#= className #> : CompiledAfaPipeBase 0) + if (this.tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = tentativeOutput.Values[index]; + var elem = this.tentativeOutput.Values[tentativeOutputIndex]; this.batch.vsync.col[iter] = lastSyncTime; this.batch.vother.col[iter] = elem.other; diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedDAfaTemplate.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedDAfaTemplate.cs index 0e42b2c..3c280f9 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedDAfaTemplate.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedDAfaTemplate.cs @@ -93,10 +93,8 @@ using Microsoft.StreamProcessing.Internal.Collections; " var tentativeFindTraverser = new FastLinkedList>.ListTraverser(tentativeOutput);\r\n var tentativeVisibleTraverser = n" + - "ew FastLinkedList>.VisibleTraverser(tentativeOutput);\r\n\r\n "); + this.Write(">>.ListTraverser(tentativeOutput);\r\n var tentativeOutputIndex = 0;\r\n\r\n" + + " "); this.Write(this.ToStringHelper.ToStringWithCulture(sourceBatchTypeName)); this.Write(" sourceBatch = batch as "); this.Write(this.ToStringHelper.ToStringWithCulture(sourceBatchTypeName)); @@ -147,13 +145,13 @@ using Microsoft.StreamProcessing.Internal.Collections; { seenEvent = 0; - if (tentativeOutput.Count > 0) + if (this.tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = tentativeOutput.Values[index]; + var elem = this.tentativeOutput.Values[tentativeOutputIndex]; dest_vsync[iter] = lastSyncTime; dest_vother[iter] = elem.other; @@ -302,13 +300,13 @@ using Microsoft.StreamProcessing.Internal.Collections; int index, hash; seenEvent = 0; - if (tentativeOutput.Count > 0) + if (this.tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = tentativeOutput.Values[index]; + var elem = this.tentativeOutput.Values[tentativeOutputIndex]; this.batch.vsync.col[iter] = lastSyncTime; this.batch.vother.col[iter] = elem.other; diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedDAfaTemplate.tt b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedDAfaTemplate.tt index b56baa9..7b573e8 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedDAfaTemplate.tt +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/UngroupedDAfaTemplate.tt @@ -64,7 +64,7 @@ using Microsoft.StreamProcessing.Internal.Collections; { Stack stack = new Stack(); var tentativeFindTraverser = new FastLinkedList>>.ListTraverser(tentativeOutput); - var tentativeVisibleTraverser = new FastLinkedList>>.VisibleTraverser(tentativeOutput); + var tentativeOutputIndex = 0; <#= sourceBatchTypeName #> sourceBatch = batch as <#= sourceBatchTypeName #>; <#= resultBatchTypeName #> resultBatch = this.batch as <#= resultBatchTypeName #>; @@ -105,13 +105,13 @@ using Microsoft.StreamProcessing.Internal.Collections; { seenEvent = 0; - if (tentativeOutput.Count > 0) + if (this.tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = tentativeOutput.Values[index]; + var elem = this.tentativeOutput.Values[tentativeOutputIndex]; dest_vsync[iter] = lastSyncTime; dest_vother[iter] = elem.other; @@ -253,13 +253,13 @@ using Microsoft.StreamProcessing.Internal.Collections; int index, hash; seenEvent = 0; - if (tentativeOutput.Count > 0) + if (this.tentativeOutput.Count > 0) { - tentativeVisibleTraverser.currIndex = 0; + tentativeOutputIndex = 0; - while (tentativeVisibleTraverser.Next(out index)) + while (this.tentativeOutput.Iterate(ref tentativeOutputIndex)) { - var elem = tentativeOutput.Values[index]; + var elem = this.tentativeOutput.Values[tentativeOutputIndex]; this.batch.vsync.col[iter] = lastSyncTime; this.batch.vother.col[iter] = elem.other; diff --git a/Sources/Core/Microsoft.StreamProcessing/Pipes/BinaryPipe.cs b/Sources/Core/Microsoft.StreamProcessing/Pipes/BinaryPipe.cs index 9c5363a..bcff1bf 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Pipes/BinaryPipe.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Pipes/BinaryPipe.cs @@ -613,5 +613,4 @@ namespace Microsoft.StreamProcessing } } } - } diff --git a/Sources/Core/Microsoft.StreamProcessing/QueryContainer.cs b/Sources/Core/Microsoft.StreamProcessing/QueryContainer.cs index be72adf..d6e6429 100644 --- a/Sources/Core/Microsoft.StreamProcessing/QueryContainer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/QueryContainer.cs @@ -312,9 +312,6 @@ namespace Microsoft.StreamProcessing } } - internal void RegisterQueryPlan(string identifier, PlanNode node) - { - this.queryPlans.Add(identifier, node); - } + internal void RegisterQueryPlan(string identifier, PlanNode node) => this.queryPlans.Add(identifier, node); } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryBase.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryBase.cs index 368e8cb..6d62fe5 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryBase.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryBase.cs @@ -24,13 +24,13 @@ namespace Microsoft.StreamProcessing.Serializer if (size == Config.DataBatchSize) { var pool = (ColumnPool)columnPools.GetOrAdd(Tuple.Create(typeof(T), Config.DataBatchSize), t => MemoryManager.GetColumnPool(t.Item2)); - pool.Get(out ColumnBatch result); + pool.Get(out var result); return result; } else if (typeof(T) == typeof(long) && size == (1 + (Config.DataBatchSize >> 6))) // bitvector { var pool = (ColumnPool)bitVectorPools.GetOrAdd(size, t => MemoryManager.GetBVPool(size)); // TODO: Push magic incantation "1 + (Config.DataBatchSize >> 6)" into method call - pool.Get(out ColumnBatch result); + pool.Get(out var result); return result; } return new ColumnBatch(size); @@ -41,13 +41,13 @@ namespace Microsoft.StreamProcessing.Serializer if (size == Config.DataBatchSize) { var pool = (ColumnPool)columnPools.GetOrAdd(Tuple.Create(typeof(T), Config.DataBatchSize), t => MemoryManager.GetColumnPool(t.Item2)); - pool.Get(out ColumnBatch result); + pool.Get(out var result); return result.col; } else if (typeof(T) == typeof(long) && size == (1 + (Config.DataBatchSize >> 6))) // bitvector { var pool = (ColumnPool)bitVectorPools.GetOrAdd(size, t => MemoryManager.GetBVPool(size)); // TODO: Push magic incantation "1 + (Config.DataBatchSize >> 6)" into method call - pool.Get(out ColumnBatch result); + pool.Get(out var result); return result.col; } return new T[size]; diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryDecoder.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryDecoder.cs index 35e0852..7aa7c99 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryDecoder.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryDecoder.cs @@ -225,8 +225,8 @@ namespace Microsoft.StreamProcessing.Serializer Buffer.BlockCopy(buffer, 0, blob, 0, buffer.Length); } - private static DoublingArrayPool bytePool; - private static CharArrayPool charArrayPool; + private static readonly Lazy> bytePool = new Lazy>(MemoryManager.GetDoublingArrayPool); + private static readonly Lazy charArrayPool = new Lazy(MemoryManager.GetCharArrayPool); public CharArrayWrapper Decode_CharArrayWrapper() { @@ -234,12 +234,11 @@ namespace Microsoft.StreamProcessing.Serializer int usedLength = DecodeInt(); int length = DecodeInt(); - if (charArrayPool == null) charArrayPool = MemoryManager.GetCharArrayPool(); CharArrayWrapper result; if (usedLength == 0) - charArrayPool.Get(out result, 1); + charArrayPool.Value.Get(out result, 1); else - charArrayPool.Get(out result, usedLength); + charArrayPool.Value.Get(out result, usedLength); if (header == 0) { @@ -249,19 +248,17 @@ namespace Microsoft.StreamProcessing.Serializer } else { - if (bytePool == null) bytePool = MemoryManager.GetDoublingArrayPool(); - var encodedSize = DecodeInt(); byte[] buffer; if (encodedSize > 0) - bytePool.Get(out buffer, encodedSize); + bytePool.Value.Get(out buffer, encodedSize); else - bytePool.Get(out buffer, 1); + bytePool.Value.Get(out buffer, 1); this.stream.ReadAllRequiredBytes(buffer, 0, encodedSize); Encoding.UTF8.GetChars(buffer, 0, encodedSize, result.charArray.content, 0); - bytePool.Return(buffer); + bytePool.Value.Return(buffer); } return result; @@ -278,8 +275,6 @@ namespace Microsoft.StreamProcessing.Serializer public ColumnBatch Decode_ColumnBatch_String() { - if (bytePool == null) bytePool = MemoryManager.GetDoublingArrayPool(); - var usedLength = ReadIntFixed(); if (usedLength == 0) { diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryEncoder.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryEncoder.cs index cbe26ad..e4a2f4d 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryEncoder.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Encoders/BinaryEncoder.cs @@ -162,7 +162,7 @@ namespace Microsoft.StreamProcessing.Serializer this.stream.Write(buffer, 0, buffer.Length); } - private static DoublingArrayPool bytePool; + private static readonly Lazy> bytePool = new Lazy>(MemoryManager.GetDoublingArrayPool); public unsafe void Encode(CharArrayWrapper value) { @@ -176,21 +176,19 @@ namespace Microsoft.StreamProcessing.Serializer if (Config.SerializationCompressionLevel.HasFlag(SerializationCompressionLevel.CharArrayToUTF8)) { - if (bytePool == null) bytePool = MemoryManager.GetDoublingArrayPool(); - byte[] result; int encodedSize = Encoding.UTF8.GetByteCount(value.charArray.content, 0, value.UsedLength); Encode(encodedSize); if (encodedSize > 0) - bytePool.Get(out result, encodedSize); + bytePool.Value.Get(out result, encodedSize); else - bytePool.Get(out result, 1); + bytePool.Value.Get(out result, 1); Encoding.UTF8.GetBytes(value.charArray.content, 0, value.UsedLength, result, 0); this.stream.Write(result, 0, encodedSize); - bytePool.Return(result); + bytePool.Value.Return(result); } else { @@ -218,9 +216,8 @@ namespace Microsoft.StreamProcessing.Serializer public unsafe void Encode(ColumnBatch value) { - if (bytePool == null || charPool == null || intPool == null || shortPool == null || batchSize != Config.DataBatchSize) + if (charPool == null || intPool == null || shortPool == null || batchSize != Config.DataBatchSize) { - bytePool = MemoryManager.GetDoublingArrayPool(); charPool = MemoryManager.GetCharArrayPool(); intPool = MemoryManager.GetColumnPool(); shortPool = MemoryManager.GetColumnPool(); @@ -245,7 +242,7 @@ namespace Microsoft.StreamProcessing.Serializer if (maxLength <= short.MaxValue) { - shortPool.Get(out ColumnBatch lengths); + shortPool.Get(out var lengths); lengths.UsedLength = value.UsedLength; CharArrayWrapper caw; diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ArraySerializer.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ArraySerializer.cs index c212d2b..ca932d4 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ArraySerializer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ArraySerializer.cs @@ -17,7 +17,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers protected override Expression BuildSerializerSafe(Expression encoder, Expression value) { - PropertyInfo getLength = this.RuntimeType.GetTypeInfo().GetProperty("Length"); + var getLength = this.RuntimeType.GetTypeInfo().GetProperty("Length"); if (getLength == null) { throw new SerializationException( @@ -28,12 +28,12 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers var body = new List(); - ParameterExpression arrayLength = Expression.Variable(typeof(int), "arrayLength"); + var arrayLength = Expression.Variable(typeof(int), "arrayLength"); body.Add(Expression.Assign(arrayLength, Expression.Property(value, getLength))); body.Add(EncodeArrayChunkMethod.ReplaceParametersInBody(encoder, arrayLength)); - LabelTarget label = Expression.Label(); - ParameterExpression counter = Expression.Variable(typeof(int), "counter"); + var label = Expression.Label(); + var counter = Expression.Variable(typeof(int), "counter"); body.Add(Expression.Assign(counter, ConstantZero)); body.Add( @@ -53,22 +53,22 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers protected override Expression BuildDeserializerSafe(Expression decoder) { - Type arrayType = this.RuntimeType; + var arrayType = this.RuntimeType; - MethodInfo resize = typeof(Array).GetTypeInfo().GetMethod("Resize").MakeGenericMethod(arrayType.GetElementType()); + var resize = typeof(Array).GetTypeInfo().GetMethod("Resize").MakeGenericMethod(arrayType.GetElementType()); var body = new List(); - ParameterExpression result = Expression.Variable(arrayType, "result"); + var result = Expression.Variable(arrayType, "result"); body.Add(Expression.Assign(result, Expression.NewArrayBounds(arrayType.GetElementType(), ConstantZero))); - ParameterExpression index = Expression.Variable(typeof(int), "index"); + var index = Expression.Variable(typeof(int), "index"); body.Add(Expression.Assign(index, ConstantZero)); - ParameterExpression chunkSize = Expression.Variable(typeof(int), "chunkSize"); - ParameterExpression counter = Expression.Variable(typeof(int), "counter"); + var chunkSize = Expression.Variable(typeof(int), "chunkSize"); + var counter = Expression.Variable(typeof(int), "counter"); - LabelTarget chunkReadLoop = Expression.Label(); - LabelTarget arrayReadLoop = Expression.Label(); + var chunkReadLoop = Expression.Label(); + var arrayReadLoop = Expression.Label(); body.Add( Expression.Loop( diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ClassSerializer.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ClassSerializer.cs index 94bdbfd..8d8dedf 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ClassSerializer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ClassSerializer.cs @@ -66,15 +66,15 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers private Expression> GenerateCachedDeserializer() { - ParameterExpression decoderParam = Expression.Parameter(typeof(BinaryDecoder), "decoder"); - ParameterExpression instance = Expression.Variable(this.RuntimeType, "instance"); + var decoderParam = Expression.Parameter(typeof(BinaryDecoder), "decoder"); + var instance = Expression.Variable(this.RuntimeType, "instance"); var body = new List(); if (this.RuntimeType.HasSupportedParameterizedConstructor()) { // Cannot create an object beforehand. Have to call a constructor with parameters. var properties = this.fields.Select(f => f.Schema.BuildDeserializer(decoderParam)); - ConstructorInfo ctor = this.RuntimeType.GetTypeInfo() + var ctor = this.RuntimeType.GetTypeInfo() .GetConstructors() .Single(c => c.GetParameters().Select(p => p.ParameterType).SequenceEqual(this.fields.Select(f => f.Schema.RuntimeType))); body.Add(Expression.Assign(instance, Expression.New(ctor, properties))); @@ -88,15 +88,15 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers } body.Add(instance); - BlockExpression result = Expression.Block(new[] { instance }, body); + var result = Expression.Block(new[] { instance }, body); return Expression.Lambda>(result, decoderParam); } private Action GenerateCachedSerializer() { - ParameterExpression instanceParam = Expression.Parameter(this.RuntimeType, "instance"); - ParameterExpression encoderParam = Expression.Parameter(typeof(BinaryEncoder), "encoder"); - Expression block = SerializeFields(encoderParam, instanceParam); + var instanceParam = Expression.Parameter(this.RuntimeType, "instance"); + var encoderParam = Expression.Parameter(typeof(BinaryEncoder), "encoder"); + var block = SerializeFields(encoderParam, instanceParam); var lambda = Expression.Lambda>(block, instanceParam, encoderParam); return lambda.Compile(); } @@ -131,17 +131,15 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers public static Expression ThrowUnexpectedNullCheckExpression(Type type) { - MethodInfo exceptionMethodInfo = typeof(ClassSerializer).GetMethod("UnexpectedNullValueException", BindingFlags.NonPublic | BindingFlags.Static); + var exceptionMethodInfo = typeof(ClassSerializer).GetMethod("UnexpectedNullValueException", BindingFlags.NonPublic | BindingFlags.Static); return Expression.Throw(Expression.Call(exceptionMethodInfo, Expression.Constant(type))); } private static Exception UnexpectedNullValueException(Type type) - { - return new SerializationException( + => new SerializationException( string.Format( CultureInfo.InvariantCulture, "Unexpected null value for the object of type '{0}'. Please check the schema.", type)); - } } } \ No newline at end of file diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/EnumerableSerializer.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/EnumerableSerializer.cs index 8a3153a..ada234b 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/EnumerableSerializer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/EnumerableSerializer.cs @@ -30,7 +30,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers private static Expression, TItem>> ListAddExpression = (c, i) => c.Add(i); private static Expression, TItem>> CollectionAddExpression = (c, i) => c.Add(i); - private ObjectSerializerBase itemSchema; + private readonly ObjectSerializerBase itemSchema; public EnumerableSerializer(ObjectSerializerBase item) : base(typeof(TCollection)) => this.itemSchema = item; @@ -155,23 +155,21 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers } protected override Expression BuildDeserializerSafe(Expression decoder) - { - if (typeof(ICollection).GetTypeInfo().IsAssignableFrom(typeof(TCollection).GetTypeInfo())) - return BuildDeserializerForCollection(decoder); - return BuildDeserializerForEnumerable(decoder); - } + => typeof(ICollection).GetTypeInfo().IsAssignableFrom(typeof(TCollection).GetTypeInfo()) + ? BuildDeserializerForCollection(decoder) + : BuildDeserializerForEnumerable(decoder); private Expression BuildDeserializerForEnumerable(Expression decoder) { - MethodInfo addElement = GetAddMethod(); + var addElement = GetAddMethod(); var result = Expression.Variable(typeof(TCollection), "result"); var index = Expression.Variable(typeof(int), "index"); var chunkSize = Expression.Variable(typeof(int), "chunkSize"); var counter = Expression.Variable(typeof(int), "counter"); - LabelTarget chunkLoop = Expression.Label(); - LabelTarget enumerableLoop = Expression.Label(); + var chunkLoop = Expression.Label(); + var enumerableLoop = Expression.Label(); return Expression.Block( new[] { result, index, chunkSize, counter }, @@ -196,11 +194,11 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers private Expression BuildDeserializerForCollection(Expression decoder) { - ParameterExpression result = Expression.Variable(typeof(TCollection)); - ParameterExpression currentNumberOfElements = Expression.Variable(typeof(int)); + var result = Expression.Variable(typeof(TCollection)); + var currentNumberOfElements = Expression.Variable(typeof(int)); - ParameterExpression counter = Expression.Variable(typeof(int)); - LabelTarget internalLoopLabel = Expression.Label(); + var counter = Expression.Variable(typeof(int)); + var internalLoopLabel = Expression.Label(); // For types that have a count property, we encode as a single chunk and can thus decode in a single pass. var body = Expression.Block( @@ -223,14 +221,13 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers private MethodInfo GetAddMethod() { - Type type = this.RuntimeType; - MethodInfo result = type.GetMethodByName("Add", typeof(TItem)) - ?? type.GetMethodByName("Enqueue", typeof(TItem)) - ?? type.GetMethodByName("Push", typeof(TItem)); + var result = this.RuntimeType.GetMethodByName("Add", typeof(TItem)) + ?? this.RuntimeType.GetMethodByName("Enqueue", typeof(TItem)) + ?? this.RuntimeType.GetMethodByName("Push", typeof(TItem)); if (result == null) throw new SerializationException( - string.Format(CultureInfo.InvariantCulture, "Collection type '{0}' does not have Add/Enqueue/Push method.", type)); + string.Format(CultureInfo.InvariantCulture, "Collection type '{0}' does not have Add/Enqueue/Push method.", this.RuntimeType)); return result; } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/MultidimensionalArraySerializer.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/MultidimensionalArraySerializer.cs index 3077f5b..636b693 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/MultidimensionalArraySerializer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/MultidimensionalArraySerializer.cs @@ -33,13 +33,13 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers if (currentRank == maxRank) return this.ItemSchema.BuildSerializer(encoder, Expression.ArrayIndex(value, indexes)); - MethodInfo getLength = this.RuntimeType.GetTypeInfo().GetMethod("GetLength"); - ParameterExpression length = Expression.Variable(typeof(int), "length"); + var getLength = this.RuntimeType.GetTypeInfo().GetMethod("GetLength"); + var length = Expression.Variable(typeof(int), "length"); body.Add(Expression.Assign(length, Expression.Call(value, getLength, new Expression[] { Expression.Constant(currentRank) }))); body.Add(EncodeArrayChunkMethod.ReplaceParametersInBody(encoder, length)); - LabelTarget label = Expression.Label(); - ParameterExpression counter = Expression.Variable(typeof(int), "counter"); + var label = Expression.Label(); + var counter = Expression.Variable(typeof(int), "counter"); body.Add(Expression.Assign(counter, ConstantZero)); indexes.Add(counter); @@ -62,10 +62,10 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers { var body = new List(); - Type type = this.RuntimeType; - Type jaggedType = GenerateJaggedType(this.RuntimeType); + var type = this.RuntimeType; + var jaggedType = GenerateJaggedType(this.RuntimeType); - ParameterExpression deserialized = Expression.Variable(jaggedType, "deserialized"); + var deserialized = Expression.Variable(jaggedType, "deserialized"); body.Add(Expression.Assign(deserialized, GenerateBuildJaggedDeserializer(decoder, jaggedType, 0, type.GetArrayRank()))); var lengths = new List(); @@ -76,7 +76,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers currentObject = Expression.Property(currentObject, "Item", new Expression[] { ConstantZero }); } - ParameterExpression result = Expression.Variable(type, "result"); + var result = Expression.Variable(type, "result"); body.Add(Expression.Assign(result, Expression.NewArrayBounds(type.GetElementType(), lengths))); body.Add(GenerateCopy(new List(), result, deserialized, 0, type.GetArrayRank())); body.Add(result); @@ -86,8 +86,8 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers private static Type GenerateJaggedType(Type sourceType) { int rank = sourceType.GetArrayRank(); - Type elementType = sourceType.GetElementType(); - Type result = elementType; + var elementType = sourceType.GetElementType(); + var result = elementType; for (int i = 0; i < rank; i++) { result = typeof(List<>).MakeGenericType(result); @@ -103,18 +103,18 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers return this.ItemSchema.BuildDeserializer(decoder); } - ParameterExpression result = Expression.Variable(valueType, "result"); - ParameterExpression index = Expression.Variable(typeof(int), "index"); - ParameterExpression chunkSize = Expression.Variable(typeof(int), "chunkSize"); + var result = Expression.Variable(valueType, "result"); + var index = Expression.Variable(typeof(int), "index"); + var chunkSize = Expression.Variable(typeof(int), "chunkSize"); body.Add(Expression.Assign(result, Expression.New(valueType))); body.Add(Expression.Assign(index, ConstantZero)); - LabelTarget internalLoopLabel = Expression.Label(); - ParameterExpression counter = Expression.Variable(typeof(int)); + var internalLoopLabel = Expression.Label(); + var counter = Expression.Variable(typeof(int)); body.Add(Expression.Assign(counter, ConstantZero)); - LabelTarget allRead = Expression.Label(); + var allRead = Expression.Label(); body.Add( Expression.Loop( @@ -155,11 +155,11 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers return Expression.Assign(Expression.ArrayAccess(destination, indexes), source); } - ParameterExpression length = Expression.Variable(typeof(int), "length"); + var length = Expression.Variable(typeof(int), "length"); body.Add(Expression.Assign(length, Expression.Property(source, "Count"))); - LabelTarget label = Expression.Label(); - ParameterExpression counter = Expression.Variable(typeof(int), "counter"); + var label = Expression.Label(); + var counter = Expression.Variable(typeof(int), "counter"); body.Add(Expression.Assign(counter, ConstantZero)); indexes.Add(counter); diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/RecordFieldSerializer.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/RecordFieldSerializer.cs index ef83767..e403b1d 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/RecordFieldSerializer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/RecordFieldSerializer.cs @@ -33,7 +33,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers var tmp = Expression.Variable(this.Schema.RuntimeType, Guid.NewGuid().ToString()); var assignment = Expression.Assign(tmp, member); - Expression serialized = this.Schema.BuildSerializer(encoder, tmp); + var serialized = this.Schema.BuildSerializer(encoder, tmp); return Expression.Block(new[] { tmp }, new[] { assignment, serialized }); } @@ -57,9 +57,8 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers } private Expression GetMember(Expression @object) - { - if (this.MemberInfo.isField) return Expression.Field(@object, this.MemberInfo.DeclaringType, this.MemberInfo.OriginalName); - else return Expression.Property(@object, this.MemberInfo.DeclaringType, this.MemberInfo.OriginalName); - } + => this.MemberInfo.isField + ? Expression.Field(@object, this.MemberInfo.DeclaringType, this.MemberInfo.OriginalName) + : Expression.Property(@object, this.MemberInfo.DeclaringType, this.MemberInfo.OriginalName); } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ReflectionSchemaBuilder.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ReflectionSchemaBuilder.cs index a09b80b..f49ca3b 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ReflectionSchemaBuilder.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/ReflectionSchemaBuilder.cs @@ -89,7 +89,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers var surrogate = this.settings.Surrogate; if (surrogate != null) { - if (surrogate.IsSupportedType(type, out MethodInfo serialize, out MethodInfo deserialize)) + if (surrogate.IsSupportedType(type, out var serialize, out var deserialize)) { return new SurrogateSerializer(this.settings, type, serialize, deserialize); } @@ -121,8 +121,8 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers private ObjectSerializerBase CreateNotNullableSchema(Type type, uint currentDepth) { - if (RuntimeTypeToSerializer.TryGetValue(type, out Func p)) return p(); - if (this.seenTypes.TryGetValue(type, out ObjectSerializerBase schema)) return schema; + if (RuntimeTypeToSerializer.TryGetValue(type, out var p)) return p(); + if (this.seenTypes.TryGetValue(type, out var schema)) return schema; var typeInfo = type.GetTypeInfo(); if (typeInfo.IsEnum) return BuildEnumTypeSchema(type); @@ -170,7 +170,7 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers var members = type.ResolveMembers(); foreach (var info in members) { - ObjectSerializerBase fieldSchema = CreateSchema(info.Type, currentDepth + 1); + var fieldSchema = CreateSchema(info.Type, currentDepth + 1); var recordField = new RecordFieldSerializer(fieldSchema, info); record.AddField(recordField); diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/SurrogateSerializer.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/SurrogateSerializer.cs index 5611073..71584f7 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/SurrogateSerializer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/SurrogateSerializer.cs @@ -23,16 +23,17 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers protected override Expression BuildSerializerSafe(Expression encoder, Expression value) { var surrogate = Expression.Constant(this.Surrogate); - Expression stream = Expression.Field(encoder, "stream"); + var stream = Expression.Field(encoder, "stream"); return Expression.Call(surrogate, this.serialize, new[] { value, stream }); } protected override Expression BuildDeserializerSafe(Expression decoder) { var surrogate = Expression.Constant(this.Surrogate); - Expression stream = Expression.Field(decoder, "stream"); + var stream = Expression.Field(decoder, "stream"); return Expression.Call(surrogate, this.deserialize, new[] { stream }); } + private ISurrogate Surrogate { get; } } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/UnionSerializer.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/UnionSerializer.cs index 3937d9b..964108d 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/UnionSerializer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/Serializers/UnionSerializer.cs @@ -69,9 +69,9 @@ namespace Microsoft.StreamProcessing.Serializer.Serializers protected override Expression BuildDeserializerSafe(Expression decoder) { - ParameterExpression resultParameter = Expression.Variable(this.RuntimeType, "result"); - ParameterExpression unionTypeParameter = Expression.Variable(typeof(int), "unionType"); - BinaryExpression assignUnionType = Expression.Assign( + var resultParameter = Expression.Variable(this.RuntimeType, "result"); + var unionTypeParameter = Expression.Variable(typeof(int), "unionType"); + var assignUnionType = Expression.Assign( unionTypeParameter, decoderExpression.ReplaceParametersInBody(decoder)); diff --git a/Sources/Core/Microsoft.StreamProcessing/Serializer/StateSerializer.cs b/Sources/Core/Microsoft.StreamProcessing/Serializer/StateSerializer.cs index b1fe32c..e3bc51a 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Serializer/StateSerializer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Serializer/StateSerializer.cs @@ -12,40 +12,37 @@ namespace Microsoft.StreamProcessing.Serializer internal sealed class StateSerializer { private readonly ObjectSerializerBase schema; - private Action serialize; - private Func deserialize; + private readonly Lazy> serialize; + private readonly Lazy> deserialize; - internal StateSerializer(ObjectSerializerBase schema) => this.schema = schema ?? throw new ArgumentNullException(nameof(schema)); - - public void Serialize(Stream stream, T obj) + internal StateSerializer(ObjectSerializerBase schema) { - if (this.serialize == null) GenerateSerializer(); - this.serialize(new BinaryEncoder(stream), obj); + this.schema = schema ?? throw new ArgumentNullException(nameof(schema)); + this.serialize = new Lazy>(GenerateSerializer); + this.deserialize = new Lazy>(GenerateDeserializer); } - public T Deserialize(Stream stream) - { - if (this.deserialize == null) GenerateDeserializer(); - return this.deserialize(new BinaryDecoder(stream)); - } + public void Serialize(Stream stream, T obj) => this.serialize.Value(new BinaryEncoder(stream), obj); - private void GenerateSerializer() + public T Deserialize(Stream stream) => this.deserialize.Value(new BinaryDecoder(stream)); + + private Action GenerateSerializer() { var instance = Expression.Parameter(typeof(T), "instance"); var encoder = Expression.Parameter(typeof(BinaryEncoder), "encoder"); var result = this.schema.BuildSerializer(encoder, instance); var lambda = Expression.Lambda>(result, encoder, instance); - this.serialize = lambda.Compile(); + return lambda.Compile(); } - private void GenerateDeserializer() + private Func GenerateDeserializer() { var decoder = Expression.Parameter(typeof(BinaryDecoder), "decoder"); var result = this.schema.BuildDeserializer(decoder); var lambda = Expression.Lambda>(result, decoder); - this.deserialize = lambda.Compile(); + return lambda.Compile(); } } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Streamables/Streamable.cs b/Sources/Core/Microsoft.StreamProcessing/Streamables/Streamable.cs index 684c7cd..adf21e5 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Streamables/Streamable.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Streamables/Streamable.cs @@ -62,8 +62,7 @@ namespace Microsoft.StreamProcessing { public StreamableContract(StreamProperties properties) : base(properties) - { - } + { } public override IDisposable Subscribe(IStreamObserver observer) { diff --git a/Sources/Core/Microsoft.StreamProcessing/Streamables/UnaryStreamable.cs b/Sources/Core/Microsoft.StreamProcessing/Streamables/UnaryStreamable.cs index 153e44d..e42223d 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Streamables/UnaryStreamable.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Streamables/UnaryStreamable.cs @@ -27,7 +27,7 @@ namespace Microsoft.StreamProcessing public override IDisposable Subscribe(IStreamObserver observer) { - IStreamObserver pipe = CreatePipe(observer); + var pipe = CreatePipe(observer); return this.Source.Subscribe(pipe); } diff --git a/Sources/Core/Microsoft.StreamProcessing/StringHandling/CharArrayPool.cs b/Sources/Core/Microsoft.StreamProcessing/StringHandling/CharArrayPool.cs index 29deeab..9de16bd 100644 --- a/Sources/Core/Microsoft.StreamProcessing/StringHandling/CharArrayPool.cs +++ b/Sources/Core/Microsoft.StreamProcessing/StringHandling/CharArrayPool.cs @@ -92,7 +92,7 @@ namespace Microsoft.StreamProcessing.Internal.Collections { if (queue != null) { - while (queue.TryDequeue(out CharArrayWrapper result)) + while (queue.TryDequeue(out var result)) { result = null; Interlocked.Decrement(ref this.createdObjects); diff --git a/Sources/Core/Microsoft.StreamProcessing/StringHandling/CharArrayWrapper.cs b/Sources/Core/Microsoft.StreamProcessing/StringHandling/CharArrayWrapper.cs index d08f78b..f449caa 100644 --- a/Sources/Core/Microsoft.StreamProcessing/StringHandling/CharArrayWrapper.cs +++ b/Sources/Core/Microsoft.StreamProcessing/StringHandling/CharArrayWrapper.cs @@ -41,15 +41,13 @@ namespace Microsoft.StreamProcessing else { Interlocked.Decrement(ref this.RefCount); - pool.Get(out CharArrayWrapper result, this.UsedLength); + pool.Get(out var result, this.UsedLength); if (copyData) { fixed (char* dest = result.charArray.content) + fixed (char* src = this.charArray.content) { - fixed (char* src = this.charArray.content) - { - MyStringBuilder.Wstrcpy(dest, src, this.UsedLength); - } + MyStringBuilder.Wstrcpy(dest, src, this.UsedLength); } } return result; diff --git a/Sources/Core/Microsoft.StreamProcessing/StringHandling/MultiString.cs b/Sources/Core/Microsoft.StreamProcessing/StringHandling/MultiString.cs index 34d5cff..3fbbdcf 100644 --- a/Sources/Core/Microsoft.StreamProcessing/StringHandling/MultiString.cs +++ b/Sources/Core/Microsoft.StreamProcessing/StringHandling/MultiString.cs @@ -47,10 +47,10 @@ namespace Microsoft.StreamProcessing.Internal.Collections [DataMember] private MyStringBuilder msb; - private CharArrayPool charArrayPool; - private ColumnPool intPool; - private ColumnPool shortPool; - private ColumnPool bitvectorPool; + private readonly CharArrayPool charArrayPool; + private readonly ColumnPool intPool; + private readonly ColumnPool shortPool; + private readonly ColumnPool bitvectorPool; /// /// Currently for internal use only - do not use directly. @@ -263,17 +263,6 @@ namespace Microsoft.StreamProcessing.Internal.Collections #endregion - internal void AssignPools(CharArrayPool caPool, ColumnPool intPool, ColumnPool shortPool, ColumnPool bitvectorPool) - { - Contract.Requires(this.State == MultiStringState.Unsealed); - Contract.Ensures(this.State == MultiStringState.Unsealed); - - this.charArrayPool = caPool; - this.intPool = intPool; - this.shortPool = shortPool; - this.bitvectorPool = bitvectorPool; - } - // Clone the multi-string shell only /// /// Currently for internal use only - do not use directly. diff --git a/Sources/Core/Microsoft.StreamProcessing/StringHandling/MyStringBuilder.cs b/Sources/Core/Microsoft.StreamProcessing/StringHandling/MyStringBuilder.cs index 580709f..a9e8a04 100644 --- a/Sources/Core/Microsoft.StreamProcessing/StringHandling/MyStringBuilder.cs +++ b/Sources/Core/Microsoft.StreamProcessing/StringHandling/MyStringBuilder.cs @@ -263,7 +263,7 @@ namespace Microsoft.StreamProcessing private MyStringBuilder FindChunkForIndex(int index) { - MyStringBuilder chunkPrevious = this; + var chunkPrevious = this; while (chunkPrevious.m_ChunkOffset > index) { chunkPrevious = chunkPrevious.m_ChunkPrevious; @@ -291,7 +291,7 @@ namespace Microsoft.StreamProcessing if (this.Length == 0) return resultCaw; - MyStringBuilder chunkPrevious = this; + var chunkPrevious = this; fixed (char* str2 = result) { char* charPtr = str2; @@ -379,7 +379,7 @@ namespace Microsoft.StreamProcessing } else { - MyStringBuilder builder = FindChunkForIndex(value); + var builder = FindChunkForIndex(value); if (builder != this) { int num3 = capacity - builder.m_ChunkOffset; diff --git a/Sources/Core/Microsoft.StreamProcessing/Transformer/EventBatchManager.cs b/Sources/Core/Microsoft.StreamProcessing/Transformer/EventBatchManager.cs index 2e7ddff..c6b39fc 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Transformer/EventBatchManager.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Transformer/EventBatchManager.cs @@ -43,7 +43,7 @@ namespace Microsoft.StreamProcessing if (Config.ForceRowBasedExecution) return new StreamMessage(pool); if (!typeof(TPayload).CanRepresentAsColumnar()) return new StreamMessage(pool); - Type generatedBatchType = GetStreamMessageType(); + var generatedBatchType = GetStreamMessageType(); var instance = Activator.CreateInstance(generatedBatchType, pool); var returnValue = (StreamMessage)instance; diff --git a/Sources/Core/Microsoft.StreamProcessing/Transformer/SelectTransformation.cs b/Sources/Core/Microsoft.StreamProcessing/Transformer/SelectTransformation.cs index 401aba5..168c662 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Transformer/SelectTransformation.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Transformer/SelectTransformation.cs @@ -192,12 +192,12 @@ namespace Microsoft.StreamProcessing this.error = true; return; } - if (!this.parameterInformation.TryGetValue(parameter, out SelectParameterInformation spi)) + if (!this.parameterInformation.TryGetValue(parameter, out var selectParameter)) { this.error = true; return; } - var columnarField = spi.parameterRepresentation.Fields[fieldOrAutoProp.Name]; + var columnarField = selectParameter.parameterRepresentation.Fields[fieldOrAutoProp.Name]; if (this.resultTypeInformation.noFields) { // Then e.f is of type R (the result type) where R is a primitive type or some type that doesn't get decomposed. @@ -232,12 +232,12 @@ namespace Microsoft.StreamProcessing private ParameterExpression GetIndexVariable(ParameterExpression parameter) { - if (!this.parameterInformation.TryGetValue(parameter, out SelectParameterInformation spi)) + if (!this.parameterInformation.TryGetValue(parameter, out var selectParameter)) { // then this parameter must be a parameter of an inner lambda or a parameter not being substituted for return parameter; } - var indexVariable = Expression.Variable(typeof(int), spi.IndexVariableName); + var indexVariable = Expression.Variable(typeof(int), selectParameter.IndexVariableName); return indexVariable; } @@ -273,7 +273,7 @@ namespace Microsoft.StreamProcessing { Contract.Assume(parameter.Type == this.resultTypeInformation.RepresentationFor); - if (!this.parameterInformation.TryGetValue(parameter, out SelectParameterInformation spi)) + if (!this.parameterInformation.TryGetValue(parameter, out var selectParameter)) { if (!hasStartEdge && !this.resultTypeInformation.noFields) { @@ -288,7 +288,7 @@ namespace Microsoft.StreamProcessing } foreach (var resultField in this.resultTypeInformation.AllFields) { - var matchingField = spi.parameterRepresentation.AllFields.First(f => f.OriginalName == resultField.OriginalName); + var matchingField = selectParameter.parameterRepresentation.AllFields.First(f => f.OriginalName == resultField.OriginalName); if (this.noSwingingFields) { var a = GetBatchColumnIndexer(parameter, matchingField); @@ -391,9 +391,9 @@ namespace Microsoft.StreamProcessing { if (node.Expression is ParameterExpression parameter) { - if (this.parameterInformation.TryGetValue(parameter, out SelectParameterInformation spi)) + if (this.parameterInformation.TryGetValue(parameter, out var selectParameter)) { - var columnarField = spi.parameterRepresentation.Fields[member.Name]; + var columnarField = selectParameter.parameterRepresentation.Fields[member.Name]; var a = GetBatchColumnIndexer(parameter, columnarField); return a; } @@ -428,11 +428,11 @@ namespace Microsoft.StreamProcessing protected override Expression VisitParameter(ParameterExpression node) { - if (this.parameterInformation.TryGetValue(node, out SelectParameterInformation spi)) + if (this.parameterInformation.TryGetValue(node, out var selectParameter)) { - if (spi.parameterRepresentation.noFields) + if (selectParameter.parameterRepresentation.noFields) { - var columnarField = spi.parameterRepresentation.PseudoField; + var columnarField = selectParameter.parameterRepresentation.PseudoField; var a = GetBatchColumnIndexer(node, columnarField); return a; } @@ -558,10 +558,9 @@ namespace Microsoft.StreamProcessing var n = methodCall.Arguments.Count; if (n == 1) { - if (firstArgIsChar) // IndexOf/LastIndexOf(char) - firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), 0, StringComparison.Ordinal", firstArgAsCSharpString); - else // IndexOf/LastIndexOf(string) - firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}, 0, StringComparison.Ordinal", firstArgAsCSharpString); + firstArgsToMultiStringCall = firstArgIsChar + ? string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), 0, StringComparison.Ordinal", firstArgAsCSharpString) + : string.Format(CultureInfo.InvariantCulture, "{0}, 0, StringComparison.Ordinal", firstArgAsCSharpString); } else { @@ -570,10 +569,9 @@ namespace Microsoft.StreamProcessing { if (methodCall.Arguments.ElementAt(1).Type.Equals(typeof(int))) { - if (firstArgIsChar) // IndexOf/LastIndexOf(char, int) - firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), {1}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString); - else // IndexOf/LastIndexOf(string, int) - firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}, {1}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString); + firstArgsToMultiStringCall = firstArgIsChar + ? string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), {1}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString) + : string.Format(CultureInfo.InvariantCulture, "{0}, {1}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString); } else { @@ -590,10 +588,9 @@ namespace Microsoft.StreamProcessing firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}.ToString(), {1}, {2}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString); else { - if (methodCall.Method.GetParameters().ElementAt(2).ParameterType.Equals(typeof(int))) // IndexOf/LastIndexOf(string, int, int) - firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString); - else // IndexOf/LastIndexOf(string, int, StringComparison), - firstArgsToMultiStringCall = string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString); + firstArgsToMultiStringCall = methodCall.Method.GetParameters().ElementAt(2).ParameterType.Equals(typeof(int)) + ? string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}, StringComparison.Ordinal", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString) + : string.Format(CultureInfo.InvariantCulture, "{0}, {1}, {2}", firstArgAsCSharpString, secondArgAsCSharpString, thirdArgAsCSharpString); } } else @@ -612,11 +609,9 @@ namespace Microsoft.StreamProcessing break; } - string s = null; - if (methodName.Equals("Substring")) - s = string.Format(CultureInfo.InvariantCulture, "{0}({1}, sourceBatch.bitvector)", methodToCall, firstArgsToMultiStringCall); - else - s = string.Format(CultureInfo.InvariantCulture, "{0}({1}, sourceBatch.bitvector, false)", methodToCall, firstArgsToMultiStringCall); + var s = methodName.Equals("Substring") + ? string.Format(CultureInfo.InvariantCulture, "{0}({1}, sourceBatch.bitvector)", methodToCall, firstArgsToMultiStringCall) + : string.Format(CultureInfo.InvariantCulture, "{0}({1}, sourceBatch.bitvector, false)", methodToCall, firstArgsToMultiStringCall); return s; } diff --git a/Sources/Core/Microsoft.StreamProcessing/Transformer/TemplateClasses.cs b/Sources/Core/Microsoft.StreamProcessing/Transformer/TemplateClasses.cs index d616a06..638247c 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Transformer/TemplateClasses.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Transformer/TemplateClasses.cs @@ -35,55 +35,46 @@ namespace Microsoft.StreamProcessing public static Assembly SystemCoreDll = typeof(BinaryExpression).GetTypeInfo().Assembly; #if DOTNETCORE - internal static IEnumerable NetCoreAssemblyReferences + internal static IEnumerable GetNetCoreAssemblyReferences() { - get + var allAvailableAssemblies = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) + ? ((string)AppContext.GetData("TRUSTED_PLATFORM_ASSEMBLIES")).Split(';') + : ((string)AppContext.GetData("TRUSTED_PLATFORM_ASSEMBLIES")).Split(':'); + + // From: http://source.roslyn.io/#Microsoft.CodeAnalysis.Scripting/ScriptOptions.cs,40 + // These references are resolved lazily. Keep in sync with list in core csi.rsp. + var files = new[] { - if (netCoreAssemblyReferences == null) - { - string[] allAvailableAssemblies; - - if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - allAvailableAssemblies = ((string)AppContext.GetData("TRUSTED_PLATFORM_ASSEMBLIES")).Split(';'); - else - allAvailableAssemblies = ((string)AppContext.GetData("TRUSTED_PLATFORM_ASSEMBLIES")).Split(':'); - - // From: http://source.roslyn.io/#Microsoft.CodeAnalysis.Scripting/ScriptOptions.cs,40 - // These references are resolved lazily. Keep in sync with list in core csi.rsp. - var files = new[] - { - "System.Collections", - "System.Collections.Concurrent", - "System.Console", - "System.Diagnostics.Debug", - "System.Diagnostics.Process", - "System.Diagnostics.StackTrace", - "System.Globalization", - "System.IO", - "System.IO.FileSystem", - "System.IO.FileSystem.Primitives", - "System.Reflection", - "System.Reflection.Extensions", - "System.Reflection.Primitives", - "System.Runtime", - "System.Runtime.Extensions", - "System.Runtime.InteropServices", - "System.Text.Encoding", - "System.Text.Encoding.CodePages", - "System.Text.Encoding.Extensions", - "System.Text.RegularExpressions", - "System.Threading", - "System.Threading.Tasks", - "System.Threading.Tasks.Parallel", - "System.Threading.Thread", - }; - var filteredPaths = allAvailableAssemblies.Where(p => files.Concat(new string[] { "mscorlib", "netstandard", "System.Private.CoreLib", "System.Runtime.Serialization.Primitives", }).Any(f => Path.GetFileNameWithoutExtension(p).Equals(f))); - netCoreAssemblyReferences = filteredPaths.Select(p => MetadataReference.CreateFromFile(p)); - } - return netCoreAssemblyReferences; - } + "System.Collections", + "System.Collections.Concurrent", + "System.Console", + "System.Diagnostics.Debug", + "System.Diagnostics.Process", + "System.Diagnostics.StackTrace", + "System.Globalization", + "System.IO", + "System.IO.FileSystem", + "System.IO.FileSystem.Primitives", + "System.Reflection", + "System.Reflection.Extensions", + "System.Reflection.Primitives", + "System.Runtime", + "System.Runtime.Extensions", + "System.Runtime.InteropServices", + "System.Text.Encoding", + "System.Text.Encoding.CodePages", + "System.Text.Encoding.Extensions", + "System.Text.RegularExpressions", + "System.Threading", + "System.Threading.Tasks", + "System.Threading.Tasks.Parallel", + "System.Threading.Thread", + }; + var filteredPaths = allAvailableAssemblies.Where(p => files.Concat(new string[] { "mscorlib", "netstandard", "System.Private.CoreLib", "System.Runtime.Serialization.Primitives", }).Any(f => Path.GetFileNameWithoutExtension(p).Equals(f))); + return filteredPaths.Select(p => MetadataReference.CreateFromFile(p)); } - private static IEnumerable netCoreAssemblyReferences; + private static readonly Lazy> netCoreAssemblyReferences + = new Lazy>(GetNetCoreAssemblyReferences); #endif /// @@ -103,9 +94,8 @@ namespace Microsoft.StreamProcessing #if DEBUG includeDebugInfo = (Config.CodegenOptions.BreakIntoCodeGen & Config.CodegenOptions.DebugFlags.Operators) != 0; #endif - if (includeDebugInfo) - { - return string.Format( + return includeDebugInfo + ? string.Format( CultureInfo.InvariantCulture, @" static {0}() {{ @@ -113,9 +103,8 @@ namespace Microsoft.StreamProcessing System.Diagnostics.Debugger.Break(); else System.Diagnostics.Debugger.Launch(); - }}", className); - } - else return string.Empty; + }}", className) + : string.Empty; } /// @@ -240,7 +229,7 @@ namespace Microsoft.StreamProcessing .Concat(uniqueReferences.Where(reference => metadataReferenceCache.ContainsKey(reference)).Select(reference => metadataReferenceCache[reference])); #if DOTNETCORE - refs = refs.Concat(NetCoreAssemblyReferences); + refs = refs.Concat(netCoreAssemblyReferences.Value); #endif var options = new CSharpCompilationOptions( @@ -255,7 +244,6 @@ namespace Microsoft.StreamProcessing var ignoreAccessibility = binderFlagsType.GetTypeInfo().GetField("IgnoreAccessibility", BindingFlags.Static | BindingFlags.Public); topLevelBinderFlagsProperty.SetValue(options, (uint)ignoreCorLibraryDuplicatedTypesMember.GetValue(null) | (uint)ignoreAccessibility.GetValue(null)); - var compilation = CSharpCompilation.Create(assemblyName, trees, refs, options); var a = EmitCompilationAndLoadAssembly(compilation, includeDebugInfo, out errorMessages); @@ -426,6 +414,7 @@ namespace System.Runtime.CompilerServices private static int BatchClassSequenceNumber = 0; private static SafeConcurrentDictionary batchType2Name = new SafeConcurrentDictionary(); + internal static string GetBatchClassName(Type keyType, Type payloadType) { if (!payloadType.CanRepresentAsColumnar()) @@ -704,10 +693,7 @@ namespace System.Runtime.CompilerServices this.DeclaringType = t.DeclaringType; } - public override string ToString() - { - return "|" + this.TypeName + ", " + this.Name + "|"; - } + public override string ToString() => "|" + this.TypeName + ", " + this.Name + "|"; } internal partial class SafeBatchTemplate @@ -810,53 +796,23 @@ namespace System.Runtime.CompilerServices { public abstract string TransformText(); #region Fields - private StringBuilder generationEnvironmentField; - private List indentLengthsField; private bool endsWithNewline; - private IDictionary sessionField; #endregion #region Properties /// /// The string builder that generation-time code is using to assemble generated output /// - protected StringBuilder GenerationEnvironment - { - get - { - if ((this.generationEnvironmentField == null)) this.generationEnvironmentField = new StringBuilder(); - return this.generationEnvironmentField; - } - set - { - this.generationEnvironmentField = value; - } - } + protected StringBuilder GenerationEnvironment { get; } = new StringBuilder(); /// /// A list of the lengths of each indent that was added with PushIndent /// - private List IndentLengths - { - get - { - if (this.indentLengthsField == null) this.indentLengthsField = new List(); - return this.indentLengthsField; - } - } + private List IndentLengths { get; } = new List(); /// /// Gets the current indent we use when adding lines to the output /// public string CurrentIndent { get; private set; } = string.Empty; - - /// - /// Current transformation session - /// - public virtual IDictionary Session - { - get => this.sessionField; - set => this.sessionField = value; - } #endregion #region Transform-time helpers /// @@ -868,8 +824,7 @@ namespace System.Runtime.CompilerServices // If we're starting off, or if the previous text ended with a newline, // we have to append the current indent first. - if (((this.GenerationEnvironment.Length == 0) - || this.endsWithNewline)) + if ((this.GenerationEnvironment.Length == 0) || this.endsWithNewline) { this.GenerationEnvironment.Append(this.CurrentIndent); this.endsWithNewline = false; @@ -881,7 +836,7 @@ namespace System.Runtime.CompilerServices } // This is an optimization. If the current indent is "", then we don't have to do any // of the more complex stuff further down. - if ((this.CurrentIndent.Length == 0)) + if (this.CurrentIndent.Length == 0) { this.GenerationEnvironment.Append(textToAppend); return; @@ -933,18 +888,19 @@ namespace System.Runtime.CompilerServices public string PopIndent() { string returnValue = string.Empty; - if ((this.IndentLengths.Count > 0)) + if (this.IndentLengths.Count > 0) { - int indentLength = this.IndentLengths[(this.IndentLengths.Count - 1)]; - this.IndentLengths.RemoveAt((this.IndentLengths.Count - 1)); - if ((indentLength > 0)) + int indentLength = this.IndentLengths[this.IndentLengths.Count - 1]; + this.IndentLengths.RemoveAt(this.IndentLengths.Count - 1); + if (indentLength > 0) { - returnValue = this.CurrentIndent.Substring((this.CurrentIndent.Length - indentLength)); - this.CurrentIndent = this.CurrentIndent.Remove((this.CurrentIndent.Length - indentLength)); + returnValue = this.CurrentIndent.Substring(this.CurrentIndent.Length - indentLength); + this.CurrentIndent = this.CurrentIndent.Remove(this.CurrentIndent.Length - indentLength); } } return returnValue; } + /// /// Remove any indentation /// @@ -958,7 +914,7 @@ namespace System.Runtime.CompilerServices /// /// Utility class to produce culture-oriented representation of an object as a string. /// - public class ToStringInstanceHelper + public sealed class ToStringInstanceHelper { private IFormatProvider formatProviderField = CultureInfo.InvariantCulture; /// @@ -969,7 +925,7 @@ namespace System.Runtime.CompilerServices get => this.formatProviderField; set { - if ((value != null)) this.formatProviderField = value; + if (value != null) this.formatProviderField = value; } } /// @@ -977,15 +933,14 @@ namespace System.Runtime.CompilerServices /// public string ToStringWithCulture(object objectToConvert) { - if ((objectToConvert == null)) throw new ArgumentNullException(nameof(objectToConvert)); + if (objectToConvert == null) throw new ArgumentNullException(nameof(objectToConvert)); - Type t = objectToConvert.GetType(); - MethodInfo method = t.GetTypeInfo().GetMethod("ToString", new Type[] { typeof(IFormatProvider) }); + var t = objectToConvert.GetType(); + var method = t.GetTypeInfo().GetMethod("ToString", new Type[] { typeof(IFormatProvider) }); - if ((method == null)) - return objectToConvert.ToString(); - else - return ((string)(method.Invoke(objectToConvert, new object[] { this.formatProviderField }))); + return method == null + ? objectToConvert.ToString() + : (string)(method.Invoke(objectToConvert, new object[] { this.formatProviderField })); } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs b/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs index a65b195..57d7c8b 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs @@ -45,7 +45,7 @@ namespace Microsoft.StreamProcessing public static bool TryGetCachedComparer(out IComparerExpression comparer) { - Type t = typeof(T); + var t = typeof(T); comparer = null; if (typeComparerCache.TryGetValue(t, out object temp)) { @@ -84,7 +84,7 @@ namespace Microsoft.StreamProcessing { get { - Type type = typeof(T); + var type = typeof(T); lock (sentinel) { diff --git a/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExtensions.cs b/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExtensions.cs index 7bf1d80..a93a32e 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExtensions.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExtensions.cs @@ -63,10 +63,8 @@ namespace Microsoft.StreamProcessing => EqualityComparer.IsEqual(source.GetCompareExpr(), other.GetCompareExpr()); public static bool ExpressionEquals(this IEqualityComparerExpression source, IEqualityComparerExpression other) - { - return EqualityComparer.IsEqual(source.GetEqualsExpr(), other.GetEqualsExpr()) - && EqualityComparer.IsEqual(source.GetGetHashCodeExpr(), other.GetGetHashCodeExpr()); - } + => EqualityComparer.IsEqual(source.GetEqualsExpr(), other.GetEqualsExpr()) + && EqualityComparer.IsEqual(source.GetGetHashCodeExpr(), other.GetGetHashCodeExpr()); /// /// Performs a special kind of equality test on IEqualityComparer<T> in which case, both the Equals function and GetHashCode function are checked @@ -91,10 +89,8 @@ namespace Microsoft.StreamProcessing private static dynamic TryIsEqualIECE(dynamic obj1, dynamic obj2) => IsEqualIECE(obj1, obj2); private static bool IsEqualIECE(IEqualityComparerExpression o1, IEqualityComparerExpression o2) - { - return EqualityComparer.IsEqual(o1.GetEqualsExpr(), o2.GetEqualsExpr()) && - EqualityComparer.IsEqual(o1.GetGetHashCodeExpr(), o2.GetGetHashCodeExpr()); - } + => EqualityComparer.IsEqual(o1.GetEqualsExpr(), o2.GetEqualsExpr()) + && EqualityComparer.IsEqual(o1.GetGetHashCodeExpr(), o2.GetGetHashCodeExpr()); // catch-all for other types private static bool IsEqualIECE(object o1, object o2) => false; diff --git a/Sources/Core/Microsoft.StreamProcessing/Utilities/EqualityComparerExpression.cs b/Sources/Core/Microsoft.StreamProcessing/Utilities/EqualityComparerExpression.cs index e11293f..f13705b 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Utilities/EqualityComparerExpression.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Utilities/EqualityComparerExpression.cs @@ -82,12 +82,12 @@ namespace Microsoft.StreamProcessing public static bool TryGetCachedGetHashCodeFunction(out Func getHashCodeFunction) { var t = typeof(T); - getHashCodeFunction = null; if (getHashCodeCache.TryGetValue(t, out object temp)) { getHashCodeFunction = (Func)temp; return true; } + getHashCodeFunction = null; return false; } diff --git a/Sources/Core/Microsoft.StreamProcessing/Utilities/ExpressionExtensions.cs b/Sources/Core/Microsoft.StreamProcessing/Utilities/ExpressionExtensions.cs index f924381..0dd3eb3 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Utilities/ExpressionExtensions.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Utilities/ExpressionExtensions.cs @@ -111,8 +111,7 @@ namespace Microsoft.StreamProcessing var stringBuilder = new StringBuilder(); var visitor = new ConvertToCSharpButWithStringParameters(new StringWriter(stringBuilder, CultureInfo.InvariantCulture), map); visitor.Visit(e); - var s = stringBuilder.ToString(); - return s; + return stringBuilder.ToString(); } public static LambdaExpression RemoveCastToObject(this LambdaExpression lambda) @@ -157,10 +156,9 @@ namespace Microsoft.StreamProcessing } protected override Expression VisitParameter(ParameterExpression node) - { - if (this.arguments.TryGetValue(node, out Expression replacement) && replacement != null) return replacement; - return node; - } + => this.arguments.TryGetValue(node, out var replacement) && replacement != null + ? replacement + : node; } internal sealed class ConstantExpressionFinder : ExpressionVisitor @@ -181,12 +179,14 @@ namespace Microsoft.StreamProcessing me.Visit(function.Body); return me.isConstant; } + protected override Expression VisitConstant(ConstantExpression node) { var t = node.Type; if (!t.GetTypeInfo().IsPrimitive) this.isConstant = false; return base.VisitConstant(node); } + protected override Expression VisitMember(MemberExpression node) { if (!(node.Expression is MemberExpression me)) // if it is a member expression, then let visitor recurse down to the left-most branch @@ -313,8 +313,9 @@ namespace Microsoft.StreamProcessing if (e1 is NewExpression new1) { var new2 = e2 as NewExpression; - if (new1.Constructor == null) return new2.Constructor == null; - return new1.Constructor.Equals(new2.Constructor) && Equals(new1.Arguments, new2.Arguments); + return new1.Constructor == null + ? new2.Constructor == null + : new1.Constructor.Equals(new2.Constructor) && Equals(new1.Arguments, new2.Arguments); } if (e1 is NewArrayExpression newarr1) { @@ -475,10 +476,9 @@ namespace Microsoft.StreamProcessing => this.arguments = new Dictionary(arguments); protected override Expression VisitParameter(ParameterExpression node) - { - if (this.arguments.TryGetValue(node, out Expression replacement)) return replacement; - return base.VisitParameter(node); - } + => this.arguments.TryGetValue(node, out var replacement) + ? replacement + : base.VisitParameter(node); } /// @@ -532,7 +532,7 @@ namespace Microsoft.StreamProcessing // Returns: // The modified expression, if it or any subexpression was modified; otherwise, // returns the original expression. - public override Expression Visit(Expression node) { return base.Visit(node); } + public override Expression Visit(Expression node) => base.Visit(node); // // Summary: @@ -1065,9 +1065,7 @@ namespace Microsoft.StreamProcessing // The modified expression, if it or any subexpression was modified; otherwise, // returns the original expression. protected override MemberBinding VisitMemberBinding(MemberBinding node) - { - return base.VisitMemberBinding(node); // taken care of by VisitMemberAssignment, but what if there are other subtypes of MemberBinding? - } + => base.VisitMemberBinding(node); // taken care of by VisitMemberAssignment, but what if there are other subtypes of MemberBinding? // // Summary: @@ -2027,10 +2025,9 @@ namespace Microsoft.StreamProcessing } protected override Expression VisitMember(MemberExpression node) - { - if (node.Member.Name == "Key" && node.Expression == parameter) return newParameter; - return base.VisitMember(node); - } + => node.Member.Name == "Key" && node.Expression == parameter + ? newParameter + : base.VisitMember(node); protected override Expression VisitParameter(ParameterExpression node) { diff --git a/Sources/Core/Microsoft.StreamProcessing/Utilities/TransformerExtensions.cs b/Sources/Core/Microsoft.StreamProcessing/Utilities/TransformerExtensions.cs index dff497b..7ddaa8c 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Utilities/TransformerExtensions.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Utilities/TransformerExtensions.cs @@ -26,10 +26,9 @@ namespace Microsoft.StreamProcessing { Contract.Requires(types != null); var i = types.Count(t => t.IsAnonymousType()); - if (i > 0) - return s + "`" + i.ToString(CultureInfo.InvariantCulture); - else - return s; + return i > 0 + ? s + "`" + i.ToString(CultureInfo.InvariantCulture) + : s; } public static bool OptimizeString(this MyFieldInfo field) => Config.UseMultiString && field.Type == typeof(string); diff --git a/Sources/Core/Microsoft.StreamProcessing/Utilities/TypeExtensions.cs b/Sources/Core/Microsoft.StreamProcessing/Utilities/TypeExtensions.cs index bbbd584..c1aa4d4 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Utilities/TypeExtensions.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Utilities/TypeExtensions.cs @@ -113,8 +113,9 @@ namespace Microsoft.StreamProcessing public static Tuple, bool> GetAnnotatedFields(this Type t) { var fields = t.ResolveMembers(); - if (fields.Any()) return Tuple.Create(fields, false); - else return Tuple.Create(new MyFieldInfo(t).Yield(), true); + return fields.Any() + ? Tuple.Create(fields, false) + : Tuple.Create(new MyFieldInfo(t).Yield(), true); } private static readonly Dictionary OperatorNameLookup; @@ -158,9 +159,9 @@ namespace Microsoft.StreamProcessing var genericArgs = types .Distinct() .Where(g => IsAnonymousType(g)); - if (!genericArgs.Any()) - return type; - return type.MakeGenericType(genericArgs.ToArray()); + return !genericArgs.Any() + ? type + : type.MakeGenericType(genericArgs.ToArray()); } public static bool IsAnonymousTypeName(this Type type) @@ -315,8 +316,9 @@ namespace Microsoft.StreamProcessing return outerKeyType.KeyTypeNeedsGeneratedMemoryPool() || innerKeyType.KeyTypeNeedsGeneratedMemoryPool(); } } - if (keyType == typeof(Empty)) return false; - return keyType.NeedGeneratedMemoryPool(); + return keyType == typeof(Empty) + ? false + : keyType.NeedGeneratedMemoryPool(); } /// @@ -531,28 +533,19 @@ namespace Microsoft.StreamProcessing } public static bool ImplementsIEqualityComparerExpression(this Type t) - { - return t - .GetTypeInfo() + => t.GetTypeInfo() .GetInterfaces() .Any(i => i.Namespace.Equals("Microsoft.StreamProcessing") && i.Name.Equals("IEqualityComparerExpression`1") && i.GetTypeInfo().GetGenericArguments().Length == 1 && i.GetTypeInfo().GetGenericArguments()[0] == t); - } public static bool ImplementsIEqualityComparer(this Type t) - { - return t - .GetTypeInfo() + => t.GetTypeInfo() .GetInterfaces() .Any(i => i.Namespace.Equals("System.Collections.Generic") && i.Name.Equals("IEqualityComparer`1") && i.GetTypeInfo().GetGenericArguments().Length == 1 && i.GetTypeInfo().GetGenericArguments()[0] == t); - } public static bool ImplementsIEquatable(this Type t) - { - return t - .GetTypeInfo() + => t.GetTypeInfo() .GetInterfaces() .Any(i => i.Namespace.Equals("System") && i.Name.Equals("IEquatable`1") && i.GetTypeInfo().GetGenericArguments().Length == 1 && i.GetTypeInfo().GetGenericArguments()[0] == t); - } #region Borrowed from Roslyn @@ -651,21 +644,19 @@ namespace Microsoft.StreamProcessing /// true if the type is unsupported; otherwise, false. /// public static bool IsUnsupported(this Type type) - { - return type == typeof(IntPtr) - || type == typeof(UIntPtr) - || type == typeof(object) - || type.GetTypeInfo().ContainsGenericParameters - || (!type.IsArray - && !type.GetTypeInfo().IsValueType - && !type.HasSupportedParameterizedConstructor() - && !type.HasParameterlessConstructor() - && type != typeof(string) - && type != typeof(Uri) - && !type.GetTypeInfo().IsAbstract - && !type.GetTypeInfo().IsInterface - && !(type.GetTypeInfo().IsGenericType && SupportedInterfaces.Contains(type.GetGenericTypeDefinition()))); - } + => type == typeof(IntPtr) + || type == typeof(UIntPtr) + || type == typeof(object) + || type.GetTypeInfo().ContainsGenericParameters + || (!type.IsArray + && !type.GetTypeInfo().IsValueType + && !type.HasSupportedParameterizedConstructor() + && !type.HasParameterlessConstructor() + && type != typeof(string) + && type != typeof(Uri) + && !type.GetTypeInfo().IsAbstract + && !type.GetTypeInfo().IsInterface + && !(type.GetTypeInfo().IsGenericType && SupportedInterfaces.Contains(type.GetGenericTypeDefinition()))); private static readonly HashSet SupportedInterfaces = new HashSet {