Add support for specifying default Direct TCP options using System properties (#299)
* Configs.getProtocol now accepts in priority order -Dcosmos.directModeProtocol, -Dazure.cosmos.directModeProtocol (for compatability with v4) and the environment variable DIRECT_MODE_PROTOCOL (which is sometimes convenient in test environments) * ReadMyWritesConsistencyTest now sets document data field size and RntbdTransportClient can now load default direct tcp options * Removed option to specify document data field size on ReadMyWritesConsistencyTest and fixed a test break. * Enclosed static initializer in try/finally to reduce chance of unexpected failures and renamed a mvn version property for clarity. * Bug fix and tweaks based on code review feedback * Increased request timer resolution from 5 to 100 ms * Decreased request timer resolution from 100 to 5 ms * Added RntbdTransportClient.Options.requestTimerResolution property for use by RntbdRequestTimer and renamed long times with unit suffix (e.g., InNanos for nanosecond values and InMillis for millisecond values)
This commit is contained in:
Родитель
0fa78725cb
Коммит
66719ab4a1
|
@ -33,7 +33,6 @@ import com.microsoft.azure.cosmosdb.SqlParameterCollection;
|
|||
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
|
||||
import com.microsoft.azure.cosmosdb.internal.Utils;
|
||||
import com.microsoft.azure.cosmosdb.rx.internal.NotFoundException;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import rx.Observable;
|
||||
import rx.Subscriber;
|
||||
|
|
|
@ -69,11 +69,6 @@ public class ReadMyWritesConsistencyTest {
|
|||
StringUtils.defaultString(Strings.emptyToNull(
|
||||
System.getenv().get("DESIRED_CONSISTENCY")), "Session"));
|
||||
|
||||
private final String directModeProtocol =
|
||||
System.getProperty("azure.cosmos.directModeProtocol",
|
||||
StringUtils.defaultString(Strings.emptyToNull(
|
||||
System.getenv().get("DIRECT_MODE_PROTOCOL")), Protocol.Tcp.name()));
|
||||
|
||||
private final int initialCollectionThroughput = 10_000;
|
||||
|
||||
private final String maxRunningTime =
|
||||
|
@ -122,7 +117,6 @@ public class ReadMyWritesConsistencyTest {
|
|||
Configuration cfg = new Configuration();
|
||||
new JCommander(cfg, StringUtils.split(cmd));
|
||||
|
||||
logger.info("azure.cosmos.directModeProtocol={}, {}", directModeProtocol, cfg);
|
||||
AtomicInteger success = new AtomicInteger();
|
||||
AtomicInteger error = new AtomicInteger();
|
||||
|
||||
|
|
|
@ -88,7 +88,13 @@ public class Configs {
|
|||
}
|
||||
|
||||
public Protocol getProtocol() {
|
||||
String protocol = getJVMConfigAsString(PROTOCOL, DEFAULT_PROTOCOL.name());
|
||||
|
||||
String protocol = getJVMConfigAsString(PROTOCOL, StringUtils.defaultString(
|
||||
StringUtils.defaultString(
|
||||
System.getProperty("azure.cosmos.directModeProtocol"),
|
||||
System.getenv("DIRECT_MODE_PROTOCOL")),
|
||||
DEFAULT_PROTOCOL.name()));
|
||||
|
||||
try {
|
||||
return Protocol.valueOf(WordUtils.capitalize(protocol.toLowerCase()));
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -26,6 +26,7 @@ package com.microsoft.azure.cosmosdb.rx.internal;
|
|||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class ConfigsTests {
|
||||
|
@ -45,7 +46,12 @@ public class ConfigsTests {
|
|||
@Test(groups = { "unit" })
|
||||
public void getProtocol() {
|
||||
Configs config = new Configs();
|
||||
assertThat(config.getProtocol()).isEqualTo(Protocol.valueOf(System.getProperty("cosmos.directModeProtocol", "Tcp")));
|
||||
Protocol expected = Protocol.valueOf(System.getProperty("cosmos.directModeProtocol",
|
||||
System.getProperty("azure.cosmos.directModeProtocol",
|
||||
StringUtils.defaultString(
|
||||
System.getenv("DIRECT_MODE_PROTOCOL"),
|
||||
"Tcp"))));
|
||||
assertThat(config.getProtocol()).isEqualTo(expected);
|
||||
}
|
||||
|
||||
@Test(groups = { "unit" })
|
||||
|
|
|
@ -36,6 +36,7 @@ SOFTWARE.
|
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<test.groups>unit</test.groups>
|
||||
<guava.version>27.0.1-jre</guava.version>
|
||||
<jackson-datatype-jsr310.version>2.9.9</jackson-datatype-jsr310.version>
|
||||
<metrics.version>4.1.0</metrics.version>
|
||||
<micrometer.version>1.2.0</micrometer.version>
|
||||
</properties>
|
||||
|
@ -295,6 +296,11 @@ SOFTWARE.
|
|||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-jsr310</artifactId>
|
||||
<version>${jackson-datatype-jsr310.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
|
|
|
@ -24,6 +24,8 @@
|
|||
|
||||
package com.microsoft.azure.cosmosdb.internal.directconnectivity;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
|
@ -45,7 +47,9 @@ import org.slf4j.LoggerFactory;
|
|||
import rx.Single;
|
||||
import rx.SingleEmitter;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.Iterator;
|
||||
|
@ -190,42 +194,88 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
|
||||
// region Fields
|
||||
|
||||
@JsonProperty()
|
||||
private final int bufferPageSize;
|
||||
private final String certificateHostNameOverride;
|
||||
|
||||
@JsonProperty()
|
||||
private final Duration connectionTimeout;
|
||||
|
||||
@JsonProperty()
|
||||
private final Duration idleChannelTimeout;
|
||||
|
||||
@JsonProperty()
|
||||
private final Duration idleEndpointTimeout;
|
||||
|
||||
@JsonProperty()
|
||||
private final int maxBufferCapacity;
|
||||
|
||||
@JsonProperty()
|
||||
private final int maxChannelsPerEndpoint;
|
||||
|
||||
@JsonProperty()
|
||||
private final int maxRequestsPerChannel;
|
||||
private final int partitionCount;
|
||||
|
||||
@JsonProperty()
|
||||
private final Duration receiveHangDetectionTime;
|
||||
|
||||
@JsonProperty()
|
||||
private final Duration requestExpiryInterval;
|
||||
|
||||
@JsonProperty()
|
||||
private final Duration requestTimeout;
|
||||
|
||||
@JsonProperty()
|
||||
private final Duration requestTimerResolution;
|
||||
|
||||
@JsonProperty()
|
||||
private final Duration sendHangDetectionTime;
|
||||
|
||||
@JsonProperty()
|
||||
private final Duration shutdownTimeout;
|
||||
|
||||
@JsonIgnore()
|
||||
private final UserAgentContainer userAgent;
|
||||
|
||||
// endregion
|
||||
|
||||
// region Constructors
|
||||
|
||||
private Options() {
|
||||
this.bufferPageSize = 8192;
|
||||
this.connectionTimeout = null;
|
||||
this.idleChannelTimeout = Duration.ZERO;
|
||||
this.idleEndpointTimeout = Duration.ofSeconds(70L);
|
||||
this.maxBufferCapacity = 8192 << 10;
|
||||
this.maxChannelsPerEndpoint = 10;
|
||||
this.maxRequestsPerChannel = 30;
|
||||
this.receiveHangDetectionTime = Duration.ofSeconds(65L);
|
||||
this.requestExpiryInterval = Duration.ofSeconds(5L);
|
||||
this.requestTimeout = null;
|
||||
this.requestTimerResolution = Duration.ofMillis(5L);
|
||||
this.sendHangDetectionTime = Duration.ofSeconds(10L);
|
||||
this.shutdownTimeout = Duration.ofSeconds(15L);
|
||||
this.userAgent = new UserAgentContainer();
|
||||
}
|
||||
|
||||
private Options(Builder builder) {
|
||||
|
||||
this.bufferPageSize = builder.bufferPageSize;
|
||||
this.certificateHostNameOverride = builder.certificateHostNameOverride;
|
||||
this.connectionTimeout = builder.connectionTimeout == null ? builder.requestTimeout : builder.connectionTimeout;
|
||||
this.idleChannelTimeout = builder.idleChannelTimeout;
|
||||
this.idleEndpointTimeout = builder.idleEndpointTimeout;
|
||||
this.maxBufferCapacity = builder.maxBufferCapacity;
|
||||
this.maxChannelsPerEndpoint = builder.maxChannelsPerEndpoint;
|
||||
this.maxRequestsPerChannel = builder.maxRequestsPerChannel;
|
||||
this.partitionCount = builder.partitionCount;
|
||||
this.receiveHangDetectionTime = builder.receiveHangDetectionTime;
|
||||
this.requestExpiryInterval = builder.requestExpiryInterval;
|
||||
this.requestTimeout = builder.requestTimeout;
|
||||
this.requestTimerResolution = builder.requestTimerResolution;
|
||||
this.sendHangDetectionTime = builder.sendHangDetectionTime;
|
||||
this.shutdownTimeout = builder.shutdownTimeout;
|
||||
this.userAgent = builder.userAgent;
|
||||
|
||||
this.connectionTimeout = builder.connectionTimeout == null
|
||||
? builder.requestTimeout
|
||||
: builder.connectionTimeout;
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
@ -236,10 +286,6 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
return this.bufferPageSize;
|
||||
}
|
||||
|
||||
public String certificateHostNameOverride() {
|
||||
return this.certificateHostNameOverride;
|
||||
}
|
||||
|
||||
public Duration connectionTimeout() {
|
||||
return this.connectionTimeout;
|
||||
}
|
||||
|
@ -264,10 +310,6 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
return this.maxRequestsPerChannel;
|
||||
}
|
||||
|
||||
public int partitionCount() {
|
||||
return this.partitionCount;
|
||||
}
|
||||
|
||||
public Duration receiveHangDetectionTime() {
|
||||
return this.receiveHangDetectionTime;
|
||||
}
|
||||
|
@ -280,6 +322,10 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
return this.requestTimeout;
|
||||
}
|
||||
|
||||
public Duration requestTimerResolution() {
|
||||
return this.requestTimerResolution;
|
||||
}
|
||||
|
||||
public Duration sendHangDetectionTime() {
|
||||
return this.sendHangDetectionTime;
|
||||
}
|
||||
|
@ -288,17 +334,17 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
return this.shutdownTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return RntbdObjectMapper.toJson(this);
|
||||
public UserAgentContainer userAgent() {
|
||||
return this.userAgent;
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
// region Methods
|
||||
|
||||
public UserAgentContainer userAgent() {
|
||||
return this.userAgent;
|
||||
@Override
|
||||
public String toString() {
|
||||
return RntbdObjectMapper.toJson(this);
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
@ -310,35 +356,105 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
|
||||
// region Fields
|
||||
|
||||
private static final UserAgentContainer DEFAULT_USER_AGENT_CONTAINER = new UserAgentContainer();
|
||||
private static final Duration FIFTEEN_SECONDS = Duration.ofSeconds(15L);
|
||||
private static final Duration FIVE_SECONDS =Duration.ofSeconds(5L);
|
||||
private static final Duration SEVENTY_SECONDS = Duration.ofSeconds(70L);
|
||||
private static final Duration SIXTY_FIVE_SECONDS = Duration.ofSeconds(65L);
|
||||
private static final Duration TEN_SECONDS = Duration.ofSeconds(10L);
|
||||
private static final String DEFAULT_OPTIONS_PROPERTY_NAME = "azure.cosmos.directTcp.defaultOptions";
|
||||
private static final Options DEFAULT_OPTIONS;
|
||||
|
||||
private int bufferPageSize = 8192;
|
||||
private String certificateHostNameOverride = null;
|
||||
private Duration connectionTimeout = null;
|
||||
private Duration idleChannelTimeout = Duration.ZERO;
|
||||
private Duration idleEndpointTimeout = SEVENTY_SECONDS;
|
||||
private int maxBufferCapacity = 8192 << 10;
|
||||
private int maxChannelsPerEndpoint = 10;
|
||||
private int maxRequestsPerChannel = 30;
|
||||
private int partitionCount = 1;
|
||||
private Duration receiveHangDetectionTime = SIXTY_FIVE_SECONDS;
|
||||
private Duration requestExpiryInterval = FIVE_SECONDS;
|
||||
static {
|
||||
|
||||
// In priority order we take default Direct TCP options from:
|
||||
//
|
||||
// 1. the string value of system property "azure.cosmos.directTcp.options", or
|
||||
// 2. the contents of the file located by the system property "azure.cosmos.directTcp.optionsFile", or
|
||||
// 3. the contents of the resource file named "azure.cosmos.directTcp.options.json"
|
||||
//
|
||||
// Otherwise, if none of these values are set or an error occurs we create default options based on a
|
||||
// set of hard-wired values defined in the default private parameterless constructor for
|
||||
// RntbdTransportClient.Options.
|
||||
|
||||
Options options = null;
|
||||
|
||||
try {
|
||||
final String string = System.getProperty(DEFAULT_OPTIONS_PROPERTY_NAME);
|
||||
|
||||
if (string != null) {
|
||||
// Attempt to set default options based on the JSON string value of "{propertyName}"
|
||||
try {
|
||||
options = RntbdObjectMapper.readValue(string, Options.class);
|
||||
} catch (IOException error) {
|
||||
logger.error("failed to parse default Direct TCP options {} due to ", string, error);
|
||||
}
|
||||
}
|
||||
|
||||
if (options == null) {
|
||||
|
||||
final String path = System.getProperty(DEFAULT_OPTIONS_PROPERTY_NAME + "File");
|
||||
|
||||
if (path != null) {
|
||||
// Attempt to load default options from the JSON file on the path specified by
|
||||
// "{propertyName}File"
|
||||
try {
|
||||
options = RntbdObjectMapper.readValue(new File(path), Options.class);
|
||||
} catch (IOException error) {
|
||||
logger.error("failed to load default Direct TCP options from {} due to ", path, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (options == null) {
|
||||
|
||||
final ClassLoader loader = RntbdTransportClient.class.getClassLoader();
|
||||
final String name = DEFAULT_OPTIONS_PROPERTY_NAME + ".json";
|
||||
|
||||
try (final InputStream stream = loader.getResourceAsStream(name)) {
|
||||
if (stream != null) {
|
||||
// Attempt to load default options from the JSON resource file "{propertyName}.json"
|
||||
options = RntbdObjectMapper.readValue(stream, Options.class);
|
||||
}
|
||||
} catch (IOException error) {
|
||||
logger.error("failed to load Direct TCP options from resource {} due to ", name, error);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
DEFAULT_OPTIONS = options != null ? options : new Options();
|
||||
}
|
||||
}
|
||||
|
||||
private int bufferPageSize;
|
||||
private Duration connectionTimeout;
|
||||
private Duration idleChannelTimeout;
|
||||
private Duration idleEndpointTimeout;
|
||||
private int maxBufferCapacity;
|
||||
private int maxChannelsPerEndpoint;
|
||||
private int maxRequestsPerChannel;
|
||||
private Duration receiveHangDetectionTime;
|
||||
private Duration requestExpiryInterval;
|
||||
private Duration requestTimeout;
|
||||
private Duration sendHangDetectionTime = TEN_SECONDS;
|
||||
private Duration shutdownTimeout = FIFTEEN_SECONDS;
|
||||
private UserAgentContainer userAgent = DEFAULT_USER_AGENT_CONTAINER;
|
||||
private Duration requestTimerResolution;
|
||||
private Duration sendHangDetectionTime;
|
||||
private Duration shutdownTimeout;
|
||||
private UserAgentContainer userAgent;
|
||||
|
||||
// endregion
|
||||
|
||||
// region Constructors
|
||||
|
||||
public Builder(Duration requestTimeout) {
|
||||
|
||||
this.requestTimeout(requestTimeout);
|
||||
|
||||
this.bufferPageSize = DEFAULT_OPTIONS.bufferPageSize;
|
||||
this.connectionTimeout = DEFAULT_OPTIONS.connectionTimeout;
|
||||
this.idleChannelTimeout = DEFAULT_OPTIONS.idleChannelTimeout;
|
||||
this.idleEndpointTimeout = DEFAULT_OPTIONS.idleEndpointTimeout;
|
||||
this.maxBufferCapacity = DEFAULT_OPTIONS.maxBufferCapacity;
|
||||
this.maxChannelsPerEndpoint = DEFAULT_OPTIONS.maxChannelsPerEndpoint;
|
||||
this.maxRequestsPerChannel = DEFAULT_OPTIONS.maxRequestsPerChannel;
|
||||
this.receiveHangDetectionTime = DEFAULT_OPTIONS.receiveHangDetectionTime;
|
||||
this.requestExpiryInterval = DEFAULT_OPTIONS.requestExpiryInterval;
|
||||
this.requestTimerResolution = DEFAULT_OPTIONS.requestTimerResolution;
|
||||
this.sendHangDetectionTime = DEFAULT_OPTIONS.sendHangDetectionTime;
|
||||
this.shutdownTimeout = DEFAULT_OPTIONS.shutdownTimeout;
|
||||
this.userAgent = DEFAULT_OPTIONS.userAgent;
|
||||
}
|
||||
|
||||
public Builder(int requestTimeoutInSeconds) {
|
||||
|
@ -365,11 +481,6 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
return new Options(this);
|
||||
}
|
||||
|
||||
public Builder certificateHostNameOverride(final String value) {
|
||||
this.certificateHostNameOverride = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder connectionTimeout(final Duration value) {
|
||||
checkArgument(value == null || value.compareTo(Duration.ZERO) > 0,
|
||||
"expected positive value, not %s",
|
||||
|
@ -412,12 +523,6 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder partitionCount(final int value) {
|
||||
checkArgument(value > 0, "expected positive value, not %s", value);
|
||||
this.partitionCount = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder receiveHangDetectionTime(final Duration value) {
|
||||
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
|
||||
"expected positive value, not %s",
|
||||
|
@ -442,6 +547,14 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder requestTimerResolution(final Duration value) {
|
||||
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
|
||||
"expected positive value, not %s",
|
||||
value);
|
||||
this.requestTimerResolution = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder sendHangDetectionTime(final Duration value) {
|
||||
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
|
||||
"expected positive value, not %s",
|
||||
|
|
|
@ -117,9 +117,9 @@ public class RntbdClientChannelHandler extends ChannelInitializer<Channel> imple
|
|||
checkNotNull(channel);
|
||||
|
||||
final RntbdRequestManager requestManager = new RntbdRequestManager(this.healthChecker, this.config.maxRequestsPerChannel());
|
||||
final long readerIdleTime = this.config.receiveHangDetectionTime();
|
||||
final long writerIdleTime = this.config.sendHangDetectionTime();
|
||||
final long allIdleTime = this.config.idleConnectionTimeout();
|
||||
final long readerIdleTime = this.config.receiveHangDetectionTimeInNanos();
|
||||
final long writerIdleTime = this.config.sendHangDetectionTimeInNanos();
|
||||
final long allIdleTime = this.config.idleConnectionTimeoutInNanos();
|
||||
final ChannelPipeline pipeline = channel.pipeline();
|
||||
|
||||
pipeline.addFirst(
|
||||
|
|
|
@ -90,12 +90,12 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck
|
|||
|
||||
checkNotNull(config, "config: null");
|
||||
|
||||
this.idleConnectionTimeout = config.idleConnectionTimeout();
|
||||
this.idleConnectionTimeout = config.idleConnectionTimeoutInNanos();
|
||||
|
||||
this.readDelayLimit = config.receiveHangDetectionTime();
|
||||
this.readDelayLimit = config.receiveHangDetectionTimeInNanos();
|
||||
checkArgument(this.readDelayLimit > readHangGracePeriod, "config.receiveHangDetectionTime: %s", this.readDelayLimit);
|
||||
|
||||
this.writeDelayLimit = config.sendHangDetectionTime();
|
||||
this.writeDelayLimit = config.sendHangDetectionTimeInNanos();
|
||||
checkArgument(this.writeDelayLimit > writeHangGracePeriod, "config.sendHangDetectionTime: %s", this.writeDelayLimit);
|
||||
}
|
||||
|
||||
|
|
|
@ -167,7 +167,7 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
|
|||
}
|
||||
}
|
||||
|
||||
final long idleEndpointTimeout = config.idleEndpointTimeout();
|
||||
final long idleEndpointTimeout = config.idleEndpointTimeoutInNanos();
|
||||
|
||||
this.idleStateDetectionScheduledFuture = this.executor.scheduleAtFixedRate(
|
||||
() -> {
|
||||
|
|
|
@ -126,19 +126,19 @@ public interface RntbdEndpoint extends AutoCloseable {
|
|||
}
|
||||
|
||||
@JsonProperty
|
||||
public int connectionTimeout() {
|
||||
public int connectionTimeoutInMillis() {
|
||||
final long value = this.options.connectionTimeout().toMillis();
|
||||
assert value <= Integer.MAX_VALUE;
|
||||
return (int)value;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long idleConnectionTimeout() {
|
||||
public long idleConnectionTimeoutInNanos() {
|
||||
return this.options.idleChannelTimeout().toNanos();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long idleEndpointTimeout() {
|
||||
public long idleEndpointTimeoutInNanos() {
|
||||
return this.options.idleEndpointTimeout().toNanos();
|
||||
}
|
||||
|
||||
|
@ -158,22 +158,27 @@ public interface RntbdEndpoint extends AutoCloseable {
|
|||
}
|
||||
|
||||
@JsonProperty
|
||||
public long receiveHangDetectionTime() {
|
||||
public long receiveHangDetectionTimeInNanos() {
|
||||
return this.options.receiveHangDetectionTime().toNanos();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long requestTimeout() {
|
||||
public long requestTimeoutInNanos() {
|
||||
return this.options.requestTimeout().toNanos();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long sendHangDetectionTime() {
|
||||
public long requestTimerResolutionInNanos() {
|
||||
return this.options.requestTimerResolution().toNanos();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long sendHangDetectionTimeInNanos() {
|
||||
return this.options.sendHangDetectionTime().toNanos();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long shutdownTimeout() {
|
||||
public long shutdownTimeoutInNanos() {
|
||||
return this.options.shutdownTimeout().toNanos();
|
||||
}
|
||||
|
||||
|
|
|
@ -28,18 +28,23 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
|||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.ObjectWriter;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.databind.node.JsonNodeType;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.fasterxml.jackson.databind.ser.PropertyFilter;
|
||||
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
|
||||
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
|
||||
import com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufInputStream;
|
||||
import io.netty.handler.codec.CorruptedFrameException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
@ -49,13 +54,38 @@ public final class RntbdObjectMapper {
|
|||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RntbdObjectMapper.class);
|
||||
private static final SimpleFilterProvider filterProvider = new SimpleFilterProvider();
|
||||
private static final ObjectMapper objectMapper = new ObjectMapper().setFilterProvider(filterProvider);
|
||||
|
||||
private static final ObjectMapper objectMapper = new ObjectMapper()
|
||||
.registerModule(new SimpleModule()
|
||||
.addSerializer(Duration.class, ToStringSerializer.instance)
|
||||
.addDeserializer(Duration.class, DurationDeserializer.INSTANCE))
|
||||
.setFilterProvider(filterProvider);
|
||||
|
||||
private static final ObjectWriter objectWriter = objectMapper.writer();
|
||||
|
||||
private static final ConcurrentHashMap<Class<?>, String> simpleClassNames = new ConcurrentHashMap<>();
|
||||
|
||||
private RntbdObjectMapper() {
|
||||
}
|
||||
|
||||
public static <T> T readValue(File file, Class<T> type) throws IOException {
|
||||
checkNotNull(file, "expected non-null file");
|
||||
checkNotNull(type, "expected non-null type");
|
||||
return objectMapper.readValue(file, type);
|
||||
}
|
||||
|
||||
public static <T> T readValue(InputStream stream, Class<T> type) throws IOException {
|
||||
checkNotNull(stream, "expected non-null stream");
|
||||
checkNotNull(type, "expected non-null type");
|
||||
return objectMapper.readValue(stream, type);
|
||||
}
|
||||
|
||||
public static <T> T readValue(String string, Class<T> type) throws IOException {
|
||||
checkNotNull(string, "expected non-null string");
|
||||
checkNotNull(type, "expected non-null type");
|
||||
return objectMapper.readValue(string, type);
|
||||
}
|
||||
|
||||
public static String toJson(final Object value) {
|
||||
try {
|
||||
return objectWriter.writeValueAsString(value);
|
||||
|
|
|
@ -36,24 +36,18 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
public final class RntbdRequestTimer implements AutoCloseable {
|
||||
|
||||
private static final long FIVE_MILLISECONDS = 5000000L;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RntbdRequestTimer.class);
|
||||
private final long requestTimeout;
|
||||
private final long requestTimeoutInNanos;
|
||||
private final Timer timer;
|
||||
|
||||
public RntbdRequestTimer(final long requestTimeout) {
|
||||
|
||||
// Inspection of the HashWheelTimer code indicates that our choice of a 5 millisecond timer resolution ensures
|
||||
// a request will expire within 10 milliseconds of the specified requestTimeout interval. This is because
|
||||
// cancellation of a timeout takes two timer resolution units to complete.
|
||||
|
||||
this.timer = new HashedWheelTimer(FIVE_MILLISECONDS, TimeUnit.NANOSECONDS);
|
||||
this.requestTimeout = requestTimeout;
|
||||
public RntbdRequestTimer(final long requestTimeoutInNanos, final long requestTimerResolutionInNanos) {
|
||||
// The HashWheelTimer code shows that cancellation of a timeout takes two timer resolution units to complete.
|
||||
this.timer = new HashedWheelTimer(requestTimerResolutionInNanos, TimeUnit.NANOSECONDS);
|
||||
this.requestTimeoutInNanos = requestTimeoutInNanos;
|
||||
}
|
||||
|
||||
public long getRequestTimeout(final TimeUnit unit) {
|
||||
return unit.convert(requestTimeout, TimeUnit.NANOSECONDS);
|
||||
return unit.convert(requestTimeoutInNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -68,6 +62,6 @@ public final class RntbdRequestTimer implements AutoCloseable {
|
|||
}
|
||||
|
||||
public Timeout newTimeout(final TimerTask task) {
|
||||
return this.timer.newTimeout(task, this.requestTimeout, TimeUnit.NANOSECONDS);
|
||||
return this.timer.newTimeout(task, this.requestTimeoutInNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -100,7 +100,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
|
|||
.group(group)
|
||||
.option(ChannelOption.ALLOCATOR, config.allocator())
|
||||
.option(ChannelOption.AUTO_READ, true)
|
||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeout())
|
||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeoutInMillis())
|
||||
.option(ChannelOption.RCVBUF_ALLOCATOR, receiveBufferAllocator)
|
||||
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||
.remoteAddress(physicalAddress.getHost(), physicalAddress.getPort());
|
||||
|
@ -351,9 +351,12 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
|
|||
|
||||
this.transportClient = transportClient;
|
||||
this.config = new Config(options, sslContext, wireLogLevel);
|
||||
this.requestTimer = new RntbdRequestTimer(config.requestTimeout());
|
||||
this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory);
|
||||
|
||||
this.requestTimer = new RntbdRequestTimer(
|
||||
config.requestTimeoutInNanos(),
|
||||
config.requestTimerResolutionInNanos());
|
||||
|
||||
this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory);
|
||||
this.endpoints = new ConcurrentHashMap<>();
|
||||
this.evictions = new AtomicInteger();
|
||||
this.closed = new AtomicBoolean();
|
||||
|
@ -370,7 +373,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
|
|||
endpoint.close();
|
||||
}
|
||||
|
||||
this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeout(), NANOSECONDS)
|
||||
this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeoutInNanos(), NANOSECONDS)
|
||||
.addListener(future -> {
|
||||
if (future.isSuccess()) {
|
||||
logger.debug("\n [{}]\n closed endpoints", this);
|
||||
|
|
|
@ -934,7 +934,9 @@ public final class RntbdTransportClientTest {
|
|||
|
||||
Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected) {
|
||||
this.config = new Config(options, sslContext, LogLevel.WARN);
|
||||
this.timer = new RntbdRequestTimer(config.requestTimeout());
|
||||
this.timer = new RntbdRequestTimer(
|
||||
config.requestTimeoutInNanos(),
|
||||
config.requestTimerResolutionInNanos());
|
||||
this.expected = expected;
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче