Revised RntbdClinetChannelPool.AcquireListener because the ReadMyWritesConsistencyTest showed that sometimes a channel will close while we're completing this operation

This commit is contained in:
David.Noble@microsoft.com 2019-07-11 21:57:05 -07:00
Родитель 6c079d5822
Коммит ad4cf3d64d
3 изменённых файлов: 39 добавлений и 21 удалений

Просмотреть файл

@ -116,7 +116,7 @@ public class ReadMyWritesConsistencyTest {
) + (useNameLink ? " -useNameLink" : "");
Configuration cfg = new Configuration();
JCommander commander = new JCommander(cfg, StringUtils.split(cmd));
new JCommander(cfg, StringUtils.split(cmd));
logger.info("cosmos.directModeProtocol={}, {}", directModeProtocol, cfg);
AtomicInteger success = new AtomicInteger();

Просмотреть файл

@ -181,7 +181,7 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck
if (completed.isSuccess()) {
promise.setSuccess(Boolean.TRUE);
} else {
logger.warn("{} health check request failed due to ", channel, completed.cause());
logger.warn("{} health check request failed due to:", channel, completed.cause());
promise.setSuccess(Boolean.FALSE);
}
});

Просмотреть файл

@ -66,6 +66,9 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
new TimeoutException("Acquisition took longer than the configured maximum time"),
RntbdClientChannelPool.class, "<init>(...)");
private static final ClosedChannelException CHANNEL_CLOSED_ON_ACQUIRE = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), RntbdClientChannelPool.class, "acquire0(...)");
private static final IllegalStateException POOL_CLOSED_ON_ACQUIRE = ThrowableUtil.unknownStackTrace(
new IllegalStateException("RntbdClientChannelPool was closed"),
RntbdClientChannelPool.class, "acquire0(...)");
@ -510,6 +513,8 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
return;
}
final Throwable cause;
if (future.isSuccess()) {
// Ensure that the channel is active and ready to receive requests
@ -522,39 +527,52 @@ public final class RntbdClientChannelPool extends SimpleChannelPool {
// 3. RntbdHealthCheckRequest -> RntbdHealthCheck
final Channel channel = future.getNow();
checkState(channel.isActive());
final RntbdRequestManager requestManager = channel.pipeline().get(RntbdRequestManager.class);
checkState(requestManager != null);
if (channel.isActive()) {
if (requestManager.hasRequestedRntbdContext()) {
final RntbdRequestManager requestManager = channel.pipeline().get(RntbdRequestManager.class);
this.originalPromise.setSuccess(channel);
if (requestManager != null) {
} else {
if (requestManager.hasRequestedRntbdContext()) {
channel.writeAndFlush(RntbdHealthCheckRequest.MESSAGE).addListener(completed -> {
if (completed.isSuccess()) {
reportIssueUnless(requestManager.hasRequestedRntbdContext(), logger, channel, "context request did not complete");
this.originalPromise.setSuccess(channel);
} else {
logger.error("{} health check request failed due to ", channel, completed.cause());
channel.close().addListener(closed -> this.pool.release(channel));
this.originalPromise.setFailure(completed.cause());
channel.writeAndFlush(RntbdHealthCheckRequest.MESSAGE).addListener(completed -> {
if (completed.isSuccess()) {
reportIssueUnless(requestManager.hasRequestedRntbdContext(), logger, channel, "context request did not complete");
this.originalPromise.setSuccess(channel);
} else {
logger.warn("{} health check request failed due to:", channel, completed.cause());
channel.close().addListener(closed -> this.pool.release(channel));
this.originalPromise.setFailure(completed.cause());
}
});
}
});
return;
}
reportIssueUnless(!channel.isActive(), logger, channel, "requestManager: null");
}
cause = CHANNEL_CLOSED_ON_ACQUIRE;
} else {
if (this.acquired) {
this.pool.decrementAndRunTaskQueue();
} else {
this.pool.runTaskQueue();
}
cause = future.cause();
this.originalPromise.setFailure(future.cause());
}
if (this.acquired) {
this.pool.decrementAndRunTaskQueue();
} else {
this.pool.runTaskQueue();
}
this.originalPromise.setFailure(cause);
}
}