[Internal] Benchmark tool: Fixes code refractoring to model the metrics as EventSource (#4040)
* Adding metrics for Benchmark tool. * Adding OpenTelemetry. * Revert "Adding OpenTelemetry." This reverts commitc7da088469
. * Telemetry for windowed percentiles. * OpenTelemetry, AppInsights and Dashboard. * Removing DiagnosticDataListener. * Code styling, comments and clean-up. * Fixing issues with dashboard. * Fixing positions of charts on the dashboard. * Fixing the dashboard. * Updating titles and subtitles. * Removing ILogger and other not required references. * Fixing code review points. * Fixing issues after rebase. * Removing unnecessary changes. * Fixing code review points. * Adding metrics for Benchmark tool. * Adding OpenTelemetry. * Revert "Adding OpenTelemetry." This reverts commitc7da088469
. * Telemetry for windowed percentiles. * OpenTelemetry, AppInsights and Dashboard. * Removing DiagnosticDataListener. * Code styling, comments and clean-up. * Fixing issues with dashboard. * Fixing positions of charts on the dashboard. * Fixing the dashboard. * Updating titles and subtitles. * Removing ILogger and other not required references. * Fixing code review points. * Fixing issues after rebase. * Removing unnecessary changes. * Fixing code review points. * Fixing code review points. * make MetrcisCollectorProvider non static and remove locks * fix * fixes * use static class name TelemetrySpan.IncludePercentile * use app insights connection string * modified: Microsoft.Azure.Cosmos.Samples/Tools/Benchmark/Program.cs * modified: Microsoft.Azure.Cosmos.Samples/Tools/Benchmark/Program.cs * rename AppInsightsConnectionString * fix * fix comments * fix if AppInsights c string is not set * summary * fix * remove unnecessary collector types * remove unnecesary metere provicer * add event source * remove folder * fix * split success and failed latencies * Code refractor to use EvenSource design pattern for metrics * Fixing build breaks * Removing BenchmarkExecutionEventSource * Fixign misc things * Some extra cleanup * use TimeSpan except milliseconds * fix metrics publication * fix metrics publication * move tests to benchmark folder * move back benchmark test * use background task for flushing metrics * remove sync metrics flushing * split failed and success operations * fix latenclies charts * fix benchmark run command * remove ShouldUnsetParentConfigurationAndPlatform=false --------- Co-authored-by: Mikhail Lipin <v-milipin@microsoft.com> Co-authored-by: David Chaava <chaava01@gmail.com> Co-authored-by: David Chaava <v-dchaava@microsoft.com>
This commit is contained in:
Родитель
4111caed99
Коммит
b909bd7ee4
|
@ -4,9 +4,6 @@ packages:
|
|||
- azure-cli
|
||||
|
||||
runcmd:
|
||||
- wget https://aka.ms/downloadazcopy-v10-linux
|
||||
- tar -xvf downloadazcopy-v10-linux
|
||||
- sudo cp ./azcopy_linux_amd64_*/azcopy /usr/bin/
|
||||
- wget https://packages.microsoft.com/config/ubuntu/18.04/packages-microsoft-prod.deb
|
||||
- sudo dpkg -i packages-microsoft-prod.deb
|
||||
- sudo apt update
|
||||
|
|
|
@ -45,5 +45,17 @@ namespace CosmosBenchmark
|
|||
this.WriteEvent(1, dbName, containerName, durationInMs, lazyDiagnostics());
|
||||
}
|
||||
}
|
||||
|
||||
[Event(2, Level = EventLevel.Informational)]
|
||||
public void OnOperationSuccess(int operationType, double durationInMs)
|
||||
{
|
||||
this.WriteEvent(2, operationType, durationInMs);
|
||||
}
|
||||
|
||||
[Event(3, Level = EventLevel.Informational)]
|
||||
public void OnOperationFailure(int operationType, double durationInMs)
|
||||
{
|
||||
this.WriteEvent(3, operationType, durationInMs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
namespace CosmosBenchmark
|
||||
{
|
||||
using System;
|
||||
using System.Diagnostics.Tracing;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using OpenTelemetry.Metrics;
|
||||
|
||||
internal class CosmosBenchmarkEventListener : EventListener
|
||||
{
|
||||
static readonly string CosmosBenchmarkEventSourceName = "Azure.Cosmos.Benchmark";
|
||||
|
||||
private readonly MeterProvider meterProvider;
|
||||
private readonly MetricsCollector[] metricsCollectors;
|
||||
private readonly MetricCollectionWindow metricCollectionWindow;
|
||||
private const int WindowCheckInterval = 10;
|
||||
|
||||
public CosmosBenchmarkEventListener(MeterProvider meterProvider, BenchmarkConfig config)
|
||||
{
|
||||
this.meterProvider = meterProvider;
|
||||
this.metricCollectionWindow ??= new MetricCollectionWindow(config.MetricsReportingIntervalInSec);
|
||||
|
||||
this.metricsCollectors = new MetricsCollector[Enum.GetValues<BenchmarkOperationType>().Length];
|
||||
foreach (BenchmarkOperationType entry in Enum.GetValues<BenchmarkOperationType>())
|
||||
{
|
||||
this.metricsCollectors[(int)entry] = new MetricsCollector(entry);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Flush metrics every <see cref="AppConfig.MetricsReportingIntervalInSec"/>
|
||||
/// </summary>
|
||||
ThreadPool.QueueUserWorkItem(async state =>
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
// Reset metricCollectionWindow and flush.
|
||||
if (this.metricCollectionWindow.IsInvalid())
|
||||
{
|
||||
this.meterProvider.ForceFlush();
|
||||
this.metricCollectionWindow.Reset();
|
||||
}
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(CosmosBenchmarkEventListener.WindowCheckInterval));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Override this method to get a list of all the eventSources that exist.
|
||||
/// </summary>
|
||||
protected override void OnEventSourceCreated(EventSource eventSource)
|
||||
{
|
||||
// Because we want to turn on every EventSource, we subscribe to a callback that triggers
|
||||
// when new EventSources are created. It is also fired when the EventListener is created
|
||||
// for all pre-existing EventSources. Thus this callback get called once for every
|
||||
// EventSource regardless of the order of EventSource and EventListener creation.
|
||||
|
||||
// For any EventSource we learn about, turn it on.
|
||||
if (eventSource.Name == CosmosBenchmarkEventSourceName)
|
||||
{
|
||||
this.EnableEvents(eventSource, EventLevel.Informational, EventKeywords.All);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// We override this method to get a callback on every event we subscribed to with EnableEvents
|
||||
/// </summary>
|
||||
/// <param name="eventData"></param>
|
||||
protected override void OnEventWritten(EventWrittenEventArgs eventData)
|
||||
{
|
||||
if (eventData.EventId == 2 // Successful
|
||||
|| eventData.EventId == 3) // Failure
|
||||
{
|
||||
int operationTypeIndex = (int)eventData.Payload[0];
|
||||
double durationInMs = (double)eventData.Payload[1];
|
||||
|
||||
switch (eventData.EventId)
|
||||
{
|
||||
case 2:
|
||||
this.metricsCollectors[operationTypeIndex].OnOperationSuccess(durationInMs);
|
||||
break;
|
||||
case 3:
|
||||
this.metricsCollectors[operationTypeIndex].OnOperationFailure(durationInMs);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -131,13 +131,16 @@ namespace CosmosBenchmark.Fx
|
|||
/// <param name="eventData">An instance of <see cref="EventWrittenEventArgs "/> containing the request latency and diagnostics.</param>
|
||||
protected override void OnEventWritten(EventWrittenEventArgs eventData)
|
||||
{
|
||||
try
|
||||
if (eventData.EventId == 1)
|
||||
{
|
||||
this.Writer.WriteLine($"{eventData.Payload[2]} ; {eventData.Payload[3]}");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Utility.TraceError("An exception ocured while writing diagnostic data to the file", ex);
|
||||
try
|
||||
{
|
||||
this.Writer.WriteLine($"{eventData.Payload[2]} ; {eventData.Payload[3]}");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Utility.TraceError("An exception ocured while writing diagnostic data to the file", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ namespace CosmosBenchmark
|
|||
BenchmarkConfig benchmarkConfig,
|
||||
int serialExecutorConcurrency,
|
||||
int serialExecutorIterationCount,
|
||||
double warmupFraction,
|
||||
MetricsCollectorProvider metricsCollectorProvider);
|
||||
double warmupFraction);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ namespace CosmosBenchmark
|
|||
bool isWarmup,
|
||||
bool traceFailures,
|
||||
Action completionCallback,
|
||||
BenchmarkConfig benchmarkConfig,
|
||||
MetricsCollectorProvider metricsCollectorProvider);
|
||||
BenchmarkConfig benchmarkConfig);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,25 +12,13 @@ namespace CosmosBenchmark
|
|||
public interface IMetricsCollector
|
||||
{
|
||||
/// <summary>
|
||||
/// Collects the number of successful operations.
|
||||
/// Successful operation with latency
|
||||
/// </summary>
|
||||
void CollectMetricsOnSuccess();
|
||||
void OnOperationSuccess(double operationLatencyInMs);
|
||||
|
||||
/// <summary>
|
||||
/// Collects the number of failed operations.
|
||||
/// Failed operation with latency
|
||||
/// </summary>
|
||||
void CollectMetricsOnFailure();
|
||||
|
||||
/// <summary>
|
||||
/// Records latency for success operations in milliseconda.
|
||||
/// </summary>
|
||||
/// <param name="milliseconds">The number of milliseconds to record.</param>
|
||||
void RecordSuccessOpLatencyAndRps(TimeSpan timeSpan);
|
||||
|
||||
/// <summary>
|
||||
/// Records latency for failed operations in milliseconda.
|
||||
/// </summary>
|
||||
/// <param name="milliseconds">The number of milliseconds to record.</param>
|
||||
void RecordFailedOpLatencyAndRps(TimeSpan timeSpan);
|
||||
void OnOperationFailure(double operationLatencyInMs);
|
||||
}
|
||||
}
|
|
@ -5,6 +5,7 @@
|
|||
namespace CosmosBenchmark
|
||||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics.Metrics;
|
||||
|
||||
/// <summary>
|
||||
|
@ -96,45 +97,55 @@ namespace CosmosBenchmark
|
|||
/// Initialize new instance of <see cref="MetricsCollector"/>.
|
||||
/// </summary>
|
||||
/// <param name="meter">OpenTelemetry meter.</param>
|
||||
public MetricsCollector(Meter meter, string prefix)
|
||||
public MetricsCollector(BenchmarkOperationType operationType)
|
||||
{
|
||||
this.meter = meter;
|
||||
this.rpsMetricNameHistogram = meter.CreateHistogram<double>($"{prefix}OperationRpsHistogram");
|
||||
this.operationLatencyHistogram = meter.CreateHistogram<double>($"{prefix}OperationLatencyInMsHistogram");
|
||||
this.meter = new Meter($"CosmosBenchmark{operationType}OperationMeter");
|
||||
this.rpsMetricNameHistogram = this.meter.CreateHistogram<double>($"{operationType}OperationRpsHistogram");
|
||||
this.operationLatencyHistogram = this.meter.CreateHistogram<double>($"{operationType}OperationLatencyInMsHistogram");
|
||||
|
||||
this.rpsFailedMetricNameHistogram = meter.CreateHistogram<double>($"{prefix}FailedOperationRpsHistogram");
|
||||
this.operationFailedLatencyHistogram = meter.CreateHistogram<double>($"{prefix}FailedOperationLatencyInMsHistogram");
|
||||
this.rpsFailedMetricNameHistogram = this.meter.CreateHistogram<double>($"{operationType}FailedOperationRpsHistogram");
|
||||
this.operationFailedLatencyHistogram = this.meter.CreateHistogram<double>($"{operationType}FailedOperationLatencyInMsHistogram");
|
||||
|
||||
this.successOperationCounter = meter.CreateCounter<long>($"{prefix}OperationSuccess");
|
||||
this.failureOperationCounter = meter.CreateCounter<long>($"{prefix}OperationFailure");
|
||||
this.successOperationCounter = this.meter.CreateCounter<long>($"{operationType}OperationSuccess");
|
||||
this.failureOperationCounter = this.meter.CreateCounter<long>($"{operationType}OperationFailure");
|
||||
|
||||
this.latencyInMsMetricNameGauge = this.meter.CreateObservableGauge($"{prefix}OperationLatencyInMs",
|
||||
this.latencyInMsMetricNameGauge = this.meter.CreateObservableGauge($"{operationType}OperationLatencyInMs",
|
||||
() => new Measurement<double>(this.latencyInMs));
|
||||
|
||||
this.rpsNameGauge = this.meter.CreateObservableGauge($"{prefix}OperationRps",
|
||||
this.rpsNameGauge = this.meter.CreateObservableGauge($"{operationType}OperationRps",
|
||||
() => new Measurement<double>(this.rps));
|
||||
|
||||
this.latencyInMsFailedMetricNameGauge = this.meter.CreateObservableGauge($"{prefix}FailedOperationLatencyInMs",
|
||||
() => new Measurement<double>(this.latencyInMs));
|
||||
this.latencyInMsFailedMetricNameGauge = this.meter.CreateObservableGauge($"{operationType}FailedOperationLatencyInMs",
|
||||
() => new Measurement<double>(this.latencyFailedInMs));
|
||||
|
||||
this.rpsFailedNameGauge = this.meter.CreateObservableGauge($"{prefix}FailedOperationRps",
|
||||
() => new Measurement<double>(this.rps));
|
||||
this.rpsFailedNameGauge = this.meter.CreateObservableGauge($"{operationType}FailedOperationRps",
|
||||
() => new Measurement<double>(this.rpsFailed));
|
||||
}
|
||||
|
||||
internal static IEnumerable<string> GetBenchmarkMeterNames()
|
||||
{
|
||||
foreach (BenchmarkOperationType entry in Enum.GetValues<BenchmarkOperationType>())
|
||||
{
|
||||
yield return $"CosmosBenchmark{entry}OperationMeter";
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Collects the number of successful operations.
|
||||
/// Successful operation with latency
|
||||
/// </summary>
|
||||
public void CollectMetricsOnSuccess()
|
||||
public void OnOperationSuccess(double operationLatencyInMs)
|
||||
{
|
||||
this.successOperationCounter.Add(1);
|
||||
this.RecordSuccessOpLatencyAndRps(operationLatencyInMs);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Collects the number of failed operations.
|
||||
/// Failed operation with latency
|
||||
/// </summary>
|
||||
public void CollectMetricsOnFailure()
|
||||
public void OnOperationFailure(double operationLatencyInMs)
|
||||
{
|
||||
this.failureOperationCounter.Add(1);
|
||||
this.RecordFailedOpLatencyAndRps(operationLatencyInMs);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -142,10 +153,10 @@ namespace CosmosBenchmark
|
|||
/// </summary>
|
||||
/// <param name="timeSpan">The number of milliseconds to record.</param>
|
||||
public void RecordSuccessOpLatencyAndRps(
|
||||
TimeSpan timeSpan)
|
||||
double operationLatencyInMs)
|
||||
{
|
||||
this.rps = timeSpan.TotalMilliseconds != 0 ? 1000 / timeSpan.TotalMilliseconds : 0;
|
||||
this.latencyInMs = timeSpan.TotalMilliseconds;
|
||||
this.rps = operationLatencyInMs != 0 ? 1000 / operationLatencyInMs : 0;
|
||||
this.latencyInMs = operationLatencyInMs;
|
||||
this.rpsMetricNameHistogram.Record(this.rps);
|
||||
this.operationLatencyHistogram.Record(this.latencyInMs);
|
||||
}
|
||||
|
@ -155,10 +166,10 @@ namespace CosmosBenchmark
|
|||
/// </summary>
|
||||
/// <param name="timeSpan">The number of milliseconds to record.</param>
|
||||
public void RecordFailedOpLatencyAndRps(
|
||||
TimeSpan timeSpan)
|
||||
double operationLatencyInMs)
|
||||
{
|
||||
this.rpsFailed = timeSpan.TotalMilliseconds != 0 ? 1000 / timeSpan.TotalMilliseconds : 0;
|
||||
this.latencyFailedInMs = timeSpan.TotalMilliseconds;
|
||||
this.rpsFailed = operationLatencyInMs != 0 ? 1000 / operationLatencyInMs : 0;
|
||||
this.latencyFailedInMs = operationLatencyInMs;
|
||||
this.rpsFailedMetricNameHistogram.Record(this.rpsFailed);
|
||||
this.operationFailedLatencyHistogram.Record(this.latencyFailedInMs);
|
||||
}
|
||||
|
|
|
@ -1,96 +0,0 @@
|
|||
//------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
//------------------------------------------------------------
|
||||
|
||||
namespace CosmosBenchmark
|
||||
{
|
||||
using System;
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using OpenTelemetry.Metrics;
|
||||
|
||||
/// <summary>
|
||||
/// Represents the metrics collector provider.
|
||||
/// </summary>
|
||||
internal class MetricsCollectorProvider
|
||||
{
|
||||
private const int WindowCheckInterval = 10;
|
||||
private MetricCollectionWindow metricCollectionWindow;
|
||||
|
||||
private static readonly object metricCollectionWindowLock = new object();
|
||||
|
||||
private readonly MetricsCollector insertOperationMetricsCollector;
|
||||
|
||||
private readonly MetricsCollector queryOperationMetricsCollector;
|
||||
|
||||
private readonly MetricsCollector readOperationMetricsCollector;
|
||||
|
||||
private readonly Meter insertOperationMeter = new("CosmosBenchmarkInsertOperationMeter");
|
||||
|
||||
private readonly Meter queryOperationMeter = new("CosmosBenchmarkQueryOperationMeter");
|
||||
|
||||
private readonly Meter readOperationMeter = new("CosmosBenchmarkReadOperationMeter");
|
||||
|
||||
private readonly MeterProvider meterProvider;
|
||||
|
||||
public MetricsCollectorProvider(BenchmarkConfig config, MeterProvider meterProvider)
|
||||
{
|
||||
this.meterProvider = meterProvider;
|
||||
this.insertOperationMetricsCollector ??= new MetricsCollector(this.insertOperationMeter, "Insert");
|
||||
this.queryOperationMetricsCollector ??= new MetricsCollector(this.queryOperationMeter, "Query");
|
||||
this.readOperationMetricsCollector ??= new MetricsCollector(this.readOperationMeter, "Read");
|
||||
this.metricCollectionWindow ??= new MetricCollectionWindow(config);
|
||||
|
||||
/// <summary>
|
||||
/// Flush metrics every <see cref="AppConfig.MetricsReportingIntervalInSec"/>
|
||||
/// </summary>
|
||||
ThreadPool.QueueUserWorkItem(async state =>
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
MetricCollectionWindow metricCollectionWindow = this.GetCurrentMetricCollectionWindow(config);
|
||||
|
||||
// Reset metricCollectionWindow and flush.
|
||||
if (!metricCollectionWindow.IsValid)
|
||||
{
|
||||
this.meterProvider.ForceFlush();
|
||||
this.metricCollectionWindow.Reset(config);
|
||||
}
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(MetricsCollectorProvider.WindowCheckInterval));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private MetricCollectionWindow GetCurrentMetricCollectionWindow(BenchmarkConfig config)
|
||||
{
|
||||
if (this.metricCollectionWindow is null || !this.metricCollectionWindow.IsValid)
|
||||
{
|
||||
lock (metricCollectionWindowLock)
|
||||
{
|
||||
this.metricCollectionWindow ??= new MetricCollectionWindow(config);
|
||||
}
|
||||
}
|
||||
|
||||
return this.metricCollectionWindow;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the metric collector.
|
||||
/// </summary>
|
||||
/// <param name="benchmarkOperation">Benchmark operation.</param>
|
||||
/// <param name="config">Benchmark configuration.</param>
|
||||
/// <returns>Metrics collector.</returns>
|
||||
/// <exception cref="NotSupportedException">Thrown if provided benchmark operation is not covered supported to collect metrics.</exception>
|
||||
public IMetricsCollector GetMetricsCollector(IBenchmarkOperation benchmarkOperation)
|
||||
{
|
||||
return benchmarkOperation.OperationType switch
|
||||
{
|
||||
BenchmarkOperationType.Insert => this.insertOperationMetricsCollector,
|
||||
BenchmarkOperationType.Query => this.queryOperationMetricsCollector,
|
||||
BenchmarkOperationType.Read => this.readOperationMetricsCollector,
|
||||
_ => throw new NotSupportedException($"The type of {nameof(benchmarkOperation)} is not supported for collecting metrics."),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@
|
|||
{
|
||||
public string DatabseName { get; set; }
|
||||
public string ContainerName { get; set; }
|
||||
public BenchmarkOperationType OperationType { get; set; }
|
||||
public double RuCharges { get; set; }
|
||||
public Func<string> LazyDiagnostics { get; set; }
|
||||
public CosmosDiagnostics CosmosDiagnostics { get; set; }
|
||||
|
|
|
@ -29,8 +29,7 @@ namespace CosmosBenchmark
|
|||
BenchmarkConfig benchmarkConfig,
|
||||
int serialExecutorConcurrency,
|
||||
int serialExecutorIterationCount,
|
||||
double warmupFraction,
|
||||
MetricsCollectorProvider metricsCollectorProvider)
|
||||
double warmupFraction)
|
||||
{
|
||||
IExecutor warmupExecutor = new SerialOperationExecutor(
|
||||
executorId: "Warmup",
|
||||
|
@ -39,9 +38,8 @@ namespace CosmosBenchmark
|
|||
(int)(serialExecutorIterationCount * warmupFraction),
|
||||
isWarmup: true,
|
||||
traceFailures: benchmarkConfig.TraceFailures,
|
||||
completionCallback: () => { },
|
||||
benchmarkConfig,
|
||||
metricsCollectorProvider);
|
||||
completionCallback: () => { },
|
||||
benchmarkConfig);
|
||||
|
||||
Utility.TeePrint("Starting execution {0} tasks", serialExecutorConcurrency);
|
||||
IExecutor[] executors = new IExecutor[serialExecutorConcurrency];
|
||||
|
@ -60,8 +58,7 @@ namespace CosmosBenchmark
|
|||
isWarmup: false,
|
||||
traceFailures: benchmarkConfig.TraceFailures,
|
||||
completionCallback: () => Interlocked.Decrement(ref this.pendingExecutorCount),
|
||||
benchmarkConfig,
|
||||
metricsCollectorProvider);
|
||||
benchmarkConfig);
|
||||
}
|
||||
|
||||
return await this.LogOutputStats(
|
||||
|
|
|
@ -13,7 +13,6 @@ namespace CosmosBenchmark
|
|||
internal class SerialOperationExecutor : IExecutor
|
||||
{
|
||||
private readonly IBenchmarkOperation operation;
|
||||
|
||||
private readonly string executorId;
|
||||
|
||||
public SerialOperationExecutor(
|
||||
|
@ -28,7 +27,6 @@ namespace CosmosBenchmark
|
|||
}
|
||||
|
||||
public int SuccessOperationCount { get; private set; }
|
||||
|
||||
public int FailedOperationCount { get; private set; }
|
||||
|
||||
public double TotalRuCharges { get; private set; }
|
||||
|
@ -38,19 +36,15 @@ namespace CosmosBenchmark
|
|||
bool isWarmup,
|
||||
bool traceFailures,
|
||||
Action completionCallback,
|
||||
BenchmarkConfig benchmarkConfig,
|
||||
MetricsCollectorProvider metricsCollectorProvider)
|
||||
BenchmarkConfig benchmarkConfig)
|
||||
{
|
||||
Trace.TraceInformation($"Executor {this.executorId} started");
|
||||
|
||||
Trace.TraceInformation("Initializing counters and metrics.");
|
||||
|
||||
try
|
||||
{
|
||||
int currentIterationCount = 0;
|
||||
do
|
||||
{
|
||||
IMetricsCollector metricsCollector = metricsCollectorProvider.GetMetricsCollector(this.operation);
|
||||
OperationResult? operationResult = null;
|
||||
|
||||
await this.operation.PrepareAsync();
|
||||
|
@ -58,15 +52,12 @@ namespace CosmosBenchmark
|
|||
using (ITelemetrySpan telemetrySpan = TelemetrySpan.StartNew(
|
||||
benchmarkConfig,
|
||||
() => operationResult.Value,
|
||||
disableTelemetry: isWarmup,
|
||||
metricsCollector.RecordSuccessOpLatencyAndRps,
|
||||
metricsCollector.RecordFailedOpLatencyAndRps))
|
||||
disableTelemetry: isWarmup))
|
||||
{
|
||||
try
|
||||
{
|
||||
operationResult = await this.operation.ExecuteOnceAsync();
|
||||
|
||||
metricsCollector.CollectMetricsOnSuccess();
|
||||
telemetrySpan.MarkSuccess();
|
||||
|
||||
// Success case
|
||||
this.SuccessOperationCount++;
|
||||
|
@ -79,13 +70,11 @@ namespace CosmosBenchmark
|
|||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
telemetrySpan.MarkFailed();
|
||||
if (traceFailures)
|
||||
{
|
||||
Trace.TraceInformation(ex.ToString());
|
||||
}
|
||||
telemetrySpan.MarkFailed();
|
||||
|
||||
metricsCollector.CollectMetricsOnFailure();
|
||||
|
||||
// failure case
|
||||
this.FailedOperationCount++;
|
||||
|
@ -123,4 +112,4 @@ namespace CosmosBenchmark
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,8 +19,6 @@ namespace CosmosBenchmark
|
|||
|
||||
private Stopwatch stopwatch;
|
||||
private Func<OperationResult> lazyOperationResult;
|
||||
private Action<TimeSpan> recordFailedOpLatencyAction;
|
||||
private Action<TimeSpan> recordSuccessOpLatencyAction;
|
||||
private bool disableTelemetry;
|
||||
private bool isFailed = false;
|
||||
private BenchmarkConfig benchmarkConfig;
|
||||
|
@ -28,9 +26,7 @@ namespace CosmosBenchmark
|
|||
public static ITelemetrySpan StartNew(
|
||||
BenchmarkConfig benchmarkConfig,
|
||||
Func<OperationResult> lazyOperationResult,
|
||||
bool disableTelemetry,
|
||||
Action<TimeSpan> recordSuccessOpLatencyAction,
|
||||
Action<TimeSpan> recordFailedOpLatencyAction)
|
||||
bool disableTelemetry)
|
||||
{
|
||||
if (disableTelemetry || !TelemetrySpan.IncludePercentile)
|
||||
{
|
||||
|
@ -42,17 +38,25 @@ namespace CosmosBenchmark
|
|||
benchmarkConfig = benchmarkConfig,
|
||||
stopwatch = Stopwatch.StartNew(),
|
||||
lazyOperationResult = lazyOperationResult,
|
||||
recordSuccessOpLatencyAction = recordSuccessOpLatencyAction,
|
||||
recordFailedOpLatencyAction = recordFailedOpLatencyAction,
|
||||
disableTelemetry = disableTelemetry
|
||||
};
|
||||
}
|
||||
|
||||
public void MarkFailed() { this.isFailed = true; }
|
||||
public void MarkFailed()
|
||||
{
|
||||
this.isFailed = true;
|
||||
this.stopwatch.Stop();
|
||||
}
|
||||
|
||||
public void MarkSuccess()
|
||||
{
|
||||
this.isFailed = false;
|
||||
this.stopwatch.Stop();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
this.stopwatch.Stop();
|
||||
this.stopwatch.Stop(); // No-op in-case of MarkFailed or MarkSuccess prior call
|
||||
if (!this.disableTelemetry)
|
||||
{
|
||||
OperationResult operationResult = this.lazyOperationResult();
|
||||
|
@ -61,14 +65,13 @@ namespace CosmosBenchmark
|
|||
{
|
||||
RecordLatency(this.stopwatch.Elapsed.TotalMilliseconds);
|
||||
|
||||
if(this.isFailed)
|
||||
if (this.isFailed)
|
||||
{
|
||||
this.recordSuccessOpLatencyAction?.Invoke(this.stopwatch.Elapsed);
|
||||
BenchmarkLatencyEventSource.Instance.OnOperationFailure((int)operationResult.OperationType, this.stopwatch.Elapsed.TotalMilliseconds);
|
||||
}
|
||||
else
|
||||
{
|
||||
this.recordSuccessOpLatencyAction?.Invoke(this.stopwatch.Elapsed);
|
||||
|
||||
BenchmarkLatencyEventSource.Instance.OnOperationSuccess((int)operationResult.OperationType, this.stopwatch.Elapsed.TotalMilliseconds);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -76,9 +79,8 @@ namespace CosmosBenchmark
|
|||
operationResult.DatabseName,
|
||||
operationResult.ContainerName,
|
||||
(int)this.stopwatch.ElapsedMilliseconds,
|
||||
operationResult.LazyDiagnostics,
|
||||
operationResult.LazyDiagnostics,
|
||||
this.benchmarkConfig.DiagnosticLatencyThresholdInMs);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -104,16 +106,6 @@ namespace CosmosBenchmark
|
|||
return MathNet.Numerics.Statistics.Statistics.Percentile(latencyHistogram.Take(latencyIndex + 1), percentile);
|
||||
}
|
||||
|
||||
internal static double? GetLatencyQuantile(double quantile)
|
||||
{
|
||||
if (TelemetrySpan.latencyHistogram == null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
return MathNet.Numerics.Statistics.Statistics.Quantile(latencyHistogram.Take(latencyIndex + 1), quantile);
|
||||
}
|
||||
|
||||
private class NoOpDisposable : ITelemetrySpan
|
||||
{
|
||||
public static readonly NoOpDisposable Instance = new NoOpDisposable();
|
||||
|
@ -122,13 +114,19 @@ namespace CosmosBenchmark
|
|||
{
|
||||
}
|
||||
|
||||
public void MarkSuccess()
|
||||
{
|
||||
}
|
||||
|
||||
public void MarkFailed()
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
public interface ITelemetrySpan : IDisposable {
|
||||
public interface ITelemetrySpan : IDisposable
|
||||
{
|
||||
void MarkSuccess();
|
||||
void MarkFailed();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,38 +11,35 @@ namespace CosmosBenchmark
|
|||
/// </summary>
|
||||
internal class MetricCollectionWindow
|
||||
{
|
||||
/// <summary>
|
||||
/// The timestamp when window span is started.
|
||||
/// </summary>
|
||||
public DateTime Started { get; private set; }
|
||||
private DateTime ValidTill { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The timestamp until which the current window span is not elapsed.
|
||||
/// </summary>
|
||||
public DateTime ValidTill { get; private set; }
|
||||
private int MetricsReportingIntervalInSec { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Creates the instance of <see cref="MetricCollectionWindow"/>.
|
||||
/// </summary>
|
||||
/// <param name="config">Cosmos Benchmark configuration.</param>
|
||||
public MetricCollectionWindow(BenchmarkConfig config)
|
||||
public MetricCollectionWindow(int metricsReportingIntervalInSec)
|
||||
{
|
||||
this.Reset(config);
|
||||
this.MetricsReportingIntervalInSec = metricsReportingIntervalInSec;
|
||||
this.Reset();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Indicates whether the current window is valid.
|
||||
/// </summary>
|
||||
public bool IsValid => DateTime.UtcNow > this.ValidTill;
|
||||
public bool IsInvalid()
|
||||
{
|
||||
return DateTime.UtcNow > this.ValidTill;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Resets the started timestamp and valid till timespan.
|
||||
/// </summary>
|
||||
/// <param name="config"></param>
|
||||
public void Reset(BenchmarkConfig config)
|
||||
public void Reset()
|
||||
{
|
||||
this.Started = DateTime.UtcNow;
|
||||
this.ValidTill = this.Started.AddSeconds(config.MetricsReportingIntervalInSec);
|
||||
this.ValidTill = DateTime.UtcNow.AddSeconds(this.MetricsReportingIntervalInSec);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,8 +41,7 @@ namespace CosmosBenchmark
|
|||
await AddAzureInfoToRunSummary();
|
||||
|
||||
MeterProvider meterProvider = BuildMeterProvider(config);
|
||||
|
||||
MetricsCollectorProvider metricsCollectorProvider = new MetricsCollectorProvider(config, meterProvider);
|
||||
CosmosBenchmarkEventListener listener = new CosmosBenchmarkEventListener(meterProvider, config);
|
||||
|
||||
ThreadPool.SetMinThreads(config.MinThreadPoolSize, config.MinThreadPoolSize);
|
||||
|
||||
|
@ -62,7 +61,7 @@ namespace CosmosBenchmark
|
|||
|
||||
Program program = new Program();
|
||||
|
||||
RunSummary runSummary = await program.ExecuteAsync(config, metricsCollectorProvider);
|
||||
RunSummary runSummary = await program.ExecuteAsync(config);
|
||||
|
||||
if (!string.IsNullOrEmpty(config.DiagnosticsStorageConnectionString))
|
||||
{
|
||||
|
@ -90,25 +89,28 @@ namespace CosmosBenchmark
|
|||
/// <returns></returns>
|
||||
private static MeterProvider BuildMeterProvider(BenchmarkConfig config)
|
||||
{
|
||||
MeterProviderBuilder meterProviderBuilder = Sdk.CreateMeterProviderBuilder();
|
||||
if (string.IsNullOrWhiteSpace(config.AppInsightsConnectionString))
|
||||
{
|
||||
return Sdk.CreateMeterProviderBuilder()
|
||||
.AddMeter("CosmosBenchmarkInsertOperationMeter")
|
||||
.AddMeter("CosmosBenchmarkQueryOperationMeter")
|
||||
.AddMeter("CosmosBenchmarkReadOperationMeter")
|
||||
.Build();
|
||||
foreach(string benchmarkName in MetricsCollector.GetBenchmarkMeterNames())
|
||||
{
|
||||
meterProviderBuilder = meterProviderBuilder.AddMeter(benchmarkName);
|
||||
};
|
||||
|
||||
return meterProviderBuilder.Build();
|
||||
}
|
||||
|
||||
OpenTelemetry.Trace.TracerProviderBuilder tracerProviderBuilder = Sdk.CreateTracerProviderBuilder()
|
||||
.AddAzureMonitorTraceExporter();
|
||||
|
||||
return Sdk.CreateMeterProviderBuilder()
|
||||
.AddAzureMonitorMetricExporter(configure: new Action<AzureMonitorExporterOptions>(
|
||||
(options) => options.ConnectionString = config.AppInsightsConnectionString))
|
||||
.AddMeter("CosmosBenchmarkInsertOperationMeter")
|
||||
.AddMeter("CosmosBenchmarkQueryOperationMeter")
|
||||
.AddMeter("CosmosBenchmarkReadOperationMeter")
|
||||
.Build();
|
||||
meterProviderBuilder = meterProviderBuilder.AddAzureMonitorMetricExporter(configure: new Action<AzureMonitorExporterOptions>(
|
||||
(options) => options.ConnectionString = config.AppInsightsConnectionString));
|
||||
foreach (string benchmarkName in MetricsCollector.GetBenchmarkMeterNames())
|
||||
{
|
||||
meterProviderBuilder = meterProviderBuilder.AddMeter(benchmarkName);
|
||||
};
|
||||
|
||||
return meterProviderBuilder.Build();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -138,13 +140,11 @@ namespace CosmosBenchmark
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Executing benchmarks for V2/V3 cosmosdb SDK.
|
||||
/// </summary>
|
||||
/// <returns>a Task object.</returns>
|
||||
private async Task<RunSummary> ExecuteAsync(BenchmarkConfig config,
|
||||
MetricsCollectorProvider metricsCollectorProvider)
|
||||
private async Task<RunSummary> ExecuteAsync(BenchmarkConfig config)
|
||||
{
|
||||
// V3 SDK client initialization
|
||||
using (CosmosClient cosmosClient = config.CreateCosmosClient(config.Key))
|
||||
|
@ -200,7 +200,7 @@ namespace CosmosBenchmark
|
|||
}
|
||||
|
||||
IExecutionStrategy execution = IExecutionStrategy.StartNew(benchmarkOperationFactory);
|
||||
runSummary = await execution.ExecuteAsync(config, taskCount, opsPerTask, 0.01, metricsCollectorProvider);
|
||||
runSummary = await execution.ExecuteAsync(config, taskCount, opsPerTask, 0.01);
|
||||
}
|
||||
|
||||
if (config.CleanupOnFinish)
|
||||
|
@ -224,8 +224,8 @@ namespace CosmosBenchmark
|
|||
Utility.TeeTraceInformation("Publishing results");
|
||||
runSummary.Diagnostics = CosmosDiagnosticsLogger.GetDiagnostics();
|
||||
await this.PublishResults(
|
||||
config,
|
||||
runSummary,
|
||||
config,
|
||||
runSummary,
|
||||
cosmosClient);
|
||||
}
|
||||
|
||||
|
@ -234,8 +234,8 @@ namespace CosmosBenchmark
|
|||
}
|
||||
|
||||
private async Task PublishResults(
|
||||
BenchmarkConfig config,
|
||||
RunSummary runSummary,
|
||||
BenchmarkConfig config,
|
||||
RunSummary runSummary,
|
||||
CosmosClient benchmarkClient)
|
||||
{
|
||||
if (string.IsNullOrEmpty(config.ResultsEndpoint))
|
||||
|
@ -390,4 +390,4 @@ namespace CosmosBenchmark
|
|||
traceSource.Listeners.Clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = ruCharges,
|
||||
LazyDiagnostics = () => itemResponse.RequestDiagnosticsString,
|
||||
};
|
||||
|
|
|
@ -82,6 +82,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = totalCharge,
|
||||
LazyDiagnostics = lastDiagnostics,
|
||||
};
|
||||
|
|
|
@ -90,6 +90,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = totalCharge,
|
||||
LazyDiagnostics = lastDiagnostics,
|
||||
};
|
||||
|
|
|
@ -54,6 +54,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = ruCharges,
|
||||
LazyDiagnostics = () => feedResponse.QueryMetrics.ToString(),
|
||||
};
|
||||
|
|
|
@ -61,6 +61,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = dce.RequestCharge,
|
||||
LazyDiagnostics = () => dce.ToString(),
|
||||
};
|
||||
|
|
|
@ -58,6 +58,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = ruCharges,
|
||||
LazyDiagnostics = () => itemResponse.RequestDiagnosticsString,
|
||||
};
|
||||
|
|
|
@ -59,6 +59,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = ruCharges,
|
||||
LazyDiagnostics = () => itemResponse.RequestDiagnosticsString,
|
||||
};
|
||||
|
|
|
@ -55,6 +55,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databaseName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = ruCharges,
|
||||
CosmosDiagnostics = itemResponse.Diagnostics,
|
||||
LazyDiagnostics = () => itemResponse.Diagnostics.ToString(),
|
||||
|
|
|
@ -108,6 +108,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databaseName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = totalCharge,
|
||||
CosmosDiagnostics = lastDiagnostics,
|
||||
LazyDiagnostics = () => lastDiagnostics?.ToString(),
|
||||
|
|
|
@ -59,6 +59,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = feedResponse.Headers.RequestCharge,
|
||||
CosmosDiagnostics = feedResponse.Diagnostics,
|
||||
LazyDiagnostics = () => feedResponse.Diagnostics.ToString(),
|
||||
|
|
|
@ -49,6 +49,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = itemResponse.Headers.RequestCharge,
|
||||
CosmosDiagnostics = itemResponse.Diagnostics,
|
||||
LazyDiagnostics = () => itemResponse.Diagnostics.ToString(),
|
||||
|
|
|
@ -56,6 +56,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = itemResponse.Headers.RequestCharge,
|
||||
CosmosDiagnostics = itemResponse.Diagnostics,
|
||||
LazyDiagnostics = () => itemResponse.Diagnostics.ToString(),
|
||||
|
|
|
@ -62,6 +62,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = itemResponse.Headers.RequestCharge,
|
||||
CosmosDiagnostics = itemResponse.Diagnostics,
|
||||
LazyDiagnostics = () => itemResponse.Diagnostics.ToString(),
|
||||
|
|
|
@ -55,6 +55,7 @@ namespace CosmosBenchmark
|
|||
{
|
||||
DatabseName = databsaeName,
|
||||
ContainerName = containerName,
|
||||
OperationType = this.OperationType,
|
||||
RuCharges = itemResponse.Headers.RequestCharge,
|
||||
CosmosDiagnostics = itemResponse.Diagnostics,
|
||||
LazyDiagnostics = () => itemResponse.Diagnostics.ToString(),
|
||||
|
|
Загрузка…
Ссылка в новой задаче