New EventHubs performance tests (#178)

* rebase, and remove changes to scale monitor

* add hello cities test that prints the history of a nested orchestration
This commit is contained in:
Sebastian Burckhardt 2023-10-12 14:22:33 -07:00 коммит произвёл GitHub
Родитель 2ae01f892b
Коммит 83d16d2bd9
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
28 изменённых файлов: 1088 добавлений и 883 удалений

Просмотреть файл

@ -15,6 +15,7 @@ namespace PerformanceTests.Bank
using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Linq;
using System.Net;
/// <summary>
/// A microbenchmark using durable entities for bank accounts, and an orchestration with a critical section for transferring
@ -37,7 +38,7 @@ namespace PerformanceTests.Bank
}
catch (Exception e)
{
return new ObjectResult(e.ToString());
return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
}

Просмотреть файл

@ -13,7 +13,7 @@ namespace PerformanceTests.CollisionSearch
using System.Linq;
/// <summary>
/// An orchestration that searches for hash collisions using a recursive divide-and-conquer algorithm.
/// An orchestration that searches for hash collisions using the simple fan-out-fan-in pattern.
/// </summary>
public static class FlatParallelSearch
{

Просмотреть файл

@ -13,6 +13,7 @@ namespace PerformanceTests.CollisionSearch
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Net.Http;
using Newtonsoft.Json.Linq;
using System.Net;
public static class HttpTriggers
{
@ -79,7 +80,7 @@ namespace PerformanceTests.CollisionSearch
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() });
return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
}

Просмотреть файл

@ -24,25 +24,36 @@ namespace PerformanceTests.Orchestrations.Counter
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
[JsonProperty("modified")]
public DateTime? StartTime { get; set; }
public DateTime LastModified { get; set; }
public void Add(int amount)
{
this.CurrentValue += amount;
this.LastModified = DateTime.UtcNow;
this.UpdateTimestamps();
}
public void Reset()
void UpdateTimestamps()
{
this.CurrentValue = 0;
if (!this.StartTime.HasValue)
{
this.StartTime = DateTime.UtcNow;
}
this.LastModified = DateTime.UtcNow;
}
public (int, DateTime) Get() => (this.CurrentValue, DateTime.UtcNow);
public void Crash(DateTime timeStamp)
{
// crash if less that 4 seconds have passed since the signal was sent
// (so that we no longer crash when retrying after recovery)
if ((DateTime.UtcNow - timeStamp) < TimeSpan.FromSeconds(4))
{
System.Environment.Exit(333);
}
}
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)

Просмотреть файл

@ -7,6 +7,7 @@ namespace PerformanceTests.Orchestrations.Counter
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
@ -18,191 +19,128 @@ namespace PerformanceTests.Orchestrations.Counter
public static class HttpTriggers
{
class Input
{
public string Key { get; set; }
public int Expected { get; set; }
}
[FunctionName(nameof(WaitForCount))]
public static async Task<IActionResult> WaitForCount(
[HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = nameof(WaitForCount))] HttpRequest req,
[DurableClient] IDurableClient client)
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
var input = JsonConvert.DeserializeObject<Input>(requestBody);
var entityId = new EntityId("Counter", input.Key);
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
// poll the entity until the expected count is reached
while (stopwatch.Elapsed < TimeSpan.FromMinutes(5))
{
var response = await client.ReadEntityStateAsync<Counter>(entityId);
if (response.EntityExists
&& response.EntityState.CurrentValue >= input.Expected)
{
return new OkObjectResult($"{JsonConvert.SerializeObject(response.EntityState)}\n");
}
await Task.Delay(TimeSpan.FromSeconds(2));
}
return new OkObjectResult("timed out.\n");
}
[FunctionName(nameof(Increment))]
public static async Task<IActionResult> Increment(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = nameof(Increment))] HttpRequest req,
[FunctionName(nameof(Add))]
public static async Task<IActionResult> Add(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "counter/{key}/add")] HttpRequest req,
string key,
[DurableClient] IDurableClient client)
{
try
{
string entityKey = await new StreamReader(req.Body).ReadToEndAsync();
var entityId = new EntityId("Counter", entityKey);
await client.SignalEntityAsync(entityId, "add", 1);
return new OkObjectResult($"increment was sent to {entityId}.\n");
string input = await new StreamReader(req.Body).ReadToEndAsync();
int amount = int.Parse(input);
var entityId = new EntityId("Counter", key);
await client.SignalEntityAsync(entityId, "add", amount);
return new OkObjectResult($"add({amount}) was sent to {entityId}.\n");
}
catch (Exception e)
{
return new OkObjectResult(e.ToString());
return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
[FunctionName(nameof(ReadCounter))]
public static async Task<IActionResult> ReadCounter(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "counter/{key}")] HttpRequest req,
string key,
[DurableClient] IDurableClient client)
{
try
{
var entityId = new EntityId("Counter", key);
var response = await client.ReadEntityStateAsync<Counter>(entityId);
if (!response.EntityExists)
{
return new NotFoundObjectResult($"no such entity: {entityId}");
}
else
{
return new OkObjectResult(response.EntityState);
}
}
catch (Exception e)
{
return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
[FunctionName(nameof(DeleteCounter))]
public static async Task<IActionResult> DeleteCounter(
[HttpTrigger(AuthorizationLevel.Anonymous, "delete", Route = "counter/{key}")] HttpRequest req,
string key,
[DurableClient] IDurableClient client)
{
try
{
var entityId = new EntityId("Counter", key);
await client.SignalEntityAsync(entityId, "delete");
return new OkObjectResult($"delete was sent to {entityId}.\n");
}
catch (Exception e)
{
return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
[FunctionName(nameof(CrashCounter))]
public static async Task<IActionResult> CrashCounter(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "counter/{key}/crash")] HttpRequest req,
string key,
[DurableClient] IDurableClient client)
{
try
{
var entityId = new EntityId("Counter", key);
await client.SignalEntityAsync(entityId, "crash", DateTime.UtcNow);
return new OkObjectResult($"crash was sent to {entityId}.\n");
}
catch (Exception e)
{
return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
[FunctionName(nameof(CountSignals))]
public static async Task<IActionResult> CountSignals(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = nameof(CountSignals))] HttpRequest req,
[DurableClient] IDurableClient client)
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "signal-counter/{count}")] HttpRequest req,
int count,
[DurableClient] IDurableClient client)
{
try
{
int numberSignals = int.Parse(await new StreamReader(req.Body).ReadToEndAsync());
var entityId = new EntityId("Counter", Guid.NewGuid().ToString("N"));
string key = Guid.NewGuid().ToString("N");
var entityId = new EntityId("Counter", key);
DateTime startTime = DateTime.UtcNow;
// send the specified number of signals to the entity
await SendIncrementSignals(client, numberSignals, 50, (i) => entityId);
// send the specified number of signals to the counter entity
// for better send throughput we do not send signals one at a time, but use parallel tasks
await Enumerable.Range(0, count).ParallelForEachAsync(50, true, i => client.SignalEntityAsync(entityId, "add", 1));
// poll the entity until the expected count is reached
while ((DateTime.UtcNow - startTime) < TimeSpan.FromMinutes(5))
while ((DateTime.UtcNow - startTime) < TimeSpan.FromMinutes(3))
{
var response = await client.ReadEntityStateAsync<Counter>(entityId);
if (response.EntityExists
&& response.EntityState.CurrentValue == numberSignals)
&& response.EntityState.CurrentValue == count)
{
return new OkObjectResult($"received {numberSignals} signals in {(response.EntityState.LastModified - startTime).TotalSeconds:F1}s.\n");
var state = response.EntityState;
var elapsedSeconds = (state.LastModified - state.StartTime.Value).TotalSeconds;
var throughput = count / elapsedSeconds;
return new OkObjectResult(new { elapsedSeconds, count, throughput });
}
await Task.Delay(TimeSpan.FromSeconds(2));
await Task.Delay(TimeSpan.FromSeconds(5));
}
return new OkObjectResult($"timed out after {(DateTime.UtcNow - startTime)}.\n");
return new OkObjectResult($"timed out after {(DateTime.UtcNow - startTime)}.\n") { StatusCode = (int)HttpStatusCode.RequestTimeout };
}
catch (Exception e)
{
return new OkObjectResult(e.ToString());
}
}
[FunctionName(nameof(CountParallelSignals))]
public static async Task<IActionResult> CountParallelSignals(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = nameof(CountParallelSignals))] HttpRequest req,
[DurableClient] IDurableClient client)
{
try
{
// input is of the form "nnn,mmm"
// nnn - number of signals to send
// mmm - number of entities to distribute the signals over
string input = await new StreamReader(req.Body).ReadToEndAsync();
int commaPosition = input.IndexOf(',');
int numberSignals = int.Parse(input.Substring(0, commaPosition));
int numberEntities = int.Parse(input.Substring(commaPosition + 1));
var entityPrefix = Guid.NewGuid().ToString("N");
EntityId MakeEntityId(int i) => new EntityId("Counter", $"{entityPrefix}-!{i:D6}");
DateTime startTime = DateTime.UtcNow;
if (numberSignals % numberEntities != 0)
{
throw new ArgumentException("numberSignals must be a multiple of numberEntities");
}
// send the specified number of signals to the entity
await SendIncrementSignals(client, numberSignals, 50, (i) => MakeEntityId(i % numberEntities));
// poll the entities until the expected count is reached
async Task<double?> WaitForCount(int i)
{
var random = new Random();
while ((DateTime.UtcNow - startTime) < TimeSpan.FromMinutes(5))
{
var response = await client.ReadEntityStateAsync<Counter>(MakeEntityId(i));
if (response.EntityExists
&& response.EntityState.CurrentValue == numberSignals / numberEntities)
{
return (response.EntityState.LastModified - startTime).TotalSeconds;
}
await Task.Delay(TimeSpan.FromSeconds(2 + random.NextDouble()));
}
return null;
};
var waitTasks = Enumerable.Range(0, numberEntities).Select(i => WaitForCount(i)).ToList();
await Task.WhenAll(waitTasks);
var results = waitTasks.Select(t => t.Result);
if (results.Any(result => result == null))
{
return new OkObjectResult($"timed out after {(DateTime.UtcNow - startTime)}.\n");
}
return new OkObjectResult($"received {numberSignals} signals on {numberEntities} entities in {results.Max():F1}s.\n");
}
catch (Exception e)
{
return new OkObjectResult(e.ToString());
}
}
static async Task SendIncrementSignals(IDurableClient client, int numberSignals, int maxConcurrency, Func<int, EntityId> entityIdFactory)
{
// send the specified number of signals to the entity
// for better throughput we do this in parallel
using var semaphore = new SemaphoreSlim(maxConcurrency);
for (int i = 0; i < numberSignals; i++)
{
var entityId = entityIdFactory(i);
await semaphore.WaitAsync();
var task = Task.Run(async () =>
{
try
{
await client.SignalEntityAsync(entityId, "add", 1);
}
finally
{
semaphore.Release();
}
});
}
// wait for tasks to complete
for (int i = 0; i < maxConcurrency; i++)
{
await semaphore.WaitAsync();
return new ObjectResult(e.ToString()) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,63 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
[JsonObject(MemberSerialization.OptIn)]
public class DestinationEntity
{
public static EntityId GetEntityId(int number) => new EntityId(nameof(DestinationEntity), $"!{number}");
[JsonProperty]
public int EventCount { get; set; }
[JsonProperty]
public DateTime? LastUpdated { get; set; }
ILogger logger;
public DestinationEntity(ILogger logger)
{
this.logger = logger;
}
public ILogger Logger { set { this.logger = value; } }
public void Delete()
{
Entity.Current.DeleteState();
}
public void Receive((Event evt, int receiverPartition) input)
{
this.EventCount++;
if (this.logger.IsEnabled(LogLevel.Debug))
{
this.logger.LogInformation("{entityId} Received signal #{eventCount}", Entity.Current.EntityId, this.EventCount);
}
this.LastUpdated = DateTime.UtcNow;
// send an ack to the receiver entity
Entity.Current.SignalEntity(PullerEntity.GetEntityId(input.receiverPartition), nameof(PullerEntity.Ack));
}
[FunctionName(nameof(DestinationEntity))]
public static Task Run([EntityTrigger] IDurableEntityContext context, ILogger logger)
{
return context.DispatchAsync<DestinationEntity>(logger);
}
}
}

Просмотреть файл

@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
public static class DestinationOrchestration
{
[FunctionName(nameof(DestinationOrchestration))]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger logger)
{
var input = context.GetInput<(Event evt, int receiverPartition)> ();
logger.LogInformation("Started orchestration {instanceId}", context.InstanceId);
// execute an activity with the event payload as the input
await context.CallActivityAsync<string>(nameof(Activities.SayHello), input.evt.Payload);
logger.LogInformation("Completed orchestration {instanceId}", context.InstanceId);
// when this orchestration completes, we notify the puller entity
context.SignalEntity(PullerEntity.GetEntityId(input.receiverPartition), nameof(PullerEntity.Ack));
}
}
}

Просмотреть файл

@ -5,21 +5,33 @@ namespace PerformanceTests.EventHubs
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using Newtonsoft.Json;
[JsonObject(MemberSerialization.OptOut)]
public struct Event
{
public int Partition { get; set; }
public int Destination { get; set; }
public string Payload { get; set; }
public long SeqNo { get; set; }
public byte[] Payload { get; set; }
public override string ToString()
public static Event FromStream(Stream s)
{
return $"Event {this.Partition}.{this.SeqNo} size={this.Payload.Length}";
var r = new BinaryReader(s);
return new Event
{
Destination = r.ReadInt32(),
Payload = r.ReadString(),
};
}
public byte[] ToBytes()
{
var m = new MemoryStream();
var r = new BinaryWriter(m);
r.Write(this.Destination);
r.Write(this.Payload);
return m.ToArray();
}
}
}

Просмотреть файл

@ -1,68 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
public static class EventHubTrigger
{
// must set this manually, and comment out the [Disable], to test the trigger
const int numEntities = 1000;
[Disable]
[FunctionName("MyEventHubTrigger")]
public static async Task RunAsync(
[EventHubTrigger(Parameters.EventHubName, Connection = "EventHubsConnection")] EventData[] eventDataSet,
[DurableClient] IDurableClient client,
ILogger log)
{
int numTries = 0;
while (numTries < 10)
{
numTries++;
try
{
var signalTasks = new List<Task>();
// send one signal for each packet
for (int i = 0; i < eventDataSet.Length; i++)
{
var eventData = eventDataSet[i];
var sp = eventData.SystemProperties;
byte[] payload = eventData.Body.ToArray();
var evt = new Event()
{
Partition = payload[0],
SeqNo = eventData.SystemProperties.SequenceNumber,
Payload = payload
};
log.LogDebug($"Sending signal for {evt}");
signalTasks.Add(client.SignalEntityAsync(Parameters.GetDestinationEntity(evt, numEntities), nameof(ReceiverEntity.Receive), evt));
}
log.LogInformation($"Sent {eventDataSet.Length} signals");
await Task.WhenAll(signalTasks);
break;
}
catch (NullReferenceException)
{
// during startup we may get these exceptions if the trigger goes off before the
// Orchestration service has been started
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Producer;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
using System.Net;
public static class Channels
{
[FunctionName("EH_" + nameof(Channels))]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/channels/{action}")] HttpRequest req,
[DurableClient] IDurableClient client,
string action,
ILogger log)
{
try
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int numEventHubs = int.Parse(requestBody);
if (numEventHubs < 1 || numEventHubs > Parameters.MaxEventHubs)
{
return new ObjectResult("invalid number of event hubs.\n") { StatusCode = (int)HttpStatusCode.BadRequest };
}
switch (action)
{
case "create":
log.LogWarning("Creating {numEventHubs} EventHubs...", numEventHubs);
int numCreated = 0;
await Enumerable.Range(0, numEventHubs).ParallelForEachAsync(200, false, async (index) =>
{
if (await EventHubsUtil.EnsureEventHubExistsAsync(Parameters.EventHubConnectionString, Parameters.EventHubName(index), Parameters.MaxPartitionsPerEventHub))
{
numCreated++;
}
});
return new OkObjectResult($"Created {numCreated} EventHubs.\n");
case "delete":
log.LogWarning("Deleting {numEventHubs} EventHubs...", numEventHubs);
int numDeleted = 0;
await Enumerable.Range(0, numEventHubs).ParallelForEachAsync(200, false, async (index) =>
{
if (await EventHubsUtil.DeleteEventHubIfExistsAsync(Parameters.EventHubConnectionString, Parameters.EventHubName(index)))
{
numDeleted++;
}
});
return new OkObjectResult($"Deleted {numDeleted} EventHubs.\n");
default:
return new ObjectResult($"Unknown action: {action}\n")
{
StatusCode = (int)System.Net.HttpStatusCode.NotFound
};
}
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
}
}

Просмотреть файл

@ -1,58 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Producer;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
public static class Clear
{
[FunctionName("EH_" + nameof(Clear))]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/" + nameof(Clear))] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
try
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int numEntities = int.Parse(requestBody);
log.LogWarning($"Deleting {numEntities + 32} entities...");
await Enumerable.Range(0, numEntities + 32).ParallelForEachAsync(200, true, async (index) =>
{
if (index <= numEntities)
{
var entityId = ReceiverEntity.GetEntityId(index);
await client.SignalEntityAsync(entityId, "delete");
}
else
{
int partition = index - numEntities;
await client.SignalEntityAsync(PartitionReceiverEntity.GetEntityId(Parameters.EventHubName, partition.ToString()), "delete");
}
});
return new OkObjectResult($"Deleted {numEntities + 32} entities.\n");
}
catch (Exception e)
{
return new ObjectResult(new {
error = e.ToString()
});
}
}
}
}

Просмотреть файл

@ -1,59 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Producer;
using System.Collections.Generic;
public static class Consume
{
[FunctionName("EH_" + nameof(Consume))]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/" + nameof(Consume))] HttpRequest req,
[DurableClient] IDurableClient client)
{
try
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int numEntities = int.Parse(requestBody);
var connectionString = Environment.GetEnvironmentVariable("EventHubsConnection");
var eventHubName = Parameters.EventHubName;
var producer = new EventHubProducerClient(connectionString, eventHubName);
var partitionIds = await producer.GetPartitionIdsAsync();
List<Task> signalTasks = new List<Task>();
foreach (var partitionId in partitionIds)
{
var partitionProperties = await producer.GetPartitionPropertiesAsync(partitionId);
var startParameters = new PartitionReceiverEntity.StartParameters()
{
NumEntities = numEntities,
PartitionId = partitionId,
NextSequenceNumberToReceive = 0,
};
signalTasks.Add(client.SignalEntityAsync(PartitionReceiverEntity.GetEntityId(eventHubName, partitionId), nameof(PartitionReceiverEntity.Start), startParameters));
}
await Task.WhenAll(signalTasks);
return new OkObjectResult($"Started PartitionReceiverEntities for {signalTasks.Count} partitions.\n");
}
catch (Exception e)
{
return new ObjectResult(new {
error = e.ToString()
});
}
}
}
}

Просмотреть файл

@ -0,0 +1,147 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Producer;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
public static class Consumers
{
[FunctionName("EH_" + nameof(Consumers))]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/consumers/{action}")] HttpRequest req,
[DurableClient] IDurableClient client,
string action,
ILogger log)
{
try
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int numPullers = int.Parse(requestBody);
if (numPullers < 1 || numPullers > Parameters.MaxPartitions)
{
return new ObjectResult("invalid number of consumers.\n") { StatusCode = (int)HttpStatusCode.BadRequest };
}
switch (action)
{
case "query":
object lockForUpdate = new object();
int delivered = 0;
long pulled = 0;
int pending = 0;
int active = 0;
DateTime? firstReceived = null;
DateTime? lastUpdated = null;
int errors = 0;
log.LogWarning($"Checking the status of {numPullers} puller entities...");
await Enumerable.Range(0, numPullers).ParallelForEachAsync(500, true, async (partition) =>
{
var entityId = PullerEntity.GetEntityId(partition);
var response = await client.ReadEntityStateAsync<PullerEntity>(entityId);
if (response.EntityExists)
{
lock (lockForUpdate)
{
pulled += response.EntityState.TotalEventsPulled;
pending += response.EntityState.NumPending;
active += response.EntityState.IsActive ? 1 : 0;
errors += response.EntityState.Errors;
if (!firstReceived.HasValue || response.EntityState.FirstReceived < firstReceived)
{
firstReceived = response.EntityState.FirstReceived;
}
}
}
});
log.LogWarning($"Checking the status of {Parameters.NumberDestinationEntities} destination entities...");
await Enumerable.Range(0, Parameters.NumberDestinationEntities).ParallelForEachAsync(500, true, async (destination) =>
{
var entityId = DestinationEntity.GetEntityId(destination);
var response = await client.ReadEntityStateAsync<DestinationEntity>(entityId);
if (response.EntityExists)
{
lock (lockForUpdate)
{
delivered += response.EntityState.EventCount;
if (!lastUpdated.HasValue || response.EntityState.LastUpdated > lastUpdated)
{
lastUpdated = response.EntityState.LastUpdated;
}
}
}
});
TimeSpan? duration = lastUpdated - firstReceived;
var resultObject = new
{
pending,
active,
delivered,
pulled,
errors,
firstReceived,
lastUpdated,
duration
};
return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n");
case "start":
case "stop":
await Enumerable.Range(0, numPullers).ParallelForEachAsync(200, true, async (number) =>
{
var entityId = PullerEntity.GetEntityId(number);
await client.SignalEntityAsync(entityId, action);
});
return new OkObjectResult($"Sent {action} signal to {numPullers} puller entities.\n");
case "delete":
await Enumerable.Range(0, numPullers + Parameters.NumberDestinationEntities).ParallelForEachAsync(500, true, async (i) =>
{
var entityId = i < numPullers ? PullerEntity.GetEntityId(i) : DestinationEntity.GetEntityId(i - numPullers);
await client.SignalEntityAsync(entityId, action);
});
return new OkObjectResult($"Sent delete signal to {numPullers} puller entities and {Parameters.NumberDestinationEntities} destination entities.\n");
default:
return new ObjectResult($"Unknown action: {action}\n")
{
StatusCode = (int)System.Net.HttpStatusCode.NotFound
};
}
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
}
}

Просмотреть файл

@ -1,89 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Producer;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using System.Linq;
using Newtonsoft.Json;
public static class Count
{
[FunctionName("EH_" + nameof(Count))]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/" + nameof(Count))] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
try
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int numEntities = int.Parse(requestBody);
int numEvents = 0;
long earliestStart = long.MaxValue;
long latestUpdate = 0;
long bytesReceived = 0;
bool gotTimeRange = false;
object lockForUpdate = new object();
var tasks = new List<Task<bool>>();
log.LogWarning($"Checking the status of {numEntities} entities...");
await Enumerable.Range(0, numEntities).ParallelForEachAsync(200, true, async (index) =>
{
var entityId = ReceiverEntity.GetEntityId(index);
var response = await client.ReadEntityStateAsync<ReceiverEntity>(entityId);
if (response.EntityExists)
{
lock (lockForUpdate)
{
numEvents += response.EntityState.EventCount;
bytesReceived += response.EntityState.BytesReceived;
earliestStart = Math.Min(earliestStart, response.EntityState.StartTime.Ticks);
latestUpdate = Math.Max(latestUpdate, response.EntityState.LastTime.Ticks);
gotTimeRange = true;
}
}
});
double elapsedSeconds = 0;
if (gotTimeRange)
{
elapsedSeconds = (new DateTime(latestUpdate) - new DateTime(earliestStart)).TotalSeconds;
}
string volume = $"{1.0 * bytesReceived / (1024 * 1024):F2}MB";
log.LogWarning($"Received a total of {numEvents} ({volume}) on {numEntities} entities in {elapsedSeconds:F2}s.");
var resultObject = new
{
numEvents,
bytesReceived,
volume,
elapsedSeconds,
};
return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n");
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() });
}
}
}
}

Просмотреть файл

@ -1,115 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventProducer
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using EventHubs;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Routing;
using Microsoft.Azure.WebJobs;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Newtonsoft.Json;
using System.Linq;
using System.Diagnostics;
using System.IO;
public static class Produce
{
// Populates the event hubs with events. This should be called prior to consuming the events.
//
// for example, to distribute 1000 events of 1k each over 12 partitions, call
// curl http://localhost:7071/eh/produce -d 1000x1024/12
[FunctionName("EH_" + nameof(Produce))]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/" + nameof(Produce))] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
try
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int xpos = requestBody.IndexOf('x');
int spos = requestBody.IndexOf('/');
int numEvents = int.Parse(requestBody.Substring(0, xpos));
int payloadSize = int.Parse(requestBody.Substring(xpos + 1, spos - (xpos + 1)));
int numPartitions = int.Parse(requestBody.Substring(spos + 1));
int producerBatchSize = 500 * 1024 / payloadSize;
string volume = $"{1.0 * payloadSize * numEvents / (1024 * 1024):F2}MB";
string connectionString = Environment.GetEnvironmentVariable(Parameters.EventHubsConnectionName);
await EventHubsUtil.EnsureEventHubExistsAsync(connectionString, Parameters.EventHubName, numPartitions);
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
EntityPath = Parameters.EventHubName,
};
var ehc = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
var ehInfo = await ehc.GetRuntimeInformationAsync();
if (ehInfo.PartitionCount != numPartitions)
{
throw new ArgumentException("Wrong number of partitions");
}
async Task SendEventsAsync(int offset, int events, string partitionId)
{
var partitionSender = ehc.CreatePartitionSender(partitionId);
var eventBatch = new List<EventData>(100);
for (int x = 0; x < events; x++)
{
byte[] payload = new byte[payloadSize];
(new Random()).NextBytes(payload);
payload[0] = byte.Parse(partitionId);
var e = new EventData(payload);
eventBatch.Add(e);
if (x % producerBatchSize == (producerBatchSize - 1))
{
await partitionSender.SendAsync(eventBatch);
log.LogWarning($"Sent {x + 1} events to partition {partitionId}");
eventBatch = new List<EventData>(100);
}
}
if (eventBatch.Count > 0)
{
await partitionSender.SendAsync(eventBatch);
log.LogWarning($"Sent {events} events to partition {partitionId}");
}
}
log.LogWarning($"Sending {numEvents} EventHub events ({volume}) to {numPartitions} partitions");
var partitionTasks = new List<Task>();
var remaining = numEvents;
var stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = numPartitions; i >= 1; i--)
{
var portion = remaining / i;
remaining -= portion;
partitionTasks.Add(SendEventsAsync(remaining, portion, ehInfo.PartitionIds[i - 1]));
}
await Task.WhenAll(partitionTasks);
double elapsedSeconds = stopwatch.Elapsed.TotalSeconds;
log.LogWarning($"Sent all {numEvents} EventHub events ({volume}) to {numPartitions} partitions in {elapsedSeconds}");
var resultObject = new { numEvents, volume, numPartitions, elapsedSeconds };
return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n");
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() });
}
}
}
}

Просмотреть файл

@ -0,0 +1,116 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventProducer
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using EventHubs;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Routing;
using Microsoft.Azure.WebJobs;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Newtonsoft.Json;
using System.Linq;
using System.Diagnostics;
using System.IO;
using System.Net;
public static class Producers
{
[FunctionName("EH_" + nameof(Producers))]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "eh/producers/{action}")] HttpRequest req,
[DurableClient] IDurableClient client,
string action,
ILogger log)
{
try
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int numProducers = int.Parse(requestBody);
switch (action)
{
case "query":
object lockForUpdate = new object();
long sent = 0;
long exceptions = 0;
int active = 0;
DateTime? Starttime = null;
DateTime? LastUpdate = null;
log.LogWarning("Checking the status of {NumProducers} producer entities...", numProducers);
await Enumerable.Range(0, numProducers).ParallelForEachAsync(500, true, async (partition) =>
{
var entityId = ProducerEntity.GetEntityId(partition);
var response = await client.ReadEntityStateAsync<ProducerEntity>(entityId);
if (response.EntityExists)
{
lock (lockForUpdate)
{
sent += response.EntityState.SentEvents;
exceptions += response.EntityState.Exceptions;
active += response.EntityState.IsActive ? 1 : 0;
if (!Starttime.HasValue || response.EntityState.Starttime < Starttime)
{
Starttime = response.EntityState.Starttime;
}
if (!LastUpdate.HasValue || response.EntityState.LastUpdate > LastUpdate)
{
LastUpdate = response.EntityState.LastUpdate;
}
}
}
});
double? throughput = null;
if (Starttime.HasValue && LastUpdate.HasValue)
{
throughput = 1.0 * sent / (LastUpdate.Value - Starttime.Value).TotalSeconds;
}
var resultObject = new
{
sent,
exceptions,
active,
throughput,
};
return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n");
case "start":
case "stop":
case "delete":
await Enumerable.Range(0, numProducers).ParallelForEachAsync(500, true, async (number) =>
{
var entityId = ProducerEntity.GetEntityId(number);
await client.SignalEntityAsync(entityId, action);
});
return new OkObjectResult($"sent {action} signal to {numProducers} producer entities\n");
default:
return new ObjectResult($"Unknown action: {action}\n")
{
StatusCode = (int)System.Net.HttpStatusCode.NotFound
};
}
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
}
}

Просмотреть файл

@ -6,25 +6,44 @@ namespace PerformanceTests.EventHubs
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Newtonsoft.Json;
using Microsoft.Identity.Client;
[JsonObject(MemberSerialization.OptOut)]
public static class Parameters
{
/// <summary>
/// The name of the event hubs connection
/// </summary>
public const string EventHubsConnectionName = "EventHubsConnection";
// the name of the environment variable containing the event hubs connection string for the event hub
// that we are pushing events to and pulling events from
public const string EventHubsEnvVar = "EHNamespace";
/// <summary>
/// The name of the Event Hub that connects the consumer and producer
/// </summary>
public const string EventHubName = "eventstest";
// EventHubs dimensions
public const int MaxEventHubs = 10;
public const int MaxPartitionsPerEventHub = 32;
public const int MaxPartitions = MaxEventHubs * MaxPartitionsPerEventHub;
public const int MaxPullers = MaxEventHubs * MaxPartitionsPerEventHub; // max 1 puller per partition
/// <summary>
/// Routing function (determines target entity for an event)
/// </summary>
public static EntityId GetDestinationEntity(Event evt, int numberEntities) => ReceiverEntity.GetEntityId(Math.Abs((evt.Partition, evt.SeqNo).GetHashCode()) % numberEntities);
// Event content
public const int PayloadStringLength = 5;
// Limit on the number of pending signals and orchestrations
// Serves as flow control to avoid pulling more from event hub than what can be processed
public const int PendingLimit = 3000;
// Forwarding of events to entities
public static bool SendEntitySignalForEachEvent = true;
public const int NumberDestinationEntities = 100;
// Starting of orchestrations for events
public static bool StartOrchestrationForEachEvent = false;
public static bool PlaceOrchestrationInstance = true;
public static string EventHubConnectionString => Environment.GetEnvironmentVariable(EventHubsEnvVar);
public static string EventHubName(int index) => $"hub{index}";
// assignment of puller entities to eventhubs partitions
public static string EventHubNameForPuller(int number) => EventHubName(number / MaxPartitionsPerEventHub);
public static string EventHubPartitionIdForPuller(int number) => (number % MaxPartitionsPerEventHub).ToString();
// assignment of producer entities to eventhubs
public static string EventHubNameForProducer(int number) => EventHubName((number / MaxPartitionsPerEventHub) % MaxEventHubs);
}
}

Просмотреть файл

@ -1,228 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
[JsonObject(MemberSerialization.OptIn)]
public class PartitionReceiverEntity
{
public static EntityId GetEntityId(string EventHubName, string partitionId)
{
return new EntityId(nameof(PartitionReceiverEntity), $"{EventHubName}!{int.Parse(partitionId):D2}");
}
[JsonProperty]
int NumEntities { get; set; }
[JsonProperty]
string PartitionId { get; set; }
[JsonProperty]
long Position { get; set; }
[JsonProperty]
double BackoffSeconds { get; set; }
[JsonProperty]
TimeSpan? IdleInterval { get; set; }
[JsonProperty]
Guid InstanceGuid { get; set; }
ILogger logger;
readonly static ConcurrentDictionary<Guid, PartitionReceiver> cache = new ConcurrentDictionary<Guid, PartitionReceiver>();
public PartitionReceiverEntity(ILogger logger)
{
this.logger = logger;
}
public ILogger Logger { set { this.logger = value; } }
[JsonObject(MemberSerialization.OptOut)]
public struct StartParameters
{
public int NumEntities { get; set; }
public string PartitionId { get; set; }
public long NextSequenceNumberToReceive { get; set; }
public TimeSpan? IdleInterval { get; set; }
}
void ProcessEvent(EventData eventData)
{
byte[] payload = eventData.EventBody.ToArray();
var evt = new Event()
{
Partition = payload[0],
SeqNo = eventData.SequenceNumber,
Payload = payload
};
this.logger.LogDebug($"Sending signal for {evt}");
Entity.Current.SignalEntity(Parameters.GetDestinationEntity(evt, this.NumEntities), nameof(ReceiverEntity.Receive), evt);
}
public async Task Start(StartParameters parameters)
{
// close any receivers that may already exist because of a previous run
if (cache.TryRemove(this.InstanceGuid, out var receiver))
{
await receiver.CloseAsync();
}
this.NumEntities = parameters.NumEntities;
this.PartitionId = parameters.PartitionId;
this.Position = parameters.NextSequenceNumberToReceive - 1;
this.IdleInterval = parameters.IdleInterval;
this.InstanceGuid = Guid.NewGuid();
await this.Continue(this.InstanceGuid);
}
public async Task Delete()
{
if (cache.TryRemove(this.InstanceGuid, out var receiver))
{
await receiver.CloseAsync();
}
Entity.Current.DeleteState();
}
public async Task Continue(Guid instanceGuid)
{
try
{
if (this.InstanceGuid != instanceGuid)
{
// the signal is from an instance that was deleted or overwritten. Ignore it.
return;
}
this.logger.LogInformation($"{Entity.Current.EntityKey} Continuing after {this.Position}");
PartitionReceiver receiver = cache.GetOrAdd(this.InstanceGuid, (partitionId) =>
{
this.logger.LogDebug($"{Entity.Current.EntityKey} Creating PartitionReceiver");
return new PartitionReceiver(
EventHubConsumerClient.DefaultConsumerGroupName,
this.PartitionId,
EventPosition.FromSequenceNumber(this.Position, false),
Environment.GetEnvironmentVariable("EventHubsConnection"),
Parameters.EventHubName);
});
int batchSize = 100;
TimeSpan waitTime = TimeSpan.FromSeconds(1);
long? lastReceivedSequenceNumber = null;
this.logger.LogDebug($"{Entity.Current.EntityKey} Receiving events after {this.Position}...");
var eventBatch = await receiver.ReceiveBatchAsync(batchSize, waitTime);
this.logger.LogDebug($"{Entity.Current.EntityKey} ...response received.");
foreach (EventData eventData in eventBatch)
{
try
{
this.ProcessEvent(eventData);
}
catch (Exception e)
{
this.logger.LogError($"{Entity.Current.EntityKey} Failed to process event {eventData.SequenceNumber}: {e}");
}
lastReceivedSequenceNumber = eventData.SequenceNumber;
}
if (lastReceivedSequenceNumber.HasValue)
{
this.logger.LogInformation($"Processed {lastReceivedSequenceNumber.Value - this.Position} signals");
this.Position = lastReceivedSequenceNumber.Value;
this.ScheduleContinuation(0);
}
else if (this.BackoffSeconds == 0)
{
this.ScheduleContinuation(1);
}
else if (this.BackoffSeconds < 10)
{
this.ScheduleContinuation(this.BackoffSeconds * 3);
}
else
{
cache.TryRemove(this.InstanceGuid, out _);
await receiver.CloseAsync();
if (this.IdleInterval.HasValue)
{
this.logger.LogInformation($"{Entity.Current.EntityKey} Going on standby for {this.IdleInterval.Value}");
this.ScheduleContinuation(this.IdleInterval.Value.TotalSeconds);
}
else
{
this.logger.LogInformation($"{Entity.Current.EntityKey} is done");
Entity.Current.DeleteState();
return;
}
}
}
catch (Exception e) // TODO catch
{
this.logger.LogError($"{Entity.Current.EntityKey} failed: {e}");
if (this.IdleInterval.HasValue)
{
this.ScheduleContinuation(this.IdleInterval.Value.TotalSeconds);
}
}
}
void ScheduleContinuation(double BackoffSeconds)
{
if (BackoffSeconds == 0)
{
this.logger.LogDebug($"{Entity.Current.EntityKey} Scheduling continuation immediately");
Entity.Current.SignalEntity(
Entity.Current.EntityId,
nameof(Continue),
this.InstanceGuid);
}
else
{
var targetTime = DateTime.UtcNow + TimeSpan.FromSeconds(BackoffSeconds);
this.logger.LogDebug($"{Entity.Current.EntityKey} Scheduling continuation delayed by {BackoffSeconds}s");
Entity.Current.SignalEntity(
Entity.Current.EntityId,
targetTime,
nameof(Continue),
this.InstanceGuid);
}
this.BackoffSeconds = BackoffSeconds;
}
[FunctionName(nameof(PartitionReceiverEntity))]
public static Task Run([EntityTrigger] IDurableEntityContext context, ILogger logger)
{
return context.DispatchAsync<PartitionReceiverEntity>(logger);
}
}
}

Просмотреть файл

@ -0,0 +1,145 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.Documents.SystemFunctions;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
[JsonObject(MemberSerialization.OptIn)]
public class ProducerEntity
{
public static EntityId GetEntityId(int number)
{
return new EntityId(nameof(ProducerEntity), $"!{number}");
}
[JsonProperty]
public bool IsActive { get; set; }
[JsonProperty]
public long SentEvents { get; set; }
[JsonProperty]
public int Exceptions { get; set; }
[JsonProperty]
public DateTime? Starttime { get; set; }
[JsonProperty]
public DateTime? LastUpdate { get; set; }
readonly ILogger logger;
public ProducerEntity(ILogger logger)
{
this.logger = logger;
}
public Task Start()
{
this.IsActive = true;
this.Starttime = DateTime.UtcNow;
return this.ProduceMore();
}
public void Stop()
{
this.IsActive = false;
}
public void Delete()
{
Entity.Current.DeleteState();
}
public string CreateRandomPayload(Random random, int length)
{
if (length == 0)
{
return string.Empty;
}
var sb = new StringBuilder();
for (int i = 0; i < length; i++)
{
sb.Append(Convert.ToChar(random.Next(0, 26) + 65));
}
return sb.ToString();
}
async Task ProduceMore()
{
if (!this.IsActive)
{
return;
}
var sw = new Stopwatch();
sw.Start();
int myNumber = int.Parse(Entity.Current.EntityId.EntityKey.Substring(1));
Random random = new Random(myNumber);
await using (var producer = new EventHubProducerClient(Parameters.EventHubConnectionString, Parameters.EventHubNameForProducer(myNumber)))
{
var r = new Random();
while (sw.Elapsed < TimeSpan.FromSeconds(3))
{
try
{
using EventDataBatch eventBatch = await producer.CreateBatchAsync();
using MemoryStream stream = new MemoryStream();
int batchsize = 100;
for (int i = 0; i < batchsize; i++)
{
var evt = new Event()
{
Destination = r.Next(Parameters.NumberDestinationEntities),
Payload = this.CreateRandomPayload(random, Parameters.PayloadStringLength),
};
eventBatch.TryAdd(new EventData(evt.ToBytes()));
}
int numberEventsInBatch = eventBatch.Count;
await producer.SendAsync(eventBatch);
this.SentEvents += numberEventsInBatch;
}
catch (Exception e)
{
this.Exceptions++;
this.logger.LogError("{entityId} Failed to send events: {exception}", Entity.Current.EntityId, e);
}
}
this.LastUpdate = DateTime.UtcNow;
// schedule a continuation to send more
Entity.Current.SignalEntity(Entity.Current.EntityId, nameof(ProduceMore));
}
}
[FunctionName(nameof(ProducerEntity))]
public static Task Run([EntityTrigger] IDurableEntityContext context, ILogger logger)
{
return context.DispatchAsync<ProducerEntity>(logger);
}
}
}

Просмотреть файл

@ -0,0 +1,225 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
[JsonObject(MemberSerialization.OptIn)]
public class PullerEntity
{
public static EntityId GetEntityId(int number)
{
return new EntityId(nameof(PullerEntity), $"!{number}");
}
readonly ILogger logger;
public PullerEntity(ILogger logger)
{
this.logger = logger;
}
// EventHub partition receivers are heavyweight; we use a cache so we can reuse them between entity invocations
readonly static ConcurrentDictionary<int, PartitionReceiver> cache = new ConcurrentDictionary<int, PartitionReceiver>();
[JsonProperty]
public bool IsActive { get; set; }
[JsonProperty]
public long ReceivePosition { get; set; }
[JsonProperty]
public int NumPending { get; set; }
[JsonProperty]
public DateTime? FirstReceived { get; set; }
[JsonProperty]
public int Errors { get; set; }
[JsonProperty]
public long TotalEventsPulled { get; set; }
public async Task Start()
{
this.IsActive = true;
await this.PullMore();
}
public async Task Stop()
{
int number = int.Parse(Entity.Current.EntityId.EntityKey.Substring(1));
try
{
if (cache.TryRemove(number, out var partitionReceiver))
{
await partitionReceiver.DisposeAsync();
}
}
catch(Exception e)
{
this.logger.LogWarning("{entityId} failed to dispose PartitionReceiver: {exception}", Entity.Current.EntityId, e);
}
this.IsActive = false;
}
public async Task Delete()
{
await this.Stop();
Entity.Current.DeleteState();
}
public Task Ack()
{
this.NumPending--;
return this.PullMore();
}
public async Task PullMore()
{
if (!this.IsActive)
{
return; // we are not currently active
}
if (Entity.Current.BatchPosition < Entity.Current.BatchSize - 1)
{
return; // we have more operations in this batch to process, so do those first
}
if (this.NumPending >= Parameters.PendingLimit)
{
return; // do not continue until we get more acks
}
try
{
int myNumber = int.Parse(Entity.Current.EntityId.EntityKey.Substring(1));
string myEntityId = Entity.Current.EntityId.ToString();
PartitionReceiver receiver = cache.GetOrAdd(myNumber, (partitionId) =>
{
this.logger.LogDebug("{entityId} Creating PartitionReceiver at position {receivePosition}", myEntityId, this.ReceivePosition);
return new PartitionReceiver(
EventHubConsumerClient.DefaultConsumerGroupName,
Parameters.EventHubPartitionIdForPuller(myNumber),
EventPosition.FromSequenceNumber(this.ReceivePosition - 1, false),
Parameters.EventHubConnectionString,
Parameters.EventHubNameForPuller(myNumber));
});
int batchSize = 1000;
TimeSpan waitTime = TimeSpan.FromSeconds(3);
this.logger.LogDebug("{entityId} Receiving events from position {receivePosition}", myEntityId, this.ReceivePosition);
var eventBatch = await receiver.ReceiveBatchAsync(batchSize, waitTime);
DateTime timestamp = DateTime.UtcNow;
int numEventsInBatch = 0;
foreach (EventData eventData in eventBatch)
{
numEventsInBatch++;
if (eventData.SequenceNumber != this.ReceivePosition)
{
// for sanity, check that we received the next event in the sequence.
// this can fail if events were discarded internally by the EventHub.
this.logger.LogError("{entityId} Received wrong event, sequence number expected={expected} actual={actual}", myEntityId, this.ReceivePosition, eventData.SequenceNumber);
this.Errors++;
}
if (Parameters.SendEntitySignalForEachEvent || Parameters.StartOrchestrationForEachEvent)
{
try
{
// parse the event data
var e = Event.FromStream(eventData.EventBody.ToStream());
if (Parameters.SendEntitySignalForEachEvent)
{
EntityId destinationEntityId = DestinationEntity.GetEntityId(e.Destination);
this.logger.LogDebug("{entityId} Sending signal to {destinationEntityId}", myEntityId, destinationEntityId);
Entity.Current.SignalEntity(destinationEntityId, nameof(DestinationEntity.Receive), (e, myNumber));
this.NumPending++;
}
if (Parameters.StartOrchestrationForEachEvent)
{
var instanceId = this.GetRandomOrchestrationInstanceId(myNumber);
this.logger.LogDebug("{entityId} Scheduling orchestration {instanceId}", myEntityId, instanceId);
Entity.Current.StartNewOrchestration(nameof(DestinationOrchestration), (e, myNumber), instanceId);
this.NumPending++;
}
}
catch (Exception e)
{
this.logger.LogError("{entityId} Failed to process event {sequenceNumber}: {e}", myEntityId, eventData.SequenceNumber, e);
}
}
this.TotalEventsPulled++;
this.ReceivePosition = eventData.SequenceNumber + 1;
}
this.logger.LogDebug("{entityId} Processed batch of {numEventsInBatch} events", myEntityId, numEventsInBatch);
if (!this.FirstReceived.HasValue && this.TotalEventsPulled > 0)
{
this.FirstReceived = timestamp;
}
// if we have not reached the limit of pending deliveries yet, continue
if (this.NumPending < Parameters.PendingLimit)
{
this.logger.LogDebug("{entityId} Scheduling continuation for position {receivePosition}", myEntityId, this.ReceivePosition);
Entity.Current.SignalEntity(Entity.Current.EntityId, nameof(PullMore));
}
}
catch (Exception e)
{
this.logger.LogError("{entityId} failed: {exception}", Entity.Current.EntityId, e);
this.Errors++;
this.IsActive = false;
}
}
string GetRandomOrchestrationInstanceId(int myNumber)
{
if (Parameters.PlaceOrchestrationInstance)
{
return $"{Guid.NewGuid():n}!{myNumber}";
}
else
{
return $"{Guid.NewGuid():n}";
}
}
[FunctionName(nameof(PullerEntity))]
public static Task Run([EntityTrigger] IDurableEntityContext context, ILogger logger)
{
return context.DispatchAsync<PullerEntity>(logger);
}
}
}

Просмотреть файл

@ -1,66 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace PerformanceTests.EventHubs
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
[JsonObject(MemberSerialization.OptIn)]
public class ReceiverEntity
{
public static EntityId GetEntityId(int index) => new EntityId(nameof(ReceiverEntity), index.ToString());
[JsonProperty]
public int EventCount { get; set; }
[JsonProperty]
public DateTime StartTime { get; set; }
[JsonProperty]
public DateTime LastTime { get; set; }
[JsonProperty]
public long BytesReceived { get; set; }
ILogger logger;
public ReceiverEntity(ILogger logger)
{
this.logger = logger;
}
public ILogger Logger { set { this.logger = value; } }
public void Receive(Event evt)
{
this.LastTime = DateTime.UtcNow;
if (this.EventCount++ == 0)
{
this.StartTime = this.LastTime;
}
this.BytesReceived += evt.Payload.Length;
if (this.logger.IsEnabled(LogLevel.Information))
{
this.logger.LogInformation($"{Entity.Current.EntityId} Received event #{this.EventCount}: {evt}");
}
}
[FunctionName(nameof(ReceiverEntity))]
public static Task Run([EntityTrigger] IDurableEntityContext context, ILogger logger)
{
return context.DispatchAsync<ReceiverEntity>(logger);
}
}
}

Просмотреть файл

@ -30,7 +30,7 @@ namespace PerformanceTests.FanOutFanIn
Task[] tasks = new Task[count];
for (int i = 0; i < tasks.Length; i++)
{
tasks[i] = context.CallActivityAsync<string>("SayHello", i.ToString("00000"));
tasks[i] = context.CallActivityAsync<string>("SayHello", i.ToString());
}
await Task.WhenAll(tasks);

Просмотреть файл

@ -17,6 +17,9 @@ namespace PerformanceTests.FanOutFanIn
using System.Linq;
using System.Collections.Concurrent;
using System.Threading;
using Newtonsoft.Json.Linq;
using System.Net.Http;
using System.Net;
/// <summary>
/// A simple microbenchmark orchestration that executes activities in parallel.
@ -25,18 +28,35 @@ namespace PerformanceTests.FanOutFanIn
{
[FunctionName(nameof(FanOutFanIn))]
public static async Task<IActionResult> RunFanOutFanIn(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "fanoutfanin")] HttpRequest req,
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "fanoutfanin/{count}")] HttpRequest req,
int count,
[DurableClient] IDurableClient client)
{
// get the number of tasks
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int count = int.Parse(requestBody);
try
{
// start the orchestration
string orchestrationInstanceId = await client.StartNewAsync(nameof(FanOutFanInOrchestration), null, count);
// start the orchestration
string orchestrationInstanceId = await client.StartNewAsync(nameof(FanOutFanInOrchestration), null, count);
// wait for it to complete and return the result
var response = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(600));
// wait for it to complete and return the result
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(200));
if (response is ObjectResult objectResult
&& objectResult.Value is HttpResponseMessage responseMessage
&& responseMessage.StatusCode == System.Net.HttpStatusCode.OK
&& responseMessage.Content is StringContent stringContent)
{
var state = await client.GetStatusAsync(orchestrationInstanceId, false, false, false);
var elapsedSeconds = (state.LastUpdatedTime - state.CreatedTime).TotalSeconds;
var throughput = count / elapsedSeconds;
response = new OkObjectResult(new { elapsedSeconds, count, throughput });
}
return response;
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int)HttpStatusCode.InternalServerError };
}
}
}
}

Просмотреть файл

@ -14,6 +14,7 @@ namespace PerformanceTests.FileHash
using Microsoft.Extensions.Logging;
using System.Net.Http;
using Newtonsoft.Json.Linq;
using System.Net;
public static class HttpTriggers
{
@ -59,7 +60,7 @@ namespace PerformanceTests.FileHash
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() });
return new ObjectResult(new { error = e.ToString() }) { StatusCode = (int) HttpStatusCode.InternalServerError };
}
}
}

Просмотреть файл

@ -17,6 +17,7 @@ namespace PerformanceTests.HelloCities
using System.Linq;
using System.Collections.Concurrent;
using System.Threading;
using System.Text;
/// <summary>
/// A simple microbenchmark orchestration that executes several simple "hello" activities in a sequence.
@ -50,5 +51,21 @@ namespace PerformanceTests.HelloCities
return outputs;
}
[FunctionName(nameof(HelloSequenceN))]
public static async Task HelloSequenceN([OrchestrationTrigger] IDurableOrchestrationContext context)
{
int numberSteps = context.GetInput<int>();
for (int i = 0; i < numberSteps; i++)
{
await context.CallActivityAsync<string>(nameof(Activities.SayHello), $"City{i}");
}
}
[FunctionName(nameof(HelloSequence3Nested))]
public static async Task<List<string>> HelloSequence3Nested([OrchestrationTrigger] IDurableOrchestrationContext context)
{
return await context.CallSubOrchestratorAsync<List<string>>(nameof(HelloSequence3), null);
}
}
}

Просмотреть файл

@ -17,6 +17,8 @@ namespace PerformanceTests.HelloCities
using System.Linq;
using System.Collections.Concurrent;
using System.Threading;
using Newtonsoft.Json.Linq;
using System.Net.Http;
/// <summary>
/// A simple microbenchmark orchestration that some trivial activities in a sequence.
@ -48,5 +50,50 @@ namespace PerformanceTests.HelloCities
// wait for it to complete and return the result
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(200));
}
[FunctionName(nameof(HelloCitiesN))]
public static async Task<IActionResult> HelloCitiesN(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "hellocities/{count}")] HttpRequest req,
[DurableClient] IDurableClient client,
int count,
ILogger log)
{
// start the orchestration
string orchestrationInstanceId = await client.StartNewAsync(nameof(HelloSequence.HelloSequenceN), null, count);
// wait for it to complete and return the result
var response = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(500));
if (response is ObjectResult objectResult
&& objectResult.Value is HttpResponseMessage responseMessage
&& responseMessage.StatusCode == System.Net.HttpStatusCode.OK
&& responseMessage.Content is StringContent stringContent)
{
var state = await client.GetStatusAsync(orchestrationInstanceId, false, false, false);
var elapsedSeconds = (state.LastUpdatedTime - state.CreatedTime).TotalSeconds;
var throughput = count / elapsedSeconds;
response = new OkObjectResult(new { elapsedSeconds, count, throughput });
}
return response;
}
[FunctionName(nameof(HelloCities3Nested))]
public static async Task<IActionResult> HelloCities3Nested(
[HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
// start the orchestration
string orchestrationInstanceId = await client.StartNewAsync(nameof(HelloSequence.HelloSequence3Nested));
// wait for it to complete
await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(200));
DurableOrchestrationStatus status = await client.GetStatusAsync(orchestrationInstanceId, true, true, true);
// return the history
return new OkObjectResult(status.History);
}
}
}
}

Просмотреть файл

@ -29,9 +29,12 @@
"DurableTask.Netherite.Client": "Warning",
"DurableTask.Netherite.LoadMonitor": "Warning"
},
"applicationInsights": {
"sampling": {
"isEnabled": false
"samplingSettings": {
"isEnabled": true,
"maxTelemetryItemsPerSecond": 1,
"excludedTypes": "Exception"
}
}
},

Просмотреть файл

@ -26,7 +26,11 @@ if ($DeployCode)
}
if (-not ($MaxA -eq ""))
{
$hostconf.extensions.durableTask | Add-Member -NotePropertyName "maxConcurrentActivityFunctions" -NotePropertyValue $MaxA
$hostconf.extensions.durableTask | Add-Member -Force -NotePropertyName "maxConcurrentActivityFunctions" -NotePropertyValue $MaxA
}
if (-not ($MaxO -eq ""))
{
$hostconf.extensions.durableTask | Add-Member -Force -NotePropertyName "maxConcurrentOrchestratorFunctions" -NotePropertyValue $MaxO
}
$hostconf | ConvertTo-Json -depth 32 | set-content "./bin/$Configuration/net6.0/host.json"
@ -52,7 +56,7 @@ if (-not ((az functionapp list -g $groupName --query "[].name"| ConvertFrom-Json
az functionapp config set -n $functionAppName -g $groupName --use-32bit-worker-process false
az functionapp config appsettings set -n $functionAppName -g $groupName --settings EventHubsConnection=$eventHubsConnectionString
az functionapp config appsettings set -n $functionAppName -g $groupName --settings CorpusConnection=$corpusConnectionString
az functionapp config appsettings set -n $functionAppName -g $groupName --settings EHNamespace=$Env:EHNamespace
}
else
{