This commit is contained in:
xinchen 2019-06-18 12:38:00 -07:00
Родитель ae67bf3766
Коммит 58f705f5ff
6 изменённых файлов: 140 добавлений и 3 удалений

Просмотреть файл

@ -19,6 +19,9 @@ namespace Amqp.Types
{
using System.Collections;
/// <summary>
/// Defines an AMQP map.
/// </summary>
public partial class Map : Hashtable
{
object GetValue(object key)

Просмотреть файл

@ -17,34 +17,100 @@
namespace Amqp
{
/// <summary>
/// Defines the error codes.
/// </summary>
public enum ErrorCode
{
// client error codes
/// <summary>
/// No handle (session channel or link handle) can be allocated.
/// </summary>
ClientNoHandleAvailable = 1000,
/// <summary>
/// The handle (session channel or link handle) is already assigned.
/// </summary>
ClientHandlInUse = 1001,
/// <summary>
/// The operation times out.
/// </summary>
ClientWaitTimeout = 1002,
/// <summary>
/// The received SASL performatives do not have correct item count.
/// </summary>
ClientInitializeWrongBodyCount = 1003,
/// <summary>
/// The received SASL mechanisms do not contain the expected value.
/// </summary>
ClientInitializeWrongSymbol = 1004,
/// <summary>
/// The received protocol header does not match the client config.
/// </summary>
ClientInitializeHeaderCheckFailed = 1005,
/// <summary>
/// The SASL negotiation failed.
/// </summary>
ClientInitializeSaslFailed = 1006,
/// <summary>
/// An invalid link handle is received.
/// </summary>
ClientInvalidHandle = 1007,
/// <summary>
/// The requested link was not found (by handle or by name).
/// </summary>
ClientLinkNotFound = 1008,
/// <summary>
/// The received frame has an invalid performative code.
/// </summary>
ClientInvalidCodeOnFrame = 1009,
/// <summary>
/// The format code from the buffer is invalid.
/// </summary>
ClientInvalidFormatCodeRead = 1010,
/// <summary>
/// The frame type is invalid.
/// </summary>
ClientInvalidFrameType = 1011,
/// <summary>
/// The session channel is invalid.
/// </summary>
ClientInvalidChannel = 1012,
/// <summary>
/// The descriptor code is invalid.
/// </summary>
ClientInvalidCode = 1013,
/// <summary>
/// The list in the performative is invalid.
/// </summary>
ClientInvalidFieldList = 1014,
/// <summary>
/// The payload for a performative is invalid.
/// </summary>
ClientInvalidPayload = 1015,
/// <summary>
/// The property is not allowed to set after the client is connected.
/// </summary>
ClientNotAllowedAfterConnect = 1016,
/// <summary>
/// The client is not connected.
/// </summary>
ClientNotConnected = 1017,
// received error codes
/// <summary>
/// The receiver is not in a invalid state to start.
/// </summary>
ReceiverStartInvalidState = 2000,
/// <summary>
/// The receiver is not in a invalid state to accept a message.
/// </summary>
ReceiverAcceptInvalidState = 2001,
/// <summary>
/// The receiver does not have link credit for an incoming message.
/// </summary>
InvalidCreditOnTransfer = 2002,
// sender error codes
/// <summary>
/// The sender is not in a valid state to send a message.
/// </summary>
SenderSendInvalidState = 3000
}
}

Просмотреть файл

@ -19,8 +19,14 @@ namespace Amqp
{
using Amqp.Types;
/// <summary>
/// Defines an AMQP link.
/// </summary>
public abstract class Link
{
/// <summary>
/// The link name.
/// </summary>
public string Name;
internal bool Role;

Просмотреть файл

@ -19,6 +19,9 @@ namespace Amqp
{
using Amqp.Types;
/// <summary>
/// Defines an AMQP message.
/// </summary>
public class Message
{
// List of the fields defined in Properties
@ -26,36 +29,54 @@ namespace Amqp
// To access others, user can access the Properties list
List properties;
/// <summary>
/// Gets or sets the message-id field.
/// </summary>
public string MessageId
{
get { return (string)this.Properties[0]; }
set { this.Properties[0] = value; }
}
/// <summary>
/// Gets or sets the to field.
/// </summary>
public string To
{
get { return (string)this.Properties[2]; }
set { this.Properties[2] = value; }
}
/// <summary>
/// Gets or sets the subject field.
/// </summary>
public string Subject
{
get { return (string)this.Properties[3]; }
set { this.Properties[3] = value; }
}
/// <summary>
/// Gets or sets the correlation-id field.
/// </summary>
public string CorrelationId
{
get { return (string)this.Properties[5]; }
set { this.Properties[5] = value; }
}
/// <summary>
/// Gets or sets the message-annotations section.
/// </summary>
public Map MessageAnnotations
{
get;
set;
}
/// <summary>
/// Gets the properties.
/// </summary>
public List Properties
{
get
@ -69,12 +90,18 @@ namespace Amqp
}
}
/// <summary>
/// Gets or sets the application-properties section.
/// </summary>
public Map ApplicationProperties
{
get;
set;
}
/// <summary>
/// Gets or sets the body.
/// </summary>
public object Body { get; set; }
internal uint deliveryId;

Просмотреть файл

@ -20,8 +20,16 @@ namespace Amqp
using System;
using Amqp.Types;
/// <summary>
/// The delegate to be invoked when a message is received.
/// </summary>
/// <param name="receiver">The receiver.</param>
/// <param name="message">The received message.</param>
public delegate void OnMessage(Receiver receiver, Message message);
/// <summary>
/// A receiver link.
/// </summary>
public class Receiver : Link
{
Client client;
@ -42,6 +50,11 @@ namespace Amqp
this.lastDeliveryId = uint.MaxValue;
}
/// <summary>
/// Starts the receiver link.
/// </summary>
/// <param name="credit">The link credit.</param>
/// <param name="onMessage">The message callback.</param>
public void Start(uint credit, OnMessage onMessage)
{
Fx.AssertAndThrow(ErrorCode.ReceiverStartInvalidState, this.State < 0xff);
@ -51,6 +64,10 @@ namespace Amqp
this.client.SendFlow(this.Handle, this.deliveryCount, credit);
}
/// <summary>
/// Accepts a received message.
/// </summary>
/// <param name="message">The received message.</param>
public void Accept(Message message)
{
Fx.AssertAndThrow(ErrorCode.ReceiverAcceptInvalidState, this.State < 0xff);
@ -73,6 +90,9 @@ namespace Amqp
}
}
/// <summary>
/// Closes the receiver.
/// </summary>
public void Close()
{
if (this.State < 0xff)

Просмотреть файл

@ -20,6 +20,9 @@ namespace Amqp
using System;
using Amqp.Types;
/// <summary>
/// A sender link.
/// </summary>
public class Sender : Link
{
const int defaultTimeout = 60000;
@ -37,11 +40,20 @@ namespace Amqp
this.client = client;
}
/// <summary>
/// Sends a message.
/// </summary>
/// <param name="message">The message.</param>
public void Send(Message message)
{
this.Send(message, defaultTimeout);
}
/// <summary>
/// Sends a message.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="timeout">The timeout in seconds.</param>
public void Send(Message message, int timeout)
{
Fx.AssertAndThrow(ErrorCode.SenderSendInvalidState, this.State < 0xff);
@ -66,6 +78,9 @@ namespace Amqp
}
}
/// <summary>
/// Closes the sender.
/// </summary>
public void Close()
{
this.client.CloseLink(this);