Channel pool refinements
This commit is contained in:
Родитель
022a537b19
Коммит
2e3837b125
|
@ -30,8 +30,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
|||
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelId;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.pool.ChannelHealthChecker;
|
||||
import io.netty.channel.pool.FixedChannelPool;
|
||||
import io.netty.util.concurrent.Future;
|
||||
|
@ -43,27 +41,31 @@ import org.slf4j.LoggerFactory;
|
|||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdReporter.reportIssue;
|
||||
import static com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdReporter.reportIssueUnless;
|
||||
|
||||
@JsonSerialize(using = RntbdClientChannelPool.JsonSerializer.class)
|
||||
public final class RntbdClientChannelPool extends FixedChannelPool {
|
||||
|
||||
// region Fields
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelPool.class);
|
||||
private static final AtomicReference<Field> pendingAcquireCount = new AtomicReference<>();
|
||||
|
||||
private final ConcurrentHashMap<ChannelId, Channel> atCapacity;
|
||||
private final AtomicInteger availableChannelCount;
|
||||
private final AtomicBoolean closed;
|
||||
|
||||
private final int maxChannels;
|
||||
private final int maxRequestsPerChannel;
|
||||
|
||||
// endregion
|
||||
|
||||
// region Methods
|
||||
|
||||
/**
|
||||
* Initializes a newly created {@link RntbdClientChannelPool} object
|
||||
*
|
||||
|
@ -79,7 +81,6 @@ public final class RntbdClientChannelPool extends FixedChannelPool {
|
|||
this.maxRequestsPerChannel = config.getMaxRequestsPerChannel();
|
||||
this.maxChannels = config.getMaxChannelsPerEndpoint();
|
||||
this.availableChannelCount = new AtomicInteger();
|
||||
this.atCapacity = new ConcurrentHashMap<>();
|
||||
this.closed = new AtomicBoolean();
|
||||
}
|
||||
|
||||
|
@ -97,18 +98,7 @@ public final class RntbdClientChannelPool extends FixedChannelPool {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
if (this.closed.compareAndSet(false, true)) {
|
||||
|
||||
if (!this.atCapacity.isEmpty()) {
|
||||
|
||||
for (Channel channel : this.atCapacity.values()) {
|
||||
super.offerChannel(channel);
|
||||
}
|
||||
|
||||
this.atCapacity.clear();
|
||||
}
|
||||
|
||||
this.availableChannelCount.set(0);
|
||||
super.close();
|
||||
}
|
||||
|
@ -162,40 +152,30 @@ public final class RntbdClientChannelPool extends FixedChannelPool {
|
|||
@Override
|
||||
protected synchronized Channel pollChannel() {
|
||||
|
||||
Channel channel = super.pollChannel();
|
||||
Channel first = super.pollChannel();
|
||||
|
||||
if (channel != null) {
|
||||
this.availableChannelCount.decrementAndGet();
|
||||
return channel;
|
||||
}
|
||||
|
||||
if (this.atCapacity.isEmpty()) {
|
||||
if (first == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
channel = this.atCapacity.reduce(Long.MAX_VALUE, (id, value) -> {
|
||||
|
||||
if (pendingRequestCount(value) < this.maxRequestsPerChannel) {
|
||||
// Channel has drained sufficiently for us to send a new request on it
|
||||
this.availableChannelCount.decrementAndGet();
|
||||
this.atCapacity.remove(id);
|
||||
return value;
|
||||
}
|
||||
|
||||
if (!value.isActive()) {
|
||||
// Channel closed while we were waiting for it to drain and we'll let our super deal with it later
|
||||
super.offerChannel(this.atCapacity.remove(id));
|
||||
}
|
||||
|
||||
return null;
|
||||
|
||||
}, (other, value) -> pendingRequestCount(other) < pendingRequestCount(value) ? other : value);
|
||||
|
||||
if (channel == null) {
|
||||
logger.warn("\n [{}]\n no channels are available", this);
|
||||
if (this.closed.get()) {
|
||||
return first;
|
||||
}
|
||||
|
||||
return channel;
|
||||
if (isInactiveOrServiceableChannel(first)) {
|
||||
return decrementAvailableChannelCountAndAccept(first);
|
||||
}
|
||||
|
||||
super.offerChannel(first);
|
||||
|
||||
for (Channel next = super.pollChannel(); next != first; super.offerChannel(next), next = super.pollChannel()) {
|
||||
if (isInactiveOrServiceableChannel(next)) {
|
||||
return decrementAvailableChannelCountAndAccept(next);
|
||||
}
|
||||
}
|
||||
|
||||
super.offerChannel(first);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -208,69 +188,56 @@ public final class RntbdClientChannelPool extends FixedChannelPool {
|
|||
*/
|
||||
@Override
|
||||
protected synchronized boolean offerChannel(final Channel channel) {
|
||||
|
||||
final boolean offered;
|
||||
|
||||
if (pendingRequestCount(channel) >= this.maxRequestsPerChannel) {
|
||||
this.atCapacity.put(channel.id(), channel);
|
||||
offered = true;
|
||||
} else {
|
||||
offered = super.offerChannel(channel);
|
||||
}
|
||||
|
||||
if (offered) {
|
||||
if (super.offerChannel(channel)) {
|
||||
this.availableChannelCount.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
|
||||
return offered;
|
||||
return false;
|
||||
}
|
||||
|
||||
public SocketAddress remoteAddress() {
|
||||
return this.bootstrap().config().remoteAddress();
|
||||
}
|
||||
|
||||
public int saturatedChannelCount() {
|
||||
return this.atCapacity.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RntbdClientChannelPool(" + RntbdObjectMapper.toJson(this) + ")";
|
||||
}
|
||||
|
||||
private int pendingRequestCount(final Channel channel) {
|
||||
// endregion
|
||||
|
||||
if (channel == null) {
|
||||
reportIssue(logger, this, "channel: null");
|
||||
return Integer.MAX_VALUE;
|
||||
// region Privates
|
||||
|
||||
private Channel decrementAvailableChannelCountAndAccept(final Channel first) {
|
||||
this.availableChannelCount.decrementAndGet();
|
||||
return first;
|
||||
}
|
||||
|
||||
private boolean isInactiveOrServiceableChannel(final Channel channel) {
|
||||
|
||||
if (!channel.isActive()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
final ChannelPipeline pipeline = channel.pipeline();
|
||||
|
||||
if (pipeline == null) {
|
||||
reportIssue(logger, this, "{} pipeline: null", channel);
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
final RntbdRequestManager requestManager = pipeline.get(RntbdRequestManager.class);
|
||||
final RntbdRequestManager requestManager = channel.pipeline().get(RntbdRequestManager.class);
|
||||
|
||||
if (requestManager == null) {
|
||||
if (channel.isActive()) {
|
||||
reportIssue(logger, this, "{} active and pipeline.requestManager: null", channel);
|
||||
channel.close();
|
||||
} else {
|
||||
logger.debug("\n [{}]\n {} closed", this, channel);
|
||||
}
|
||||
return Integer.MAX_VALUE;
|
||||
reportIssueUnless(!channel.isActive(), logger, this, "{} active with no request manager", channel);
|
||||
return true; // inactive
|
||||
}
|
||||
|
||||
return requestManager.getPendingRequestCount();
|
||||
final int maxRequestsPerChannel = requestManager.hasRntbdContext() ? this.maxRequestsPerChannel : 1;
|
||||
final int requestCount = requestManager.getPendingRequestCount();
|
||||
|
||||
return requestCount < maxRequestsPerChannel;
|
||||
}
|
||||
|
||||
private void throwIfClosed() {
|
||||
checkState(!this.closed.get(), "%s is closed", this);
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
// region Types
|
||||
|
||||
static final class JsonSerializer extends StdSerializer<RntbdClientChannelPool> {
|
||||
|
@ -293,7 +260,6 @@ public final class RntbdClientChannelPool extends FixedChannelPool {
|
|||
generator.writeBooleanField("isClosed", value.closed.get());
|
||||
generator.writeNumberField("acquiredChannelCount", value.acquiredChannelCount());
|
||||
generator.writeNumberField("availableChannelCount", value.availableChannelCount());
|
||||
generator.writeNumberField("saturatedChannelCount", value.saturatedChannelCount());
|
||||
generator.writeNumberField("pendingAcquisitionCount", value.pendingAcquisitionCount());
|
||||
generator.writeEndObject();
|
||||
generator.writeEndObject();
|
||||
|
|
|
@ -535,7 +535,7 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
|
|||
logger.debug("\n {} closed: context send cancelled", channel);
|
||||
contextRequestException = error;
|
||||
} catch (final Throwable error) {
|
||||
final String message = Strings.lenientFormat("RNTBD context request write failed with %s pending requests: %s", count, error);
|
||||
final String message = Strings.lenientFormat("RNTBD context request write failed with %s pending requests", count);
|
||||
logger.debug("\n {} closed: {}", channel, message);
|
||||
contextRequestException = new ChannelException(message, error);
|
||||
}
|
||||
|
@ -546,7 +546,7 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
|
|||
logger.debug("\n {} closed: context receive cancelled", channel);
|
||||
contextRequestException = error;
|
||||
} catch (final Throwable error) {
|
||||
final String message = Strings.lenientFormat("RNTBD context request read failed with %s pending requests: %s", count, error);
|
||||
final String message = Strings.lenientFormat("RNTBD context request read failed with %s pending requests", count);
|
||||
logger.debug("\n {} closed: {}", channel, message);
|
||||
contextRequestException = new ChannelException(message, error);
|
||||
}
|
||||
|
@ -555,17 +555,17 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
|
|||
final String message;
|
||||
|
||||
if (contextRequestException == null) {
|
||||
message = Strings.lenientFormat("%s closed with %s pending requests", channel, count);
|
||||
message = Strings.lenientFormat("%s closed with %s pending requests", context, count);
|
||||
} else {
|
||||
message = Strings.lenientFormat("%s %s", channel, contextRequestException.getMessage());
|
||||
message = Strings.lenientFormat("%s %s", context, contextRequestException.getMessage());
|
||||
}
|
||||
|
||||
final Exception reason;
|
||||
final Exception cause;
|
||||
|
||||
if (throwable == ClosedWithPendingRequestsException.INSTANCE && contextRequestException != null) {
|
||||
reason = contextRequestException;
|
||||
cause = contextRequestException;
|
||||
} else {
|
||||
reason = throwable instanceof Exception ? (Exception)throwable : new ChannelException(throwable);
|
||||
cause = throwable instanceof Exception ? (Exception)throwable : new ChannelException(throwable);
|
||||
}
|
||||
|
||||
this.pendingRequests.forEach(Long.MAX_VALUE, (id, requestRecord) -> {
|
||||
|
@ -574,10 +574,10 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
|
|||
final String requestUri = args.getPhysicalAddress().toString();
|
||||
final Map<String, String> requestHeaders = args.getServiceRequest().getHeaders();
|
||||
|
||||
final GoneException cause = new GoneException(message, reason, (Map<String, String>)null, requestUri);
|
||||
BridgeInternal.setRequestHeaders(cause, requestHeaders);
|
||||
final GoneException goneException = new GoneException(message, cause, (Map<String, String>)null, requestUri);
|
||||
BridgeInternal.setRequestHeaders(goneException, requestHeaders);
|
||||
|
||||
requestRecord.completeExceptionally(cause);
|
||||
requestRecord.completeExceptionally(goneException);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче