Refactored for improved diagnstics, testability, and usability.

This commit is contained in:
David Noble 2019-05-04 01:04:09 -07:00
Родитель 45b7ef984d
Коммит 27fc9c1c7f
8 изменённых файлов: 403 добавлений и 263 удалений

Просмотреть файл

@ -27,16 +27,13 @@ package com.microsoft.azure.cosmosdb.internal.directconnectivity;
import com.google.common.base.Stopwatch;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.internal.UserAgentContainer;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdEndpoint;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdObjectMapper;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestArgs;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestTimer;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdServiceEndpoint;
import com.microsoft.azure.cosmosdb.rx.internal.Configs;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Single;
@ -45,7 +42,6 @@ import rx.SingleEmitter;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -61,7 +57,7 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
private static final Logger logger = LoggerFactory.getLogger(className);
private final AtomicBoolean closed = new AtomicBoolean();
private final EndpointFactory endpointFactory;
private final RntbdEndpoint.Provider endpointProvider;
private final Metrics metrics;
private final String name;
@ -69,23 +65,18 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
// region Constructors
RntbdTransportClient(final EndpointFactory endpointFactory) {
RntbdTransportClient(final RntbdEndpoint.Provider endpointProvider) {
this.name = RntbdTransportClient.className + '-' + RntbdTransportClient.instanceCount.incrementAndGet();
this.endpointFactory = endpointFactory;
this.endpointProvider = endpointProvider;
this.metrics = new Metrics();
}
RntbdTransportClient(final Options options, final SslContext sslContext) {
this(new EndpointFactory(options, sslContext));
this(new RntbdServiceEndpoint.Provider(options, sslContext));
}
RntbdTransportClient(final Configs configs, final int requestTimeoutInSeconds, final UserAgentContainer userAgent) {
this(new Options.Builder()
.requestTimeout(Duration.ofSeconds(requestTimeoutInSeconds))
.userAgent(userAgent)
.build(),
configs.getSslContext()
);
this(new Options.Builder(requestTimeoutInSeconds).userAgent(userAgent).build(), configs.getSslContext());
}
// endregion
@ -93,21 +84,12 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
// region Methods
@Override
public void close() {
public void close() throws RuntimeException {
if (this.closed.compareAndSet(false, true)) {
this.endpointFactory.close().addListener(future -> {
if (future.isSuccess()) {
logger.debug("{} closed endpoints", this);
return;
}
logger.error("{} failed to close endpoints due to {}", this, future.cause());
});
} else {
logger.debug("{} already closed", this);
this.endpointProvider.close();
return;
}
logger.debug("{} already closed", this);
}
@Override
@ -125,7 +107,7 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
logger.debug("\n {}\n {}\n INVOKE_STORE_ASYNC", this, requestArgs);
}
final Endpoint endpoint = this.endpointFactory.getEndpoint(physicalAddress);
final RntbdEndpoint endpoint = this.endpointProvider.get(physicalAddress);
this.metrics.incrementRequestCount();
final CompletableFuture<StoreResponse> future = endpoint.request(requestArgs);
@ -153,8 +135,7 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
@Override
public String toString() {
final long endpointCount = this.endpointFactory.endpoints.mappingCount();
return '[' + this.name + "(endpointCount: " + endpointCount + ", " + this.metrics + ")]";
return '[' + this.name + "(endpointCount: " + this.endpointProvider.count() + ", " + this.metrics + ")]";
}
private void throwIfClosed() {
@ -167,171 +148,57 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
// region Types
public interface Endpoint {
void close();
CompletableFuture<StoreResponse> request(RntbdRequestArgs requestArgs);
}
public static final class Config {
private final Options options;
private final SslContext sslContext;
private final LogLevel wireLogLevel;
Config(final SslContext sslContext, final LogLevel wireLogLevel, final Options options) {
checkNotNull(sslContext, "sslContext");
checkNotNull(options, "options");
this.sslContext = sslContext;
this.wireLogLevel = wireLogLevel;
this.options = options;
}
public int getConnectionTimeout() {
final long value = this.options.getOpenTimeout().toMillis();
assert value <= Integer.MAX_VALUE;
return (int)value;
}
public int getMaxChannelsPerEndpoint() {
return this.options.getMaxChannelsPerEndpoint();
}
public int getMaxRequestsPerChannel() {
return this.options.getMaxRequestsPerChannel();
}
public Options getOptions() {
return this.options;
}
public long getReceiveHangDetectionTime() {
return this.options.getReceiveHangDetectionTime().toNanos();
}
public long getSendHangDetectionTime() {
return this.options.getSendHangDetectionTime().toNanos();
}
public SslContext getSslContext() {
return this.sslContext;
}
public UserAgentContainer getUserAgent() {
return this.options.getUserAgent();
}
public LogLevel getWireLogLevel() {
return this.wireLogLevel;
}
}
static class EndpointFactory {
private final ConcurrentHashMap<String, Endpoint> endpoints = new ConcurrentHashMap<>();
private final NioEventLoopGroup eventLoopGroup;
private final Config pipelineConfig;
private final RntbdRequestTimer requestTimer;
EndpointFactory(final Options options, final SslContext sslContext) {
checkNotNull(options, "options");
checkNotNull(sslContext, "sslContext");
final DefaultThreadFactory threadFactory = new DefaultThreadFactory("CosmosEventLoop", true);
final int threadCount = Runtime.getRuntime().availableProcessors();
final LogLevel wireLogLevel;
if (RntbdTransportClient.logger.isTraceEnabled()) {
wireLogLevel = LogLevel.TRACE;
} else if (RntbdTransportClient.logger.isDebugEnabled()) {
wireLogLevel = LogLevel.DEBUG;
} else {
wireLogLevel = null;
}
this.requestTimer = new RntbdRequestTimer(options.getRequestTimeout());
this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory);
this.pipelineConfig = new Config(sslContext, wireLogLevel, options);
}
Config getPipelineConfig() {
return this.pipelineConfig;
}
RntbdRequestTimer getRequestTimer() {
return this.requestTimer;
}
Future<?> close() {
this.requestTimer.close();
for (final Endpoint endpoint : this.endpoints.values()) {
endpoint.close();
}
return this.eventLoopGroup.shutdownGracefully();
}
Endpoint createEndpoint(final URI physicalAddress) {
return new RntbdServiceEndpoint(this.pipelineConfig, this.eventLoopGroup, this.requestTimer, physicalAddress);
}
void deleteEndpoint(final URI physicalAddress) {
// TODO: DANOBLE: Utilize this method of tearing down unhealthy endpoints
// Links:
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/331552
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/331593
final String authority = physicalAddress.getAuthority();
final Endpoint endpoint = this.endpoints.remove(authority);
checkNotNull(endpoint, "physicalAddress: %s", physicalAddress);
endpoint.close();
}
Endpoint getEndpoint(final URI physicalAddress) {
return this.endpoints.computeIfAbsent(
physicalAddress.getAuthority(), authority -> this.createEndpoint(physicalAddress)
);
}
}
public static final class Metrics {
// region Fields
private final AtomicLong errorResponseCount = new AtomicLong();
private final Stopwatch lifetime = Stopwatch.createStarted();
private final AtomicLong requestCount = new AtomicLong();
private final AtomicLong responseCount = new AtomicLong();
private final AtomicLong requestFailureCount = new AtomicLong();
public final Stopwatch getLifetime() {
// endregion
// region Accessors
public double getFailureRate() {
return this.errorResponseCount.get() / this.requestCount.get();
}
public Stopwatch getLifetime() {
return this.lifetime;
}
public final long getRequestCount() {
public long getPendingRequestCount() {
return this.requestCount.get() - this.responseCount.get();
}
public long getRequestCount() {
return this.requestCount.get();
}
public final double getRequestsPerSecond() {
public long getResponseCount() {
return this.responseCount.get();
}
public double getSuccessRate() {
return (this.responseCount.get() - this.errorResponseCount.get()) / this.requestCount.get();
}
public double getThroughput() {
return this.responseCount.get() / (1E-9 * this.lifetime.elapsed().toNanos());
}
public final long getResponseCount() {
return this.responseCount.get();
}
// endregion
// region Methods
public final void incrementRequestCount() {
this.requestCount.incrementAndGet();
}
public final void incrementRequestFailureCount() {
this.requestFailureCount.incrementAndGet();
this.errorResponseCount.incrementAndGet();
}
public final void incrementResponseCount() {
@ -340,9 +207,10 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
@Override
public String toString() {
return "lifetime: " + this.lifetime + ", requestCount: " + this.requestCount + ", responseCount: "
+ this.responseCount + ", requestFailureCount: " + this.requestFailureCount;
return RntbdObjectMapper.toJson(this);
}
// endregion
}
public static final class Options {
@ -352,7 +220,7 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
private final String certificateHostNameOverride;
private final int maxChannelsPerEndpoint;
private final int maxRequestsPerChannel;
private final Duration openTimeout;
private final Duration connectionTimeout;
private final int partitionCount;
private final Duration receiveHangDetectionTime;
private final Duration requestTimeout;
@ -363,22 +231,12 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
// region Constructors
public Options(final Duration requestTimeout) {
this(new Builder().requestTimeout(requestTimeout));
}
public Options(final int requestTimeoutInSeconds) {
this(Duration.ofSeconds((long)requestTimeoutInSeconds));
}
private Options(Builder builder) {
checkNotNull(builder.requestTimeout, "requestTimeout: null");
this.certificateHostNameOverride = builder.certificateHostNameOverride;
this.maxChannelsPerEndpoint = builder.maxChannelsPerEndpoint;
this.maxRequestsPerChannel = builder.maxRequestsPerChannel;
this.openTimeout = builder.openTimeout == null ? builder.requestTimeout : builder.openTimeout;
this.connectionTimeout = builder.connectionTimeout == null ? builder.requestTimeout : builder.connectionTimeout;
this.partitionCount = builder.partitionCount;
this.requestTimeout = builder.requestTimeout;
this.receiveHangDetectionTime = builder.receiveHangDetectionTime;
@ -388,7 +246,7 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
// endregion
// region Property accessors
// region Accessors
public String getCertificateHostNameOverride() {
return this.certificateHostNameOverride;
@ -402,8 +260,8 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
return this.maxRequestsPerChannel;
}
public Duration getOpenTimeout() {
return this.openTimeout;
public Duration getConnectionTimeout() {
return this.connectionTimeout;
}
public int getPartitionCount() {
@ -428,36 +286,68 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
// endregion
// region Methods
@Override
public String toString() {
return RntbdObjectMapper.toJson(this);
}
// endregion
// region Types
public static class Builder {
// region Fields
private static final UserAgentContainer DEFAULT_USER_AGENT_CONTAINER = new UserAgentContainer();
private static final Duration SIXTY_FIVE_SECONDS = Duration.ofSeconds(65L);
private static final Duration TEN_SECONDS = Duration.ofSeconds(10L);
// Required parameters
private String certificateHostNameOverride = null;
// Optional parameters
private int maxChannelsPerEndpoint = 10;
private int maxRequestsPerChannel = 30;
private Duration openTimeout = null;
private Duration connectionTimeout = null;
private int partitionCount = 1;
private Duration receiveHangDetectionTime = SIXTY_FIVE_SECONDS;
private Duration requestTimeout = null;
private Duration requestTimeout;
private Duration sendHangDetectionTime = TEN_SECONDS;
private UserAgentContainer userAgent = DEFAULT_USER_AGENT_CONTAINER;
Options build() {
// endregion
// region Constructors
public Builder(Duration requestTimeout) {
this.requestTimeout(requestTimeout);
}
public Builder(int requestTimeoutInSeconds) {
this(Duration.ofSeconds(requestTimeoutInSeconds));
}
// endregion
// region Methods
public Options build() {
return new Options(this);
}
Builder certificateHostNameOverride(final String value) {
public Builder certificateHostNameOverride(final String value) {
this.certificateHostNameOverride = value;
return this;
}
Builder maxChannelsPerEndpoint(final int value) {
checkArgument(value > 0, "value: %s", value);
this.maxChannelsPerEndpoint = value;
public Builder connectionTimeout(final Duration value) {
checkArgument(value == null || value.compareTo(Duration.ZERO) > 0, "value: %s", value);
this.connectionTimeout = value;
return this;
}
@ -467,9 +357,9 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
return this;
}
public Builder openTimeout(final Duration value) {
checkArgument(value == null || value.compareTo(Duration.ZERO) > 0, "value: %s", value);
this.openTimeout = value;
public Builder maxChannelsPerEndpoint(final int value) {
checkArgument(value > 0, "value: %s", value);
this.maxChannelsPerEndpoint = value;
return this;
}
@ -511,6 +401,8 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
this.userAgent = value;
return this;
}
// endregion
}
// endregion

Просмотреть файл

@ -24,7 +24,6 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.RntbdTransportClient;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
@ -46,9 +45,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class RntbdClientChannelHandler extends ChannelInitializer<Channel> implements ChannelPoolHandler {
private static Logger logger = LoggerFactory.getLogger(RntbdClientChannelHandler.class);
private final RntbdTransportClient.Config config;
private final RntbdEndpoint.Config config;
RntbdClientChannelHandler(final RntbdTransportClient.Config config) {
RntbdClientChannelHandler(final RntbdEndpoint.Config config) {
checkNotNull(config, "config");
this.config = config;
}

Просмотреть файл

@ -24,7 +24,6 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.RntbdTransportClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
@ -45,9 +44,9 @@ public class RntbdClientChannelPool extends FixedChannelPool {
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param config the {@link RntbdTransportClient.Config} that is used for the channel pool instance created
* @param config the {@link RntbdEndpoint.Config} that is used for the channel pool instance created
*/
RntbdClientChannelPool(final Bootstrap bootstrap, final RntbdTransportClient.Config config) {
RntbdClientChannelPool(final Bootstrap bootstrap, final RntbdEndpoint.Config config) {
super(bootstrap, new RntbdClientChannelHandler(config), ChannelHealthChecker.ACTIVE, null,
-1L, config.getMaxChannelsPerEndpoint(), Integer.MAX_VALUE, true
@ -104,4 +103,5 @@ public class RntbdClientChannelPool extends FixedChannelPool {
private static int pendingRequestCount(final Channel channel) {
return channel.pipeline().get(RntbdRequestManager.class).getPendingRequestCount();
}
}

Просмотреть файл

@ -0,0 +1,117 @@
/*
* The MIT License (MIT)
* Copyright (c) 2018 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*/
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.microsoft.azure.cosmosdb.internal.UserAgentContainer;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.StoreResponse;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.microsoft.azure.cosmosdb.internal.directconnectivity.RntbdTransportClient.Options;
public interface RntbdEndpoint extends AutoCloseable {
@Override
void close() throws RuntimeException;
CompletableFuture<StoreResponse> request(RntbdRequestArgs requestArgs);
interface Provider extends AutoCloseable {
@Override
void close() throws RuntimeException;
int count();
RntbdEndpoint get(URI physicalAddress);
Stream<RntbdEndpoint> list();
}
final class Config {
private final Options options;
private final SslContext sslContext;
private final LogLevel wireLogLevel;
public Config(final Options options, final SslContext sslContext, final LogLevel wireLogLevel) {
checkNotNull(options, "options");
checkNotNull(sslContext, "sslContext");
this.options = options;
this.sslContext = sslContext;
this.wireLogLevel = wireLogLevel;
}
public int getConnectionTimeout() {
final long value = this.options.getConnectionTimeout().toMillis();
assert value <= Integer.MAX_VALUE;
return (int)value;
}
public int getMaxChannelsPerEndpoint() {
return this.options.getMaxChannelsPerEndpoint();
}
public int getMaxRequestsPerChannel() {
return this.options.getMaxRequestsPerChannel();
}
public long getReceiveHangDetectionTime() {
return this.options.getReceiveHangDetectionTime().toNanos();
}
public long getRequestTimeout() {
return this.options.getRequestTimeout().toNanos();
}
public long getSendHangDetectionTime() {
return this.options.getSendHangDetectionTime().toNanos();
}
public SslContext getSslContext() {
return this.sslContext;
}
public UserAgentContainer getUserAgent() {
return this.options.getUserAgent();
}
public LogLevel getWireLogLevel() {
return this.wireLogLevel;
}
@Override
public String toString() {
return RntbdObjectMapper.toJson(this);
}
}
}

Просмотреть файл

@ -24,6 +24,7 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
@ -34,13 +35,14 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.EncoderException;
import java.io.IOException;
import java.io.InputStream;
import static com.google.common.base.Preconditions.checkNotNull;
final class RntbdObjectMapper {
public final class RntbdObjectMapper {
private static final SimpleFilterProvider filterProvider;
private static final ObjectMapper objectMapper;
@ -90,7 +92,15 @@ final class RntbdObjectMapper {
}
}
static ObjectWriter writer() {
public static String toJson(Object value) {
try {
return objectWriter.writeValueAsString(value);
} catch (final JsonProcessingException error) {
throw new EncoderException(error);
}
}
public static ObjectWriter writer() {
return objectWriter;
}
}

Просмотреть файл

@ -29,23 +29,22 @@ import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
public final class RntbdRequestTimer implements AutoCloseable {
private static final long FIVE_MILLISECONDS = 5000000L;
private final long requestTimeoutInterval;
private final long requestTimeout;
private final Timer timer;
public RntbdRequestTimer(final Duration requestTimeoutInterval) {
public RntbdRequestTimer(final long requestTimeout) {
// Inspection of the HashWheelTimer code indicates that our choice of a 5 millisecond timer resolution ensures
// a request will timeout within 10 milliseconds of the specified requestTimeoutInterval. This is because
// a request will timeout within 10 milliseconds of the specified requestTimeout interval. This is because
// cancellation of a timeout takes two timer resolution units to complete.
this.timer = new HashedWheelTimer(FIVE_MILLISECONDS, TimeUnit.NANOSECONDS);
this.requestTimeoutInterval = requestTimeoutInterval.toNanos();
this.requestTimeout = requestTimeout;
}
/**
@ -57,6 +56,6 @@ public final class RntbdRequestTimer implements AutoCloseable {
}
Timeout newTimeout(final TimerTask task) {
return this.timer.newTimeout(task, this.requestTimeoutInterval, TimeUnit.NANOSECONDS);
return this.timer.newTimeout(task, this.requestTimeout, TimeUnit.NANOSECONDS);
}
}

Просмотреть файл

@ -28,8 +28,7 @@ import com.google.common.collect.ImmutableMap;
import com.microsoft.azure.cosmosdb.BridgeInternal;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.GoneException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.RntbdTransportClient.Config;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.ServiceUnavailableException;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.RntbdTransportClient.Options;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.StoreResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -37,6 +36,9 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,12 +47,15 @@ import java.net.SocketAddress;
import java.net.URI;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import static com.microsoft.azure.cosmosdb.internal.directconnectivity.RntbdTransportClient.Endpoint;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.microsoft.azure.cosmosdb.internal.directconnectivity.RntbdTransportClient.Metrics;
public class RntbdServiceEndpoint implements Endpoint {
public class RntbdServiceEndpoint implements RntbdEndpoint {
private static final String className = RntbdServiceEndpoint.class.getCanonicalName();
private static final AtomicLong instanceCount = new AtomicLong();
@ -62,7 +67,9 @@ public class RntbdServiceEndpoint implements Endpoint {
private final SocketAddress remoteAddress;
private final RntbdRequestTimer requestTimer;
public RntbdServiceEndpoint(
// region Constructors
private RntbdServiceEndpoint(
final Config config, final NioEventLoopGroup group, final RntbdRequestTimer timer, final URI physicalAddress
) {
@ -81,6 +88,10 @@ public class RntbdServiceEndpoint implements Endpoint {
this.requestTimer = timer;
}
// endregion
// region Methods
@Override
public void close() {
this.channelPool.close();
@ -93,10 +104,10 @@ public class RntbdServiceEndpoint implements Endpoint {
logger.debug("\n {}\n {}\n REQUEST", this, args);
}
final CompletableFuture<StoreResponse> responseFuture = this.write(args);
final RntbdRequestRecord requestRecord = this.write(args);
this.metrics.incrementRequestCount();
return responseFuture.whenComplete((response, error) -> {
return requestRecord.whenComplete((response, error) -> {
args.traceOperation(logger, null, "requestComplete", response, error);
assert (response == null && error != null) || (response != null && error == null);
@ -119,11 +130,13 @@ public class RntbdServiceEndpoint implements Endpoint {
@Override
public String toString() {
return '[' + this.name + '(' + this.remoteAddress + ", acquiredChannelCount: "
+ this.channelPool.acquiredChannelCount() + ", " + this.metrics
+ ")]";
return '[' + this.name + "(remoteAddress: " + this.remoteAddress + ", " + this.metrics + ")]";
}
// endregion
// region Privates
private void releaseToPool(final Channel channel) {
logger.debug("\n {}\n {}\n RELEASE", this, channel);
@ -189,4 +202,98 @@ public class RntbdServiceEndpoint implements Endpoint {
return requestRecord;
}
// endregion
// region Types
public static class Provider implements RntbdEndpoint.Provider {
private static final Logger logger = LoggerFactory.getLogger(Provider.class);
private final AtomicBoolean closed = new AtomicBoolean();
private final Config config;
private final ConcurrentHashMap<String, RntbdEndpoint> endpoints = new ConcurrentHashMap<>();
private final NioEventLoopGroup eventLoopGroup;
private final RntbdRequestTimer requestTimer;
public Provider(final Options options, final SslContext sslContext) {
checkNotNull(options, "options");
checkNotNull(sslContext, "sslContext");
final DefaultThreadFactory threadFactory = new DefaultThreadFactory("CosmosEventLoop", true);
final int threadCount = Runtime.getRuntime().availableProcessors();
final LogLevel wireLogLevel;
if (logger.isTraceEnabled()) {
wireLogLevel = LogLevel.TRACE;
} else if (logger.isDebugEnabled()) {
wireLogLevel = LogLevel.DEBUG;
} else {
wireLogLevel = null;
}
this.config = new Config(options, sslContext, wireLogLevel);
this.requestTimer = new RntbdRequestTimer(config.getRequestTimeout());
this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory);
}
@Override
public void close() {
if (this.closed.compareAndSet(false, true)) {
this.requestTimer.close();
for (final RntbdEndpoint endpoint : this.endpoints.values()) {
endpoint.close();
}
this.eventLoopGroup.shutdownGracefully().addListener(future -> {
if (future.isSuccess()) {
logger.debug("{} closed endpoints", this);
return;
}
logger.error("{} failed to close endpoints due to {}", this, future.cause());
});
return;
}
logger.debug("{} already closed", this);
}
@Override
public int count() {
return this.endpoints.size();
}
@Override
public RntbdEndpoint get(URI physicalAddress) {
return endpoints.computeIfAbsent(physicalAddress.getAuthority(), authority ->
new RntbdServiceEndpoint(config, eventLoopGroup, requestTimer, physicalAddress)
);
}
@Override
public Stream<RntbdEndpoint> list() {
return this.endpoints.values().stream();
}
private void deleteEndpoint(final URI physicalAddress) {
// TODO: DANOBLE: Utilize this method of tearing down unhealthy endpoints
// Links:
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/331552
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/331593
final String authority = physicalAddress.getAuthority();
final RntbdEndpoint endpoint = this.endpoints.remove(authority);
checkNotNull(endpoint, "physicalAddress: %s", physicalAddress);
endpoint.close();
}
}
// endregion
}

Просмотреть файл

@ -36,10 +36,12 @@ import com.microsoft.azure.cosmosdb.internal.Utils;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdContext;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdContextNegotiator;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdContextRequest;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdEndpoint;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestArgs;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestEncoder;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestManager;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestRecord;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestTimer;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdResponse;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdResponseDecoder;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdUUID;
@ -55,13 +57,11 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.api.Assertions;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.DataProvider;
@ -76,15 +76,12 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static com.microsoft.azure.cosmosdb.internal.HttpConstants.HttpHeaders;
import static com.microsoft.azure.cosmosdb.internal.HttpConstants.HttpMethods;
import static com.microsoft.azure.cosmosdb.internal.HttpConstants.SubStatusCodes;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@ -596,7 +593,7 @@ public class RntbdTransportClientTest {
@Test(enabled = false, groups = "direct")
public void verifyGoneResponseMapsToGoneException() throws Exception {
final RntbdTransportClient.Options options = new RntbdTransportClient.Options(requestTimeout);
final RntbdTransportClient.Options options = new RntbdTransportClient.Options.Builder(requestTimeout).build();
final SslContext sslContext = SslContextBuilder.forClient().build();
try (final RntbdTransportClient transportClient = new RntbdTransportClient(options, sslContext)) {
@ -718,7 +715,10 @@ public class RntbdTransportClientTest {
final RntbdResponse expected
) {
final RntbdTransportClient.Options options = new RntbdTransportClient.Options(requestTimeout);
final RntbdTransportClient.Options options = new RntbdTransportClient.Options.Builder(requestTimeout)
.userAgent(userAgent)
.build();
final SslContext sslContext;
try {
@ -727,21 +727,7 @@ public class RntbdTransportClientTest {
throw new AssertionError(String.format("%s: %s", error.getClass(), error.getMessage()));
}
final RntbdTransportClient.EndpointFactory endpointFactory = spy(new RntbdTransportClient.EndpointFactory(
options, sslContext
));
final RntbdTransportClient client = new RntbdTransportClient(endpointFactory);
doAnswer(invocation -> {
RntbdTransportClient.EndpointFactory factory = (RntbdTransportClient.EndpointFactory) invocation.getMock();
URI physicalAddress = invocation.getArgumentAt(0, URI.class);
return new FakeEndpoint(factory, physicalAddress, expected);
}).when(endpointFactory).createEndpoint(any());
return client;
return new RntbdTransportClient(new FakeEndpoint.Provider(options, sslContext, expected));
}
private void validateFailure(final Single<? extends StoreResponse> single, final FailureValidator validator) {
@ -814,16 +800,14 @@ public class RntbdTransportClientTest {
}
}
private static final class FakeEndpoint implements RntbdTransportClient.Endpoint {
private static final class FakeEndpoint implements RntbdEndpoint {
final RntbdTransportClient.EndpointFactory factory;
final RntbdRequestTimer requestTimer;
final FakeChannel fakeChannel;
final URI physicalAddress;
final RntbdRequestManager requestManager;
FakeEndpoint(
final RntbdTransportClient.EndpointFactory factory,
final URI physicalAddress,
private FakeEndpoint(
final Config config, final RntbdRequestTimer timer, final URI physicalAddress,
final RntbdResponse... expected
) {
@ -831,17 +815,16 @@ public class RntbdTransportClientTest {
expected.length, true, Arrays.asList(expected)
);
this.requestManager = new RntbdRequestManager();
RntbdRequestManager requestManager = new RntbdRequestManager();
this.physicalAddress = physicalAddress;
this.requestTimer = timer;
this.fakeChannel = new FakeChannel(responses,
new RntbdContextNegotiator(this.requestManager, factory.getPipelineConfig().getUserAgent()),
new RntbdContextNegotiator(requestManager, config.getUserAgent()),
new RntbdRequestEncoder(),
new RntbdResponseDecoder(),
this.requestManager
requestManager
);
this.factory = factory;
}
@Override
@ -851,10 +834,43 @@ public class RntbdTransportClientTest {
@Override
public RntbdRequestRecord request(final RntbdRequestArgs requestArgs) {
final RntbdRequestRecord requestRecord = new RntbdRequestRecord(requestArgs, this.factory.getRequestTimer());
final RntbdRequestRecord requestRecord = new RntbdRequestRecord(requestArgs, this.requestTimer);
this.fakeChannel.writeOutbound(requestRecord);
return requestRecord;
}
static class Provider implements RntbdEndpoint.Provider {
final Config config;
final RntbdResponse expected;
final RntbdRequestTimer timer;
Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected) {
this.config = new Config(options, sslContext, LogLevel.WARN);
this.timer = new RntbdRequestTimer(config.getRequestTimeout());
this.expected = expected;
}
@Override
public void close() {
this.timer.close();
}
@Override
public int count() {
return 1;
}
@Override
public RntbdEndpoint get(URI physicalAddress) {
return new FakeEndpoint(config, timer, physicalAddress, expected);
}
@Override
public Stream<RntbdEndpoint> list() {
return Stream.empty();
}
}
}
private static final class RntbdTestConfiguration {