Refactored RntbdTransportClient.Options for improved usability.
This commit is contained in:
Родитель
ecfd4c9b56
Коммит
45b7ef984d
|
@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
public final class RntbdTransportClient extends TransportClient implements AutoCloseable {
|
||||
|
@ -74,12 +75,17 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
this.metrics = new Metrics();
|
||||
}
|
||||
|
||||
RntbdTransportClient(final Options options, final SslContext sslContext, final UserAgentContainer userAgent) {
|
||||
this(new EndpointFactory(options, sslContext, userAgent));
|
||||
RntbdTransportClient(final Options options, final SslContext sslContext) {
|
||||
this(new EndpointFactory(options, sslContext));
|
||||
}
|
||||
|
||||
RntbdTransportClient(final Configs configs, final int requestTimeoutInSeconds, final UserAgentContainer userAgent) {
|
||||
this(new Options(Duration.ofSeconds((long)requestTimeoutInSeconds)), configs.getSslContext(), userAgent);
|
||||
this(new Options.Builder()
|
||||
.requestTimeout(Duration.ofSeconds(requestTimeoutInSeconds))
|
||||
.userAgent(userAgent)
|
||||
.build(),
|
||||
configs.getSslContext()
|
||||
);
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
@ -172,17 +178,14 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
|
||||
private final Options options;
|
||||
private final SslContext sslContext;
|
||||
private final UserAgentContainer userAgent;
|
||||
private final LogLevel wireLogLevel;
|
||||
|
||||
Config(final UserAgentContainer userAgent, final SslContext sslContext, final LogLevel wireLogLevel, final Options options) {
|
||||
Config(final SslContext sslContext, final LogLevel wireLogLevel, final Options options) {
|
||||
|
||||
checkNotNull(sslContext, "sslContext");
|
||||
checkNotNull(userAgent, "userAgent");
|
||||
checkNotNull(options, "options");
|
||||
|
||||
this.sslContext = sslContext;
|
||||
this.userAgent = userAgent;
|
||||
this.wireLogLevel = wireLogLevel;
|
||||
this.options = options;
|
||||
}
|
||||
|
@ -219,7 +222,7 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
}
|
||||
|
||||
public UserAgentContainer getUserAgent() {
|
||||
return this.userAgent;
|
||||
return this.options.getUserAgent();
|
||||
}
|
||||
|
||||
public LogLevel getWireLogLevel() {
|
||||
|
@ -234,11 +237,10 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
private final Config pipelineConfig;
|
||||
private final RntbdRequestTimer requestTimer;
|
||||
|
||||
EndpointFactory(final Options options, final SslContext sslContext, final UserAgentContainer userAgent) {
|
||||
EndpointFactory(final Options options, final SslContext sslContext) {
|
||||
|
||||
checkNotNull(options, "options");
|
||||
checkNotNull(sslContext, "sslContext");
|
||||
checkNotNull(userAgent, "userAgent");
|
||||
|
||||
final DefaultThreadFactory threadFactory = new DefaultThreadFactory("CosmosEventLoop", true);
|
||||
final int threadCount = Runtime.getRuntime().availableProcessors();
|
||||
|
@ -254,7 +256,7 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
|
||||
this.requestTimer = new RntbdRequestTimer(options.getRequestTimeout());
|
||||
this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory);
|
||||
this.pipelineConfig = new Config(userAgent, sslContext, wireLogLevel, options);
|
||||
this.pipelineConfig = new Config(sslContext, wireLogLevel, options);
|
||||
}
|
||||
|
||||
Config getPipelineConfig() {
|
||||
|
@ -347,39 +349,41 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
|
||||
// region Fields
|
||||
|
||||
private String certificateHostNameOverride;
|
||||
private int maxChannelsPerEndpoint;
|
||||
private int maxRequestsPerChannel;
|
||||
private Duration openTimeout = Duration.ZERO;
|
||||
private int partitionCount;
|
||||
private Duration receiveHangDetectionTime;
|
||||
private Duration requestTimeout;
|
||||
private Duration sendHangDetectionTime;
|
||||
private Duration timerPoolResolution = Duration.ZERO;
|
||||
private UserAgentContainer userAgent = null;
|
||||
private final String certificateHostNameOverride;
|
||||
private final int maxChannelsPerEndpoint;
|
||||
private final int maxRequestsPerChannel;
|
||||
private final Duration openTimeout;
|
||||
private final int partitionCount;
|
||||
private final Duration receiveHangDetectionTime;
|
||||
private final Duration requestTimeout;
|
||||
private final Duration sendHangDetectionTime;
|
||||
private final UserAgentContainer userAgent;
|
||||
|
||||
// endregion
|
||||
|
||||
// region Constructors
|
||||
|
||||
public Options(final Duration requestTimeout) {
|
||||
this(new Builder().requestTimeout(requestTimeout));
|
||||
}
|
||||
|
||||
public Options(final int requestTimeoutInSeconds) {
|
||||
this(Duration.ofSeconds((long)requestTimeoutInSeconds));
|
||||
}
|
||||
|
||||
public Options(final Duration requestTimeout) {
|
||||
private Options(Builder builder) {
|
||||
|
||||
checkNotNull(requestTimeout, "requestTimeoutInterval");
|
||||
checkNotNull(builder.requestTimeout, "requestTimeout: null");
|
||||
|
||||
if (requestTimeout.compareTo(Duration.ZERO) <= 0) {
|
||||
throw new IllegalArgumentException("requestTimeoutInterval");
|
||||
}
|
||||
|
||||
this.maxChannelsPerEndpoint = 10;
|
||||
this.maxRequestsPerChannel = 30;
|
||||
this.partitionCount = 1;
|
||||
this.receiveHangDetectionTime = Duration.ofSeconds(65L);
|
||||
this.requestTimeout = requestTimeout;
|
||||
this.sendHangDetectionTime = Duration.ofSeconds(10L);
|
||||
this.certificateHostNameOverride = builder.certificateHostNameOverride;
|
||||
this.maxChannelsPerEndpoint = builder.maxChannelsPerEndpoint;
|
||||
this.maxRequestsPerChannel = builder.maxRequestsPerChannel;
|
||||
this.openTimeout = builder.openTimeout == null ? builder.requestTimeout : builder.openTimeout;
|
||||
this.partitionCount = builder.partitionCount;
|
||||
this.requestTimeout = builder.requestTimeout;
|
||||
this.receiveHangDetectionTime = builder.receiveHangDetectionTime;
|
||||
this.sendHangDetectionTime = builder.sendHangDetectionTime;
|
||||
this.userAgent = builder.userAgent;
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
@ -390,50 +394,26 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
return this.certificateHostNameOverride;
|
||||
}
|
||||
|
||||
public void setCertificateHostNameOverride(final String value) {
|
||||
this.certificateHostNameOverride = value;
|
||||
}
|
||||
|
||||
public int getMaxChannelsPerEndpoint() {
|
||||
return this.maxChannelsPerEndpoint;
|
||||
}
|
||||
|
||||
public void setMaxChannelsPerEndpoint(final int value) {
|
||||
this.maxChannelsPerEndpoint = value;
|
||||
}
|
||||
|
||||
public int getMaxRequestsPerChannel() {
|
||||
return this.maxRequestsPerChannel;
|
||||
}
|
||||
|
||||
public void setMaxRequestsPerChannel(final int maxRequestsPerChannel) {
|
||||
this.maxRequestsPerChannel = maxRequestsPerChannel;
|
||||
}
|
||||
|
||||
public Duration getOpenTimeout() {
|
||||
return this.openTimeout.isNegative() || this.openTimeout.isZero() ? this.requestTimeout : this.openTimeout;
|
||||
}
|
||||
|
||||
public void setOpenTimeout(final Duration value) {
|
||||
this.openTimeout = value;
|
||||
return this.openTimeout;
|
||||
}
|
||||
|
||||
public int getPartitionCount() {
|
||||
return this.partitionCount;
|
||||
}
|
||||
|
||||
public void setPartitionCount(final int value) {
|
||||
this.partitionCount = value;
|
||||
}
|
||||
|
||||
public Duration getReceiveHangDetectionTime() {
|
||||
return this.receiveHangDetectionTime;
|
||||
}
|
||||
|
||||
public void setReceiveHangDetectionTime(final Duration value) {
|
||||
this.receiveHangDetectionTime = value;
|
||||
}
|
||||
|
||||
public Duration getRequestTimeout() {
|
||||
return this.requestTimeout;
|
||||
}
|
||||
|
@ -442,63 +422,95 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
return this.sendHangDetectionTime;
|
||||
}
|
||||
|
||||
public void setSendHangDetectionTime(final Duration value) {
|
||||
this.sendHangDetectionTime = value;
|
||||
}
|
||||
|
||||
public Duration getTimerPoolResolution() {
|
||||
return calculateTimerPoolResolutionSeconds(this.timerPoolResolution, this.requestTimeout, this.openTimeout);
|
||||
}
|
||||
|
||||
public void setTimerPoolResolution(final Duration value) {
|
||||
this.timerPoolResolution = value;
|
||||
}
|
||||
|
||||
public UserAgentContainer getUserAgent() {
|
||||
|
||||
if (this.userAgent != null) {
|
||||
return this.userAgent;
|
||||
}
|
||||
|
||||
this.userAgent = new UserAgentContainer();
|
||||
return this.userAgent;
|
||||
}
|
||||
|
||||
public void setUserAgent(final UserAgentContainer value) {
|
||||
this.userAgent = value;
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
// region Methods
|
||||
// region Types
|
||||
|
||||
private static Duration calculateTimerPoolResolutionSeconds(
|
||||
final Duration timerPoolResolution,
|
||||
final Duration requestTimeout,
|
||||
final Duration openTimeout
|
||||
) {
|
||||
public static class Builder {
|
||||
|
||||
checkNotNull(timerPoolResolution, "timerPoolResolution");
|
||||
checkNotNull(requestTimeout, "requestTimeoutInterval");
|
||||
checkNotNull(openTimeout, "openTimeout");
|
||||
private static final UserAgentContainer DEFAULT_USER_AGENT_CONTAINER = new UserAgentContainer();
|
||||
private static final Duration SIXTY_FIVE_SECONDS = Duration.ofSeconds(65L);
|
||||
private static final Duration TEN_SECONDS = Duration.ofSeconds(10L);
|
||||
|
||||
if (timerPoolResolution.compareTo(Duration.ZERO) <= 0 && requestTimeout.compareTo(Duration.ZERO) <= 0 &&
|
||||
openTimeout.compareTo(Duration.ZERO) <= 0) {
|
||||
private String certificateHostNameOverride = null;
|
||||
private int maxChannelsPerEndpoint = 10;
|
||||
private int maxRequestsPerChannel = 30;
|
||||
private Duration openTimeout = null;
|
||||
private int partitionCount = 1;
|
||||
private Duration receiveHangDetectionTime = SIXTY_FIVE_SECONDS;
|
||||
private Duration requestTimeout = null;
|
||||
private Duration sendHangDetectionTime = TEN_SECONDS;
|
||||
private UserAgentContainer userAgent = DEFAULT_USER_AGENT_CONTAINER;
|
||||
|
||||
throw new IllegalStateException("RntbdTransportClient.Options");
|
||||
Options build() {
|
||||
return new Options(this);
|
||||
}
|
||||
|
||||
if (timerPoolResolution.compareTo(Duration.ZERO) > 0 && timerPoolResolution.compareTo(openTimeout) < 0 &&
|
||||
timerPoolResolution.compareTo(requestTimeout) < 0) {
|
||||
|
||||
return timerPoolResolution;
|
||||
Builder certificateHostNameOverride(final String value) {
|
||||
this.certificateHostNameOverride = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
if (openTimeout.compareTo(Duration.ZERO) > 0 && requestTimeout.compareTo(Duration.ZERO) > 0) {
|
||||
return openTimeout.compareTo(requestTimeout) < 0 ? openTimeout : requestTimeout;
|
||||
Builder maxChannelsPerEndpoint(final int value) {
|
||||
checkArgument(value > 0, "value: %s", value);
|
||||
this.maxChannelsPerEndpoint = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
return openTimeout.compareTo(Duration.ZERO) > 0 ? openTimeout : requestTimeout;
|
||||
public Builder maxRequestsPerChannel(final int value) {
|
||||
checkArgument(value > 0, "value: %s", value);
|
||||
this.maxRequestsPerChannel = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder openTimeout(final Duration value) {
|
||||
checkArgument(value == null || value.compareTo(Duration.ZERO) > 0, "value: %s", value);
|
||||
this.openTimeout = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder partitionCount(final int value) {
|
||||
checkArgument(value > 0, "value: %s", value);
|
||||
this.partitionCount = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder receiveHangDetectionTime(final Duration value) {
|
||||
|
||||
checkNotNull(value, "value: null");
|
||||
checkArgument(value.compareTo(Duration.ZERO) > 0, "value: %s", value);
|
||||
|
||||
this.receiveHangDetectionTime = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder requestTimeout(final Duration value) {
|
||||
|
||||
checkNotNull(value, "value: null");
|
||||
checkArgument(value.compareTo(Duration.ZERO) > 0, "value: %s", value);
|
||||
|
||||
this.requestTimeout = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder sendHangDetectionTime(final Duration value) {
|
||||
|
||||
checkNotNull(value, "value: null");
|
||||
checkArgument(value.compareTo(Duration.ZERO) > 0, "value: %s", value);
|
||||
|
||||
this.sendHangDetectionTime = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder userAgent(final UserAgentContainer value) {
|
||||
checkNotNull(value, "value: null");
|
||||
this.userAgent = value;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
|
|
@ -598,9 +598,8 @@ public class RntbdTransportClientTest {
|
|||
|
||||
final RntbdTransportClient.Options options = new RntbdTransportClient.Options(requestTimeout);
|
||||
final SslContext sslContext = SslContextBuilder.forClient().build();
|
||||
final UserAgentContainer userAgent = new UserAgentContainer();
|
||||
|
||||
try (final RntbdTransportClient transportClient = new RntbdTransportClient(options, sslContext, userAgent)) {
|
||||
try (final RntbdTransportClient transportClient = new RntbdTransportClient(options, sslContext)) {
|
||||
|
||||
final BaseAuthorizationTokenProvider authorizationTokenProvider = new BaseAuthorizationTokenProvider(
|
||||
RntbdTestConfiguration.AccountKey
|
||||
|
@ -729,7 +728,7 @@ public class RntbdTransportClientTest {
|
|||
}
|
||||
|
||||
final RntbdTransportClient.EndpointFactory endpointFactory = spy(new RntbdTransportClient.EndpointFactory(
|
||||
options, sslContext, userAgent
|
||||
options, sslContext
|
||||
));
|
||||
|
||||
final RntbdTransportClient client = new RntbdTransportClient(endpointFactory);
|
||||
|
|
Загрузка…
Ссылка в новой задаче