* wip

* fixing async stream->buffer
This commit is contained in:
Max Gortman 2019-10-21 12:24:25 -07:00 коммит произвёл GitHub
Родитель a9d0723ba5
Коммит abda435273
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 58 добавлений и 37 удалений

Просмотреть файл

@ -6,13 +6,10 @@ namespace DotNetty.Buffers
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using DotNetty.Common;
using DotNetty.Common.Utilities;
abstract class PooledByteBuffer<T> : AbstractReferenceCountedByteBuffer
{
readonly ThreadLocalPool.Handle recyclerHandle;
protected internal PoolChunk<T> Chunk;
protected internal long Handle;
protected internal T Memory;
@ -22,10 +19,9 @@ namespace DotNetty.Buffers
internal PoolThreadCache<T> Cache;
PooledByteBufferAllocator allocator;
protected PooledByteBuffer(ThreadLocalPool.Handle recyclerHandle, int maxCapacity)
protected PooledByteBuffer(int maxCapacity)
: base(maxCapacity)
{
this.recyclerHandle = recyclerHandle;
}
internal virtual void Init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache<T> cache) =>
@ -51,14 +47,6 @@ namespace DotNetty.Buffers
/**
* Method must be called before reuse this {@link PooledByteBufAllocator}
*/
internal void Reuse(int maxCapacity)
{
this.SetMaxCapacity(maxCapacity);
this.SetReferenceCount(1);
this.SetIndex0(0, 0);
this.DiscardMarks();
}
public override int Capacity
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@ -143,12 +131,9 @@ namespace DotNetty.Buffers
this.Memory = default(T);
this.Chunk.Arena.Free(this.Chunk, handle, this.MaxLength, this.Cache);
this.Chunk = null;
this.Recycle();
}
}
void Recycle() => this.recyclerHandle.Release(this);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected int Idx(int index) => this.Offset + index;
}

Просмотреть файл

@ -7,22 +7,17 @@ namespace DotNetty.Buffers
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common;
using DotNetty.Common.Internal;
sealed class PooledHeapByteBuffer : PooledByteBuffer<byte[]>
{
static readonly ThreadLocalPool<PooledHeapByteBuffer> Recycler = new ThreadLocalPool<PooledHeapByteBuffer>(handle => new PooledHeapByteBuffer(handle, 0));
internal static PooledHeapByteBuffer NewInstance(int maxCapacity)
{
PooledHeapByteBuffer buf = Recycler.Take();
buf.Reuse(maxCapacity);
return buf;
return new PooledHeapByteBuffer(maxCapacity);
}
internal PooledHeapByteBuffer(ThreadLocalPool.Handle recyclerHandle, int maxCapacity)
: base(recyclerHandle, maxCapacity)
internal PooledHeapByteBuffer(int maxCapacity)
: base(maxCapacity)
{
}
@ -108,6 +103,11 @@ namespace DotNetty.Buffers
public override async Task<int> SetBytesAsync(int index, Stream src, int length, CancellationToken cancellationToken)
{
if (length == 0)
{
return 0;
}
int readTotal = 0;
int read;
int offset = this.ArrayOffset + index;

Просмотреть файл

@ -8,23 +8,18 @@ namespace DotNetty.Buffers
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common;
sealed unsafe class PooledUnsafeDirectByteBuffer : PooledByteBuffer<byte[]>
{
static readonly ThreadLocalPool<PooledUnsafeDirectByteBuffer> Recycler = new ThreadLocalPool<PooledUnsafeDirectByteBuffer>(handle => new PooledUnsafeDirectByteBuffer(handle, 0));
byte* memoryAddress;
internal static PooledUnsafeDirectByteBuffer NewInstance(int maxCapacity)
{
PooledUnsafeDirectByteBuffer buf = Recycler.Take();
buf.Reuse(maxCapacity);
return buf;
return new PooledUnsafeDirectByteBuffer(maxCapacity);
}
PooledUnsafeDirectByteBuffer(ThreadLocalPool.Handle recyclerHandle, int maxCapacity)
: base(recyclerHandle, maxCapacity)
PooledUnsafeDirectByteBuffer(int maxCapacity)
: base(maxCapacity)
{
}
@ -48,6 +43,14 @@ namespace DotNetty.Buffers
public override bool IsDirect => true;
internal void Reuse(int maxCapacity)
{
this.SetMaxCapacity(maxCapacity);
this.SetReferenceCount(1);
this.SetIndex0(0, 0);
this.DiscardMarks();
}
protected internal override byte _GetByte(int index) => *(this.memoryAddress + index);
protected internal override short _GetShort(int index) => UnsafeByteBufferUtil.GetShort(this.Addr(index));
@ -121,8 +124,7 @@ namespace DotNetty.Buffers
public override Task<int> SetBytesAsync(int index, Stream src, int length, CancellationToken cancellationToken)
{
this.CheckIndex(index, length);
int read = UnsafeByteBufferUtil.SetBytes(this, this.Addr(index), index, src, length);
return Task.FromResult(read);
return UnsafeByteBufferUtil.SetBytesAsync(this, this.Addr(index), index, src, length, cancellationToken);
}
public override IByteBuffer Copy(int index, int length)

Просмотреть файл

@ -8,9 +8,11 @@ namespace DotNetty.Buffers
using System.Diagnostics.Contracts;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Internal;
using DotNetty.Common.Utilities;
public unsafe class UnpooledUnsafeDirectByteBuffer : AbstractReferenceCountedByteBuffer
{
@ -294,10 +296,8 @@ namespace DotNetty.Buffers
public override Task<int> SetBytesAsync(int index, Stream src, int length, CancellationToken cancellationToken)
{
this.CheckIndex(index, length);
int read;
fixed (byte* addr = &this.Addr(index))
read = UnsafeByteBufferUtil.SetBytes(this, addr, index, src, length);
return Task.FromResult(read);
return UnsafeByteBufferUtil.SetBytesAsync(this, addr, index, src, length, cancellationToken);
}
public override int IoBufferCount => 1;

Просмотреть файл

@ -8,7 +8,10 @@ namespace DotNetty.Buffers
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Internal;
using DotNetty.Common.Utilities;
static unsafe class UnsafeByteBufferUtil
{
@ -214,6 +217,11 @@ namespace DotNetty.Buffers
internal static int SetBytes(AbstractByteBuffer buf, byte* addr, int index, Stream input, int length)
{
if (length == 0)
{
return 0;
}
IByteBuffer tmpBuf = buf.Allocator.HeapBuffer(length);
try
{
@ -233,6 +241,32 @@ namespace DotNetty.Buffers
}
}
internal static Task<int> SetBytesAsync(AbstractByteBuffer buf, byte* addr, int index, Stream input, int length, CancellationToken cancellationToken)
{
if (length == 0)
{
return TaskEx.Zero;
}
IByteBuffer tmpBuf = buf.Allocator.HeapBuffer(length);
return tmpBuf.SetBytesAsync(0, input, length, cancellationToken)
.ContinueWith(t => {
try
{
var read = t.Result;
if (read > 0)
{
PlatformDependent.CopyMemory(tmpBuf.Array, tmpBuf.ArrayOffset, addr, read);
}
return read;
}
finally
{
tmpBuf.Release();
}
});
}
internal static void GetBytes(AbstractByteBuffer buf, byte* addr, int index, IByteBuffer dst, int dstIndex, int length)
{
Contract.Requires(dst != null);