diff --git a/src/System.IO.Pipelines.Testing/BufferUtilities.cs b/src/System.IO.Pipelines.Testing/BufferUtilities.cs index 638462899..dadc12070 100644 --- a/src/System.IO.Pipelines.Testing/BufferUtilities.cs +++ b/src/System.IO.Pipelines.Testing/BufferUtilities.cs @@ -35,7 +35,8 @@ namespace System.IO.Pipelines.Testing // Create a segment that has offset relative to the OwnedMemory and OwnedMemory itself has offset relative to array var ownedBuffer = new OwnedArray(chars); - var current = new BufferSegment(ownedBuffer, length, length * 2); + var current = new BufferSegment(); + current.SetMemory(ownedBuffer, length, length * 2); if (first == null) { first = current; diff --git a/src/System.IO.Pipelines/System/IO/Pipelines/BufferSegment.cs b/src/System.IO.Pipelines/System/IO/Pipelines/BufferSegment.cs index b0145e2a2..ecad38329 100644 --- a/src/System.IO.Pipelines/System/IO/Pipelines/BufferSegment.cs +++ b/src/System.IO.Pipelines/System/IO/Pipelines/BufferSegment.cs @@ -7,8 +7,7 @@ using System.Text; namespace System.IO.Pipelines { - // TODO: Pool segments - internal class BufferSegment : IDisposable + internal class BufferSegment { /// /// The Start represents the offset into Array where the range of "active" bytes begins. At the point when the block is leased @@ -44,21 +43,27 @@ namespace System.IO.Pipelines private Memory _buffer; - public BufferSegment(OwnedMemory buffer) + public void SetMemory(OwnedMemory buffer) { - _owned = buffer; - Start = 0; - End = 0; - - _owned.Retain(); - _buffer = _owned.Memory; + SetMemory(buffer, 0, 0); } - public BufferSegment(OwnedMemory buffer, int start, int end): this(buffer) + public void SetMemory(OwnedMemory buffer, int start, int end, bool readOnly = false) { + _owned = buffer; + _owned.Retain(); + _buffer = _owned.Memory; + + RunningLength = 0; Start = start; End = end; - ReadOnly = true; + Next = null; + } + + public void ResetMemory() + { + _owned.Release(); + _owned = null; } public Memory Buffer => _buffer; @@ -79,17 +84,17 @@ namespace System.IO.Pipelines /// public int WritableBytes => _buffer.Length - End; - public void Dispose() - { - _owned.Release(); - } - /// /// ToString overridden for debugger convenience. This displays the "active" byte information in this block as ASCII characters. /// /// public override string ToString() { + if (_owned == null) + { + return ""; + } + var builder = new StringBuilder(); var data = _owned.Memory.Slice(Start, ReadableBytes).Span; @@ -107,24 +112,29 @@ namespace System.IO.Pipelines if (beginOrig == endOrig) { - lastSegment = new BufferSegment(beginOrig._owned, beginBuffer.Index, endBuffer.Index); + lastSegment = new BufferSegment(); + lastSegment.SetMemory(beginOrig._owned, beginBuffer.Index, endBuffer.Index); return lastSegment; } - var beginClone = new BufferSegment(beginOrig._owned, beginBuffer.Index, beginOrig.End); + var beginClone = new BufferSegment(); + beginClone.SetMemory(beginOrig._owned, beginBuffer.Index, beginOrig.End); var endClone = beginClone; beginOrig = beginOrig.Next; while (beginOrig != endOrig) { - endClone.SetNext(new BufferSegment(beginOrig._owned, beginOrig.Start, beginOrig.End)); + var next = new BufferSegment(); + next.SetMemory(beginOrig._owned, beginOrig.Start, beginOrig.End); + endClone.SetNext(next); endClone = endClone.Next; beginOrig = beginOrig.Next; } - lastSegment = new BufferSegment(endOrig._owned, endOrig.Start, endBuffer.Index); + lastSegment = new BufferSegment(); + lastSegment.SetMemory(endOrig._owned, endOrig.Start, endBuffer.Index); endClone.SetNext(lastSegment); return beginClone; @@ -138,6 +148,7 @@ namespace System.IO.Pipelines Next = segment; segment = this; + while (segment.Next != null) { segment.Next.RunningLength = segment.RunningLength + segment.ReadableBytes; diff --git a/src/System.IO.Pipelines/System/IO/Pipelines/Pipe.cs b/src/System.IO.Pipelines/System/IO/Pipelines/Pipe.cs index a92751322..ed9ddede8 100644 --- a/src/System.IO.Pipelines/System/IO/Pipelines/Pipe.cs +++ b/src/System.IO.Pipelines/System/IO/Pipelines/Pipe.cs @@ -14,6 +14,8 @@ namespace System.IO.Pipelines /// public class Pipe : IPipe, IPipeReader, IPipeWriter, IReadableBufferAwaiter, IWritableBufferAwaiter { + private const int SegmentPoolSize = 16; + private static readonly Action _signalReaderAwaitable = state => ((Pipe)state).ReaderCancellationRequested(); private static readonly Action _signalWriterAwaitable = state => ((Pipe)state).WriterCancellationRequested(); private static readonly Action _invokeCompletionCallbacks = state => ((PipeCompletionCallbacks)state).Execute(); @@ -35,12 +37,15 @@ namespace System.IO.Pipelines private long _length; private long _currentWriteLength; + private int _pooledSegmentCount; + private PipeAwaitable _readerAwaitable; private PipeAwaitable _writerAwaitable; private PipeCompletion _writerCompletion; private PipeCompletion _readerCompletion; + private BufferSegment[] _bufferSegmentPool; // The read head which is the extent of the IPipelineReader's consumed bytes private BufferSegment _readHead; @@ -85,6 +90,8 @@ namespace System.IO.Pipelines throw new ArgumentException(nameof(options.MaximumSizeHigh) + " should be greater or equal to " + nameof(options.MaximumSizeLow), nameof(options.MaximumSizeHigh)); } + _bufferSegmentPool = new BufferSegment[SegmentPoolSize]; + _pool = options.BufferPool; _maximumSizeHigh = options.MaximumSizeHigh; _maximumSizeLow = options.MaximumSizeLow; @@ -166,8 +173,13 @@ namespace System.IO.Pipelines // If inadequate bytes left or if the segment is readonly if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < count || segment.ReadOnly) { - var nextBuffer = _pool.Rent(count); - var nextSegment = new BufferSegment(nextBuffer); + BufferSegment nextSegment; + lock (_sync) + { + nextSegment = CreateSegmentUnsynchronized(); + } + + nextSegment.SetMemory(_pool.Rent(count)); segment.SetNext(nextSegment); @@ -194,7 +206,8 @@ namespace System.IO.Pipelines if (segment == null) { // No free tail space, allocate a new segment - segment = new BufferSegment(_pool.Rent(count)); + segment = CreateSegmentUnsynchronized(); + segment.SetMemory(_pool.Rent(count)); } if (_commitHead == null) @@ -215,6 +228,26 @@ namespace System.IO.Pipelines return segment; } + private BufferSegment CreateSegmentUnsynchronized() + { + if (_pooledSegmentCount > 0) + { + _pooledSegmentCount--; + return _bufferSegmentPool[_pooledSegmentCount]; + } + + return new BufferSegment(); + } + + private void ReturnSegmentUnsynchronized(BufferSegment segment) + { + if (_pooledSegmentCount < _bufferSegmentPool.Length) + { + _bufferSegmentPool[_pooledSegmentCount] = segment; + _pooledSegmentCount++; + } + } + internal void Append(ReadableBuffer buffer) { if (buffer.IsEmpty) @@ -409,7 +442,7 @@ namespace System.IO.Pipelines if (readerCompleted) { - Dispose(); + CompletePipe(); } } @@ -485,12 +518,13 @@ namespace System.IO.Pipelines } _readingState.End(ExceptionResource.NoReadToComplete); - } - while (returnStart != null && returnStart != returnEnd) - { - returnStart.Dispose(); - returnStart = returnStart.Next; + while (returnStart != null && returnStart != returnEnd) + { + returnStart.ResetMemory(); + ReturnSegmentUnsynchronized(returnStart); + returnStart = returnStart.Next; + } } TrySchedule(_writerScheduler, continuation); @@ -527,7 +561,7 @@ namespace System.IO.Pipelines if (writerCompleted) { - Dispose(); + CompletePipe(); } } @@ -650,7 +684,7 @@ namespace System.IO.Pipelines } } - private void Dispose() + private void CompletePipe() { lock (_sync) { @@ -667,7 +701,7 @@ namespace System.IO.Pipelines var returnSegment = segment; segment = segment.Next; - returnSegment.Dispose(); + returnSegment.ResetMemory(); } _readHead = null; diff --git a/src/System.IO.Pipelines/System/IO/Pipelines/PreservedBuffer.cs b/src/System.IO.Pipelines/System/IO/Pipelines/PreservedBuffer.cs index 1e37d6700..477763196 100644 --- a/src/System.IO.Pipelines/System/IO/Pipelines/PreservedBuffer.cs +++ b/src/System.IO.Pipelines/System/IO/Pipelines/PreservedBuffer.cs @@ -33,7 +33,7 @@ namespace System.IO.Pipelines { var returnSegment = returnStart; returnStart = returnStart?.Next; - returnSegment?.Dispose(); + returnSegment?.ResetMemory(); if (returnSegment == returnEnd) { diff --git a/src/System.IO.Pipelines/System/IO/Pipelines/ReadableBuffer.cs b/src/System.IO.Pipelines/System/IO/Pipelines/ReadableBuffer.cs index 24a0749fd..ba722a240 100644 --- a/src/System.IO.Pipelines/System/IO/Pipelines/ReadableBuffer.cs +++ b/src/System.IO.Pipelines/System/IO/Pipelines/ReadableBuffer.cs @@ -260,9 +260,9 @@ namespace System.IO.Pipelines private static ReadableBuffer CreateInternal(OwnedMemory data, int offset, int length) { - var segment = new BufferSegment(data); - segment.Start = offset; - segment.End = offset + length; + var segment = new BufferSegment(); + segment.SetMemory(data, offset, offset + length); + return new ReadableBuffer(new ReadCursor(segment, offset), new ReadCursor(segment, offset + length)); } diff --git a/tests/System.IO.Pipelines.Performance.Tests/PipeThroughput.cs b/tests/System.IO.Pipelines.Performance.Tests/PipeThroughput.cs index e9e05bbfa..b3c12e262 100644 --- a/tests/System.IO.Pipelines.Performance.Tests/PipeThroughput.cs +++ b/tests/System.IO.Pipelines.Performance.Tests/PipeThroughput.cs @@ -77,6 +77,29 @@ namespace System.IO.Pipelines.Performance.Tests } } + + [Benchmark(OperationsPerInvoke = InnerLoopCount)] + public void LongWriteInline() + { + for (int i = 0; i < InnerLoopCount; i++) + { + for (int j = 0; j < 15; j++) + { + var writableBuffer = _pipe.Writer.Alloc(WriteLength); + writableBuffer.Advance(WriteLength); + writableBuffer.Commit(); + if (j == 14) + { + writableBuffer.FlushAsync().GetResult(); + } + } + + var result = _pipe.Reader.ReadAsync().GetResult(); + var buffer = result.Buffer; + _pipe.Reader.Advance(buffer.End, buffer.End); + } + } + [Benchmark(OperationsPerInvoke = InnerLoopCount)] public void WritePlaintextResponse() { diff --git a/tests/System.IO.Pipelines.Performance.Tests/System.IO.Pipelines.Performance.Tests.csproj b/tests/System.IO.Pipelines.Performance.Tests/System.IO.Pipelines.Performance.Tests.csproj index 10000d11d..b98b69fbb 100644 --- a/tests/System.IO.Pipelines.Performance.Tests/System.IO.Pipelines.Performance.Tests.csproj +++ b/tests/System.IO.Pipelines.Performance.Tests/System.IO.Pipelines.Performance.Tests.csproj @@ -26,7 +26,7 @@ - + diff --git a/tests/System.IO.Pipelines.Performance.Tests/configs/CoreConfig.cs b/tests/System.IO.Pipelines.Performance.Tests/configs/CoreConfig.cs index 506584cc4..dfc5f697a 100644 --- a/tests/System.IO.Pipelines.Performance.Tests/configs/CoreConfig.cs +++ b/tests/System.IO.Pipelines.Performance.Tests/configs/CoreConfig.cs @@ -3,9 +3,11 @@ using BenchmarkDotNet.Columns; using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; using BenchmarkDotNet.Engines; using BenchmarkDotNet.Jobs; using BenchmarkDotNet.Validators; +using BenchmarkDotNet.Toolchains.CsProj; namespace System.IO.Pipelines.Performance.Tests { @@ -15,9 +17,11 @@ namespace System.IO.Pipelines.Performance.Tests { Add(JitOptimizationsValidator.FailOnError); Add(StatisticColumn.OperationsPerSecond); + Add(MemoryDiagnoser.Default); Add(Job.Default .With(BenchmarkDotNet.Environments.Runtime.Core) + .With(CsProjCoreToolchain.NetCoreApp20) .WithRemoveOutliers(false) .With(new GcMode() { Server = true }) .With(RunStrategy.Throughput)