This commit is contained in:
xinchen 2019-02-13 18:34:07 -08:00
Родитель ae520d9bd1
Коммит 446277a65a
14 изменённых файлов: 259 добавлений и 76 удалений

Просмотреть файл

@ -46,6 +46,9 @@
<Compile Include="..\src\Address.cs" />
<Compile Include="..\src\AmqpBitConverter.cs" />
<Compile Include="..\src\AmqpException.cs" />
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\IAmqpObject.cs" />
<Compile Include="..\src\AmqpObject.cs" />
<Compile Include="..\src\ByteBuffer.cs" />

Просмотреть файл

@ -50,6 +50,9 @@
<Compile Include="..\src\AmqpObject.cs" />
<Compile Include="..\src\ByteBuffer.cs" />
<Compile Include="..\src\Connection.cs" />
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\IAmqpObject.cs" />
<Compile Include="..\src\Net35\TcpTransport.cs">
<Link>Internal\TcpTransport.cs</Link>

Просмотреть файл

@ -51,6 +51,9 @@
<Compile Include="..\src\AmqpObject.cs" />
<Compile Include="..\src\ByteBuffer.cs" />
<Compile Include="..\src\Connection.cs" />
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\IAmqpObject.cs" />
<Compile Include="..\src\Listener\ContainerHost.cs">
<Link>Listener\ContainerHost.cs</Link>

Просмотреть файл

@ -62,6 +62,9 @@
<Compile Include="..\src\AmqpObject.cs" />
<Compile Include="..\src\ByteBuffer.cs" />
<Compile Include="..\src\Connection.cs" />
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Delivery.cs">
<Link>Internal\Delivery.cs</Link>
</Compile>

Просмотреть файл

@ -61,6 +61,9 @@
<Compile Include="..\src\Connection.cs">
<Link>Framing\Connection.cs</Link>
</Compile>
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Delivery.cs">
<Link>Internal\Delivery.cs</Link>
</Compile>

Просмотреть файл

@ -54,6 +54,7 @@
<Compile Include="..\src\Connection.cs">
<Link>Connection.cs</Link>
</Compile>
<Compile Include="..\src\CreditMode.cs" Link="CreditMode.cs" />
<Compile Include="..\src\Delivery.cs">
<Link>Delivery.cs</Link>
</Compile>

Просмотреть файл

@ -46,6 +46,9 @@
<Compile Include="..\src\ByteBuffer.cs" />
<Compile Include="..\src\Address.cs" />
<Compile Include="..\src\AmqpObject.cs" />
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\IAmqpObject.cs" />
<Compile Include="..\src\LinkedList.cs" />
<Compile Include="..\src\Framing\AmqpSequence.cs">

Просмотреть файл

@ -42,6 +42,9 @@
<Compile Include="..\src\AmqpObject.cs" />
<Compile Include="..\src\ByteBuffer.cs" />
<Compile Include="..\src\Connection.cs" />
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Delivery.cs" />
<Compile Include="..\src\Framing\Accepted.cs">
<Link>Framing\Accepted.cs</Link>

Просмотреть файл

@ -42,6 +42,9 @@
<Compile Include="..\src\AmqpObject.cs" />
<Compile Include="..\src\ByteBuffer.cs" />
<Compile Include="..\src\Connection.cs" />
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Delivery.cs" />
<Compile Include="..\src\Framing\Accepted.cs">
<Link>Framing\Accepted.cs</Link>

98
src/CreditMode.cs Normal file
Просмотреть файл

@ -0,0 +1,98 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp
{
using System;
enum RestoreCondition
{
OnReceive,
OnAck,
None
};
/// <summary>
/// Defines the mode to restore link credits in a <see cref="ReceiverLink"/>
/// and a threshold of restored credits to send a flow.
/// </summary>
public sealed class CreditMode
{
/// <summary>
/// Defines a mode that link credits are restored after a message is received. Threshold for flow is default.
/// </summary>
public static readonly CreditMode RestoreOnReceive =
new CreditMode() { Condition = RestoreCondition.OnReceive, Threshold = -1 };
/// <summary>
/// Defines a mode that link credits are restored after a message is acknowledged. Threshold for flow is default.
/// </summary>
public static readonly CreditMode RestoreOnAcknowledge =
new CreditMode() { Condition = RestoreCondition.OnAck, Threshold = -1 };
/// <summary>
/// Defines a mode that link credits are drained.
/// </summary>
public static readonly CreditMode Drain =
new CreditMode() { IsDrain = true };
/// <summary>
/// Defines a mode that link credits are managed entirely by the application.
/// </summary>
public static readonly CreditMode Manual =
new CreditMode() { Condition = RestoreCondition.None };
internal RestoreCondition Condition
{
get;
private set;
}
internal int Threshold
{
get;
private set;
}
internal bool IsDrain
{
get;
private set;
}
/// <summary>
/// Creates a CreditMode that auto-restores link credit on receive calls with a custom flow threshold.
/// </summary>
/// <param name="threshold">The threshold of restored credits to send a flow.</param>
/// <returns>A CreditMode object.</returns>
public static CreditMode OnReceive(int threshold)
{
return new CreditMode() { Condition = RestoreCondition.OnReceive, Threshold = threshold };
}
/// <summary>
/// Creates a CreditMode that auto-restores link credit on Accept/Reject/Release/Modify calls
/// with a custom flow threshold.
/// </summary>
/// <param name="threshold">The threshold of restored credits to send a flow.</param>
/// <returns>A CreditMode object.</returns>
public static CreditMode OnAcknowledge(int threshold)
{
return new CreditMode() { Condition = RestoreCondition.OnReceive, Threshold = threshold };
}
}
}

Просмотреть файл

@ -228,19 +228,19 @@ namespace Amqp
void Start(int credit, MessageCallback onMessage);
/// <summary>
/// Sets a credit on the link. It is the total number of unacknowledged messages the remote peer can send.
/// Sets a credit on the link. The credit controls how many messages the peer can send.
/// </summary>
/// <param name="credit">The new link credit.</param>
/// <param name="autoRestore">If true, enables credit auto-restore mode.</param>
/// <param name="autoRestore">If true, this method is the same as SetCredit(credit, CreditMode.RestoreOnAcknowledge);
/// if false, it is the same as SetCredit(credit, CreditMode.Manual).</param>
void SetCredit(int credit, bool autoRestore);
/// <summary>
/// Sets a credit on the link and enables credit auto-restore with a threshold.
/// Sets a credit on the link and the credit management mode.
/// </summary>
/// <param name="credit">The new link credit.</param>
/// <param name="autoRestoreThreshold">The threshold of restored credits to trigger
/// a flow command to increase delivery limit.</param>
void SetCredit(int credit, int autoRestoreThreshold);
/// <param name="creditMode">The credit management mode, see <see cref="CreditMode"/> for details.</param>
void SetCredit(int credit, CreditMode creditMode);
/// <summary>
/// Receives a message. The call is blocked until a message is available or after a default wait time.

Просмотреть файл

@ -35,11 +35,12 @@ namespace Amqp
// flow control
SequenceNumber deliveryCount;
int totalCredit; // total credit set by app or the default
bool drain; // draining in manual flow control
int autoRestoreThreshold; // restored credits that triggers a flow. -1 for manual flow
CreditMode creditMode; // credit management mode
bool drain; // a drain cycle is in progress
int pending; // queued or being processed by application
int credit; // remaining credit
int restored; // processed by the application
int flowThreshold; // restored threshold for a flow
// received messages queue
LinkedList receivedMessages;
@ -83,6 +84,7 @@ namespace Amqp
: base(session, name, onAttached)
{
this.totalCredit = -1;
this.creditMode = CreditMode.RestoreOnAcknowledge;
this.receivedMessages = new LinkedList();
this.waiterList = new LinkedList();
this.SendAttach(true, 0, attach);
@ -101,54 +103,52 @@ namespace Amqp
}
/// <summary>
/// Sets a credit on the link. It is the total number of unacknowledged messages the remote peer can send.
/// Sets a credit on the link. The credit controls how many messages the peer can send.
/// </summary>
/// <param name="credit">The new link credit.</param>
/// <param name="autoRestore">If true, enables credit auto-restore mode.</param>
/// <remarks>
/// By default the credit is set to 200 (20 for netmf). If the default value is not optimal,
/// application should call this method once after the receiver link is created.
/// In credit auto-restore mode, the link keeps track of acknowledged messages and triggers a flow
/// when a threshold is reached. The default threshold is half of <see cref="credit"/>. Application
/// acknowledges a message by calling <see cref="Accept(Message)"/> or <see cref="Reject(Message, Error)"/>
/// method. Please note the following.
/// 1. Calling this method multiple times with different credits is allowed but not recommended.
/// Application may do this if, for example, it needs to control local queue depth based on resource usage.
/// 2. The <paramref name="autoRestore"/> parameter should not be changed after it is initially set.
/// 3. To stop a receiver link, set <paramref name="credit"/> to 0. However application should expect
/// in-flight messages to come as a result of the previous credit.
/// When autoRestore is false, the link starts a drain cycle to request for messages allowed by credit.
/// If a drain cycle is still in progress, the call simply returns without sending a flow. When a credit
/// is set, application is expected to drain the messages by calling <see cref="Receive()"/> in a loop
/// until all messages are received or a null message is returned.
/// </remarks>
/// <param name="autoRestore">If true, this method is the same as SetCredit(credit, CreditMode.RestoreOnAcknowledge);
/// if false, it is the same as SetCredit(credit, CreditMode.Manual).</param>
public void SetCredit(int credit, bool autoRestore = true)
{
this.SetCredit(credit, autoRestore, autoRestore ? credit / 2 : -1);
this.SetCredit(credit, autoRestore ? CreditMode.RestoreOnAcknowledge : CreditMode.Manual);
}
/// <summary>
/// Sets a credit on the link and enables credit auto-restore with a threshold.
/// Sets a credit on the link and the credit management mode.
/// </summary>
/// <param name="credit">The new link credit.</param>
/// <param name="autoRestoreThreshold">The threshold of restored credits to trigger
/// a flow command to increase delivery limit.</param>
/// <remarks>See <see cref="SetCredit(int, bool)"/> for more details about credit auto-restore mode.</remarks>
public void SetCredit(int credit, int autoRestoreThreshold)
{
this.SetCredit(credit, true, autoRestoreThreshold);
}
void SetCredit(int credit, bool autoRestore, int autoRestoreThreshold)
/// <param name="creditMode">The credit management mode, see <see cref="CreditMode"/> for details.</param>
/// <remarks>
/// The receiver link has a default link credit (200). If the default value is not optimal,
/// application should call this method once after the receiver link is created.
/// In credit auto-restore modes, the link keeps track of acknowledged messages and triggers a flow
/// when a threshold is reached. The default threshold is half of <see cref="credit"/>. Application
/// acknowledges a message by calling <see cref="Accept(Message)"/>, <see cref="Reject(Message, Error)"/>,
/// <see cref="Release(Message)"/> or <see cref="Modify(Message, bool, bool, Fields)"/> method.
/// Please note the following.
/// 1. Calling this method multiple times with different credits is allowed but not recommended.
/// Application may do this if, for example, it needs to control local queue depth based on resource usage.
/// 2. The creditMode should not be changed after it is initially set.
/// 3. To stop a receiver link, set <paramref name="credit"/> to 0. However application should expect
/// in-flight messages to come as a result of the previous credit.
/// 4. In drain credit mode, if a drain cycle is still in progress, the call simply returns without
/// sending a flow. Application is expected to keep calling <see cref="Receive()"/> in a loop
/// until all messages are received or a null message is returned.
/// 5. In manual credit mode, application is responsible for keeping track of messages. The link
/// simply sends the flow with the supplied link credit.
/// </remarks>
public void SetCredit(int credit, CreditMode creditMode)
{
if (credit < 0)
{
throw new ArgumentOutOfRangeException("credit");
}
if (autoRestore && (autoRestoreThreshold < 0 || autoRestoreThreshold > credit))
if ((creditMode.Condition == RestoreCondition.OnReceive || creditMode.Condition == RestoreCondition.OnAck) &&
creditMode.Threshold >= 0 &&
(creditMode.Threshold < credit || creditMode.Threshold > credit))
{
throw new ArgumentOutOfRangeException("autoRestoreThreshold");
throw new ArgumentOutOfRangeException("threshold");
}
lock (this.ThisLock)
@ -164,9 +164,32 @@ namespace Amqp
}
var sendFlow = false;
if (autoRestore)
if (creditMode.IsDrain)
{
if (!this.drain)
{
// start a drain cycle.
this.pending = 0;
this.restored = 0;
this.flowThreshold = int.MaxValue;
this.drain = true;
this.credit = credit;
sendFlow = true;
}
}
else if (creditMode.Condition == RestoreCondition.None)
{
this.drain = false;
this.pending = 0;
this.restored = 0;
this.flowThreshold = int.MaxValue;
this.credit = credit;
sendFlow = true;
}
else
{
this.drain = false;
this.flowThreshold = creditMode.Threshold >= 0 ? creditMode.Threshold : credit / 2;
// Only change remaining credit if total credit was increased, to allow
// accepting incoming messages. If total credit is reduced, only update
// total so credit will be later auto-restored to the new limit.
@ -177,18 +200,9 @@ namespace Amqp
sendFlow = true;
}
}
else if (!this.drain)
{
// start a drain cycle.
this.pending = 0;
this.restored = 0;
this.drain = true;
this.credit = credit;
sendFlow = true;
}
this.totalCredit = credit;
this.autoRestoreThreshold = autoRestoreThreshold;
this.creditMode = creditMode;
if (sendFlow)
{
this.SendFlow(this.deliveryCount, (uint)this.credit, this.drain);
@ -330,6 +344,7 @@ namespace Amqp
{
if (waiter.Signal(delivery.Message))
{
this.OnRestoreCredit(RestoreCondition.OnReceive);
return;
}
@ -351,6 +366,7 @@ namespace Amqp
Fx.Assert(waiter == null, "waiter must be null now");
Fx.Assert(callback != null, "callback must not be null now");
callback(this, delivery.Message);
this.OnRestoreCredit(RestoreCondition.OnReceive);
}
else
{
@ -409,6 +425,7 @@ namespace Amqp
if (first != null)
{
this.receivedMessages.Remove(first);
this.OnRestoreCredit(RestoreCondition.OnReceive);
return first.Message;
}
@ -468,31 +485,7 @@ namespace Amqp
this.Session.DisposeDelivery(true, delivery, state, settled);
}
if (this.autoRestoreThreshold >= 0)
{
lock (this.ThisLock)
{
this.restored++;
this.pending--;
// 1. Threshold reached
// 2. App received all without updating before this one.
// Send flow to avoid receiver starvation. This should
// never happen in normal cases.
if (this.restored >= autoRestoreThreshold ||
(this.credit == 0 && this.receivedMessages.First == null))
{
// total credit may be reduced. restore to what is allowed
int delta = Math.Min(this.restored, this.totalCredit - this.credit - this.pending);
if (delta > 0)
{
this.credit += delta;
this.SendFlow(this.deliveryCount, (uint)this.credit, false);
}
this.restored = 0;
}
}
}
this.OnRestoreCredit(RestoreCondition.OnAck);
}
void OnDelivery(SequenceNumber deliveryId)
@ -513,6 +506,36 @@ namespace Amqp
}
}
void OnRestoreCredit(RestoreCondition condition)
{
if (this.creditMode.Condition == condition)
{
lock (this.ThisLock)
{
this.restored++;
this.pending--;
// 1. Threshold reached
// 2. App received all without acking before this one.
// Send flow to avoid receiver starvation. This should
// never happen in normal cases.
if (this.restored >= this.flowThreshold ||
(this.creditMode.Condition == RestoreCondition.OnAck && this.credit == 0 && this.receivedMessages.First == null))
{
// total credit may be reduced. restore to what is allowed
int delta = Math.Min(this.restored, this.totalCredit - this.credit - this.pending);
if (delta > 0)
{
this.credit += delta;
this.SendFlow(this.deliveryCount, (uint)this.credit, false);
}
this.restored = 0;
}
}
}
}
sealed class MessageNode : INode
{
public Message Message { get; set; }

Просмотреть файл

@ -1479,6 +1479,42 @@ namespace Test.Amqp
connection.Close();
}
[TestMethod]
public void ReceiverLinkRestoreCreditOnReceiveTest()
{
uint total = 0;
uint id = 0;
this.testListener.RegisterTarget(TestPoint.Flow, (stream, channel, fields) =>
{
uint current = total;
total = Math.Max(total, (uint)fields[5] + (uint)fields[6]);
for (uint i = current; i < total; i++)
{
TestListener.FRM(stream, 0x14UL, 0, channel, fields[4], id, BitConverter.GetBytes(id), 0u, false, false); // transfer
id++;
}
return TestOutcome.Stop;
});
string testName = "ReceiverLinkRestoreCreditOnReceiveTest";
Connection connection = new Connection(this.address);
Session session = new Session(connection);
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, "any");
receiver.SetCredit(10, CreditMode.RestoreOnReceive);
for (int i = 0; i < 19; i++)
{
Message msg = receiver.Receive();
Assert.IsTrue(msg != null);
}
Assert.IsTrue(total >= 20u, "total: " + total);
Assert.IsTrue(total <= 25u, "total: " + total);
connection.Close();
}
[TestMethod]
public void ReceiverLinkManualCreditTest()
{

Просмотреть файл

@ -477,6 +477,7 @@ namespace Listener.IContainer
else
{
current.Value.LockedBy = consumer;
current = current.Next;
}
consumer.Credit--;