update performance tests and add new scripts (#23)

* update hellocities benchmarks.

* improve tracing for work items.

* support distributed launching of orchestrations in benchmarks; add distributed ping

* fix taskhub creation race

* prefix logs with hub name.

* better formatting of ping results.

* document the http triggers for orchestration throughput testing.

* update test scripts

* fix scripts
This commit is contained in:
Sebastian Burckhardt 2021-02-19 07:06:47 -08:00 коммит произвёл GitHub
Родитель b244e26021
Коммит bf31804b23
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
28 изменённых файлов: 751 добавлений и 273 удалений

Просмотреть файл

@ -17,7 +17,7 @@ if (-not (Test-Path -Path ./bin/Release/netcoreapp3.1/bin)) {
# look up the eventhubs namespace connection string
$eventHubsConnectionString = (az eventhubs namespace authorization-rule keys list --resource-group $groupName --namespace-name $namespaceName --name RootManageSharedAccessKey | ConvertFrom-Json).primaryConnectionString
if ((az functionapp show --resource-group $groupName --name $functionAppName | ConvertFrom-Json).name -ne $functionAppName)
if (-not ((az functionapp list -g $groupName --query "[].name"| ConvertFrom-Json) -contains $functionAppName))
{
Write-Host "Creating Function App..."
az functionapp plan create --resource-group $groupName --name $functionAppName --location $location --sku $planSku
@ -31,8 +31,8 @@ else
}
Write-Host "Configuring Scale..."
az functionapp plan update -g $groupName -n $functionAppName --max-burst $numNodes --number-of-workers $numNodes --min-instances $numNodes
az resource update -n $functionAppName/config/web -g $groupName --set properties.minimumElasticInstanceCount=$numNodes --resource-type Microsoft.Web/sites
az functionapp plan update -g $groupName -n $functionAppName --max-burst $numNodes --number-of-workers $numNodes --min-instances $numNodes
az resource update -n $functionAppName/config/web -g $groupName --set properties.minimumElasticInstanceCount=$numNodes --resource-type Microsoft.Web/sites
Write-Host "Publishing Code to Function App..."
func azure functionapp publish $functionAppName

Просмотреть файл

@ -26,7 +26,7 @@ namespace DurableTask.Netherite.AzureFunctions
readonly Timer timer;
#pragma warning restore IDE0052
public BlobLogger(string storageConnectionString, string workerId)
public BlobLogger(string storageConnectionString, string hubName, string workerId)
{
this.starttime = DateTime.UtcNow;
@ -34,7 +34,7 @@ namespace DurableTask.Netherite.AzureFunctions
CloudBlobClient client = storageAccount.CreateCloudBlobClient();
CloudBlobContainer container = client.GetContainerReference("logs");
container.CreateIfNotExists();
this.blob = container.GetAppendBlobReference($"{workerId}.{this.starttime:o}.log");
this.blob = container.GetAppendBlobReference($"{hubName}.{workerId}.{this.starttime:o}.log");
this.blob.CreateOrReplace();
this.memoryStream = new MemoryStream();

Просмотреть файл

@ -102,7 +102,7 @@ namespace DurableTask.Netherite.AzureFunctions
if (this.TraceToBlob && BlobLogger == null)
{
BlobLogger = new BlobLogger(settings.ResolvedStorageConnectionString, settings.WorkerId);
BlobLogger = new BlobLogger(settings.ResolvedStorageConnectionString, settings.HubName, settings.WorkerId);
}
var key = new DurableClientAttribute()

Просмотреть файл

@ -19,8 +19,8 @@ namespace DurableTask.Netherite
/// <summary>
/// Creates this taskhub in storage.
/// </summary>
/// <returns>after the taskhub has been created in storage.</returns>
Task CreateAsync();
/// <returns>true if the taskhub was actually created, false if it already existed.</returns>
Task<bool> CreateIfNotExistsAsync();
/// <summary>
/// Deletes this taskhub and all of its associated data in storage.

Просмотреть файл

@ -186,12 +186,12 @@ namespace DurableTask.Netherite
if (recreateInstanceStore)
{
await this.taskHub.DeleteAsync().ConfigureAwait(false);
await this.taskHub.CreateAsync().ConfigureAwait(false);
await this.taskHub.CreateIfNotExistsAsync().ConfigureAwait(false);
}
}
else
{
await this.taskHub.CreateAsync().ConfigureAwait(false);
await this.taskHub.CreateIfNotExistsAsync().ConfigureAwait(false);
}
if (!(this.LoadMonitorService is null))

Просмотреть файл

@ -275,7 +275,7 @@ namespace DurableTask.Netherite
this.Partition.PartitionId,
WorkItemTraceHelper.WorkItemType.Orchestration,
evt.WorkItemId, evt.InstanceId,
session != null ? this.GetSessionId(session) : null);
session != null ? this.GetSessionPosition(session) : null);
return;
};

Просмотреть файл

@ -156,10 +156,10 @@ namespace DurableTask.Netherite
}
[Event(226, Level = EventLevel.Warning, Version = 1)]
public void WorkItemDiscarded(string Account, string TaskHub, int PartitionId, string WorkItemType, string WorkItemId, string InstanceId, string SessionId, string ExtensionVersion)
public void WorkItemDiscarded(string Account, string TaskHub, int PartitionId, string WorkItemType, string WorkItemId, string InstanceId, string ReplacedBy, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(226, Account, TaskHub, PartitionId, WorkItemType, WorkItemId, InstanceId, SessionId, ExtensionVersion);
this.WriteEvent(226, Account, TaskHub, PartitionId, WorkItemType, WorkItemId, InstanceId, ReplacedBy, ExtensionVersion);
}
[Event(227, Level = EventLevel.Verbose, Version = 1)]

Просмотреть файл

@ -90,7 +90,7 @@ namespace DurableTask.Netherite
}
}
public void TraceWorkItemDiscarded(uint partitionId, WorkItemType workItemType, string workItemId, string instanceId, string sessionId)
public void TraceWorkItemDiscarded(uint partitionId, WorkItemType workItemType, string workItemId, string instanceId, string replacedBy)
{
if (this.logLevelLimit <= LogLevel.Warning)
{
@ -99,11 +99,11 @@ namespace DurableTask.Netherite
(long commitLogPosition, string eventId) = EventTraceContext.Current;
string prefix = commitLogPosition > 0 ? $".{commitLogPosition:D10} " : "";
this.logger.LogWarning("Part{partition:D2}{prefix} discarded {workItemType}WorkItem {workItemId} because session was replaced; instanceId={instanceId} sessionId={sessionId}",
partitionId, prefix, workItemType, workItemId, instanceId, sessionId);
this.logger.LogWarning("Part{partition:D2}{prefix} discarded {workItemType}WorkItem {workItemId} because session was replaced; instanceId={instanceId} replacedBy={replacedBy}",
partitionId, prefix, workItemType, workItemId, instanceId, replacedBy);
}
this.etw?.WorkItemDiscarded(this.account, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, sessionId ?? "", TraceUtils.ExtensionVersion);
this.etw?.WorkItemDiscarded(this.account, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, replacedBy ?? "", TraceUtils.ExtensionVersion);
}
}
@ -123,7 +123,7 @@ namespace DurableTask.Netherite
public void TraceTaskMessageReceived(uint partitionId, TaskMessage message, string workItemId, string queuePosition)
{
if (this.logLevelLimit <= LogLevel.Debug)
if (this.logLevelLimit <= LogLevel.Trace)
{
(long commitLogPosition, string eventId) = EventTraceContext.Current;
string messageId = FormatMessageId(message, workItemId);
@ -141,7 +141,7 @@ namespace DurableTask.Netherite
public void TraceTaskMessageSent(uint partitionId, TaskMessage message, string workItemId, string sentEventId)
{
if (this.logLevelLimit <= LogLevel.Debug)
if (this.logLevelLimit <= LogLevel.Trace)
{
string messageId = FormatMessageId(message, workItemId);

Просмотреть файл

@ -326,7 +326,7 @@ namespace DurableTask.Netherite.EventHubs
}
else
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} ignored packet #{seqno} for different taskhub", this.eventHubName, this.eventHubPartition, seqno);
this.traceHelper.LogWarning("EventHubsProcessor {eventHubName}/{eventHubPartition} ignored packet #{seqno} for different taskhub", this.eventHubName, this.eventHubPartition, seqno);
continue;
}

Просмотреть файл

@ -86,15 +86,10 @@ namespace DurableTask.Netherite.EventHubs
return (parameters != null && parameters.TaskhubName == this.settings.HubName);
}
async Task ITaskHub.CreateAsync()
async Task<bool> ITaskHub.CreateIfNotExistsAsync()
{
await this.cloudBlobContainer.CreateIfNotExistsAsync().ConfigureAwait(false);
if (await this.TryLoadExistingTaskhubAsync().ConfigureAwait(false) != null)
{
throw new InvalidOperationException("Cannot create TaskHub: TaskHub already exists");
}
// ensure the task hubs exist, creating them if necessary
var tasks = new List<Task>();
tasks.Add(EventHubsUtil.EnsureEventHubExistsAsync(this.settings.ResolvedTransportConnectionString, PartitionHubs[0], this.settings.PartitionCount));
@ -119,14 +114,27 @@ namespace DurableTask.Netherite.EventHubs
StartPositions = startPositions
};
// save the taskhub parameters in a blob
var jsonText = JsonConvert.SerializeObject(
taskHubParameters,
Newtonsoft.Json.Formatting.Indented,
new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.None });
await this.taskhubParameters.UploadTextAsync(jsonText).ConfigureAwait(false);
// try to create the taskhub blob
try
{
var jsonText = JsonConvert.SerializeObject(
taskHubParameters,
Newtonsoft.Json.Formatting.Indented,
new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.None });
var noOverwrite = AccessCondition.GenerateIfNoneMatchCondition("*");
await this.taskhubParameters.UploadTextAsync(jsonText, null, noOverwrite, null, null).ConfigureAwait(false);
}
catch(StorageException e) when (BlobUtils.BlobAlreadyExists(e))
{
// taskhub already exists, possibly because a different node created it faster
return false;
}
// we successfully created the taskhub
return true;
}
async Task ITaskHub.DeleteAsync()
{
if (await this.taskhubParameters.ExistsAsync().ConfigureAwait(false))

Просмотреть файл

@ -362,7 +362,7 @@ namespace DurableTask.Netherite.EventHubs
}
else
{
this.host.logger.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) ignored packet #{seqno} for different taskhub", this.host.eventHubPath, this.partitionId, this.Incarnation, seqno);
this.host.logger.LogWarning("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) ignored packet #{seqno} for different taskhub", this.host.eventHubPath, this.partitionId, this.Incarnation, seqno);
continue;
}

Просмотреть файл

@ -38,12 +38,13 @@ namespace DurableTask.Netherite.Emulated
this.logger = logger;
}
async Task ITaskHub.CreateAsync()
async Task<bool> ITaskHub.CreateIfNotExistsAsync()
{
await Task.Delay(simulatedDelay).ConfigureAwait(false);
this.clientQueues = new Dictionary<Guid, IMemoryQueue<ClientEvent>>();
this.partitionQueues = new IMemoryQueue<PartitionEvent>[this.numberPartitions];
this.partitions = new TransportAbstraction.IPartition[this.numberPartitions];
return true;
}
Task ITaskHub.DeleteAsync()

Просмотреть файл

@ -126,5 +126,10 @@ namespace DurableTask.Netherite
var information = e.RequestInformation.ExtendedErrorInformation;
return (e.RequestInformation.HttpStatusCode == 404) && (information.ErrorCode.Equals(BlobErrorCodeStrings.BlobNotFound));
}
public static bool BlobAlreadyExists(StorageException e)
{
return (e.RequestInformation.HttpStatusCode == 409);
}
}
}

Просмотреть файл

@ -120,7 +120,7 @@ namespace PerformanceTests
endtime = state.LastUpdatedTime;
}
response = new OkObjectResult(new Response {
Result = await stringContent.ReadAsStringAsync(),
Result = (string)JToken.Parse(await stringContent.ReadAsStringAsync()),
Time = endtime,
Duration = (endtime - starttime).TotalMilliseconds
});

Просмотреть файл

@ -11,9 +11,12 @@ namespace PerformanceTests
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Threading;
using System.Collections.Generic;
using System.Net.Http;
using Newtonsoft.Json.Linq;
using System.Linq;
using Newtonsoft.Json;
/// <summary>
/// A very simple Http Trigger that is useful for testing whether the orchestration service has started correctly, and what its
@ -22,14 +25,61 @@ namespace PerformanceTests
public static class Ping
{
[FunctionName(nameof(Ping))]
public static Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
log.LogInformation("C# HTTP trigger function processed a request.");
var is64bit = Environment.Is64BitProcess;
return Task.FromResult<IActionResult>(new OkObjectResult($"Hello from {client} ({(is64bit ? "x64":"x32")})\n"));
}
if (int.TryParse(requestBody, out int numPartitionsToPing))
{
async Task<string> Ping(int partition)
{
string instanceId = $"ping!{partition:D2}";
await client.StartNewAsync(nameof(Pingee), instanceId);
var response = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, instanceId, TimeSpan.FromSeconds(30));
if (response is ObjectResult objectResult
&& objectResult.Value is HttpResponseMessage responseMessage
&& responseMessage.StatusCode == System.Net.HttpStatusCode.OK
&& responseMessage.Content is StringContent stringContent)
{
return (string) JToken.Parse(await stringContent.ReadAsStringAsync());
}
else
{
return "timeout";
}
}
var tasks = Enumerable.Range(0, numPartitionsToPing).Select(partition => Ping(partition)).ToList();
await Task.WhenAll(tasks);
JObject result = new JObject();
var distinct = new HashSet<string>();
for (int i = 0; i < tasks.Count; i++)
{
result.Add(i.ToString(), tasks[i].Result);
distinct.Add(tasks[i].Result);
}
result.Add("distinct", distinct.Count);
return new OkObjectResult(result.ToString());
}
else
{
return new OkObjectResult($"Hello from {client} ({(is64bit ? "x64" : "x32")})\n");
}
}
[FunctionName(nameof(Pingee))]
public static Task<string> Pingee([OrchestrationTrigger] IDurableOrchestrationContext context)
{
return Task.FromResult(Environment.MachineName);
}
}
}

Просмотреть файл

@ -0,0 +1,60 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests
{
using System;
using System.Collections.Generic;
using System.Text;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
static class Common
{
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> items, int maxConcurrency, bool useThreadpool, Func<T, Task> action)
{
List<Task> tasks;
if (items is ICollection<T> itemCollection)
{
tasks = new List<Task>(itemCollection.Count);
}
else
{
tasks = new List<Task>();
}
using var semaphore = new SemaphoreSlim(maxConcurrency);
foreach (T item in items)
{
tasks.Add(InvokeThrottledAction(item, action, semaphore, useThreadpool));
}
await Task.WhenAll(tasks);
}
static async Task InvokeThrottledAction<T>(T item, Func<T, Task> action, SemaphoreSlim semaphore, bool useThreadPool)
{
await semaphore.WaitAsync();
try
{
if (useThreadPool)
{
await Task.Run(() => action(item));
}
else
{
await action(item);
}
}
finally
{
semaphore.Release();
}
}
}
}

Просмотреть файл

@ -1,4 +1,7 @@
namespace PerformanceTests.Orchestrations
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.Orchestrations
{
using System;
using System.Collections.Generic;

Просмотреть файл

@ -1,192 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Linq;
using System.Collections.Concurrent;
using System.Threading;
/// <summary>
/// A simple microbenchmark orchestration that calls three trivial activities in a sequence.
/// </summary>
public static class HelloCities
{
[FunctionName(nameof(HelloCities))]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "hellocities")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int? numberOrchestrations = string.IsNullOrEmpty(requestBody) || requestBody == "null" ? null : (int?) JsonConvert.DeserializeObject<int>(requestBody);
TimeSpan timeout = TimeSpan.FromSeconds(200);
if (!numberOrchestrations.HasValue)
{
// we are running a single orchestration.
string orchestrationInstanceId = await client.StartNewAsync(nameof(HelloSequence));
var response = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, timeout);
return response;
}
else
{
// call several orchestrations in a loop and wait for all of them to complete
var testname = Util.MakeTestName(req);
try
{
log.LogWarning($"Starting {testname} {numberOrchestrations.Value}...");
var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();
var tasks = new List<Task<bool>>();
async Task<bool> RunOrchestration(int iteration)
{
var startTime = DateTime.UtcNow;
var orchestrationInstanceId = $"Orch{iteration}";
log.LogInformation($"{testname} starting {orchestrationInstanceId}");
await client.StartNewAsync(nameof(HelloSequence), orchestrationInstanceId);
await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, timeout);
if (DateTime.UtcNow < startTime + timeout)
{
log.LogInformation($"{testname} completed {orchestrationInstanceId}");
return true;
}
else
{
log.LogInformation($"{testname} timeout {orchestrationInstanceId}");
return false;
}
}
for (int i = 0; i < numberOrchestrations; i++)
{
tasks.Add(RunOrchestration(i));
}
await Task.WhenAll(tasks);
stopwatch.Stop();
int timeouts = tasks.Count(t => !t.Result);
double elapsedSeconds = elapsedSeconds = stopwatch.Elapsed.TotalSeconds;
log.LogWarning($"Completed {testname} with {timeouts} timeouts in {elapsedSeconds}s.");
object resultObject = timeouts > 0 ? (object)new { testname, timeouts } : new { testname, elapsedSeconds };
string resultString = $"{JsonConvert.SerializeObject(resultObject, Formatting.None)}\n";
return new OkObjectResult(resultString);
}
catch (Exception e)
{
return new ObjectResult(
new
{
testname,
error = e.ToString(),
});
}
}
}
[FunctionName(nameof(CountCities))]
public static async Task<IActionResult> CountCities(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "countcities")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
var queryCondition = new OrchestrationStatusQueryCondition()
{
InstanceIdPrefix = "Orch",
};
int completed = 0;
int pending = 0;
int running = 0;
int other = 0;
do
{
OrchestrationStatusQueryResult result = await client.ListInstancesAsync(queryCondition, CancellationToken.None);
queryCondition.ContinuationToken = result.ContinuationToken;
foreach (var status in result.DurableOrchestrationState)
{
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Pending)
{
pending++;
}
else if (status.RuntimeStatus == OrchestrationRuntimeStatus.Running)
{
running++;
}
else if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
{
completed++;
}
else
{
other++;
}
}
} while (queryCondition.ContinuationToken != null);
return new OkObjectResult($"{pending+running+completed+other} orchestration instances ({pending} pending, {running} running, {completed} completed, {other} other)\n");
}
[FunctionName(nameof(PurgeCities))]
public static async Task<IActionResult> PurgeCities(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "purgecities")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
var queryCondition = new OrchestrationStatusQueryCondition()
{
InstanceIdPrefix = "Orch",
};
PurgeHistoryResult result = await client.PurgeInstanceHistoryAsync(default,default,default);
return new OkObjectResult($"purged {result.InstancesDeleted} orchestration instances.\n");
}
[FunctionName(nameof(HelloSequence))]
public static async Task<List<string>> HelloSequence([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var result = new List<string>
{
await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"),
await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"),
await context.CallActivityAsync<string>(nameof(SayHello), "London")
};
return result;
}
[FunctionName(nameof(SayHello))]
public static Task<string> SayHello([ActivityTrigger] IDurableActivityContext context)
{
return Task.FromResult(context.GetInput<string>());
}
}
}

Просмотреть файл

@ -0,0 +1,83 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Linq;
using System.Collections.Concurrent;
using System.Threading;
/// <summary>
/// A simple microbenchmark orchestration that some trivial activities in a sequence.
/// </summary>
public static class HelloCitiesSequence
{
[FunctionName(nameof(HelloCities))]
public static async Task<IActionResult> HelloCities(
[HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
// start the orchestration
string orchestrationInstanceId = await client.StartNewAsync(nameof(HelloSequence));
// wait for it to complete and return the result
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(200));
}
[FunctionName(nameof(HelloCities5))]
public static async Task<IActionResult> HelloCities5(
[HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
// start the orchestration
string orchestrationInstanceId = await client.StartNewAsync(nameof(HelloSequence5));
// wait for it to complete and return the result
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(200));
}
[FunctionName(nameof(HelloSequence))]
public static async Task<List<string>> HelloSequence([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var result = new List<string>
{
await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"),
await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"),
await context.CallActivityAsync<string>(nameof(SayHello), "London")
};
return result;
}
[FunctionName(nameof(HelloSequence5))]
public static async Task<List<string>> HelloSequence5([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var outputs = new List<string>
{
await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"),
await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"),
await context.CallActivityAsync<string>(nameof(SayHello), "London"),
await context.CallActivityAsync<string>(nameof(SayHello), "Amsterdam"),
await context.CallActivityAsync<string>(nameof(SayHello), "Mumbai")
};
return outputs;
}
[FunctionName(nameof(SayHello))]
public static string SayHello([ActivityTrigger] string name) => $"Hello {name}!";
}
}

Просмотреть файл

@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests
{
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
public class LauncherEntity
{
public void Launch((string orchestrationName, int numberOrchestrations, int offset) input)
{
// start all the orchestrations
for (int iteration = 0; iteration < input.numberOrchestrations; iteration++)
{
var orchestrationInstanceId = ManyOrchestrations.InstanceId(iteration + input.offset);
Entity.Current.StartNewOrchestration(input.orchestrationName, null, orchestrationInstanceId);
};
}
[FunctionName(nameof(LauncherEntity))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<LauncherEntity>();
}
}

Просмотреть файл

@ -0,0 +1,387 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Linq;
using System.Collections.Concurrent;
using System.Threading;
using System.Net.Http;
using DurableTask.Core.Stats;
/// <summary>
/// Http triggers for starting, awaiting, counting, or purging large numbers of orchestration instances
///
/// Example invocations:
/// curl https://.../start -d HelloSequence.1000 launch 1000 HelloSequence instances, from the http trigger
/// curl https://.../start -d HelloSequence.10000.200 launch 10000 HelloSequence instances, in portions of 200, from launcher entities
/// curl https://.../await -d 1000 waits for the 1000 instances to complete
/// curl https://.../count -d 1000 check the status of the 1000 instances and reports the (last completed - first started) time range
/// curl https://.../purge -d 1000 purges the 1000 instances
/// curl https://.../query waits for the 1000 instances to complete
///
/// </summary>
public static class ManyOrchestrations
{
[FunctionName(nameof(Start))]
public static async Task<IActionResult> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int firstdot = requestBody.IndexOf('.');
int seconddot = requestBody.LastIndexOf('.');
string orchestrationName = requestBody.Substring(0, firstdot);
int numberOrchestrations;
int? portionSize;
if (firstdot == seconddot)
{
numberOrchestrations = int.Parse(requestBody.Substring(firstdot + 1));
portionSize = null;
}
else
{
numberOrchestrations = int.Parse(requestBody.Substring(firstdot + 1, seconddot - (firstdot + 1)));
portionSize = int.Parse(requestBody.Substring(seconddot + 1));
}
try
{
if (!portionSize.HasValue)
{
log.LogWarning($"Starting {numberOrchestrations} instances of {orchestrationName} from within HttpTrigger...");
var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();
// start all the orchestrations
await Enumerable.Range(0, numberOrchestrations).ParallelForEachAsync(200, true, (iteration) =>
{
var orchestrationInstanceId = InstanceId(iteration);
log.LogInformation($"starting {orchestrationInstanceId}");
return client.StartNewAsync(orchestrationName, orchestrationInstanceId);
});
double elapsedSeconds = stopwatch.Elapsed.TotalSeconds;
string message = $"Started all {numberOrchestrations} orchestrations in {elapsedSeconds:F2}s.";
log.LogWarning($"Started all {numberOrchestrations} orchestrations in {elapsedSeconds:F2}s.");
return new OkObjectResult($"{message}\n");
}
else
{
log.LogWarning($"Starting {numberOrchestrations} instances of {orchestrationName} via launcher entities...");
int pos = 0;
int launcher = 0;
var tasks = new List<Task>();
while (pos < numberOrchestrations)
{
int portion = Math.Min(portionSize.Value, (numberOrchestrations - pos));
var entityId = new EntityId(nameof(LauncherEntity), $"launcher{launcher / 100:D6}!{launcher % 100:D2}");
tasks.Add(client.SignalEntityAsync(entityId, nameof(LauncherEntity.Launch), (orchestrationName, portion, pos)));
pos += portion;
launcher++;
}
await Task.WhenAll(tasks);
return new OkObjectResult($"Signaled {launcher} entities for starting {numberOrchestrations} orchestrations.\n");
}
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() });
}
}
[FunctionName(nameof(Await))]
public static async Task<IActionResult> Await(
[HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int numberOrchestrations = int.Parse(requestBody);
DateTime deadline = DateTime.UtcNow + TimeSpan.FromMinutes(5);
// wait for the specified number of orchestration instances to complete
try
{
log.LogWarning($"Awaiting {numberOrchestrations} orchestration instances...");
var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();
var tasks = new List<Task<bool>>();
int completed = 0;
// start all the orchestrations
await Enumerable.Range(0, numberOrchestrations).ParallelForEachAsync(200, true, async (iteration) =>
{
var orchestrationInstanceId = InstanceId(iteration);
IActionResult response = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, deadline - DateTime.UtcNow);
if (response is ObjectResult objectResult
&& objectResult.Value is HttpResponseMessage responseMessage
&& responseMessage.StatusCode == System.Net.HttpStatusCode.OK
&& responseMessage.Content is StringContent stringContent)
{
log.LogInformation($"{orchestrationInstanceId} completed");
Interlocked.Increment(ref completed);
}
});
return new OkObjectResult(
completed == numberOrchestrations
? $"all {numberOrchestrations} orchestration instances completed.\n"
: $"only {completed}/{numberOrchestrations} orchestration instances completed.\n");
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() });
}
}
[FunctionName(nameof(Count))]
public static async Task<IActionResult> Count(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "count")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
try
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int numberOrchestrations = int.Parse(requestBody);
DateTime deadline = DateTime.UtcNow + TimeSpan.FromMinutes(5);
var queryCondition = new OrchestrationStatusQueryCondition()
{
InstanceIdPrefix = "Orch",
};
int completed = 0;
int pending = 0;
int running = 0;
int other = 0;
long earliestStart = long.MaxValue;
long latestUpdate = 0;
bool gotTimeRange = false;
object lockForUpdate = new object();
var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();
var tasks = new List<Task<bool>>();
log.LogWarning($"Checking the status of {numberOrchestrations} orchestration instances...");
await Enumerable.Range(0, numberOrchestrations).ParallelForEachAsync(200, true, async (iteration) =>
{
var orchestrationInstanceId = InstanceId(iteration);
var status = await client.GetStatusAsync(orchestrationInstanceId);
lock (lockForUpdate)
{
if (status == null)
{
other++;
}
else
{
earliestStart = Math.Min(earliestStart, status.CreatedTime.Ticks);
latestUpdate = Math.Max(latestUpdate, status.LastUpdatedTime.Ticks);
gotTimeRange = true;
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Pending)
{
pending++;
}
else if (status.RuntimeStatus == OrchestrationRuntimeStatus.Running)
{
running++;
}
else if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
{
completed++;
}
else
{
other++;
}
}
}
});
double elapsedSeconds = 0;
if (gotTimeRange)
{
elapsedSeconds = (new DateTime(latestUpdate) - new DateTime(earliestStart)).TotalSeconds;
}
var resultObject = new
{
completed,
running,
pending,
other,
elapsedSeconds,
};
return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n");
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() });
}
}
[FunctionName(nameof(Query))]
public static async Task<IActionResult> Query(
[HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
try
{
var queryCondition = new OrchestrationStatusQueryCondition()
{
InstanceIdPrefix = "Orch",
};
int completed = 0;
int pending = 0;
int running = 0;
int other = 0;
long earliestStart = long.MaxValue;
long latestUpdate = 0;
do
{
OrchestrationStatusQueryResult result = await client.ListInstancesAsync(queryCondition, CancellationToken.None);
queryCondition.ContinuationToken = result.ContinuationToken;
foreach (var status in result.DurableOrchestrationState)
{
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Pending)
{
pending++;
}
else if (status.RuntimeStatus == OrchestrationRuntimeStatus.Running)
{
running++;
}
else if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
{
completed++;
}
else
{
other++;
}
earliestStart = Math.Min(earliestStart, status.CreatedTime.Ticks);
latestUpdate = Math.Max(latestUpdate, status.LastUpdatedTime.Ticks);
}
} while (queryCondition.ContinuationToken != null);
double elapsedSeconds = 0;
if (completed + pending + running + other > 0)
{
elapsedSeconds = (new DateTime(latestUpdate) - new DateTime(earliestStart)).TotalSeconds;
}
var resultObject = new
{
completed,
running,
pending,
other,
elapsedSeconds,
};
return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n");
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() });
}
}
[FunctionName(nameof(Purge))]
public static async Task<IActionResult> Purge(
[HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int numberOrchestrations = int.Parse(requestBody);
DateTime deadline = DateTime.UtcNow + TimeSpan.FromMinutes(5);
// wait for the specified number of orchestration instances to complete
try
{
log.LogWarning($"Purging {numberOrchestrations} orchestration instances...");
var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();
var tasks = new List<Task<bool>>();
int deleted = 0;
// start all the orchestrations
await Enumerable.Range(0, numberOrchestrations).ParallelForEachAsync(200, true, async (iteration) =>
{
var orchestrationInstanceId = InstanceId(iteration);
var response = await client.PurgeInstanceHistoryAsync(orchestrationInstanceId);
Interlocked.Add(ref deleted, response.InstancesDeleted);
});
return new OkObjectResult(
deleted == numberOrchestrations
? $"all {numberOrchestrations} orchestration instances purged.\n"
: $"only {deleted}/{numberOrchestrations} orchestration instances purged.\n");
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() });
}
}
// we can use this to run on a subset of the available partitions
static readonly int? restrictedPlacement = null;
public static string InstanceId(int index)
{
if (restrictedPlacement == null)
{
return $"Orch{index:X5}";
}
else
{
return $"Orch{index:X5}!{(index % restrictedPlacement):D2}";
}
}
}
}

Просмотреть файл

@ -13,27 +13,11 @@
<ProjectReference Include="..\..\src\DurableTask.Netherite.AzureFunctions\DurableTask.Netherite.AzureFunctions.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="clear.ps1">
<CopyToOutputDirectory>Never</CopyToOutputDirectory>
</None>
<None Update="delete.ps1">
<CopyToOutputDirectory>Never</CopyToOutputDirectory>
</None>
<None Update="deploy-on-premium.ps1">
<CopyToOutputDirectory>Never</CopyToOutputDirectory>
</None>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="init.ps1">
<CopyToOutputDirectory>Never</CopyToOutputDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
</None>
<None Update="run.ps1">
<CopyToOutputDirectory>Never</CopyToOutputDirectory>
</None>
<None Update="settings.ps1">
<CopyToOutputDirectory>Never</CopyToOutputDirectory>

Просмотреть файл

@ -1,26 +1,26 @@
#!/usr/bin/pwsh
param (
$Plan="EP2",
$NumNodes="4",
$Configuration="Release"
)
# review these parameters before running the script
$numNodes=12
$planSku="EP2"
# read the parameters
# read the settings
. ./settings.ps1
# enter the directory with the release binaries
if (-not (Test-Path -Path ./bin/Release/netcoreapp3.1/bin)) {
throw 'No release binaries found. Must `dotnet build -c Release` first.'
} else {
cd bin/Release/netcoreapp3.1
}
Write-Host Building $Configuration Configuration...
dotnet build -c $Configuration
# enter the directory with the binaries
Push-Location -Path bin/$Configuration/netcoreapp3.1
# look up the eventhubs namespace connection string
$eventHubsConnectionString = (az eventhubs namespace authorization-rule keys list --resource-group $groupName --namespace-name $namespaceName --name RootManageSharedAccessKey | ConvertFrom-Json).primaryConnectionString
if ((az functionapp show --resource-group $groupName --name $functionAppName | ConvertFrom-Json).name -ne $functionAppName)
if (-not ((az functionapp list -g $groupName --query "[].name"| ConvertFrom-Json) -contains $functionAppName))
{
Write-Host "Creating Function App..."
az functionapp plan create --resource-group $groupName --name $functionAppName --location $location --sku $planSku
Write-Host "Creating $Plan Function App..."
az functionapp plan create --resource-group $groupName --name $functionAppName --location $location --sku $Plan
az functionapp create --name $functionAppName --storage-account $storageName --plan $functionAppName --resource-group $groupName --functions-version 3
az functionapp config set -n $functionAppName -g $groupName --use-32bit-worker-process false
az functionapp config appsettings set -n $functionAppName -g $groupName --settings EventHubsConnection=$eventHubsConnectionString
@ -30,9 +30,11 @@ else
Write-Host "Function app already exists."
}
Write-Host "Configuring Scale..."
az functionapp plan update -g $groupName -n $functionAppName --max-burst $numNodes --number-of-workers $numNodes --min-instances $numNodes
az resource update -n $functionAppName/config/web -g $groupName --set properties.minimumElasticInstanceCount=$numNodes --resource-type Microsoft.Web/sites
Write-Host "Configuring Scale=$NumNodes..."
az functionapp plan update -g $groupName -n $functionAppName --max-burst $numNodes --number-of-workers $numNodes --min-instances $numNodes
az resource update -n $functionAppName/config/web -g $groupName --set properties.minimumElasticInstanceCount=$numNodes --resource-type Microsoft.Web/sites
Write-Host "Publishing Code to Function App..."
func azure functionapp publish $functionAppName
Pop-Location

Просмотреть файл

Просмотреть файл

@ -1,13 +1,16 @@
#!/usr/bin/pwsh
param (
$Configuration="Release"
)
# read the parameters
. ./settings.ps1
# enter the directory with the debug binaries
if (-not (Test-Path -Path ./bin/Debug/netcoreapp3.1/bin)) {
throw 'No debug binaries found. Must `dotnet build` first.'
# enter the directory with the binaries
if (-not (Test-Path -Path ./bin/$Configuration/netcoreapp3.1/bin)) {
throw "No $Configuration binaries found. Must `dotnet build -c $Configuration` first."
} else {
cd bin/Debug/netcoreapp3.1
Push-Location -Path bin/$Configuration/netcoreapp3.1
}
# look up the two connection strings and assign them to the respective environment variables
@ -15,4 +18,6 @@ $Env:AzureWebJobsStorage = (az storage account show-connection-string --name $st
$Env:EventHubsConnection = (az eventhubs namespace authorization-rule keys list --resource-group $groupName --namespace-name $namespaceName --name RootManageSharedAccessKey | ConvertFrom-Json).primaryConnectionString
# start the function app locally
func start --no-build
func start --no-build
Pop-Location

Просмотреть файл

@ -0,0 +1,53 @@
#!/usr/bin/pwsh
param (
$Plan="EP2",
$NumNodes="4",
$Configuration="Release",
$WaitForDeploy=60,
$NumReps=6,
$Orchestration="HelloSequence",
$NumOrchestrations=10000,
$StarterEntities=200,
$DelayAfterRun=25
)
# import the parameter settings from the file in the same directory
. ./settings.ps1
# create the resources
./scripts/init.ps1
# deploy to a premium plan
./scripts/deploy.ps1 -Plan $Plan -NumNodes $NumNodes -Configuration $Configuration
Write-Host "Waiting $WaitForDeploy seconds for deployment to load-balance and start partitions..."
Start-Sleep -Seconds $WaitForDeploy
for($i = 0; $i -lt $NumReps; $i++)
{
Write-Host "---------- Experiment $i/$NumReps"
if ($StarterEntities -eq 0)
{
$arg = $Orchestration + "." + $NumOrchestrations
}
else
{
$arg = $Orchestration + "." +$NumOrchestrations + "." + $StarterEntities
}
Write-Host "Starting $arg orchestrations..."
curl.exe https://$functionAppName.azurewebsites.net/start -d $arg
Write-Host "Waiting $DelayAfterRun seconds before checking results..."
Start-Sleep -Seconds $DelayAfterRun
Write-Host "Checking results..."
curl.exe https://$functionAppName.azurewebsites.net/count -d $NumOrchestrations
Write-Host "Deleting $NumOrchestrations instances..."
curl.exe https://$functionAppName.azurewebsites.net/purge -d $NumOrchestrations
}
Write-Host "Deleting all resources..."
az group delete --name $groupName --yes