diff --git a/test/PerformanceTests/Benchmarks/Bank/HttpTriggers.cs b/test/PerformanceTests/Benchmarks/Bank/HttpTriggers.cs index 43205c9..ff8b273 100644 --- a/test/PerformanceTests/Benchmarks/Bank/HttpTriggers.cs +++ b/test/PerformanceTests/Benchmarks/Bank/HttpTriggers.cs @@ -15,6 +15,7 @@ namespace PerformanceTests.Bank using System.Collections.Generic; using Microsoft.Azure.WebJobs.Extensions.DurableTask; using System.Linq; + using System.Net; /// /// A microbenchmark using durable entities for bank accounts, and an orchestration with a critical section for transferring @@ -37,7 +38,7 @@ namespace PerformanceTests.Bank } catch (Exception e) { - return new ObjectResult(e.ToString()); + return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError }; } } } diff --git a/test/PerformanceTests/Benchmarks/CollisionSearch/FlatParallelSearch.cs b/test/PerformanceTests/Benchmarks/CollisionSearch/FlatParallelSearch.cs index 6cf2692..2a26326 100644 --- a/test/PerformanceTests/Benchmarks/CollisionSearch/FlatParallelSearch.cs +++ b/test/PerformanceTests/Benchmarks/CollisionSearch/FlatParallelSearch.cs @@ -13,7 +13,7 @@ namespace PerformanceTests.CollisionSearch using System.Linq; /// - /// An orchestration that searches for hash collisions using a recursive divide-and-conquer algorithm. + /// An orchestration that searches for hash collisions using the simple fan-out-fan-in pattern. /// public static class FlatParallelSearch { diff --git a/test/PerformanceTests/Benchmarks/CollisionSearch/HttpTriggers.cs b/test/PerformanceTests/Benchmarks/CollisionSearch/HttpTriggers.cs index 8e16ff5..f60b2ca 100644 --- a/test/PerformanceTests/Benchmarks/CollisionSearch/HttpTriggers.cs +++ b/test/PerformanceTests/Benchmarks/CollisionSearch/HttpTriggers.cs @@ -13,6 +13,7 @@ namespace PerformanceTests.CollisionSearch using Microsoft.Azure.WebJobs.Extensions.DurableTask; using System.Net.Http; using Newtonsoft.Json.Linq; + using System.Net; public static class HttpTriggers { @@ -79,7 +80,7 @@ namespace PerformanceTests.CollisionSearch } catch (Exception e) { - return new ObjectResult(new { error = e.ToString() }); + return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int)HttpStatusCode.InternalServerError }; } } } diff --git a/test/PerformanceTests/Benchmarks/Counter/Counter.cs b/test/PerformanceTests/Benchmarks/Counter/Counter.cs index cd4cc1b..1295c55 100644 --- a/test/PerformanceTests/Benchmarks/Counter/Counter.cs +++ b/test/PerformanceTests/Benchmarks/Counter/Counter.cs @@ -24,25 +24,36 @@ namespace PerformanceTests.Orchestrations.Counter public class Counter { - [JsonProperty("value")] public int CurrentValue { get; set; } - [JsonProperty("modified")] + public DateTime? StartTime { get; set; } + public DateTime LastModified { get; set; } public void Add(int amount) { this.CurrentValue += amount; - this.LastModified = DateTime.UtcNow; + this.UpdateTimestamps(); } - public void Reset() + void UpdateTimestamps() { - this.CurrentValue = 0; + if (!this.StartTime.HasValue) + { + this.StartTime = DateTime.UtcNow; + } this.LastModified = DateTime.UtcNow; } - public (int, DateTime) Get() => (this.CurrentValue, DateTime.UtcNow); + public void Crash(DateTime timeStamp) + { + // crash if less that 4 seconds have passed since the signal was sent + // (so that we no longer crash when retrying after recovery) + if ((DateTime.UtcNow - timeStamp) < TimeSpan.FromSeconds(4)) + { + System.Environment.Exit(333); + } + } [FunctionName(nameof(Counter))] public static Task Run([EntityTrigger] IDurableEntityContext ctx) diff --git a/test/PerformanceTests/Benchmarks/Counter/HttpTriggers.cs b/test/PerformanceTests/Benchmarks/Counter/HttpTriggers.cs index fee4990..5a08774 100644 --- a/test/PerformanceTests/Benchmarks/Counter/HttpTriggers.cs +++ b/test/PerformanceTests/Benchmarks/Counter/HttpTriggers.cs @@ -7,6 +7,7 @@ namespace PerformanceTests.Orchestrations.Counter using System.Diagnostics; using System.IO; using System.Linq; + using System.Net; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Http; @@ -18,191 +19,128 @@ namespace PerformanceTests.Orchestrations.Counter public static class HttpTriggers { - class Input - { - public string Key { get; set; } - public int Expected { get; set; } - } - - [FunctionName(nameof(WaitForCount))] - public static async Task WaitForCount( - [HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = nameof(WaitForCount))] HttpRequest req, - [DurableClient] IDurableClient client) - { - string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); - var input = JsonConvert.DeserializeObject(requestBody); - var entityId = new EntityId("Counter", input.Key); - Stopwatch stopwatch = new Stopwatch(); - stopwatch.Start(); - - // poll the entity until the expected count is reached - while (stopwatch.Elapsed < TimeSpan.FromMinutes(5)) - { - var response = await client.ReadEntityStateAsync(entityId); - - if (response.EntityExists - && response.EntityState.CurrentValue >= input.Expected) - { - return new OkObjectResult($"{JsonConvert.SerializeObject(response.EntityState)}\n"); - } - - await Task.Delay(TimeSpan.FromSeconds(2)); - } - - return new OkObjectResult("timed out.\n"); - } - - [FunctionName(nameof(Increment))] - public static async Task Increment( - [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = nameof(Increment))] HttpRequest req, + [FunctionName(nameof(Add))] + public static async Task Add( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "counter/{key}/add")] HttpRequest req, + string key, [DurableClient] IDurableClient client) { try { - string entityKey = await new StreamReader(req.Body).ReadToEndAsync(); - var entityId = new EntityId("Counter", entityKey); - await client.SignalEntityAsync(entityId, "add", 1); - return new OkObjectResult($"increment was sent to {entityId}.\n"); + string input = await new StreamReader(req.Body).ReadToEndAsync(); + int amount = int.Parse(input); + var entityId = new EntityId("Counter", key); + await client.SignalEntityAsync(entityId, "add", amount); + return new OkObjectResult($"add({amount}) was sent to {entityId}.\n"); } catch (Exception e) { - return new OkObjectResult(e.ToString()); + return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError }; + } + } + + [FunctionName(nameof(ReadCounter))] + public static async Task ReadCounter( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "counter/{key}")] HttpRequest req, + string key, + [DurableClient] IDurableClient client) + { + try + { + var entityId = new EntityId("Counter", key); + var response = await client.ReadEntityStateAsync(entityId); + + if (!response.EntityExists) + { + return new NotFoundObjectResult($"no such entity: {entityId}"); + } + else + { + return new OkObjectResult(response.EntityState); + } + } + catch (Exception e) + { + return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError }; + } + } + + [FunctionName(nameof(DeleteCounter))] + public static async Task DeleteCounter( + [HttpTrigger(AuthorizationLevel.Anonymous, "delete", Route = "counter/{key}")] HttpRequest req, + string key, + [DurableClient] IDurableClient client) + { + try + { + var entityId = new EntityId("Counter", key); + await client.SignalEntityAsync(entityId, "delete"); + return new OkObjectResult($"delete was sent to {entityId}.\n"); + } + catch (Exception e) + { + return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError }; + } + } + + [FunctionName(nameof(CrashCounter))] + public static async Task CrashCounter( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "counter/{key}/crash")] HttpRequest req, + string key, + [DurableClient] IDurableClient client) + { + try + { + var entityId = new EntityId("Counter", key); + await client.SignalEntityAsync(entityId, "crash", DateTime.UtcNow); + return new OkObjectResult($"crash was sent to {entityId}.\n"); + } + catch (Exception e) + { + return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError }; } } [FunctionName(nameof(CountSignals))] public static async Task CountSignals( - [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = nameof(CountSignals))] HttpRequest req, - [DurableClient] IDurableClient client) + [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "signal-counter/{count}")] HttpRequest req, + int count, + [DurableClient] IDurableClient client) { try { - int numberSignals = int.Parse(await new StreamReader(req.Body).ReadToEndAsync()); - var entityId = new EntityId("Counter", Guid.NewGuid().ToString("N")); + string key = Guid.NewGuid().ToString("N"); + var entityId = new EntityId("Counter", key); DateTime startTime = DateTime.UtcNow; - // send the specified number of signals to the entity - await SendIncrementSignals(client, numberSignals, 50, (i) => entityId); + // send the specified number of signals to the counter entity + // for better send throughput we do not send signals one at a time, but use parallel tasks + await Enumerable.Range(0, count).ParallelForEachAsync(50, true, i => client.SignalEntityAsync(entityId, "add", 1)); // poll the entity until the expected count is reached - while ((DateTime.UtcNow - startTime) < TimeSpan.FromMinutes(5)) + while ((DateTime.UtcNow - startTime) < TimeSpan.FromMinutes(3)) { var response = await client.ReadEntityStateAsync(entityId); if (response.EntityExists - && response.EntityState.CurrentValue == numberSignals) + && response.EntityState.CurrentValue == count) { - return new OkObjectResult($"received {numberSignals} signals in {(response.EntityState.LastModified - startTime).TotalSeconds:F1}s.\n"); + var state = response.EntityState; + var elapsedSeconds = (state.LastModified - state.StartTime.Value).TotalSeconds; + var throughput = count / elapsedSeconds; + return new OkObjectResult(new { elapsedSeconds, count, throughput }); } - await Task.Delay(TimeSpan.FromSeconds(2)); + await Task.Delay(TimeSpan.FromSeconds(5)); } - return new OkObjectResult($"timed out after {(DateTime.UtcNow - startTime)}.\n"); + return new OkObjectResult($"timed out after {(DateTime.UtcNow - startTime)}.\n") { StatusCode = (int)HttpStatusCode.RequestTimeout }; } catch (Exception e) { - return new OkObjectResult(e.ToString()); - } - } - - [FunctionName(nameof(CountParallelSignals))] - public static async Task CountParallelSignals( - [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = nameof(CountParallelSignals))] HttpRequest req, - [DurableClient] IDurableClient client) - { - try - { - // input is of the form "nnn,mmm" - // nnn - number of signals to send - // mmm - number of entities to distribute the signals over - - string input = await new StreamReader(req.Body).ReadToEndAsync(); - int commaPosition = input.IndexOf(','); - int numberSignals = int.Parse(input.Substring(0, commaPosition)); - int numberEntities = int.Parse(input.Substring(commaPosition + 1)); - var entityPrefix = Guid.NewGuid().ToString("N"); - EntityId MakeEntityId(int i) => new EntityId("Counter", $"{entityPrefix}-!{i:D6}"); - DateTime startTime = DateTime.UtcNow; - - if (numberSignals % numberEntities != 0) - { - throw new ArgumentException("numberSignals must be a multiple of numberEntities"); - } - - // send the specified number of signals to the entity - await SendIncrementSignals(client, numberSignals, 50, (i) => MakeEntityId(i % numberEntities)); - - // poll the entities until the expected count is reached - async Task WaitForCount(int i) - { - var random = new Random(); - - while ((DateTime.UtcNow - startTime) < TimeSpan.FromMinutes(5)) - { - var response = await client.ReadEntityStateAsync(MakeEntityId(i)); - - if (response.EntityExists - && response.EntityState.CurrentValue == numberSignals / numberEntities) - { - return (response.EntityState.LastModified - startTime).TotalSeconds; - } - - await Task.Delay(TimeSpan.FromSeconds(2 + random.NextDouble())); - } - - return null; - }; - - var waitTasks = Enumerable.Range(0, numberEntities).Select(i => WaitForCount(i)).ToList(); - - await Task.WhenAll(waitTasks); - - var results = waitTasks.Select(t => t.Result); - - if (results.Any(result => result == null)) - { - return new OkObjectResult($"timed out after {(DateTime.UtcNow - startTime)}.\n"); - } - - return new OkObjectResult($"received {numberSignals} signals on {numberEntities} entities in {results.Max():F1}s.\n"); - } - catch (Exception e) - { - return new OkObjectResult(e.ToString()); - } - } - - static async Task SendIncrementSignals(IDurableClient client, int numberSignals, int maxConcurrency, Func entityIdFactory) - { - // send the specified number of signals to the entity - // for better throughput we do this in parallel - - using var semaphore = new SemaphoreSlim(maxConcurrency); - - for (int i = 0; i < numberSignals; i++) - { - var entityId = entityIdFactory(i); - await semaphore.WaitAsync(); - var task = Task.Run(async () => - { - try - { - await client.SignalEntityAsync(entityId, "add", 1); - } - finally - { - semaphore.Release(); - } - }); - } - - // wait for tasks to complete - for (int i = 0; i < maxConcurrency; i++) - { - await semaphore.WaitAsync(); + return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError }; } } } -} +} \ No newline at end of file diff --git a/test/PerformanceTests/Benchmarks/EventHubs/DestinationEntity.cs b/test/PerformanceTests/Benchmarks/EventHubs/DestinationEntity.cs new file mode 100644 index 0000000..7f0d98c --- /dev/null +++ b/test/PerformanceTests/Benchmarks/EventHubs/DestinationEntity.cs @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace PerformanceTests.EventHubs +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.IO; + using System.Threading.Tasks; + using Microsoft.Azure.WebJobs; + using Microsoft.Azure.WebJobs.Extensions.DurableTask; + using Microsoft.Extensions.Logging; + using Newtonsoft.Json; + + [JsonObject(MemberSerialization.OptIn)] + public class DestinationEntity + { + public static EntityId GetEntityId(int number) => new EntityId(nameof(DestinationEntity), $"!{number}"); + + [JsonProperty] + public int EventCount { get; set; } + + [JsonProperty] + public DateTime? LastUpdated { get; set; } + + + ILogger logger; + + public DestinationEntity(ILogger logger) + { + this.logger = logger; + } + + public ILogger Logger { set { this.logger = value; } } + + public void Delete() + { + Entity.Current.DeleteState(); + } + + public void Receive((Event evt, int receiverPartition) input) + { + this.EventCount++; + + if (this.logger.IsEnabled(LogLevel.Debug)) + { + this.logger.LogInformation("{entityId} Received signal #{eventCount}", Entity.Current.EntityId, this.EventCount); + } + + this.LastUpdated = DateTime.UtcNow; + + // send an ack to the receiver entity + Entity.Current.SignalEntity(PullerEntity.GetEntityId(input.receiverPartition), nameof(PullerEntity.Ack)); + } + + [FunctionName(nameof(DestinationEntity))] + public static Task Run([EntityTrigger] IDurableEntityContext context, ILogger logger) + { + return context.DispatchAsync(logger); + } + } +} diff --git a/test/PerformanceTests/Benchmarks/EventHubs/DestinationOrchestration.cs b/test/PerformanceTests/Benchmarks/EventHubs/DestinationOrchestration.cs new file mode 100644 index 0000000..0cd0f9c --- /dev/null +++ b/test/PerformanceTests/Benchmarks/EventHubs/DestinationOrchestration.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace PerformanceTests.EventHubs +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.IO; + using System.Threading.Tasks; + using Microsoft.Azure.WebJobs; + using Microsoft.Azure.WebJobs.Extensions.DurableTask; + using Microsoft.Extensions.Logging; + using Newtonsoft.Json; + + public static class DestinationOrchestration + { + [FunctionName(nameof(DestinationOrchestration))] + public static async Task RunOrchestrator( + [OrchestrationTrigger] IDurableOrchestrationContext context, + ILogger logger) + { + var input = context.GetInput<(Event evt, int receiverPartition)> (); + + logger.LogInformation("Started orchestration {instanceId}", context.InstanceId); + + // execute an activity with the event payload as the input + await context.CallActivityAsync(nameof(Activities.SayHello), input.evt.Payload); + + logger.LogInformation("Completed orchestration {instanceId}", context.InstanceId); + + // when this orchestration completes, we notify the puller entity + context.SignalEntity(PullerEntity.GetEntityId(input.receiverPartition), nameof(PullerEntity.Ack)); + } + } +} diff --git a/test/PerformanceTests/Benchmarks/EventHubs/Event.cs b/test/PerformanceTests/Benchmarks/EventHubs/Event.cs index 47a2e0e..4e0f87d 100644 --- a/test/PerformanceTests/Benchmarks/EventHubs/Event.cs +++ b/test/PerformanceTests/Benchmarks/EventHubs/Event.cs @@ -5,21 +5,33 @@ namespace PerformanceTests.EventHubs { using System; using System.Collections.Generic; + using System.IO; using System.Text; using Newtonsoft.Json; - [JsonObject(MemberSerialization.OptOut)] public struct Event { - public int Partition { get; set; } + public int Destination { get; set; } + + public string Payload { get; set; } - public long SeqNo { get; set; } - - public byte[] Payload { get; set; } - - public override string ToString() + public static Event FromStream(Stream s) { - return $"Event {this.Partition}.{this.SeqNo} size={this.Payload.Length}"; + var r = new BinaryReader(s); + return new Event + { + Destination = r.ReadInt32(), + Payload = r.ReadString(), + }; + } + + public byte[] ToBytes() + { + var m = new MemoryStream(); + var r = new BinaryWriter(m); + r.Write(this.Destination); + r.Write(this.Payload); + return m.ToArray(); } } } diff --git a/test/PerformanceTests/Benchmarks/EventHubs/EventHubTrigger.cs b/test/PerformanceTests/Benchmarks/EventHubs/EventHubTrigger.cs deleted file mode 100644 index cc5a49c..0000000 --- a/test/PerformanceTests/Benchmarks/EventHubs/EventHubTrigger.cs +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace PerformanceTests.EventHubs -{ - using System; - using System.Collections.Generic; - using System.IO; - using System.Threading.Tasks; - using Microsoft.Azure.EventHubs; - using Microsoft.Azure.WebJobs; - using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Microsoft.Extensions.Logging; - - public static class EventHubTrigger - { - // must set this manually, and comment out the [Disable], to test the trigger - const int numEntities = 1000; - - [Disable] - [FunctionName("MyEventHubTrigger")] - public static async Task RunAsync( - [EventHubTrigger(Parameters.EventHubName, Connection = "EventHubsConnection")] EventData[] eventDataSet, - [DurableClient] IDurableClient client, - ILogger log) - { - int numTries = 0; - - while (numTries < 10) - { - numTries++; - - try - { - var signalTasks = new List(); - - // send one signal for each packet - for (int i = 0; i < eventDataSet.Length; i++) - { - var eventData = eventDataSet[i]; - var sp = eventData.SystemProperties; - byte[] payload = eventData.Body.ToArray(); - var evt = new Event() - { - Partition = payload[0], - SeqNo = eventData.SystemProperties.SequenceNumber, - Payload = payload - }; - log.LogDebug($"Sending signal for {evt}"); - signalTasks.Add(client.SignalEntityAsync(Parameters.GetDestinationEntity(evt, numEntities), nameof(ReceiverEntity.Receive), evt)); - } - - log.LogInformation($"Sent {eventDataSet.Length} signals"); - - await Task.WhenAll(signalTasks); - - break; - } - catch (NullReferenceException) - { - // during startup we may get these exceptions if the trigger goes off before the - // Orchestration service has been started - await Task.Delay(TimeSpan.FromSeconds(1)); - } - } - } - } -} diff --git a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Channels.cs b/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Channels.cs new file mode 100644 index 0000000..50bfc6e --- /dev/null +++ b/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Channels.cs @@ -0,0 +1,82 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace PerformanceTests.EventHubs +{ + using System; + using System.IO; + using System.Threading.Tasks; + using Microsoft.AspNetCore.Mvc; + using Microsoft.Azure.WebJobs; + using Microsoft.Azure.WebJobs.Extensions.Http; + using Microsoft.AspNetCore.Http; + using Microsoft.Azure.WebJobs.Extensions.DurableTask; + using Azure.Messaging.EventHubs.Consumer; + using Azure.Messaging.EventHubs.Producer; + using System.Collections.Generic; + using System.Linq; + using Microsoft.Extensions.Logging; + using System.Net; + + public static class Channels + { + [FunctionName("EH_" + nameof(Channels))] + public static async Task Run( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/channels/{action}")] HttpRequest req, + [DurableClient] IDurableClient client, + string action, + ILogger log) + { + try + { + string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); + int numEventHubs = int.Parse(requestBody); + + if (numEventHubs < 1 || numEventHubs > Parameters.MaxEventHubs) + { + return new ObjectResult("invalid number of event hubs.\n") { StatusCode = (int)HttpStatusCode.BadRequest }; + } + + switch (action) + { + case "create": + + log.LogWarning("Creating {numEventHubs} EventHubs...", numEventHubs); + int numCreated = 0; + await Enumerable.Range(0, numEventHubs).ParallelForEachAsync(200, false, async (index) => + { + if (await EventHubsUtil.EnsureEventHubExistsAsync(Parameters.EventHubConnectionString, Parameters.EventHubName(index), Parameters.MaxPartitionsPerEventHub)) + { + numCreated++; + } + }); + return new OkObjectResult($"Created {numCreated} EventHubs.\n"); + + case "delete": + + log.LogWarning("Deleting {numEventHubs} EventHubs...", numEventHubs); + int numDeleted = 0; + await Enumerable.Range(0, numEventHubs).ParallelForEachAsync(200, false, async (index) => + { + if (await EventHubsUtil.DeleteEventHubIfExistsAsync(Parameters.EventHubConnectionString, Parameters.EventHubName(index))) + { + numDeleted++; + } + }); + + return new OkObjectResult($"Deleted {numDeleted} EventHubs.\n"); + + default: + return new ObjectResult($"Unknown action: {action}\n") + { + StatusCode = (int)System.Net.HttpStatusCode.NotFound + }; + } + } + catch (Exception e) + { + return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int)HttpStatusCode.InternalServerError }; + } + } + } +} diff --git a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Clear.cs b/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Clear.cs deleted file mode 100644 index d1ecb18..0000000 --- a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Clear.cs +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace PerformanceTests.EventHubs -{ - using System; - using System.IO; - using System.Threading.Tasks; - using Microsoft.AspNetCore.Mvc; - using Microsoft.Azure.WebJobs; - using Microsoft.Azure.WebJobs.Extensions.Http; - using Microsoft.AspNetCore.Http; - using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Azure.Messaging.EventHubs.Consumer; - using Azure.Messaging.EventHubs.Producer; - using System.Collections.Generic; - using System.Linq; - using Microsoft.Extensions.Logging; - - public static class Clear - { - [FunctionName("EH_" + nameof(Clear))] - public static async Task Run( - [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/" + nameof(Clear))] HttpRequest req, - [DurableClient] IDurableClient client, - ILogger log) - { - try - { - string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); - int numEntities = int.Parse(requestBody); - - log.LogWarning($"Deleting {numEntities + 32} entities..."); - await Enumerable.Range(0, numEntities + 32).ParallelForEachAsync(200, true, async (index) => - { - if (index <= numEntities) - { - var entityId = ReceiverEntity.GetEntityId(index); - await client.SignalEntityAsync(entityId, "delete"); - } - else - { - int partition = index - numEntities; - await client.SignalEntityAsync(PartitionReceiverEntity.GetEntityId(Parameters.EventHubName, partition.ToString()), "delete"); - } - }); - - return new OkObjectResult($"Deleted {numEntities + 32} entities.\n"); - } - catch (Exception e) - { - return new ObjectResult(new { - error = e.ToString() - }); - } - } - } -} diff --git a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Consume.cs b/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Consume.cs deleted file mode 100644 index 2315469..0000000 --- a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Consume.cs +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace PerformanceTests.EventHubs -{ - using System; - using System.IO; - using System.Threading.Tasks; - using Microsoft.AspNetCore.Mvc; - using Microsoft.Azure.WebJobs; - using Microsoft.Azure.WebJobs.Extensions.Http; - using Microsoft.AspNetCore.Http; - using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Azure.Messaging.EventHubs.Consumer; - using Azure.Messaging.EventHubs.Producer; - using System.Collections.Generic; - - public static class Consume - { - [FunctionName("EH_" + nameof(Consume))] - public static async Task Run( - [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/" + nameof(Consume))] HttpRequest req, - [DurableClient] IDurableClient client) - { - try - { - string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); - int numEntities = int.Parse(requestBody); - - var connectionString = Environment.GetEnvironmentVariable("EventHubsConnection"); - var eventHubName = Parameters.EventHubName; - var producer = new EventHubProducerClient(connectionString, eventHubName); - var partitionIds = await producer.GetPartitionIdsAsync(); - - List signalTasks = new List(); - foreach (var partitionId in partitionIds) - { - var partitionProperties = await producer.GetPartitionPropertiesAsync(partitionId); - var startParameters = new PartitionReceiverEntity.StartParameters() - { - NumEntities = numEntities, - PartitionId = partitionId, - NextSequenceNumberToReceive = 0, - }; - signalTasks.Add(client.SignalEntityAsync(PartitionReceiverEntity.GetEntityId(eventHubName, partitionId), nameof(PartitionReceiverEntity.Start), startParameters)); - } - await Task.WhenAll(signalTasks); - - return new OkObjectResult($"Started PartitionReceiverEntities for {signalTasks.Count} partitions.\n"); - } - catch (Exception e) - { - return new ObjectResult(new { - error = e.ToString() - }); - } - } - } -} diff --git a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Consumers.cs b/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Consumers.cs new file mode 100644 index 0000000..2fcac25 --- /dev/null +++ b/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Consumers.cs @@ -0,0 +1,147 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace PerformanceTests.EventHubs +{ + using System; + using System.IO; + using System.Threading.Tasks; + using Microsoft.AspNetCore.Mvc; + using Microsoft.Azure.WebJobs; + using Microsoft.Azure.WebJobs.Extensions.Http; + using Microsoft.AspNetCore.Http; + using Microsoft.Azure.WebJobs.Extensions.DurableTask; + using Azure.Messaging.EventHubs.Consumer; + using Azure.Messaging.EventHubs.Producer; + using System.Collections.Generic; + using System.Linq; + using System.Net; + using Microsoft.Extensions.Logging; + using Newtonsoft.Json; + + public static class Consumers + { + [FunctionName("EH_" + nameof(Consumers))] + public static async Task Run( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/consumers/{action}")] HttpRequest req, + [DurableClient] IDurableClient client, + string action, + ILogger log) + { + try + { + string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); + int numPullers = int.Parse(requestBody); + + if (numPullers < 1 || numPullers > Parameters.MaxPartitions) + { + return new ObjectResult("invalid number of consumers.\n") { StatusCode = (int)HttpStatusCode.BadRequest }; + } + + switch (action) + { + case "query": + + object lockForUpdate = new object(); + + int delivered = 0; + long pulled = 0; + int pending = 0; + int active = 0; + DateTime? firstReceived = null; + DateTime? lastUpdated = null; + int errors = 0; + + log.LogWarning($"Checking the status of {numPullers} puller entities..."); + await Enumerable.Range(0, numPullers).ParallelForEachAsync(500, true, async (partition) => + { + var entityId = PullerEntity.GetEntityId(partition); + var response = await client.ReadEntityStateAsync(entityId); + if (response.EntityExists) + { + lock (lockForUpdate) + { + pulled += response.EntityState.TotalEventsPulled; + pending += response.EntityState.NumPending; + active += response.EntityState.IsActive ? 1 : 0; + errors += response.EntityState.Errors; + + if (!firstReceived.HasValue || response.EntityState.FirstReceived < firstReceived) + { + firstReceived = response.EntityState.FirstReceived; + } + } + } + }); + + log.LogWarning($"Checking the status of {Parameters.NumberDestinationEntities} destination entities..."); + await Enumerable.Range(0, Parameters.NumberDestinationEntities).ParallelForEachAsync(500, true, async (destination) => + { + var entityId = DestinationEntity.GetEntityId(destination); + var response = await client.ReadEntityStateAsync(entityId); + if (response.EntityExists) + { + lock (lockForUpdate) + { + delivered += response.EntityState.EventCount; + + if (!lastUpdated.HasValue || response.EntityState.LastUpdated > lastUpdated) + { + lastUpdated = response.EntityState.LastUpdated; + } + } + } + }); + + TimeSpan? duration = lastUpdated - firstReceived; + + var resultObject = new + { + pending, + active, + delivered, + pulled, + errors, + firstReceived, + lastUpdated, + duration + }; + + return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n"); + + case "start": + case "stop": + + await Enumerable.Range(0, numPullers).ParallelForEachAsync(200, true, async (number) => + { + var entityId = PullerEntity.GetEntityId(number); + await client.SignalEntityAsync(entityId, action); + }); + + return new OkObjectResult($"Sent {action} signal to {numPullers} puller entities.\n"); + + + case "delete": + + await Enumerable.Range(0, numPullers + Parameters.NumberDestinationEntities).ParallelForEachAsync(500, true, async (i) => + { + var entityId = i < numPullers ? PullerEntity.GetEntityId(i) : DestinationEntity.GetEntityId(i - numPullers); + await client.SignalEntityAsync(entityId, action); + }); + + return new OkObjectResult($"Sent delete signal to {numPullers} puller entities and {Parameters.NumberDestinationEntities} destination entities.\n"); + + default: + return new ObjectResult($"Unknown action: {action}\n") + { + StatusCode = (int)System.Net.HttpStatusCode.NotFound + }; + } + } + catch (Exception e) + { + return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int)HttpStatusCode.InternalServerError }; + } + } + } +} diff --git a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Count.cs b/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Count.cs deleted file mode 100644 index 9395d5d..0000000 --- a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Count.cs +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace PerformanceTests.EventHubs -{ - using System; - using System.IO; - using System.Threading.Tasks; - using Microsoft.AspNetCore.Mvc; - using Microsoft.Azure.WebJobs; - using Microsoft.Azure.WebJobs.Extensions.Http; - using Microsoft.AspNetCore.Http; - using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Azure.Messaging.EventHubs.Consumer; - using Azure.Messaging.EventHubs.Producer; - using System.Collections.Generic; - using Microsoft.Extensions.Logging; - using System.Linq; - using Newtonsoft.Json; - - public static class Count - { - [FunctionName("EH_" + nameof(Count))] - public static async Task Run( - [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/" + nameof(Count))] HttpRequest req, - [DurableClient] IDurableClient client, - ILogger log) - { - try - { - string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); - int numEntities = int.Parse(requestBody); - - int numEvents = 0; - long earliestStart = long.MaxValue; - long latestUpdate = 0; - long bytesReceived = 0; - bool gotTimeRange = false; - - object lockForUpdate = new object(); - - var tasks = new List>(); - - log.LogWarning($"Checking the status of {numEntities} entities..."); - await Enumerable.Range(0, numEntities).ParallelForEachAsync(200, true, async (index) => - { - var entityId = ReceiverEntity.GetEntityId(index); - var response = await client.ReadEntityStateAsync(entityId); - if (response.EntityExists) - { - lock (lockForUpdate) - { - numEvents += response.EntityState.EventCount; - bytesReceived += response.EntityState.BytesReceived; - earliestStart = Math.Min(earliestStart, response.EntityState.StartTime.Ticks); - latestUpdate = Math.Max(latestUpdate, response.EntityState.LastTime.Ticks); - gotTimeRange = true; - } - } - }); - - double elapsedSeconds = 0; - - if (gotTimeRange) - { - elapsedSeconds = (new DateTime(latestUpdate) - new DateTime(earliestStart)).TotalSeconds; - } - - string volume = $"{1.0 * bytesReceived / (1024 * 1024):F2}MB"; - - log.LogWarning($"Received a total of {numEvents} ({volume}) on {numEntities} entities in {elapsedSeconds:F2}s."); - - var resultObject = new - { - numEvents, - bytesReceived, - volume, - elapsedSeconds, - }; - - return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n"); - } - catch (Exception e) - { - return new ObjectResult(new { error = e.ToString() }); - } - } - } -} diff --git a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Produce.cs b/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Produce.cs deleted file mode 100644 index 4d76833..0000000 --- a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Produce.cs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace PerformanceTests.EventProducer -{ - using System; - using System.Collections.Generic; - using System.Threading.Tasks; - using Microsoft.Azure.EventHubs; - using EventHubs; - using Microsoft.AspNetCore.Mvc; - using Microsoft.AspNetCore.Routing; - using Microsoft.Azure.WebJobs; - using Microsoft.AspNetCore.Http; - using Microsoft.Extensions.Logging; - using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Microsoft.Azure.WebJobs.Extensions.Http; - using Newtonsoft.Json; - using System.Linq; - using System.Diagnostics; - using System.IO; - - public static class Produce - { - // Populates the event hubs with events. This should be called prior to consuming the events. - // - // for example, to distribute 1000 events of 1k each over 12 partitions, call - // curl http://localhost:7071/eh/produce -d 1000x1024/12 - - [FunctionName("EH_" + nameof(Produce))] - public static async Task Run( - [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/" + nameof(Produce))] HttpRequest req, - [DurableClient] IDurableClient client, - ILogger log) - { - try - { - string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); - int xpos = requestBody.IndexOf('x'); - int spos = requestBody.IndexOf('/'); - int numEvents = int.Parse(requestBody.Substring(0, xpos)); - int payloadSize = int.Parse(requestBody.Substring(xpos + 1, spos - (xpos + 1))); - int numPartitions = int.Parse(requestBody.Substring(spos + 1)); - int producerBatchSize = 500 * 1024 / payloadSize; - string volume = $"{1.0 * payloadSize * numEvents / (1024 * 1024):F2}MB"; - - string connectionString = Environment.GetEnvironmentVariable(Parameters.EventHubsConnectionName); - await EventHubsUtil.EnsureEventHubExistsAsync(connectionString, Parameters.EventHubName, numPartitions); - var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString) - { - EntityPath = Parameters.EventHubName, - }; - var ehc = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString()); - var ehInfo = await ehc.GetRuntimeInformationAsync(); - - if (ehInfo.PartitionCount != numPartitions) - { - throw new ArgumentException("Wrong number of partitions"); - } - - async Task SendEventsAsync(int offset, int events, string partitionId) - { - var partitionSender = ehc.CreatePartitionSender(partitionId); - var eventBatch = new List(100); - for (int x = 0; x < events; x++) - { - byte[] payload = new byte[payloadSize]; - (new Random()).NextBytes(payload); - payload[0] = byte.Parse(partitionId); - var e = new EventData(payload); - eventBatch.Add(e); - - if (x % producerBatchSize == (producerBatchSize - 1)) - { - await partitionSender.SendAsync(eventBatch); - log.LogWarning($"Sent {x + 1} events to partition {partitionId}"); - eventBatch = new List(100); - } - } - if (eventBatch.Count > 0) - { - await partitionSender.SendAsync(eventBatch); - log.LogWarning($"Sent {events} events to partition {partitionId}"); - } - } - - log.LogWarning($"Sending {numEvents} EventHub events ({volume}) to {numPartitions} partitions"); - var partitionTasks = new List(); - var remaining = numEvents; - var stopwatch = new Stopwatch(); - stopwatch.Start(); - - for (int i = numPartitions; i >= 1; i--) - { - var portion = remaining / i; - remaining -= portion; - partitionTasks.Add(SendEventsAsync(remaining, portion, ehInfo.PartitionIds[i - 1])); - } - - await Task.WhenAll(partitionTasks); - - double elapsedSeconds = stopwatch.Elapsed.TotalSeconds; - - log.LogWarning($"Sent all {numEvents} EventHub events ({volume}) to {numPartitions} partitions in {elapsedSeconds}"); - - var resultObject = new { numEvents, volume, numPartitions, elapsedSeconds }; - return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n"); - } - catch (Exception e) - { - return new ObjectResult(new { error = e.ToString() }); - } - } - } -} diff --git a/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Producers.cs b/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Producers.cs new file mode 100644 index 0000000..b8f9c51 --- /dev/null +++ b/test/PerformanceTests/Benchmarks/EventHubs/HttpTriggers/Producers.cs @@ -0,0 +1,116 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace PerformanceTests.EventProducer +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using Microsoft.Azure.EventHubs; + using EventHubs; + using Microsoft.AspNetCore.Mvc; + using Microsoft.AspNetCore.Routing; + using Microsoft.Azure.WebJobs; + using Microsoft.AspNetCore.Http; + using Microsoft.Extensions.Logging; + using Microsoft.Azure.WebJobs.Extensions.DurableTask; + using Microsoft.Azure.WebJobs.Extensions.Http; + using Newtonsoft.Json; + using System.Linq; + using System.Diagnostics; + using System.IO; + using System.Net; + + public static class Producers + { + [FunctionName("EH_" + nameof(Producers))] + public static async Task Run( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/producers/{action}")] HttpRequest req, + [DurableClient] IDurableClient client, + string action, + ILogger log) + { + try + { + string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); + int numProducers = int.Parse(requestBody); + + switch (action) + { + case "query": + + object lockForUpdate = new object(); + + long sent = 0; + long exceptions = 0; + int active = 0; + DateTime? Starttime = null; + DateTime? LastUpdate = null; + + log.LogWarning("Checking the status of {NumProducers} producer entities...", numProducers); + await Enumerable.Range(0, numProducers).ParallelForEachAsync(500, true, async (partition) => + { + var entityId = ProducerEntity.GetEntityId(partition); + var response = await client.ReadEntityStateAsync(entityId); + if (response.EntityExists) + { + lock (lockForUpdate) + { + sent += response.EntityState.SentEvents; + exceptions += response.EntityState.Exceptions; + active += response.EntityState.IsActive ? 1 : 0; + + if (!Starttime.HasValue || response.EntityState.Starttime < Starttime) + { + Starttime = response.EntityState.Starttime; + } + if (!LastUpdate.HasValue || response.EntityState.LastUpdate > LastUpdate) + { + LastUpdate = response.EntityState.LastUpdate; + } + } + } + }); + + double? throughput = null; + if (Starttime.HasValue && LastUpdate.HasValue) + { + throughput = 1.0 * sent / (LastUpdate.Value - Starttime.Value).TotalSeconds; + } + + var resultObject = new + { + sent, + exceptions, + active, + throughput, + }; + + return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n"); + + case "start": + case "stop": + case "delete": + + await Enumerable.Range(0, numProducers).ParallelForEachAsync(500, true, async (number) => + { + var entityId = ProducerEntity.GetEntityId(number); + await client.SignalEntityAsync(entityId, action); + }); + + return new OkObjectResult($"sent {action} signal to {numProducers} producer entities\n"); + + default: + return new ObjectResult($"Unknown action: {action}\n") + { + StatusCode = (int)System.Net.HttpStatusCode.NotFound + }; + } + } + catch (Exception e) + { + return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int)HttpStatusCode.InternalServerError }; + } + } + } +} diff --git a/test/PerformanceTests/Benchmarks/EventHubs/Parameters.cs b/test/PerformanceTests/Benchmarks/EventHubs/Parameters.cs index 189939e..e1163fa 100644 --- a/test/PerformanceTests/Benchmarks/EventHubs/Parameters.cs +++ b/test/PerformanceTests/Benchmarks/EventHubs/Parameters.cs @@ -6,25 +6,44 @@ namespace PerformanceTests.EventHubs using System; using System.Collections.Generic; using System.Text; - using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Newtonsoft.Json; + using Microsoft.Identity.Client; - [JsonObject(MemberSerialization.OptOut)] public static class Parameters { - /// - /// The name of the event hubs connection - /// - public const string EventHubsConnectionName = "EventHubsConnection"; + // the name of the environment variable containing the event hubs connection string for the event hub + // that we are pushing events to and pulling events from + public const string EventHubsEnvVar = "EHNamespace"; - /// - /// The name of the Event Hub that connects the consumer and producer - /// - public const string EventHubName = "eventstest"; + // EventHubs dimensions + public const int MaxEventHubs = 10; + public const int MaxPartitionsPerEventHub = 32; + public const int MaxPartitions = MaxEventHubs * MaxPartitionsPerEventHub; + public const int MaxPullers = MaxEventHubs * MaxPartitionsPerEventHub; // max 1 puller per partition - /// - /// Routing function (determines target entity for an event) - /// - public static EntityId GetDestinationEntity(Event evt, int numberEntities) => ReceiverEntity.GetEntityId(Math.Abs((evt.Partition, evt.SeqNo).GetHashCode()) % numberEntities); + // Event content + public const int PayloadStringLength = 5; + + // Limit on the number of pending signals and orchestrations + // Serves as flow control to avoid pulling more from event hub than what can be processed + public const int PendingLimit = 3000; + + // Forwarding of events to entities + public static bool SendEntitySignalForEachEvent = true; + public const int NumberDestinationEntities = 100; + + // Starting of orchestrations for events + public static bool StartOrchestrationForEachEvent = false; + public static bool PlaceOrchestrationInstance = true; + + public static string EventHubConnectionString => Environment.GetEnvironmentVariable(EventHubsEnvVar); + + public static string EventHubName(int index) => $"hub{index}"; + + // assignment of puller entities to eventhubs partitions + public static string EventHubNameForPuller(int number) => EventHubName(number / MaxPartitionsPerEventHub); + public static string EventHubPartitionIdForPuller(int number) => (number % MaxPartitionsPerEventHub).ToString(); + + // assignment of producer entities to eventhubs + public static string EventHubNameForProducer(int number) => EventHubName((number / MaxPartitionsPerEventHub) % MaxEventHubs); } } \ No newline at end of file diff --git a/test/PerformanceTests/Benchmarks/EventHubs/PartitionReceiverEntity.cs b/test/PerformanceTests/Benchmarks/EventHubs/PartitionReceiverEntity.cs deleted file mode 100644 index d332e54..0000000 --- a/test/PerformanceTests/Benchmarks/EventHubs/PartitionReceiverEntity.cs +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace PerformanceTests.EventHubs -{ - using System; - using System.Collections.Concurrent; - using System.Collections.Generic; - using System.IO; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - using Azure.Messaging.EventHubs; - using Azure.Messaging.EventHubs.Consumer; - using Azure.Messaging.EventHubs.Primitives; - using Azure.Messaging.EventHubs.Producer; - using Microsoft.Azure.WebJobs; - using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Microsoft.Extensions.Logging; - using Newtonsoft.Json; - - [JsonObject(MemberSerialization.OptIn)] - public class PartitionReceiverEntity - { - public static EntityId GetEntityId(string EventHubName, string partitionId) - { - return new EntityId(nameof(PartitionReceiverEntity), $"{EventHubName}!{int.Parse(partitionId):D2}"); - } - - [JsonProperty] - int NumEntities { get; set; } - - [JsonProperty] - string PartitionId { get; set; } - - [JsonProperty] - long Position { get; set; } - - [JsonProperty] - double BackoffSeconds { get; set; } - - [JsonProperty] - TimeSpan? IdleInterval { get; set; } - - [JsonProperty] - Guid InstanceGuid { get; set; } - - ILogger logger; - - readonly static ConcurrentDictionary cache = new ConcurrentDictionary(); - - public PartitionReceiverEntity(ILogger logger) - { - this.logger = logger; - } - - public ILogger Logger { set { this.logger = value; } } - - [JsonObject(MemberSerialization.OptOut)] - public struct StartParameters - { - public int NumEntities { get; set; } - - public string PartitionId { get; set; } - - public long NextSequenceNumberToReceive { get; set; } - - public TimeSpan? IdleInterval { get; set; } - } - - void ProcessEvent(EventData eventData) - { - byte[] payload = eventData.EventBody.ToArray(); - var evt = new Event() - { - Partition = payload[0], - SeqNo = eventData.SequenceNumber, - Payload = payload - }; - this.logger.LogDebug($"Sending signal for {evt}"); - Entity.Current.SignalEntity(Parameters.GetDestinationEntity(evt, this.NumEntities), nameof(ReceiverEntity.Receive), evt); - } - - public async Task Start(StartParameters parameters) - { - // close any receivers that may already exist because of a previous run - if (cache.TryRemove(this.InstanceGuid, out var receiver)) - { - await receiver.CloseAsync(); - } - - this.NumEntities = parameters.NumEntities; - this.PartitionId = parameters.PartitionId; - this.Position = parameters.NextSequenceNumberToReceive - 1; - this.IdleInterval = parameters.IdleInterval; - this.InstanceGuid = Guid.NewGuid(); - - await this.Continue(this.InstanceGuid); - } - - public async Task Delete() - { - if (cache.TryRemove(this.InstanceGuid, out var receiver)) - { - await receiver.CloseAsync(); - } - Entity.Current.DeleteState(); - } - - public async Task Continue(Guid instanceGuid) - { - try - { - if (this.InstanceGuid != instanceGuid) - { - // the signal is from an instance that was deleted or overwritten. Ignore it. - return; - } - - this.logger.LogInformation($"{Entity.Current.EntityKey} Continuing after {this.Position}"); - - PartitionReceiver receiver = cache.GetOrAdd(this.InstanceGuid, (partitionId) => - { - this.logger.LogDebug($"{Entity.Current.EntityKey} Creating PartitionReceiver"); - - return new PartitionReceiver( - EventHubConsumerClient.DefaultConsumerGroupName, - this.PartitionId, - EventPosition.FromSequenceNumber(this.Position, false), - Environment.GetEnvironmentVariable("EventHubsConnection"), - Parameters.EventHubName); - }); - - int batchSize = 100; - TimeSpan waitTime = TimeSpan.FromSeconds(1); - long? lastReceivedSequenceNumber = null; - - this.logger.LogDebug($"{Entity.Current.EntityKey} Receiving events after {this.Position}..."); - var eventBatch = await receiver.ReceiveBatchAsync(batchSize, waitTime); - this.logger.LogDebug($"{Entity.Current.EntityKey} ...response received."); - - foreach (EventData eventData in eventBatch) - { - try - { - this.ProcessEvent(eventData); - } - catch (Exception e) - { - this.logger.LogError($"{Entity.Current.EntityKey} Failed to process event {eventData.SequenceNumber}: {e}"); - } - lastReceivedSequenceNumber = eventData.SequenceNumber; - } - - - if (lastReceivedSequenceNumber.HasValue) - { - this.logger.LogInformation($"Processed {lastReceivedSequenceNumber.Value - this.Position} signals"); - this.Position = lastReceivedSequenceNumber.Value; - this.ScheduleContinuation(0); - } - else if (this.BackoffSeconds == 0) - { - this.ScheduleContinuation(1); - } - else if (this.BackoffSeconds < 10) - { - this.ScheduleContinuation(this.BackoffSeconds * 3); - } - else - { - cache.TryRemove(this.InstanceGuid, out _); - await receiver.CloseAsync(); - - if (this.IdleInterval.HasValue) - { - this.logger.LogInformation($"{Entity.Current.EntityKey} Going on standby for {this.IdleInterval.Value}"); - this.ScheduleContinuation(this.IdleInterval.Value.TotalSeconds); - } - else - { - this.logger.LogInformation($"{Entity.Current.EntityKey} is done"); - Entity.Current.DeleteState(); - return; - } - } - } - catch (Exception e) // TODO catch - { - this.logger.LogError($"{Entity.Current.EntityKey} failed: {e}"); - if (this.IdleInterval.HasValue) - { - this.ScheduleContinuation(this.IdleInterval.Value.TotalSeconds); - } - } - } - - void ScheduleContinuation(double BackoffSeconds) - { - if (BackoffSeconds == 0) - { - this.logger.LogDebug($"{Entity.Current.EntityKey} Scheduling continuation immediately"); - Entity.Current.SignalEntity( - Entity.Current.EntityId, - nameof(Continue), - this.InstanceGuid); - } - else - { - var targetTime = DateTime.UtcNow + TimeSpan.FromSeconds(BackoffSeconds); - this.logger.LogDebug($"{Entity.Current.EntityKey} Scheduling continuation delayed by {BackoffSeconds}s"); - Entity.Current.SignalEntity( - Entity.Current.EntityId, - targetTime, - nameof(Continue), - this.InstanceGuid); - } - this.BackoffSeconds = BackoffSeconds; - } - - [FunctionName(nameof(PartitionReceiverEntity))] - public static Task Run([EntityTrigger] IDurableEntityContext context, ILogger logger) - { - return context.DispatchAsync(logger); - } - } -} - \ No newline at end of file diff --git a/test/PerformanceTests/Benchmarks/EventHubs/ProducerEntity.cs b/test/PerformanceTests/Benchmarks/EventHubs/ProducerEntity.cs new file mode 100644 index 0000000..931dec0 --- /dev/null +++ b/test/PerformanceTests/Benchmarks/EventHubs/ProducerEntity.cs @@ -0,0 +1,145 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace PerformanceTests.EventHubs +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Linq; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using Azure.Messaging.EventHubs; + using Azure.Messaging.EventHubs.Producer; + using Microsoft.Azure.Documents.SystemFunctions; + using Microsoft.Azure.WebJobs; + using Microsoft.Azure.WebJobs.Extensions.DurableTask; + using Microsoft.Extensions.Logging; + using Newtonsoft.Json; + + [JsonObject(MemberSerialization.OptIn)] + public class ProducerEntity + { + public static EntityId GetEntityId(int number) + { + return new EntityId(nameof(ProducerEntity), $"!{number}"); + } + + [JsonProperty] + public bool IsActive { get; set; } + + [JsonProperty] + public long SentEvents { get; set; } + + [JsonProperty] + public int Exceptions { get; set; } + + [JsonProperty] + public DateTime? Starttime { get; set; } + + [JsonProperty] + public DateTime? LastUpdate { get; set; } + + readonly ILogger logger; + + public ProducerEntity(ILogger logger) + { + this.logger = logger; + } + + public Task Start() + { + this.IsActive = true; + this.Starttime = DateTime.UtcNow; + return this.ProduceMore(); + } + + public void Stop() + { + this.IsActive = false; + } + + public void Delete() + { + Entity.Current.DeleteState(); + } + + public string CreateRandomPayload(Random random, int length) + { + if (length == 0) + { + return string.Empty; + } + var sb = new StringBuilder(); + for (int i = 0; i < length; i++) + { + sb.Append(Convert.ToChar(random.Next(0, 26) + 65)); + } + return sb.ToString(); + } + + async Task ProduceMore() + { + if (!this.IsActive) + { + return; + } + + var sw = new Stopwatch(); + sw.Start(); + + int myNumber = int.Parse(Entity.Current.EntityId.EntityKey.Substring(1)); + + Random random = new Random(myNumber); + + await using (var producer = new EventHubProducerClient(Parameters.EventHubConnectionString, Parameters.EventHubNameForProducer(myNumber))) + { + var r = new Random(); + + while (sw.Elapsed < TimeSpan.FromSeconds(3)) + { + try + { + using EventDataBatch eventBatch = await producer.CreateBatchAsync(); + using MemoryStream stream = new MemoryStream(); + int batchsize = 100; + for (int i = 0; i < batchsize; i++) + { + var evt = new Event() + { + Destination = r.Next(Parameters.NumberDestinationEntities), + Payload = this.CreateRandomPayload(random, Parameters.PayloadStringLength), + }; + eventBatch.TryAdd(new EventData(evt.ToBytes())); + } + + int numberEventsInBatch = eventBatch.Count; + await producer.SendAsync(eventBatch); + this.SentEvents += numberEventsInBatch; + } + + catch (Exception e) + { + this.Exceptions++; + this.logger.LogError("{entityId} Failed to send events: {exception}", Entity.Current.EntityId, e); + } + } + + this.LastUpdate = DateTime.UtcNow; + + // schedule a continuation to send more + Entity.Current.SignalEntity(Entity.Current.EntityId, nameof(ProduceMore)); + } + } + + [FunctionName(nameof(ProducerEntity))] + public static Task Run([EntityTrigger] IDurableEntityContext context, ILogger logger) + { + return context.DispatchAsync(logger); + } + } +} + \ No newline at end of file diff --git a/test/PerformanceTests/Benchmarks/EventHubs/PullerEntity.cs b/test/PerformanceTests/Benchmarks/EventHubs/PullerEntity.cs new file mode 100644 index 0000000..c8a667a --- /dev/null +++ b/test/PerformanceTests/Benchmarks/EventHubs/PullerEntity.cs @@ -0,0 +1,225 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace PerformanceTests.EventHubs +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Azure.Messaging.EventHubs; + using Azure.Messaging.EventHubs.Consumer; + using Azure.Messaging.EventHubs.Primitives; + using Azure.Messaging.EventHubs.Producer; + using Microsoft.Azure.WebJobs; + using Microsoft.Azure.WebJobs.Extensions.DurableTask; + using Microsoft.Extensions.Logging; + using Newtonsoft.Json; + + [JsonObject(MemberSerialization.OptIn)] + public class PullerEntity + { + public static EntityId GetEntityId(int number) + { + return new EntityId(nameof(PullerEntity), $"!{number}"); + } + + readonly ILogger logger; + + public PullerEntity(ILogger logger) + { + this.logger = logger; + } + + // EventHub partition receivers are heavyweight; we use a cache so we can reuse them between entity invocations + readonly static ConcurrentDictionary cache = new ConcurrentDictionary(); + + + [JsonProperty] + public bool IsActive { get; set; } + + [JsonProperty] + public long ReceivePosition { get; set; } + + [JsonProperty] + public int NumPending { get; set; } + + [JsonProperty] + public DateTime? FirstReceived { get; set; } + + [JsonProperty] + public int Errors { get; set; } + + [JsonProperty] + public long TotalEventsPulled { get; set; } + + + public async Task Start() + { + this.IsActive = true; + await this.PullMore(); + } + + public async Task Stop() + { + int number = int.Parse(Entity.Current.EntityId.EntityKey.Substring(1)); + + try + { + if (cache.TryRemove(number, out var partitionReceiver)) + { + await partitionReceiver.DisposeAsync(); + } + } + catch(Exception e) + { + this.logger.LogWarning("{entityId} failed to dispose PartitionReceiver: {exception}", Entity.Current.EntityId, e); + } + + this.IsActive = false; + } + + public async Task Delete() + { + await this.Stop(); + Entity.Current.DeleteState(); + } + + public Task Ack() + { + this.NumPending--; + + return this.PullMore(); + } + + public async Task PullMore() + { + if (!this.IsActive) + { + return; // we are not currently active + } + + if (Entity.Current.BatchPosition < Entity.Current.BatchSize - 1) + { + return; // we have more operations in this batch to process, so do those first + } + + if (this.NumPending >= Parameters.PendingLimit) + { + return; // do not continue until we get more acks + } + + try + { + int myNumber = int.Parse(Entity.Current.EntityId.EntityKey.Substring(1)); + string myEntityId = Entity.Current.EntityId.ToString(); + + PartitionReceiver receiver = cache.GetOrAdd(myNumber, (partitionId) => + { + this.logger.LogDebug("{entityId} Creating PartitionReceiver at position {receivePosition}", myEntityId, this.ReceivePosition); + return new PartitionReceiver( + EventHubConsumerClient.DefaultConsumerGroupName, + Parameters.EventHubPartitionIdForPuller(myNumber), + EventPosition.FromSequenceNumber(this.ReceivePosition - 1, false), + Parameters.EventHubConnectionString, + Parameters.EventHubNameForPuller(myNumber)); + }); + + int batchSize = 1000; + TimeSpan waitTime = TimeSpan.FromSeconds(3); + + this.logger.LogDebug("{entityId} Receiving events from position {receivePosition}", myEntityId, this.ReceivePosition); + var eventBatch = await receiver.ReceiveBatchAsync(batchSize, waitTime); + DateTime timestamp = DateTime.UtcNow; + int numEventsInBatch = 0; + + foreach (EventData eventData in eventBatch) + { + numEventsInBatch++; + + if (eventData.SequenceNumber != this.ReceivePosition) + { + // for sanity, check that we received the next event in the sequence. + // this can fail if events were discarded internally by the EventHub. + this.logger.LogError("{entityId} Received wrong event, sequence number expected={expected} actual={actual}", myEntityId, this.ReceivePosition, eventData.SequenceNumber); + this.Errors++; + } + + if (Parameters.SendEntitySignalForEachEvent || Parameters.StartOrchestrationForEachEvent) + { + try + { + // parse the event data + var e = Event.FromStream(eventData.EventBody.ToStream()); + + if (Parameters.SendEntitySignalForEachEvent) + { + EntityId destinationEntityId = DestinationEntity.GetEntityId(e.Destination); + this.logger.LogDebug("{entityId} Sending signal to {destinationEntityId}", myEntityId, destinationEntityId); + Entity.Current.SignalEntity(destinationEntityId, nameof(DestinationEntity.Receive), (e, myNumber)); + this.NumPending++; + } + + if (Parameters.StartOrchestrationForEachEvent) + { + var instanceId = this.GetRandomOrchestrationInstanceId(myNumber); + this.logger.LogDebug("{entityId} Scheduling orchestration {instanceId}", myEntityId, instanceId); + Entity.Current.StartNewOrchestration(nameof(DestinationOrchestration), (e, myNumber), instanceId); + this.NumPending++; + } + } + catch (Exception e) + { + this.logger.LogError("{entityId} Failed to process event {sequenceNumber}: {e}", myEntityId, eventData.SequenceNumber, e); + } + } + + this.TotalEventsPulled++; + this.ReceivePosition = eventData.SequenceNumber + 1; + } + + this.logger.LogDebug("{entityId} Processed batch of {numEventsInBatch} events", myEntityId, numEventsInBatch); + + if (!this.FirstReceived.HasValue && this.TotalEventsPulled > 0) + { + this.FirstReceived = timestamp; + } + + // if we have not reached the limit of pending deliveries yet, continue + if (this.NumPending < Parameters.PendingLimit) + { + this.logger.LogDebug("{entityId} Scheduling continuation for position {receivePosition}", myEntityId, this.ReceivePosition); + Entity.Current.SignalEntity(Entity.Current.EntityId, nameof(PullMore)); + } + } + catch (Exception e) + { + this.logger.LogError("{entityId} failed: {exception}", Entity.Current.EntityId, e); + this.Errors++; + this.IsActive = false; + } + } + + string GetRandomOrchestrationInstanceId(int myNumber) + { + if (Parameters.PlaceOrchestrationInstance) + { + return $"{Guid.NewGuid():n}!{myNumber}"; + } + else + { + return $"{Guid.NewGuid():n}"; + } + } + + [FunctionName(nameof(PullerEntity))] + public static Task Run([EntityTrigger] IDurableEntityContext context, ILogger logger) + { + return context.DispatchAsync(logger); + } + } +} + \ No newline at end of file diff --git a/test/PerformanceTests/Benchmarks/EventHubs/ReceiverEntity.cs b/test/PerformanceTests/Benchmarks/EventHubs/ReceiverEntity.cs deleted file mode 100644 index 3907051..0000000 --- a/test/PerformanceTests/Benchmarks/EventHubs/ReceiverEntity.cs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace PerformanceTests.EventHubs -{ - using System; - using System.Collections.Concurrent; - using System.Collections.Generic; - using System.IO; - using System.Threading.Tasks; - using Microsoft.Azure.WebJobs; - using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Microsoft.Extensions.Logging; - using Newtonsoft.Json; - - [JsonObject(MemberSerialization.OptIn)] - public class ReceiverEntity - { - public static EntityId GetEntityId(int index) => new EntityId(nameof(ReceiverEntity), index.ToString()); - - [JsonProperty] - public int EventCount { get; set; } - - [JsonProperty] - public DateTime StartTime { get; set; } - - [JsonProperty] - public DateTime LastTime { get; set; } - - [JsonProperty] - public long BytesReceived { get; set; } - - ILogger logger; - - public ReceiverEntity(ILogger logger) - { - this.logger = logger; - } - - public ILogger Logger { set { this.logger = value; } } - - public void Receive(Event evt) - { - this.LastTime = DateTime.UtcNow; - - if (this.EventCount++ == 0) - { - this.StartTime = this.LastTime; - } - - this.BytesReceived += evt.Payload.Length; - - if (this.logger.IsEnabled(LogLevel.Information)) - { - this.logger.LogInformation($"{Entity.Current.EntityId} Received event #{this.EventCount}: {evt}"); - } - } - - [FunctionName(nameof(ReceiverEntity))] - public static Task Run([EntityTrigger] IDurableEntityContext context, ILogger logger) - { - return context.DispatchAsync(logger); - } - } -} - \ No newline at end of file diff --git a/test/PerformanceTests/Benchmarks/FanOutFanIn/FanOutFanInOrchestration.cs b/test/PerformanceTests/Benchmarks/FanOutFanIn/FanOutFanInOrchestration.cs index eb464ed..2b0f100 100644 --- a/test/PerformanceTests/Benchmarks/FanOutFanIn/FanOutFanInOrchestration.cs +++ b/test/PerformanceTests/Benchmarks/FanOutFanIn/FanOutFanInOrchestration.cs @@ -30,7 +30,7 @@ namespace PerformanceTests.FanOutFanIn Task[] tasks = new Task[count]; for (int i = 0; i < tasks.Length; i++) { - tasks[i] = context.CallActivityAsync("SayHello", i.ToString("00000")); + tasks[i] = context.CallActivityAsync("SayHello", i.ToString()); } await Task.WhenAll(tasks); diff --git a/test/PerformanceTests/Benchmarks/FanOutFanIn/HttpTriggers.cs b/test/PerformanceTests/Benchmarks/FanOutFanIn/HttpTriggers.cs index 1f3b695..92bb368 100644 --- a/test/PerformanceTests/Benchmarks/FanOutFanIn/HttpTriggers.cs +++ b/test/PerformanceTests/Benchmarks/FanOutFanIn/HttpTriggers.cs @@ -17,6 +17,9 @@ namespace PerformanceTests.FanOutFanIn using System.Linq; using System.Collections.Concurrent; using System.Threading; + using Newtonsoft.Json.Linq; + using System.Net.Http; + using System.Net; /// /// A simple microbenchmark orchestration that executes activities in parallel. @@ -25,18 +28,35 @@ namespace PerformanceTests.FanOutFanIn { [FunctionName(nameof(FanOutFanIn))] public static async Task RunFanOutFanIn( - [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "fanoutfanin")] HttpRequest req, + [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "fanoutfanin/{count}")] HttpRequest req, + int count, [DurableClient] IDurableClient client) { - // get the number of tasks - string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); - int count = int.Parse(requestBody); + try + { + // start the orchestration + string orchestrationInstanceId = await client.StartNewAsync(nameof(FanOutFanInOrchestration), null, count); - // start the orchestration - string orchestrationInstanceId = await client.StartNewAsync(nameof(FanOutFanInOrchestration), null, count); + // wait for it to complete and return the result + var response = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(600)); - // wait for it to complete and return the result - return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(200)); + if (response is ObjectResult objectResult + && objectResult.Value is HttpResponseMessage responseMessage + && responseMessage.StatusCode == System.Net.HttpStatusCode.OK + && responseMessage.Content is StringContent stringContent) + { + var state = await client.GetStatusAsync(orchestrationInstanceId, false, false, false); + var elapsedSeconds = (state.LastUpdatedTime - state.CreatedTime).TotalSeconds; + var throughput = count / elapsedSeconds; + response = new OkObjectResult(new { elapsedSeconds, count, throughput }); + } + + return response; + } + catch (Exception e) + { + return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int)HttpStatusCode.InternalServerError }; + } } } } diff --git a/test/PerformanceTests/Benchmarks/FileHash/HttpTriggers.cs b/test/PerformanceTests/Benchmarks/FileHash/HttpTriggers.cs index 1ef6d94..8dc1838 100644 --- a/test/PerformanceTests/Benchmarks/FileHash/HttpTriggers.cs +++ b/test/PerformanceTests/Benchmarks/FileHash/HttpTriggers.cs @@ -14,6 +14,7 @@ namespace PerformanceTests.FileHash using Microsoft.Extensions.Logging; using System.Net.Http; using Newtonsoft.Json.Linq; + using System.Net; public static class HttpTriggers { @@ -59,7 +60,7 @@ namespace PerformanceTests.FileHash } catch (Exception e) { - return new ObjectResult(new { error = e.ToString() }); + return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int) HttpStatusCode.InternalServerError }; } } } diff --git a/test/PerformanceTests/Benchmarks/HelloCities/HelloSequence.cs b/test/PerformanceTests/Benchmarks/HelloCities/HelloSequence.cs index 790bfb8..a78d7ce 100644 --- a/test/PerformanceTests/Benchmarks/HelloCities/HelloSequence.cs +++ b/test/PerformanceTests/Benchmarks/HelloCities/HelloSequence.cs @@ -17,6 +17,7 @@ namespace PerformanceTests.HelloCities using System.Linq; using System.Collections.Concurrent; using System.Threading; + using System.Text; /// /// A simple microbenchmark orchestration that executes several simple "hello" activities in a sequence. @@ -50,5 +51,21 @@ namespace PerformanceTests.HelloCities return outputs; } + + [FunctionName(nameof(HelloSequenceN))] + public static async Task HelloSequenceN([OrchestrationTrigger] IDurableOrchestrationContext context) + { + int numberSteps = context.GetInput(); + for (int i = 0; i < numberSteps; i++) + { + await context.CallActivityAsync(nameof(Activities.SayHello), $"City{i}"); + } + } + + [FunctionName(nameof(HelloSequence3Nested))] + public static async Task> HelloSequence3Nested([OrchestrationTrigger] IDurableOrchestrationContext context) + { + return await context.CallSubOrchestratorAsync>(nameof(HelloSequence3), null); + } } } diff --git a/test/PerformanceTests/Benchmarks/HelloCities/HttpTriggers.cs b/test/PerformanceTests/Benchmarks/HelloCities/HttpTriggers.cs index 048d8b1..2af15d8 100644 --- a/test/PerformanceTests/Benchmarks/HelloCities/HttpTriggers.cs +++ b/test/PerformanceTests/Benchmarks/HelloCities/HttpTriggers.cs @@ -17,6 +17,8 @@ namespace PerformanceTests.HelloCities using System.Linq; using System.Collections.Concurrent; using System.Threading; + using Newtonsoft.Json.Linq; + using System.Net.Http; /// /// A simple microbenchmark orchestration that some trivial activities in a sequence. @@ -48,5 +50,50 @@ namespace PerformanceTests.HelloCities // wait for it to complete and return the result return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(200)); } + + [FunctionName(nameof(HelloCitiesN))] + public static async Task HelloCitiesN( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "hellocities/{count}")] HttpRequest req, + [DurableClient] IDurableClient client, + int count, + ILogger log) + { + // start the orchestration + string orchestrationInstanceId = await client.StartNewAsync(nameof(HelloSequence.HelloSequenceN), null, count); + + // wait for it to complete and return the result + var response = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(500)); + + if (response is ObjectResult objectResult + && objectResult.Value is HttpResponseMessage responseMessage + && responseMessage.StatusCode == System.Net.HttpStatusCode.OK + && responseMessage.Content is StringContent stringContent) + { + var state = await client.GetStatusAsync(orchestrationInstanceId, false, false, false); + var elapsedSeconds = (state.LastUpdatedTime - state.CreatedTime).TotalSeconds; + var throughput = count / elapsedSeconds; + response = new OkObjectResult(new { elapsedSeconds, count, throughput }); + } + + return response; + } + + [FunctionName(nameof(HelloCities3Nested))] + public static async Task HelloCities3Nested( + [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequest req, + [DurableClient] IDurableClient client, + ILogger log) + { + // start the orchestration + string orchestrationInstanceId = await client.StartNewAsync(nameof(HelloSequence.HelloSequence3Nested)); + + // wait for it to complete + await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(200)); + + DurableOrchestrationStatus status = await client.GetStatusAsync(orchestrationInstanceId, true, true, true); + + // return the history + return new OkObjectResult(status.History); + } } -} +} \ No newline at end of file diff --git a/test/PerformanceTests/host.json b/test/PerformanceTests/host.json index 05db8f8..dc2cb59 100644 --- a/test/PerformanceTests/host.json +++ b/test/PerformanceTests/host.json @@ -29,9 +29,12 @@ "DurableTask.Netherite.Client": "Warning", "DurableTask.Netherite.LoadMonitor": "Warning" }, + "applicationInsights": { - "sampling": { - "isEnabled": false + "samplingSettings": { + "isEnabled": true, + "maxTelemetryItemsPerSecond": 1, + "excludedTypes": "Exception" } } }, diff --git a/test/PerformanceTests/scripts/deploy.ps1 b/test/PerformanceTests/scripts/deploy.ps1 index d129334..9da3cf6 100644 --- a/test/PerformanceTests/scripts/deploy.ps1 +++ b/test/PerformanceTests/scripts/deploy.ps1 @@ -26,7 +26,11 @@ if ($DeployCode) } if (-not ($MaxA -eq "")) { - $hostconf.extensions.durableTask | Add-Member -NotePropertyName "maxConcurrentActivityFunctions" -NotePropertyValue $MaxA + $hostconf.extensions.durableTask | Add-Member -Force -NotePropertyName "maxConcurrentActivityFunctions" -NotePropertyValue $MaxA + } + if (-not ($MaxO -eq "")) + { + $hostconf.extensions.durableTask | Add-Member -Force -NotePropertyName "maxConcurrentOrchestratorFunctions" -NotePropertyValue $MaxO } $hostconf | ConvertTo-Json -depth 32 | set-content "./bin/$Configuration/net6.0/host.json" @@ -52,7 +56,7 @@ if (-not ((az functionapp list -g $groupName --query "[].name"| ConvertFrom-Json az functionapp config set -n $functionAppName -g $groupName --use-32bit-worker-process false az functionapp config appsettings set -n $functionAppName -g $groupName --settings EventHubsConnection=$eventHubsConnectionString - az functionapp config appsettings set -n $functionAppName -g $groupName --settings CorpusConnection=$corpusConnectionString + az functionapp config appsettings set -n $functionAppName -g $groupName --settings EHNamespace=$Env:EHNamespace } else {