Wired up RntbdClientChannelHealthChecker

This commit is contained in:
David.Noble@microsoft.com 2019-06-27 20:45:15 -07:00
Родитель 282e7dbe3e
Коммит baff695354
9 изменённых файлов: 255 добавлений и 67 удалений

Просмотреть файл

@ -198,9 +198,10 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
// region Fields
private final String certificateHostNameOverride;
private final Duration connectionTimeout;
private final Duration idleTimeout;
private final int maxChannelsPerEndpoint;
private final int maxRequestsPerChannel;
private final Duration connectionTimeout;
private final int partitionCount;
private final Duration receiveHangDetectionTime;
private final Duration requestTimeout;
@ -214,12 +215,13 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
private Options(Builder builder) {
this.certificateHostNameOverride = builder.certificateHostNameOverride;
this.connectionTimeout = builder.connectionTimeout == null ? builder.requestTimeout : builder.connectionTimeout;
this.idleTimeout = builder.idleTimeout;
this.maxChannelsPerEndpoint = builder.maxChannelsPerEndpoint;
this.maxRequestsPerChannel = builder.maxRequestsPerChannel;
this.connectionTimeout = builder.connectionTimeout == null ? builder.requestTimeout : builder.connectionTimeout;
this.partitionCount = builder.partitionCount;
this.requestTimeout = builder.requestTimeout;
this.receiveHangDetectionTime = builder.receiveHangDetectionTime;
this.requestTimeout = builder.requestTimeout;
this.sendHangDetectionTime = builder.sendHangDetectionTime;
this.userAgent = builder.userAgent;
}
@ -228,39 +230,43 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
// region Accessors
public String getCertificateHostNameOverride() {
public String certificateHostNameOverride() {
return this.certificateHostNameOverride;
}
public int getMaxChannelsPerEndpoint() {
return this.maxChannelsPerEndpoint;
}
public int getMaxRequestsPerChannel() {
return this.maxRequestsPerChannel;
}
public Duration getConnectionTimeout() {
public Duration connectionTimeout() {
return this.connectionTimeout;
}
public int getPartitionCount() {
public Duration idleTimeout() {
return this.idleTimeout;
}
public int maxChannelsPerEndpoint() {
return this.maxChannelsPerEndpoint;
}
public int maxRequestsPerChannel() {
return this.maxRequestsPerChannel;
}
public int partitionCount() {
return this.partitionCount;
}
public Duration getReceiveHangDetectionTime() {
public Duration receiveHangDetectionTime() {
return this.receiveHangDetectionTime;
}
public Duration getRequestTimeout() {
public Duration requestTimeout() {
return this.requestTimeout;
}
public Duration getSendHangDetectionTime() {
public Duration sendHangDetectionTime() {
return this.sendHangDetectionTime;
}
public UserAgentContainer getUserAgent() {
public UserAgentContainer userAgent() {
return this.userAgent;
}
@ -290,10 +296,10 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
private String certificateHostNameOverride = null;
// Optional parameters
private Duration connectionTimeout = null;
private Duration idleTimeout = Duration.ZERO;
private int maxChannelsPerEndpoint = 10;
private int maxRequestsPerChannel = 30;
private Duration connectionTimeout = null;
private int partitionCount = 1;
private Duration receiveHangDetectionTime = SIXTY_FIVE_SECONDS;
private Duration requestTimeout;
@ -331,9 +337,9 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
return this;
}
public Builder maxRequestsPerChannel(final int value) {
checkArgument(value > 0, "value: %s", value);
this.maxRequestsPerChannel = value;
public Builder idleTimeout(final Duration value) {
checkNotNull(value, "value: null");
this.idleTimeout = value;
return this;
}
@ -343,6 +349,12 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
return this;
}
public Builder maxRequestsPerChannel(final int value) {
checkArgument(value > 0, "value: %s", value);
this.maxRequestsPerChannel = value;
return this;
}
public Builder partitionCount(final int value) {
checkArgument(value > 0, "value: %s", value);
this.partitionCount = value;

Просмотреть файл

@ -112,24 +112,24 @@ public class RntbdClientChannelHandler extends ChannelInitializer<Channel> imple
checkNotNull(channel);
final RntbdRequestManager requestManager = new RntbdRequestManager(this.config.getMaxRequestsPerChannel());
final long readerIdleTime = this.config.getReceiveHangDetectionTime();
final long writerIdleTime = this.config.getSendHangDetectionTime();
final RntbdRequestManager requestManager = new RntbdRequestManager(this.config.maxRequestsPerChannel());
final long readerIdleTime = this.config.receiveHangDetectionTime();
final long writerIdleTime = this.config.sendHangDetectionTime();
final ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst(
new RntbdContextNegotiator(requestManager, this.config.getUserAgent()),
new RntbdContextNegotiator(requestManager, this.config.userAgent()),
new RntbdResponseDecoder(),
new RntbdRequestEncoder(),
new WriteTimeoutHandler(writerIdleTime, TimeUnit.NANOSECONDS),
requestManager
);
if (this.config.getWireLogLevel() != null) {
pipeline.addFirst(new LoggingHandler(this.config.getWireLogLevel()));
if (this.config.wireLogLevel() != null) {
pipeline.addFirst(new LoggingHandler(this.config.wireLogLevel()));
}
final SSLEngine sslEngine = this.config.getSslContext().newEngine(channel.alloc());
final SSLEngine sslEngine = this.config.sslContext().newEngine(channel.alloc());
pipeline.addFirst(
new ReadTimeoutHandler(readerIdleTime, TimeUnit.NANOSECONDS),

Просмотреть файл

@ -0,0 +1,151 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdReporter.reportIssueUnless;
final class RntbdClientChannelHealthChecker implements ChannelHealthChecker {
private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelHealthChecker.class);
private static final long NANOS_PER_SECOND = 1_000_000_000L;
// A channel will be declared healthy if data was read recently as defined by this value.
private static final long recentReadWindow = NANOS_PER_SECOND;
// A channel should not be declared unhealthy if a write succeeded recently. As such gaps between
// Timestamps.lastChannelWrite and Timestamps.lastChannelRead lower than this value are ignored.
// Guidance: The grace period should be large enough to accommodate the round trip time of the slowest server
// request. Assuming 1s of network RTT, a 2 MB request, a 2 MB response, a connection that can sustain 1 MB/s
// both ways, and a 5-second deadline at the server, 10 seconds should be enough.
private static final long readHangGracePeriod = 10L * NANOS_PER_SECOND;
// A channel will not be declared unhealthy if a write was attempted recently. As such gaps between
// Timestamps.lastChannelWriteAttempt and Timestamps.lastChannelWrite lower than this value are ignored.
// Guidance: The grace period should be large enough to accommodate slow writes. For example, a value of 2s requires
// that the client can sustain data rates of at least 1 MB/s when writing 2 MB documents.
private static final long writeHangGracePeriod = 2L * NANOS_PER_SECOND;
// A channel is considered idle if:
// idleConnectionTimeout > 0L && System.nanoTime() - Timestamps.lastChannelRead() >= idleConnectionTimeout
private final long idleConnectionTimeout;
// A channel will be declared unhealthy if the gap between Timestamps.lastChannelWrite and Timestamps.lastChannelRead
// grows beyond this value.
// Constraint: readDelayLimit > readHangGracePeriod
private final long readDelayLimit;
// A channel will be declared unhealthy if the gap between Timestamps.lastChannelWriteAttempt and Timestamps.lastChannelWrite
// grows beyond this value.
// Constraint: writeDelayLimit > writeHangGracePeriod
private final long writeDelayLimit;
public RntbdClientChannelHealthChecker(RntbdEndpoint.Config config) {
this.idleConnectionTimeout = config.idleConnectionTimeout();
this.readDelayLimit = config.receiveHangDetectionTime();
checkArgument(this.readDelayLimit > readHangGracePeriod, "config.receiveHangDetectionTime: %s", this.readDelayLimit);
this.writeDelayLimit = config.sendHangDetectionTime();
checkArgument(this.writeDelayLimit > writeHangGracePeriod, "config.sendHangDetectionTime: %s", this.writeDelayLimit);
}
public Future<Boolean> isHealthy(Channel channel) {
checkNotNull(channel);
RntbdRequestManager requestManager = channel.pipeline().get(RntbdRequestManager.class);
Promise<Boolean> promise = channel.eventLoop().newPromise();
if (requestManager == null) {
reportIssueUnless(!channel.isActive(), logger, channel, "{} active with no request manager");
return promise.setSuccess(Boolean.FALSE);
}
Timestamps timestamps = requestManager.timestamps();
long currentTime = System.nanoTime();
if (currentTime - timestamps.lastChannelRead() < recentReadWindow) {
return promise.setSuccess(Boolean.TRUE); // because we recently received data
}
// Black hole detection, part 1:
// Treat the channel as unhealthy if the gap between the last attempted write and the last successful write
// grew beyond acceptable limits, unless a write was attempted recently. This is a sign of a hung write.
if (timestamps.lastChannelWriteAttempt() - timestamps.lastChannelWrite() > this.writeDelayLimit && currentTime - timestamps.lastChannelWriteAttempt() > writeHangGracePeriod) {
logger.warn("{} health check failed due to a hung write: {lastChannelWriteAttempt: {}, lastChannelWrite: {}, writeDelayLimit: {}}",
channel, timestamps.lastChannelWriteAttempt(), timestamps.lastChannelWrite(), this.writeDelayLimit);
return promise.setSuccess(Boolean.FALSE);
}
// Black hole detection, part 2:
// Treat the connection as unhealthy if the gap between the last successful write and the last successful read
// grew beyond acceptable limits, unless a write succeeded recently.
if (timestamps.lastChannelWrite() - timestamps.lastChannelRead() > this.readDelayLimit && currentTime - timestamps.lastChannelWrite() > readHangGracePeriod) {
logger.warn("{} health check failed due to response lag: {lastWriteTime: {}, lastReadTime: {}. readDelayLimit: {}}",
channel, timestamps.lastChannelWrite(), timestamps.lastChannelRead(), this.readDelayLimit);
return promise.setSuccess(Boolean.FALSE);
}
if (this.idleConnectionTimeout > 0L) {
if (currentTime - timestamps.lastChannelRead() > this.idleConnectionTimeout) {
return promise.setSuccess(Boolean.FALSE);
}
}
channel.writeAndFlush(RntbdHealthCheckRequest.MESSAGE).addListener(completed -> {
promise.setSuccess(completed.isSuccess() ? Boolean.TRUE : Boolean.FALSE);
});
return promise;
}
static final class Timestamps {
private volatile long lastRead;
private volatile long lastWrite;
private volatile long lastWriteAttempt;
public Timestamps() {
}
public Timestamps(Timestamps other) {
this.lastRead = other.lastRead;
this.lastWrite = other.lastWrite;
this.lastWriteAttempt = other.lastWriteAttempt;
}
public void channelReadCompleted() {
this.lastRead = System.nanoTime();
}
public void channelWriteAttempted() {
this.lastWriteAttempt = System.nanoTime();
}
public void channelWriteCompleted() {
this.lastWrite = System.nanoTime();
}
public long lastChannelRead() {
return this.lastRead;
}
public long lastChannelWrite() {
return this.lastWrite;
}
public long lastChannelWriteAttempt() {
return this.lastWriteAttempt;
}
}
}

Просмотреть файл

@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.util.concurrent.EventExecutor;
@ -103,12 +102,12 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
*/
RntbdClientChannelPool(final Bootstrap bootstrap, final RntbdEndpoint.Config config) {
super(bootstrap, new RntbdClientChannelHandler(config), ChannelHealthChecker.ACTIVE, true, true);
super(bootstrap, new RntbdClientChannelHandler(config), new RntbdClientChannelHealthChecker(config), true, true);
this.executor = bootstrap.config().group().next();
this.maxChannels = config.getMaxChannelsPerEndpoint();
this.maxChannels = config.maxChannelsPerEndpoint();
this.maxPendingAcquisitions = Integer.MAX_VALUE;
this.maxRequestsPerChannel = config.getMaxRequestsPerChannel();
this.maxRequestsPerChannel = config.maxRequestsPerChannel();
this.availableChannelCount = new AtomicInteger();
this.acquiredChannelCount = new AtomicInteger();
@ -418,7 +417,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
final RntbdRequestManager requestManager = channel.pipeline().get(RntbdRequestManager.class);
if (requestManager == null) {
reportIssueUnless(!channel.isActive(), logger, this, "{} active with no request manager", channel);
reportIssueUnless(!channel.isActive(), logger, channel, "active with no request manager");
return true; // inactive
}

Просмотреть файл

@ -75,11 +75,11 @@ public final class RntbdClientChannelPool1 extends FixedChannelPool {
RntbdClientChannelPool1(final Bootstrap bootstrap, final RntbdEndpoint.Config config) {
super(bootstrap, new RntbdClientChannelHandler(config), ChannelHealthChecker.ACTIVE, null,
-1L, config.getMaxChannelsPerEndpoint(), Integer.MAX_VALUE, true
-1L, config.maxChannelsPerEndpoint(), Integer.MAX_VALUE, true
);
this.maxRequestsPerChannel = config.getMaxRequestsPerChannel();
this.maxChannels = config.getMaxChannelsPerEndpoint();
this.maxRequestsPerChannel = config.maxRequestsPerChannel();
this.maxChannels = config.maxChannelsPerEndpoint();
this.availableChannelCount = new AtomicInteger();
this.closed = new AtomicBoolean();
}

Просмотреть файл

@ -73,41 +73,45 @@ public interface RntbdEndpoint extends AutoCloseable {
this.wireLogLevel = wireLogLevel;
}
public int getConnectionTimeout() {
final long value = this.options.getConnectionTimeout().toMillis();
public int connectionTimeout() {
final long value = this.options.connectionTimeout().toMillis();
assert value <= Integer.MAX_VALUE;
return (int)value;
}
public int getMaxChannelsPerEndpoint() {
return this.options.getMaxChannelsPerEndpoint();
public long idleConnectionTimeout() {
return this.options.idleTimeout().toNanos();
}
public int getMaxRequestsPerChannel() {
return this.options.getMaxRequestsPerChannel();
public int maxChannelsPerEndpoint() {
return this.options.maxChannelsPerEndpoint();
}
public long getReceiveHangDetectionTime() {
return this.options.getReceiveHangDetectionTime().toNanos();
public int maxRequestsPerChannel() {
return this.options.maxRequestsPerChannel();
}
public long getRequestTimeout() {
return this.options.getRequestTimeout().toNanos();
public long receiveHangDetectionTime() {
return this.options.receiveHangDetectionTime().toNanos();
}
public long getSendHangDetectionTime() {
return this.options.getSendHangDetectionTime().toNanos();
public long requestTimeout() {
return this.options.requestTimeout().toNanos();
}
public SslContext getSslContext() {
public long sendHangDetectionTime() {
return this.options.sendHangDetectionTime().toNanos();
}
public SslContext sslContext() {
return this.sslContext;
}
public UserAgentContainer getUserAgent() {
return this.options.getUserAgent();
public UserAgentContainer userAgent() {
return this.options.userAgent();
}
public LogLevel getWireLogLevel() {
public LogLevel wireLogLevel() {
return this.wireLogLevel;
}

Просмотреть файл

@ -51,6 +51,7 @@ import com.microsoft.azure.cosmosdb.rx.internal.PartitionKeyRangeIsSplittingExce
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
@ -62,6 +63,8 @@ import io.netty.channel.CoalescingBufferQueue;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.concurrent.EventExecutor;
@ -80,6 +83,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.microsoft.azure.cosmosdb.internal.HttpConstants.StatusCodes;
import static com.microsoft.azure.cosmosdb.internal.HttpConstants.SubStatusCodes;
import static com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdClientChannelHealthChecker.Timestamps;
import static com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdReporter.reportIssue;
import static com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdReporter.reportIssueUnless;
@ -92,6 +96,7 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
private final CompletableFuture<RntbdContext> contextFuture = new CompletableFuture<>();
private final CompletableFuture<RntbdContextRequest> contextRequestFuture = new CompletableFuture<>();
private final ConcurrentHashMap<Long, RntbdRequestRecord> pendingRequests;
private final Timestamps timestamps = new Timestamps();
private final int pendingRequestLimit;
private boolean closingExceptionally = false;
@ -186,7 +191,7 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
}
/**
* Invoked when the last message read by the current read operation has been consumed
* Invoked when the most-recent message read by the current read operation has been consumed
* <p>
* If {@link ChannelOption#AUTO_READ} is off, no further attempt to read an inbound data from the current
* {@link Channel} will be made until {@link ChannelHandlerContext#read} is called. This leaves time
@ -197,6 +202,7 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
@Override
public void channelReadComplete(final ChannelHandlerContext context) {
this.traceOperation(context, "channelReadComplete");
this.timestamps.channelReadCompleted();
context.fireChannelReadComplete();
}
@ -296,6 +302,13 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
this.traceOperation(context, "userEventTriggered", event);
try {
if (event instanceof IdleStateEvent) {
if (((IdleStateEvent)event).state() == IdleState.WRITER_IDLE) {
context.writeAndFlush(RntbdHealthCheckRequest.MESSAGE);
}
return;
}
if (event instanceof RntbdContext) {
this.contextFuture.complete((RntbdContext)event);
this.removeContextNegotiatorAndFlushPendingWrites(context);
@ -434,28 +447,33 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
public void write(final ChannelHandlerContext context, final Object message, final ChannelPromise promise) {
// TODO: DANOBLE: Ensure that all write errors are reported with a root cause of type EncoderException
// Requires a full scan of the rntbd code
this.traceOperation(context, "write", message);
this.timestamps.channelWriteAttempted();
final ChannelFuture future;
if (message instanceof RntbdRequestRecord) {
context.write(this.addPendingRequestRecord(context, (RntbdRequestRecord)message), promise);
future = context.write(this.addPendingRequestRecord(context, (RntbdRequestRecord)message), promise);
} else if (message == RntbdHealthCheckRequest.MESSAGE) {
context.write(RntbdHealthCheckRequest.MESSAGE, promise);
future = context.write(RntbdHealthCheckRequest.MESSAGE, promise);
} else {
final IllegalStateException error = new IllegalStateException(
Strings.lenientFormat("expected message of %s, not %s: %s",
RntbdRequestRecord.class, message.getClass(), message
)
);
final IllegalStateException error = new IllegalStateException(Strings.lenientFormat("message of %s: %s", message.getClass(), message));
reportIssue(logger, context, "", error);
this.exceptionCaught(context, error);
return;
}
future.addListener(completed -> {
if (completed.isSuccess()) {
this.timestamps.channelWriteCompleted();
}
});
}
// endregion
@ -483,6 +501,10 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
this.pendingWrites.add(out, promise);
}
RntbdClientChannelHealthChecker.Timestamps timestamps() {
return new RntbdClientChannelHealthChecker.Timestamps(this.timestamps);
}
private RntbdRequestArgs addPendingRequestRecord(final ChannelHandlerContext context, final RntbdRequestRecord record) {
return this.pendingRequests.compute(record.getTransportRequestId(), (id, current) -> {

Просмотреть файл

@ -83,7 +83,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
.channel(NioSocketChannel.class)
.group(group)
.option(ChannelOption.AUTO_READ, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectionTimeout())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeout())
.option(ChannelOption.SO_KEEPALIVE, true)
.remoteAddress(physicalAddress.getHost(), physicalAddress.getPort());
@ -282,7 +282,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
}
this.config = new Config(options, sslContext, wireLogLevel);
this.requestTimer = new RntbdRequestTimer(config.getRequestTimeout());
this.requestTimer = new RntbdRequestTimer(config.requestTimeout());
this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory);
}

Просмотреть файл

@ -842,7 +842,7 @@ public final class RntbdTransportClientTest {
this.requestTimer = timer;
this.fakeChannel = new FakeChannel(responses,
new RntbdContextNegotiator(requestManager, config.getUserAgent()),
new RntbdContextNegotiator(requestManager, config.userAgent()),
new RntbdRequestEncoder(),
new RntbdResponseDecoder(),
requestManager
@ -874,7 +874,7 @@ public final class RntbdTransportClientTest {
Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected) {
this.config = new Config(options, sslContext, LogLevel.WARN);
this.timer = new RntbdRequestTimer(config.getRequestTimeout());
this.timer = new RntbdRequestTimer(config.requestTimeout());
this.expected = expected;
}