Ensure handler factory on connection listener is invoked once per transport.

This commit is contained in:
xinchen 2021-08-04 10:09:01 -07:00
Родитель 54cdce0e10
Коммит ee1ab1b3d9
1 изменённых файлов: 11 добавлений и 10 удалений

Просмотреть файл

@ -133,6 +133,8 @@ namespace Amqp.Listener
/// <summary>
/// Gets or sets a factory that creates a <see cref="IHandler"/> for an accepted connection.
/// </summary>
/// <remarks>The delegate is called once for each accepted transport. It allows for creating
/// a handler per connection if needed (<see cref="Amqp.Connection.Handler"/>).</remarks>
public Func<ConnectionListener, IHandler> HandlerFactory
{
get;
@ -237,7 +239,7 @@ namespace Amqp.Listener
throw new ArgumentNullException("certificate");
}
async Task HandleTransportAsync(IAsyncTransport transport, object context)
async Task HandleTransportAsync(IAsyncTransport transport, IHandler handler, object context)
{
IPrincipal principal = null;
if (this.saslSettings != null)
@ -247,7 +249,6 @@ namespace Amqp.Listener
principal = profile.GetPrincipal();
}
IHandler handler = this.HandlerFactory?.Invoke(this);
var connection = new ListenerConnection(this, this.address, handler, transport);
if (principal == null)
{
@ -631,7 +632,7 @@ namespace Amqp.Listener
IAsyncTransport transport = await this.CreateTransportAsync(socket);
await this.Listener.HandleTransportAsync(transport, socket);
await this.Listener.HandleTransportAsync(transport, handler, socket);
}
catch (Exception exception)
{
@ -736,7 +737,8 @@ namespace Amqp.Listener
try
{
var transport = await this.provider.CreateAsync(this.Listener.address);
await this.Listener.HandleTransportAsync(transport, null);
IHandler handler = this.Listener.HandlerFactory?.Invoke(this.Listener);
await this.Listener.HandleTransportAsync(transport, handler, null);
}
catch (ObjectDisposedException)
{
@ -805,11 +807,11 @@ namespace Amqp.Listener
this.httpListener.Close();
}
async Task HandleListenerContextAsync(HttpListenerContext context)
async Task HandleListenerContextAsync(HttpListenerContext context, IHandler handler)
{
try
{
int status = await this.CreateTransportAsync(context);
int status = await this.CreateTransportAsync(context, handler);
if (status != 0)
{
Trace.WriteLine(TraceLevel.Error, "Failed to create ws transport ", status);
@ -826,7 +828,7 @@ namespace Amqp.Listener
}
}
async Task<int> CreateTransportAsync(HttpListenerContext context)
async Task<int> CreateTransportAsync(HttpListenerContext context, IHandler handler)
{
X509Certificate2 clientCertificate = null;
@ -888,14 +890,13 @@ namespace Amqp.Listener
}
var wsContext = await context.AcceptWebSocketAsync(subProtocol);
IHandler handler = this.Listener.HandlerFactory?.Invoke(this.Listener);
if (handler != null && handler.CanHandle(EventId.WebSocketAccept))
{
handler.Handle(Event.Create(EventId.WebSocketAccept, null, context: wsContext));
}
var wsTransport = new ListenerWebSocketTransport(wsContext.WebSocket, principal);
await this.Listener.HandleTransportAsync(wsTransport, wsContext.WebSocket);
await this.Listener.HandleTransportAsync(wsTransport, handler, wsContext.WebSocket);
return 0;
}
@ -914,7 +915,7 @@ namespace Amqp.Listener
handler.Handle(Event.Create(EventId.HttpAccept, null, context: context));
}
var task = this.HandleListenerContextAsync(context);
var task = this.HandleListenerContextAsync(context, handler);
}
catch (Exception exception)
{