Handle InvocationCancel message and signal cancellation for in-flight invocations (#972)
This commit is contained in:
Родитель
90e6f922f9
Коммит
3b4ae8c222
|
@ -5,4 +5,5 @@
|
|||
|
||||
- Add Retry Policy Attributes (#977, #971)
|
||||
- Bug fix - GetOutputBindings returns incorrect data when OutputBindingData is not set (#983)
|
||||
- Worker version and environment information returned in init call (WorkerMetadata)
|
||||
- Worker version and environment information returned in init call (#1022)
|
||||
- Handle InvocationCancel message and signal cancellation for in-flight invocations (#972)
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Net;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Functions.Worker;
|
||||
using Microsoft.Azure.Functions.Worker.Http;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace FunctionApp
|
||||
{
|
||||
public static class HttpTriggerWithCancellation
|
||||
{
|
||||
[Function(nameof(HttpTriggerWithCancellation))]
|
||||
public static async Task<HttpResponseData> Run(
|
||||
[HttpTrigger(AuthorizationLevel.Anonymous,"get", "post", Route = null)]
|
||||
HttpRequestData req,
|
||||
FunctionContext executionContext,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var logger = executionContext.GetLogger("FunctionApp.HttpTriggerWithCancellation");
|
||||
logger.LogInformation("HttpTriggerWithCancellation function triggered");
|
||||
|
||||
try
|
||||
{
|
||||
var response = req.CreateResponse(HttpStatusCode.OK);
|
||||
response.WriteString($"Hello world!");
|
||||
|
||||
await Task.Delay(5000, cancellationToken);
|
||||
|
||||
return response;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
logger.LogInformation("Function invocation cancelled");
|
||||
|
||||
var response = req.CreateResponse(HttpStatusCode.ServiceUnavailable);
|
||||
response.WriteString("Invocation cancelled");
|
||||
|
||||
return response;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,9 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Functions.Worker;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
@ -13,7 +16,7 @@ namespace FunctionApp
|
|||
static async Task Main(string[] args)
|
||||
{
|
||||
// #if DEBUG
|
||||
// Debugger.Launch();
|
||||
// Debugger.Launch();
|
||||
// #endif
|
||||
//<docsnippet_startup>
|
||||
var host = new HostBuilder()
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
|
@ -12,15 +13,15 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
{
|
||||
private readonly IServiceScopeFactory _serviceScopeFactory;
|
||||
private readonly FunctionInvocation _invocation;
|
||||
|
||||
private IServiceScope? _instanceServicesScope;
|
||||
private IServiceProvider? _instanceServices;
|
||||
private BindingContext? _bindingContext;
|
||||
|
||||
public DefaultFunctionContext(IServiceScopeFactory serviceScopeFactory, IInvocationFeatures features)
|
||||
public DefaultFunctionContext(IServiceScopeFactory serviceScopeFactory, IInvocationFeatures features, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
|
||||
Features = features ?? throw new ArgumentNullException(nameof(features));
|
||||
CancellationToken = cancellationToken;
|
||||
|
||||
_invocation = features.Get<FunctionInvocation>() ?? throw new InvalidOperationException($"The '{nameof(FunctionInvocation)}' feature is required.");
|
||||
FunctionDefinition = features.Get<FunctionDefinition>() ?? throw new InvalidOperationException($"The {nameof(Worker.FunctionDefinition)} feature is required.");
|
||||
|
@ -36,6 +37,8 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
|
||||
public override IInvocationFeatures Features { get; }
|
||||
|
||||
public override CancellationToken CancellationToken { get; }
|
||||
|
||||
public override IServiceProvider InstanceServices
|
||||
{
|
||||
get
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Threading;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker.Pipeline
|
||||
|
@ -15,9 +16,9 @@ namespace Microsoft.Azure.Functions.Worker.Pipeline
|
|||
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
|
||||
}
|
||||
|
||||
public FunctionContext Create(IInvocationFeatures invocationFeatures)
|
||||
public FunctionContext Create(IInvocationFeatures invocationFeatures, CancellationToken token = default)
|
||||
{
|
||||
return new DefaultFunctionContext(_serviceScopeFactory, invocationFeatures);
|
||||
return new DefaultFunctionContext(_serviceScopeFactory, invocationFeatures, token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker
|
||||
{
|
||||
|
@ -59,5 +60,10 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
/// Gets a collection containing the features supported by this context.
|
||||
/// </summary>
|
||||
public abstract IInvocationFeatures Features { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets the <see cref="CancellationToken"/> that signals a function invocation is being cancelled.
|
||||
/// </summary>
|
||||
public virtual CancellationToken CancellationToken { get; } = CancellationToken.None;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
namespace Microsoft.Azure.Functions.Worker
|
||||
{
|
||||
/// <summary>
|
||||
/// Exposes information about retry acvitity for the event that triggered
|
||||
/// Exposes information about retry activity for the event that triggered
|
||||
/// the current function invocation.
|
||||
/// </summary>
|
||||
public abstract class RetryContext
|
||||
|
@ -19,6 +19,5 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
/// before the event is considered undeliverable.
|
||||
/// </summary>
|
||||
public abstract int MaxRetryCount { get; }
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker.Converters
|
||||
{
|
||||
internal class CancellationTokenConverter : IInputConverter
|
||||
{
|
||||
public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
|
||||
{
|
||||
if (context.TargetType == typeof(CancellationToken) || context.TargetType == typeof(CancellationToken?))
|
||||
{
|
||||
return new ValueTask<ConversionResult>(ConversionResult.Success(context.FunctionContext.CancellationToken));
|
||||
}
|
||||
|
||||
return new ValueTask<ConversionResult>(ConversionResult.Unhandled());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Functions.Worker.Diagnostics;
|
||||
using Microsoft.Azure.Functions.Worker.Middleware;
|
||||
|
@ -21,8 +22,12 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
private readonly ILogger<FunctionsApplication> _logger;
|
||||
private readonly IWorkerDiagnostics _diagnostics;
|
||||
|
||||
public FunctionsApplication(FunctionExecutionDelegate functionExecutionDelegate, IFunctionContextFactory functionContextFactory,
|
||||
IOptions<WorkerOptions> workerOptions, ILogger<FunctionsApplication> logger, IWorkerDiagnostics diagnostics)
|
||||
public FunctionsApplication(
|
||||
FunctionExecutionDelegate functionExecutionDelegate,
|
||||
IFunctionContextFactory functionContextFactory,
|
||||
IOptions<WorkerOptions> workerOptions,
|
||||
ILogger<FunctionsApplication> logger,
|
||||
IWorkerDiagnostics diagnostics)
|
||||
{
|
||||
_functionExecutionDelegate = functionExecutionDelegate ?? throw new ArgumentNullException(nameof(functionExecutionDelegate));
|
||||
_functionContextFactory = functionContextFactory ?? throw new ArgumentNullException(nameof(functionContextFactory));
|
||||
|
@ -31,14 +36,14 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
_diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
|
||||
}
|
||||
|
||||
public FunctionContext CreateContext(IInvocationFeatures features)
|
||||
public FunctionContext CreateContext(IInvocationFeatures features, CancellationToken token = default)
|
||||
{
|
||||
var invocation = features.Get<FunctionInvocation>() ?? throw new InvalidOperationException($"The {nameof(FunctionInvocation)} feature is required.");
|
||||
|
||||
var functionDefinition = _functionMap[invocation.FunctionId];
|
||||
features.Set<FunctionDefinition>(functionDefinition);
|
||||
|
||||
return _functionContextFactory.Create(features);
|
||||
return _functionContextFactory.Create(features, token);
|
||||
}
|
||||
|
||||
public void LoadFunction(FunctionDefinition definition)
|
||||
|
|
|
@ -113,13 +113,14 @@ namespace Microsoft.Extensions.DependencyInjection
|
|||
workerOption.InputConverters.Register<StringToByteConverter>();
|
||||
workerOption.InputConverters.Register<JsonPocoConverter>();
|
||||
workerOption.InputConverters.Register<ArrayConverter>();
|
||||
workerOption.InputConverters.Register<CancellationTokenConverter>();
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Run extension startup execution code.
|
||||
/// Our source generator creates a class(WorkerExtensionStartupCodeExecutor)
|
||||
/// which internally calls the "Configure" method on each of the participating
|
||||
/// which internally calls the "Configure" method on each of the participating
|
||||
/// extensions. Here we are calling the uber "Configure" method on the generated class.
|
||||
/// </summary>
|
||||
private static void RunExtensionStartupCode(IFunctionsWorkerApplicationBuilder builder)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker
|
||||
|
@ -11,6 +12,6 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
|
||||
Task InvokeFunctionAsync(FunctionContext context);
|
||||
|
||||
FunctionContext CreateContext(IInvocationFeatures features);
|
||||
FunctionContext CreateContext(IInvocationFeatures features, CancellationToken token = default);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System.Threading;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker.Pipeline
|
||||
{
|
||||
internal interface IFunctionContextFactory
|
||||
{
|
||||
FunctionContext Create(IInvocationFeatures features);
|
||||
FunctionContext Create(IInvocationFeatures features, CancellationToken token);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ using System.Threading.Tasks;
|
|||
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker
|
||||
{
|
||||
{
|
||||
internal class DefaultFunctionMetadataProvider : IFunctionMetadataProvider
|
||||
{
|
||||
private const string FileName = "functions.metadata";
|
||||
|
@ -96,7 +96,7 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
{
|
||||
throw new FormatException("Bindings must declare a direction and type.");
|
||||
}
|
||||
|
||||
|
||||
BindingInfo bindingInfo = new BindingInfo
|
||||
{
|
||||
Direction = direction,
|
||||
|
|
|
@ -85,7 +85,7 @@ namespace Microsoft.Extensions.DependencyInjection
|
|||
|
||||
Grpc.Core.Channel grpcChannel = new Grpc.Core.Channel(arguments.Host, arguments.Port, ChannelCredentials.Insecure, options);
|
||||
|
||||
#endif
|
||||
#endif
|
||||
return new FunctionRpcClient(grpcChannel);
|
||||
});
|
||||
|
||||
|
|
|
@ -2,8 +2,6 @@
|
|||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
|
@ -12,17 +10,18 @@ using Azure.Core.Serialization;
|
|||
using Grpc.Core;
|
||||
using Microsoft.Azure.Functions.Worker.Context.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
|
||||
using Microsoft.Azure.Functions.Worker.Handlers;
|
||||
using Microsoft.Azure.Functions.Worker.Invocation;
|
||||
using Microsoft.Azure.Functions.Worker.Logging;
|
||||
using Microsoft.Azure.Functions.Worker.OutputBindings;
|
||||
using Microsoft.Azure.Functions.Worker.Rpc;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using static Microsoft.Azure.Functions.Worker.Grpc.Messages.FunctionRpc;
|
||||
using MsgType = Microsoft.Azure.Functions.Worker.Grpc.Messages.StreamingMessage.ContentOneofCase;
|
||||
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker
|
||||
{
|
||||
internal class GrpcWorker : IWorker
|
||||
|
@ -40,13 +39,15 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
private readonly ObjectSerializer _serializer;
|
||||
private readonly IFunctionMetadataProvider _functionMetadataProvider;
|
||||
private readonly IHostApplicationLifetime _hostApplicationLifetime;
|
||||
private readonly IInvocationHandler _invocationHandler;
|
||||
|
||||
public GrpcWorker(IFunctionsApplication application, FunctionRpcClient rpcClient, GrpcHostChannel outputChannel, IInvocationFeaturesFactory invocationFeaturesFactory,
|
||||
IOutputBindingsInfoProvider outputBindingsInfoProvider, IMethodInfoLocator methodInfoLocator,
|
||||
IOptions<GrpcWorkerStartupOptions> startupOptions, IOptions<WorkerOptions> workerOptions,
|
||||
IInputConversionFeatureProvider inputConversionFeatureProvider,
|
||||
IFunctionMetadataProvider functionMetadataProvider,
|
||||
IHostApplicationLifetime hostApplicationLifetime)
|
||||
IHostApplicationLifetime hostApplicationLifetime,
|
||||
ILogger<GrpcWorker> logger)
|
||||
{
|
||||
if (outputChannel == null)
|
||||
{
|
||||
|
@ -66,6 +67,9 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
_serializer = workerOptions.Value.Serializer ?? throw new InvalidOperationException(nameof(workerOptions.Value.Serializer));
|
||||
_inputConversionFeatureProvider = inputConversionFeatureProvider ?? throw new ArgumentNullException(nameof(inputConversionFeatureProvider));
|
||||
_functionMetadataProvider = functionMetadataProvider ?? throw new ArgumentNullException(nameof(functionMetadataProvider));
|
||||
|
||||
// Handlers (TODO: dependency inject handlers instead of creating here)
|
||||
_invocationHandler = new InvocationHandler(_application, _invocationFeaturesFactory, _serializer, _outputBindingsInfoProvider, _inputConversionFeatureProvider, logger);
|
||||
}
|
||||
|
||||
public async Task StartAsync(CancellationToken token)
|
||||
|
@ -128,125 +132,60 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
RequestId = request.RequestId
|
||||
};
|
||||
|
||||
if (request.ContentCase == MsgType.InvocationRequest)
|
||||
switch(request.ContentCase)
|
||||
{
|
||||
responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest, _application,
|
||||
_invocationFeaturesFactory, _serializer, _outputBindingsInfoProvider, _inputConversionFeatureProvider);
|
||||
}
|
||||
else if (request.ContentCase == MsgType.WorkerInitRequest)
|
||||
{
|
||||
responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest);
|
||||
}
|
||||
else if (request.ContentCase == MsgType.WorkerStatusRequest)
|
||||
{
|
||||
responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
|
||||
}
|
||||
else if (request.ContentCase == MsgType.FunctionsMetadataRequest)
|
||||
{
|
||||
responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
|
||||
}
|
||||
else if (request.ContentCase == MsgType.WorkerTerminate)
|
||||
{
|
||||
WorkerTerminateRequestHandler(request.WorkerTerminate);
|
||||
}
|
||||
else if (request.ContentCase == MsgType.FunctionLoadRequest)
|
||||
{
|
||||
responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
|
||||
}
|
||||
else if (request.ContentCase == MsgType.FunctionEnvironmentReloadRequest)
|
||||
{
|
||||
// No-op for now, but return a response.
|
||||
responseMessage.FunctionEnvironmentReloadResponse = new FunctionEnvironmentReloadResponse
|
||||
{
|
||||
Result = new StatusResult { Status = StatusResult.Types.Status.Success }
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
// TODO: Trace failure here.
|
||||
return;
|
||||
case MsgType.InvocationRequest:
|
||||
responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
|
||||
break;
|
||||
|
||||
case MsgType.WorkerInitRequest:
|
||||
responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest);
|
||||
break;
|
||||
|
||||
case MsgType.WorkerStatusRequest:
|
||||
responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
|
||||
break;
|
||||
|
||||
case MsgType.FunctionsMetadataRequest:
|
||||
responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
|
||||
break;
|
||||
|
||||
case MsgType.WorkerTerminate:
|
||||
WorkerTerminateRequestHandler(request.WorkerTerminate);
|
||||
break;
|
||||
|
||||
case MsgType.FunctionLoadRequest:
|
||||
responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
|
||||
break;
|
||||
|
||||
case MsgType.FunctionEnvironmentReloadRequest:
|
||||
// No-op for now, but return a response.
|
||||
responseMessage.FunctionEnvironmentReloadResponse = new FunctionEnvironmentReloadResponse
|
||||
{
|
||||
Result = new StatusResult { Status = StatusResult.Types.Status.Success }
|
||||
};
|
||||
break;
|
||||
|
||||
case MsgType.InvocationCancel:
|
||||
InvocationCancelRequestHandler(request.InvocationCancel);
|
||||
break;
|
||||
|
||||
default:
|
||||
// TODO: Trace failure here.
|
||||
return;
|
||||
}
|
||||
|
||||
await _outputWriter.WriteAsync(responseMessage);
|
||||
}
|
||||
|
||||
internal static async Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request, IFunctionsApplication application,
|
||||
IInvocationFeaturesFactory invocationFeaturesFactory, ObjectSerializer serializer,
|
||||
IOutputBindingsInfoProvider outputBindingsInfoProvider,
|
||||
IInputConversionFeatureProvider functionInputConversionFeatureProvider)
|
||||
internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
|
||||
{
|
||||
FunctionContext? context = null;
|
||||
InvocationResponse response = new()
|
||||
{
|
||||
InvocationId = request.InvocationId
|
||||
};
|
||||
return _invocationHandler.InvokeAsync(request);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var invocation = new GrpcFunctionInvocation(request);
|
||||
|
||||
IInvocationFeatures invocationFeatures = invocationFeaturesFactory.Create();
|
||||
invocationFeatures.Set<FunctionInvocation>(invocation);
|
||||
invocationFeatures.Set<IExecutionRetryFeature>(invocation);
|
||||
|
||||
context = application.CreateContext(invocationFeatures);
|
||||
invocationFeatures.Set<IFunctionBindingsFeature>(new GrpcFunctionBindingsFeature(context, request, outputBindingsInfoProvider));
|
||||
|
||||
if (functionInputConversionFeatureProvider.TryCreate(typeof(DefaultInputConversionFeature), out var conversion))
|
||||
{
|
||||
invocationFeatures.Set<IInputConversionFeature>(conversion!);
|
||||
}
|
||||
|
||||
await application.InvokeFunctionAsync(context);
|
||||
|
||||
var functionBindings = context.GetBindings();
|
||||
|
||||
foreach (var binding in functionBindings.OutputBindingData)
|
||||
{
|
||||
var parameterBinding = new ParameterBinding
|
||||
{
|
||||
Name = binding.Key
|
||||
};
|
||||
|
||||
if (binding.Value is not null)
|
||||
{
|
||||
parameterBinding.Data = await binding.Value.ToRpcAsync(serializer);
|
||||
}
|
||||
|
||||
response.OutputData.Add(parameterBinding);
|
||||
}
|
||||
|
||||
if (functionBindings.InvocationResult != null)
|
||||
{
|
||||
TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(serializer);
|
||||
|
||||
response.ReturnValue = returnVal;
|
||||
}
|
||||
|
||||
response.Result = new StatusResult
|
||||
{
|
||||
Status = StatusResult.Types.Status.Success
|
||||
};
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
response.Result = new StatusResult
|
||||
{
|
||||
Exception = ex.ToRpcException(),
|
||||
Status = StatusResult.Types.Status.Failure
|
||||
};
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (context is IAsyncDisposable asyncContext)
|
||||
{
|
||||
await asyncContext.DisposeAsync();
|
||||
}
|
||||
|
||||
(context as IDisposable)?.Dispose();
|
||||
}
|
||||
|
||||
return response;
|
||||
internal void InvocationCancelRequestHandler(InvocationCancel request)
|
||||
{
|
||||
_invocationHandler.TryCancel(request.InvocationId);
|
||||
}
|
||||
|
||||
internal static WorkerInitResponse WorkerInitRequestHandler(WorkerInitRequest request)
|
||||
|
@ -273,6 +212,7 @@ namespace Microsoft.Azure.Functions.Worker
|
|||
response.Capabilities.Add("TypedDataCollection", bool.TrueString);
|
||||
response.Capabilities.Add("WorkerStatus", bool.TrueString);
|
||||
response.Capabilities.Add("HandlesWorkerTerminateMessage", bool.TrueString);
|
||||
response.Capabilities.Add("HandlesInvocationCancelMessage", bool.TrueString);
|
||||
|
||||
return response;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker.Handlers
|
||||
{
|
||||
internal interface IInvocationHandler
|
||||
{
|
||||
/// <summary>
|
||||
/// Invokes a function based on incoming <see cref="InvocationRequest"/> and returns
|
||||
/// an <see cref="InvocationResponse"/>. This includes creating and keeping track of
|
||||
/// an associated cancellation token source for the invocation.
|
||||
/// </summary>
|
||||
/// <param name="request">Function invocation request</param>
|
||||
/// <returns><see cref="InvocationResponse"/></returns>
|
||||
Task<InvocationResponse> InvokeAsync(InvocationRequest request);
|
||||
|
||||
/// <summary>
|
||||
/// Cancels an invocation's associated <see cref="CancellationTokenSource"/>.
|
||||
/// </summary>
|
||||
/// <param name="invocationId">Invocation ID</param>
|
||||
/// <returns>Returns bool indicating success or failure</returns>
|
||||
bool TryCancel(string invocationId);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,158 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Azure.Core.Serialization;
|
||||
using Microsoft.Azure.Functions.Worker.Context.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
|
||||
using Microsoft.Azure.Functions.Worker.OutputBindings;
|
||||
using Microsoft.Azure.Functions.Worker.Rpc;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker.Handlers
|
||||
{
|
||||
internal class InvocationHandler : IInvocationHandler
|
||||
{
|
||||
private readonly IFunctionsApplication _application;
|
||||
private readonly IInvocationFeaturesFactory _invocationFeaturesFactory;
|
||||
private readonly IOutputBindingsInfoProvider _outputBindingsInfoProvider;
|
||||
private readonly IInputConversionFeatureProvider _inputConversionFeatureProvider;
|
||||
private readonly ObjectSerializer _serializer;
|
||||
private readonly ILogger _logger;
|
||||
|
||||
private ConcurrentDictionary<string, CancellationTokenSource> _inflightInvocations;
|
||||
|
||||
public InvocationHandler(
|
||||
IFunctionsApplication application,
|
||||
IInvocationFeaturesFactory invocationFeaturesFactory,
|
||||
ObjectSerializer serializer,
|
||||
IOutputBindingsInfoProvider outputBindingsInfoProvider,
|
||||
IInputConversionFeatureProvider inputConversionFeatureProvider,
|
||||
ILogger logger)
|
||||
{
|
||||
_application = application ?? throw new ArgumentNullException(nameof(application));
|
||||
_invocationFeaturesFactory = invocationFeaturesFactory ?? throw new ArgumentNullException(nameof(invocationFeaturesFactory));
|
||||
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
|
||||
_outputBindingsInfoProvider = outputBindingsInfoProvider ?? throw new ArgumentNullException(nameof(outputBindingsInfoProvider));
|
||||
_inputConversionFeatureProvider = inputConversionFeatureProvider ?? throw new ArgumentNullException(nameof(inputConversionFeatureProvider));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
|
||||
_inflightInvocations = new ConcurrentDictionary<string, CancellationTokenSource>();
|
||||
}
|
||||
|
||||
public async Task<InvocationResponse> InvokeAsync(InvocationRequest request)
|
||||
{
|
||||
using CancellationTokenSource cancellationTokenSource = new();
|
||||
FunctionContext? context = null;
|
||||
InvocationResponse response = new()
|
||||
{
|
||||
InvocationId = request.InvocationId,
|
||||
Result = new StatusResult()
|
||||
};
|
||||
|
||||
if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource))
|
||||
{
|
||||
var exception = new InvalidOperationException("Unable to track CancellationTokenSource");
|
||||
response.Result.Status = StatusResult.Types.Status.Failure;
|
||||
response.Result.Exception = exception.ToRpcException();
|
||||
return response;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var invocation = new GrpcFunctionInvocation(request);
|
||||
|
||||
IInvocationFeatures invocationFeatures = _invocationFeaturesFactory.Create();
|
||||
invocationFeatures.Set<FunctionInvocation>(invocation);
|
||||
invocationFeatures.Set<IExecutionRetryFeature>(invocation);
|
||||
|
||||
context = _application.CreateContext(invocationFeatures, cancellationTokenSource.Token);
|
||||
invocationFeatures.Set<IFunctionBindingsFeature>(new GrpcFunctionBindingsFeature(context, request, _outputBindingsInfoProvider));
|
||||
|
||||
if (_inputConversionFeatureProvider.TryCreate(typeof(DefaultInputConversionFeature), out var conversion))
|
||||
{
|
||||
invocationFeatures.Set<IInputConversionFeature>(conversion!);
|
||||
}
|
||||
|
||||
await _application.InvokeFunctionAsync(context);
|
||||
|
||||
var functionBindings = context.GetBindings();
|
||||
|
||||
foreach (var binding in functionBindings.OutputBindingData)
|
||||
{
|
||||
var parameterBinding = new ParameterBinding
|
||||
{
|
||||
Name = binding.Key
|
||||
};
|
||||
|
||||
if (binding.Value is not null)
|
||||
{
|
||||
parameterBinding.Data = await binding.Value.ToRpcAsync(_serializer);
|
||||
}
|
||||
|
||||
response.OutputData.Add(parameterBinding);
|
||||
}
|
||||
|
||||
if (functionBindings.InvocationResult is not null)
|
||||
{
|
||||
TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(_serializer);
|
||||
response.ReturnValue = returnVal;
|
||||
}
|
||||
|
||||
response.Result.Status = StatusResult.Types.Status.Success;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
response.Result.Exception = ex.ToRpcException();
|
||||
response.Result.Status = StatusResult.Types.Status.Failure;
|
||||
|
||||
if (ex.InnerException is TaskCanceledException or OperationCanceledException)
|
||||
{
|
||||
response.Result.Status = StatusResult.Types.Status.Cancelled;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_inflightInvocations.TryRemove(request.InvocationId, out var cts);
|
||||
|
||||
if (context is IAsyncDisposable asyncContext)
|
||||
{
|
||||
await asyncContext.DisposeAsync();
|
||||
}
|
||||
|
||||
(context as IDisposable)?.Dispose();
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
public bool TryCancel(string invocationId)
|
||||
{
|
||||
if (_inflightInvocations.TryGetValue(invocationId, out var cancellationTokenSource))
|
||||
{
|
||||
try
|
||||
{
|
||||
cancellationTokenSource?.Cancel();
|
||||
_logger.LogWarning("Unable to cancel invocation {invocationId}.", invocationId);
|
||||
return true;
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
// Do nothing, normal behavior
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Unable to cancel invocation {invocationId}.", invocationId);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Functions.Worker.Converters;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker.Tests.Converters
|
||||
{
|
||||
public class CancellationTokenConverterTests
|
||||
{
|
||||
private CancellationTokenConverter _converter = new CancellationTokenConverter();
|
||||
|
||||
[Theory]
|
||||
[InlineData(typeof(CancellationToken))]
|
||||
[InlineData(typeof(CancellationToken?))]
|
||||
public async Task ConversionSuccessfulForValidTargetTypeAsync(Type targetType)
|
||||
{
|
||||
var context = new TestConverterContext(targetType, null);
|
||||
var conversionResult = await _converter.ConvertAsync(context);
|
||||
|
||||
Assert.Equal(ConversionStatus.Succeeded, conversionResult.Status);
|
||||
Assert.Equal(typeof(CancellationToken), conversionResult.Value.GetType());
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(typeof(Guid))]
|
||||
[InlineData(typeof(String))]
|
||||
[InlineData(typeof(Object))]
|
||||
[InlineData(typeof(DateTimeOffset))]
|
||||
public async Task ConversionFailedForInvalidTargetType(Type targetType)
|
||||
{
|
||||
var context = new TestConverterContext(targetType, null);
|
||||
var conversionResult = await _converter.ConvertAsync(context);
|
||||
|
||||
Assert.Equal(ConversionStatus.Unhandled, conversionResult.Status);
|
||||
Assert.Null(conversionResult.Value);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Moq;
|
||||
|
@ -33,7 +34,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
features.Set<FunctionDefinition>(definition);
|
||||
features.Set<FunctionInvocation>(invocation);
|
||||
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, features);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, features, CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
|
|
@ -7,6 +7,7 @@ using System.Collections.Immutable;
|
|||
using System.Collections.ObjectModel;
|
||||
using System.Linq;
|
||||
using System.Reflection;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Functions.Worker.Context.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Converters;
|
||||
|
@ -197,7 +198,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
mockServiceProvider.Setup(a => a.GetService(typeof(IBindingCache<ConversionResult>)))
|
||||
.Returns(new DefaultBindingCache<ConversionResult>());
|
||||
|
||||
return new TestFunctionContext(definition, invocation, serviceProvider: mockServiceProvider.Object);
|
||||
return new TestFunctionContext(definition, invocation, CancellationToken.None, serviceProvider: mockServiceProvider.Object);
|
||||
}
|
||||
|
||||
private class Functions
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text.Json;
|
||||
using System.Threading;
|
||||
using Azure.Core.Serialization;
|
||||
using Microsoft.Azure.Functions.Worker.Context.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Tests.Features;
|
||||
|
@ -46,7 +47,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
{ "myQueueItem", new TestBindingMetadata("myQueueItem","queueTrigger",BindingDirection.In) },
|
||||
{ "myGuid", new TestBindingMetadata("myGuid","queueTrigger",BindingDirection.In) }
|
||||
});
|
||||
var functionContext = new TestFunctionContext(definition, invocation: null, serviceProvider: _serviceProvider, features: features);
|
||||
var functionContext = new TestFunctionContext(definition, invocation: null, CancellationToken.None, serviceProvider: _serviceProvider, features: features);
|
||||
|
||||
// Act
|
||||
var parameterValuesArray = await _modelBindingFeature.BindFunctionInputAsync(functionContext);
|
||||
|
@ -85,7 +86,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
{
|
||||
{ "myQueueItem", new TestBindingMetadata("myQueueItem","queueTrigger",BindingDirection.In) }
|
||||
});
|
||||
var functionContext = new TestFunctionContext(definition, invocation: null, serviceProvider: _serviceProvider, features: features);
|
||||
var functionContext = new TestFunctionContext(definition, invocation: null, CancellationToken.None, serviceProvider: _serviceProvider, features: features);
|
||||
|
||||
// Act
|
||||
// bind to the queue trigger input binding item.
|
||||
|
|
|
@ -6,6 +6,7 @@ using System.Collections.Generic;
|
|||
using System.Collections.ObjectModel;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Functions.Worker.Context.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Converters;
|
||||
|
@ -57,7 +58,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
{ "blob2", new TestBindingMetadata("blob2","blob",BindingDirection.In) }
|
||||
});
|
||||
_features.Set<FunctionDefinition>(qTriggerFunctionDefinition);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features, CancellationToken.None);
|
||||
|
||||
var functionBindings = new TestFunctionBindingsFeature
|
||||
{
|
||||
|
@ -111,7 +112,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
{ "myQueueItem", new TestBindingMetadata("myQueueItem","queueTrigger",BindingDirection.In) }
|
||||
});
|
||||
_features.Set<FunctionDefinition>(qTriggerFunctionDefinition);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features, CancellationToken.None);
|
||||
|
||||
var functionBindings = new TestFunctionBindingsFeature
|
||||
{
|
||||
|
@ -154,7 +155,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
{ "myQueueItem", new TestBindingMetadata("myQueueItem","queueTrigger",BindingDirection.In) }
|
||||
});
|
||||
_features.Set<FunctionDefinition>(qTriggerFunctionDefinition);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features, CancellationToken.None);
|
||||
|
||||
var functionBindings = new TestFunctionBindingsFeature
|
||||
{
|
||||
|
@ -182,7 +183,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
});
|
||||
_features.Set<FunctionDefinition>(functionDefinition);
|
||||
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features, CancellationToken.None);
|
||||
|
||||
var functionBindings = new TestFunctionBindingsFeature();
|
||||
var grpcHttpResponse = new GrpcHttpResponseData(_defaultFunctionContext, HttpStatusCode.OK);
|
||||
|
@ -227,7 +228,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
});
|
||||
_features.Set<FunctionDefinition>(functionDefinition);
|
||||
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features, CancellationToken.None);
|
||||
|
||||
var functionBindings = new TestFunctionBindingsFeature();
|
||||
// Not setting any values to functionBindings.OutputBindingData dictionary.
|
||||
|
|
|
@ -6,6 +6,7 @@ using System.Collections.Generic;
|
|||
using System.Collections.ObjectModel;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Functions.Worker.Context.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Converters;
|
||||
|
@ -55,7 +56,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
});
|
||||
_features.Set<FunctionDefinition>(qTriggerFunctionDefinition);
|
||||
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features, CancellationToken.None);
|
||||
var functionBindings = new TestFunctionBindingsFeature
|
||||
{
|
||||
InputData = new ReadOnlyDictionary<string, object>(new Dictionary<string, object> { { "myQueueItem", "foo" } })
|
||||
|
@ -83,7 +84,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
});
|
||||
_features.Set<FunctionDefinition>(httpFunctionDefinition);
|
||||
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features, CancellationToken.None);
|
||||
var grpcHttpReq = new GrpcHttpRequestData(CreateRpcHttp(), _defaultFunctionContext);
|
||||
var functionBindings = new TestFunctionBindingsFeature
|
||||
{
|
||||
|
@ -91,7 +92,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
};
|
||||
_features.Set<IFunctionBindingsFeature>(functionBindings);
|
||||
|
||||
// Mock input conversion feature to return a successfully converted HttpRequestData value.
|
||||
// Mock input conversion feature to return a successfully converted HttpRequestData value.
|
||||
_mockConversionFeature.Setup(a => a.ConvertAsync(It.IsAny<ConverterContext>()))
|
||||
.ReturnsAsync(ConversionResult.Success(grpcHttpReq));
|
||||
_features.Set<IInputConversionFeature>(_mockConversionFeature.Object);
|
||||
|
@ -116,7 +117,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
// Arrange
|
||||
_features.Set<FunctionDefinition>(new TestFunctionDefinition());
|
||||
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features, CancellationToken.None);
|
||||
var grpcHttpReq = new GrpcHttpRequestData(CreateRpcHttp(), _defaultFunctionContext);
|
||||
var functionBindings = new TestFunctionBindingsFeature
|
||||
{
|
||||
|
@ -151,7 +152,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
{ "MyHttpResponse", new TestBindingMetadata("MyHttpResponse","http",BindingDirection.Out) }
|
||||
}));
|
||||
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features);
|
||||
_defaultFunctionContext = new DefaultFunctionContext(_serviceScopeFactory, _features, CancellationToken.None);
|
||||
var grpcHttpReq = new GrpcHttpRequestData(CreateRpcHttp(), _defaultFunctionContext);
|
||||
var functionBindings = new TestFunctionBindingsFeature
|
||||
{
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
|
||||
using Microsoft.Azure.Functions.Worker.Http;
|
||||
using Microsoft.Azure.Functions.Worker.Invocation;
|
||||
|
@ -77,5 +78,13 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
return req.CreateResponse();
|
||||
}
|
||||
}
|
||||
|
||||
private class MyFunctionClassWithCancellation
|
||||
{
|
||||
public HttpResponseData Run(HttpRequestData req, CancellationToken cancellationToken)
|
||||
{
|
||||
return req.CreateResponse();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,12 +5,15 @@ using System;
|
|||
using System.Linq;
|
||||
using System.Reflection;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Azure.Core.Serialization;
|
||||
using Microsoft.Azure.Functions.Worker.Context.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
|
||||
using Microsoft.Azure.Functions.Worker.Handlers;
|
||||
using Microsoft.Azure.Functions.Worker.Invocation;
|
||||
using Microsoft.Azure.Functions.Worker.OutputBindings;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Moq;
|
||||
using Xunit;
|
||||
|
||||
|
@ -26,6 +29,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
private readonly Mock<IMethodInfoLocator> _mockMethodInfoLocator = new(MockBehavior.Strict);
|
||||
private TestFunctionContext _context = new();
|
||||
private TestAsyncFunctionContext _asyncContext = new();
|
||||
private ILogger<InvocationHandler> _testLogger;
|
||||
|
||||
public GrpcWorkerTests()
|
||||
{
|
||||
|
@ -33,10 +37,9 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
.Setup(m => m.LoadFunction(It.IsAny<FunctionDefinition>()));
|
||||
|
||||
_mockApplication
|
||||
.Setup(m => m.CreateContext(It.IsAny<IInvocationFeatures>()))
|
||||
.Returns<IInvocationFeatures>(f =>
|
||||
{
|
||||
_context = new TestFunctionContext(f);
|
||||
.Setup(m => m.CreateContext(It.IsAny<IInvocationFeatures>(), It.IsAny<CancellationToken>()))
|
||||
.Returns((IInvocationFeatures f, CancellationToken ct) => {
|
||||
_context = new TestFunctionContext(f, ct);
|
||||
return _context;
|
||||
});
|
||||
|
||||
|
@ -54,8 +57,10 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
|
||||
IInputConversionFeature conversionFeature = mockConversionFeature.Object;
|
||||
_mockInputConversionFeatureProvider
|
||||
.Setup(m=>m.TryCreate(typeof(DefaultInputConversionFeature), out conversionFeature))
|
||||
.Setup(m => m.TryCreate(typeof(DefaultInputConversionFeature), out conversionFeature))
|
||||
.Returns(true);
|
||||
|
||||
_testLogger = TestLoggerProvider.Factory.CreateLogger<InvocationHandler>();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
@ -130,10 +135,13 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
[Fact]
|
||||
public async Task Invoke_ReturnsSuccess()
|
||||
{
|
||||
var request = CreateInvocationRequest();
|
||||
var request = TestUtility.CreateInvocationRequest();
|
||||
|
||||
var response = await GrpcWorker.InvocationRequestHandlerAsync(request, _mockApplication.Object, _mockFeaturesFactory.Object,
|
||||
new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, _mockInputConversionFeatureProvider.Object);
|
||||
var invocationHandler = new InvocationHandler(_mockApplication.Object,
|
||||
_mockFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object,
|
||||
_mockInputConversionFeatureProvider.Object, _testLogger);
|
||||
|
||||
var response = await invocationHandler.InvokeAsync(request);
|
||||
|
||||
Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status);
|
||||
Assert.True(_context.IsDisposed);
|
||||
|
@ -142,19 +150,22 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
[Fact]
|
||||
public async Task Invoke_ReturnsSuccess_AsyncFunctionContext()
|
||||
{
|
||||
var request = CreateInvocationRequest();
|
||||
var request = TestUtility.CreateInvocationRequest();
|
||||
|
||||
// Mock IFunctionApplication.CreateContext to return TestAsyncFunctionContext instance.
|
||||
_mockApplication
|
||||
.Setup(m => m.CreateContext(It.IsAny<IInvocationFeatures>()))
|
||||
.Returns<IInvocationFeatures>(f =>
|
||||
.Setup(m => m.CreateContext(It.IsAny<IInvocationFeatures>(), It.IsAny<CancellationToken>()))
|
||||
.Returns<IInvocationFeatures, CancellationToken>((f,ct) =>
|
||||
{
|
||||
_context = new TestAsyncFunctionContext(f);
|
||||
return _context;
|
||||
});
|
||||
|
||||
var response = await GrpcWorker.InvocationRequestHandlerAsync(request, _mockApplication.Object, _mockFeaturesFactory.Object,
|
||||
new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, _mockInputConversionFeatureProvider.Object);
|
||||
var invocationHandler = new InvocationHandler(_mockApplication.Object,
|
||||
_mockFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object,
|
||||
_mockInputConversionFeatureProvider.Object, _testLogger);
|
||||
|
||||
var response = await invocationHandler.InvokeAsync(request);
|
||||
|
||||
Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status);
|
||||
Assert.True((_context as TestAsyncFunctionContext).IsAsyncDisposed);
|
||||
|
@ -164,11 +175,14 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
[Fact]
|
||||
public async Task Invoke_SetsRetryContext()
|
||||
{
|
||||
var request = CreateInvocationRequest();
|
||||
var request = TestUtility.CreateInvocationRequest();
|
||||
|
||||
var invocationHandler = new InvocationHandler(_mockApplication.Object,
|
||||
_mockFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object,
|
||||
_mockInputConversionFeatureProvider.Object, _testLogger);
|
||||
|
||||
var response = await invocationHandler.InvokeAsync(request);
|
||||
|
||||
var response = await GrpcWorker.InvocationRequestHandlerAsync(request, _mockApplication.Object, _mockFeaturesFactory.Object,
|
||||
new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, _mockInputConversionFeatureProvider.Object);
|
||||
|
||||
Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status);
|
||||
Assert.True(_context.IsDisposed);
|
||||
Assert.Equal(request.RetryContext.RetryCount, _context.RetryContext.RetryCount);
|
||||
|
@ -179,36 +193,22 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
public async Task Invoke_CreateContextThrows_ReturnsFailure()
|
||||
{
|
||||
_mockApplication
|
||||
.Setup(m => m.CreateContext(It.IsAny<IInvocationFeatures>()))
|
||||
.Setup(m => m.CreateContext(It.IsAny<IInvocationFeatures>(), It.IsAny<CancellationToken>()))
|
||||
.Throws(new InvalidOperationException("whoops"));
|
||||
|
||||
var request = CreateInvocationRequest();
|
||||
var request = TestUtility.CreateInvocationRequest();
|
||||
|
||||
var response = await GrpcWorker.InvocationRequestHandlerAsync(request, _mockApplication.Object, _mockFeaturesFactory.Object,
|
||||
new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, _mockInputConversionFeatureProvider.Object);
|
||||
var invocationHandler = new InvocationHandler(_mockApplication.Object,
|
||||
_mockFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object,
|
||||
_mockInputConversionFeatureProvider.Object, _testLogger);
|
||||
|
||||
var response = await invocationHandler.InvokeAsync(request);
|
||||
|
||||
Assert.Equal(StatusResult.Types.Status.Failure, response.Result.Status);
|
||||
Assert.Contains("InvalidOperationException: whoops", response.Result.Exception.Message);
|
||||
Assert.Contains("CreateContext", response.Result.Exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Invoke_InvokeAsyncThrows_ReturnsFailure()
|
||||
{
|
||||
_mockApplication
|
||||
.Setup(m => m.InvokeFunctionAsync(It.IsAny<FunctionContext>()))
|
||||
.Throws(new InvalidOperationException("whoops"));
|
||||
|
||||
var request = CreateInvocationRequest();
|
||||
|
||||
var response = await GrpcWorker.InvocationRequestHandlerAsync(request, _mockApplication.Object, _mockFeaturesFactory.Object,
|
||||
new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, _mockInputConversionFeatureProvider.Object);
|
||||
|
||||
Assert.Equal(StatusResult.Types.Status.Failure, response.Result.Status);
|
||||
Assert.Contains("InvalidOperationException: whoops", response.Result.Exception.Message);
|
||||
Assert.Contains("InvokeFunctionAsync", response.Result.Exception.Message);
|
||||
}
|
||||
|
||||
private static FunctionLoadRequest CreateFunctionLoadRequest()
|
||||
{
|
||||
return new FunctionLoadRequest
|
||||
|
@ -220,23 +220,6 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
};
|
||||
}
|
||||
|
||||
private static InvocationRequest CreateInvocationRequest()
|
||||
{
|
||||
return new InvocationRequest
|
||||
{
|
||||
TraceContext = new RpcTraceContext
|
||||
{
|
||||
TraceParent = Guid.NewGuid().ToString(),
|
||||
TraceState = Guid.NewGuid().ToString()
|
||||
},
|
||||
RetryContext = new Grpc.Messages.RetryContext
|
||||
{
|
||||
MaxRetryCount = 3,
|
||||
RetryCount = 2
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Used for MethodInfo in tests
|
||||
private void TestRun()
|
||||
{
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Azure.Core.Serialization;
|
||||
using Microsoft.Azure.Functions.Worker.Context.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
|
||||
using Microsoft.Azure.Functions.Worker.Handlers;
|
||||
using Microsoft.Azure.Functions.Worker.OutputBindings;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Moq;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker.Tests
|
||||
{
|
||||
public class InvocationHandlerTests
|
||||
{
|
||||
private readonly Mock<IFunctionsApplication> _mockApplication = new(MockBehavior.Strict);
|
||||
private readonly Mock<IInvocationFeaturesFactory> _mockInvocationFeaturesFactory = new(MockBehavior.Strict);
|
||||
private readonly Mock<IOutputBindingsInfoProvider> _mockOutputBindingsInfoProvider = new(MockBehavior.Strict);
|
||||
private readonly Mock<IInputConversionFeatureProvider> _mockInputConversionFeatureProvider = new(MockBehavior.Strict);
|
||||
private readonly Mock<IInputConversionFeature> mockConversionFeature = new(MockBehavior.Strict);
|
||||
private TestFunctionContext _context = new();
|
||||
private ILogger<InvocationHandler> _testLogger;
|
||||
|
||||
public InvocationHandlerTests()
|
||||
{
|
||||
_mockApplication
|
||||
.Setup(m => m.CreateContext(It.IsAny<IInvocationFeatures>(), It.IsAny<CancellationToken>()))
|
||||
.Returns((IInvocationFeatures f, CancellationToken ct) => {
|
||||
_context = new TestFunctionContext(f, ct);
|
||||
return _context;
|
||||
});
|
||||
|
||||
_mockApplication
|
||||
.Setup(m => m.InvokeFunctionAsync(It.IsAny<FunctionContext>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
|
||||
_mockInvocationFeaturesFactory
|
||||
.Setup(m => m.Create())
|
||||
.Returns(new InvocationFeatures(Enumerable.Empty<IInvocationFeatureProvider>()));
|
||||
|
||||
IInputConversionFeature conversionFeature = mockConversionFeature.Object;
|
||||
_mockInputConversionFeatureProvider
|
||||
.Setup(m => m.TryCreate(typeof(DefaultInputConversionFeature), out conversionFeature))
|
||||
.Returns(true);
|
||||
|
||||
_testLogger = TestLoggerProvider.Factory.CreateLogger<InvocationHandler>();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task InvokeAsync_CreatesValidCancellationToken_ReturnsSuccess()
|
||||
{
|
||||
var invocationId = "5fb3a9b4-0b38-450a-9d46-35946e7edea7";
|
||||
var request = TestUtility.CreateInvocationRequest(invocationId);
|
||||
|
||||
var handler = new InvocationHandler(_mockApplication.Object,
|
||||
_mockInvocationFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object,
|
||||
_mockInputConversionFeatureProvider.Object, _testLogger);
|
||||
|
||||
var response = await handler.InvokeAsync(request);
|
||||
|
||||
// InvokeAsync should create a real cancellation token which can be cancelled,
|
||||
// otherwise we set the token to be CancellationToken.Empty which **cannot** be cancelled
|
||||
Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status);
|
||||
Assert.True(_context.CancellationToken.CanBeCanceled);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task InvokeAsync_ThrowsTaskCanceledException_ReturnsCancelled()
|
||||
{
|
||||
_mockApplication
|
||||
.Setup(m => m.InvokeFunctionAsync(It.IsAny<FunctionContext>()))
|
||||
.Throws(new AggregateException(new Exception[] { new TaskCanceledException() }));
|
||||
|
||||
var request = TestUtility.CreateInvocationRequest("abc");
|
||||
|
||||
var invocationHandler = new InvocationHandler(_mockApplication.Object,
|
||||
_mockInvocationFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object,
|
||||
_mockInputConversionFeatureProvider.Object, _testLogger);
|
||||
|
||||
var response = await invocationHandler.InvokeAsync(request);
|
||||
|
||||
Assert.Equal(StatusResult.Types.Status.Cancelled, response.Result.Status);
|
||||
Assert.Contains("TaskCanceledException", response.Result.Exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Cancel_InvocationInProgress_CancelsTokenSource_ReturnsTrue()
|
||||
{
|
||||
var invocationId = "5fb3a9b4-0b38-450a-9d46-35946e7edea7";
|
||||
var request = TestUtility.CreateInvocationRequest(invocationId);
|
||||
var cts = new CancellationTokenSource();
|
||||
|
||||
var handler = new InvocationHandler(_mockApplication.Object,
|
||||
_mockInvocationFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object,
|
||||
_mockInputConversionFeatureProvider.Object, _testLogger);
|
||||
|
||||
// Mock delay in InvokeFunctionAsync so that we can cancel mid invocation
|
||||
_mockApplication
|
||||
.Setup(m => m.InvokeFunctionAsync(It.IsAny<FunctionContext>()))
|
||||
.Callback(() => Thread.Sleep(1000))
|
||||
.Returns(Task.CompletedTask);
|
||||
|
||||
// Don't wait for InvokeAsync so we can cancel whilst it's in progress
|
||||
_ = Task.Run(async () => {
|
||||
await handler.InvokeAsync(request);
|
||||
});
|
||||
|
||||
// Buffer to ensure the cancellation token source was created before we try to cancel
|
||||
Thread.Sleep(500);
|
||||
|
||||
var result = handler.TryCancel(invocationId);
|
||||
Assert.True(result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Cancel_InvocationCompleted_ReturnsFalse()
|
||||
{
|
||||
var invocationId = "5fb3a9b4-0b38-450a-9d46-35946e7edea7";
|
||||
var request = TestUtility.CreateInvocationRequest(invocationId);
|
||||
|
||||
var handler = new InvocationHandler(_mockApplication.Object,
|
||||
_mockInvocationFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object,
|
||||
_mockInputConversionFeatureProvider.Object, _testLogger);
|
||||
|
||||
_ = await handler.InvokeAsync(request);
|
||||
|
||||
var result = handler.TryCancel(invocationId);
|
||||
Assert.False(result);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,7 +15,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
|
||||
public static TestLogger<T> Create()
|
||||
{
|
||||
// We want to use the logic for category naming which is internal to LoggerFactory.
|
||||
// We want to use the logic for category naming which is internal to LoggerFactory.
|
||||
// So we'll create a TestLogger via the LoggerFactory and grab it's category.
|
||||
TestLoggerProvider testLoggerProvider = new TestLoggerProvider();
|
||||
LoggerFactory _testLoggerFactory = new LoggerFactory(new[] { testLoggerProvider });
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Functions.Worker.Context.Features;
|
||||
using Microsoft.Azure.Functions.Worker.OutputBindings;
|
||||
|
@ -14,7 +15,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
internal class TestAsyncFunctionContext : TestFunctionContext, IAsyncDisposable
|
||||
{
|
||||
public TestAsyncFunctionContext()
|
||||
: base(new TestFunctionDefinition(), new TestFunctionInvocation())
|
||||
: base(new TestFunctionDefinition(), new TestFunctionInvocation(), CancellationToken.None)
|
||||
{
|
||||
}
|
||||
public TestAsyncFunctionContext(IInvocationFeatures features) : base(features)
|
||||
|
@ -33,21 +34,33 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
internal class TestFunctionContext : FunctionContext, IDisposable
|
||||
{
|
||||
private readonly FunctionInvocation _invocation;
|
||||
private readonly CancellationToken _cancellationToken;
|
||||
|
||||
public TestFunctionContext()
|
||||
: this(new TestFunctionDefinition(), new TestFunctionInvocation())
|
||||
: this(new TestFunctionDefinition(), new TestFunctionInvocation(), CancellationToken.None)
|
||||
{
|
||||
}
|
||||
|
||||
public TestFunctionContext(IInvocationFeatures features)
|
||||
: this(new TestFunctionDefinition(), new TestFunctionInvocation(), features)
|
||||
: this(new TestFunctionDefinition(), new TestFunctionInvocation(), CancellationToken.None, features)
|
||||
{
|
||||
}
|
||||
|
||||
public TestFunctionContext(FunctionDefinition functionDefinition, FunctionInvocation invocation, IInvocationFeatures features = null, IServiceProvider serviceProvider = null)
|
||||
public TestFunctionContext(IInvocationFeatures features, CancellationToken token)
|
||||
: this(new TestFunctionDefinition(), new TestFunctionInvocation(), token, features)
|
||||
{
|
||||
}
|
||||
|
||||
public TestFunctionContext(FunctionDefinition functionDefinition, FunctionInvocation invocation)
|
||||
: this(functionDefinition, invocation, CancellationToken.None)
|
||||
{
|
||||
}
|
||||
|
||||
public TestFunctionContext(FunctionDefinition functionDefinition, FunctionInvocation invocation, CancellationToken cancellationToken, IInvocationFeatures features = null, IServiceProvider serviceProvider = null)
|
||||
{
|
||||
FunctionDefinition = functionDefinition;
|
||||
_invocation = invocation;
|
||||
_cancellationToken = cancellationToken;
|
||||
|
||||
if (features != null)
|
||||
{
|
||||
|
@ -86,6 +99,8 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
|
||||
public override RetryContext RetryContext => Features.Get<IExecutionRetryFeature>()?.Context;
|
||||
|
||||
public override CancellationToken CancellationToken => _cancellationToken;
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
IsDisposed = true;
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using Microsoft.Azure.Functions.Worker.Context;
|
||||
|
||||
namespace Microsoft.Azure.Functions.Worker.Tests
|
||||
{
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
using System;
|
||||
using Microsoft.Azure.Functions.Worker.Context.Features;
|
||||
using Microsoft.Azure.Functions.Worker.Converters;
|
||||
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Xunit.Sdk;
|
||||
|
@ -54,5 +55,23 @@ namespace Microsoft.Azure.Functions.Worker.Tests
|
|||
options ??= new TOptions();
|
||||
return new OptionsWrapper<TOptions>(options);
|
||||
}
|
||||
|
||||
public static InvocationRequest CreateInvocationRequest(string invocationId = "")
|
||||
{
|
||||
return new InvocationRequest
|
||||
{
|
||||
InvocationId = invocationId,
|
||||
TraceContext = new RpcTraceContext
|
||||
{
|
||||
TraceParent = Guid.NewGuid().ToString(),
|
||||
TraceState = Guid.NewGuid().ToString()
|
||||
},
|
||||
RetryContext = new Grpc.Messages.RetryContext
|
||||
{
|
||||
MaxRetryCount = 3,
|
||||
RetryCount = 2
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,6 +72,32 @@
|
|||
"maximumInterval": "00:15:00"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "HttpTriggerWithCancellation",
|
||||
"scriptFile": "FunctionApp.dll",
|
||||
"entryPoint": "FunctionApp.HttpTriggerWithCancellation.Run",
|
||||
"language": "dotnet-isolated",
|
||||
"properties": {
|
||||
"IsCodeless": false
|
||||
},
|
||||
"bindings": [
|
||||
{
|
||||
"name": "req",
|
||||
"type": "httpTrigger",
|
||||
"direction": "In",
|
||||
"authLevel": "Anonymous",
|
||||
"methods": [
|
||||
"get",
|
||||
"post"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "$return",
|
||||
"type": "http",
|
||||
"direction": "Out"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "HttpTriggerWithDependencyInjection",
|
||||
"scriptFile": "FunctionApp.dll",
|
||||
|
|
Загрузка…
Ссылка в новой задаче