+ Remove Microsoft.Bot.Builder.Streaming assembly
+ Move streaming into BotFrameworkAdapter + HttpAdapter
+ Loosen coupling between StreamingHandler and Streaming adapter
+ General code improvements
This commit is contained in:
Carlos Castro 2019-10-11 00:27:35 -07:00
Родитель 31f556c9c4
Коммит 3e45593930
20 изменённых файлов: 379 добавлений и 665 удалений

Просмотреть файл

@ -104,11 +104,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Bot.Connector.Tea
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Bot.Builder.Teams", "libraries\Microsoft.Bot.Builder.Teams\Microsoft.Bot.Builder.Teams.csproj", "{56230F58-02EF-4C88-9C28-BE37B8A4D074}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Bot.Builder.Streaming", "libraries\Microsoft.Bot.Builder.Streaming\Microsoft.Bot.Builder.Streaming.csproj", "{323E16EB-9491-45E0-86A4-0B567D63A083}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Bot.Streaming", "libraries\Microsoft.Bot.Streaming\Microsoft.Bot.Streaming.csproj", "{4C82FD14-418F-43E4-AC59-3D926B55CEA3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Bot.Builder.Streaming.Tests", "tests\Microsoft.Bot.Builder.Streaming.Tests\Microsoft.Bot.Builder.Streaming.Tests.csproj", "{77BD3C7F-34D9-47FC-AFC0-6D92A2081770}"
ProjectSection(ProjectDependencies) = postProject
{ADA8AB8B-2066-4193-B8F7-985669B23E00} = {ADA8AB8B-2066-4193-B8F7-985669B23E00}
{053BD8B8-B5C1-4C45-81D4-C9BA8D5B3CE2} = {053BD8B8-B5C1-4C45-81D4-C9BA8D5B3CE2}
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Bot.Streaming.Tests", "tests\Microsoft.Bot.Streaming.Tests\Microsoft.Bot.Streaming.Tests.csproj", "{D243AC2D-7823-4177-9D8A-23FDFDA274D2}"
EndProject
@ -377,12 +379,6 @@ Global
{56230F58-02EF-4C88-9C28-BE37B8A4D074}.Debug|Any CPU.Build.0 = Debug|Any CPU
{56230F58-02EF-4C88-9C28-BE37B8A4D074}.Release|Any CPU.ActiveCfg = Release|Any CPU
{56230F58-02EF-4C88-9C28-BE37B8A4D074}.Release|Any CPU.Build.0 = Release|Any CPU
{323E16EB-9491-45E0-86A4-0B567D63A083}.Debug - NuGet Packages|Any CPU.ActiveCfg = Debug|Any CPU
{323E16EB-9491-45E0-86A4-0B567D63A083}.Debug - NuGet Packages|Any CPU.Build.0 = Debug|Any CPU
{323E16EB-9491-45E0-86A4-0B567D63A083}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{323E16EB-9491-45E0-86A4-0B567D63A083}.Debug|Any CPU.Build.0 = Debug|Any CPU
{323E16EB-9491-45E0-86A4-0B567D63A083}.Release|Any CPU.ActiveCfg = Release|Any CPU
{323E16EB-9491-45E0-86A4-0B567D63A083}.Release|Any CPU.Build.0 = Release|Any CPU
{4C82FD14-418F-43E4-AC59-3D926B55CEA3}.Debug - NuGet Packages|Any CPU.ActiveCfg = Debug|Any CPU
{4C82FD14-418F-43E4-AC59-3D926B55CEA3}.Debug - NuGet Packages|Any CPU.Build.0 = Debug|Any CPU
{4C82FD14-418F-43E4-AC59-3D926B55CEA3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
@ -454,7 +450,6 @@ Global
{1F8ACA9B-7721-4D83-8545-6EE449B3A100} = {4269F3C3-6B42-419B-B64A-3E6DC0F1574A}
{630CF216-BA97-4128-8563-214660B153DC} = {4269F3C3-6B42-419B-B64A-3E6DC0F1574A}
{56230F58-02EF-4C88-9C28-BE37B8A4D074} = {4269F3C3-6B42-419B-B64A-3E6DC0F1574A}
{323E16EB-9491-45E0-86A4-0B567D63A083} = {4269F3C3-6B42-419B-B64A-3E6DC0F1574A}
{4C82FD14-418F-43E4-AC59-3D926B55CEA3} = {4269F3C3-6B42-419B-B64A-3E6DC0F1574A}
{77BD3C7F-34D9-47FC-AFC0-6D92A2081770} = {AD743B78-D61F-4FBF-B620-FA83CE599A50}
{D243AC2D-7823-4177-9D8A-23FDFDA274D2} = {AD743B78-D61F-4FBF-B620-FA83CE599A50}

Просмотреть файл

@ -1,34 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.Bot.Builder.Integration.AspNet.Core;
namespace Microsoft.Bot.Builder.Streaming
{
/// <summary>
/// Maps various endpoint handlers for the registered bot into the request execution pipeline using the V4 protocol.
/// </summary>
public static class BotFrameworkApplicationBuilderExtensions
{
/// <summary>
/// Maps various endpoint handlers for the registered bot into the request execution pipeline using the V4 protocol.
/// </summary>
/// <param name="applicationBuilder">The application builder that defines the bot's pipeline.<see cref="IApplicationBuilder"/>.</param>
/// <param name="pipeName">The name of the named pipe to use when creating the server.</param>
/// <returns>A reference to this instance after the operation has completed.</returns>
public static IApplicationBuilder UseBotFrameworkNamedPipe(this IApplicationBuilder applicationBuilder, string pipeName = "bfv4.pipes")
{
if (applicationBuilder == null)
{
throw new ArgumentNullException(nameof(applicationBuilder));
}
var bot = applicationBuilder.ApplicationServices.GetService(typeof(IBot)) as IBot;
_ = (applicationBuilder.ApplicationServices.GetService(typeof(IBotFrameworkHttpAdapter)) as DirectLineAdapter).AddNamedPipeConnection(pipeName, bot);
return applicationBuilder;
}
}
}

Просмотреть файл

@ -1,494 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.WebSockets;
using System.Runtime.CompilerServices;
using System.Security.Claims;
using System.Security.Principal;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Bot.Builder.BotFramework;
using Microsoft.Bot.Builder.Integration.AspNet.Core;
using Microsoft.Bot.Connector;
using Microsoft.Bot.Connector.Authentication;
using Microsoft.Bot.Schema;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.Bot.Builder.Streaming
{
/// <summary>
/// A Bot Builder Adapter implementation used to handle Bot Framework HTTP and streaming requests. Supports the Bot Framework Protocol v3 with Streaming Extensions.
/// </summary>
public class DirectLineAdapter : BotFrameworkAdapter, IBotFrameworkHttpAdapter
{
private const string AuthHeaderName = "authorization";
private const string ChannelIdHeaderName = "channelid";
private const string InvokeResponseKey = "DirectLineAdapter.InvokeResponse";
private const string BotIdentityKey = "BotIdentity";
private IBot _bot;
private ClaimsIdentity _claimsIdentity;
private IList<StreamingRequestHandler> _requestHandlers;
/// <summary>
/// Initializes a new instance of the <see cref="DirectLineAdapter"/> class for processing HTTP or streaming requests.
/// </summary>
/// <param name="credentialProvider">Optional credential provider to use for authorization.</param>
/// <param name="channelProvider">Optional channel provider for use with authorization.</param>
/// <param name="logger">The ILogger implementation this adapter should use.</param>
public DirectLineAdapter(ICredentialProvider credentialProvider = null, IChannelProvider channelProvider = null, ILogger<BotFrameworkHttpAdapter> logger = null)
: base(credentialProvider ?? new SimpleCredentialProvider(), channelProvider, null, null, null, logger)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="DirectLineAdapter"/> class for processing HTTP requests.
/// </summary>
/// <param name="credentialProvider">The credential provider to use for authorization.</param>
/// <param name="channelProvider">The channel provider for use with authorization.</param>
/// <param name="httpClient">The HTTP client to use when sending messages to the channel, services, and skills.</param>
/// <param name="logger">The ILogger implementation this adapter should use.</param>
public DirectLineAdapter(ICredentialProvider credentialProvider, IChannelProvider channelProvider, HttpClient httpClient, ILogger<BotFrameworkHttpAdapter> logger)
: base(credentialProvider ?? new SimpleCredentialProvider(), channelProvider, null, httpClient, null, logger)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="DirectLineAdapter"/> class for processing streaming requests.
/// The StreamingRequestHandler serves as a translation layer between the transport layer and bot adapter.
/// It receives ReceiveRequests from the transport and provides them to the bot adapter in a form
/// it is able to build activities out of, which are then handed to the bot itself to processed.
/// Throws <see cref="ArgumentNullException"/> if arguments are null.
/// </summary>
/// <param name="onTurnError">Optional function to perform on turn errors.</param>
/// <param name="bot">The <see cref="IBot"/> to be used for all requests to this handler.</param>
/// <param name="logger">The ILogger implementation this adapter should use.</param>
public DirectLineAdapter(Func<ITurnContext, Exception, Task> onTurnError, IBot bot, ILogger<BotFrameworkHttpAdapter> logger = null)
: base(new SimpleCredentialProvider(), logger: logger)
{
OnTurnError = onTurnError;
_bot = bot ?? throw new ArgumentNullException(nameof(bot));
}
/// <summary>
/// Initializes a new instance of the <see cref="DirectLineAdapter"/> class for processing HTTP requests.
/// </summary>
/// <param name="configuration"> The configuration containing credential and channel provider details for this adapter. </param>
/// <param name="logger">The ILogger implementation this adapter should use.</param>
protected DirectLineAdapter(IConfiguration configuration, ILogger<BotFrameworkHttpAdapter> logger = null)
: base(new ConfigurationCredentialProvider(configuration), new ConfigurationChannelProvider(configuration), customHttpClient: null, middleware: null, logger: logger)
{
var openIdEndpoint = configuration.GetSection(AuthenticationConstants.BotOpenIdMetadataKey)?.Value;
if (!string.IsNullOrEmpty(openIdEndpoint))
{
// Indicate which Cloud we are using, for example, Public or Sovereign.
ChannelValidation.OpenIdMetadataUrl = openIdEndpoint;
GovernmentChannelValidation.OpenIdMetadataUrl = openIdEndpoint;
}
}
/// <summary>
/// Initial entry point from the bot controller. Validates request and invokes a response from the bot.
/// Also detects and handles WebSocket upgrade requests in the case of streaming connections.
/// </summary>
/// <param name="httpRequest">The request to process.</param>
/// <param name="httpResponse">The response to return to the client.</param>
/// <param name="bot">The bot to use when processing the request.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>A task signifying if the work has been completed.</returns>
public async Task ProcessAsync(HttpRequest httpRequest, HttpResponse httpResponse, IBot bot, CancellationToken cancellationToken = default)
{
if (httpRequest == null)
{
throw new ArgumentNullException(nameof(httpRequest));
}
if (httpResponse == null)
{
throw new ArgumentNullException(nameof(httpResponse));
}
_bot = bot ?? throw new ArgumentNullException(nameof(bot));
if (httpRequest.Method == HttpMethods.Get)
{
await ConnectWebSocket(httpRequest, httpResponse).ConfigureAwait(false);
}
else
{
// deserialize the incoming Activity
var activity = HttpHelper.ReadRequest(httpRequest);
// grab the auth header from the inbound http request
var authHeader = httpRequest.Headers["Authorization"];
try
{
// process the inbound activity with the bot
var invokeResponse = await ProcessActivityAsync(authHeader, activity, bot.OnTurnAsync, cancellationToken).ConfigureAwait(false);
// write the response, potentially serializing the InvokeResponse
HttpHelper.WriteResponse(httpResponse, invokeResponse);
}
catch (UnauthorizedAccessException)
{
// handle unauthorized here as this layer creates the http response
httpResponse.StatusCode = (int)HttpStatusCode.Unauthorized;
}
}
}
/// <summary>
/// Replaces the implementation in the base adapter in order to add
/// support for streaming connections.
/// </summary>
/// <param name="turnContext">The current turn context.</param>
/// <param name="activities">The collection of activities to send to the channel.</param>
/// <param name="cancellationToken">Required cancellation token.</param>
/// <returns>A task that represents the work queued to execute.</returns>
public override async Task<ResourceResponse[]> SendActivitiesAsync(ITurnContext turnContext, Activity[] activities, CancellationToken cancellationToken)
{
if (turnContext == null)
{
throw new ArgumentNullException(nameof(turnContext));
}
if (activities == null)
{
throw new ArgumentNullException(nameof(activities));
}
var responses = new ResourceResponse[activities.Length];
/*
* NOTE: we're using for here (vs. foreach) because we want to simultaneously index into the
* activities array to get the activity to process as well as use that index to assign
* the response to the responses array and this is the most cost effective way to do that.
*/
for (var index = 0; index < activities.Length; index++)
{
var activity = activities[index] ?? throw new ArgumentNullException("Found null activity in SendActivitiesAsync.");
var response = default(ResourceResponse);
_logger.LogInformation($"Sending activity. ReplyToId: {activity.ReplyToId}");
if (activity.Type == ActivityTypesEx.Delay)
{
// The Activity Schema doesn't have a delay type build in, so it's simulated
// here in the Bot. This matches the behavior in the Node connector.
var delayMs = (int)activity.Value;
await Task.Delay(delayMs, cancellationToken).ConfigureAwait(false);
// No need to create a response. One will be created below.
}
else if (activity.Type == ActivityTypesEx.InvokeResponse)
{
turnContext.TurnState.Add(InvokeResponseKey, activity);
// No need to create a response. One will be created below.
}
else if (activity.Type == ActivityTypes.Trace && activity.ChannelId != "emulator")
{
// if it is a Trace activity we only send to the channel if it's the emulator.
}
else if (activity.ServiceUrl.StartsWith("u"))
{
// The ServiceUrl for streaming channels begin with the string "urn" and contain
// information unique to streaming connections. If the ServiceUrl for this
// activity begins with a "u" we hand it off to be processed via a new or
// existing streaming connection.
response = await SendStreamingActivityAsync(activity);
}
else if (!string.IsNullOrWhiteSpace(activity.ReplyToId))
{
var connectorClient = turnContext.TurnState.Get<IConnectorClient>();
response = await connectorClient.Conversations.ReplyToActivityAsync(activity, cancellationToken).ConfigureAwait(false);
}
else
{
var connectorClient = turnContext.TurnState.Get<IConnectorClient>();
response = await connectorClient.Conversations.SendToConversationAsync(activity, cancellationToken).ConfigureAwait(false);
}
// If No response is set, then default to a "simple" response. This can't really be done
// above, as there are cases where the ReplyTo/SendTo methods will also return null
// (See below) so the check has to happen here.
// Note: In addition to the Invoke / Delay / Activity cases, this code also applies
// with Skype and Teams with regards to typing events. When sending a typing event in
// these _channels they do not return a RequestResponse which causes the bot to blow up.
// https://github.com/Microsoft/botbuilder-dotnet/issues/460
// bug report : https://github.com/Microsoft/botbuilder-dotnet/issues/465
if (response == null)
{
response = new ResourceResponse(activity.Id ?? string.Empty);
}
responses[index] = response;
}
return responses;
}
/// <summary>
/// Primary adapter method for processing activities sent from streaming channel.
/// Creates a turn context and runs the middleware pipeline for an incoming activity.
/// Throws <see cref="ArgumentNullException"/> on null arguments.
/// </summary>
/// <param name="activity">The <see cref="Activity"/> to process.</param>
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// or threads to receive notice of cancellation.</param>
/// <returns>A task that represents the work queued to execute. If the activity type
/// was 'Invoke' and the corresponding key (channelId + activityId) was found
/// then an InvokeResponse is returned, otherwise null is returned.</returns>
/// <remarks>Call this method to reactively send a message to a conversation.
/// If the task completes successfully, then if the activity's <see cref="Activity.Type"/>
/// is <see cref="ActivityTypes.Invoke"/> and the corresponding key
/// (<see cref="Activity.ChannelId"/> + <see cref="Activity.Id"/>) is found
/// then an <see cref="InvokeResponse"/> is returned, otherwise null is returned.
/// <para>This method registers the following services for the turn.<list type="bullet"/></para>
/// </remarks>
public async Task<InvokeResponse> ProcessActivityForStreamingChannelAsync(Activity activity, CancellationToken cancellationToken = default)
{
BotAssert.ActivityNotNull(activity);
_logger.LogInformation($"Received an incoming activity. ActivityId: {activity.Id}");
// If a conversation has moved from one connection to another for the same Channel or Skill and
// hasn't been forgotten by the previous StreamingRequestHandler. The last requestHandler
// the conversation has been associated with should always be the active connection.
var requestHandler = _requestHandlers.Where(x => x.ServiceUrl == activity.ServiceUrl).Where(y => y.HasConversation(activity.Conversation.Id)).LastOrDefault();
using (var context = new TurnContext(this, activity))
{
// Pipes are unauthenticated. Pending to check that we are in pipes right now. Do not merge to master without that.
if (_claimsIdentity != null)
{
context.TurnState.Add<IIdentity>(BotIdentityKey, _claimsIdentity);
}
var connectorClient = CreateStreamingConnectorClient(activity, requestHandler);
context.TurnState.Add(connectorClient);
await RunPipelineAsync(context, _bot.OnTurnAsync, cancellationToken).ConfigureAwait(false);
if (activity.Type == ActivityTypes.Invoke)
{
var activityInvokeResponse = context.TurnState.Get<Activity>(InvokeResponseKey);
if (activityInvokeResponse == null)
{
return new InvokeResponse { Status = (int)HttpStatusCode.NotImplemented };
}
else
{
return (InvokeResponse)activityInvokeResponse.Value;
}
}
return null;
}
}
/// <summary>
/// Creates a new StreamingRequestHandler to listen to the specififed Named Pipe
/// and pass requests to this adapter.
/// </summary>
/// <param name="pipeName">The name of the Named Pipe to connect to.</param>
/// <param name="bot">The bot to use when processing activities received over the Named Pipe.</param>
/// <returns>A task that completes only once the StreamingRequestHandler has stopped listening
/// for incoming requests on the Named Pipe.</returns>
public async Task AddNamedPipeConnection(string pipeName, IBot bot)
{
if (_requestHandlers == null)
{
_requestHandlers = new List<StreamingRequestHandler>();
}
_bot = bot ?? throw new ArgumentNullException(nameof(bot));
var requestHandler = new StreamingRequestHandler(_logger, this, pipeName);
_requestHandlers.Add(requestHandler);
await requestHandler.StartListening();
}
/// <summary>
/// Sends activities over streaming connections.
/// If an existing connection is known the adapter will look for it and make use of it.
/// If no existing connection is found a new connection will be opened.
/// </summary>
/// <param name="activity">The <see cref="Activity"/> to send.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>A task that resolves to a <see cref="ResourceResponse"/>.</returns>
private async Task<ResourceResponse> SendStreamingActivityAsync(Activity activity, CancellationToken cancellationToken = default)
{
// Check to see if any of this adapter's StreamingRequestHandlers is associated with this conversation.
var possibleHandlers = _requestHandlers.Where(x => x.ServiceUrl == activity.ServiceUrl).Where(y => y.HasConversation(activity.Conversation.Id));
if (possibleHandlers.Count() > 0)
{
if (possibleHandlers.Count() > 1)
{
// The conversation has moved to a new connection and the former StreamingRequestHandler needs to be told to forget about it.
var correctHandler = possibleHandlers.OrderBy(x => x.ConversationAddedTime(activity.Conversation.Id)).Last();
foreach (var handler in possibleHandlers)
{
if (handler != correctHandler)
{
handler.ForgetConversation(activity.Conversation.Id);
}
}
return await correctHandler.SendActivityAsync(activity, cancellationToken);
}
return await possibleHandlers.FirstOrDefault().SendActivityAsync(activity, cancellationToken);
}
else
{
// This is a proactive message that will need a new streaming connection opened.
// The ServiceUrl of a streaming connection follows the pattern "urn:[ChannelName]:[Protocol]:[Host]".
// TODO: This connection needs authentication headers added to it.
var connection = new ClientWebSocket();
var uri = activity.ServiceUrl.Split(':');
var protocol = uri[uri.Length - 2];
var host = uri[uri.Length - 1];
await connection.ConnectAsync(new Uri(protocol + host + "/api/messages"), cancellationToken);
var handler = new StreamingRequestHandler(_logger, this, connection);
if (_requestHandlers == null)
{
_requestHandlers = new List<StreamingRequestHandler>();
}
_requestHandlers.Add(handler);
return await handler.SendActivityAsync(activity, cancellationToken);
}
}
/// <summary>
/// Process the initial request to establish a long lived connection via a streaming server.
/// </summary>
/// <param name="httpRequest">The connection request.</param>
/// <param name="httpResponse">The response sent on error or connection termination.</param>
/// <returns>Returns on task completion.</returns>
private async Task ConnectWebSocket(HttpRequest httpRequest, HttpResponse httpResponse)
{
if (httpRequest == null)
{
throw new ArgumentNullException(nameof(httpRequest));
}
if (httpResponse == null)
{
throw new ArgumentNullException(nameof(httpResponse));
}
if (!httpRequest.HttpContext.WebSockets.IsWebSocketRequest)
{
httpRequest.HttpContext.Response.StatusCode = (int)HttpStatusCode.BadRequest;
await httpRequest.HttpContext.Response.WriteAsync("Upgrade to WebSocket is required.").ConfigureAwait(false);
return;
}
if (!await AuthCheck(httpRequest))
{
return;
}
try
{
var socket = await httpRequest.HttpContext.WebSockets.AcceptWebSocketAsync().ConfigureAwait(false);
var requestHandler = new StreamingRequestHandler(_logger, this, socket);
if (_requestHandlers == null)
{
_requestHandlers = new List<StreamingRequestHandler>();
}
_requestHandlers.Add(requestHandler);
await requestHandler.StartListening().ConfigureAwait(false);
}
catch (Exception ex)
{
httpRequest.HttpContext.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
await httpRequest.HttpContext.Response.WriteAsync("Unable to create transport server.").ConfigureAwait(false);
throw ex;
}
}
private IConnectorClient CreateStreamingConnectorClient(Activity activity, StreamingRequestHandler requestHandler)
{
// TODO: When this is merged into the existing adapter it should be moved inside of
// the existing CreateConnectorClient and use the serviceURL to determine which
// version of the connector to construct.
var emptyCredentials = (_channelProvider != null && _channelProvider.IsGovernment()) ?
MicrosoftGovernmentAppCredentials.Empty :
MicrosoftAppCredentials.Empty;
var streamingClient = new StreamingHttpClient(requestHandler, _logger);
var connectorClient = new ConnectorClient(new Uri(activity.ServiceUrl), emptyCredentials, customHttpClient: streamingClient);
return connectorClient;
}
private async Task<bool> AuthCheck(HttpRequest httpRequest)
{
try
{
if (!await _credentialProvider.IsAuthenticationDisabledAsync().ConfigureAwait(false))
{
var authHeader = httpRequest.Headers.Where(x => x.Key.ToLower() == AuthHeaderName).FirstOrDefault().Value.FirstOrDefault();
var channelId = httpRequest.Headers.Where(x => x.Key.ToLower() == ChannelIdHeaderName).FirstOrDefault().Value.FirstOrDefault();
if (string.IsNullOrWhiteSpace(authHeader))
{
await MissingAuthHeaderHelperAsync(AuthHeaderName, httpRequest).ConfigureAwait(false);
return false;
}
if (string.IsNullOrWhiteSpace(channelId))
{
await MissingAuthHeaderHelperAsync(ChannelIdHeaderName, httpRequest).ConfigureAwait(false);
return false;
}
var claimsIdentity = await JwtTokenValidation.ValidateAuthHeader(authHeader, _credentialProvider, _channelProvider, channelId).ConfigureAwait(false);
if (!claimsIdentity.IsAuthenticated)
{
httpRequest.HttpContext.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
return false;
}
_claimsIdentity = claimsIdentity;
}
return true;
}
catch (Exception ex)
{
httpRequest.HttpContext.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
await httpRequest.HttpContext.Response.WriteAsync("Error while attempting to authorize connection.").ConfigureAwait(false);
throw ex;
}
}
private async Task MissingAuthHeaderHelperAsync(string headerName, HttpRequest httpRequest)
{
httpRequest.HttpContext.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
await httpRequest.HttpContext.Response.WriteAsync($"Unable to authentiate. Missing header: {headerName}").ConfigureAwait(false);
}
}
}

Просмотреть файл

@ -1,81 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Text;
using Microsoft.AspNetCore.Http;
using Microsoft.Bot.Schema;
using Microsoft.Rest.Serialization;
using Newtonsoft.Json;
namespace Microsoft.Bot.Builder.Streaming
{
/// <summary>
/// Added for the preview only, as the real class is internal to the integration library
/// and cannot be accessed by the preview library.
/// TODO: Delete once adapter is merged into integration library.
/// </summary>
internal static class HttpHelper
{
public static readonly JsonSerializer BotMessageSerializer = JsonSerializer.Create(new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
Formatting = Formatting.Indented,
DateFormatHandling = DateFormatHandling.IsoDateFormat,
DateTimeZoneHandling = DateTimeZoneHandling.Utc,
ReferenceLoopHandling = ReferenceLoopHandling.Serialize,
ContractResolver = new ReadOnlyJsonContractResolver(),
Converters = new List<JsonConverter> { new Iso8601TimeSpanConverter() },
});
public static Activity ReadRequest(HttpRequest request)
{
if (request == null)
{
throw new ArgumentNullException(nameof(request));
}
var activity = default(Activity);
using (var bodyReader = new JsonTextReader(new StreamReader(request.Body, Encoding.UTF8)))
{
activity = BotMessageSerializer.Deserialize<Activity>(bodyReader);
}
return activity;
}
public static void WriteResponse(HttpResponse response, InvokeResponse invokeResponse)
{
if (response == null)
{
throw new ArgumentNullException(nameof(response));
}
if (invokeResponse == null)
{
response.StatusCode = (int)HttpStatusCode.OK;
}
else
{
response.StatusCode = invokeResponse.Status;
if (invokeResponse.Body != null)
{
response.ContentType = "application/json";
using (var writer = new StreamWriter(response.Body))
{
using (var jsonWriter = new JsonTextWriter(writer))
{
BotMessageSerializer.Serialize(jsonWriter, invokeResponse.Body);
}
}
}
}
}
}
}

Просмотреть файл

@ -7,6 +7,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.WebSockets;
using System.Security.Claims;
using System.Security.Principal;
using System.Text;
@ -14,9 +15,11 @@ using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Bot.Builder.Integration;
using Microsoft.Bot.Builder.Streaming;
using Microsoft.Bot.Connector;
using Microsoft.Bot.Connector.Authentication;
using Microsoft.Bot.Schema;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Rest.TransientFaultHandling;
@ -43,18 +46,22 @@ namespace Microsoft.Bot.Builder
/// <seealso cref="IActivity"/>
/// <seealso cref="IBot"/>
/// <seealso cref="IMiddleware"/>
public class BotFrameworkAdapter : BotAdapter, IAdapterIntegration, IUserTokenProvider
public class BotFrameworkAdapter : BotAdapter, IAdapterIntegration, IUserTokenProvider, IStreamingActivityProcessor
{
#pragma warning disable SA1401 // Fields should be private
protected const string InvokeResponseKey = "BotFrameworkAdapter.InvokeResponse";
protected const string BotIdentityKey = "BotIdentity";
protected readonly ICredentialProvider _credentialProvider;
protected readonly IChannelProvider _channelProvider;
protected readonly ILogger _logger;
#pragma warning restore SA1401 // Fields should be private
private const string InvokeResponseKey = "BotFrameworkAdapter.InvokeResponse";
private const string BotIdentityKey = "BotIdentity";
protected IBot _bot;
protected ClaimsIdentity _claimsIdentity;
protected IList<StreamingRequestHandler> _requestHandlers;
private static readonly HttpClient _defaultHttpClient = new HttpClient();
private readonly HttpClient _httpClient;
private readonly RetryPolicy _connectorClientRetryPolicy;
private readonly ConcurrentDictionary<string, MicrosoftAppCredentials> _appCredentialMap = new ConcurrentDictionary<string, MicrosoftAppCredentials>();
@ -356,6 +363,14 @@ namespace Microsoft.Bot.Builder
{
// if it is a Trace activity we only send to the channel if it's the emulator.
}
else if (activity.IsFromStreamingConnection())
{
// The ServiceUrl for streaming channels begin with the string "urn" and contain
// information unique to streaming connections. If the ServiceUrl for this
// activity begins with a "u" we hand it off to be processed via a new or
// existing streaming connection.
response = await SendStreamingActivityAsync(activity).ConfigureAwait(false);
}
else if (!string.IsNullOrWhiteSpace(activity.ReplyToId))
{
var connectorClient = turnContext.TurnState.Get<IConnectorClient>();
@ -845,6 +860,86 @@ namespace Microsoft.Bot.Builder
}
}
/// <summary>
/// Primary adapter method for processing activities sent from streaming channel.
/// Creates a turn context and runs the middleware pipeline for an incoming activity.
/// Throws <see cref="ArgumentNullException"/> on null arguments.
/// </summary>
/// <param name="activity">The <see cref="Activity"/> to process.</param>
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// or threads to receive notice of cancellation.</param>
/// <returns>A task that represents the work queued to execute. If the activity type
/// was 'Invoke' and the corresponding key (channelId + activityId) was found
/// then an InvokeResponse is returned, otherwise null is returned.</returns>
/// <remarks>Call this method to reactively send a message to a conversation.
/// If the task completes successfully, then if the activity's <see cref="Activity.Type"/>
/// is <see cref="ActivityTypes.Invoke"/> and the corresponding key
/// (<see cref="Activity.ChannelId"/> + <see cref="Activity.Id"/>) is found
/// then an <see cref="InvokeResponse"/> is returned, otherwise null is returned.
/// <para>This method registers the following services for the turn.<list type="bullet"/></para>
/// </remarks>
public async Task<InvokeResponse> ProcessStreamingActivityAsync(Activity activity, CancellationToken cancellationToken = default)
{
BotAssert.ActivityNotNull(activity);
_logger.LogInformation($"Received an incoming activity. ActivityId: {activity.Id}");
// If a conversation has moved from one connection to another for the same Channel or Skill and
// hasn't been forgotten by the previous StreamingRequestHandler. The last requestHandler
// the conversation has been associated with should always be the active connection.
var requestHandler = _requestHandlers.Where(x => x.ServiceUrl == activity.ServiceUrl).Where(y => y.HasConversation(activity.Conversation.Id)).LastOrDefault();
using (var context = new TurnContext(this, activity))
{
// Pipes are unauthenticated. Pending to check that we are in pipes right now. Do not merge to master without that.
if (_claimsIdentity != null)
{
context.TurnState.Add<IIdentity>(BotIdentityKey, _claimsIdentity);
}
var connectorClient = CreateStreamingConnectorClient(activity, requestHandler);
context.TurnState.Add(connectorClient);
await RunPipelineAsync(context, _bot.OnTurnAsync, cancellationToken).ConfigureAwait(false);
if (activity.Type == ActivityTypes.Invoke)
{
var activityInvokeResponse = context.TurnState.Get<Activity>(InvokeResponseKey);
if (activityInvokeResponse == null)
{
return new InvokeResponse { Status = (int)HttpStatusCode.NotImplemented };
}
else
{
return (InvokeResponse)activityInvokeResponse.Value;
}
}
return null;
}
}
/// <summary>
/// Creates a new StreamingRequestHandler to listen to the specififed Named Pipe
/// and pass requests to this adapter.
/// </summary>
/// <param name="pipeName">The name of the Named Pipe to connect to.</param>
/// <param name="bot">The bot to use when processing activities received over the Named Pipe.</param>
/// <returns>A task that completes only once the StreamingRequestHandler has stopped listening
/// for incoming requests on the Named Pipe.</returns>
public async Task AddNamedPipeConnection(string pipeName, IBot bot)
{
if (_requestHandlers == null)
{
_requestHandlers = new List<StreamingRequestHandler>();
}
_bot = bot ?? throw new ArgumentNullException(nameof(bot));
var requestHandler = new StreamingRequestHandler(_logger, this, pipeName);
_requestHandlers.Add(requestHandler);
await requestHandler.StartListening().ConfigureAwait(false);
}
/// <summary>
/// Creates an OAuth client for the bot.
/// </summary>
@ -876,6 +971,74 @@ namespace Microsoft.Bot.Builder
return new OAuthClient(new Uri(OAuthClientConfig.OAuthEndpoint), connectorClient.Credentials);
}
/// <summary>
/// Sends activities over streaming connections.
/// If an existing connection is known the adapter will look for it and make use of it.
/// If no existing connection is found a new connection will be opened.
/// </summary>
/// <param name="activity">The <see cref="Activity"/> to send.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>A task that resolves to a <see cref="ResourceResponse"/>.</returns>
private async Task<ResourceResponse> SendStreamingActivityAsync(Activity activity, CancellationToken cancellationToken = default)
{
// Check to see if any of this adapter's StreamingRequestHandlers is associated with this conversation.
var possibleHandlers = _requestHandlers.Where(x => x.ServiceUrl == activity.ServiceUrl).Where(y => y.HasConversation(activity.Conversation.Id));
if (possibleHandlers.Count() > 0)
{
if (possibleHandlers.Count() > 1)
{
// The conversation has moved to a new connection and the former StreamingRequestHandler needs to be told to forget about it.
var correctHandler = possibleHandlers.OrderBy(x => x.ConversationAddedTime(activity.Conversation.Id)).Last();
foreach (var handler in possibleHandlers)
{
if (handler != correctHandler)
{
handler.ForgetConversation(activity.Conversation.Id);
}
}
return await correctHandler.SendActivityAsync(activity, cancellationToken).ConfigureAwait(false);
}
return await possibleHandlers.FirstOrDefault().SendActivityAsync(activity, cancellationToken).ConfigureAwait(false);
}
else
{
// This is a proactive message that will need a new streaming connection opened.
// The ServiceUrl of a streaming connection follows the pattern "urn:[ChannelName]:[Protocol]:[Host]".
// TODO: This connection needs authentication headers added to it.
var connection = new ClientWebSocket();
var uri = activity.ServiceUrl.Split(':');
var protocol = uri[uri.Length - 2];
var host = uri[uri.Length - 1];
await connection.ConnectAsync(new Uri(protocol + host + "/api/messages"), cancellationToken).ConfigureAwait(false);
var handler = new StreamingRequestHandler(_logger, this, connection);
if (_requestHandlers == null)
{
_requestHandlers = new List<StreamingRequestHandler>();
}
_requestHandlers.Add(handler);
return await handler.SendActivityAsync(activity, cancellationToken).ConfigureAwait(false);
}
}
private IConnectorClient CreateStreamingConnectorClient(Activity activity, StreamingRequestHandler requestHandler)
{
// TODO: When this is merged into the existing adapter it should be moved inside of
// the existing CreateConnectorClient and use the serviceURL to determine which
// version of the connector to construct.
var emptyCredentials = (_channelProvider != null && _channelProvider.IsGovernment()) ?
MicrosoftGovernmentAppCredentials.Empty :
MicrosoftAppCredentials.Empty;
var streamingClient = new StreamingHttpClient(requestHandler, _logger);
var connectorClient = new ConnectorClient(new Uri(activity.ServiceUrl), emptyCredentials, customHttpClient: streamingClient);
return connectorClient;
}
/// <summary>
/// Creates the connector client asynchronous.
/// </summary>
@ -998,4 +1161,5 @@ namespace Microsoft.Bot.Builder
}
}
}
#pragma warning restore SA1401 // Fields should be private
}

Просмотреть файл

@ -33,6 +33,7 @@
<ItemGroup>
<ProjectReference Include="..\Microsoft.Bot.Connector\Microsoft.Bot.Connector.csproj" />
<ProjectReference Include="..\Microsoft.Bot.Streaming\Microsoft.Bot.Streaming.csproj" />
</ItemGroup>
<ItemGroup>

Просмотреть файл

@ -0,0 +1,17 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Bot.Schema;
namespace Microsoft.Bot.Builder.Streaming
{
/// <summary>
/// Processes streaming activities.
/// </summary>
public interface IStreamingActivityProcessor
{
Task<InvokeResponse> ProcessStreamingActivityAsync(Activity activity, CancellationToken cancellationToken = default);
}
}

Просмотреть файл

@ -49,6 +49,7 @@ namespace Microsoft.Bot.Builder.Streaming
{
var serverResponse = await _requestHandler.SendStreamingRequestAsync(request, cancellation).ConfigureAwait(false);
// TODO ccastro serverResponse could be null
if (serverResponse.StatusCode == (int)HttpStatusCode.OK)
{
return serverResponse.ReadBodyAsJson<T>();

Просмотреть файл

@ -26,7 +26,7 @@ namespace Microsoft.Bot.Builder.Streaming
public class StreamingRequestHandler : RequestHandler
{
private readonly ILogger _logger;
private readonly DirectLineAdapter _adapter;
private readonly IStreamingActivityProcessor _activityProcessor;
private readonly string _userAgent;
private readonly IDictionary<string, DateTime> _conversations;
@ -39,12 +39,12 @@ namespace Microsoft.Bot.Builder.Streaming
/// establishes a connection over a WebSocket to a streaming channel.
/// </summary>
/// <param name="logger">A logger.</param>
/// <param name="adapter">The adapter for use when processing incoming requests.</param>
/// <param name="activityProcessor">The procesor for incoming requests.</param>
/// <param name="socket">The base socket to use when connecting to the channel.</param>
public StreamingRequestHandler(ILogger logger, DirectLineAdapter adapter, WebSocket socket)
public StreamingRequestHandler(ILogger logger, IStreamingActivityProcessor activityProcessor, WebSocket socket)
{
_logger = logger ?? NullLogger.Instance;
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
_activityProcessor = activityProcessor ?? throw new ArgumentNullException(nameof(activityProcessor));
if (socket == null)
{
@ -63,12 +63,12 @@ namespace Microsoft.Bot.Builder.Streaming
/// establishes a connection over a Named Pipe to a streaming channel.
/// </summary>
/// <param name="logger">A logger.</param>
/// <param name="adapter">The adapter for use when processing incoming requests.</param>
/// <param name="activityProcessor">The processor for incoming requests.</param>
/// <param name="pipeName">The name of the Named Pipe to use when connecting to the channel.</param>
public StreamingRequestHandler(ILogger logger, DirectLineAdapter adapter, string pipeName)
public StreamingRequestHandler(ILogger logger, IStreamingActivityProcessor activityProcessor, string pipeName)
{
_logger = logger ?? NullLogger.Instance;
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
_activityProcessor = activityProcessor ?? throw new ArgumentNullException(nameof(activityProcessor));
if (string.IsNullOrWhiteSpace(pipeName))
{
@ -206,7 +206,7 @@ namespace Microsoft.Bot.Builder.Streaming
}
// Now that the request has been converted into an activity we can send it to the adapter.
var adapterResponse = await _adapter.ProcessActivityForStreamingChannelAsync(activity, cancellationToken).ConfigureAwait(false);
var adapterResponse = await _activityProcessor.ProcessStreamingActivityAsync(activity, cancellationToken).ConfigureAwait(false);
// Now we convert the invokeResponse returned by the adapter into a StreamingResponse we can send back to the channel.
if (adapterResponse == null)
@ -265,7 +265,7 @@ namespace Microsoft.Bot.Builder.Streaming
{
if (!ServerIsConnected)
{
await Reconnect();
await Reconnect().ConfigureAwait(false);
}
var serverResponse = await _server.SendAsync(request, cancellationToken).ConfigureAwait(false);
@ -295,7 +295,7 @@ namespace Microsoft.Bot.Builder.Streaming
{
if (!ServerIsConnected)
{
await Reconnect();
await Reconnect().ConfigureAwait(false);
}
var serverResponse = await _server.SendAsync(request, cancellationToken).ConfigureAwait(false);

Просмотреть файл

@ -507,6 +507,19 @@ namespace Microsoft.Bot.Schema
return this;
}
/// <summary>
/// Determine if the Activity was sent via an Http/Https connection or Streaming
/// This can be determined by looking at the ServiceUrl property:
/// (1) All channels that send messages via http/https are not streaming
/// (2) Channels that send messages via streaming have a ServiceUrl that does not begin with http/https.
/// </summary>
/// <returns>True if the Activity originated from a streaming connection.</returns>
public bool IsFromStreamingConnection()
{
var isHttp = ServiceUrl?.StartsWith("http", StringComparison.InvariantCultureIgnoreCase);
return isHttp.HasValue ? !isHttp.Value : false;
}
/// <summary>
/// Indicates whether this activity is of a specified activity type.
/// </summary>

Просмотреть файл

@ -20,7 +20,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" />
<PackageReference Include="Microsoft.Net.Http.Headers" Version="2.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
</ItemGroup>

Просмотреть файл

@ -71,5 +71,24 @@ namespace Microsoft.Bot.Builder.Integration.AspNet.Core
return applicationBuilder;
}
/// <summary>
/// Maps various endpoint handlers for the registered bot into the request execution pipeline using the V4 protocol.
/// </summary>
/// <param name="applicationBuilder">The application builder that defines the bot's pipeline.<see cref="IApplicationBuilder"/>.</param>
/// <param name="pipeName">The name of the named pipe to use when creating the server.</param>
/// <returns>A reference to this instance after the operation has completed.</returns>
public static IApplicationBuilder UseNamedPipes(this IApplicationBuilder applicationBuilder, string pipeName = "bfv4.pipes")
{
if (applicationBuilder == null)
{
throw new ArgumentNullException(nameof(applicationBuilder));
}
var bot = applicationBuilder.ApplicationServices.GetService(typeof(IBot)) as IBot;
_ = (applicationBuilder.ApplicationServices.GetService(typeof(IBotFrameworkHttpAdapter)) as BotFrameworkHttpAdapter).AddNamedPipeConnection(pipeName, bot);
return applicationBuilder;
}
}
}

Просмотреть файл

@ -2,12 +2,15 @@
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Bot.Builder.BotFramework;
using Microsoft.Bot.Builder.Streaming;
using Microsoft.Bot.Connector.Authentication;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
@ -19,18 +22,21 @@ namespace Microsoft.Bot.Builder.Integration.AspNet.Core
/// </summary>
public class BotFrameworkHttpAdapter : BotFrameworkAdapter, IBotFrameworkHttpAdapter
{
private const string AuthHeaderName = "authorization";
private const string ChannelIdHeaderName = "channelid";
public BotFrameworkHttpAdapter(ICredentialProvider credentialProvider = null, IChannelProvider channelProvider = null, ILogger<BotFrameworkHttpAdapter> logger = null)
: base(credentialProvider ?? new SimpleCredentialProvider(), channelProvider, null, null, null, logger)
: base(credentialProvider ?? new SimpleCredentialProvider(), channelProvider, logger: logger)
{
}
public BotFrameworkHttpAdapter(ICredentialProvider credentialProvider, IChannelProvider channelProvider, HttpClient httpClient, ILogger<BotFrameworkHttpAdapter> logger)
: base(credentialProvider ?? new SimpleCredentialProvider(), channelProvider, null, httpClient, null, logger)
: base(credentialProvider ?? new SimpleCredentialProvider(), channelProvider, customHttpClient: httpClient, logger: logger)
{
}
protected BotFrameworkHttpAdapter(IConfiguration configuration, ILogger<BotFrameworkHttpAdapter> logger = null)
: base(new ConfigurationCredentialProvider(configuration), new ConfigurationChannelProvider(configuration), customHttpClient: null, middleware: null, logger: logger)
: base(new ConfigurationCredentialProvider(configuration), new ConfigurationChannelProvider(configuration), logger: logger)
{
var openIdEndpoint = configuration.GetSection(AuthenticationConstants.BotOpenIdMetadataKey)?.Value;
@ -42,7 +48,7 @@ namespace Microsoft.Bot.Builder.Integration.AspNet.Core
}
}
public async Task ProcessAsync(HttpRequest httpRequest, HttpResponse httpResponse, IBot bot, CancellationToken cancellationToken = default(CancellationToken))
public async Task ProcessAsync(HttpRequest httpRequest, HttpResponse httpResponse, IBot bot, CancellationToken cancellationToken = default)
{
if (httpRequest == null)
{
@ -54,30 +60,139 @@ namespace Microsoft.Bot.Builder.Integration.AspNet.Core
throw new ArgumentNullException(nameof(httpResponse));
}
if (bot == null)
_bot = bot ?? throw new ArgumentNullException(nameof(bot));
if (httpRequest.Method == HttpMethods.Get)
{
throw new ArgumentNullException(nameof(bot));
await ConnectWebSocket(httpRequest, httpResponse).ConfigureAwait(false);
}
else
{
// deserialize the incoming Activity
var activity = HttpHelper.ReadRequest(httpRequest);
// grab the auth header from the inbound http request
var authHeader = httpRequest.Headers["Authorization"];
try
{
// process the inbound activity with the bot
var invokeResponse = await ProcessActivityAsync(authHeader, activity, bot.OnTurnAsync, cancellationToken).ConfigureAwait(false);
// write the response, potentially serializing the InvokeResponse
HttpHelper.WriteResponse(httpResponse, invokeResponse);
}
catch (UnauthorizedAccessException)
{
// handle unauthorized here as this layer creates the http response
httpResponse.StatusCode = (int)HttpStatusCode.Unauthorized;
}
}
}
/// <summary>
/// Process the initial request to establish a long lived connection via a streaming server.
/// </summary>
/// <param name="httpRequest">The connection request.</param>
/// <param name="httpResponse">The response sent on error or connection termination.</param>
/// <returns>Returns on task completion.</returns>
private async Task ConnectWebSocket(HttpRequest httpRequest, HttpResponse httpResponse)
{
if (httpRequest == null)
{
throw new ArgumentNullException(nameof(httpRequest));
}
// deserialize the incoming Activity
var activity = HttpHelper.ReadRequest(httpRequest);
if (httpResponse == null)
{
throw new ArgumentNullException(nameof(httpResponse));
}
// grab the auth header from the inbound http request
var authHeader = httpRequest.Headers["Authorization"];
if (!httpRequest.HttpContext.WebSockets.IsWebSocketRequest)
{
httpRequest.HttpContext.Response.StatusCode = (int)HttpStatusCode.BadRequest;
await httpRequest.HttpContext.Response.WriteAsync("Upgrade to WebSocket is required.").ConfigureAwait(false);
return;
}
if (!await AuthCheck(httpRequest).ConfigureAwait(false))
{
return;
}
try
{
// process the inbound activity with the bot
var invokeResponse = await ProcessActivityAsync(authHeader, activity, bot.OnTurnAsync, cancellationToken).ConfigureAwait(false);
var socket = await httpRequest.HttpContext.WebSockets.AcceptWebSocketAsync().ConfigureAwait(false);
var requestHandler = new StreamingRequestHandler(_logger, this, socket);
// write the response, potentially serializing the InvokeResponse
HttpHelper.WriteResponse(httpResponse, invokeResponse);
if (_requestHandlers == null)
{
_requestHandlers = new List<StreamingRequestHandler>();
}
_requestHandlers.Add(requestHandler);
await requestHandler.StartListening().ConfigureAwait(false);
}
catch (UnauthorizedAccessException)
catch (Exception ex)
{
// handle unauthorized here as this layer creates the http response
httpResponse.StatusCode = (int)HttpStatusCode.Unauthorized;
httpRequest.HttpContext.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
await httpRequest.HttpContext.Response.WriteAsync("Unable to create transport server.").ConfigureAwait(false);
throw ex;
}
}
private async Task<bool> AuthCheck(HttpRequest httpRequest)
{
try
{
if (!await _credentialProvider.IsAuthenticationDisabledAsync().ConfigureAwait(false))
{
var authHeader = httpRequest.Headers.Where(x => x.Key.ToLower() == AuthHeaderName).FirstOrDefault().Value.FirstOrDefault();
var channelId = httpRequest.Headers.Where(x => x.Key.ToLower() == ChannelIdHeaderName).FirstOrDefault().Value.FirstOrDefault();
if (string.IsNullOrWhiteSpace(authHeader))
{
await MissingAuthHeaderHelperAsync(AuthHeaderName, httpRequest).ConfigureAwait(false);
return false;
}
if (string.IsNullOrWhiteSpace(channelId))
{
await MissingAuthHeaderHelperAsync(ChannelIdHeaderName, httpRequest).ConfigureAwait(false);
return false;
}
var claimsIdentity = await JwtTokenValidation.ValidateAuthHeader(authHeader, _credentialProvider, _channelProvider, channelId).ConfigureAwait(false);
if (!claimsIdentity.IsAuthenticated)
{
httpRequest.HttpContext.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
return false;
}
_claimsIdentity = claimsIdentity;
}
return true;
}
catch (Exception ex)
{
httpRequest.HttpContext.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
await httpRequest.HttpContext.Response.WriteAsync("Error while attempting to authorize connection.").ConfigureAwait(false);
throw ex;
}
}
private async Task MissingAuthHeaderHelperAsync(string headerName, HttpRequest httpRequest)
{
httpRequest.HttpContext.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
await httpRequest.HttpContext.Response.WriteAsync($"Unable to authentiate. Missing header: {headerName}").ConfigureAwait(false);
}
}
}

Просмотреть файл

@ -41,7 +41,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
botMock.Setup(b => b.OnTurnAsync(It.IsAny<TurnContext>(), It.IsAny<CancellationToken>())).Returns(Task.CompletedTask);
// Act
var adapter = new DirectLineAdapter();
var adapter = new BotFrameworkHttpAdapter();
await adapter.ProcessAsync(httpRequestMock.Object, httpResponseMock.Object, botMock.Object);
// Assert
@ -178,7 +178,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
return stream;
}
private class MyAdapter : DirectLineAdapter
private class MyAdapter : BotFrameworkHttpAdapter
{
public MyAdapter(IConfiguration configuration)
: base(configuration)

Просмотреть файл

@ -18,7 +18,6 @@
<ItemGroup>
<ProjectReference Include="..\..\libraries\integration\Microsoft.Bot.Builder.Integration.AspNet.Core\Microsoft.Bot.Builder.Integration.AspNet.Core.csproj" />
<ProjectReference Include="..\..\libraries\Microsoft.Bot.Builder.Streaming\Microsoft.Bot.Builder.Streaming.csproj" />
<ProjectReference Include="..\..\libraries\Microsoft.Bot.Builder\Microsoft.Bot.Builder.csproj" />
</ItemGroup>

Просмотреть файл

@ -6,6 +6,7 @@ using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Bot.Builder.Integration.AspNet.Core;
using Microsoft.Bot.Schema;
using Microsoft.Bot.Streaming;
using Microsoft.Bot.Streaming.Payloads;
@ -20,7 +21,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
public void CanBeConstructedWithANamedPipe()
{
// Act
var handler = new StreamingRequestHandler(null, new DirectLineAdapter(), "fakePipe");
var handler = new StreamingRequestHandler(null, new BotFrameworkHttpAdapter(), "fakePipe");
// Assert
Assert.NotNull(handler);
@ -35,7 +36,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
// Act
try
{
var handler = new StreamingRequestHandler(logger: null, adapter: new DirectLineAdapter(), socket: null);
var handler = new StreamingRequestHandler(logger: null, activityProcessor: new BotFrameworkHttpAdapter(), socket: null);
}
catch (Exception ex)
{
@ -55,7 +56,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
// Act
try
{
var handler = new StreamingRequestHandler(null, new DirectLineAdapter(), string.Empty);
var handler = new StreamingRequestHandler(null, new BotFrameworkHttpAdapter(), string.Empty);
}
catch (Exception ex)
{
@ -70,7 +71,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
public void CanBeConstructedWithAWebSocket()
{
// Act
var handler = new StreamingRequestHandler(null, new DirectLineAdapter(), new FauxSock());
var handler = new StreamingRequestHandler(null, new BotFrameworkHttpAdapter(), new FauxSock());
// Assert
Assert.NotNull(handler);
@ -80,7 +81,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
public async Task RequestHandlerRespondsWith500OnError()
{
// Arrange
var handler = new StreamingRequestHandler(null, new DirectLineAdapter(), "fakePipe");
var handler = new StreamingRequestHandler(null, new BotFrameworkHttpAdapter(), "fakePipe");
var conversationId = "testconvoid";
var membersAdded = new List<ChannelAccount>();
var member = new ChannelAccount
@ -117,7 +118,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
public async Task RequestHandlerRemembersConversations()
{
// Arrange
var handler = new StreamingRequestHandler(null, new DirectLineAdapter(), "fakePipe");
var handler = new StreamingRequestHandler(null, new BotFrameworkHttpAdapter(), "fakePipe");
var conversationId = "testconvoid";
var membersAdded = new List<ChannelAccount>();
var member = new ChannelAccount
@ -154,7 +155,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
public async Task RequestHandlerForgetsConversations()
{
// Arrange
var handler = new StreamingRequestHandler(null, new DirectLineAdapter(), "fakePipe");
var handler = new StreamingRequestHandler(null, new BotFrameworkHttpAdapter(), "fakePipe");
var conversationId = "testconvoid";
var membersAdded = new List<ChannelAccount>();
var member = new ChannelAccount
@ -192,7 +193,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
public async Task RequestHandlerAssignsAServiceUrl()
{
// Arrange
var handler = new StreamingRequestHandler(null, new DirectLineAdapter(), "fakePipe");
var handler = new StreamingRequestHandler(null, new BotFrameworkHttpAdapter(), "fakePipe");
var conversationId = "testconvoid";
var serviceUrl = "urn:FakeName:fakeProtocol://fakePath";
var membersAdded = new List<ChannelAccount>();
@ -233,7 +234,7 @@ namespace Microsoft.Bot.Builder.Streaming.Tests
// Arrange
// Act
var handler = new StreamingRequestHandler(null, new DirectLineAdapter(), "fakePipe");
var handler = new StreamingRequestHandler(null, new BotFrameworkHttpAdapter(), "fakePipe");
var activity = new Schema.Activity()
{
Type = "message",

Просмотреть файл

@ -35,7 +35,7 @@ namespace Microsoft.BotBuilderSamples.Tests.Controllers
var mockBot = new Mock<IBot>();
// Create and initialize controller
var sut = new BotController(mockAdapter.Object, arg => mockBot.Object)
var sut = new BotController(mockAdapter.Object, mockBot.Object)
{
ControllerContext = new ControllerContext(actionContext),
};

Просмотреть файл

@ -20,7 +20,6 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\libraries\Microsoft.Bot.Builder.Streaming\Microsoft.Bot.Builder.Streaming.csproj" />
<ProjectReference Include="..\..\libraries\Microsoft.Bot.Streaming\Microsoft.Bot.Streaming.csproj" />
</ItemGroup>
</Project>

Просмотреть файл

@ -7,7 +7,6 @@ using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Microsoft.Bot.Builder.Streaming;
using Microsoft.Bot.Connector;
using Microsoft.Bot.Schema;
using Microsoft.Bot.Streaming.Payloads;