Porting adaptive read buffer prediction

Motivation:
Provides good flexible default while lowering memory consumption on average.

Modifications:
- Synced IRecvByteBufAllocator and relevant parts up to date
- Ported AdaptiveRecvByteBufAllocator
- Extra: properly porting ChannelMetadata

Result:
Process of receiving bytes from network is more configurable and allows for both more conservative and more agressive ingestion.
This commit is contained in:
Max Gortman 2016-05-24 18:11:36 -07:00
Родитель 29315699f1
Коммит 0131fa9fe2
23 изменённых файлов: 519 добавлений и 205 удалений

Просмотреть файл

@ -636,7 +636,7 @@ namespace DotNetty.Buffers
for (int low = 0, high = this.components.Count; low <= high;)
{
int mid = (int)(((uint)low + (uint)high) >> 1);
int mid = (low + high).RightUShift(1);
ComponentEntry c = this.components[mid];
if (offset >= c.EndOffset)
{
@ -1025,7 +1025,7 @@ namespace DotNetty.Buffers
for (int low = 0, high = this.components.Count; low <= high;)
{
int mid = (int)(((uint)low + (uint)high) >> 1);
int mid = (low + high).RightUShift(1);
ComponentEntry c = this.components[mid];
if (offset >= c.EndOffset)
{

Просмотреть файл

@ -138,8 +138,7 @@ namespace DotNetty.Buffers
{
int q = bitmapIdx.RightUShift(6);
int r = bitmapIdx & 63;
Contract.Assert((this.bitmap[q].RightUShift(r) & 1) != 0)
;
Contract.Assert((this.bitmap[q].RightUShift(r) & 1) != 0);
this.bitmap[q] ^= 1L << r;
this.SetNextAvail(bitmapIdx);

Просмотреть файл

@ -37,14 +37,14 @@ namespace DotNetty.Common
}
ConditionalWeakTable<Stack, WeakOrderQueue> queueDictionary = DelayedPool.Value;
WeakOrderQueue queue;
if (!queueDictionary.TryGetValue(stack, out queue))
WeakOrderQueue delayedRecycled;
if (!queueDictionary.TryGetValue(stack, out delayedRecycled))
{
var newQueue = new WeakOrderQueue(stack, thread);
queue = newQueue;
queueDictionary.Add(stack, queue);
delayedRecycled = newQueue;
queueDictionary.Add(stack, delayedRecycled);
}
queue.Add(this);
delayedRecycled.Add(this);
}
}
@ -431,17 +431,6 @@ namespace DotNetty.Common
return handle;
}
public bool Release(T o, Handle handle)
{
if (handle.Stack.Parent != this)
{
return false;
}
handle.Release(o);
return true;
}
internal int ThreadLocalCapacity => this.threadLocal.Value.elements.Length;
internal int ThreadLocalSize => this.threadLocal.Value.Size;

Просмотреть файл

@ -408,10 +408,10 @@ namespace DotNetty.Transport.Bootstrapping
protected sealed class ChannelOptionValue<T> : ChannelOptionValue
{
readonly ChannelOption option;
readonly ChannelOption<T> option;
readonly T value;
public ChannelOptionValue(ChannelOption option, T value)
public ChannelOptionValue(ChannelOption<T> option, T value)
{
this.option = option;
this.value = value;

Просмотреть файл

@ -99,7 +99,7 @@ namespace DotNetty.Transport.Channels
public abstract bool Active { get; }
public abstract bool DisconnectSupported { get; }
public abstract ChannelMetadata Metadata { get; }
public EndPoint LocalAddress
{

Просмотреть файл

@ -319,7 +319,7 @@ namespace DotNetty.Transport.Channels
public Task DisconnectAsync()
{
if (!this.Channel.DisconnectSupported)
if (!this.Channel.Metadata.HasDisconnect)
{
return this.CloseAsync();
}

Просмотреть файл

@ -0,0 +1,178 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Transport.Channels
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using DotNetty.Common.Utilities;
/// <summary>
/// The <see cref="IRecvByteBufAllocator" /> that automatically increases and
/// decreases the predicted buffer size on feed back.
/// <p />
/// It gradually increases the expected number of readable bytes if the previous
/// read fully filled the allocated buffer. It gradually decreases the expected
/// number of readable bytes if the read operation was not able to fill a certain
/// amount of the allocated buffer two times consecutively. Otherwise, it keeps
/// returning the same prediction.
/// </summary>
public class AdaptiveRecvByteBufAllocator : DefaultMaxMessagesRecvByteBufAllocator
{
const int DefaultMinimum = 64;
const int DefaultInitial = 1024;
const int DefaultMaximum = 65536;
const int IndexIncrement = 4;
const int IndexDecrement = 1;
static readonly int[] SizeTable;
static AdaptiveRecvByteBufAllocator()
{
var sizeTable = new List<int>();
for (int i = 16; i < 512; i += 16)
{
sizeTable.Add(i);
}
for (int i = 512; i > 0; i <<= 1)
{
sizeTable.Add(i);
}
SizeTable = sizeTable.ToArray();
}
static int GetSizeTableIndex(int size)
{
for (int low = 0, high = SizeTable.Length - 1;;)
{
if (high < low)
{
return low;
}
if (high == low)
{
return high;
}
int mid = (low + high).RightUShift(1);
int a = SizeTable[mid];
int b = SizeTable[mid + 1];
if (size > b)
{
low = mid + 1;
}
else if (size < a)
{
high = mid - 1;
}
else if (size == a)
{
return mid;
}
else
{
return mid + 1;
}
}
}
sealed class HandleImpl : MaxMessageHandle<AdaptiveRecvByteBufAllocator>
{
readonly int minIndex;
readonly int maxIndex;
int index;
int nextReceiveBufferSize;
bool decreaseNow;
public HandleImpl(AdaptiveRecvByteBufAllocator owner, int minIndex, int maxIndex, int initial)
: base(owner)
{
this.minIndex = minIndex;
this.maxIndex = maxIndex;
this.index = GetSizeTableIndex(initial);
this.nextReceiveBufferSize = SizeTable[this.index];
}
public override int Guess() => this.nextReceiveBufferSize;
void Record(int actualReadBytes)
{
if (actualReadBytes <= SizeTable[Math.Max(0, this.index - IndexDecrement - 1)])
{
if (this.decreaseNow)
{
this.index = Math.Max(this.index - IndexDecrement, this.minIndex);
this.nextReceiveBufferSize = SizeTable[this.index];
this.decreaseNow = false;
}
else
{
this.decreaseNow = true;
}
}
else if (actualReadBytes >= this.nextReceiveBufferSize)
{
this.index = Math.Min(this.index + IndexIncrement, this.maxIndex);
this.nextReceiveBufferSize = SizeTable[this.index];
this.decreaseNow = false;
}
}
public override void ReadComplete() => this.Record(this.TotalBytesRead());
}
readonly int minIndex;
readonly int maxIndex;
readonly int initial;
/// <summary>
/// Creates a new predictor with the default parameters. With the default
/// parameters, the expected buffer size starts from <c>1024</c>, does not
/// go down below <c>64</c>, and does not go up above <c>65536</c>.
/// </summary>
public AdaptiveRecvByteBufAllocator()
: this(DefaultMinimum, DefaultInitial, DefaultMaximum)
{
}
/// <summary>Creates a new predictor with the specified parameters.</summary>
/// <param name="minimum">the inclusive lower bound of the expected buffer size</param>
/// <param name="initial">the initial buffer size when no feed back was received</param>
/// <param name="maximum">the inclusive upper bound of the expected buffer size</param>
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum)
{
Contract.Requires(minimum > 0);
Contract.Requires(initial >= minimum);
Contract.Requires(maximum >= initial);
int min = GetSizeTableIndex(minimum);
if (SizeTable[min] < minimum)
{
this.minIndex = min + 1;
}
else
{
this.minIndex = min;
}
int max = GetSizeTableIndex(maximum);
if (SizeTable[max] > maximum)
{
this.maxIndex = max - 1;
}
else
{
this.maxIndex = max;
}
this.initial = initial;
}
public override IRecvByteBufAllocatorHandle NewHandle() => new HandleImpl(this, this.minIndex, this.maxIndex, this.initial);
}
}

Просмотреть файл

@ -0,0 +1,52 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Transport.Channels
{
using System.Diagnostics.Contracts;
/// <summary>Represents the properties of a <see cref="IChannel" /> implementation.</summary>
public sealed class ChannelMetadata
{
/// <summary>Create a new instance</summary>
/// <param name="hasDisconnect">
/// <c>true</c> if and only if the channel has the <c>DisconnectAsync()</c> operation
/// that allows a user to disconnect and then call <see cref="IChannel.ConnectAsync(System.Net.EndPoint)" />
/// again, such as UDP/IP.
/// </param>
public ChannelMetadata(bool hasDisconnect)
: this(hasDisconnect, 1)
{
}
/// <summary>Create a new instance</summary>
/// <param name="hasDisconnect">
/// <c>true</c> if and only if the channel has the <c>DisconnectAsync</c> operation
/// that allows a user to disconnect and then call <see cref="IChannel.ConnectAsync(System.Net.EndPoint)" />
/// again, such as UDP/IP.
/// </param>
/// <param name="defaultMaxMessagesPerRead">
/// If a <see cref="IMaxMessagesRecvByteBufAllocator" /> is in use, then this value will be
/// set for <see cref="IMaxMessagesRecvByteBufAllocator.MaxMessagesPerRead" />. Must be <c> &gt; 0</c>.
/// </param>
public ChannelMetadata(bool hasDisconnect, int defaultMaxMessagesPerRead)
{
Contract.Requires(defaultMaxMessagesPerRead > 0);
this.HasDisconnect = hasDisconnect;
this.DefaultMaxMessagesPerRead = defaultMaxMessagesPerRead;
}
/// <summary>
/// Returns <c>true</c> if and only if the channel has the <c>DisconnectAsync()</c> operation
/// that allows a user to disconnect and then call <see cref="IChannel.ConnectAsync(System.Net.EndPoint)" /> again,
/// such as UDP/IP.
/// </summary>
public bool HasDisconnect { get; }
/// <summary>
/// If a <see cref="IMaxMessagesRecvByteBufAllocator" /> is in use, then this is the default value for
/// <see cref="IMaxMessagesRecvByteBufAllocator.MaxMessagesPerRead" />.
/// </summary>
public int DefaultMaxMessagesPerRead { get; }
}
}

Просмотреть файл

@ -36,7 +36,6 @@ namespace DotNetty.Transport.Channels
public static readonly ChannelOption<IMessageSizeEstimator> MessageSizeEstimator = ValueOf<IMessageSizeEstimator>("MESSAGE_SIZE_ESTIMATOR");
public static readonly ChannelOption<TimeSpan> ConnectTimeout = ValueOf<TimeSpan>("CONNECT_TIMEOUT");
public static readonly ChannelOption<int> MaxMessagesPerRead = ValueOf<int>("MAX_MESSAGES_PER_READ");
public static readonly ChannelOption<int> WriteSpinCount = ValueOf<int>("WRITE_SPIN_COUNT");
public static readonly ChannelOption<int> WriteBufferHighWaterMark = ValueOf<int>("WRITE_BUFFER_HIGH_WATER_MARK");
public static readonly ChannelOption<int> WriteBufferLowWaterMark = ValueOf<int>("WRITE_BUFFER_LOW_WATER_MARK");

Просмотреть файл

@ -23,7 +23,6 @@ namespace DotNetty.Transport.Channels
volatile int autoRead = 1;
volatile int writeSpinCount = 16;
volatile int maxMessagesPerRead;
volatile int writeBufferHighWaterMark = 64 * 1024;
volatile int writeBufferLowWaterMark = 32 * 1024;
long connectTimeout = DefaultConnectTimeout.Ticks;
@ -31,22 +30,25 @@ namespace DotNetty.Transport.Channels
protected readonly IChannel Channel;
public DefaultChannelConfiguration(IChannel channel)
: this(channel, new AdaptiveRecvByteBufAllocator())
{
}
public DefaultChannelConfiguration(IChannel channel, IRecvByteBufAllocator allocator)
{
Contract.Requires(channel != null);
this.Channel = channel;
if (channel is IServerChannel || channel is AbstractSocketByteChannel)
this.Channel = channel;
var maxMessagesAllocator = allocator as IMaxMessagesRecvByteBufAllocator;
if (maxMessagesAllocator != null)
{
// Server channels: Accept as many incoming connections as possible.
// NIO byte channels: Implemented to reduce unnecessary system calls even if it's > 1.
// See https://github.com/netty/netty/issues/2079
// TODO: Add some property to ChannelMetadata so we can remove the ugly instanceof
this.maxMessagesPerRead = 16;
maxMessagesAllocator.MaxMessagesPerRead = channel.Metadata.DefaultMaxMessagesPerRead;
}
else
else if (allocator == null)
{
this.maxMessagesPerRead = 1;
throw new ArgumentNullException(nameof(allocator));
}
this.RecvByteBufAllocator = allocator;
}
public virtual T GetOption<T>(ChannelOption<T> option)
@ -57,10 +59,6 @@ namespace DotNetty.Transport.Channels
{
return (T)(object)this.ConnectTimeout; // no boxing will happen, compiler optimizes away such casts
}
if (ChannelOption.MaxMessagesPerRead.Equals(option))
{
return (T)(object)this.MaxMessagesPerRead;
}
if (ChannelOption.WriteSpinCount.Equals(option))
{
return (T)(object)this.WriteSpinCount;
@ -102,10 +100,6 @@ namespace DotNetty.Transport.Channels
{
this.ConnectTimeout = (TimeSpan)(object)value;
}
else if (ChannelOption.MaxMessagesPerRead.Equals(option))
{
this.MaxMessagesPerRead = (int)(object)value;
}
else if (ChannelOption.WriteSpinCount.Equals(option))
{
this.WriteSpinCount = (int)(object)value;
@ -211,16 +205,6 @@ namespace DotNetty.Transport.Channels
{
}
public int MaxMessagesPerRead
{
get { return this.maxMessagesPerRead; }
set
{
Contract.Requires(value >= 1);
this.maxMessagesPerRead = value;
}
}
public int WriteBufferHighWaterMark
{
get { return this.writeBufferHighWaterMark; }

Просмотреть файл

@ -0,0 +1,103 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Transport.Channels
{
using System.Diagnostics.Contracts;
using DotNetty.Buffers;
/// <summary>
/// Default implementation of <see cref="IMaxMessagesRecvByteBufAllocator" /> which respects
/// <see cref="IChannelConfiguration.AutoRead" />
/// and also prevents overflow.
/// </summary>
public abstract class DefaultMaxMessagesRecvByteBufAllocator : IMaxMessagesRecvByteBufAllocator
{
volatile int maxMessagesPerRead;
protected DefaultMaxMessagesRecvByteBufAllocator()
: this(1)
{
}
protected DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead)
{
this.MaxMessagesPerRead = maxMessagesPerRead;
}
public int MaxMessagesPerRead
{
get { return this.maxMessagesPerRead; }
set
{
Contract.Requires(value > 0);
this.maxMessagesPerRead = value;
}
}
public abstract IRecvByteBufAllocatorHandle NewHandle();
/// <summary>Focuses on enforcing the maximum messages per read condition for <see cref="ContinueReading" />.</summary>
protected abstract class MaxMessageHandle<T> : IRecvByteBufAllocatorHandle
where T : IMaxMessagesRecvByteBufAllocator
{
protected readonly T Owner;
IChannelConfiguration config;
int maxMessagePerRead;
int totalMessages;
int totalBytesRead;
int lastBytesRead;
protected MaxMessageHandle(T owner)
{
this.Owner = owner;
}
public abstract int Guess();
/// <summary>Only <see cref="IChannelConfiguration.MaxMessagesPerRead" /> is used.</summary>
public void Reset(IChannelConfiguration config)
{
this.config = config;
this.maxMessagePerRead = this.Owner.MaxMessagesPerRead;
this.totalMessages = this.totalBytesRead = 0;
}
public IByteBuffer Allocate(IByteBufferAllocator alloc) => alloc.Buffer(this.Guess());
public void IncMessagesRead(int amt) => this.totalMessages += amt;
public int LastBytesRead
{
get { return this.lastBytesRead; }
set
{
this.lastBytesRead = value;
// Ignore if bytes is negative, the interface contract states it will be detected externally after call.
// The value may be "invalid" after this point, but it doesn't matter because reading will be stopped.
this.totalBytesRead += value;
if (this.totalBytesRead < 0)
{
this.totalBytesRead = int.MaxValue;
}
}
}
public virtual bool ContinueReading()
{
return this.config.AutoRead
&& this.AttemptedBytesRead == this.lastBytesRead
&& this.totalMessages < this.maxMessagePerRead
&& this.totalBytesRead < int.MaxValue;
}
public virtual void ReadComplete()
{
}
public virtual int AttemptedBytesRead { get; set; }
protected int TotalBytesRead() => this.totalBytesRead;
}
}
}

Просмотреть файл

@ -26,9 +26,11 @@ namespace DotNetty.Transport.Channels.Embedded
static readonly IChannelHandler[] EMPTY_HANDLERS = new IChannelHandler[0];
//TODO: ChannelMetadata
static readonly IInternalLogger logger = InternalLoggerFactory.GetInstance<EmbeddedChannel>();
static readonly ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
static readonly ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
readonly EmbeddedEventLoop loop = new EmbeddedEventLoop();
Queue<object> inboundMessages;
@ -64,16 +66,24 @@ namespace DotNetty.Transport.Channels.Embedded
{
}
/// <summary>
/// Create a new instance with the pipeline initialized with the specified handlers.
/// </summary>
public EmbeddedChannel(IChannelId id, params IChannelHandler[] handlers)
: this(id, false, handlers)
{
}
/// <summary>Create a new instance with the pipeline initialized with the specified handlers.</summary>
/// <param name="id">The <see cref="IChannelId" /> of this channel.</param>
/// <param name="hasDisconnect">
/// <c>false</c> if this <see cref="IChannel" /> will delegate <see cref="DisconnectAsync" />
/// to <see cref="CloseAsync" />, <c>true</c> otherwise.
/// </param>
/// <param name="handlers">
/// The <see cref="IChannelHandler" />s that will be added to the <see cref="IChannelPipeline" />
/// </param>
public EmbeddedChannel(IChannelId id, params IChannelHandler[] handlers)
public EmbeddedChannel(IChannelId id, bool hasDisconnect, params IChannelHandler[] handlers)
: base(null, id)
{
this.Metadata = hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
this.Configuration = new DefaultChannelConfiguration(this);
if (handlers == null)
{
@ -99,6 +109,8 @@ namespace DotNetty.Transport.Channels.Embedded
p.AddLast(new LastInboundHandler(this.InboundMessages, this.RecordException));
}
public override ChannelMetadata Metadata { get; }
public override IChannelConfiguration Configuration { get; }
/// <summary>
@ -117,8 +129,6 @@ namespace DotNetty.Transport.Channels.Embedded
public T ReadOutbound<T>() => (T)Poll(this.outboundMessages);
public override bool DisconnectSupported => false;
protected override EndPoint LocalAddressInternal => this.Active ? LOCAL_ADDRESS : null;
protected override EndPoint RemoteAddressInternal => this.Active ? REMOTE_ADDRESS : null;

Просмотреть файл

@ -4,32 +4,26 @@
namespace DotNetty.Transport.Channels
{
using System.Diagnostics.Contracts;
using DotNetty.Buffers;
/// <summary>
/// The {@link RecvByteBufAllocator} that always yields the same buffer
/// The <see cref="IRecvByteBufAllocator" /> that always yields the same buffer
/// size prediction. This predictor ignores the feedback from the I/O thread.
/// </summary>
public sealed class FixedRecvByteBufAllocator : IRecvByteBufAllocator
public sealed class FixedRecvByteBufAllocator : DefaultMaxMessagesRecvByteBufAllocator
{
public static readonly FixedRecvByteBufAllocator Default = new FixedRecvByteBufAllocator(4 * 1024);
sealed class HandleImpl : IRecvByteBufAllocatorHandle
sealed class HandleImpl : MaxMessageHandle<FixedRecvByteBufAllocator>
{
readonly int bufferSize;
public HandleImpl(int bufferSize)
public HandleImpl(FixedRecvByteBufAllocator owner, int bufferSize)
: base(owner)
{
this.bufferSize = bufferSize;
}
public IByteBuffer Allocate(IByteBufferAllocator alloc) => alloc.Buffer(this.bufferSize);
public int Guess() => this.bufferSize;
public void Record(int actualReadBytes)
{
}
public override int Guess() => this.bufferSize;
}
readonly IRecvByteBufAllocatorHandle handle;
@ -42,9 +36,9 @@ namespace DotNetty.Transport.Channels
{
Contract.Requires(bufferSize > 0);
this.handle = new HandleImpl(bufferSize);
this.handle = new HandleImpl(this, bufferSize);
}
public IRecvByteBufAllocatorHandle NewHandle() => this.handle;
public override IRecvByteBufAllocatorHandle NewHandle() => this.handle;
}
}

Просмотреть файл

@ -19,14 +19,18 @@ namespace DotNetty.Transport.Channels
IChannel Parent { get; }
bool DisconnectSupported { get; }
bool Open { get; }
bool Active { get; }
bool Registered { get; }
/// <summary>
/// Return the <see cref="ChannelMetadata" /> of the <see cref="IChannel" /> which describe the nature of the
/// <see cref="IChannel" />.
/// </summary>
ChannelMetadata Metadata { get; }
EndPoint LocalAddress { get; }
EndPoint RemoteAddress { get; }

Просмотреть файл

@ -18,8 +18,6 @@ namespace DotNetty.Transport.Channels
TimeSpan ConnectTimeout { get; set; }
int MaxMessagesPerRead { get; set; }
int WriteSpinCount { get; set; }
IByteBufferAllocator Allocator { get; set; }

Просмотреть файл

@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Transport.Channels
{
/// <summary>
/// <see cref="IRecvByteBufAllocator" /> that limits the number of read operations that will be attempted when a read
/// operation
/// is attempted by the event loop.
/// </summary>
public interface IMaxMessagesRecvByteBufAllocator : IRecvByteBufAllocator
{
/// <summary>
/// Gets or sets the maximum number of messages to read per read loop.
/// If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
/// </summary>
int MaxMessagesPerRead { get; set; }
}
}

Просмотреть файл

@ -3,6 +3,7 @@
namespace DotNetty.Transport.Channels
{
using System.Diagnostics.Contracts;
using DotNetty.Buffers;
public interface IRecvByteBufAllocatorHandle
@ -14,16 +15,46 @@ namespace DotNetty.Transport.Channels
IByteBuffer Allocate(IByteBufferAllocator alloc);
/// <summary>
/// Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
/// Similar to <see cref="Allocate" /> except that it does not allocate anything but just tells the
/// capacity.
/// </summary>
int Guess();
/// <summary>
/// Records the the actual number of read bytes in the previous read operation so that the allocator allocates
/// the buffer with potentially more correct capacity.
/// @param actualReadBytes the actual number of read bytes in the previous read operation
/// Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next
/// read loop.
/// <p>
/// This may be used by <see cref="ContinueReading" /> to determine if the read operation should complete.
/// </p>
/// This is only ever a hint and may be ignored by the implementation.
/// </summary>
void Record(int actualReadBytes);
/// <param name="config">The channel configuration which may impact this object's behavior.</param>
void Reset(IChannelConfiguration config);
/// <summary>Increment the number of messages that have been read for the current read loop.</summary>
/// <param name="numMessages">The amount to increment by.</param>
void IncMessagesRead(int numMessages);
/// <summary>
/// Get or set the bytes that have been read for the last read operation.
/// This may be used to increment the number of bytes that have been read.
/// </summary>
/// <remarks>
/// Returned value may be negative if an read error
/// occurs. If a negative value is seen it is expected to be return on the next set to
/// <see cref="LastBytesRead" />. A negative value will signal a termination condition enforced externally
/// to this class and is not required to be enforced in <see cref="ContinueReading" />.
/// </remarks>
int LastBytesRead { get; set; }
/// <summary>Get or set how many bytes the read operation will (or did) attempt to read.</summary>
int AttemptedBytesRead { get; set; }
/// <summary>Determine if the current read loop should should continue.</summary>
/// <returns><c>true</c> if the read loop should continue reading. <c>false</c> if the read loop is complete.</returns>
bool ContinueReading();
/// <summary>Signals read completion.</summary>
void ReadComplete();
}
}

Просмотреть файл

@ -7,6 +7,7 @@ namespace DotNetty.Transport.Channels.Sockets
using System.Net.Sockets;
using System.Threading;
using DotNetty.Buffers;
using DotNetty.Common.Utilities;
/// <summary>
/// {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
@ -14,19 +15,16 @@ namespace DotNetty.Transport.Channels.Sockets
public abstract class AbstractSocketByteChannel : AbstractSocketChannel
{
static readonly string ExpectedTypes =
$" (expected: {typeof(IByteBuffer).Name})"; //+ ", " +
$" (expected: {StringUtil.SimpleClassName<IByteBuffer>()})"; //+ ", " +
// todo: FileRegion support
//typeof(FileRegion).Name + ')';
static readonly Action<object> FlushAction = _ => ((AbstractSocketByteChannel)_).Flush();
static readonly Action<object, object> ReadCompletedSyncCallback = OnReadCompletedSync;
/// <summary>
/// Create a new instance
/// @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
/// @param ch the underlying {@link SelectableChannel} on which it operates
/// </summary>
/// <summary>Create a new instance</summary>
/// <param name="parent">the parent <see cref="IChannel"/> by which this instance was created. May be <c>null</c></param>
/// <param name="socket">the underlying <see cref="Socket"/> on which it operates</param>
protected AbstractSocketByteChannel(IChannel parent, Socket socket)
: base(parent, socket)
{
@ -58,7 +56,8 @@ namespace DotNetty.Transport.Channels.Sockets
}
}
void HandleReadException(IChannelPipeline pipeline, IByteBuffer byteBuf, Exception cause, bool close)
void HandleReadException(IChannelPipeline pipeline, IByteBuffer byteBuf, Exception cause, bool close,
IRecvByteBufAllocatorHandle allocHandle)
{
if (byteBuf != null)
{
@ -72,6 +71,7 @@ namespace DotNetty.Transport.Channels.Sockets
byteBuf.Release();
}
}
allocHandle.ReadComplete();
pipeline.FireChannelReadComplete();
pipeline.FireExceptionCaught(cause);
if (close || cause is SocketException)
@ -87,75 +87,48 @@ namespace DotNetty.Transport.Channels.Sockets
IChannelConfiguration config = ch.Configuration;
IChannelPipeline pipeline = ch.Pipeline;
IByteBufferAllocator allocator = config.Allocator;
int maxMessagesPerRead = config.MaxMessagesPerRead;
IRecvByteBufAllocatorHandle allocHandle = this.RecvBufAllocHandle;
allocHandle.Reset(config);
IByteBuffer byteBuf = null;
int messages = 0;
bool close = false;
try
{
operation.Validate();
int totalReadAmount = 0;
bool readPendingReset = false;
do
{
byteBuf = allocHandle.Allocate(allocator);
int writable = byteBuf.WritableBytes;
int localReadAmount = ch.DoReadBytes(byteBuf);
if (localReadAmount <= 0)
//int writable = byteBuf.WritableBytes;
allocHandle.LastBytesRead = ch.DoReadBytes(byteBuf);
if (allocHandle.LastBytesRead <= 0)
{
// not was read release the buffer
// nothing was read -> release the buffer.
byteBuf.Release();
byteBuf = null;
close = localReadAmount < 0;
close = allocHandle.LastBytesRead < 0;
break;
}
if (!readPendingReset)
{
readPendingReset = true;
ch.ReadPending = false;
}
allocHandle.IncMessagesRead(1);
this.Channel.ReadPending = false;
pipeline.FireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= int.MaxValue - localReadAmount)
{
// Avoid overflow.
totalReadAmount = int.MaxValue;
break;
}
totalReadAmount += localReadAmount;
// stop reading
if (!config.AutoRead)
{
break;
}
if (localReadAmount < writable)
{
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
}
while (++messages < maxMessagesPerRead);
while (allocHandle.ContinueReading());
allocHandle.ReadComplete();
pipeline.FireChannelReadComplete();
allocHandle.Record(totalReadAmount);
if (close)
{
this.CloseOnRead();
close = false;
}
}
catch (Exception t)
{
this.HandleReadException(pipeline, byteBuf, t, close);
this.HandleReadException(pipeline, byteBuf, t, close, allocHandle);
}
finally
{
@ -366,11 +339,9 @@ namespace DotNetty.Transport.Channels.Sockets
/// </summary>
protected abstract int DoReadBytes(IByteBuffer buf);
/// <summary>
/// Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
/// @param buf the {@link ByteBuf} from which the bytes should be written
/// @return amount the amount of written bytes
/// </summary>
/// <summary>Write bytes form the given <see cref="IByteBuffer"/> to the underlying <see cref="IChannel"/>.</summary>
/// <param name="buf">the <see cref="IByteBuffer"/> from which the bytes should be written</param>
/// <returns>the amount of written bytes</returns>
protected abstract int DoWriteBytes(IByteBuffer buf);
}
}

Просмотреть файл

@ -36,26 +36,23 @@ namespace DotNetty.Transport.Channels.Sockets
public override void FinishRead(SocketChannelAsyncOperation operation)
{
Contract.Requires(this.channel.EventLoop.InEventLoop);
Contract.Assert(this.channel.EventLoop.InEventLoop);
AbstractSocketMessageChannel ch = this.Channel;
ch.ResetState(StateFlags.ReadScheduled);
IChannelConfiguration config = ch.Configuration;
if (!ch.ReadPending && !config.AutoRead)
{
// ChannelConfig.setAutoRead(false) was called in the meantime
//removeReadOp(); -- noop with IOCP, just don't schedule receive again
return;
}
int maxMessagesPerRead = config.MaxMessagesPerRead;
IChannelPipeline pipeline = ch.Pipeline;
IRecvByteBufAllocatorHandle allocHandle = this.Channel.Unsafe.RecvBufAllocHandle;
allocHandle.Reset(config);
bool closed = false;
Exception exception = null;
try
{
try
{
while (true)
do
{
int localRead = ch.DoReadMessages(this.readBuf);
if (localRead == 0)
@ -68,30 +65,23 @@ namespace DotNetty.Transport.Channels.Sockets
break;
}
// stop reading and remove op
if (!config.AutoRead)
{
break;
}
if (this.readBuf.Count >= maxMessagesPerRead)
{
break;
}
allocHandle.IncMessagesRead(localRead);
}
while (allocHandle.ContinueReading());
}
catch (Exception t)
{
exception = t;
}
ch.ReadPending = false;
int size = this.readBuf.Count;
for (int i = 0; i < size; i++)
{
ch.ReadPending = false;
pipeline.FireChannelRead(this.readBuf[i]);
}
this.readBuf.Clear();
allocHandle.ReadComplete();
pipeline.FireChannelReadComplete();
if (exception != null)

Просмотреть файл

@ -10,12 +10,13 @@ namespace DotNetty.Transport.Channels.Sockets
using DotNetty.Common.Internal.Logging;
/// <summary>
/// A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses
/// NIO selector based implementation to accept new connections.
/// A <see cref="IServerSocketChannel" /> implementation which uses Socket-based implementation to accept new
/// connections.
/// </summary>
public class TcpServerSocketChannel : AbstractSocketChannel, IServerSocketChannel
{
static readonly IInternalLogger Logger = InternalLoggerFactory.GetInstance<TcpServerSocketChannel>();
static readonly ChannelMetadata METADATA = new ChannelMetadata(false, 16);
static readonly Action<object, object> ReadCompletedSyncCallback = OnReadCompletedSync;
@ -40,7 +41,7 @@ namespace DotNetty.Transport.Channels.Sockets
}
/// <summary>
/// Create a new instance using the given {@link Socket}.
/// Create a new instance using the given <see cref="Socket"/>.
/// </summary>
public TcpServerSocketChannel(Socket socket)
: base(null, socket)
@ -52,12 +53,12 @@ namespace DotNetty.Transport.Channels.Sockets
public override bool Active => this.Socket.IsBound;
public override ChannelMetadata Metadata => METADATA;
protected override EndPoint RemoteAddressInternal => null;
protected override EndPoint LocalAddressInternal => this.Socket.LocalEndPoint;
public override bool DisconnectSupported => false;
SocketChannelAsyncOperation AcceptOperation => this.acceptOperation ?? (this.acceptOperation = new SocketChannelAsyncOperation(this, false));
protected override IChannelUnsafe NewUnsafe() => new TcpServerSocketChannelUnsafe(this);
@ -131,14 +132,13 @@ namespace DotNetty.Transport.Channels.Sockets
TcpServerSocketChannel ch = this.Channel;
ch.ResetState(StateFlags.ReadScheduled);
IChannelConfiguration config = ch.Configuration;
int maxMessagesPerRead = config.MaxMessagesPerRead;
IChannelPipeline pipeline = ch.Pipeline;
IRecvByteBufAllocatorHandle allocHandle = this.Channel.Unsafe.RecvBufAllocHandle;
allocHandle.Reset(config);
bool closed = false;
Exception exception = null;
int messageCount = 0;
try
{
Socket connectedSocket = null;
@ -151,7 +151,7 @@ namespace DotNetty.Transport.Channels.Sockets
var message = new TcpSocketChannel(ch, connectedSocket, true);
ch.ReadPending = false;
pipeline.FireChannelRead(message);
messageCount++;
allocHandle.IncMessagesRead(1);
if (!config.AutoRead && !ch.ReadPending)
{
@ -160,19 +160,15 @@ namespace DotNetty.Transport.Channels.Sockets
return;
}
while (messageCount < maxMessagesPerRead)
while (allocHandle.ContinueReading())
{
connectedSocket = null;
connectedSocket = ch.Socket.Accept();
message = new TcpSocketChannel(ch, connectedSocket, true);
ch.ReadPending = false;
pipeline.FireChannelRead(message);
// stop reading and remove op
if (!config.AutoRead)
{
break;
}
messageCount++;
allocHandle.IncMessagesRead(1);
}
}
catch (ObjectDisposedException)
@ -200,6 +196,7 @@ namespace DotNetty.Transport.Channels.Sockets
}
}
allocHandle.ReadComplete();
pipeline.FireChannelReadComplete();
if (exception != null)
@ -221,7 +218,7 @@ namespace DotNetty.Transport.Channels.Sockets
finally
{
// Check if there is a readPending which was not processed yet.
if (!closed && (config.AutoRead || ch.ReadPending))
if (!closed && (ch.ReadPending || config.AutoRead))
{
ch.DoBeginRead();
}

Просмотреть файл

@ -12,41 +12,38 @@ namespace DotNetty.Transport.Channels.Sockets
using DotNetty.Common.Concurrency;
/// <summary>
/// {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
/// <see cref="ISocketChannel" /> which uses Socket-based implementation.
/// </summary>
public class TcpSocketChannel : AbstractSocketByteChannel, ISocketChannel
{
static readonly ChannelMetadata METADATA = new ChannelMetadata(false, 16);
readonly ISocketChannelConfiguration config;
/// <summary>
/// Create a new instance
/// </summary>
/// <summary>Create a new instance</summary>
public TcpSocketChannel()
: this(new Socket(SocketType.Stream, ProtocolType.Tcp))
{
}
/// <summary>
/// Create a new instance
/// </summary>
/// <summary>Create a new instance</summary>
public TcpSocketChannel(AddressFamily addressFamily)
: this(new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp))
{
}
/// <summary>
/// Create a new instance using the given {@link SocketChannel}.
/// </summary>
/// <summary>Create a new instance using the given <see cref="ISocketChannel" />.</summary>
public TcpSocketChannel(Socket socket)
: this(null, socket)
{
}
/// <summary>
/// Create a new instance
/// @param parent the {@link Channel} which created this instance or {@code null} if it was created by the user
/// @param socket the {@link SocketChannel} which will be used
/// </summary>
/// <summary>Create a new instance</summary>
/// <param name="parent">
/// the <see cref="IChannel" /> which created this instance or <c>null</c> if it was created by the
/// user
/// </param>
/// <param name="socket">the <see cref="ISocketChannel" /> which will be used</param>
public TcpSocketChannel(IChannel parent, Socket socket)
: this(parent, socket, false)
{
@ -62,12 +59,7 @@ namespace DotNetty.Transport.Channels.Sockets
}
}
//public new IServerSocketChannel Parent
//{
// get { return (IServerSocketChannel)base.Parent; }
//}
public override bool DisconnectSupported => false;
public override ChannelMetadata Metadata => METADATA;
public override IChannelConfiguration Configuration => this.config;
@ -121,7 +113,7 @@ namespace DotNetty.Transport.Channels.Sockets
protected override void DoBind(EndPoint localAddress) => this.Socket.Bind(localAddress);
protected override bool DoConnect(EndPoint remoteAddress, EndPoint localAddress)
protected override bool DoConnect(EndPoint remoteAddress, EndPoint localAddress)
{
if (localAddress != null)
{
@ -170,7 +162,7 @@ namespace DotNetty.Transport.Channels.Sockets
protected override void DoDisconnect() => this.DoClose();
protected override void DoClose()
protected override void DoClose()
{
base.DoClose();
if (this.ResetState(StateFlags.Open | StateFlags.Active))
@ -328,12 +320,12 @@ namespace DotNetty.Transport.Channels.Sockets
// Release the fully written buffers, and update the indexes of the partially written buffer.
input.RemoveBytes(writtenBytes);
// Did not write all buffers completely.
this.IncompleteWrite(setOpWrite, asyncOperation);
break;
}
// Release the fully written buffers, and update the indexes of the partially written buffer.
input.RemoveBytes(writtenBytes);
}

Просмотреть файл

@ -56,6 +56,8 @@
<Compile Include="Bootstrapping\DefaultNameResolver.cs" />
<Compile Include="Bootstrapping\INameResolver.cs" />
<Compile Include="Bootstrapping\ServerBootstrap.cs" />
<Compile Include="Channels\AdaptiveRecvByteBufAllocator.cs" />
<Compile Include="Channels\ChannelMetadata.cs" />
<Compile Include="Channels\ChannelOutboundBuffer.cs" />
<Compile Include="Channels\AbstractChannel.cs" />
<Compile Include="Channels\AbstractChannelHandlerContext.cs" />
@ -71,6 +73,7 @@
<Compile Include="Channels\DefaultChannelHandlerInvoker.cs" />
<Compile Include="Channels\DefaultChannelId.cs" />
<Compile Include="Channels\DefaultChannelPipeline.cs" />
<Compile Include="Channels\DefaultMaxMessagesRecvByteBufAllocator.cs" />
<Compile Include="Channels\DefaultMessageSizeEstimator.cs" />
<Compile Include="Channels\Embedded\EmbeddedChannel.cs" />
<Compile Include="Channels\Embedded\EmbeddedChannelId.cs" />
@ -86,6 +89,7 @@
<Compile Include="Channels\Groups\IChannelGroupTaskCompletionSource.cs" />
<Compile Include="Channels\Groups\IChannelMatcher.cs" />
<Compile Include="Channels\IChannelId.cs" />
<Compile Include="Channels\IMaxMessagesRecvByteBufAllocator.cs" />
<Compile Include="Channels\MultithreadEventLoopGroup.cs" />
<Compile Include="Channels\IChannel.cs" />
<Compile Include="Channels\IChannelConfiguration.cs" />

Просмотреть файл

@ -13,7 +13,7 @@ namespace DotNetty.Common.Tests
public void MultipleReleaseTest()
{
RecyclableObject obj = RecyclableObject.NewInstance();
Assert.True(obj.Release());
obj.Release();
var exception = Assert.ThrowsAny<InvalidOperationException>(() => obj.Release());
Assert.True(exception != null);
}
@ -22,10 +22,10 @@ namespace DotNetty.Common.Tests
public void ReleaseTest()
{
RecyclableObject obj = RecyclableObject.NewInstance();
Assert.True(obj.Release());
obj.Release();
RecyclableObject obj2 = RecyclableObject.NewInstance();
Assert.Same(obj, obj2);
Assert.True(obj2.Release());
obj2.Release();
}
[Fact]
@ -34,11 +34,11 @@ namespace DotNetty.Common.Tests
RecyclableObject obj = RecyclableObject.NewInstance();
RecyclableObject prevObject = obj;
Task.Run(() => { Assert.True(obj.Release()); }).Wait();
Task.Run(() => { obj.Release(); }).Wait();
obj = RecyclableObject.NewInstance();
Assert.True(obj == prevObject);
Assert.True(obj.Release());
obj.Release();
}
class RecyclableObject
@ -56,7 +56,7 @@ namespace DotNetty.Common.Tests
public static RecyclableObject NewInstance() => pool.Take();
public bool Release() => pool.Release(this, this.handle);
public void Release() => handle.Release(this);
}
class HandledObject