This commit is contained in:
Pavel Krymets 2017-10-06 12:07:03 -07:00 коммит произвёл GitHub
Родитель 4f047ea6ac
Коммит 39e2551b02
8 изменённых файлов: 111 добавлений и 38 удалений

Просмотреть файл

@ -35,7 +35,8 @@ namespace System.IO.Pipelines.Testing
// Create a segment that has offset relative to the OwnedMemory and OwnedMemory itself has offset relative to array // Create a segment that has offset relative to the OwnedMemory and OwnedMemory itself has offset relative to array
var ownedBuffer = new OwnedArray<byte>(chars); var ownedBuffer = new OwnedArray<byte>(chars);
var current = new BufferSegment(ownedBuffer, length, length * 2); var current = new BufferSegment();
current.SetMemory(ownedBuffer, length, length * 2);
if (first == null) if (first == null)
{ {
first = current; first = current;

Просмотреть файл

@ -7,8 +7,7 @@ using System.Text;
namespace System.IO.Pipelines namespace System.IO.Pipelines
{ {
// TODO: Pool segments internal class BufferSegment
internal class BufferSegment : IDisposable
{ {
/// <summary> /// <summary>
/// The Start represents the offset into Array where the range of "active" bytes begins. At the point when the block is leased /// The Start represents the offset into Array where the range of "active" bytes begins. At the point when the block is leased
@ -44,21 +43,27 @@ namespace System.IO.Pipelines
private Memory<byte> _buffer; private Memory<byte> _buffer;
public BufferSegment(OwnedMemory<byte> buffer) public void SetMemory(OwnedMemory<byte> buffer)
{ {
_owned = buffer; SetMemory(buffer, 0, 0);
Start = 0;
End = 0;
_owned.Retain();
_buffer = _owned.Memory;
} }
public BufferSegment(OwnedMemory<byte> buffer, int start, int end): this(buffer) public void SetMemory(OwnedMemory<byte> buffer, int start, int end, bool readOnly = false)
{ {
_owned = buffer;
_owned.Retain();
_buffer = _owned.Memory;
RunningLength = 0;
Start = start; Start = start;
End = end; End = end;
ReadOnly = true; Next = null;
}
public void ResetMemory()
{
_owned.Release();
_owned = null;
} }
public Memory<byte> Buffer => _buffer; public Memory<byte> Buffer => _buffer;
@ -79,17 +84,17 @@ namespace System.IO.Pipelines
/// </summary> /// </summary>
public int WritableBytes => _buffer.Length - End; public int WritableBytes => _buffer.Length - End;
public void Dispose()
{
_owned.Release();
}
/// <summary> /// <summary>
/// ToString overridden for debugger convenience. This displays the "active" byte information in this block as ASCII characters. /// ToString overridden for debugger convenience. This displays the "active" byte information in this block as ASCII characters.
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public override string ToString() public override string ToString()
{ {
if (_owned == null)
{
return "<NO MEMORY ATTACHED>";
}
var builder = new StringBuilder(); var builder = new StringBuilder();
var data = _owned.Memory.Slice(Start, ReadableBytes).Span; var data = _owned.Memory.Slice(Start, ReadableBytes).Span;
@ -107,24 +112,29 @@ namespace System.IO.Pipelines
if (beginOrig == endOrig) if (beginOrig == endOrig)
{ {
lastSegment = new BufferSegment(beginOrig._owned, beginBuffer.Index, endBuffer.Index); lastSegment = new BufferSegment();
lastSegment.SetMemory(beginOrig._owned, beginBuffer.Index, endBuffer.Index);
return lastSegment; return lastSegment;
} }
var beginClone = new BufferSegment(beginOrig._owned, beginBuffer.Index, beginOrig.End); var beginClone = new BufferSegment();
beginClone.SetMemory(beginOrig._owned, beginBuffer.Index, beginOrig.End);
var endClone = beginClone; var endClone = beginClone;
beginOrig = beginOrig.Next; beginOrig = beginOrig.Next;
while (beginOrig != endOrig) while (beginOrig != endOrig)
{ {
endClone.SetNext(new BufferSegment(beginOrig._owned, beginOrig.Start, beginOrig.End)); var next = new BufferSegment();
next.SetMemory(beginOrig._owned, beginOrig.Start, beginOrig.End);
endClone.SetNext(next);
endClone = endClone.Next; endClone = endClone.Next;
beginOrig = beginOrig.Next; beginOrig = beginOrig.Next;
} }
lastSegment = new BufferSegment(endOrig._owned, endOrig.Start, endBuffer.Index); lastSegment = new BufferSegment();
lastSegment.SetMemory(endOrig._owned, endOrig.Start, endBuffer.Index);
endClone.SetNext(lastSegment); endClone.SetNext(lastSegment);
return beginClone; return beginClone;
@ -138,6 +148,7 @@ namespace System.IO.Pipelines
Next = segment; Next = segment;
segment = this; segment = this;
while (segment.Next != null) while (segment.Next != null)
{ {
segment.Next.RunningLength = segment.RunningLength + segment.ReadableBytes; segment.Next.RunningLength = segment.RunningLength + segment.ReadableBytes;

Просмотреть файл

@ -14,6 +14,8 @@ namespace System.IO.Pipelines
/// </summary> /// </summary>
public class Pipe : IPipe, IPipeReader, IPipeWriter, IReadableBufferAwaiter, IWritableBufferAwaiter public class Pipe : IPipe, IPipeReader, IPipeWriter, IReadableBufferAwaiter, IWritableBufferAwaiter
{ {
private const int SegmentPoolSize = 16;
private static readonly Action<object> _signalReaderAwaitable = state => ((Pipe)state).ReaderCancellationRequested(); private static readonly Action<object> _signalReaderAwaitable = state => ((Pipe)state).ReaderCancellationRequested();
private static readonly Action<object> _signalWriterAwaitable = state => ((Pipe)state).WriterCancellationRequested(); private static readonly Action<object> _signalWriterAwaitable = state => ((Pipe)state).WriterCancellationRequested();
private static readonly Action<object> _invokeCompletionCallbacks = state => ((PipeCompletionCallbacks)state).Execute(); private static readonly Action<object> _invokeCompletionCallbacks = state => ((PipeCompletionCallbacks)state).Execute();
@ -35,12 +37,15 @@ namespace System.IO.Pipelines
private long _length; private long _length;
private long _currentWriteLength; private long _currentWriteLength;
private int _pooledSegmentCount;
private PipeAwaitable _readerAwaitable; private PipeAwaitable _readerAwaitable;
private PipeAwaitable _writerAwaitable; private PipeAwaitable _writerAwaitable;
private PipeCompletion _writerCompletion; private PipeCompletion _writerCompletion;
private PipeCompletion _readerCompletion; private PipeCompletion _readerCompletion;
private BufferSegment[] _bufferSegmentPool;
// The read head which is the extent of the IPipelineReader's consumed bytes // The read head which is the extent of the IPipelineReader's consumed bytes
private BufferSegment _readHead; private BufferSegment _readHead;
@ -85,6 +90,8 @@ namespace System.IO.Pipelines
throw new ArgumentException(nameof(options.MaximumSizeHigh) + " should be greater or equal to " + nameof(options.MaximumSizeLow), nameof(options.MaximumSizeHigh)); throw new ArgumentException(nameof(options.MaximumSizeHigh) + " should be greater or equal to " + nameof(options.MaximumSizeLow), nameof(options.MaximumSizeHigh));
} }
_bufferSegmentPool = new BufferSegment[SegmentPoolSize];
_pool = options.BufferPool; _pool = options.BufferPool;
_maximumSizeHigh = options.MaximumSizeHigh; _maximumSizeHigh = options.MaximumSizeHigh;
_maximumSizeLow = options.MaximumSizeLow; _maximumSizeLow = options.MaximumSizeLow;
@ -166,8 +173,13 @@ namespace System.IO.Pipelines
// If inadequate bytes left or if the segment is readonly // If inadequate bytes left or if the segment is readonly
if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < count || segment.ReadOnly) if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < count || segment.ReadOnly)
{ {
var nextBuffer = _pool.Rent(count); BufferSegment nextSegment;
var nextSegment = new BufferSegment(nextBuffer); lock (_sync)
{
nextSegment = CreateSegmentUnsynchronized();
}
nextSegment.SetMemory(_pool.Rent(count));
segment.SetNext(nextSegment); segment.SetNext(nextSegment);
@ -194,7 +206,8 @@ namespace System.IO.Pipelines
if (segment == null) if (segment == null)
{ {
// No free tail space, allocate a new segment // No free tail space, allocate a new segment
segment = new BufferSegment(_pool.Rent(count)); segment = CreateSegmentUnsynchronized();
segment.SetMemory(_pool.Rent(count));
} }
if (_commitHead == null) if (_commitHead == null)
@ -215,6 +228,26 @@ namespace System.IO.Pipelines
return segment; return segment;
} }
private BufferSegment CreateSegmentUnsynchronized()
{
if (_pooledSegmentCount > 0)
{
_pooledSegmentCount--;
return _bufferSegmentPool[_pooledSegmentCount];
}
return new BufferSegment();
}
private void ReturnSegmentUnsynchronized(BufferSegment segment)
{
if (_pooledSegmentCount < _bufferSegmentPool.Length)
{
_bufferSegmentPool[_pooledSegmentCount] = segment;
_pooledSegmentCount++;
}
}
internal void Append(ReadableBuffer buffer) internal void Append(ReadableBuffer buffer)
{ {
if (buffer.IsEmpty) if (buffer.IsEmpty)
@ -409,7 +442,7 @@ namespace System.IO.Pipelines
if (readerCompleted) if (readerCompleted)
{ {
Dispose(); CompletePipe();
} }
} }
@ -485,12 +518,13 @@ namespace System.IO.Pipelines
} }
_readingState.End(ExceptionResource.NoReadToComplete); _readingState.End(ExceptionResource.NoReadToComplete);
}
while (returnStart != null && returnStart != returnEnd) while (returnStart != null && returnStart != returnEnd)
{ {
returnStart.Dispose(); returnStart.ResetMemory();
returnStart = returnStart.Next; ReturnSegmentUnsynchronized(returnStart);
returnStart = returnStart.Next;
}
} }
TrySchedule(_writerScheduler, continuation); TrySchedule(_writerScheduler, continuation);
@ -527,7 +561,7 @@ namespace System.IO.Pipelines
if (writerCompleted) if (writerCompleted)
{ {
Dispose(); CompletePipe();
} }
} }
@ -650,7 +684,7 @@ namespace System.IO.Pipelines
} }
} }
private void Dispose() private void CompletePipe()
{ {
lock (_sync) lock (_sync)
{ {
@ -667,7 +701,7 @@ namespace System.IO.Pipelines
var returnSegment = segment; var returnSegment = segment;
segment = segment.Next; segment = segment.Next;
returnSegment.Dispose(); returnSegment.ResetMemory();
} }
_readHead = null; _readHead = null;

Просмотреть файл

@ -33,7 +33,7 @@ namespace System.IO.Pipelines
{ {
var returnSegment = returnStart; var returnSegment = returnStart;
returnStart = returnStart?.Next; returnStart = returnStart?.Next;
returnSegment?.Dispose(); returnSegment?.ResetMemory();
if (returnSegment == returnEnd) if (returnSegment == returnEnd)
{ {

Просмотреть файл

@ -260,9 +260,9 @@ namespace System.IO.Pipelines
private static ReadableBuffer CreateInternal(OwnedMemory<byte> data, int offset, int length) private static ReadableBuffer CreateInternal(OwnedMemory<byte> data, int offset, int length)
{ {
var segment = new BufferSegment(data); var segment = new BufferSegment();
segment.Start = offset; segment.SetMemory(data, offset, offset + length);
segment.End = offset + length;
return new ReadableBuffer(new ReadCursor(segment, offset), new ReadCursor(segment, offset + length)); return new ReadableBuffer(new ReadCursor(segment, offset), new ReadCursor(segment, offset + length));
} }

Просмотреть файл

@ -77,6 +77,29 @@ namespace System.IO.Pipelines.Performance.Tests
} }
} }
[Benchmark(OperationsPerInvoke = InnerLoopCount)]
public void LongWriteInline()
{
for (int i = 0; i < InnerLoopCount; i++)
{
for (int j = 0; j < 15; j++)
{
var writableBuffer = _pipe.Writer.Alloc(WriteLength);
writableBuffer.Advance(WriteLength);
writableBuffer.Commit();
if (j == 14)
{
writableBuffer.FlushAsync().GetResult();
}
}
var result = _pipe.Reader.ReadAsync().GetResult();
var buffer = result.Buffer;
_pipe.Reader.Advance(buffer.End, buffer.End);
}
}
[Benchmark(OperationsPerInvoke = InnerLoopCount)] [Benchmark(OperationsPerInvoke = InnerLoopCount)]
public void WritePlaintextResponse() public void WritePlaintextResponse()
{ {

Просмотреть файл

@ -26,7 +26,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.10.5" /> <PackageReference Include="BenchmarkDotNet" Version="0.10.9" />
</ItemGroup> </ItemGroup>
</Project> </Project>

Просмотреть файл

@ -3,9 +3,11 @@
using BenchmarkDotNet.Columns; using BenchmarkDotNet.Columns;
using BenchmarkDotNet.Configs; using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Engines; using BenchmarkDotNet.Engines;
using BenchmarkDotNet.Jobs; using BenchmarkDotNet.Jobs;
using BenchmarkDotNet.Validators; using BenchmarkDotNet.Validators;
using BenchmarkDotNet.Toolchains.CsProj;
namespace System.IO.Pipelines.Performance.Tests namespace System.IO.Pipelines.Performance.Tests
{ {
@ -15,9 +17,11 @@ namespace System.IO.Pipelines.Performance.Tests
{ {
Add(JitOptimizationsValidator.FailOnError); Add(JitOptimizationsValidator.FailOnError);
Add(StatisticColumn.OperationsPerSecond); Add(StatisticColumn.OperationsPerSecond);
Add(MemoryDiagnoser.Default);
Add(Job.Default Add(Job.Default
.With(BenchmarkDotNet.Environments.Runtime.Core) .With(BenchmarkDotNet.Environments.Runtime.Core)
.With(CsProjCoreToolchain.NetCoreApp20)
.WithRemoveOutliers(false) .WithRemoveOutliers(false)
.With(new GcMode() { Server = true }) .With(new GcMode() { Server = true })
.With(RunStrategy.Throughput) .With(RunStrategy.Throughput)