Only enable events/counters if there are listeners available (#928)

* Only enable events/counters if there are listeners available

* Remove some synchronization logic
This commit is contained in:
Miha Zupan 2021-04-22 00:27:53 +02:00 коммит произвёл GitHub
Родитель b105f96798
Коммит a2f2e467d6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
18 изменённых файлов: 470 добавлений и 493 удалений

Просмотреть файл

@ -1,11 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Yarp.ReverseProxy.Telemetry.Consumption;
@ -15,7 +10,7 @@ namespace Yarp.Sample
{
public static IServiceCollection AddPrometheusProxyMetrics(this IServiceCollection services)
{
services.AddProxyTelemetryListener();
services.AddTelemetryListeners();
services.AddSingleton<IProxyMetricsConsumer, PrometheusProxyMetrics>();
return services;
}
@ -23,28 +18,28 @@ namespace Yarp.Sample
#if NET5_0_OR_GREATER
public static IServiceCollection AddPrometheusDnsMetrics(this IServiceCollection services)
{
services.AddNameResolutionTelemetryListener();
services.AddTelemetryListeners();
services.AddSingleton<INameResolutionMetricsConsumer, PrometheusDnsMetrics>();
return services;
}
public static IServiceCollection AddPrometheusKestrelMetrics(this IServiceCollection services)
{
services.AddKestrelTelemetryListener();
services.AddTelemetryListeners();
services.AddSingleton<IKestrelMetricsConsumer, PrometheusKestrelMetrics>();
return services;
}
public static IServiceCollection AddPrometheusOutboundHttpMetrics(this IServiceCollection services)
{
services.AddHttpTelemetryListener();
services.AddTelemetryListeners();
services.AddSingleton<IHttpMetricsConsumer, PrometheusOutboundHttpMetrics>();
return services;
}
public static IServiceCollection AddPrometheusSocketsMetrics(this IServiceCollection services)
{
services.AddSocketsTelemetryListener();
services.AddTelemetryListeners();
services.AddSingleton<ISocketsMetricsConsumer, PrometheusSocketMetrics>();
return services;
}

Просмотреть файл

@ -5,11 +5,6 @@ using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Prometheus;
using Yarp.ReverseProxy.Telemetry.Consumption;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using System;
using Yarp.ReverseProxy.Middleware;
namespace Yarp.Sample
{
@ -20,7 +15,6 @@ namespace Yarp.Sample
{
private readonly IConfiguration _configuration;
/// <summary>
/// Initializes a new instance of the <see cref="Startup" /> class.
/// </summary>
@ -38,8 +32,6 @@ namespace Yarp.Sample
services.AddReverseProxy()
.LoadFromConfig(_configuration.GetSection("ReverseProxy"));
services.AddHttpContextAccessor();
//Enable metric collection for all the underlying event counters used by YARP
services.AddAllPrometheusMetrics();
}

Просмотреть файл

@ -0,0 +1,104 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Yarp.ReverseProxy.Telemetry.Consumption
{
internal abstract class EventListenerService<TService, TTelemetryConsumer, TMetricsConsumer> : EventListener, IHostedService
{
protected abstract string EventSourceName { get; }
protected readonly ILogger<TService> Logger;
protected readonly TMetricsConsumer[] MetricsConsumers;
protected readonly TTelemetryConsumer[] TelemetryConsumers;
private EventSource _eventSource;
private readonly object _syncObject = new();
private readonly bool _initialized;
public EventListenerService(
ILogger<TService> logger,
IEnumerable<TTelemetryConsumer> telemetryConsumers,
IEnumerable<TMetricsConsumer> metricsConsumers)
{
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
_ = telemetryConsumers ?? throw new ArgumentNullException(nameof(telemetryConsumers));
_ = metricsConsumers ?? throw new ArgumentNullException(nameof(metricsConsumers));
TelemetryConsumers = telemetryConsumers.ToArray();
MetricsConsumers = metricsConsumers.ToArray();
if (TelemetryConsumers.Any(s => s is null) || metricsConsumers.Any(c => c is null))
{
throw new ArgumentException("A consumer may not be null",
TelemetryConsumers.Any(s => s is null) ? nameof(telemetryConsumers) : nameof(metricsConsumers));
}
if (TelemetryConsumers.Length == 0)
{
TelemetryConsumers = null;
}
if (MetricsConsumers.Length == 0)
{
MetricsConsumers = null;
}
lock (_syncObject)
{
if (_eventSource is not null)
{
EnableEventSource();
}
_initialized = true;
}
}
protected override void OnEventSourceCreated(EventSource eventSource)
{
if (eventSource.Name == EventSourceName)
{
lock (_syncObject)
{
_eventSource = eventSource;
if (_initialized)
{
// Ctor already finished - enable the EventSource here
EnableEventSource();
}
}
}
}
private void EnableEventSource()
{
var enableEvents = TelemetryConsumers is not null;
var enableMetrics = MetricsConsumers is not null;
if (!enableEvents && !enableMetrics)
{
return;
}
var eventLevel = enableEvents ? EventLevel.Verbose : EventLevel.Critical;
var arguments = enableMetrics ? new Dictionary<string, string> { { "EventCounterIntervalSec", MetricsOptions.Interval.TotalSeconds.ToString() } } : null;
EnableEvents(_eventSource, eventLevel, EventKeywords.None, arguments);
_eventSource = null;
}
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
}

Просмотреть файл

@ -6,44 +6,21 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Yarp.ReverseProxy.Telemetry.Consumption
{
internal sealed class HttpEventListenerService : EventListener, IHostedService
internal sealed class HttpEventListenerService : EventListenerService<HttpEventListenerService, IHttpTelemetryConsumer, IHttpMetricsConsumer>
{
private readonly ILogger<HttpEventListenerService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IHttpContextAccessor _httpContextAccessor;
private HttpMetrics _previousMetrics;
private HttpMetrics _currentMetrics = new();
private int _eventCountersCount;
public HttpEventListenerService(ILogger<HttpEventListenerService> logger, IServiceProvider serviceProvider, IHttpContextAccessor httpContextAccessor)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor));
}
protected override string EventSourceName => "System.Net.Http";
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
protected override void OnEventSourceCreated(EventSource eventSource)
{
if (eventSource.Name == "System.Net.Http")
{
var arguments = new Dictionary<string, string> { { "EventCounterIntervalSec", MetricsOptions.Interval.TotalSeconds.ToString() } };
EnableEvents(eventSource, EventLevel.LogAlways, EventKeywords.None, arguments);
}
}
public HttpEventListenerService(ILogger<HttpEventListenerService> logger, IEnumerable<IHttpTelemetryConsumer> telemetryConsumers, IEnumerable<IHttpMetricsConsumer> metricsConsumers)
: base(logger, telemetryConsumers, metricsConsumers)
{ }
protected override void OnEventWritten(EventWrittenEventArgs eventData)
{
@ -60,15 +37,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
return;
}
var context = _httpContextAccessor?.HttpContext;
if (context is null)
{
return;
}
using var consumers = context.RequestServices.GetServices<IHttpTelemetryConsumer>().GetEnumerator();
if (!consumers.MoveNext())
if (TelemetryConsumers is null)
{
return;
}
@ -87,33 +56,30 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
var versionMajor = (int)(byte)payload[4];
var versionMinor = (int)(byte)payload[5];
var versionPolicy = (HttpVersionPolicy)payload[6];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestStart(eventData.TimeStamp, scheme, host, port, pathAndQuery, versionMajor, versionMinor, versionPolicy);
consumer.OnRequestStart(eventData.TimeStamp, scheme, host, port, pathAndQuery, versionMajor, versionMinor, versionPolicy);
}
while (consumers.MoveNext());
}
break;
case 2:
Debug.Assert(eventData.EventName == "RequestStop" && payload.Count == 0);
{
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestStop(eventData.TimeStamp);
consumer.OnRequestStop(eventData.TimeStamp);
}
while (consumers.MoveNext());
}
break;
case 3:
Debug.Assert(eventData.EventName == "RequestFailed" && payload.Count == 0);
{
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestFailed(eventData.TimeStamp);
consumer.OnRequestFailed(eventData.TimeStamp);
}
while (consumers.MoveNext());
}
break;
@ -122,11 +88,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
{
var versionMajor = (int)(byte)payload[0];
var versionMinor = (int)(byte)payload[1];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnConnectionEstablished(eventData.TimeStamp, versionMajor, versionMinor);
consumer.OnConnectionEstablished(eventData.TimeStamp, versionMajor, versionMinor);
}
while (consumers.MoveNext());
}
break;
@ -140,44 +105,40 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
var timeOnQueue = TimeSpan.FromMilliseconds((double)payload[0]);
var versionMajor = (int)(byte)payload[1];
var versionMinor = (int)(byte)payload[2];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestLeftQueue(eventData.TimeStamp, timeOnQueue, versionMajor, versionMinor);
consumer.OnRequestLeftQueue(eventData.TimeStamp, timeOnQueue, versionMajor, versionMinor);
}
while (consumers.MoveNext());
}
break;
case 7:
Debug.Assert(eventData.EventName == "RequestHeadersStart" && payload.Count == 0);
{
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestHeadersStart(eventData.TimeStamp);
consumer.OnRequestHeadersStart(eventData.TimeStamp);
}
while (consumers.MoveNext());
}
break;
case 8:
Debug.Assert(eventData.EventName == "RequestHeadersStop" && payload.Count == 0);
{
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestHeadersStop(eventData.TimeStamp);
consumer.OnRequestHeadersStop(eventData.TimeStamp);
}
while (consumers.MoveNext());
}
break;
case 9:
Debug.Assert(eventData.EventName == "RequestContentStart" && payload.Count == 0);
{
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestContentStart(eventData.TimeStamp);
consumer.OnRequestContentStart(eventData.TimeStamp);
}
while (consumers.MoveNext());
}
break;
@ -185,33 +146,30 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
Debug.Assert(eventData.EventName == "RequestContentStop" && payload.Count == 1);
{
var contentLength = (long)payload[0];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestContentStop(eventData.TimeStamp, contentLength);
consumer.OnRequestContentStop(eventData.TimeStamp, contentLength);
}
while (consumers.MoveNext());
}
break;
case 11:
Debug.Assert(eventData.EventName == "ResponseHeadersStart" && payload.Count == 0);
{
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnResponseHeadersStart(eventData.TimeStamp);
consumer.OnResponseHeadersStart(eventData.TimeStamp);
}
while (consumers.MoveNext());
}
break;
case 12:
Debug.Assert(eventData.EventName == "ResponseHeadersStop" && payload.Count == 0);
{
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnResponseHeadersStop(eventData.TimeStamp);
consumer.OnResponseHeadersStop(eventData.TimeStamp);
}
while (consumers.MoveNext());
}
break;
}
@ -219,6 +177,11 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
private void OnEventCounters(EventWrittenEventArgs eventData)
{
if (MetricsConsumers is null)
{
return;
}
Debug.Assert(eventData.EventName == "EventCounters" && eventData.Payload.Count == 1);
var counters = (IDictionary<string, object>)eventData.Payload[0];
@ -284,14 +247,14 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
_previousMetrics = metrics;
_currentMetrics = new HttpMetrics();
if (previous is null || _serviceProvider is null)
if (previous is null)
{
return;
}
try
{
foreach (var consumer in _serviceProvider.GetServices<IHttpMetricsConsumer>())
foreach (var consumer in MetricsConsumers)
{
consumer.OnHttpMetrics(previous, metrics);
}
@ -299,7 +262,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
catch (Exception ex)
{
// We can't let an uncaught exception propagate as that would crash the process
_logger.LogError(ex, $"Uncaught exception occured while processing {nameof(HttpMetrics)}.");
Logger.LogError(ex, $"Uncaught exception occured while processing {nameof(HttpMetrics)}.");
}
}
}

Просмотреть файл

@ -22,19 +22,19 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="versionMajor">Major component of the request's HTTP version.</param>
/// <param name="versionMinor">Minor component of the request's HTTP version.</param>
/// <param name="versionPolicy"><see cref="HttpVersionPolicy"/> of the request.</param>
void OnRequestStart(DateTime timestamp, string scheme, string host, int port, string pathAndQuery, int versionMajor, int versionMinor, HttpVersionPolicy versionPolicy);
void OnRequestStart(DateTime timestamp, string scheme, string host, int port, string pathAndQuery, int versionMajor, int versionMinor, HttpVersionPolicy versionPolicy) { }
/// <summary>
/// Called after an HTTP request.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
void OnRequestStop(DateTime timestamp);
void OnRequestStop(DateTime timestamp) { }
/// <summary>
/// Called before <see cref="OnRequestStop(DateTime)"/> if the request failed.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
void OnRequestFailed(DateTime timestamp);
void OnRequestFailed(DateTime timestamp) { }
/// <summary>
/// Called when a new HTTP connection is established.
@ -42,7 +42,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="versionMajor">Major component of the connection's HTTP version.</param>
/// <param name="versionMinor">Minor component of the connection's HTTP version.</param>
void OnConnectionEstablished(DateTime timestamp, int versionMajor, int versionMinor);
void OnConnectionEstablished(DateTime timestamp, int versionMajor, int versionMinor) { }
/// <summary>
/// Called when a request that hit the MaxConnectionsPerServer or MAX_CONCURRENT_STREAMS limit leaves the queue.
@ -51,43 +51,43 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="timeOnQueue">Time spent on queue.</param>
/// <param name="versionMajor">Major component of the request's HTTP version.</param>
/// <param name="versionMinor">Minor component of the request's HTTP version.</param>
void OnRequestLeftQueue(DateTime timestamp, TimeSpan timeOnQueue, int versionMajor, int versionMinor);
void OnRequestLeftQueue(DateTime timestamp, TimeSpan timeOnQueue, int versionMajor, int versionMinor) { }
/// <summary>
/// Called before sending the request headers.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
void OnRequestHeadersStart(DateTime timestamp);
void OnRequestHeadersStart(DateTime timestamp) { }
/// <summary>
/// Called after sending the request headers.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
void OnRequestHeadersStop(DateTime timestamp);
void OnRequestHeadersStop(DateTime timestamp) { }
/// <summary>
/// Called before sending the request content.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
void OnRequestContentStart(DateTime timestamp);
void OnRequestContentStart(DateTime timestamp) { }
/// <summary>
/// Called after sending the request content.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="contentLength"></param>
void OnRequestContentStop(DateTime timestamp, long contentLength);
void OnRequestContentStop(DateTime timestamp, long contentLength) { }
/// <summary>
/// Called before reading the response headers.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
void OnResponseHeadersStart(DateTime timestamp);
void OnResponseHeadersStart(DateTime timestamp) { }
/// <summary>
/// Called after reading all response headers.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
void OnResponseHeadersStop(DateTime timestamp);
void OnResponseHeadersStop(DateTime timestamp) { }
}
}

Просмотреть файл

@ -20,7 +20,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="httpVersion">HTTP version of the request.</param>
/// <param name="path">Path of the request.</param>
/// <param name="method">HTTP method of the request.</param>
void OnRequestStart(DateTime timestamp, string connectionId, string requestId, string httpVersion, string path, string method);
void OnRequestStart(DateTime timestamp, string connectionId, string requestId, string httpVersion, string path, string method) { }
/// <summary>
/// Called at the end of a request.
@ -31,7 +31,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="httpVersion">HTTP version of the request.</param>
/// <param name="path">Path of the request.</param>
/// <param name="method">HTTP method of the request.</param>
void OnRequestStop(DateTime timestamp, string connectionId, string requestId, string httpVersion, string path, string method);
void OnRequestStop(DateTime timestamp, string connectionId, string requestId, string httpVersion, string path, string method) { }
#else
/// <summary>
/// Called at the start of a request.
@ -39,7 +39,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="connectionId">ID of the connection.</param>
/// <param name="requestId">ID of the request.</param>
void OnRequestStart(DateTime timestamp, string connectionId, string requestId);
void OnRequestStart(DateTime timestamp, string connectionId, string requestId) { }
/// <summary>
/// Called at the end of a request.
@ -47,7 +47,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="connectionId">ID of the connection.</param>
/// <param name="requestId">ID of the request.</param>
void OnRequestStop(DateTime timestamp, string connectionId, string requestId);
void OnRequestStop(DateTime timestamp, string connectionId, string requestId) { }
#endif
}
}

Просмотреть файл

@ -5,53 +5,27 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Yarp.ReverseProxy.Telemetry.Consumption
{
internal sealed class KestrelEventListenerService : EventListener, IHostedService
#if !NET5_0
internal interface IKestrelMetricsConsumer { }
#endif
internal sealed class KestrelEventListenerService : EventListenerService<KestrelEventListenerService, IKestrelTelemetryConsumer, IKestrelMetricsConsumer>
{
#if NET5_0
private readonly ILogger<KestrelEventListenerService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IHttpContextAccessor _httpContextAccessor;
private KestrelMetrics _previousMetrics;
private KestrelMetrics _currentMetrics = new();
private int _eventCountersCount;
public KestrelEventListenerService(ILogger<KestrelEventListenerService> logger, IServiceProvider serviceProvider, IHttpContextAccessor httpContextAccessor)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor));
}
#else
private readonly IHttpContextAccessor _httpContextAccessor;
public KestrelEventListenerService(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor));
}
#endif
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
protected override string EventSourceName => "Microsoft-AspNetCore-Server-Kestrel";
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
protected override void OnEventSourceCreated(EventSource eventSource)
{
if (eventSource.Name == "Microsoft-AspNetCore-Server-Kestrel")
{
var arguments = new Dictionary<string, string> { { "EventCounterIntervalSec", MetricsOptions.Interval.TotalSeconds.ToString() } };
EnableEvents(eventSource, EventLevel.LogAlways, EventKeywords.None, arguments);
}
}
public KestrelEventListenerService(ILogger<KestrelEventListenerService> logger, IEnumerable<IKestrelTelemetryConsumer> telemetryConsumers, IEnumerable<IKestrelMetricsConsumer> metricsConsumers)
: base(logger, telemetryConsumers, metricsConsumers)
{ }
protected override void OnEventWritten(EventWrittenEventArgs eventData)
{
@ -70,15 +44,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
return;
}
var context = _httpContextAccessor?.HttpContext;
if (context is null)
{
return;
}
using var consumers = context.RequestServices.GetServices<IKestrelTelemetryConsumer>().GetEnumerator();
if (!consumers.MoveNext())
if (TelemetryConsumers is null)
{
return;
}
@ -96,11 +62,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
var httpVersion = (string)payload[2];
var path = (string)payload[3];
var method = (string)payload[4];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestStart(eventData.TimeStamp, connectionId, requestId, httpVersion, path, method);
consumer.OnRequestStart(eventData.TimeStamp, connectionId, requestId, httpVersion, path, method);
}
while (consumers.MoveNext());
}
break;
@ -112,11 +77,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
var httpVersion = (string)payload[2];
var path = (string)payload[3];
var method = (string)payload[4];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestStop(eventData.TimeStamp, connectionId, requestId, httpVersion, path, method);
consumer.OnRequestStop(eventData.TimeStamp, connectionId, requestId, httpVersion, path, method);
}
while (consumers.MoveNext());
}
break;
}
@ -128,11 +92,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
{
var connectionId = (string)payload[0];
var requestId = (string)payload[1];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestStart(eventData.TimeStamp, connectionId, requestId);
consumer.OnRequestStart(eventData.TimeStamp, connectionId, requestId);
}
while (consumers.MoveNext());
}
break;
@ -141,11 +104,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
{
var connectionId = (string)payload[0];
var requestId = (string)payload[1];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnRequestStop(eventData.TimeStamp, connectionId, requestId);
consumer.OnRequestStop(eventData.TimeStamp, connectionId, requestId);
}
while (consumers.MoveNext());
}
break;
}
@ -155,6 +117,11 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
#if NET5_0
private void OnEventCounters(EventWrittenEventArgs eventData)
{
if (MetricsConsumers is null)
{
return;
}
Debug.Assert(eventData.EventName == "EventCounters" && eventData.Payload.Count == 1);
var counters = (IDictionary<string, object>)eventData.Payload[0];
@ -224,14 +191,14 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
_previousMetrics = metrics;
_currentMetrics = new KestrelMetrics();
if (previous is null || _serviceProvider is null)
if (previous is null)
{
return;
}
try
{
foreach (var consumer in _serviceProvider.GetServices<IKestrelMetricsConsumer>())
foreach (var consumer in MetricsConsumers)
{
consumer.OnKestrelMetrics(previous, metrics);
}
@ -239,7 +206,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
catch (Exception ex)
{
// We can't let an uncaught exception propagate as that would crash the process
_logger.LogError(ex, $"Uncaught exception occured while processing {nameof(KestrelMetrics)}.");
Logger.LogError(ex, $"Uncaught exception occured while processing {nameof(KestrelMetrics)}.");
}
}
}

Просмотреть файл

@ -15,18 +15,18 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="hostNameOrAddress">Host name or address we are resolving.</param>
void OnResolutionStart(DateTime timestamp, string hostNameOrAddress);
void OnResolutionStart(DateTime timestamp, string hostNameOrAddress) { }
/// <summary>
/// Called after a name resolution.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
void OnResolutionStop(DateTime timestamp);
void OnResolutionStop(DateTime timestamp) { }
/// <summary>
/// Called before <see cref="OnResolutionStop(DateTime)"/> if the name resolution failed.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
void OnResolutionFailed(DateTime timestamp);
void OnResolutionFailed(DateTime timestamp) { }
}
}

Просмотреть файл

@ -5,44 +5,21 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Yarp.ReverseProxy.Telemetry.Consumption
{
internal sealed class NameResolutionEventListenerService : EventListener, IHostedService
internal sealed class NameResolutionEventListenerService : EventListenerService<NameResolutionEventListenerService, INameResolutionTelemetryConsumer, INameResolutionMetricsConsumer>
{
private readonly ILogger<NameResolutionEventListenerService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IHttpContextAccessor _httpContextAccessor;
private NameResolutionMetrics _previousMetrics;
private NameResolutionMetrics _currentMetrics = new();
private int _eventCountersCount;
public NameResolutionEventListenerService(ILogger<NameResolutionEventListenerService> logger, IServiceProvider serviceProvider, IHttpContextAccessor httpContextAccessor)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor));
}
protected override string EventSourceName => "System.Net.NameResolution";
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
protected override void OnEventSourceCreated(EventSource eventSource)
{
if (eventSource.Name == "System.Net.NameResolution")
{
var arguments = new Dictionary<string, string> { { "EventCounterIntervalSec", MetricsOptions.Interval.TotalSeconds.ToString() } };
EnableEvents(eventSource, EventLevel.LogAlways, EventKeywords.None, arguments);
}
}
public NameResolutionEventListenerService(ILogger<NameResolutionEventListenerService> logger, IEnumerable<INameResolutionTelemetryConsumer> telemetryConsumers, IEnumerable<INameResolutionMetricsConsumer> metricsConsumers)
: base(logger, telemetryConsumers, metricsConsumers)
{ }
protected override void OnEventWritten(EventWrittenEventArgs eventData)
{
@ -59,15 +36,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
return;
}
var context = _httpContextAccessor?.HttpContext;
if (context is null)
{
return;
}
using var consumers = context.RequestServices.GetServices<INameResolutionTelemetryConsumer>().GetEnumerator();
if (!consumers.MoveNext())
if (TelemetryConsumers is null)
{
return;
}
@ -80,33 +49,30 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
Debug.Assert(eventData.EventName == "ResolutionStart" && payload.Count == 1);
{
var hostNameOrAddress = (string)payload[0];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnResolutionStart(eventData.TimeStamp, hostNameOrAddress);
consumer.OnResolutionStart(eventData.TimeStamp, hostNameOrAddress);
}
while (consumers.MoveNext());
}
break;
case 2:
Debug.Assert(eventData.EventName == "ResolutionStop" && payload.Count == 0);
{
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnResolutionStop(eventData.TimeStamp);
consumer.OnResolutionStop(eventData.TimeStamp);
}
while (consumers.MoveNext());
}
break;
case 3:
Debug.Assert(eventData.EventName == "ResolutionFailed" && payload.Count == 0);
{
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnResolutionFailed(eventData.TimeStamp);
consumer.OnResolutionFailed(eventData.TimeStamp);
}
while (consumers.MoveNext());
}
break;
}
@ -114,6 +80,11 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
private void OnEventCounters(EventWrittenEventArgs eventData)
{
if (MetricsConsumers is null)
{
return;
}
Debug.Assert(eventData.EventName == "EventCounters" && eventData.Payload.Count == 1);
var counters = (IDictionary<string, object>)eventData.Payload[0];
@ -151,14 +122,14 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
_previousMetrics = metrics;
_currentMetrics = new NameResolutionMetrics();
if (previous is null || _serviceProvider is null)
if (previous is null)
{
return;
}
try
{
foreach (var consumer in _serviceProvider.GetServices<INameResolutionMetricsConsumer>())
foreach (var consumer in MetricsConsumers)
{
consumer.OnNameResolutionMetrics(previous, metrics);
}
@ -166,7 +137,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
catch (Exception ex)
{
// We can't let an uncaught exception propagate as that would crash the process
_logger.LogError(ex, $"Uncaught exception occured while processing {nameof(NameResolutionMetrics)}.");
Logger.LogError(ex, $"Uncaught exception occured while processing {nameof(NameResolutionMetrics)}.");
}
}
}

Просмотреть файл

@ -17,14 +17,14 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="isServer">Indicates whether we are authenticating as the server.</param>
/// <param name="targetHost">Name of the host we are authenticating with.</param>
void OnHandshakeStart(DateTime timestamp, bool isServer, string targetHost);
void OnHandshakeStart(DateTime timestamp, bool isServer, string targetHost) { }
/// <summary>
/// Called after a handshake.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="protocol">The protocol established by the handshake.</param>
void OnHandshakeStop(DateTime timestamp, SslProtocols protocol);
void OnHandshakeStop(DateTime timestamp, SslProtocols protocol) { }
/// <summary>
/// Called before <see cref="OnHandshakeStop(DateTime, SslProtocols)"/> if the handshake failed.
@ -33,6 +33,6 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="isServer">Indicates whether we were authenticating as the server.</param>
/// <param name="elapsed">Time elapsed since the start of the handshake.</param>
/// <param name="exceptionMessage">Exception information for the handshake failure.</param>
void OnHandshakeFailed(DateTime timestamp, bool isServer, TimeSpan elapsed, string exceptionMessage);
void OnHandshakeFailed(DateTime timestamp, bool isServer, TimeSpan elapsed, string exceptionMessage) { }
}
}

Просмотреть файл

@ -6,44 +6,21 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Yarp.ReverseProxy.Telemetry.Consumption
{
internal sealed class NetSecurityEventListenerService : EventListener, IHostedService
internal sealed class NetSecurityEventListenerService : EventListenerService<NetSecurityEventListenerService, INetSecurityTelemetryConsumer, INetSecurityMetricsConsumer>
{
private readonly ILogger<NetSecurityEventListenerService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IHttpContextAccessor _httpContextAccessor;
private NetSecurityMetrics _previousMetrics;
private NetSecurityMetrics _currentMetrics = new();
private int _eventCountersCount;
public NetSecurityEventListenerService(ILogger<NetSecurityEventListenerService> logger, IServiceProvider serviceProvider, IHttpContextAccessor httpContextAccessor)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor));
}
protected override string EventSourceName => "System.Net.Security";
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
protected override void OnEventSourceCreated(EventSource eventSource)
{
if (eventSource.Name == "System.Net.Security")
{
var arguments = new Dictionary<string, string> { { "EventCounterIntervalSec", MetricsOptions.Interval.TotalSeconds.ToString() } };
EnableEvents(eventSource, EventLevel.LogAlways, EventKeywords.None, arguments);
}
}
public NetSecurityEventListenerService(ILogger<NetSecurityEventListenerService> logger, IEnumerable<INetSecurityTelemetryConsumer> telemetryConsumers, IEnumerable<INetSecurityMetricsConsumer> metricsConsumers)
: base(logger, telemetryConsumers, metricsConsumers)
{ }
protected override void OnEventWritten(EventWrittenEventArgs eventData)
{
@ -60,15 +37,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
return;
}
var context = _httpContextAccessor?.HttpContext;
if (context is null)
{
return;
}
using var consumers = context.RequestServices.GetServices<INetSecurityTelemetryConsumer>().GetEnumerator();
if (!consumers.MoveNext())
if (TelemetryConsumers is null)
{
return;
}
@ -82,11 +51,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
{
var isServer = (bool)payload[0];
var targetHost = (string)payload[1];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnHandshakeStart(eventData.TimeStamp, isServer, targetHost);
consumer.OnHandshakeStart(eventData.TimeStamp, isServer, targetHost);
}
while (consumers.MoveNext());
}
break;
@ -94,11 +62,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
Debug.Assert(eventData.EventName == "HandshakeStop" && payload.Count == 1);
{
var protocol = (SslProtocols)payload[0];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnHandshakeStop(eventData.TimeStamp, protocol);
consumer.OnHandshakeStop(eventData.TimeStamp, protocol);
}
while (consumers.MoveNext());
}
break;
@ -108,11 +75,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
var isServer = (bool)payload[0];
var elapsed = TimeSpan.FromMilliseconds((double)payload[1]);
var exceptionMessage = (string)payload[2];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnHandshakeFailed(eventData.TimeStamp, isServer, elapsed, exceptionMessage);
consumer.OnHandshakeFailed(eventData.TimeStamp, isServer, elapsed, exceptionMessage);
}
while (consumers.MoveNext());
}
break;
}
@ -120,6 +86,11 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
private void OnEventCounters(EventWrittenEventArgs eventData)
{
if (MetricsConsumers is null)
{
return;
}
Debug.Assert(eventData.EventName == "EventCounters" && eventData.Payload.Count == 1);
var counters = (IDictionary<string, object>)eventData.Payload[0];
@ -205,14 +176,14 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
_previousMetrics = metrics;
_currentMetrics = new NetSecurityMetrics();
if (previous is null || _serviceProvider is null)
if (previous is null)
{
return;
}
try
{
foreach (var consumer in _serviceProvider.GetServices<INetSecurityMetricsConsumer>())
foreach (var consumer in MetricsConsumers)
{
consumer.OnNetSecurityMetrics(previous, metrics);
}
@ -220,7 +191,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
catch (Exception ex)
{
// We can't let an uncaught exception propagate as that would crash the process
_logger.LogError(ex, $"Uncaught exception occured while processing {nameof(NetSecurityMetrics)}.");
Logger.LogError(ex, $"Uncaught exception occured while processing {nameof(NetSecurityMetrics)}.");
}
}
}

Просмотреть файл

@ -16,28 +16,28 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="destinationPrefix"></param>
void OnProxyStart(DateTime timestamp, string destinationPrefix);
void OnProxyStart(DateTime timestamp, string destinationPrefix) { }
/// <summary>
/// Called after proxying a request.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="statusCode">The status code returned in the response.</param>
void OnProxyStop(DateTime timestamp, int statusCode);
void OnProxyStop(DateTime timestamp, int statusCode) { }
/// <summary>
/// Called before <see cref="OnProxyStop(DateTime, int)"/> if proxying the request failed.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="error"><see cref="ProxyError"/> information for the proxy failure.</param>
void OnProxyFailed(DateTime timestamp, ProxyError error);
void OnProxyFailed(DateTime timestamp, ProxyError error) { }
/// <summary>
/// Called when reaching a given stage of proxying a request.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="stage">Stage of the proxy operation.</param>
void OnProxyStage(DateTime timestamp, ProxyStage stage);
void OnProxyStage(DateTime timestamp, ProxyStage stage) { }
/// <summary>
/// Called periodically while a content transfer is active.
@ -48,7 +48,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="iops">Number of read/write pairs performed.</param>
/// <param name="readTime">Time spent reading from the source.</param>
/// <param name="writeTime">Time spent writing to the destination.</param>
void OnContentTransferring(DateTime timestamp, bool isRequest, long contentLength, long iops, TimeSpan readTime, TimeSpan writeTime);
void OnContentTransferring(DateTime timestamp, bool isRequest, long contentLength, long iops, TimeSpan readTime, TimeSpan writeTime) { }
/// <summary>
/// Called after transferring the request or response content.
@ -60,7 +60,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="readTime">Time spent reading from the source.</param>
/// <param name="writeTime">Time spent writing to the destination.</param>
/// <param name="firstReadTime">Time spent on the first read of the source.</param>
void OnContentTransferred(DateTime timestamp, bool isRequest, long contentLength, long iops, TimeSpan readTime, TimeSpan writeTime, TimeSpan firstReadTime);
void OnContentTransferred(DateTime timestamp, bool isRequest, long contentLength, long iops, TimeSpan readTime, TimeSpan writeTime, TimeSpan firstReadTime) { }
/// <summary>
/// Called before proxying a request.
@ -69,6 +69,6 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="clusterId">Cluster ID</param>
/// <param name="routeId">Route ID</param>
/// <param name="destinationId">Destination ID</param>
void OnProxyInvoke(DateTime timestamp, string clusterId, string routeId, string destinationId);
void OnProxyInvoke(DateTime timestamp, string clusterId, string routeId, string destinationId) { }
}
}

Просмотреть файл

@ -5,45 +5,22 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Yarp.ReverseProxy.Service.Proxy;
namespace Yarp.ReverseProxy.Telemetry.Consumption
{
internal sealed class ProxyEventListenerService : EventListener, IHostedService
internal sealed class ProxyEventListenerService : EventListenerService<ProxyEventListenerService, IProxyTelemetryConsumer, IProxyMetricsConsumer>
{
private readonly ILogger<ProxyEventListenerService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IHttpContextAccessor _httpContextAccessor;
private ProxyMetrics _previousMetrics;
private ProxyMetrics _currentMetrics = new();
private int _eventCountersCount;
public ProxyEventListenerService(ILogger<ProxyEventListenerService> logger, IServiceProvider serviceProvider, IHttpContextAccessor httpContextAccessor)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor));
}
protected override string EventSourceName => "Yarp.ReverseProxy";
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
protected override void OnEventSourceCreated(EventSource eventSource)
{
if (eventSource.Name == "Yarp.ReverseProxy")
{
var arguments = new Dictionary<string, string> { { "EventCounterIntervalSec", MetricsOptions.Interval.TotalSeconds.ToString() } };
EnableEvents(eventSource, EventLevel.LogAlways, EventKeywords.None, arguments);
}
}
public ProxyEventListenerService(ILogger<ProxyEventListenerService> logger, IEnumerable<IProxyTelemetryConsumer> telemetryConsumers, IEnumerable<IProxyMetricsConsumer> metricsConsumers)
: base(logger, telemetryConsumers, metricsConsumers)
{ }
protected override void OnEventWritten(EventWrittenEventArgs eventData)
{
@ -60,15 +37,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
return;
}
var context = _httpContextAccessor?.HttpContext;
if (context is null)
{
return;
}
using var consumers = context.RequestServices.GetServices<IProxyTelemetryConsumer>().GetEnumerator();
if (!consumers.MoveNext())
if (TelemetryConsumers is null)
{
return;
}
@ -81,11 +50,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
Debug.Assert(eventData.EventName == "ProxyStart" && payload.Count == 1);
{
var destinationPrefix = (string)payload[0];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnProxyStart(eventData.TimeStamp, destinationPrefix);
consumer.OnProxyStart(eventData.TimeStamp, destinationPrefix);
}
while (consumers.MoveNext());
}
break;
@ -93,11 +61,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
Debug.Assert(eventData.EventName == "ProxyStop" && payload.Count == 1);
{
var statusCode = (int)payload[0];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnProxyStop(eventData.TimeStamp, statusCode);
consumer.OnProxyStop(eventData.TimeStamp, statusCode);
}
while (consumers.MoveNext());
}
break;
@ -105,11 +72,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
Debug.Assert(eventData.EventName == "ProxyFailed" && payload.Count == 1);
{
var error = (ProxyError)payload[0];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnProxyFailed(eventData.TimeStamp, error);
consumer.OnProxyFailed(eventData.TimeStamp, error);
}
while (consumers.MoveNext());
}
break;
@ -117,11 +83,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
Debug.Assert(eventData.EventName == "ProxyStage" && payload.Count == 1);
{
var proxyStage = (ProxyStage)payload[0];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnProxyStage(eventData.TimeStamp, proxyStage);
consumer.OnProxyStage(eventData.TimeStamp, proxyStage);
}
while (consumers.MoveNext());
}
break;
@ -133,11 +98,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
var iops = (long)payload[2];
var readTime = new TimeSpan((long)payload[3]);
var writeTime = new TimeSpan((long)payload[4]);
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnContentTransferring(eventData.TimeStamp, isRequest, contentLength, iops, readTime, writeTime);
consumer.OnContentTransferring(eventData.TimeStamp, isRequest, contentLength, iops, readTime, writeTime);
}
while (consumers.MoveNext());
}
break;
@ -150,11 +114,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
var readTime = new TimeSpan((long)payload[3]);
var writeTime = new TimeSpan((long)payload[4]);
var firstReadTime = new TimeSpan((long)payload[5]);
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnContentTransferred(eventData.TimeStamp, isRequest, contentLength, iops, readTime, writeTime, firstReadTime);
consumer.OnContentTransferred(eventData.TimeStamp, isRequest, contentLength, iops, readTime, writeTime, firstReadTime);
}
while (consumers.MoveNext());
}
break;
@ -164,11 +127,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
var clusterId = (string)payload[0];
var routeId = (string)payload[1];
var destinationId = (string)payload[2];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnProxyInvoke(eventData.TimeStamp, clusterId, routeId, destinationId);
consumer.OnProxyInvoke(eventData.TimeStamp, clusterId, routeId, destinationId);
}
while (consumers.MoveNext());
}
break;
}
@ -176,6 +138,11 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
private void OnEventCounters(EventWrittenEventArgs eventData)
{
if (MetricsConsumers is null)
{
return;
}
Debug.Assert(eventData.EventName == "EventCounters" && eventData.Payload.Count == 1);
var counters = (IDictionary<string, object>)eventData.Payload[0];
@ -221,14 +188,14 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
_previousMetrics = metrics;
_currentMetrics = new ProxyMetrics();
if (previous is null || _serviceProvider is null)
if (previous is null)
{
return;
}
try
{
foreach (var consumer in _serviceProvider.GetServices<IProxyMetricsConsumer>())
foreach (var consumer in MetricsConsumers)
{
consumer.OnProxyMetrics(previous, metrics);
}
@ -236,7 +203,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
catch (Exception ex)
{
// We can't let an uncaught exception propagate as that would crash the process
_logger.LogError(ex, $"Uncaught exception occured while processing {nameof(ProxyMetrics)}.");
Logger.LogError(ex, $"Uncaught exception occured while processing {nameof(ProxyMetrics)}.");
}
}
}

Просмотреть файл

@ -16,13 +16,13 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="address">Socket address we are connecting to.</param>
void OnConnectStart(DateTime timestamp, string address);
void OnConnectStart(DateTime timestamp, string address) { }
/// <summary>
/// Called after a Socket connect.
/// </summary>
/// <param name="timestamp">Timestamp when the event was fired.</param>
void OnConnectStop(DateTime timestamp);
void OnConnectStop(DateTime timestamp) { }
/// <summary>
/// Called before <see cref="OnConnectStop(DateTime)"/> if the connect failed.
@ -30,6 +30,6 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
/// <param name="timestamp">Timestamp when the event was fired.</param>
/// <param name="error"><see cref="SocketError"/> information for the connect failure.</param>
/// <param name="exceptionMessage">Exception information for the connect failure.</param>
void OnConnectFailed(DateTime timestamp, SocketError error, string exceptionMessage);
void OnConnectFailed(DateTime timestamp, SocketError error, string exceptionMessage) { }
}
}

Просмотреть файл

@ -6,44 +6,21 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Yarp.ReverseProxy.Telemetry.Consumption
{
internal sealed class SocketsEventListenerService : EventListener, IHostedService
internal sealed class SocketsEventListenerService : EventListenerService<SocketsEventListenerService, ISocketsTelemetryConsumer, ISocketsMetricsConsumer>
{
private readonly ILogger<SocketsEventListenerService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IHttpContextAccessor _httpContextAccessor;
private SocketsMetrics _previousMetrics;
private SocketsMetrics _currentMetrics = new();
private int _eventCountersCount;
public SocketsEventListenerService(ILogger<SocketsEventListenerService> logger, IServiceProvider serviceProvider, IHttpContextAccessor httpContextAccessor)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor));
}
protected override string EventSourceName => "System.Net.Sockets";
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
protected override void OnEventSourceCreated(EventSource eventSource)
{
if (eventSource.Name == "System.Net.Sockets")
{
var arguments = new Dictionary<string, string> { { "EventCounterIntervalSec", MetricsOptions.Interval.TotalSeconds.ToString() } };
EnableEvents(eventSource, EventLevel.LogAlways, EventKeywords.None, arguments);
}
}
public SocketsEventListenerService(ILogger<SocketsEventListenerService> logger, IEnumerable<ISocketsTelemetryConsumer> telemetryConsumers, IEnumerable<ISocketsMetricsConsumer> metricsConsumers)
: base(logger, telemetryConsumers, metricsConsumers)
{ }
protected override void OnEventWritten(EventWrittenEventArgs eventData)
{
@ -60,15 +37,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
return;
}
var context = _httpContextAccessor?.HttpContext;
if (context is null)
{
return;
}
using var consumers = context.RequestServices.GetServices<ISocketsTelemetryConsumer>().GetEnumerator();
if (!consumers.MoveNext())
if (TelemetryConsumers is null)
{
return;
}
@ -81,22 +50,20 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
Debug.Assert(eventData.EventName == "ConnectStart" && payload.Count == 1);
{
var address = (string)payload[0];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnConnectStart(eventData.TimeStamp, address);
consumer.OnConnectStart(eventData.TimeStamp, address);
}
while (consumers.MoveNext());
}
break;
case 2:
Debug.Assert(eventData.EventName == "ConnectStop" && payload.Count == 0);
{
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnConnectStop(eventData.TimeStamp);
consumer.OnConnectStop(eventData.TimeStamp);
}
while (consumers.MoveNext());
}
break;
@ -105,11 +72,10 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
{
var error = (SocketError)payload[0];
var exceptionMessage = (string)payload[1];
do
foreach (var consumer in TelemetryConsumers)
{
consumers.Current.OnConnectFailed(eventData.TimeStamp, error, exceptionMessage);
consumer.OnConnectFailed(eventData.TimeStamp, error, exceptionMessage);
}
while (consumers.MoveNext());
}
break;
}
@ -117,6 +83,11 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
private void OnEventCounters(EventWrittenEventArgs eventData)
{
if (MetricsConsumers is null)
{
return;
}
Debug.Assert(eventData.EventName == "EventCounters" && eventData.Payload.Count == 1);
var counters = (IDictionary<string, object>)eventData.Payload[0];
@ -170,14 +141,14 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
_previousMetrics = metrics;
_currentMetrics = new SocketsMetrics();
if (previous is null || _serviceProvider is null)
if (previous is null)
{
return;
}
try
{
foreach (var consumer in _serviceProvider.GetServices<ISocketsMetricsConsumer>())
foreach (var consumer in MetricsConsumers)
{
consumer.OnSocketsMetrics(previous, metrics);
}
@ -185,7 +156,7 @@ namespace Yarp.ReverseProxy.Telemetry.Consumption
catch (Exception ex)
{
// We can't let an uncaught exception propagate as that would crash the process
_logger.LogError(ex, $"Uncaught exception occured while processing {nameof(SocketsMetrics)}.");
Logger.LogError(ex, $"Uncaught exception occured while processing {nameof(SocketsMetrics)}.");
}
}
}

Просмотреть файл

@ -1,6 +1,8 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Yarp.ReverseProxy.Telemetry.Consumption;
namespace Microsoft.Extensions.DependencyInjection
@ -9,92 +11,137 @@ namespace Microsoft.Extensions.DependencyInjection
{
#if NET5_0
/// <summary>
/// Shortcut for registering all (Proxy, Kestrel, Http, NameResolution, NetSecurity and Sockets) telemetry listeners.
/// Registers all telemetry listeners (Proxy, Kestrel, Http, NameResolution, NetSecurity and Sockets).
/// </summary>
#else
/// <summary>
/// Shortcut for registering all (Proxy and Kestrel) telemetry listeners.
/// Registers all telemetry listeners (Proxy and Kestrel).
/// </summary>
#endif
public static IServiceCollection AddTelemetryListeners(this IServiceCollection services)
{
services.AddProxyTelemetryListener();
services.AddKestrelTelemetryListener();
#if NET5_0
services.AddHttpTelemetryListener();
services.AddNameResolutionTelemetryListener();
services.AddNetSecurityTelemetryListener();
services.AddSocketsTelemetryListener();
#endif
return services;
}
/// <summary>
/// Registers a service responsible for calling <see cref="IProxyTelemetryConsumer"/>s and <see cref="IProxyMetricsConsumer"/>s.
/// </summary>
public static IServiceCollection AddProxyTelemetryListener(this IServiceCollection services)
{
services.AddHttpContextAccessor();
services.AddHostedService<ProxyEventListenerService>();
return services;
}
#if NET5_0
/// <summary>
/// Registers a service responsible for calling <see cref="IKestrelTelemetryConsumer"/>s and <see cref="IKestrelMetricsConsumer"/>s.
/// </summary>
#else
/// <summary>
/// Registers a service responsible for calling <see cref="IKestrelTelemetryConsumer"/>s
/// </summary>
#endif
public static IServiceCollection AddKestrelTelemetryListener(this IServiceCollection services)
{
services.AddHttpContextAccessor();
services.AddHostedService<KestrelEventListenerService>();
#if NET5_0
services.AddHostedService<HttpEventListenerService>();
services.AddHostedService<NameResolutionEventListenerService>();
services.AddHostedService<NetSecurityEventListenerService>();
services.AddHostedService<SocketsEventListenerService>();
#endif
return services;
}
/// <summary>
/// Registers a consumer singleton for every I*TelemetryConsumer interface it implements.
/// </summary>
public static IServiceCollection AddTelemetryConsumer(this IServiceCollection services, object consumer)
{
var implementsAny = false;
if (consumer is IProxyTelemetryConsumer)
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(IProxyTelemetryConsumer), consumer));
implementsAny = true;
}
if (consumer is IKestrelTelemetryConsumer)
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(IKestrelTelemetryConsumer), consumer));
implementsAny = true;
}
#if NET5_0
/// <summary>
/// Registers a service responsible for calling <see cref="IHttpTelemetryConsumer"/>s and <see cref="IHttpMetricsConsumer"/>s.
/// </summary>
public static IServiceCollection AddHttpTelemetryListener(this IServiceCollection services)
{
services.AddHttpContextAccessor();
services.AddHostedService<HttpEventListenerService>();
return services;
}
if (consumer is IHttpTelemetryConsumer)
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(IHttpTelemetryConsumer), consumer));
implementsAny = true;
}
/// <summary>
/// Registers a service responsible for calling <see cref="INameResolutionTelemetryConsumer"/>s and <see cref="INameResolutionMetricsConsumer"/>s.
/// </summary>
public static IServiceCollection AddNameResolutionTelemetryListener(this IServiceCollection services)
{
services.AddHttpContextAccessor();
services.AddHostedService<NameResolutionEventListenerService>();
return services;
}
if (consumer is INameResolutionTelemetryConsumer)
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(INameResolutionTelemetryConsumer), consumer));
implementsAny = true;
}
/// <summary>
/// Registers a service responsible for calling <see cref="INetSecurityTelemetryConsumer"/>s and <see cref="INetSecurityMetricsConsumer"/>s.
/// </summary>
public static IServiceCollection AddNetSecurityTelemetryListener(this IServiceCollection services)
{
services.AddHttpContextAccessor();
services.AddHostedService<NetSecurityEventListenerService>();
return services;
}
if (consumer is INetSecurityTelemetryConsumer)
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(INetSecurityTelemetryConsumer), consumer));
implementsAny = true;
}
/// <summary>
/// Registers a service responsible for calling <see cref="ISocketsTelemetryConsumer"/>s and <see cref="ISocketsMetricsConsumer"/>s.
/// </summary>
public static IServiceCollection AddSocketsTelemetryListener(this IServiceCollection services)
{
services.AddHttpContextAccessor();
services.AddHostedService<SocketsEventListenerService>();
return services;
}
if (consumer is ISocketsTelemetryConsumer)
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(ISocketsTelemetryConsumer), consumer));
implementsAny = true;
}
#endif
if (!implementsAny)
{
throw new ArgumentException("The consumer must implement at least one I*TelemetryConsumer interface.", nameof(consumer));
}
services.AddTelemetryListeners();
return services;
}
/// <summary>
/// Registers a <typeparamref name="TConsumer"/> singleton for every I*TelemetryConsumer interface it implements.
/// </summary>
public static IServiceCollection AddTelemetryConsumer<TConsumer>(this IServiceCollection services)
where TConsumer : class
{
var implementsAny = false;
if (typeof(IProxyTelemetryConsumer).IsAssignableFrom(typeof(TConsumer)))
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(IProxyTelemetryConsumer), provider => provider.GetRequiredService<TConsumer>(), ServiceLifetime.Singleton));
implementsAny = true;
}
if (typeof(IKestrelTelemetryConsumer).IsAssignableFrom(typeof(TConsumer)))
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(IKestrelTelemetryConsumer), provider => provider.GetRequiredService<TConsumer>(), ServiceLifetime.Singleton));
implementsAny = true;
}
#if NET5_0
if (typeof(IHttpTelemetryConsumer).IsAssignableFrom(typeof(TConsumer)))
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(IHttpTelemetryConsumer), provider => provider.GetRequiredService<TConsumer>(), ServiceLifetime.Singleton));
implementsAny = true;
}
if (typeof(INameResolutionTelemetryConsumer).IsAssignableFrom(typeof(TConsumer)))
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(INameResolutionTelemetryConsumer), provider => provider.GetRequiredService<TConsumer>(), ServiceLifetime.Singleton));
implementsAny = true;
}
if (typeof(INetSecurityTelemetryConsumer).IsAssignableFrom(typeof(TConsumer)))
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(INetSecurityTelemetryConsumer), provider => provider.GetRequiredService<TConsumer>(), ServiceLifetime.Singleton));
implementsAny = true;
}
if (typeof(ISocketsTelemetryConsumer).IsAssignableFrom(typeof(TConsumer)))
{
services.TryAddEnumerable(new ServiceDescriptor(typeof(ISocketsTelemetryConsumer), provider => provider.GetRequiredService<TConsumer>(), ServiceLifetime.Singleton));
implementsAny = true;
}
#endif
if (!implementsAny)
{
throw new ArgumentException("TConsumer must implement at least one I*TelemetryConsumer interface.", nameof(TConsumer));
}
services.TryAddSingleton<TConsumer>();
services.AddTelemetryListeners();
return services;
}
}
}

Просмотреть файл

@ -11,6 +11,7 @@ using System.Net;
using System.Net.Http;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
@ -26,34 +27,11 @@ namespace Yarp.ReverseProxy
[Fact]
public async Task TelemetryConsumptionWorks()
{
var consumers = new ConcurrentBag<TelemetryConsumer>();
var consumer = new TelemetryConsumer();
var test = new TestEnvironment(
async context =>
{
await context.Response.WriteAsync("Foo");
},
proxyBuilder =>
{
var services = proxyBuilder.Services;
services.AddScoped(services =>
{
var consumer = new TelemetryConsumer();
consumers.Add(consumer);
return consumer;
});
services.AddScoped<IProxyTelemetryConsumer>(services => services.GetRequiredService<TelemetryConsumer>());
services.AddScoped<IKestrelTelemetryConsumer>(services => services.GetRequiredService<TelemetryConsumer>());
#if NET5_0
services.AddScoped<IHttpTelemetryConsumer>(services => services.GetRequiredService<TelemetryConsumer>());
services.AddScoped<ISocketsTelemetryConsumer>(services => services.GetRequiredService<TelemetryConsumer>());
services.AddScoped<INetSecurityTelemetryConsumer>(services => services.GetRequiredService<TelemetryConsumer>());
services.AddScoped<INameResolutionTelemetryConsumer>(services => services.GetRequiredService<TelemetryConsumer>());
#endif
services.AddTelemetryListeners();
},
async context => await context.Response.WriteAsync("Foo"),
proxyBuilder => proxyBuilder.Services.AddTelemetryConsumer(consumer),
proxyApp => { },
useHttpsOnDestination: true);
@ -65,7 +43,7 @@ namespace Yarp.ReverseProxy
await httpClient.GetStringAsync(uri);
});
var stages = Assert.Single(consumers, c => c.ClusterId == test.ClusterId).Stages;
Assert.True(consumer.PerClusterTelemetry.TryGetValue(test.ClusterId, out var stages));
var expected = new[]
{
@ -101,6 +79,50 @@ namespace Yarp.ReverseProxy
}
}
#if NET5_0
[Fact]
public async Task NonProxyTelemetryConsumptionWorks()
{
var consumer = new TelemetryConsumer();
var test = new TestEnvironment(
async context => await context.Response.WriteAsync("Foo"),
proxyBuilder => proxyBuilder.Services.AddTelemetryConsumer(consumer),
proxyApp => { },
useHttpsOnDestination: true);
var path = $"/{Guid.NewGuid()}";
await test.Invoke(async uri =>
{
using var httpClient = new HttpClient();
await httpClient.GetStringAsync($"{uri.TrimEnd('/')}{path}");
});
Assert.True(consumer.PerPathAndQueryTelemetry.TryGetValue(path, out var stages));
var expected = new[]
{
"OnRequestStart",
"OnConnectStart",
"OnConnectStop",
"OnConnectionEstablished",
"OnRequestHeadersStart",
"OnRequestHeadersStop",
"OnResponseHeadersStart",
"OnResponseHeadersStop",
"OnRequestStop"
};
Assert.Equal(expected, stages.Select(s => s.Stage).ToArray());
for (var i = 1; i < stages.Count; i++)
{
Assert.True(stages[i - 1].Timestamp <= stages[i].Timestamp);
}
}
#endif
private sealed class TelemetryConsumer :
IProxyTelemetryConsumer,
IKestrelTelemetryConsumer
@ -112,15 +134,18 @@ namespace Yarp.ReverseProxy
ISocketsTelemetryConsumer
#endif
{
public string ClusterId { get; set; }
public readonly ConcurrentDictionary<string, List<(string Stage, DateTime Timestamp)>> PerClusterTelemetry = new();
public readonly ConcurrentDictionary<string, List<(string Stage, DateTime Timestamp)>> PerPathAndQueryTelemetry = new();
public readonly List<(string Stage, DateTime Timestamp)> Stages = new List<(string, DateTime)>(16);
private readonly AsyncLocal<List<(string Stage, DateTime Timestamp)>> _stages = new();
private void AddStage(string stage, DateTime timestamp)
{
lock (Stages)
var stages = _stages.Value ??= new List<(string Stage, DateTime Timestamp)>();
lock (stages)
{
Stages.Add((stage, timestamp));
stages.Add((stage, timestamp));
}
}
@ -132,11 +157,15 @@ namespace Yarp.ReverseProxy
public void OnContentTransferred(DateTime timestamp, bool isRequest, long contentLength, long iops, TimeSpan readTime, TimeSpan writeTime, TimeSpan firstReadTime) => AddStage(nameof(OnContentTransferred), timestamp);
public void OnProxyInvoke(DateTime timestamp, string clusterId, string routeId, string destinationId)
{
ClusterId = clusterId;
AddStage(nameof(OnProxyInvoke), timestamp);
PerClusterTelemetry.TryAdd(clusterId, _stages.Value);
}
#if NET5_0
public void OnRequestStart(DateTime timestamp, string scheme, string host, int port, string pathAndQuery, int versionMajor, int versionMinor, HttpVersionPolicy versionPolicy) => AddStage(nameof(OnRequestStart), timestamp);
public void OnRequestStart(DateTime timestamp, string scheme, string host, int port, string pathAndQuery, int versionMajor, int versionMinor, HttpVersionPolicy versionPolicy)
{
AddStage(nameof(OnRequestStart), timestamp);
PerPathAndQueryTelemetry.TryAdd(pathAndQuery, _stages.Value);
}
public void OnRequestStop(DateTime timestamp) => AddStage(nameof(OnRequestStop), timestamp);
public void OnRequestFailed(DateTime timestamp) => AddStage(nameof(OnRequestFailed), timestamp);
public void OnConnectionEstablished(DateTime timestamp, int versionMajor, int versionMinor) => AddStage(nameof(OnConnectionEstablished), timestamp);

Просмотреть файл

@ -94,7 +94,7 @@ namespace Yarp.ReverseProxy.Sample
services.AddHttpContextAccessor();
services.AddSingleton<IProxyMetricsConsumer, ProxyMetricsConsumer>();
services.AddScoped<IProxyTelemetryConsumer, ProxyTelemetryConsumer>();
services.AddProxyTelemetryListener();
services.AddTelemetryListeners();
}
/// <summary>