This commit is contained in:
xinchen 2019-08-18 15:38:58 -07:00
Родитель 5f627e710f
Коммит 99c24db6fb
6 изменённых файлов: 56 добавлений и 24 удалений

Просмотреть файл

@ -30,6 +30,7 @@ namespace Amqp.Listener
using System.Security.Principal;
using System.Threading.Tasks;
using Amqp.Framing;
using Amqp.Handler;
using Amqp.Sasl;
using Amqp.Types;
@ -119,6 +120,15 @@ namespace Amqp.Listener
}
}
/// <summary>
/// Gets or sets a factory that creates a <see cref="IHandler"/> for an accepted connection.
/// </summary>
public Func<ConnectionListener, IHandler> HandlerFactory
{
get;
set;
}
/// <summary>
/// Opens the listener.
/// </summary>
@ -227,7 +237,8 @@ namespace Amqp.Listener
principal = profile.GetPrincipal();
}
var connection = new ListenerConnection(this, this.address, this.amqpSettings?.Handler, transport);
IHandler handler = this.HandlerFactory?.Invoke(this);
var connection = new ListenerConnection(this, this.address, handler, transport);
if (principal == null)
{
// SASL principal preferred. If not present, check transport.

Просмотреть файл

@ -17,8 +17,6 @@
namespace Amqp
{
using Amqp.Handler;
/// <summary>
/// Contains the AMQP settings for a <see cref="Connection"/>.
/// </summary>
@ -77,14 +75,5 @@ namespace Amqp
get;
set;
}
/// <summary>
/// Gets or sets a protocol handler.
/// </summary>
public IHandler Handler
{
get;
set;
}
}
}

Просмотреть файл

@ -24,6 +24,7 @@ namespace Amqp
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using Amqp.Framing;
using Amqp.Handler;
using Amqp.Sasl;
/// <summary>
@ -94,6 +95,17 @@ namespace Amqp
get { return this.sslSettings; }
}
/// <summary>
/// Creates a new connection with a protocol handler.
/// </summary>
/// <param name="address">The address of remote endpoint to connect to.</param>
/// <param name="handler">The protocol handler.</param>
/// <returns>A task for the connection creation operation. On success, the result is an AMQP <see cref="Connection"/></returns>
public Task<Connection> CreateAsync(Address address, IHandler handler)
{
return this.CreateAsync(address, null, null, handler);
}
/// <summary>
/// Creates a new connection with a custom open frame and a callback to handle remote open frame.
/// </summary>
@ -101,7 +113,12 @@ namespace Amqp
/// <param name="open">If specified, it is sent to open the connection, otherwise an open frame created from the AMQP settings property is sent.</param>
/// <param name="onOpened">If specified, it is invoked when an open frame is received from the remote peer.</param>
/// <returns>A task for the connection creation operation. On success, the result is an AMQP <see cref="Connection"/></returns>
public async Task<Connection> CreateAsync(Address address, Open open = null, OnOpened onOpened = null)
public Task<Connection> CreateAsync(Address address, Open open = null, OnOpened onOpened = null)
{
return this.CreateAsync(address, open, onOpened, null);
}
async Task<Connection> CreateAsync(Address address, Open open, OnOpened onOpened, IHandler handler)
{
IAsyncTransport transport;
TransportProvider provider;
@ -148,8 +165,7 @@ namespace Amqp
}
AsyncPump pump = new AsyncPump(this.BufferManager, transport);
Connection connection = new Connection(this.BufferManager, this.AMQP, address, transport,
open, onOpened, this.amqpSettings?.Handler);
Connection connection = new Connection(this.BufferManager, this.AMQP, address, transport, open, onOpened, handler);
pump.Start(connection);
return connection;

Просмотреть файл

@ -20,6 +20,7 @@ namespace Amqp
using System;
using System.Threading.Tasks;
using Amqp.Framing;
using Amqp.Handler;
using Amqp.Sasl;
#if UWP
using Windows.Networking.Sockets;
@ -71,10 +72,21 @@ namespace Amqp
/// Creates a new connection.
/// </summary>
/// <param name="address">The address of remote endpoint to connect to.</param>
/// <returns></returns>
/// <returns>A task for the connection creation operation. On success, the result is an AMQP <see cref="Connection"/></returns>
public Task<Connection> CreateAsync(Address address)
{
return this.CreateAsync(address, null, null);
return this.CreateAsync(address, null, null, null);
}
/// <summary>
/// Creates a new connection with a protocol handler.
/// </summary>
/// <param name="address">The address of remote endpoint to connect to.</param>
/// <param name="handler">The protocol handler.</param>
/// <returns>A task for the connection creation operation. On success, the result is an AMQP <see cref="Connection"/></returns>
public Task<Connection> CreateAsync(Address address, IHandler handler)
{
return this.CreateAsync(address, null, null, handler);
}
/// <summary>
@ -83,8 +95,13 @@ namespace Amqp
/// <param name="address">The address of remote endpoint to connect to.</param>
/// <param name="open">If specified, it is sent to open the connection, otherwise an open frame created from the AMQP settings property is sent.</param>
/// <param name="onOpened">If specified, it is invoked when an open frame is received from the remote peer.</param>
/// <returns></returns>
public async Task<Connection> CreateAsync(Address address, Open open, OnOpened onOpened)
/// <returns>A task for the connection creation operation. On success, the result is an AMQP <see cref="Connection"/></returns>
public Task<Connection> CreateAsync(Address address, Open open, OnOpened onOpened)
{
return this.CreateAsync(address, open, onOpened, null);
}
async Task<Connection> CreateAsync(Address address, Open open, OnOpened onOpened, IHandler handler)
{
IAsyncTransport transport;
#if !WINDOWS_PHONE
@ -119,7 +136,7 @@ namespace Amqp
}
AsyncPump pump = new AsyncPump(null, transport);
Connection connection = new Connection(null, this.AMQP, address, transport, open, onOpened, this.amqpSettings?.Handler);
Connection connection = new Connection(null, this.AMQP, address, transport, open, onOpened, handler);
pump.Start(connection);
return connection;

Просмотреть файл

@ -24,13 +24,13 @@ using System.Threading;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Amqp.Handler;
using Amqp.Listener;
using Amqp.Sasl;
using Amqp.Types;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Net.Sockets;
using System.Text;
using Amqp.Handler;
namespace Test.Amqp
{
@ -673,7 +673,7 @@ namespace Test.Amqp
List<Message> messages = new List<Message>();
this.host.RegisterMessageProcessor(name, new TestMessageProcessor(10, messages));
this.host.Listeners[0].AMQP.Handler = listenerHandler;
this.host.Listeners[0].HandlerFactory = a => listenerHandler;
var connection = new Connection(Address, clientHandler);
var session = new Session(connection);

Просмотреть файл

@ -1729,8 +1729,7 @@ namespace Test.Amqp
});
var factory = new ConnectionFactory();
factory.AMQP.Handler = handler;
Connection connection = await factory.CreateAsync(this.address);
Connection connection = await factory.CreateAsync(this.address, handler);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, "any");
await sender.SendAsync(new Message("test") { Properties = new Properties() { MessageId = testName } });