We now have useful metrics for direct tcp transport clients

This commit is contained in:
David.Noble@microsoft.com 2019-07-22 22:25:54 -07:00
Родитель eda076012f
Коммит 188cd290ea
5 изменённых файлов: 100 добавлений и 71 удалений

Просмотреть файл

@ -129,16 +129,10 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
this.throwIfClosed();
final RntbdRequestArgs requestArgs = new RntbdRequestArgs(request, physicalAddress);
if (logger.isDebugEnabled()) {
requestArgs.traceOperation(logger, null, "invokeStoreAsync");
logger.debug("\n [{}]\n {}\n INVOKE_STORE_ASYNC", this, requestArgs);
}
requestArgs.traceOperation(logger, null, "invokeStoreAsync");
final RntbdEndpoint endpoint = this.endpointProvider.get(physicalAddress);
final RntbdMetrics metrics = new RntbdMetrics(this, endpoint);
metrics.markRequestStart();
final RntbdRequestRecord record = endpoint.request(requestArgs);
return Single.fromEmitter((SingleEmitter<StoreResponse> emitter) -> {
@ -146,7 +140,7 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
record.whenComplete((response, error) -> {
requestArgs.traceOperation(logger, null, "emitSingle", response, error);
metrics.markRequestComplete(record);
metrics.markComplete(record);
if (error == null) {
emitter.onSuccess(response);

Просмотреть файл

@ -43,9 +43,11 @@ public interface RntbdEndpoint extends AutoCloseable {
// region Accessors
int acquiredChannels();
int channelsAcquired();
int availableChannels();
int channelsAvailable();
int concurrentRequests();
long id();

Просмотреть файл

@ -26,29 +26,29 @@ package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.google.common.collect.ImmutableList;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.RntbdTransportClient;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.dropwizard.DropwizardConfig;
import io.micrometer.core.instrument.dropwizard.DropwizardMeterRegistry;
import io.micrometer.core.instrument.search.Search;
import io.micrometer.core.instrument.util.HierarchicalNameMapper;
import io.micrometer.core.lang.Nullable;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@JsonPropertyOrder({
"concurrentRequests", "requests", "responseErrors", "responseSuccesses", "completionRate", "responseRate",
"tags", "concurrentRequests", "requests", "responseErrors", "responseSuccesses", "completionRate", "responseRate",
"channelsAcquired", "channelsAvailable", "requestQueueLength", "usedDirectMemory", "usedHeapMemory"
})
public final class RntbdMetrics {
@ -59,18 +59,12 @@ public final class RntbdMetrics {
private static final String prefix = "cosmos.directTcp.";
private static MeterRegistry consoleLoggingRegistry;
private final DistributionSummary channelsAcquired;
private final DistributionSummary channelsAvailable;
private final AtomicInteger concurrentRequestCount = new AtomicInteger();
private final DistributionSummary concurrentRequests;
private final RntbdEndpoint endpoint;
private final DistributionSummary requestQueueLength;
private final Timer requests;
private final Timer responseErrors;
private final Timer responseSuccesses;
private final List<Tag> tags;
private final DistributionSummary usedDirectMemory;
private final DistributionSummary usedHeapMemory;
private final Tags tags;
static {
if (Boolean.getBoolean("cosmos.directTcp.consoleMetricsReporter.enabled")) {
@ -84,41 +78,73 @@ public final class RntbdMetrics {
public RntbdMetrics(RntbdTransportClient client, RntbdEndpoint endpoint) {
this.tags = ImmutableList.of(client.tag(), endpoint.tag());
this.endpoint = endpoint;
this.tags = Tags.of(client.tag(), endpoint.tag());
this.concurrentRequests = registry.summary(nameOf("concurrentRequests"), tags);
this.requests = registry.timer(nameOf("requests"), tags);
this.responseErrors = registry.timer(nameOf("responseErrors"), tags);
this.responseSuccesses = registry.timer(nameOf("responseSuccesses"), tags);
this.channelsAcquired = registry.summary(nameOf("channelsAcquired"), tags);
this.channelsAvailable = registry.summary(nameOf("channelsAvailable"), tags);
this.requestQueueLength = registry.summary(nameOf("requestQueueLength"), tags);
this.usedDirectMemory = registry.summary(nameOf("usedDirectMemory"), tags);
this.usedHeapMemory = registry.summary(nameOf("usedHeapMemory"), tags);
final Timer responseSuccesses = this.responseSuccesses;
final Timer requests = this.requests;
registry.gauge(nameOf("completionRate"), tags, this, RntbdMetrics::completionRate);
registry.gauge(nameOf("responseRate"), tags, this, RntbdMetrics::responseRate);
Gauge.builder(nameOf("concurrentRequests"), endpoint, RntbdEndpoint::concurrentRequests)
.description("executing or queued request count")
.tags(this.tags)
.register(registry);
this.endpoint = endpoint;
Gauge.builder(nameOf("requestQueueLength"), endpoint, RntbdEndpoint::requestQueueLength)
.description("queued request count")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("completionRate"), endpoint, x -> responseSuccesses.count() / (double)requests.count())
.description("successful (non-error) responses / total responses")
.baseUnit("%")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("responseRate"), endpoint, x -> responseSuccesses.count() / (double)(requests.count() + x.concurrentRequests()))
.description("successful (non-error) responses / total requests")
.baseUnit("%")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("channelsAcquired"), endpoint, RntbdEndpoint::channelsAcquired)
.description("acquired channel count")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("channelsAvailable"), endpoint, RntbdEndpoint::channelsAvailable)
.description("available channel count")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("usedDirectMemory"), endpoint, x -> (double)x.usedDirectMemory() / (1024L * 1024L))
.description("Java direct memory usage")
.baseUnit("MiB")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("usedHeapMemory"), endpoint, x -> (double)x.usedHeapMemory() / (1024L * 1024L))
.description("Java heap memory usage")
.baseUnit("MiB")
.tags(this.tags)
.register(registry);
}
// endregion
// region Accessors
public static void add(MeterRegistry registry) {
RntbdMetrics.registry.add(registry);
@JsonProperty
public int channelsAcquired() {
return this.endpoint.channelsAcquired();
}
@JsonProperty
public Iterable<Measurement> channelsAcquired() {
return this.channelsAcquired.measure();
}
@JsonProperty
public Iterable<Measurement> channelsAvailable() {
return this.channelsAvailable.measure();
public int channelsAvailable() {
return this.endpoint.channelsAvailable();
}
/***
@ -131,10 +157,11 @@ public final class RntbdMetrics {
}
@JsonProperty
public Iterable<Measurement> concurrentRequests() {
return this.concurrentRequests.measure();
public long concurrentRequests() {
return this.endpoint.concurrentRequests();
}
@JsonIgnore
public static synchronized MeterRegistry consoleLoggingRegistry() {
if (consoleLoggingRegistry == null) {
@ -147,7 +174,7 @@ public final class RntbdMetrics {
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
consoleReporter.start(1, TimeUnit.SECONDS);
consoleReporter.start(Long.getLong("cosmos.directTcp.consoleMetricsReporter.period", 30), TimeUnit.SECONDS);
DropwizardConfig dropwizardConfig = new DropwizardConfig() {
@ -178,8 +205,8 @@ public final class RntbdMetrics {
}
@JsonProperty
public Iterable<Measurement> requestQueueLength() {
return this.requestQueueLength.measure();
public int requestQueueLength() {
return this.endpoint.requestQueueLength();
}
@JsonProperty
@ -197,7 +224,7 @@ public final class RntbdMetrics {
*/
@JsonProperty
public double responseRate() {
return this.responseSuccesses.count() / (double)(this.requests.count() + this.concurrentRequests.count());
return this.responseSuccesses.count() / (double)(this.requests.count() + this.endpoint.concurrentRequests());
}
@JsonProperty
@ -211,28 +238,25 @@ public final class RntbdMetrics {
}
@JsonProperty
public Iterable<Measurement> usedDirectMemory() {
return this.usedDirectMemory.measure();
public long usedDirectMemory() {
return this.endpoint.usedDirectMemory();
}
@JsonProperty
public Iterable<Measurement> usedHeapMemory() {
return this.usedHeapMemory.measure();
public long usedHeapMemory() {
return this.endpoint.usedHeapMemory();
}
// endregion
// region Methods
public void markRequestComplete(RntbdRequestRecord record) {
record.stop(this.requests, record.isCompletedExceptionally() ? this.responseErrors : this.responseSuccesses);
this.concurrentRequests.record(this.concurrentRequestCount.decrementAndGet());
this.takeChannelPoolMeasurements();
public static void add(MeterRegistry registry) {
RntbdMetrics.registry.add(registry);
}
public void markRequestStart() {
this.concurrentRequests.record(this.concurrentRequestCount.incrementAndGet());
this.takeChannelPoolMeasurements();
public void markComplete(RntbdRequestRecord record) {
record.stop(this.requests, record.isCompletedExceptionally() ? this.responseErrors : this.responseSuccesses);
}
@Override
@ -248,13 +272,5 @@ public final class RntbdMetrics {
return prefix + member;
}
private void takeChannelPoolMeasurements() {
this.channelsAcquired.record(this.endpoint.acquiredChannels());
this.channelsAvailable.record(this.endpoint.availableChannels());
this.requestQueueLength.record(this.endpoint.requestQueueLength());
this.usedDirectMemory.record(this.endpoint.usedDirectMemory());
this.usedHeapMemory.record(this.endpoint.usedHeapMemory());
}
// endregion
}

Просмотреть файл

@ -53,6 +53,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
@ -72,6 +73,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
private final RntbdClientChannelPool channelPool;
private final AtomicBoolean closed;
private final AtomicInteger concurrentRequests;
private final long id;
private final SocketAddress remoteAddress;
private final RntbdRequestTimer requestTimer;
@ -95,6 +97,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
this.channelPool = new RntbdClientChannelPool(bootstrap, config);
this.closed = new AtomicBoolean();
this.concurrentRequests = new AtomicInteger();
this.remoteAddress = bootstrap.config().remoteAddress();
this.requestTimer = timer;
@ -107,15 +110,20 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
// region Accessors
@Override
public int acquiredChannels() {
public int channelsAcquired() {
return this.channelPool.channelsAcquired();
}
@Override
public int availableChannels() {
public int channelsAvailable() {
return this.channelPool.channelsAvailable();
}
@Override
public int concurrentRequests() {
return this.concurrentRequests.get();
}
@Override
public long id() {
return this.id;
@ -165,6 +173,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
public RntbdRequestRecord request(final RntbdRequestArgs args) {
this.throwIfClosed();
this.concurrentRequests.incrementAndGet();
if (logger.isDebugEnabled()) {
args.traceOperation(logger, null, "request");
@ -183,6 +192,8 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
logger.debug("\n [{}]\n {}\n request failed due to ", this, args, error);
}
this.concurrentRequests.decrementAndGet();
});
return record;
@ -280,6 +291,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
generator.writeStartObject();
generator.writeNumberField("id", value.id);
generator.writeBooleanField("isClosed", value.isClosed());
generator.writeNumberField("concurrentRequests", value.concurrentRequests());
generator.writeStringField("remoteAddress", value.remoteAddress.toString());
generator.writeObjectField("channelPool", value.channelPool);
generator.writeEndObject();

Просмотреть файл

@ -861,12 +861,17 @@ public final class RntbdTransportClientTest {
// region Accessors
@Override
public int acquiredChannels() {
public int channelsAcquired() {
return 0;
}
@Override
public int availableChannels() {
public int channelsAvailable() {
return 0;
}
@Override
public int concurrentRequests() {
return 0;
}