Test broker: remove implicit queue when the last connection is closed. [#487]

This commit is contained in:
xinchen 2022-02-10 10:12:23 -08:00
Родитель 3b5b32e8a8
Коммит 293d3c29b8
3 изменённых файлов: 99 добавлений и 19 удалений

Просмотреть файл

@ -15,7 +15,9 @@ At least one Uri should be specified. Multiple are allowed (typically one for "a
If "amqps" Uri is present, the "cert" option must exist to specify a server certificate for the Tls listener.
If "queues" option is present, the broker is preconfigured with a list of queues. If it does not exist, the broker implicitly creates a queue upon an attach request and deletes it when the associated connection is closed. This allows running the tests easily without creating or draining the queue. Note that this is different from AMQP dynamic nodes. You can still create dynamic nodes through the protocol.
If "queues" option is present, the broker is preconfigured with a list of queues. If it does not exist, the broker implicitly creates a queue upon the first attach request
and deletes it when the last connection is closed. This allows running the tests easily without creating or draining the queue. Note that this is different from AMQP dynamic nodes.
You can still create dynamic nodes through the protocol.
When "trace" option is specified, the traces will be printed to the console window.

Просмотреть файл

@ -46,7 +46,7 @@ namespace Listener.IContainer
{
foreach (string q in queues)
{
this.queues.Add(q, new TestQueue(this));
this.queues.Add(q, new TestQueue(this, q));
}
}
else
@ -111,7 +111,7 @@ namespace Listener.IContainer
{
lock (this.queues)
{
this.queues.Add(queue, new TestQueue(this));
this.queues.Add(queue, new TestQueue(this, queue));
}
}
@ -220,9 +220,8 @@ namespace Listener.IContainer
{
if (dynamic || (this.implicitQueue && !link.Name.StartsWith("$explicit:")))
{
queue = new TestQueue(this);
queue = new TestQueue(this, address, !dynamic);
this.queues.Add(address, queue);
connection.Closed += (o, e) => this.RemoveQueue(address);
}
else
{
@ -361,6 +360,9 @@ namespace Listener.IContainer
sealed class TestQueue
{
readonly TestAmqpBroker broker;
readonly string address;
readonly bool isImplicit;
readonly HashSet<Connection> connections;
readonly LinkedList<BrokerMessage> messages;
readonly LinkedList<Consumer> waiters;
readonly Dictionary<int, Publisher> publishers;
@ -368,23 +370,27 @@ namespace Listener.IContainer
readonly object syncRoot;
int currentId;
public TestQueue(TestAmqpBroker broker)
public TestQueue(TestAmqpBroker broker, string address, bool isImplicit = false)
{
this.broker = broker;
this.address = address;
this.isImplicit = isImplicit;
this.connections = isImplicit ? new HashSet<Connection>() : null;
this.messages = new LinkedList<BrokerMessage>();
this.waiters = new LinkedList<Consumer>();
this.publishers = new Dictionary<int, Publisher>();
this.consumers = new Dictionary<int, Consumer>();
this.syncRoot = this.waiters;
this.syncRoot = new object();
}
public void CreatePublisher(ListenerLink link)
{
int id = Interlocked.Increment(ref this.currentId);
Publisher publisher = new Publisher(this, link, id);
lock (this.publishers)
lock (this.syncRoot)
{
this.publishers.Add(id, publisher);
this.OnClientConnected(link);
}
}
@ -392,9 +398,34 @@ namespace Listener.IContainer
{
int id = Interlocked.Increment(ref this.currentId);
Consumer consumer = new Consumer(this, link, id);
lock (this.consumers)
lock (this.syncRoot)
{
this.consumers.Add(id, consumer);
this.OnClientConnected(link);
}
}
void OnClientConnected(Link link)
{
if (this.isImplicit && !this.connections.Contains(link.Session.Connection))
{
this.connections.Add(link.Session.Connection);
link.Session.Connection.Closed += OnConnectionClosed;
}
}
void OnConnectionClosed(IAmqpObject sender, Error error)
{
if (this.isImplicit)
{
lock (this.syncRoot)
{
this.connections.Remove((Connection)sender);
if (this.connections.Count == 0)
{
this.broker.RemoveQueue(this.address);
}
}
}
}
@ -538,6 +569,14 @@ namespace Listener.IContainer
}
}
void OnPublisherClosed(int id, Publisher publisher)
{
lock (this.syncRoot)
{
this.publishers.Remove(id);
}
}
void OnConsumerClosed(int id, Consumer consumer)
{
lock (this.syncRoot)
@ -576,10 +615,7 @@ namespace Listener.IContainer
void OnLinkClosed(IAmqpObject sender, Error error)
{
lock (this.queue.publishers)
{
this.queue.publishers.Remove(this.id);
}
this.queue.OnPublisherClosed(this.id, this);
}
static void OnMessage(ListenerLink link, Message message, DeliveryState deliveryState, object state)
@ -631,7 +667,6 @@ namespace Listener.IContainer
readonly TestQueue queue;
readonly ListenerLink link;
readonly int id;
int tag;
public Consumer(TestQueue queue, ListenerLink link, int id)
{
@ -652,11 +687,6 @@ namespace Listener.IContainer
this.link.SendMessage(message, message.Buffer);
}
ArraySegment<byte> GetNextTag()
{
return new ArraySegment<byte>(BitConverter.GetBytes(Interlocked.Increment(ref this.tag)));
}
void OnLinkClosed(IAmqpObject sender, Error error)
{
this.Credit = 0;

Просмотреть файл

@ -150,6 +150,54 @@ namespace Test.Amqp
await connection.CloseAsync();
}
[TestMethod]
public async Task ReceiverSenderAsync()
{
string testName = "ReceiverSenderAsync";
ConnectionFactory connectionFactory = new ConnectionFactory();
// Creating first ReceiverLink
Connection firstReceiverConnection = await connectionFactory.CreateAsync(this.testTarget.Address);
Session firstReceiverSession = new Session(firstReceiverConnection);
ReceiverLink firstReceiverLink = new ReceiverLink(firstReceiverSession, "receiver-link", testName);
// Does not work when creating SenderLink after first ReceiverLink
var senderConnection = await connectionFactory.CreateAsync(this.testTarget.Address);
var senderSession = new Session(senderConnection);
var senderLink = new SenderLink(senderSession, "sender-link", testName);
// Send and receive message
await senderLink.SendAsync(new Message(testName));
Message firstMessageReceived = await firstReceiverLink.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
// Close first reveiver link
await firstReceiverLink.CloseAsync();
await firstReceiverSession.CloseAsync();
await firstReceiverConnection.CloseAsync();
// Creating second ReceiverLink
Connection secondReceiverConnection = await connectionFactory.CreateAsync(this.testTarget.Address);
Session secondReceiverSession = new Session(secondReceiverConnection);
ReceiverLink secondReceiverLink = new ReceiverLink(secondReceiverSession, "receiver-link", testName);
// Send and receive message
await senderLink.SendAsync(new Message(testName));
Message message = await secondReceiverLink.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
Assert.IsTrue(message != null, "No message received");
secondReceiverLink.Accept(message);
// Close second reveiver link
await secondReceiverLink.CloseAsync();
await secondReceiverSession.CloseAsync();
await secondReceiverConnection.CloseAsync();
// Close sender link
await senderLink.CloseAsync();
await senderSession.CloseAsync();
await senderConnection.CloseAsync();
}
#endif
#if NETFX40