[Durable] Implement retry policy support (#590)

This commit is contained in:
Anatoli Beliaev 2021-02-23 14:05:52 -08:00 коммит произвёл GitHub
Родитель e38705846f
Коммит 30250f43a2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
33 изменённых файлов: 1051 добавлений и 37 удалений

6
.gitignore поставляемый
Просмотреть файл

@ -63,8 +63,10 @@ PowerShell.sln.DotSettings.user
StyleCop.Cache
examples/PSCoreApp/Modules
src/Modules
!src/Modules/Microsoft.Azure.Functions.PowerShellWorker
src/Modules/Microsoft.PowerShell.*
src/Modules/PackageManagement
src/Modules/PowerShellGet
src/Modules/ThreadJob
# protobuf
protobuf/*

Просмотреть файл

@ -0,0 +1,9 @@
{
"bindings": [
{
"name": "name",
"type": "activityTrigger",
"direction": "in"
}
]
}

Просмотреть файл

@ -0,0 +1,9 @@
param($name)
# Intentional intermittent error
$random = Get-Random -Minimum 0.0 -Maximum 1.0
if ($random -gt 0.2) {
throw 'Nope, no luck this time...'
}
"Hello $name"

Просмотреть файл

@ -0,0 +1,9 @@
{
"bindings": [
{
"name": "Context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}

Просмотреть файл

@ -0,0 +1,17 @@
using namespace System.Net
param($Context)
$ErrorActionPreference = 'Stop'
$output = @()
$retryOptions = New-DurableRetryOptions `
-FirstRetryInterval (New-Timespan -Seconds 1) `
-MaxNumberOfAttempts 7
$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'Tokyo' -RetryOptions $retryOptions
$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'Seattle' -RetryOptions $retryOptions
$output += Invoke-ActivityFunction -FunctionName 'FlakyActivity' -Input 'London' -RetryOptions $retryOptions
$output

Просмотреть файл

@ -0,0 +1,24 @@
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get",
"post"
]
},
{
"type": "http",
"direction": "out",
"name": "Response"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
]
}

Просмотреть файл

@ -0,0 +1,13 @@
using namespace System.Net
param($Request, $TriggerMetadata)
Write-Host 'FunctionChainingWithRetriesStart started'
$InstanceId = Start-NewOrchestration -FunctionName 'FunctionChainingWithRetriesOrchestrator' -InputObject 'Hello'
Write-Host "Started orchestration with ID = '$InstanceId'"
$Response = New-OrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
Push-OutputBinding -Name Response -Value $Response
Write-Host 'FunctionChainingWithRetriesStart completed'

Просмотреть файл

@ -0,0 +1,71 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.Collections.Generic;
namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions
{
/// <summary>
/// An orchestration action that represents calling an activity function with retry.
/// </summary>
internal class CallActivityWithRetryAction : OrchestrationAction
{
/// <summary>
/// The activity function name.
/// </summary>
public readonly string FunctionName;
/// <summary>
/// The input to the activity function.
/// </summary>
public readonly object Input;
/// <summary>
/// Retry options.
/// </summary>
public readonly Dictionary<string, object> RetryOptions;
public CallActivityWithRetryAction(string functionName, object input, RetryOptions retryOptions)
: base(ActionType.CallActivityWithRetry)
{
FunctionName = functionName;
Input = input;
RetryOptions = ToDictionary(retryOptions);
}
private static Dictionary<string, object> ToDictionary(RetryOptions retryOptions)
{
var result = new Dictionary<string, object>()
{
{ "firstRetryIntervalInMilliseconds", ToIntMilliseconds(retryOptions.FirstRetryInterval) },
{ "maxNumberOfAttempts", retryOptions.MaxNumberOfAttempts }
};
AddOptionalValue(result, "backoffCoefficient", retryOptions.BackoffCoefficient, x => x);
AddOptionalValue(result, "maxRetryIntervalInMilliseconds", retryOptions.MaxRetryInterval, ToIntMilliseconds);
AddOptionalValue(result, "retryTimeoutInMilliseconds", retryOptions.RetryTimeout, ToIntMilliseconds);
return result;
}
private static void AddOptionalValue<T>(
Dictionary<string, object> dictionary,
string name,
T? nullable,
Func<T, object> transformValue) where T : struct
{
if (nullable.HasValue)
{
dictionary.Add(name, transformValue(nullable.Value));
}
}
private static object ToIntMilliseconds(TimeSpan timespan)
{
return (int)timespan.TotalMilliseconds;
}
}
}

Просмотреть файл

@ -33,6 +33,10 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Commands
[Parameter]
public SwitchParameter NoWait { get; set; }
[Parameter]
[ValidateNotNull]
public RetryOptions RetryOptions { get; set; }
private readonly DurableTaskHandler _durableTaskHandler = new DurableTaskHandler();
protected override void EndProcessing()
@ -41,11 +45,14 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Commands
var context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey];
var loadedFunctions = FunctionLoader.GetLoadedFunctions();
var task = new ActivityInvocationTask(FunctionName, Input);
var task = new ActivityInvocationTask(FunctionName, Input, RetryOptions);
ActivityInvocationTask.ValidateTask(task, loadedFunctions);
_durableTaskHandler.StopAndInitiateDurableTaskOrReplay(
task, context, NoWait.IsPresent, WriteObject, failureReason => DurableActivityErrorHandler.Handle(this, failureReason));
task, context, NoWait.IsPresent,
output: WriteObject,
onFailure: failureReason => DurableActivityErrorHandler.Handle(this, failureReason),
retryOptions: RetryOptions);
}
protected override void StopProcessing()

Просмотреть файл

@ -20,7 +20,8 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
OrchestrationContext context,
bool noWait,
Action<object> output,
Action<string> onFailure)
Action<string> onFailure,
RetryOptions retryOptions = null)
{
context.OrchestrationActionCollector.Add(task.CreateOrchestrationAction());
@ -30,6 +31,8 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
}
else
{
context.OrchestrationActionCollector.NextBatch();
var scheduledHistoryEvent = task.GetScheduledHistoryEvent(context);
var completedHistoryEvent = task.GetCompletedHistoryEvent(context, scheduledHistoryEvent);
@ -56,7 +59,32 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
break;
case HistoryEventType.TaskFailed:
onFailure(completedHistoryEvent.Reason);
if (retryOptions == null)
{
onFailure(completedHistoryEvent.Reason);
}
else
{
// Reset IsProcessed, let RetryProcessor handle these events instead.
scheduledHistoryEvent.IsProcessed = false;
completedHistoryEvent.IsProcessed = false;
var shouldContinueProcessing =
RetryProcessor.Process(
context.History,
scheduledHistoryEvent,
retryOptions.MaxNumberOfAttempts,
onSuccess:
result => {
output(TypeExtensions.ConvertFromJson(result));
},
onFailure);
if (shouldContinueProcessing)
{
InitiateAndWaitForStop(context);
}
}
break;
}
}
@ -73,6 +101,8 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
OrchestrationContext context,
Action<object> output)
{
context.OrchestrationActionCollector.NextBatch();
var completedEvents = new List<HistoryEvent>();
foreach (var task in tasksToWaitFor)
{
@ -118,6 +148,8 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
OrchestrationContext context,
Action<object> output)
{
context.OrchestrationActionCollector.NextBatch();
var completedTasks = new List<DurableTask>();
DurableTask firstCompletedTask = null;
int firstCompletedHistoryEventIndex = -1;

Просмотреть файл

@ -58,5 +58,12 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
// Internal used only
public bool IsProcessed { get; set; }
public override string ToString()
{
var relatedEventId = EventType == HistoryEventType.TimerFired ? TimerId : TaskScheduledId;
var processedMarker = IsProcessed ? "X" : " ";
return $"[{EventId}] {EventType} '{Name}' ({relatedEventId}) [{processedMarker}]";
}
}
}

Просмотреть файл

@ -7,22 +7,36 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;
internal class OrchestrationActionCollector
{
private readonly List<OrchestrationAction> _actions = new List<OrchestrationAction>();
private readonly List<List<OrchestrationAction>> _actions = new();
private readonly AutoResetEvent _stopEvent = new AutoResetEvent(initialState: false);
private bool _nextBatch = true;
public void Add(OrchestrationAction action)
{
_actions.Add(action);
if (_nextBatch)
{
_actions.Add(new List<OrchestrationAction>());
_nextBatch = false;
}
_actions.Last().Add(action);
}
public Tuple<bool, List<OrchestrationAction>> WaitForActions(WaitHandle completionWaitHandle)
public void NextBatch()
{
_nextBatch = true;
}
public Tuple<bool, List<List<OrchestrationAction>>> WaitForActions(WaitHandle completionWaitHandle)
{
var waitHandles = new[] { _stopEvent, completionWaitHandle };
var signaledHandleIndex = WaitHandle.WaitAny(waitHandles);

Просмотреть файл

@ -24,17 +24,17 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
}
public OrchestrationFailureException(List<OrchestrationAction> actions, Exception innerException)
public OrchestrationFailureException(List<List<OrchestrationAction>> actions, Exception innerException)
: base(FormatOrchestrationFailureMessage(actions, innerException), innerException)
{
}
private static string FormatOrchestrationFailureMessage(List<OrchestrationAction> actions, Exception exception)
private static string FormatOrchestrationFailureMessage(List<List<OrchestrationAction>> actions, Exception exception)
{
// For more details on why this message looks like this, see:
// - https://github.com/Azure/azure-functions-durable-js/pull/145
// - https://github.com/Azure/azure-functions-durable-extension/pull/1171
var orchestrationMessage = new OrchestrationMessage(isDone: false, new List<List<OrchestrationAction>> { actions }, output: null, exception.Message);
var orchestrationMessage = new OrchestrationMessage(isDone: false, actions, output: null, exception.Message);
var message = $"{exception.Message}{OutOfProcDataLabel}{JsonConvert.SerializeObject(orchestrationMessage)}";
return message;
}

Просмотреть файл

@ -67,10 +67,10 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
private static Hashtable CreateOrchestrationResult(
bool isDone,
List<OrchestrationAction> actions,
List<List<OrchestrationAction>> actions,
object output)
{
var orchestrationMessage = new OrchestrationMessage(isDone, new List<List<OrchestrationAction>> { actions }, output);
var orchestrationMessage = new OrchestrationMessage(isDone, actions, output);
return new Hashtable { { AzFunctionInfo.DollarReturn, orchestrationMessage } };
}
}

Просмотреть файл

@ -0,0 +1,38 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
#pragma warning disable 1591 // Missing XML comment for publicly visible type or member 'member'
using System;
namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
public class RetryOptions
{
public TimeSpan FirstRetryInterval { get; }
public int MaxNumberOfAttempts { get; }
public double? BackoffCoefficient { get; }
public TimeSpan? MaxRetryInterval { get; }
public TimeSpan? RetryTimeout { get; }
public RetryOptions(
TimeSpan firstRetryInterval,
int maxNumberOfAttempts,
double? backoffCoefficient,
TimeSpan? maxRetryInterval,
TimeSpan? retryTimeout)
{
this.FirstRetryInterval = firstRetryInterval;
this.MaxNumberOfAttempts = maxNumberOfAttempts;
this.BackoffCoefficient = backoffCoefficient;
this.MaxRetryInterval = maxRetryInterval;
this.RetryTimeout = retryTimeout;
}
}
}

Просмотреть файл

@ -0,0 +1,130 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
internal class RetryProcessor
{
// Returns true to indicate that processing this activity invocation should continue.
public static bool Process(
HistoryEvent[] history,
HistoryEvent firstTaskScheduledEvent,
int maxNumberOfAttempts,
Action<string> onSuccess,
Action<string> onFinalFailure)
{
var firstTaskScheduledEventIndex = FindEventIndex(history, firstTaskScheduledEvent);
// Inspired by https://github.com/Azure/azure-functions-durable-js/commit/d789181234ace85df51ce8a849f15b7c8ae2a4f1
var attempt = 1;
HistoryEvent taskScheduled = null;
HistoryEvent taskFailed = null;
HistoryEvent taskRetryTimer = null;
for (var i = firstTaskScheduledEventIndex; i < history.Length; i++)
{
var historyEvent = history[i];
if (historyEvent.IsProcessed)
{
continue;
}
if (taskScheduled == null)
{
if (historyEvent.EventType == HistoryEventType.TaskScheduled)
{
taskScheduled = historyEvent;
}
continue;
}
if (historyEvent.EventType == HistoryEventType.TaskCompleted)
{
if (historyEvent.TaskScheduledId == taskScheduled.EventId)
{
taskScheduled.IsProcessed = true;
historyEvent.IsProcessed = true;
onSuccess(historyEvent.Result);
return false;
}
else
{
continue;
}
}
if (taskFailed == null)
{
if (historyEvent.EventType == HistoryEventType.TaskFailed)
{
if (historyEvent.TaskScheduledId == taskScheduled.EventId)
{
taskFailed = historyEvent;
}
}
continue;
}
if (taskRetryTimer == null)
{
if (historyEvent.EventType == HistoryEventType.TimerCreated)
{
taskRetryTimer = historyEvent;
}
else
{
continue;
}
}
if (historyEvent.EventType == HistoryEventType.TimerFired)
{
if (historyEvent.TimerId == taskRetryTimer.EventId)
{
taskScheduled.IsProcessed = true;
taskFailed.IsProcessed = true;
taskRetryTimer.IsProcessed = true;
historyEvent.IsProcessed = true;
if (attempt >= maxNumberOfAttempts)
{
onFinalFailure(taskFailed.Reason);
return false;
}
else
{
attempt++;
taskScheduled = null;
taskFailed = null;
taskRetryTimer = null;
}
}
else
{
continue;
}
}
}
return true;
}
private static int FindEventIndex(HistoryEvent[] orchestrationHistory, HistoryEvent historyEvent)
{
var result = 0;
foreach (var e in orchestrationHistory)
{
if (ReferenceEquals(historyEvent, e))
{
return result;
}
result++;
}
return -1;
}
}
}

Просмотреть файл

@ -23,10 +23,18 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Tasks
private object Input { get; }
internal ActivityInvocationTask(string functionName, object functionInput)
private RetryOptions RetryOptions { get; }
internal ActivityInvocationTask(string functionName, object functionInput, RetryOptions retryOptions)
{
FunctionName = functionName;
Input = functionInput;
RetryOptions = retryOptions;
}
internal ActivityInvocationTask(string functionName, object functionInput)
: this(functionName, functionInput, retryOptions: null)
{
}
internal override HistoryEvent GetScheduledHistoryEvent(OrchestrationContext context)
@ -49,7 +57,9 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Tasks
internal override OrchestrationAction CreateOrchestrationAction()
{
return new CallActivityAction(FunctionName, Input);
return RetryOptions == null
? new CallActivityAction(FunctionName, Input)
: new CallActivityWithRetryAction(FunctionName, Input, RetryOptions);
}
internal static void ValidateTask(ActivityInvocationTask task, IEnumerable<AzFunctionInfo> loadedFunctions)

Просмотреть файл

@ -49,14 +49,13 @@ NestedModules = @('Microsoft.Azure.Functions.PowerShellWorker.psm1', 'Microsoft.
# Functions to export from this module, for best performance, do not use wildcards and do not delete the entry, use an empty array if there are no functions to export.
FunctionsToExport = @(
'New-DurableRetryOptions',
'New-OrchestrationCheckStatusResponse',
'Send-DurableExternalEvent',
'Start-NewOrchestration')
# Cmdlets to export from this module, for best performance, do not use wildcards and do not delete the entry, use an empty array if there are no cmdlets to export.
CmdletsToExport = @(
'Get-OutputBinding',
'Invoke-ActivityFunction',
'Push-OutputBinding',
@ -72,7 +71,6 @@ VariablesToExport = @()
# Aliases to export from this module, for best performance, do not use wildcards and do not delete the entry, use an empty array if there are no aliases to export.
AliasesToExport = @(
'Wait-ActivityFunction')
# Private data to pass to the module specified in RootModule/ModuleToProcess. This may also contain a PSData hashtable with additional module metadata used by PowerShell.

Просмотреть файл

@ -207,4 +207,30 @@ function GetRaiseEventUrl(
}
return $RequestUrl
}
}
function New-DurableRetryOptions(
[Parameter(Mandatory = $true)]
[timespan]
$FirstRetryInterval,
[Parameter(Mandatory = $true)]
[int]
$MaxNumberOfAttempts,
[double]
$BackoffCoefficient,
[timespan]
$MaxRetryInterval,
[timespan]
$RetryTimeout) {
[Microsoft.Azure.Functions.PowerShellWorker.Durable.RetryOptions]::new(
$FirstRetryInterval,
$MaxNumberOfAttempts,
$PSBoundParameters.ContainsKey('BackoffCoefficient') ? $BackoffCoefficient : $null,
$MaxRetryInterval,
$RetryTimeout)
}

Просмотреть файл

@ -142,7 +142,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Utility
private static string ConvertToJson(object fromObj)
{
var context = new JsonObject.ConvertToJsonContext(
maxDepth: 3,
maxDepth: 4,
enumsAsStrings: false,
compressOutput: true);

Просмотреть файл

@ -42,7 +42,7 @@ namespace Azure.Functions.PowerShell.Tests.E2E
Assert.NotNull(initialResponseBodyObject.terminatePostUri);
Assert.NotNull(initialResponseBodyObject.rewindPostUri);
var orchestrationCompletionTimeout = TimeSpan.FromSeconds(60);
var orchestrationCompletionTimeout = TimeSpan.FromSeconds(90);
var startTime = DateTime.UtcNow;
using (var httpClient = new HttpClient())
@ -76,6 +76,7 @@ namespace Azure.Functions.PowerShell.Tests.E2E
Assert.Equal("Hello Tokyo", statusResponseBody.output[0].ToString());
Assert.Equal("Hello Seattle", statusResponseBody.output[1].ToString());
Assert.Equal("Hello London", statusResponseBody.output[2].ToString());
Assert.Equal("Hello Toronto", statusResponseBody.output[3].ToString());
return;
}

Просмотреть файл

@ -0,0 +1,9 @@
{
"bindings": [
{
"name": "InputData",
"type": "activityTrigger",
"direction": "in"
}
]
}

Просмотреть файл

@ -0,0 +1,9 @@
param([hashtable]$InputData)
# Intentional intermittent error, eventually "self-healing"
$elapsedTime = (Get-Date).ToUniversalTime() - $InputData.StartTime
if ($elapsedTime.TotalSeconds -lt 3) {
throw 'Nope, no luck this time...'
}
"Hello $($InputData.Name)"

Просмотреть файл

@ -16,6 +16,11 @@ $tasks += Invoke-ActivityFunction -FunctionName "DurableActivity" -Input "Seattl
$tasks += Invoke-ActivityFunction -FunctionName "DurableActivity" -Input "London" -NoWait
$output += Wait-DurableTask -Task $tasks
# Retries
$retryOptions = New-DurableRetryOptions -FirstRetryInterval (New-Timespan -Seconds 2) -MaxNumberOfAttempts 5
$inputData = @{ Name = 'Toronto'; StartTime = $Context.CurrentUtcDateTime }
$output += Invoke-ActivityFunction -FunctionName "DurableActivityFlaky" -Input $inputData -RetryOptions $retryOptions
Write-Host "DurableOrchestrator: finished."
return $output

Просмотреть файл

@ -289,7 +289,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
private static void VerifyCallActivityActionAdded(OrchestrationContext orchestrationContext)
{
var actions = DurableTestUtilities.GetCollectedActions(orchestrationContext);
var action = (CallActivityAction) actions.Single();
var action = (CallActivityAction)actions.Single().Single();
Assert.Equal(FunctionName, action.FunctionName);
Assert.Equal(FunctionInput, action.Input);
}

Просмотреть файл

@ -0,0 +1,65 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
{
using System;
using Microsoft.Azure.Functions.PowerShellWorker.Durable;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;
using Xunit;
public class CallActivityWithRetryActionTests
{
[Theory]
[InlineData(1, 1, null, null, null)]
[InlineData(5, 3, null, null, null)]
[InlineData(2, 3, 1.0, null, null)]
[InlineData(4, 3, null, 1, null)]
[InlineData(8, 3, null, null, 1)]
[InlineData(1, 3, 0.5, 6, 7)]
public void RetryOptionsContainsNonNullProperties(
int firstRetryIntervalInMilliseconds,
int maxNumberOfAttempts,
double? backoffCoefficient,
int? maxRetryIntervalInMilliseconds,
int? retryTimeoutInMilliseconds)
{
var retryOptions = new RetryOptions(
TimeSpan.FromMilliseconds(firstRetryIntervalInMilliseconds),
maxNumberOfAttempts,
backoffCoefficient,
CreateTimeSpanOrNull(maxRetryIntervalInMilliseconds),
CreateTimeSpanOrNull(retryTimeoutInMilliseconds));
var action = new CallActivityWithRetryAction("FunctionName", "input", retryOptions);
Assert.Equal(firstRetryIntervalInMilliseconds, action.RetryOptions["firstRetryIntervalInMilliseconds"]);
Assert.Equal(maxNumberOfAttempts, action.RetryOptions["maxNumberOfAttempts"]);
AssertRetryOptionsEntry("backoffCoefficient", backoffCoefficient, action);
AssertRetryOptionsEntry("maxRetryIntervalInMilliseconds", maxRetryIntervalInMilliseconds, action);
AssertRetryOptionsEntry("retryTimeoutInMilliseconds", retryTimeoutInMilliseconds, action);
}
private static void AssertRetryOptionsEntry<T>(
string key,
T? expectedValue,
CallActivityWithRetryAction actualAction) where T : struct
{
if (expectedValue.HasValue)
{
Assert.Equal(expectedValue.Value, actualAction.RetryOptions[key]);
}
else
{
Assert.False(actualAction.RetryOptions.ContainsKey(key));
}
}
private static TimeSpan? CreateTimeSpanOrNull(double? milliseconds)
{
return milliseconds.HasValue ? TimeSpan.FromMilliseconds(milliseconds.Value) : (TimeSpan?)null;
}
}
}

Просмотреть файл

@ -8,7 +8,10 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using Microsoft.Azure.Functions.PowerShellWorker.Durable;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Tasks;
using Xunit;
@ -191,6 +194,116 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
});
}
[Theory]
[InlineData(false, 1, 1)]
[InlineData(false, 5, 5)]
[InlineData(true, 1, 1)]
[InlineData(true, 5, 1)]
public void StopAndInitiateDurableTaskOrReplay_AddsActivityBatch_UnlessNoWait(bool noWait, int numberOfActions, int expectedNumberOfBatches)
{
var orchestrationContext = new OrchestrationContext { History = new HistoryEvent[0] };
var durableTaskHandler = new DurableTaskHandler();
for (var i = 0; i < numberOfActions; ++i)
{
durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event
durableTaskHandler.StopAndInitiateDurableTaskOrReplay(
new ActivityInvocationTask("Function", "Input"),
orchestrationContext,
noWait: noWait,
output: _ => {},
onFailure: _ => {}
);
}
var (_, actions) = orchestrationContext.OrchestrationActionCollector.WaitForActions(new AutoResetEvent(initialState: true));
Assert.Equal(expectedNumberOfBatches, actions.Count);
}
[Theory]
[InlineData(false, false, 1)]
[InlineData(true, false, 2)]
[InlineData(false, true, 2)]
[InlineData(true, true, 2)]
public void WaitAll_And_WaitAny_StartNewActivityBatch(bool invokeWaitAll, bool invokeWaitAny, int expectedNumberOfBatches)
{
var orchestrationContext = new OrchestrationContext { History = new HistoryEvent[0] };
var durableTaskHandler = new DurableTaskHandler();
durableTaskHandler.StopAndInitiateDurableTaskOrReplay(
new ActivityInvocationTask("Function", "Input"),
orchestrationContext,
noWait: true,
output: _ => {},
onFailure: _ => {}
);
if (invokeWaitAll)
{
durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event
durableTaskHandler.WaitAll(new DurableTask[0], orchestrationContext, output: _ => {});
}
if (invokeWaitAny)
{
durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event
durableTaskHandler.WaitAny(new DurableTask[0], orchestrationContext, output: _ => {});
}
durableTaskHandler.StopAndInitiateDurableTaskOrReplay(
new ActivityInvocationTask("Function", "Input"),
orchestrationContext,
noWait: true,
output: _ => {},
onFailure: _ => {}
);
var (_, actions) = orchestrationContext.OrchestrationActionCollector.WaitForActions(new AutoResetEvent(initialState: true));
Assert.Equal(expectedNumberOfBatches, actions.Count);
}
[Fact]
public void StopAndInitiateDurableTaskOrReplay_RetriesOnFailure()
{
const string FunctionName = "Function";
const string FunctionInput = "Input";
var history = new[]
{
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, Name = FunctionName, EventId = 1 },
new HistoryEvent { EventType = HistoryEventType.TaskFailed, TaskScheduledId = 1 },
new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 2 },
new HistoryEvent { EventType = HistoryEventType.TimerFired, TimerId = 2 },
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, Name = FunctionName, EventId = 3 },
new HistoryEvent { EventType = HistoryEventType.TaskCompleted, Result = "\"OK\"", TaskScheduledId = 3 },
};
var orchestrationContext = new OrchestrationContext { History = history };
var durableTaskHandler = new DurableTaskHandler();
var retryOptions = new RetryOptions(TimeSpan.FromSeconds(1), 2, null, null, null);
object result = null;
durableTaskHandler.StopAndInitiateDurableTaskOrReplay(
new ActivityInvocationTask(FunctionName, FunctionInput, retryOptions),
orchestrationContext,
noWait: false,
output: output => { result = output; },
onFailure: _ => { Assert.True(false, "Unexpected failure"); },
retryOptions: retryOptions
);
Assert.Equal("OK", result);
var (_, actions) = orchestrationContext.OrchestrationActionCollector.WaitForActions(new AutoResetEvent(initialState: true));
var action = (CallActivityWithRetryAction)actions.Single().Single();
Assert.Equal(FunctionName, action.FunctionName);
Assert.Equal(FunctionInput, action.Input);
Assert.NotEmpty(action.RetryOptions);
}
private HistoryEvent[] CreateActivityHistory(string name, bool scheduled, bool completed, string output) {
return CreateActivityHistory(name: name, scheduled: scheduled, restartTime: _restartTime, completed: completed, output: output, orchestratorStartedIsProcessed: false);
}

Просмотреть файл

@ -86,7 +86,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
});
}
public static List<OrchestrationAction> GetCollectedActions(OrchestrationContext orchestrationContext)
public static List<List<OrchestrationAction>> GetCollectedActions(OrchestrationContext orchestrationContext)
{
var (_, actions) = orchestrationContext.OrchestrationActionCollector.WaitForActions(new ManualResetEvent(true));
return actions;

Просмотреть файл

@ -227,7 +227,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
private void VerifyCreateDurableTimerActionAdded(OrchestrationContext context, DateTime fireAt)
{
var actions = DurableTestUtilities.GetCollectedActions(context);
var action = (CreateDurableTimerAction)actions.Last();
var action = (CreateDurableTimerAction)actions.Last().Last();
Assert.Equal(action.FireAt, fireAt);
}
}

Просмотреть файл

@ -0,0 +1,149 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Microsoft.Azure.Functions.PowerShellWorker.Durable;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;
using Xunit;
public class OrchestrationActionCollectorTests
{
private readonly OrchestrationAction[] _expectedActions =
Enumerable.Range(0, 14).Select(i => new CallActivityAction($"Name{i}", $"Input{i}")).ToArray();
[Fact]
public void IndicatesShouldNotStopOnSignalledCompletionWaitHandle()
{
var collector = new OrchestrationActionCollector();
var (shouldStop, _) = collector.WaitForActions(new AutoResetEvent(initialState: true));
Assert.False(shouldStop);
}
[Fact]
public void IndicatesShouldStopOnStopEvent()
{
var collector = new OrchestrationActionCollector();
collector.Stop();
var (shouldStop, _) = collector.WaitForActions(new AutoResetEvent(initialState: false));
Assert.True(shouldStop);
}
[Fact]
public void ReturnsNoActionsWhenNoneAdded()
{
var collector = new OrchestrationActionCollector();
var (_, actions) = collector.WaitForActions(new AutoResetEvent(initialState: true));
Assert.Empty(actions);
}
[Fact]
public void ReturnsSingleAction()
{
var collector = new OrchestrationActionCollector();
collector.Add(_expectedActions[0]);
var (_, actions) = collector.WaitForActions(new AutoResetEvent(initialState: true));
Assert.Single(actions);
Assert.Single(actions.Single());
Assert.Same(_expectedActions[0], actions.Single().Single());
}
[Fact]
public void ReturnsSequentialActions()
{
var collector = new OrchestrationActionCollector();
collector.Add(_expectedActions[0]);
collector.NextBatch();
collector.Add(_expectedActions[1]);
var (_, actions) = collector.WaitForActions(new AutoResetEvent(initialState: true));
var expected = new[] {
new[] { _expectedActions[0] },
new[] { _expectedActions[1] }
};
AssertExpectedActions(expected, actions);
}
[Fact]
public void ReturnsParallelActions()
{
var collector = new OrchestrationActionCollector();
collector.Add(_expectedActions[0]);
collector.Add(_expectedActions[1]);
var (_, actions) = collector.WaitForActions(new AutoResetEvent(initialState: true));
var expected = new[] {
new[] { _expectedActions[0], _expectedActions[1] }
};
AssertExpectedActions(expected, actions);
}
[Fact]
public void ReturnsMixOfSequentialAndParallelActions()
{
var collector = new OrchestrationActionCollector();
collector.Add(_expectedActions[0]);
collector.NextBatch();
collector.Add(_expectedActions[1]);
collector.Add(_expectedActions[2]);
collector.NextBatch();
collector.Add(_expectedActions[3]);
collector.NextBatch();
collector.Add(_expectedActions[4]);
collector.Add(_expectedActions[5]);
collector.Add(_expectedActions[6]);
collector.NextBatch();
collector.Add(_expectedActions[7]);
collector.NextBatch();
collector.Add(_expectedActions[8]);
collector.NextBatch();
collector.Add(_expectedActions[9]);
collector.NextBatch();
collector.Add(_expectedActions[10]);
collector.Add(_expectedActions[11]);
collector.Add(_expectedActions[12]);
collector.Add(_expectedActions[13]);
var (_, actions) = collector.WaitForActions(new AutoResetEvent(initialState: true));
var expected = new[] {
new[] { _expectedActions[0] },
new[] { _expectedActions[1], _expectedActions[2] },
new[] { _expectedActions[3] },
new[] { _expectedActions[4], _expectedActions[5], _expectedActions[6] },
new[] { _expectedActions[7] },
new[] { _expectedActions[8] },
new[] { _expectedActions[9] },
new[] { _expectedActions[10], _expectedActions[11], _expectedActions[12], _expectedActions[13] }
};
AssertExpectedActions(expected, actions);
}
private void AssertExpectedActions(OrchestrationAction[][] expected, List<List<OrchestrationAction>> actual)
{
Assert.Equal(expected.Count(), actual.Count());
for (var batchIndex = 0; batchIndex < expected.Count(); ++batchIndex)
{
for (var actionIndex = 0; actionIndex < expected[batchIndex].Count(); ++actionIndex)
{
Assert.Same(expected[batchIndex][actionIndex], actual[batchIndex][actionIndex]);
}
}
}
}
}

Просмотреть файл

@ -20,7 +20,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
[Fact]
public void MessageContainsInnerExceptionMessage()
{
var e = new OrchestrationFailureException(new List<OrchestrationAction>(), _innerException);
var e = new OrchestrationFailureException(new List<List<OrchestrationAction>>(), _innerException);
var labelPos = e.Message.IndexOf(OrchestrationFailureException.OutOfProcDataLabel);
Assert.Equal(_innerException.Message, e.Message.Substring(0, labelPos));
@ -29,11 +29,13 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
[Fact]
public void MessageContainsSerializedOrchestrationMessage()
{
var actions = new List<OrchestrationAction> {
new CallActivityAction("activity1", "input1"),
new CallActivityAction("activity2", "input2")
};
var actions = new List<List<OrchestrationAction>> {
new List<OrchestrationAction> {
new CallActivityAction("activity1", "input1"),
new CallActivityAction("activity2", "input2")
}
};
var e = new OrchestrationFailureException(actions, _innerException);
var labelPos = e.Message.IndexOf(OrchestrationFailureException.OutOfProcDataLabel);
@ -44,10 +46,9 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
Assert.Null(orchestrationMessage.Output.Value);
Assert.Equal(_innerException.Message, (string)orchestrationMessage.Error);
var deserializedActions = (IEnumerable<dynamic>)((IEnumerable<dynamic>)orchestrationMessage.Actions).Single();
Assert.Equal(actions.Count(), deserializedActions.Count());
for (var i = 0; i < actions.Count(); i++)
for (var i = 0; i < actions.Single().Count(); i++)
{
AssertEqualAction((OrchestrationAction)actions[i], deserializedActions.ElementAt(i));
AssertEqualAction((OrchestrationAction)actions.Single()[i], deserializedActions.ElementAt(i));
}
}

Просмотреть файл

@ -96,9 +96,16 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
Assert.Single(result);
var returnOrchestrationMessage = (OrchestrationMessage)result["$return"];
Assert.Single(returnOrchestrationMessage.Actions);
Assert.Equal(actions.Length, returnOrchestrationMessage.Actions.Single().Count);
Assert.Equal(actions, returnOrchestrationMessage.Actions.Single());
if (actionCount == 0)
{
Assert.Empty(returnOrchestrationMessage.Actions);
}
else
{
Assert.Single(returnOrchestrationMessage.Actions);
Assert.Equal(actions.Length, returnOrchestrationMessage.Actions.Single().Count);
Assert.Equal(actions, returnOrchestrationMessage.Actions.Single());
}
}
[Fact]

Просмотреть файл

@ -0,0 +1,239 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
namespace Microsoft.Azure.Functions.PowerShellWorker.Test.Durable
{
using System;
using System.Linq;
using Microsoft.Azure.Functions.PowerShellWorker.Durable;
using Xunit;
public class RetryProcessorTests
{
[Fact]
public void ContinuesAfterFirstFailure()
{
var history = new[]
{
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 },
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "Failure 1" }
};
AssertRetryProcessorReportsContinue(history, firstEventIndex: 0, maxNumberOfAttempts: 2);
AssertNoEventsProcessed(history);
}
[Fact]
public void ContinuesAfterSecondFailure()
{
var history = new[]
{
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 },
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "Failure 1" },
new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 2 },
new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 2 },
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 3 },
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 3, Reason = "Failure 2" },
};
AssertRetryProcessorReportsContinue(history, firstEventIndex: 0, maxNumberOfAttempts: 2);
AssertEventsProcessed(history, 0, 1, 2, 3); // Don't expect the last Scheduled/Failed pair to be processed
}
[Fact]
public void FailsOnMaxNumberOfAttempts()
{
var history = new[]
{
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 },
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "Failure 1" },
new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 2 },
new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 2 },
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 3 },
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 3, Reason = "Failure 2" },
new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 4 },
new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 4 },
};
AssertRetryProcessorReportsFailure(history, firstEventIndex: 0, maxNumberOfAttempts: 2, "Failure 2");
AssertAllEventsProcessed(history);
}
[Fact]
public void SucceedsOnRetry()
{
var history = new[]
{
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 },
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "Failure 1" },
new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 2 },
new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 2 },
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 3 },
new HistoryEvent { EventType = HistoryEventType.TaskCompleted, EventId = -1, TaskScheduledId = 3, Result = "Success" },
};
AssertRetryProcessorReportsSuccess(history, firstEventIndex: 0, maxNumberOfAttempts: 2, "Success");
AssertAllEventsProcessed(history);
}
[Fact]
public void IgnoresPreviousHistory()
{
var history = new[]
{
// From a previous activity invocation
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 },
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "Failure 1" },
new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 2 },
new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 2 },
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 3 },
new HistoryEvent { EventType = HistoryEventType.TaskCompleted, EventId = -1, TaskScheduledId = 3, Result = "Success 1" },
// The current invocation starts here:
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 4 },
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 4, Reason = "Failure 2" },
new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 5 },
new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 5 },
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 6 },
new HistoryEvent { EventType = HistoryEventType.TaskCompleted, EventId = -1, TaskScheduledId = 6, Result = "Success 2" },
};
AssertRetryProcessorReportsSuccess(history, firstEventIndex: 6, maxNumberOfAttempts: 2, "Success 2");
AssertEventsProcessed(history, 6, 7, 8, 9, 10, 11);
}
// This history emulates the situation when multiple activity invocations are scheduled at the same time
// ("fan-out" scenario):
// - Activity A failed on the first attempt and succeeded on the second attempt.
// - Activity B failed after two attempts.
// - Activity C failed on the first attempt and has not been retried yet.
private static HistoryEvent[] CreateInterleavingHistory()
{
return new[]
{
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 1 }, // 0: A
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 2 }, // 1: B
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 3 }, // 2: C
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 1, Reason = "A1" }, // 3: A
new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 4 }, // 4: A
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 2, Reason = "B1" }, // 5: B
new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 5 }, // 6: B
new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 4 }, // 7: A
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 6 }, // 8: A
new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 5 }, // 9: B
new HistoryEvent { EventType = HistoryEventType.TaskScheduled, EventId = 7 }, // 10: B
new HistoryEvent { EventType = HistoryEventType.TaskCompleted, EventId = -1, TaskScheduledId = 6, Result = "OK" }, // 11: A
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 7, Reason = "B2" }, // 12: B
new HistoryEvent { EventType = HistoryEventType.TimerCreated, EventId = 8 }, // 13: B
new HistoryEvent { EventType = HistoryEventType.TimerFired, EventId = -1, TimerId = 8 }, // 14: B
new HistoryEvent { EventType = HistoryEventType.TaskFailed, EventId = -1, TaskScheduledId = 3, Reason = "C1" }, // 15: C
};
}
[Fact]
public void InterleavingRetries_ReportsSuccess()
{
var history = CreateInterleavingHistory();
// Activity A
AssertRetryProcessorReportsSuccess(history, firstEventIndex: 0, maxNumberOfAttempts: 2, "OK");
AssertEventsProcessed(history, 0, 3, 4, 7, 8, 11);
}
[Fact]
public void InterleavingRetries_ReportsFailure()
{
var history = CreateInterleavingHistory();
// Activity B
AssertRetryProcessorReportsFailure(history, firstEventIndex: 1, maxNumberOfAttempts: 2, "B2");
AssertEventsProcessed(history, 1, 5, 6, 9, 10, 12, 13, 14);
}
[Fact]
public void InterleavingRetries_ReportsContinue()
{
var history = CreateInterleavingHistory();
// Activity C
AssertRetryProcessorReportsContinue(history, firstEventIndex: 2, maxNumberOfAttempts: 2);
AssertNoEventsProcessed(history);
}
private static void AssertRetryProcessorReportsContinue(HistoryEvent[] history, int firstEventIndex, int maxNumberOfAttempts)
{
var shouldRetry = RetryProcessor.Process(
history,
history[firstEventIndex],
maxNumberOfAttempts,
onSuccess: result => { Assert.True(false, $"Unexpected output: {result}"); },
onFinalFailure: reason => { Assert.True(false, $"Unexpected failure: {reason}"); });
Assert.True(shouldRetry);
}
private static void AssertRetryProcessorReportsFailure(HistoryEvent[] history, int firstEventIndex, int maxNumberOfAttempts, string expectedFailureReason)
{
string actualFailureReason = null;
var shouldRetry = RetryProcessor.Process(
history,
history[firstEventIndex],
maxNumberOfAttempts,
onSuccess: result => { Assert.True(false, $"Unexpected output: {result}"); },
onFinalFailure: reason =>
{
Assert.Null(actualFailureReason);
actualFailureReason = reason;
});
Assert.False(shouldRetry);
Assert.Equal(expectedFailureReason, actualFailureReason);
}
private static void AssertRetryProcessorReportsSuccess(HistoryEvent[] history, int firstEventIndex, int maxNumberOfAttempts, string expectedOutput)
{
string actualOutput = null;
var shouldRetry = RetryProcessor.Process(
history,
history[firstEventIndex],
maxNumberOfAttempts,
onSuccess: result =>
{
Assert.Null(actualOutput);
actualOutput = result;
},
onFinalFailure: reason => { Assert.True(false, $"Unexpected failure: {reason}"); });
Assert.False(shouldRetry);
Assert.Equal(expectedOutput, actualOutput);
}
private static void AssertEventsProcessed(HistoryEvent[] history, params int[] expectedProcessedIndexes)
{
for (var i = 0; i < history.Length; ++i)
{
var expectedProcessed = expectedProcessedIndexes.Contains(i);
Assert.Equal(expectedProcessed, history[i].IsProcessed);
}
}
private static void AssertAllEventsProcessed(HistoryEvent[] history)
{
Assert.True(history.All(e => e.IsProcessed));
}
private static void AssertNoEventsProcessed(HistoryEvent[] history)
{
AssertEventsProcessed(history); // Note: passing nothing to expectedProcessedIndexes
}
}
}