We now ensure with a health check request that a newly created channel has requested an RntbdContext. Confirmed: direct and e2e test groups pass locally in Direct TCP mode.

This commit is contained in:
David.Noble@microsoft.com 2019-06-26 23:15:41 -07:00
Родитель 1b4983db34
Коммит 282e7dbe3e
12 изменённых файлов: 59 добавлений и 54 удалений

Просмотреть файл

@ -93,7 +93,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
// by the above EventExecutor.
private final Queue<AcquireTask> pendingAcquisitionQueue = new ArrayDeque<AcquireTask>();
private final Runnable acquisitionTimeoutTask;
private int pendingAcquisitionCount;
private int pendingChannelAcquisitionCount;
/**
* Initializes a newly created {@link RntbdClientChannelPool} object
@ -118,8 +118,8 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
// Alternatively: drop acquisition timeout and acquisition timeout action
// Decision should be based on performance, reliability, and usability considerations
AcquisitionTimeoutAction acquisitionTimeoutAction = null;
long acquisitionTimeoutNanos = -1L;
final AcquisitionTimeoutAction acquisitionTimeoutAction = null;
final long acquisitionTimeoutNanos = -1L;
if (acquisitionTimeoutAction == null) {
@ -206,8 +206,8 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
return this.maxRequestsPerChannel;
}
public int pendingAcquisitionCount() {
return this.pendingAcquisitionCount;
public int pendingChannelAcquisitionCount() {
return this.pendingChannelAcquisitionCount;
}
@Override
@ -233,13 +233,13 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
} else {
Throwable cause = future.cause();
final Throwable cause = future.cause();
if (!(cause instanceof IllegalArgumentException)) {
this.decrementAndRunTaskQueue();
}
promise.setFailure(future.cause());
promise.setFailure(cause);
}
}));
@ -292,7 +292,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
}
if (this.isClosed()) {
return first; // because we're being called following a call to close (from super.close)
return first; // because this.close -> this.close0 -> super.close -> this.pollChannel
}
if (this.isInactiveOrServiceableChannel(first)) {
@ -330,17 +330,17 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
checkState(this.acquiredChannelCount.get() >= 0);
AcquireListener l = new AcquireListener(this, promise);
final AcquireListener l = new AcquireListener(this, promise);
l.acquired();
Promise<Channel> p = this.executor.newPromise();
final Promise<Channel> p = this.executor.newPromise();
p.addListener(l);
super.acquire(p); // acquire an existing channel or create and acquire a new channel
} else {
if (this.pendingAcquisitionCount >= this.maxPendingAcquisitions) {
if (this.pendingChannelAcquisitionCount >= this.maxPendingAcquisitions) {
promise.setFailure(TOO_MANY_PENDING_ACQUISITIONS);
@ -352,7 +352,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
if (this.pendingAcquisitionQueue.offer(task)) {
this.pendingAcquisitionCount++;
this.pendingChannelAcquisitionCount++;
if (acquisitionTimeoutTask != null) {
task.timeoutFuture = executor.schedule(acquisitionTimeoutTask, acquisitionTimeoutNanos, TimeUnit.NANOSECONDS);
@ -363,21 +363,20 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
}
}
checkState(this.pendingAcquisitionCount > 0);
checkState(this.pendingChannelAcquisitionCount > 0);
}
}
private void close0() {
checkState(this.executor.inEventLoop());
this.availableChannelCount.set(0);
for (; ; ) {
AcquireTask task = this.pendingAcquisitionQueue.poll();
final AcquireTask task = this.pendingAcquisitionQueue.poll();
if (task == null) {
break;
}
ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
final ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
@ -385,7 +384,8 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
}
this.acquiredChannelCount.set(0);
this.pendingAcquisitionCount = 0;
this.availableChannelCount.set(0);
this.pendingChannelAcquisitionCount = 0;
// Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
// to ensure we will not block in an EventExecutor
@ -395,11 +395,11 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
private void decrementAndRunTaskQueue() {
int currentCount = this.acquiredChannelCount.decrementAndGet();
final int currentCount = this.acquiredChannelCount.decrementAndGet();
checkState(currentCount >= 0);
// Run the pending acquisition tasks before notifying the original promise so that if the user tries to
// acquire again from the ChannelFutureListener and the pendingAcquisitionCount is greater than
// acquire again from the ChannelFutureListener and the pendingChannelAcquisitionCount is greater than
// maxPendingAcquisitions we may be able to run some pending tasks first and so allow to add more
runTaskQueue();
}
@ -429,19 +429,19 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
while (this.acquiredChannelCount.get() < this.maxChannels) {
AcquireTask task = this.pendingAcquisitionQueue.poll();
final AcquireTask task = this.pendingAcquisitionQueue.poll();
if (task == null) {
break;
}
ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
final ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
this.pendingAcquisitionCount--;
this.pendingChannelAcquisitionCount--;
task.acquired();
super.acquire(task.promise);
@ -450,7 +450,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
// We should never have negative values
checkState(this.acquiredChannelCount.get() >= 0);
checkState(this.pendingAcquisitionCount >= 0);
checkState(this.pendingChannelAcquisitionCount >= 0);
}
private void throwIfClosed() {
@ -478,7 +478,6 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
private final Promise<Channel> originalPromise;
private final RntbdClientChannelPool pool;
private boolean acquired;
AcquireListener(RntbdClientChannelPool pool, Promise<Channel> originalPromise) {
@ -519,10 +518,10 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
// 2. RntbdContextRequest -> RntbdContext
// 3. RntbdHealthCheckRequest -> RntbdHealthCheck
Channel channel = future.getNow();
final Channel channel = future.getNow();
checkState(channel.isActive());
RntbdRequestManager requestManager = channel.pipeline().get(RntbdRequestManager.class);
final RntbdRequestManager requestManager = channel.pipeline().get(RntbdRequestManager.class);
checkState(requestManager != null);
if (requestManager.hasRequestedRntbdContext()) {
@ -575,7 +574,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
private static abstract class AcquireTimeoutTask implements Runnable {
RntbdClientChannelPool pool;
final RntbdClientChannelPool pool;
public AcquireTimeoutTask(RntbdClientChannelPool pool) {
this.pool = pool;
@ -587,7 +586,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
public final void run() {
checkState(this.pool.executor.inEventLoop());
long nanoTime = System.nanoTime();
final long nanoTime = System.nanoTime();
for (; ; ) {
AcquireTask task = this.pool.pendingAcquisitionQueue.peek();
@ -599,7 +598,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
break;
}
this.pool.pendingAcquisitionQueue.remove();
this.pool.pendingAcquisitionCount--;
this.pool.pendingChannelAcquisitionCount--;
this.onTimeout(task);
}
}
@ -608,11 +607,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
static final class JsonSerializer extends StdSerializer<RntbdClientChannelPool> {
JsonSerializer() {
this(null);
}
JsonSerializer(Class<RntbdClientChannelPool> type) {
super(type);
super(RntbdClientChannelPool.class);
}
@Override
@ -625,7 +620,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
generator.writeBooleanField("isClosed", pool.isClosed());
generator.writeNumberField("acquiredChannelCount", pool.acquiredChannelCount());
generator.writeNumberField("availableChannelCount", pool.availableChannelCount());
generator.writeNumberField("pendingAcquisitionCount", pool.pendingAcquisitionCount());
generator.writeNumberField("pendingChannelAcquisitionCount", pool.pendingChannelAcquisitionCount());
generator.writeEndObject();
generator.writeEndObject();
}

Просмотреть файл

@ -24,6 +24,7 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
@ -639,7 +640,7 @@ final class RntbdConstants {
case 0x001D:
return RntbdResourceType.UserDefinedType;
}
throw new IllegalArgumentException(String.format("id: %d", id));
throw new IllegalArgumentException(Strings.lenientFormat("id: %s", id));
}
public short id() {

Просмотреть файл

@ -27,6 +27,7 @@ package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Strings;
import com.microsoft.azure.cosmosdb.internal.UserAgentContainer;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.CorruptedFrameException;
@ -83,7 +84,7 @@ public final class RntbdContextRequest {
final int observedLength = in.readerIndex() - start;
if (observedLength != expectedLength) {
final String reason = String.format("expectedLength=%d, observeredLength=%d", expectedLength, observedLength);
final String reason = Strings.lenientFormat("expectedLength=%s, observedLength=%s", expectedLength, observedLength);
throw new IllegalStateException(reason);
}
@ -105,7 +106,7 @@ public final class RntbdContextRequest {
final int observedLength = out.writerIndex() - start;
if (observedLength != expectedLength) {
final String reason = String.format("expectedLength=%d, observeredLength=%d", expectedLength, observedLength);
final String reason = Strings.lenientFormat("expectedLength=%s, observedLength=%s", expectedLength, observedLength);
throw new IllegalStateException(reason);
}
}

Просмотреть файл

@ -24,6 +24,7 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.CorruptedFrameException;
@ -46,14 +47,14 @@ final class RntbdFramer {
final long length = in.getUnsignedIntLE(start);
if (length > Integer.MAX_VALUE) {
final String reason = String.format("Head frame length exceeds Integer.MAX_VALUE, %d: %d",
final String reason = Strings.lenientFormat("Head frame length exceeds Integer.MAX_VALUE, %s: %s",
Integer.MAX_VALUE, length
);
throw new CorruptedFrameException(reason);
}
if (length < Integer.BYTES) {
final String reason = String.format("Head frame length is less than size of length field, %d: %d",
final String reason = Strings.lenientFormat("Head frame length is less than size of length field, %s: %s",
Integer.BYTES, length
);
throw new CorruptedFrameException(reason);
@ -81,7 +82,7 @@ final class RntbdFramer {
final long length = in.getUnsignedIntLE(start);
if (length > Integer.MAX_VALUE) {
final String reason = String.format("Payload frame length exceeds Integer.MAX_VALUE, %d: %d",
final String reason = Strings.lenientFormat("Payload frame length exceeds Integer.MAX_VALUE, %s: %s",
Integer.MAX_VALUE, length
);
throw new CorruptedFrameException(reason);

Просмотреть файл

@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.ser.PropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.handler.codec.CorruptedFrameException;
@ -76,7 +77,7 @@ public final class RntbdObjectMapper {
return (ObjectNode)node;
}
final String cause = String.format("Expected %s, not %s", JsonNodeType.OBJECT, node.getNodeType());
final String cause = Strings.lenientFormat("Expected %s, not %s", JsonNodeType.OBJECT, node.getNodeType());
throw new CorruptedFrameException(cause);
}

Просмотреть файл

@ -25,6 +25,7 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Strings;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
import io.netty.buffer.ByteBuf;
@ -35,7 +36,7 @@ import static com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.Rnt
public final class RntbdRequest {
private static final byte[] EmptyByteArray = {};
private static final byte[] EMPTY_BYTE_ARRAY = {};
private final RntbdRequestFrame frame;
private final RntbdRequestHeaders headers;
@ -48,7 +49,7 @@ public final class RntbdRequest {
this.frame = frame;
this.headers = headers;
this.payload = payload == null ? EmptyByteArray : payload;
this.payload = payload == null ? EMPTY_BYTE_ARRAY : payload;
}
public UUID getActivityId() {
@ -84,7 +85,7 @@ public final class RntbdRequest {
final int observedLength = in.readerIndex() - start;
if (observedLength != expectedLength) {
final String reason = String.format("expectedLength=%d, observedLength=%d", expectedLength, observedLength);
final String reason = Strings.lenientFormat("expectedLength=%s, observedLength=%s", expectedLength, observedLength);
throw new IllegalStateException(reason);
}

Просмотреть файл

@ -24,11 +24,11 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.google.common.base.Strings;
import com.microsoft.azure.cosmosdb.internal.OperationType;
import com.microsoft.azure.cosmosdb.internal.ResourceType;
import io.netty.buffer.ByteBuf;
import java.util.Locale;
import java.util.UUID;
import static com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdConstants.RntbdOperationType;
@ -150,7 +150,7 @@ final class RntbdRequestFrame {
case RidRange:
return RntbdResourceType.RidRange;
default:
final String reason = String.format(Locale.ROOT, "Unrecognized resource type: %s", resourceType);
final String reason = Strings.lenientFormat("Unrecognized resource type: %s", resourceType);
throw new UnsupportedOperationException(reason);
}
}
@ -225,7 +225,7 @@ final class RntbdRequestFrame {
case AddComputeGatewayRequestCharges:
return RntbdOperationType.AddComputeGatewayRequestCharges;
default:
final String reason = String.format(Locale.ROOT, "Unrecognized operation type: %s", operationType);
final String reason = Strings.lenientFormat("Unrecognized operation type: %s", operationType);
throw new UnsupportedOperationException(reason);
}
}

Просмотреть файл

@ -28,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.http.HttpResponseStatus;
@ -85,7 +86,7 @@ final class RntbdResponseStatus {
final long length = in.readUnsignedIntLE();
if (!(LENGTH <= length && length <= Integer.MAX_VALUE)) {
final String reason = String.format("frame length: %d", length);
final String reason = Strings.lenientFormat("frame length: %s", length);
throw new CorruptedFrameException(reason);
}
@ -93,7 +94,7 @@ final class RntbdResponseStatus {
final HttpResponseStatus status = HttpResponseStatus.valueOf(code);
if (status == null) {
final String reason = String.format("status code: %d", code);
final String reason = Strings.lenientFormat("status code: %s", code);
throw new CorruptedFrameException(reason);
}

Просмотреть файл

@ -28,6 +28,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.microsoft.azure.cosmosdb.BridgeInternal;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
@ -214,7 +215,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
final String reason = cause.getMessage();
final GoneException goneException = new GoneException(
String.format("failed to establish connection to %s: %s", this.remoteAddress, reason),
Strings.lenientFormat("failed to establish connection to %s: %s", this.remoteAddress, reason),
cause instanceof Exception ? (Exception)cause : new IOException(reason, cause),
ImmutableMap.of(HttpConstants.HttpHeaders.ACTIVITY_ID, activityId.toString()),
requestArgs.getReplicaPath()

Просмотреть файл

@ -31,6 +31,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.PropertyWriter;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import static com.google.common.base.Preconditions.checkArgument;
@ -170,7 +171,7 @@ final class RntbdToken {
if (!this.isPresent()) {
if (this.isRequired()) {
final String message = String.format("Missing value for required header: %s", this);
final String message = Strings.lenientFormat("Missing value for required header: %s", this);
throw new IllegalStateException(message);
}
return;

Просмотреть файл

@ -24,6 +24,7 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
@ -92,7 +93,7 @@ abstract class RntbdTokenStream<T extends Enum<T> & RntbdHeader> {
for (final RntbdToken token : stream.tokens.values()) {
if (!token.isPresent() && token.isRequired()) {
final String reason = String.format("Required token not found on RNTBD stream: type: %s, identifier: %s",
final String reason = Strings.lenientFormat("Required token not found on token stream: type=%s, identifier=%s",
token.getTokenType(), token.getId());
throw new IllegalStateException(reason);
}

Просмотреть файл

@ -24,6 +24,7 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.CorruptedFrameException;
@ -60,7 +61,7 @@ public final class RntbdUUID {
checkNotNull(in, "in");
if (in.readableBytes() < 2 * Long.BYTES) {
final String reason = String.format("invalid frame length: %d", in.readableBytes());
final String reason = Strings.lenientFormat("invalid frame length: %s", in.readableBytes());
throw new CorruptedFrameException(reason);
}