This commit is contained in:
Greg Oliver 2019-02-21 11:30:02 +00:00
Родитель 61e235e4ac
Коммит a8ca38c8ba
8 изменённых файлов: 815 добавлений и 7 удалений

Просмотреть файл

@ -5,6 +5,7 @@ using Microsoft.WindowsAzure.Storage.Table;
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Concurrent;
namespace nsgFunc
{
@ -71,8 +72,10 @@ namespace nsgFunc
try
{
byte[] nsgMessages = new byte[dataLength];
CloudBlockBlob blob = nsgDataBlobBinder.BindAsync<CloudBlockBlob>(attributes).Result;
await blob.DownloadRangeToByteArrayAsync(nsgMessages, 0, startingByte, dataLength);
nsgMessagesString = System.Text.Encoding.UTF8.GetString(nsgMessages);
}
catch (Exception ex)

227
NwNsgProject/Extensions.cs Normal file
Просмотреть файл

@ -0,0 +1,227 @@
// Copyright (c) Microsoft. All Rights Reserved. Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using Microsoft.CodeAnalysis.PooledObjects;
namespace Microsoft.CodeAnalysis
{
internal static class SharedPoolExtensions
{
private const int Threshold = 512;
public static PooledObject<StringBuilder> GetPooledObject(this ObjectPool<StringBuilder> pool)
{
return PooledObject<StringBuilder>.Create(pool);
}
public static PooledObject<Stopwatch> GetPooledObject(this ObjectPool<Stopwatch> pool)
{
return PooledObject<Stopwatch>.Create(pool);
}
public static PooledObject<Stack<TItem>> GetPooledObject<TItem>(this ObjectPool<Stack<TItem>> pool)
{
return PooledObject<Stack<TItem>>.Create(pool);
}
public static PooledObject<Queue<TItem>> GetPooledObject<TItem>(this ObjectPool<Queue<TItem>> pool)
{
return PooledObject<Queue<TItem>>.Create(pool);
}
public static PooledObject<HashSet<TItem>> GetPooledObject<TItem>(this ObjectPool<HashSet<TItem>> pool)
{
return PooledObject<HashSet<TItem>>.Create(pool);
}
public static PooledObject<Dictionary<TKey, TValue>> GetPooledObject<TKey, TValue>(this ObjectPool<Dictionary<TKey, TValue>> pool)
{
return PooledObject<Dictionary<TKey, TValue>>.Create(pool);
}
public static PooledObject<List<TItem>> GetPooledObject<TItem>(this ObjectPool<List<TItem>> pool)
{
return PooledObject<List<TItem>>.Create(pool);
}
public static PooledObject<T> GetPooledObject<T>(this ObjectPool<T> pool) where T : class
{
return new PooledObject<T>(pool, p => p.Allocate(), (p, o) => p.Free(o));
}
public static StringBuilder AllocateAndClear(this ObjectPool<StringBuilder> pool)
{
var sb = pool.Allocate();
sb.Clear();
return sb;
}
public static Stopwatch AllocateAndClear(this ObjectPool<Stopwatch> pool)
{
var watch = pool.Allocate();
watch.Reset();
return watch;
}
public static Stack<T> AllocateAndClear<T>(this ObjectPool<Stack<T>> pool)
{
var set = pool.Allocate();
set.Clear();
return set;
}
public static Queue<T> AllocateAndClear<T>(this ObjectPool<Queue<T>> pool)
{
var set = pool.Allocate();
set.Clear();
return set;
}
public static HashSet<T> AllocateAndClear<T>(this ObjectPool<HashSet<T>> pool)
{
var set = pool.Allocate();
set.Clear();
return set;
}
public static Dictionary<TKey, TValue> AllocateAndClear<TKey, TValue>(this ObjectPool<Dictionary<TKey, TValue>> pool)
{
var map = pool.Allocate();
map.Clear();
return map;
}
public static List<T> AllocateAndClear<T>(this ObjectPool<List<T>> pool)
{
var list = pool.Allocate();
list.Clear();
return list;
}
public static void ClearAndFree(this ObjectPool<StringBuilder> pool, StringBuilder sb)
{
if (sb == null)
{
return;
}
sb.Clear();
if (sb.Capacity > Threshold)
{
sb.Capacity = Threshold;
}
pool.Free(sb);
}
public static void ClearAndFree(this ObjectPool<Stopwatch> pool, Stopwatch watch)
{
if (watch == null)
{
return;
}
watch.Reset();
pool.Free(watch);
}
public static void ClearAndFree<T>(this ObjectPool<HashSet<T>> pool, HashSet<T> set)
{
if (set == null)
{
return;
}
var count = set.Count;
set.Clear();
if (count > Threshold)
{
set.TrimExcess();
}
pool.Free(set);
}
public static void ClearAndFree<T>(this ObjectPool<Stack<T>> pool, Stack<T> set)
{
if (set == null)
{
return;
}
var count = set.Count;
set.Clear();
if (count > Threshold)
{
set.TrimExcess();
}
pool.Free(set);
}
public static void ClearAndFree<T>(this ObjectPool<Queue<T>> pool, Queue<T> set)
{
if (set == null)
{
return;
}
var count = set.Count;
set.Clear();
if (count > Threshold)
{
set.TrimExcess();
}
pool.Free(set);
}
public static void ClearAndFree<TKey, TValue>(this ObjectPool<Dictionary<TKey, TValue>> pool, Dictionary<TKey, TValue> map)
{
if (map == null)
{
return;
}
// if map grew too big, don't put it back to pool
if (map.Count > Threshold)
{
pool.ForgetTrackedObject(map);
return;
}
map.Clear();
pool.Free(map);
}
public static void ClearAndFree<T>(this ObjectPool<List<T>> pool, List<T> list)
{
if (list == null)
{
return;
}
list.Clear();
if (list.Capacity > Threshold)
{
list.Capacity = Threshold;
}
pool.Free(list);
}
}
}

Просмотреть файл

@ -0,0 +1,168 @@
// Copyright (c) Microsoft. All Rights Reserved. Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using Microsoft.CodeAnalysis.PooledObjects;
namespace Microsoft.CodeAnalysis
{
/// <summary>
/// this is RAII object to automatically release pooled object when its owning pool
/// </summary>
internal struct PooledObject<T> : IDisposable where T : class
{
private readonly Action<ObjectPool<T>, T> _releaser;
private readonly ObjectPool<T> _pool;
private T _pooledObject;
public PooledObject(ObjectPool<T> pool, Func<ObjectPool<T>, T> allocator, Action<ObjectPool<T>, T> releaser) : this()
{
_pool = pool;
_pooledObject = allocator(pool);
_releaser = releaser;
}
public T Object => _pooledObject;
public void Dispose()
{
if (_pooledObject != null)
{
_releaser(_pool, _pooledObject);
_pooledObject = null;
}
}
#region factory
public static PooledObject<StringBuilder> Create(ObjectPool<StringBuilder> pool)
{
return new PooledObject<StringBuilder>(
pool,
p => Allocator(p),
(p, sb) => Releaser(p, sb));
}
public static PooledObject<Stopwatch> Create(ObjectPool<Stopwatch> pool)
{
return new PooledObject<Stopwatch>(
pool,
p => Allocator(p),
(p, sb) => Releaser(p, sb));
}
public static PooledObject<Stack<TItem>> Create<TItem>(ObjectPool<Stack<TItem>> pool)
{
return new PooledObject<Stack<TItem>>(
pool,
p => Allocator(p),
(p, sb) => Releaser(p, sb));
}
public static PooledObject<Queue<TItem>> Create<TItem>(ObjectPool<Queue<TItem>> pool)
{
return new PooledObject<Queue<TItem>>(
pool,
p => Allocator(p),
(p, sb) => Releaser(p, sb));
}
public static PooledObject<HashSet<TItem>> Create<TItem>(ObjectPool<HashSet<TItem>> pool)
{
return new PooledObject<HashSet<TItem>>(
pool,
p => Allocator(p),
(p, sb) => Releaser(p, sb));
}
public static PooledObject<Dictionary<TKey, TValue>> Create<TKey, TValue>(ObjectPool<Dictionary<TKey, TValue>> pool)
{
return new PooledObject<Dictionary<TKey, TValue>>(
pool,
p => Allocator(p),
(p, sb) => Releaser(p, sb));
}
public static PooledObject<List<TItem>> Create<TItem>(ObjectPool<List<TItem>> pool)
{
return new PooledObject<List<TItem>>(
pool,
p => Allocator(p),
(p, sb) => Releaser(p, sb));
}
#endregion
#region allocators and releasers
private static StringBuilder Allocator(ObjectPool<StringBuilder> pool)
{
return pool.AllocateAndClear();
}
private static void Releaser(ObjectPool<StringBuilder> pool, StringBuilder sb)
{
pool.ClearAndFree(sb);
}
private static Stopwatch Allocator(ObjectPool<Stopwatch> pool)
{
return pool.AllocateAndClear();
}
private static void Releaser(ObjectPool<Stopwatch> pool, Stopwatch sb)
{
pool.ClearAndFree(sb);
}
private static Stack<TItem> Allocator<TItem>(ObjectPool<Stack<TItem>> pool)
{
return pool.AllocateAndClear();
}
private static void Releaser<TItem>(ObjectPool<Stack<TItem>> pool, Stack<TItem> obj)
{
pool.ClearAndFree(obj);
}
private static Queue<TItem> Allocator<TItem>(ObjectPool<Queue<TItem>> pool)
{
return pool.AllocateAndClear();
}
private static void Releaser<TItem>(ObjectPool<Queue<TItem>> pool, Queue<TItem> obj)
{
pool.ClearAndFree(obj);
}
private static HashSet<TItem> Allocator<TItem>(ObjectPool<HashSet<TItem>> pool)
{
return pool.AllocateAndClear();
}
private static void Releaser<TItem>(ObjectPool<HashSet<TItem>> pool, HashSet<TItem> obj)
{
pool.ClearAndFree(obj);
}
private static Dictionary<TKey, TValue> Allocator<TKey, TValue>(ObjectPool<Dictionary<TKey, TValue>> pool)
{
return pool.AllocateAndClear();
}
private static void Releaser<TKey, TValue>(ObjectPool<Dictionary<TKey, TValue>> pool, Dictionary<TKey, TValue> obj)
{
pool.ClearAndFree(obj);
}
private static List<TItem> Allocator<TItem>(ObjectPool<List<TItem>> pool)
{
return pool.AllocateAndClear();
}
private static void Releaser<TItem>(ObjectPool<List<TItem>> pool, List<TItem> obj)
{
pool.ClearAndFree(obj);
}
#endregion
}
}

Просмотреть файл

@ -0,0 +1,279 @@
// Copyright (c) Microsoft. All Rights Reserved. Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
// define TRACE_LEAKS to get additional diagnostics that can lead to the leak sources. note: it will
// make everything about 2-3x slower
//
// #define TRACE_LEAKS
// define DETECT_LEAKS to detect possible leaks
// #if DEBUG
// #define DETECT_LEAKS //for now always enable DETECT_LEAKS in debug.
// #endif
using System;
using System.Diagnostics;
using System.Threading;
#if DETECT_LEAKS
using System.Runtime.CompilerServices;
#endif
namespace Microsoft.CodeAnalysis.PooledObjects
{
/// <summary>
/// Generic implementation of object pooling pattern with predefined pool size limit. The main
/// purpose is that limited number of frequently used objects can be kept in the pool for
/// further recycling.
///
/// Notes:
/// 1) it is not the goal to keep all returned objects. Pool is not meant for storage. If there
/// is no space in the pool, extra returned objects will be dropped.
///
/// 2) it is implied that if object was obtained from a pool, the caller will return it back in
/// a relatively short time. Keeping checked out objects for long durations is ok, but
/// reduces usefulness of pooling. Just new up your own.
///
/// Not returning objects to the pool in not detrimental to the pool's work, but is a bad practice.
/// Rationale:
/// If there is no intent for reusing the object, do not use pool - just use "new".
/// </summary>
internal class ObjectPool<T> where T : class
{
[DebuggerDisplay("{Value,nq}")]
private struct Element
{
internal T Value;
}
/// <remarks>
/// Not using System.Func{T} because this file is linked into the (debugger) Formatter,
/// which does not have that type (since it compiles against .NET 2.0).
/// </remarks>
internal delegate T Factory();
// Storage for the pool objects. The first item is stored in a dedicated field because we
// expect to be able to satisfy most requests from it.
private T _firstItem;
private readonly Element[] _items;
// factory is stored for the lifetime of the pool. We will call this only when pool needs to
// expand. compared to "new T()", Func gives more flexibility to implementers and faster
// than "new T()".
private readonly Factory _factory;
#if DETECT_LEAKS
private static readonly ConditionalWeakTable<T, LeakTracker> leakTrackers = new ConditionalWeakTable<T, LeakTracker>();
private class LeakTracker : IDisposable
{
private volatile bool disposed;
#if TRACE_LEAKS
internal volatile object Trace = null;
#endif
public void Dispose()
{
disposed = true;
GC.SuppressFinalize(this);
}
private string GetTrace()
{
#if TRACE_LEAKS
return Trace == null ? "" : Trace.ToString();
#else
return "Leak tracing information is disabled. Define TRACE_LEAKS on ObjectPool`1.cs to get more info \n";
#endif
}
~LeakTracker()
{
if (!this.disposed && !Environment.HasShutdownStarted)
{
var trace = GetTrace();
// If you are seeing this message it means that object has been allocated from the pool
// and has not been returned back. This is not critical, but turns pool into rather
// inefficient kind of "new".
Debug.WriteLine($"TRACEOBJECTPOOLLEAKS_BEGIN\nPool detected potential leaking of {typeof(T)}. \n Location of the leak: \n {GetTrace()} TRACEOBJECTPOOLLEAKS_END");
}
}
}
#endif
internal ObjectPool(Factory factory)
: this(factory, Environment.ProcessorCount * 2)
{ }
internal ObjectPool(Factory factory, int size)
{
Debug.Assert(size >= 1);
_factory = factory;
_items = new Element[size - 1];
}
private T CreateInstance()
{
var inst = _factory();
return inst;
}
/// <summary>
/// Produces an instance.
/// </summary>
/// <remarks>
/// Search strategy is a simple linear probing which is chosen for it cache-friendliness.
/// Note that Free will try to store recycled objects close to the start thus statistically
/// reducing how far we will typically search.
/// </remarks>
internal T Allocate()
{
// PERF: Examine the first element. If that fails, AllocateSlow will look at the remaining elements.
// Note that the initial read is optimistically not synchronized. That is intentional.
// We will interlock only when we have a candidate. in a worst case we may miss some
// recently returned objects. Not a big deal.
T inst = _firstItem;
if (inst == null || inst != Interlocked.CompareExchange(ref _firstItem, null, inst))
{
inst = AllocateSlow();
}
#if DETECT_LEAKS
var tracker = new LeakTracker();
leakTrackers.Add(inst, tracker);
#if TRACE_LEAKS
var frame = CaptureStackTrace();
tracker.Trace = frame;
#endif
#endif
return inst;
}
private T AllocateSlow()
{
var items = _items;
for (int i = 0; i < items.Length; i++)
{
// Note that the initial read is optimistically not synchronized. That is intentional.
// We will interlock only when we have a candidate. in a worst case we may miss some
// recently returned objects. Not a big deal.
T inst = items[i].Value;
if (inst != null)
{
if (inst == Interlocked.CompareExchange(ref items[i].Value, null, inst))
{
return inst;
}
}
}
return CreateInstance();
}
/// <summary>
/// Returns objects to the pool.
/// </summary>
/// <remarks>
/// Search strategy is a simple linear probing which is chosen for it cache-friendliness.
/// Note that Free will try to store recycled objects close to the start thus statistically
/// reducing how far we will typically search in Allocate.
/// </remarks>
internal void Free(T obj)
{
Validate(obj);
ForgetTrackedObject(obj);
if (_firstItem == null)
{
// Intentionally not using interlocked here.
// In a worst case scenario two objects may be stored into same slot.
// It is very unlikely to happen and will only mean that one of the objects will get collected.
_firstItem = obj;
}
else
{
FreeSlow(obj);
}
}
private void FreeSlow(T obj)
{
var items = _items;
for (int i = 0; i < items.Length; i++)
{
if (items[i].Value == null)
{
// Intentionally not using interlocked here.
// In a worst case scenario two objects may be stored into same slot.
// It is very unlikely to happen and will only mean that one of the objects will get collected.
items[i].Value = obj;
break;
}
}
}
/// <summary>
/// Removes an object from leak tracking.
///
/// This is called when an object is returned to the pool. It may also be explicitly
/// called if an object allocated from the pool is intentionally not being returned
/// to the pool. This can be of use with pooled arrays if the consumer wants to
/// return a larger array to the pool than was originally allocated.
/// </summary>
[Conditional("DEBUG")]
internal void ForgetTrackedObject(T old, T replacement = null)
{
#if DETECT_LEAKS
LeakTracker tracker;
if (leakTrackers.TryGetValue(old, out tracker))
{
tracker.Dispose();
leakTrackers.Remove(old);
}
else
{
var trace = CaptureStackTrace();
Debug.WriteLine($"TRACEOBJECTPOOLLEAKS_BEGIN\nObject of type {typeof(T)} was freed, but was not from pool. \n Callstack: \n {trace} TRACEOBJECTPOOLLEAKS_END");
}
if (replacement != null)
{
tracker = new LeakTracker();
leakTrackers.Add(replacement, tracker);
}
#endif
}
#if DETECT_LEAKS
private static Lazy<Type> _stackTraceType = new Lazy<Type>(() => Type.GetType("System.Diagnostics.StackTrace"));
private static object CaptureStackTrace()
{
return Activator.CreateInstance(_stackTraceType.Value);
}
#endif
[Conditional("DEBUG")]
private void Validate(object obj)
{
Debug.Assert(obj != null, "freeing null?");
Debug.Assert(_firstItem != obj, "freeing twice?");
var items = _items;
for (int i = 0; i < items.Length; i++)
{
var value = items[i].Value;
if (value == null)
{
return;
}
Debug.Assert(value != obj, "freeing twice?");
}
}
}
}

Просмотреть файл

@ -0,0 +1,86 @@
// Copyright (c) Microsoft. All Rights Reserved. Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using Microsoft.CodeAnalysis.PooledObjects;
namespace Microsoft.CodeAnalysis
{
/// <summary>
/// Shared object pool for roslyn
///
/// Use this shared pool if only concern is reducing object allocations.
/// if perf of an object pool itself is also a concern, use ObjectPool directly.
///
/// For example, if you want to create a million of small objects within a second,
/// use the ObjectPool directly. it should have much less overhead than using this.
/// </summary>
internal static class SharedPools
{
/// <summary>
/// pool that uses default constructor with 100 elements pooled
/// </summary>
public static ObjectPool<T> BigDefault<T>() where T : class, new()
{
return DefaultBigPool<T>.Instance;
}
/// <summary>
/// pool that uses default constructor with 20 elements pooled
/// </summary>
public static ObjectPool<T> Default<T>() where T : class, new()
{
return DefaultNormalPool<T>.Instance;
}
/// <summary>
/// pool that uses string as key with StringComparer.OrdinalIgnoreCase as key comparer
/// </summary>
public static ObjectPool<Dictionary<string, T>> StringIgnoreCaseDictionary<T>()
{
return StringIgnoreCaseDictionaryNormalPool<T>.Instance;
}
/// <summary>
/// pool that uses string as element with StringComparer.OrdinalIgnoreCase as element comparer
/// </summary>
public static readonly ObjectPool<HashSet<string>> StringIgnoreCaseHashSet =
new ObjectPool<HashSet<string>>(() => new HashSet<string>(StringComparer.OrdinalIgnoreCase), 20);
/// <summary>
/// pool that uses string as element with StringComparer.Ordinal as element comparer
/// </summary>
public static readonly ObjectPool<HashSet<string>> StringHashSet =
new ObjectPool<HashSet<string>>(() => new HashSet<string>(StringComparer.Ordinal), 20);
/// <summary>
/// Used to reduce the # of temporary byte[]s created to satisfy serialization and
/// other I/O requests
/// </summary>
public static readonly ObjectPool<byte[]> ByteArray = new ObjectPool<byte[]>(() => new byte[ByteBufferSize], ByteBufferCount);
public static readonly ObjectPool<char[]> CharArray = new ObjectPool<char[]>(() => new char[ByteBufferSize], CharBufferCount);
// byte pooled memory : 4K * 512 = 4MB
public const int ByteBufferSize = 4 * 1024;
private const int ByteBufferCount = 512;
// char pooled memory : 8K * 5 = 40K
private const int CharBufferCount = 5;
private static class DefaultBigPool<T> where T : class, new()
{
public static readonly ObjectPool<T> Instance = new ObjectPool<T>(() => new T(), 100);
}
private static class DefaultNormalPool<T> where T : class, new()
{
public static readonly ObjectPool<T> Instance = new ObjectPool<T>(() => new T(), 20);
}
private static class StringIgnoreCaseDictionaryNormalPool<T>
{
public static readonly ObjectPool<Dictionary<string, T>> Instance =
new ObjectPool<Dictionary<string, T>>(() => new Dictionary<string, T>(StringComparer.OrdinalIgnoreCase), 20);
}
}
}

Просмотреть файл

@ -0,0 +1,25 @@
// Copyright (c) Microsoft. All Rights Reserved. Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Text;
namespace Microsoft.CodeAnalysis.Formatting
{
internal static class StringBuilderPool
{
public static StringBuilder Allocate()
{
return SharedPools.Default<StringBuilder>().AllocateAndClear();
}
public static void Free(StringBuilder builder)
{
SharedPools.Default<StringBuilder>().ClearAndFree(builder);
}
public static string ReturnAndFree(StringBuilder builder)
{
SharedPools.Default<StringBuilder>().ForgetTrackedObject(builder);
return builder.ToString();
}
}
}

Просмотреть файл

@ -5,6 +5,15 @@ using System.Text.RegularExpressions;
using Newtonsoft.Json;
using System.Collections.Generic;
class ByteArray
{
public byte[] b;
public ByteArray ()
{
b = new byte[1024 * 1024];
}
}
class SplunkEventMessage
{
public string sourcetype { get; set; }

Просмотреть файл

@ -9,6 +9,7 @@ using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.Formatting;
namespace nsgFunc
{
@ -76,17 +77,27 @@ namespace nsgFunc
{
foreach (var messageList in denormalizedSplunkEvents(newClientContent, null, log))
{
StringBuilder outgoingJson = new StringBuilder(MAXTRANSMISSIONSIZE);
foreach (var message in messageList)
StringBuilder outgoingJson = StringBuilderPool.Allocate();
outgoingJson.Capacity = MAXTRANSMISSIONSIZE;
try
{
var messageAsString = JsonConvert.SerializeObject(message, new JsonSerializerSettings
foreach (var message in messageList)
{
NullValueHandling = NullValueHandling.Ignore
});
outgoingJson.Append(messageAsString);
var messageAsString = JsonConvert.SerializeObject(message, new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
});
outgoingJson.Append(messageAsString);
}
yield return outgoingJson.ToString();
}
finally
{
StringBuilderPool.Free(outgoingJson);
}
yield return outgoingJson.ToString();
}
}