Support address resolver in container host to allow custom address translation before processor lookup. [#307]

This commit is contained in:
xinchen 2022-01-27 13:24:25 -08:00
Родитель d643b00b62
Коммит 3b5b32e8a8
4 изменённых файлов: 159 добавлений и 31 удалений

Просмотреть файл

@ -5,10 +5,14 @@ ContainerHost is the easiest way to start an AMQP listener. Multiple endpoints c
The application is required to register at least one of the following processors to process AMQP events. Multiple message/request processors can be registered as long as their addresses are different. At most one link processor can be registered.
When the container host receives an attach performative from the remote peer,
(1) if a message/request processor is found at the registered address, the host creates a link endpoint for that address and all received messages are routed to that processor.
(2) otherwise, if a link processor is registered, the attach request is routed to the processor.
(3) otherwise, the attach is rejected with "amqp:not-found" error.
When the container host receives an attach performative from the remote peer,
(1) if an address resolver is set, the host first calls the resolver to translate the address from the incoming attach request.
The resolver allows the application to implement various logic for mapping a peer specified address to a listener address, which
is used to register a processor. Common scenarios are message processor to route messages to different destinations and message
source to serve messages from multiple nodes.
(2) if a message/request processor is found at the registered address, the host creates a link endpoint for that address and all received messages are routed to that processor.
(3) otherwise, if a link processor is registered, the attach request is routed to the processor.
(4) otherwise, the attach is rejected with "amqp:not-found" error.
Protocol behavior can be configured through the Listeners properties of the container host.

Просмотреть файл

@ -62,7 +62,9 @@ namespace Amqp.Listener
/// * After the link attach is handled, wrap it in a Target/SourceLinkEndpoint
/// that works with the previously implemented message processor or source.
///
/// Upon receiving an attach performative, the registered message level
/// Upon receiving an attach performative, the host uses the address to look up
/// the registered processors. If an address resolver is set, it is called first
/// and the result overwrites the address in attach. The registered message level
/// processors (IMessageProcessor, IMessageSource, IRequestProcessor) are
/// checked first. If a processor matches the address on the received attach
/// performative, a link is automatically created and the send/receive requests
@ -139,7 +141,7 @@ namespace Amqp.Listener
/// </summary>
public ContainerHost(IList<Address> addressList)
{
this.containerId = string.Join("-", this.GetType().Name, Guid.NewGuid().ToString("N"));
this.containerId = Connection.MakeAmqpContainerId();
this.customTransports = new Dictionary<string, TransportProvider>(StringComparer.OrdinalIgnoreCase);
this.linkCollection = new LinkCollection(this.containerId);
this.onLinkClosed = this.OnLinkClosed;
@ -183,6 +185,21 @@ namespace Amqp.Listener
get { return this.listeners; }
}
/// <summary>
/// Gets or sets an address resolver for all processors.
/// </summary>
/// <remarks>
/// The resolver returns a non-null string which was registered earlier for a
/// processor that can handle the incoming attach request. If a null string is
/// returned, the host continues to search for registered processors that matches
/// the address exactly in the attach.
/// </remarks>
public Func<ContainerHost, Attach, string> AddressResolver
{
get;
set;
}
/// <summary>
/// Opens the container host object.
/// </summary>
@ -323,9 +340,12 @@ namespace Amqp.Listener
static void ThrowIfExists<T>(string address, Dictionary<string, T> processors)
{
if (processors.ContainsKey(address))
lock (processors)
{
throw new AmqpException(ErrorCode.NotAllowed, typeof(T).Name + " processor has been registered at address " + address);
if (processors.ContainsKey(address))
{
throw new AmqpException(ErrorCode.NotAllowed, typeof(T).Name + " processor has been registered at address " + address);
}
}
}
@ -392,36 +412,44 @@ namespace Amqp.Listener
throw new AmqpException(ErrorCode.Stolen, string.Format("Link '{0}' has been attached already.", attach.LinkName));
}
string address = attach.Role ? ((Source)attach.Source).Address : ((Target)attach.Target).Address;
if (string.IsNullOrWhiteSpace(address))
string address = null;
if (this.AddressResolver != null)
{
throw new AmqpException(ErrorCode.InvalidField, "The address field cannot be empty");
address = this.AddressResolver(this, attach);
}
if (listenerLink.Role)
if (address == null)
{
MessageProcessor messageProcessor;
if (TryGetProcessor(this.messageProcessors, address, out messageProcessor))
{
messageProcessor.AddLink(listenerLink, address);
return true;
}
}
else
{
MessageSource messageSource;
if (TryGetProcessor(this.messageSources, address, out messageSource))
{
messageSource.AddLink(listenerLink, address);
return true;
}
address = attach.Role ? ((Source)attach.Source).Address : ((Target)attach.Target).Address;
}
RequestProcessor requestProcessor;
if (TryGetProcessor(this.requestProcessors, address, out requestProcessor))
if (address != null)
{
requestProcessor.AddLink(listenerLink, address, attach);
return true;
if (listenerLink.Role)
{
MessageProcessor messageProcessor;
if (TryGetProcessor(this.messageProcessors, address, out messageProcessor))
{
messageProcessor.AddLink(listenerLink, address);
return true;
}
}
else
{
MessageSource messageSource;
if (TryGetProcessor(this.messageSources, address, out messageSource))
{
messageSource.AddLink(listenerLink, address);
return true;
}
}
RequestProcessor requestProcessor;
if (TryGetProcessor(this.requestProcessors, address, out requestProcessor))
{
requestProcessor.AddLink(listenerLink, address, attach);
return true;
}
}
if (this.linkProcessor != null)
@ -430,6 +458,11 @@ namespace Amqp.Listener
return false;
}
if (string.IsNullOrWhiteSpace(address))
{
throw new AmqpException(ErrorCode.InvalidField, "The address field cannot be empty.");
}
throw new AmqpException(ErrorCode.NotFound, "No processor was found at " + address);
}

Просмотреть файл

@ -73,6 +73,7 @@ namespace Test.Amqp
public void TestCleanup()
{
this.host.AddressResolver = null;
if (this.linkProcessor != null)
{
this.host.UnregisterLinkProcessor(this.linkProcessor);
@ -93,6 +94,66 @@ namespace Test.Amqp
this.ClassCleanup();
}
[TestMethod]
public void ContainerHostAddressResolverTest()
{
string name = "router";
var processor = new TestMessageProcessor();
this.host.AddressResolver = (h, a) => name;
this.host.RegisterMessageProcessor(name, processor);
int count = 10;
var connection = new Connection(Address);
var session = new Session(connection);
for (int i = 0; i < count; i++)
{
var sender = new SenderLink(session, "send-link", "node" + i);
var message = new Message("msg" + i);
message.Properties = new Properties() { To = "node" + i };
sender.Send(message, null, null);
sender.Close();
}
session.Close();
connection.Close();
Assert.AreEqual(count, processor.Messages.Count);
for (int i = 0; i < count; i++)
{
var message = processor.Messages[i];
Assert.AreEqual("node" + (i % 10), message.Properties.To);
}
}
[TestMethod]
public void ContainerHostDynamicProcessorTest()
{
string name = "ContainerHostDynamicProcessorTest";
var processor = new TestMessageProcessor();
this.host.AddressResolver = (h, a) =>
{
h.RegisterMessageProcessor(name, processor);
return name;
};
int count = 10;
var connection = new Connection(Address);
var session = new Session(connection);
var sender = new SenderLink(session, "send-link", name);
for (int i = 0; i < count; i++)
{
var message = new Message("msg" + i);
message.Properties = new Properties() { GroupId = name };
sender.Send(message, Timeout);
}
sender.Close();
session.Close();
connection.Close();
}
[TestMethod]
public void ContainerHostMessageProcessorTest()
{
@ -177,6 +238,35 @@ namespace Test.Amqp
Assert.AreEqual(rejected, source.DeadLetterCount, string.Join(",", source.DeadletterMessage.Select(m => m.Properties.MessageId)));
}
[TestMethod]
public void ContainerHostAnyMessageSourceTest()
{
string name = "*";
int count = 10;
Queue<Message> messages = new Queue<Message>();
for (int i = 0; i < count; i++)
{
messages.Enqueue(new Message("test") { Properties = new Properties() { MessageId = name + i } });
}
this.host.AddressResolver = (h, a) => name;
var source = new TestMessageSource(messages);
this.host.RegisterMessageSource(name, source);
var connection = new Connection(Address);
var session = new Session(connection);
var receiver = new ReceiverLink(session, "receiver0", null);
for (int i = 1; i <= count; i++)
{
Message message = receiver.Receive();
receiver.Accept(message);
}
receiver.Close();
session.Close();
connection.Close();
}
[TestMethod]
public void ContainerHostRequestProcessorTest()
{

Просмотреть файл

@ -130,6 +130,7 @@ namespace Test.Amqp
Assert.AreEqual(ConnectionState.End, connection.ConnectionState);
try
{
Thread.Sleep(300);
networkStream.WriteByte(0);
Assert.IsTrue(false, "transport connection not closed");
}