add a new producerConsumer performance test, for measuring throughput between entities (#21)

This commit is contained in:
Sebastian Burckhardt 2021-03-26 11:01:17 -07:00 коммит произвёл GitHub
Родитель d19e8feb91
Коммит 4c7fb58edc
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 587 добавлений и 13 удалений

Просмотреть файл

@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.ProducerConsumer
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
public static class Completion
{
public enum Ops { Init, ProducerDone, ConsumerDone, Get, Ping }
public class State
{
public int completedProducers;
public int completedConsumers;
public DateTime? productionCompleted;
public DateTime? consumptionCompleted;
public Parameters parameters;
}
[FunctionName(nameof(Completion))]
public static Task HandleOperation(
[EntityTrigger] IDurableEntityContext context, ILogger log)
{
State state = context.GetState(() => new State());
switch (Enum.Parse<Ops>(context.OperationName))
{
case Ops.Init:
state.completedConsumers = 0;
state.parameters = context.GetInput<Parameters>();
log.LogWarning($"Initialized at {DateTime.UtcNow:o}");
break;
case Ops.ProducerDone:
if (++state.completedProducers >= state.parameters.producers)
{
state.productionCompleted = DateTime.UtcNow;
log.LogWarning($"Completed Production at {DateTime.UtcNow:o}");
}
break;
case Ops.ConsumerDone:
if (++state.completedConsumers >= state.parameters.consumers)
{
state.consumptionCompleted = DateTime.UtcNow;
log.LogWarning($"Completed Consumption at {DateTime.UtcNow:o}");
}
break;
case Ops.Get:
context.Return(state);
break;
case Ops.Ping:
break;
}
return Task.CompletedTask;
}
}
}

Просмотреть файл

@ -0,0 +1,76 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.ProducerConsumer
{
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
public static class Consumer
{
public enum Ops { Init, Item, Ping }
public class State
{
public int[] count;
public string error;
public Parameters parameters;
}
[FunctionName(nameof(Consumer))]
public static async Task HandleOperation(
[EntityTrigger] IDurableEntityContext context, ILogger log)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var state = context.GetState(() => new State());
log.LogTrace("{Entity} loaded state in {Latency:F3}ms", context.EntityId, stopwatch.Elapsed.TotalMilliseconds);
switch (Enum.Parse<Ops>(context.OperationName))
{
case Ops.Init:
log.LogInformation("{Consumer} initializing; functionInstanceId {FunctionInstanceId}", context.EntityId, context.FunctionBindingContext.FunctionInstanceId);
state.parameters = context.GetInput<Parameters>();
state.count = new int[state.parameters.producers];
break;
case Ops.Item:
var input = context.GetInput<string>();
log.LogTrace("{Entity} read input in {Latency:F3}ms", context.EntityId, stopwatch.Elapsed.TotalMilliseconds);
var (producer, index) = Producer.Decode(input);
if (state.count[producer] == index)
{
log.LogTrace("{Consumer} received item {Count} from {Producer}", context.EntityId, index, producer);
}
else
{
state.error ??= $"producer:{producer} expected:{state.count[producer]} actual:{index}";
log.LogError("{Consumer} received item {Count} from {Producer}, expected {Expected}", context.EntityId, index, producer, state.count[producer]);
}
state.count[producer]++;
if (state.count.Sum() == state.parameters.SignalsPerConsumer)
{
context.SignalEntity(state.parameters.GetCompletionEntity(), nameof(Completion.Ops.ConsumerDone));
}
break;
case Ops.Ping:
break;
}
log.LogInformation("{Entity} performed {Operation} in {Latency:F3}ms", context.EntityId, context.OperationName, stopwatch.Elapsed.TotalMilliseconds);
}
}
}

Просмотреть файл

@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.ProducerConsumer
{
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Newtonsoft.Json;
/// <summary>
/// A parameter object passed to the orchestration that starts the test.
/// </summary>
[JsonObject(MemberSerialization.OptOut)]
public class Parameters
{
/// <summary>
/// The number of producer entities.
/// </summary>
public int producers { get; set; }
/// <summary>
/// The number of consumer entities.
/// </summary>
public int consumers { get; set; }
/// <summary>
/// THe number of signal batches sent by each producer entity.
/// </summary>
public int batches { get; set; }
/// <summary>
/// The number of signals contained in each batch.
/// </summary>
public int batchsize { get; set; }
/// <summary>
/// The size of the signal payload, in bytes
/// </summary>
public int messagesize { get; set; }
/// <summary>
/// The number of partitions that are used for placing producer entities.
/// </summary>
public int producerPartitions { get; set; }
/// <summary>
/// The number of partitions that are used for placing consumer entities.
/// </summary>
public int consumerPartitions { get; set; }
/// <summary>
/// A name for this test, to be used in the results table, and as an entity prefix.
/// Filled automatically if not explicitly specified.
/// </summary>
public string testname { get; set; }
/// <summary>
/// A time for keeping entities alive after the test, in minutes. If larger than zero,
/// we are pinging all entities on a 15-second interval at the end of the test.
/// This can be helpful on consumption plans, to ensure logs or telemetry are transmitted before scaling to zero.
/// </summary>
public int keepAliveMinutes { get; set; }
[JsonIgnore]
public int SignalsPerProducer => this.batches * this.batchsize;
[JsonIgnore]
public int TotalNumberOfSignals => this.producers * this.SignalsPerProducer;
[JsonIgnore]
public int SignalsPerConsumer => this.TotalNumberOfSignals / this.consumers;
[JsonIgnore]
public int TotalVolume => this.TotalNumberOfSignals * this.messagesize;
public EntityId GetCompletionEntity() => new EntityId(nameof(Completion), $"{this.testname}-!{this.producerPartitions + this.consumerPartitions:D2}");
public EntityId GetProducerEntity(int i) => new EntityId(nameof(Producer), $"{i:D4}-{this.testname}-!{i % this.producerPartitions:D2}");
public EntityId GetConsumerEntity(int i) => new EntityId(nameof(Consumer), $"{i:D4}-{this.testname}-!{this.producerPartitions + i % this.consumerPartitions:D2}");
}
}

Просмотреть файл

@ -0,0 +1,112 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.ProducerConsumer
{
using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
public static class Producer
{
public enum Ops { Init, Continue, Ping }
public class State
{
public int count;
public Parameters parameters;
public DateTime? startTime;
}
[FunctionName(nameof(Producer))]
public static Task HandleOperation(
[EntityTrigger] IDurableEntityContext context,
ILogger log)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var state = context.GetState(() => new State());
log.LogInformation("{Entity} loaded state in {Latency:F3}ms", context.EntityId, stopwatch.Elapsed.TotalMilliseconds);
switch (Enum.Parse<Ops>(context.OperationName))
{
case Ops.Init:
state.count = 0;
state.parameters = context.GetInput<Parameters>();
break;
case Ops.Continue:
{
var builder = new StringBuilder();
var producer = int.Parse(context.EntityKey.Substring(0, context.EntityKey.IndexOf('-')));
log.LogDebug("{Producer} is starting a batch", context.EntityId);
// send a batch of messages
for (int c = 0; c < state.parameters.batchsize && state.count < state.parameters.SignalsPerProducer; ++c, ++state.count)
{
var index = state.count / state.parameters.consumers;
var consumer = state.count % state.parameters.consumers;
context.SignalEntity(
state.parameters.GetConsumerEntity(consumer),
nameof(Consumer.Ops.Item),
Encode(builder, producer, index, state.parameters.messagesize));
log.LogTrace("{Producer} sent item ({Index},{Consumer})", context.EntityId, index, consumer);
}
if (state.count < state.parameters.SignalsPerProducer)
{
// send continue message to self
context.SignalEntity(context.EntityId, nameof(Ops.Continue));
}
else
{
// producer is finished
log.LogWarning("{Producer} is done", context.EntityId);
context.SignalEntity(state.parameters.GetCompletionEntity(), nameof(Completion.Ops.ProducerDone));
}
}
break;
case Ops.Ping:
break;
}
log.LogInformation("{Entity} performed {Operation} in {Latency:F3}ms", context.EntityId, context.OperationName, stopwatch.Elapsed.TotalMilliseconds);
return Task.CompletedTask;
}
static string Encode(StringBuilder builder, int sender, int index, int size)
{
builder.Clear();
builder.Append(sender.ToString());
builder.Append(',');
builder.Append(index.ToString());
builder.Append('.');
// pad to get to desired size with UTF-32
while (builder.Length * 4 < size)
{
builder.Append('a');
}
return builder.ToString();
}
public static (int sender, int index) Decode(string payload)
{
var pos1 = payload.IndexOf(',');
var pos2 = payload.IndexOf('.');
var sender = int.Parse(payload.Substring(0, pos1));
var index = int.Parse(payload.Substring(pos1 + 1, pos2 - pos1 - 1));
return (sender, index);
}
}
}

Просмотреть файл

@ -0,0 +1,127 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.ProducerConsumer
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
public static class HttpSurface
{
[FunctionName("ProducerConsumer")]
public static async Task<IActionResult> ProducerConsumer(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "producerconsumer")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
try
{
string request = await new StreamReader(req.Body).ReadToEndAsync();
Parameters parameters;
switch (request)
{
case "minimal":
parameters = new Parameters()
{
producers = 1,
producerPartitions = 1,
consumers = 1,
consumerPartitions = 1,
batches = 1,
batchsize = 1,
messagesize = 0,
};
break;
case "pipe-10x300x1k":
parameters = new Parameters()
{
producers = 1,
producerPartitions = 1,
consumers = 1,
consumerPartitions = 1,
batches = 10,
batchsize = 300,
messagesize = 1000,
};
break;
case "pipe-10x3x100k":
parameters = new Parameters()
{
producers = 1,
producerPartitions = 1,
consumers = 1,
consumerPartitions = 1,
batches = 10,
batchsize = 3,
messagesize = 100000,
};
break;
case "pipe-1x3000x1k":
parameters = new Parameters()
{
producers = 1,
producerPartitions = 1,
consumers = 1,
consumerPartitions = 1,
batches = 1,
batchsize = 3000,
messagesize = 1000,
};
break;
case "debug":
parameters = new Parameters()
{
producers = 10,
producerPartitions = 2,
consumers = 10,
consumerPartitions = 2,
batches = 3,
batchsize = 100,
messagesize = 1000,
};
break;
default:
parameters = JsonConvert.DeserializeObject<Parameters>(request);
break;
}
if (string.IsNullOrEmpty(parameters.testname))
{
parameters.testname = string.Format($"producerconsumer{Guid.NewGuid().ToString("N").Substring(0,5)}");
}
IActionResult result;
// initialize and start the shuffle
int partitionId = parameters.producerPartitions + parameters.consumerPartitions;
string instanceId = $"{parameters.testname}!{partitionId:D2}";
string orchestrationInstanceId = await client.StartNewAsync(nameof(ProducerConsumerOrchestration), instanceId, parameters);
result = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromMinutes(5));
return result;
}
catch (Exception e)
{
return new ObjectResult(
new
{
error = e.ToString(),
});
}
}
}
}

Просмотреть файл

@ -0,0 +1,114 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.ProducerConsumer
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
public static class ProducerConsumerOrchestration
{
[FunctionName(nameof(ProducerConsumerOrchestration))]
public static async Task<string> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger logger)
{
var parameters = context.GetInput<Parameters>();
void progress(string msg)
{
if (!context.IsReplaying)
{
logger.LogWarning($"{context.InstanceId} {context.CurrentUtcDateTime:o} {msg}");
}
}
// initialize all entities
progress("Initializing Entities");
var tasks = new[] { context.CallEntityAsync(parameters.GetCompletionEntity(), nameof(Completion.Ops.Init), parameters) }
.Concat(Enumerable.Range(0, parameters.producers)
.Select(i => context.CallEntityAsync(parameters.GetProducerEntity(i), nameof(Producer.Ops.Init), parameters)))
.Concat(Enumerable.Range(0, parameters.consumers)
.Select(i => context.CallEntityAsync(parameters.GetConsumerEntity(i), nameof(Consumer.Ops.Init), parameters)))
.ToArray();
await Task.WhenAll(tasks);
// start sender entities
progress("Starting Producers");
for (int i = 0; i < parameters.producers; i++)
{
context.SignalEntity(
parameters.GetProducerEntity(i),
nameof(Producer.Ops.Continue),
parameters);
}
DateTime startTime = context.CurrentUtcDateTime;
// poll the completion entity until the expected count is reached
string result = null;
while ((context.CurrentUtcDateTime - startTime) < TimeSpan.FromMinutes(5))
{
progress("Checking for completion");
var response = await context.CallEntityAsync<Completion.State>(parameters.GetCompletionEntity(), nameof(Completion.Ops.Get));
if (response.productionCompleted.HasValue && response.consumptionCompleted.HasValue)
{
result = JsonConvert.SerializeObject(new {
startTime = startTime,
production = (response.productionCompleted.Value - startTime).TotalSeconds,
consumption = (response.consumptionCompleted.Value - startTime).TotalSeconds,
}) + '\n';
break;
}
await context.CreateTimer(context.CurrentUtcDateTime + TimeSpan.FromSeconds(6), CancellationToken.None);
}
if (result == null)
{
result = $"Timed out after {(context.CurrentUtcDateTime - startTime)}\n";
}
else if (parameters.keepAliveMinutes > 0)
{
progress("Running keep-alive");
var starttime = context.CurrentUtcDateTime;
while (context.CurrentUtcDateTime < starttime + TimeSpan.FromMinutes(parameters.keepAliveMinutes))
{
context.SignalEntity(parameters.GetCompletionEntity(), nameof(Completion.Ops.Ping));
for (int i = 0; i < parameters.producers; i++)
{
context.SignalEntity(parameters.GetProducerEntity(i), nameof(Producer.Ops.Ping));
}
for (int i = 0; i < parameters.consumers; i++)
{
context.SignalEntity(parameters.GetConsumerEntity(i), nameof(Consumer.Ops.Ping));
}
await context.CreateTimer<object>(context.CurrentUtcDateTime + TimeSpan.FromSeconds(15), null, CancellationToken.None);
progress($"Keepalive {parameters.testname}, elapsed = {(context.CurrentUtcDateTime - startTime).TotalMinutes}m");
}
}
progress(result);
return result;
}
}
}

Просмотреть файл

@ -4,27 +4,28 @@
"logLevel": {
"default": "Warning",
// No need to trace function information for all functions
"Function": "None",
"Host": "None",
// ------ NOTE: the log levels in this section affect some, but NOT ALL consumers
// - limits what gets displayed in the func.exe console (which is extremely costly)
// - limits what gets stored in application insights
// - does not limit what is collected by ETW
// - does not limit what is traced to console
// - does not limit what is shown in Live Metrics side panel
// ---- the triggers below can be enabled to get specific host tracing
"Host": "None",
//"Host.Aggregator": "Trace",
//"Host.Results": "Error",
//"Host.Triggers.DurableTask": "Information",
// ---- the triggers below can be enabled to get information on specific functions
"Function": "None",
//"Function.HelloCities": "Information",
//"Function.HelloSequence": "Information",
//"Function.Sender": "Information",
//"Function.Receiver": "Information",
//"Function.Completion": "Information",
// ------ The log levels below affect some (but not all) consumers
// - limits what's displayed in the func.exe console
// - limits what gets stored in application insights
// - does not limit what is collected by ETW
// - does not limit what is traced to console
// - does not limit what is shown in Live Metrics side panel
"Function.Producer": "Warning",
"Function.Consumer": "Warning",
"Function.Completion": "Warning",
// --- the levels below are used to control the Netherite tracing
"DurableTask.Netherite": "Information",
"DurableTask.Netherite.FasterStorage": "Warning",
"DurableTask.Netherite.EventHubsTransport": "Warning",