Fixes several issues in the reactor related components (#411)

This pull request contains the following changes.

1) Finish pending tasks when recreating the reactor and make sure pending calls scheduled on the old reactor get complete.
2) Fix the session open timeout issue which can result in NPE in proton-J engine.
3) Make session open timeout configurable and use the value of OperationTimeout.
4) Update the message of exceptions and include an entity name in the exception message.
5) API change - use ScheduledExecutorService.
6) Improve tracing.
This commit is contained in:
SJ 2018-12-21 00:30:59 -08:00 коммит произвёл GitHub
Родитель 8f92153e12
Коммит bf8e6c1561
36 изменённых файлов: 557 добавлений и 397 удалений

Просмотреть файл

@ -42,11 +42,11 @@ For a simple event consumer, you'll need to import the *com.microsoft.azure.even
Event Hubs client library uses qpid proton reactor framework which exposes AMQP connection and message delivery related
state transitions as reactive events. In the process,
the library will need to run many asynchronous tasks while sending and receiving messages to Event Hubs.
So, `EventHubClient` requires an instance of `Executor`, where all these tasks are run.
So, `EventHubClient` requires an instance of `ScheduledExecutorService`, where all these tasks are run.
```Java
ExecutorService executor = Executors.newCachedThreadPool();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(8)
```
The receiver code creates an *EventHubClient* from a given connecting string

Просмотреть файл

@ -36,11 +36,11 @@ which is quite simple in a Maven build [as we explain in the guide](PublishingEv
Event Hubs client library uses qpid proton reactor framework which exposes AMQP connection and message delivery related
state transitions as reactive events. In the process,
the library will need to run many asynchronous tasks while sending and receiving messages to Event Hubs.
So, `EventHubClient` requires an instance of `Executor`, where all these tasks are run.
So, `EventHubClient` requires an instance of `ScheduledExecutorService`, where all these tasks are run.
```Java
ExecutorService executor = Executors.newCachedThreadPool();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(8)
```
Using an Event Hub connection string, which holds all required connection information, including an authorization key or token,

Просмотреть файл

@ -34,7 +34,7 @@ So, `EventHubClient` requires an instance of `Executor`, where all these tasks a
```Java
ExecutorService executor = Executors.newCachedThreadPool();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(8)
```
Using an Event Hub connection string, which holds all required connection information including an authorization key or token

Просмотреть файл

@ -27,7 +27,7 @@ class PartitionManager extends Closable {
private ScheduledFuture<?> scanFuture = null;
PartitionManager(HostContext hostContext) {
super(null);
super(null);
this.hostContext = hostContext;
}
@ -41,17 +41,17 @@ class PartitionManager extends Closable {
// EventHubException or IOException, in addition to whatever failures may occur when the result of
// the CompletableFuture is evaluated.
try {
final CompletableFuture<Void> cleanupFuture = new CompletableFuture<Void>();
final CompletableFuture<Void> cleanupFuture = new CompletableFuture<Void>();
// Stage 0A: get EventHubClient for the event hub
retval = EventHubClient.create(this.hostContext.getEventHubConnectionString(), this.hostContext.getRetryPolicy(), this.hostContext.getExecutor())
// Stage 0B: set up a way to close the EventHubClient when we're done
.thenApplyAsync((ehClient) ->
{
final EventHubClient saveForCleanupClient = ehClient;
cleanupFuture.thenComposeAsync((empty) -> saveForCleanupClient.close(), this.hostContext.getExecutor());
return ehClient;
}, this.hostContext.getExecutor())
// Stage 0B: set up a way to close the EventHubClient when we're done
.thenApplyAsync((ehClient) ->
{
final EventHubClient saveForCleanupClient = ehClient;
cleanupFuture.thenComposeAsync((empty) -> saveForCleanupClient.close(), this.hostContext.getExecutor());
return ehClient;
}, this.hostContext.getExecutor())
// Stage 1: use the client to get runtime info for the event hub
.thenComposeAsync((ehClient) -> ehClient.getRuntimeInformation(), this.hostContext.getExecutor())
// Stage 2: extract the partition ids from the runtime info or throw on null (timeout)
@ -71,7 +71,7 @@ class PartitionManager extends Closable {
// Stage 3: RUN REGARDLESS OF EXCEPTIONS -- if there was an error, wrap it in IllegalEntityException and throw
.handleAsync((empty, e) ->
{
cleanupFuture.complete(null); // trigger client cleanup
cleanupFuture.complete(null); // trigger client cleanup
if (e != null) {
Throwable notifyWith = e;
if (e instanceof CompletionException) {
@ -104,8 +104,8 @@ class PartitionManager extends Closable {
}
CompletableFuture<Void> stopPartitions() {
setClosing();
setClosing();
// If the lease scanner is between runs, cancel so it doesn't run again.
synchronized (this.scanFutureSynchronizer) {
if (this.scanFuture != null) {
@ -119,20 +119,20 @@ class PartitionManager extends Closable {
if (this.pumpManager != null) {
TRACE_LOGGER.info(this.hostContext.withHost("Shutting down all pumps"));
stopping = this.pumpManager.removeAllPumps(CloseReason.Shutdown)
.whenCompleteAsync((empty, e) -> {
if (e != null) {
Throwable notifyWith = LoggingUtils.unwrapException(e, null);
TRACE_LOGGER.warn(this.hostContext.withHost("Failure during shutdown"), notifyWith);
if (notifyWith instanceof Exception) {
this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), (Exception) notifyWith,
EventProcessorHostActionStrings.PARTITION_MANAGER_CLEANUP);
.whenCompleteAsync((empty, e) -> {
if (e != null) {
Throwable notifyWith = LoggingUtils.unwrapException(e, null);
TRACE_LOGGER.warn(this.hostContext.withHost("Failure during shutdown"), notifyWith);
if (notifyWith instanceof Exception) {
this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), (Exception) notifyWith,
EventProcessorHostActionStrings.PARTITION_MANAGER_CLEANUP);
}
}
}, this.hostContext.getExecutor());
}
}
}, this.hostContext.getExecutor());
}
// else no pumps to shut down
stopping = stopping.whenCompleteAsync((empty, e) -> {
TRACE_LOGGER.info(this.hostContext.withHost("Partition manager exiting"));
setClosed();
@ -287,14 +287,14 @@ class PartitionManager extends Closable {
// Return Void so it can be called from a lambda.
// throwOnFailure is true
private Void scan(boolean isFirst) {
TRACE_LOGGER.info(this.hostContext.withHost("Starting lease scan"));
TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
long start = System.currentTimeMillis();
(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
.whenCompleteAsync((didSteal, e) ->
{
TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));
onPartitionCheckCompleteTestHook();
// Schedule the next scan unless we are shutting down.
@ -305,11 +305,11 @@ class PartitionManager extends Closable {
seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
}
synchronized (this.scanFutureSynchronizer) {
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
}
TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
} else {
TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
}
}, this.hostContext.getExecutor());

Просмотреть файл

@ -165,7 +165,7 @@ class PartitionPump extends Closable implements PartitionReceiveHandler {
if (!getIsClosingOrClosed()) {
int seconds = this.hostContext.getPartitionManagerOptions().getLeaseRenewIntervalInSeconds();
this.leaseRenewerFuture = this.hostContext.getExecutor().schedule(() -> leaseRenewer(), seconds, TimeUnit.SECONDS);
TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.lease, "scheduling leaseRenewer in " + seconds));
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(this.lease, "scheduling leaseRenewer in " + seconds));
}
}

Просмотреть файл

@ -7,35 +7,35 @@ package com.microsoft.azure.eventprocessorhost;
import org.junit.Assume;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
final class TestUtilities {
static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
static void skipIfAppveyor() {
String appveyor = System.getenv("APPVEYOR"); // Set to "true" by Appveyor
if (appveyor != null) {
TestBase.logInfo("SKIPPING - APPVEYOR DETECTED");
}
Assume.assumeTrue(appveyor == null);
String appveyor = System.getenv("APPVEYOR"); // Set to "true" by Appveyor
if (appveyor != null) {
TestBase.logInfo("SKIPPING - APPVEYOR DETECTED");
}
Assume.assumeTrue(appveyor == null);
}
static String getStorageConnectionString() {
TestUtilities.skipIfAppveyor();
TestUtilities.skipIfAppveyor();
String retval = System.getenv("EPHTESTSTORAGE");
// if EPHTESTSTORAGE is not set - we cannot run integration tests
if (retval == null) {
TestBase.logInfo("SKIPPING - NO STORAGE CONNECTION STRING");
TestBase.logInfo("SKIPPING - NO STORAGE CONNECTION STRING");
}
Assume.assumeTrue(retval != null);
return ((retval != null) ? retval : "");
}
static Boolean isRunningOnAzure() {
return (System.getenv("EVENT_HUB_CONNECTION_STRING") != null);
return (System.getenv("EVENT_HUB_CONNECTION_STRING") != null);
}
}

Просмотреть файл

@ -9,13 +9,13 @@ import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import org.apache.logging.log4j.core.appender.AbstractManager;
import java.io.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public final class EventHubsManager extends AbstractManager {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
private final String eventHubConnectionString;
private EventHubClient eventHubSender;

Просмотреть файл

@ -4,12 +4,12 @@
*/
package com.microsoft.azure.eventhubs;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
/**
* Authorization failed exception is thrown when error is encountered during authorizing user's permission to run the intended operations.
* When encountered this exception user should check whether the token/key provided in the connection string (e.g. one passed to
* {@link EventHubClient#create(String, Executor)}) is valid, and has correct execution right for the intended operations (e.g.
* {@link EventHubClient#create(String, ScheduledExecutorService)}) is valid, and has correct execution right for the intended operations (e.g.
* Receive call will need Listen claim associated with the key/token).
*
* @see <a href="http://go.microsoft.com/fwlink/?LinkId=761101">http://go.microsoft.com/fwlink/?LinkId=761101</a>

Просмотреть файл

@ -12,7 +12,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
/**
* The data structure encapsulating the Event being sent-to and received-from EventHubs.
@ -48,7 +48,7 @@ public interface EventData extends Serializable {
*
* @param data the actual payload of data in bytes to be Sent to EventHubs.
* @return EventData the created {@link EventData} to send to EventHubs.
* @see EventHubClient#create(String, Executor)
* @see EventHubClient#create(String, ScheduledExecutorService)
*/
static EventData create(final byte[] data) {
return new EventDataImpl(data);
@ -72,7 +72,7 @@ public interface EventData extends Serializable {
* @param offset Offset in the byte[] to read from ; inclusive index
* @param length length of the byte[] to be read, starting from offset
* @return EventData the created {@link EventData} to send to EventHubs.
* @see EventHubClient#create(String, Executor)
* @see EventHubClient#create(String, ScheduledExecutorService)
*/
static EventData create(final byte[] data, final int offset, final int length) {
return new EventDataImpl(data, offset, length);
@ -94,7 +94,7 @@ public interface EventData extends Serializable {
*
* @param buffer ByteBuffer which references the payload of the Event to be sent to EventHubs
* @return EventData the created {@link EventData} to send to EventHubs.
* @see EventHubClient#create(String, Executor)
* @see EventHubClient#create(String, ScheduledExecutorService)
*/
static EventData create(final ByteBuffer buffer) {
return new EventDataImpl(buffer);

Просмотреть файл

@ -11,41 +11,42 @@ import java.io.IOException;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
/**
* Anchor class - all EventHub client operations STARTS here.
*
* @see EventHubClient#create(String, Executor)
* @see EventHubClient#create(String, ScheduledExecutorService)
*/
public interface EventHubClient {
String DEFAULT_CONSUMER_GROUP_NAME = "$Default";
/**
* Synchronous version of {@link #create(String, Executor)}.
* Synchronous version of {@link #create(String, ScheduledExecutorService)}.
*
* @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString.
* @param executor An {@link Executor} to run all tasks performed by {@link EventHubClient}.
* @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}.
* @return EventHubClient which can be used to create Senders and Receivers to EventHub
* @throws EventHubException If Service Bus service encountered problems during connection creation.
* @throws IOException If the underlying Proton-J layer encounter network errors.
*/
static EventHubClient createSync(final String connectionString, final Executor executor)
static EventHubClient createSync(final String connectionString, final ScheduledExecutorService executor)
throws EventHubException, IOException {
return EventHubClient.createSync(connectionString, null, executor);
}
/**
* Synchronous version of {@link #create(String, Executor)}.
* Synchronous version of {@link #create(String, ScheduledExecutorService)}.
*
* @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString.
* @param retryPolicy A custom {@link RetryPolicy} to be used when communicating with EventHub.
* @param executor An {@link Executor} to run all tasks performed by {@link EventHubClient}.
* @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}.
* @return EventHubClient which can be used to create Senders and Receivers to EventHub
* @throws EventHubException If Service Bus service encountered problems during connection creation.
* @throws IOException If the underlying Proton-J layer encounter network errors.
*/
static EventHubClient createSync(final String connectionString, final RetryPolicy retryPolicy, final Executor executor)
static EventHubClient createSync(final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor)
throws EventHubException, IOException {
return ExceptionUtil.syncWithIOException(() -> create(connectionString, retryPolicy, executor).get());
}
@ -56,12 +57,12 @@ public interface EventHubClient {
* <p>The {@link EventHubClient} created from this method creates a Sender instance internally, which is used by the {@link #send(EventData)} methods.
*
* @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString.
* @param executor An {@link Executor} to run all tasks performed by {@link EventHubClient}.
* @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}.
* @return CompletableFuture{@literal <EventHubClient>} which can be used to create Senders and Receivers to EventHub
* @throws EventHubException If Service Bus service encountered problems during connection creation.
* @throws IOException If the underlying Proton-J layer encounter network errors.
*/
static CompletableFuture<EventHubClient> create(final String connectionString, final Executor executor)
static CompletableFuture<EventHubClient> create(final String connectionString, final ScheduledExecutorService executor)
throws EventHubException, IOException {
return EventHubClient.create(connectionString, null, executor);
}
@ -73,13 +74,13 @@ public interface EventHubClient {
*
* @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString.
* @param retryPolicy A custom {@link RetryPolicy} to be used when communicating with EventHub.
* @param executor An {@link Executor} to run all tasks performed by {@link EventHubClient}.
* @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}.
* @return CompletableFuture{@literal <EventHubClient>} which can be used to create Senders and Receivers to EventHub
* @throws EventHubException If Service Bus service encountered problems during connection creation.
* @throws IOException If the underlying Proton-J layer encounter network errors.
*/
static CompletableFuture<EventHubClient> create(
final String connectionString, final RetryPolicy retryPolicy, final Executor executor)
final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor)
throws EventHubException, IOException {
return EventHubClientImpl.create(connectionString, retryPolicy, executor);
}

Просмотреть файл

@ -7,14 +7,14 @@ package com.microsoft.azure.eventhubs;
import com.microsoft.azure.eventhubs.impl.ExceptionUtil;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
/**
* This sender class is a logical representation of sending events to a specific EventHub partition. Do not use this class
* if you do not care about sending events to specific partitions. Instead, use {@link EventHubClient#send} method.
*
* @see EventHubClient#createPartitionSender(String)
* @see EventHubClient#create(String, Executor)
* @see EventHubClient#create(String, ScheduledExecutorService)
*/
public interface PartitionSender {

Просмотреть файл

@ -25,7 +25,7 @@ public class BaseLinkHandler extends BaseHandler {
public void onLinkLocalClose(Event event) {
final Link link = event.getLink();
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("linkName[%s]", link.getName()));
TRACE_LOGGER.info(String.format("onLinkLocalClose linkName[%s]", link.getName()));
}
closeSession(link);
@ -33,18 +33,30 @@ public class BaseLinkHandler extends BaseHandler {
@Override
public void onLinkRemoteClose(Event event) {
final Link link = event.getLink();
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("onLinkRemoteClose linkName[%s]", link.getName()));
}
handleRemoteLinkClosed(event);
}
@Override
public void onLinkRemoteDetach(Event event) {
final Link link = event.getLink();
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("onLinkRemoteDetach linkName[%s]", link.getName()));
}
handleRemoteLinkClosed(event);
}
public void processOnClose(Link link, ErrorCondition condition) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("linkName[" + link.getName() +
(condition != null ? "], ErrorCondition[" + condition.getCondition() + ", " + condition.getDescription() + "]" : "], condition[null]"));
TRACE_LOGGER.info(String.format("processOnClose linkName[%s], errorCondition[%s], errorDescription[%s]",
link.getName(),
condition != null ? condition.getCondition() : "n/a",
condition != null ? condition.getDescription() : "n/a"));
}
this.underlyingEntity.onClose(condition);

Просмотреть файл

@ -31,7 +31,6 @@ public final class ClientConstants {
public final static Duration TOKEN_VALIDITY = Duration.ofMinutes(20);
public final static int DEFAULT_MAX_RETRY_COUNT = 10;
public final static boolean DEFAULT_IS_TRANSIENT = true;
public final static int SESSION_OPEN_TIMEOUT_IN_MS = 15000;
public final static int REACTOR_IO_POLL_TIMEOUT = 20;
public final static int SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS = 4;
public final static int MGMT_CHANNEL_MIN_RETRY_IN_MILLIS = 5;

Просмотреть файл

@ -11,7 +11,7 @@ import org.slf4j.LoggerFactory;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
/**
* Contract for all client entities with Open-Close/Abort state m/c
@ -21,7 +21,7 @@ import java.util.concurrent.Executor;
abstract class ClientEntity {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ClientEntity.class);
protected final Executor executor;
protected final ScheduledExecutorService executor;
private final String clientId;
private final Object syncClose;
private final ClientEntity parent;
@ -29,7 +29,7 @@ abstract class ClientEntity {
private boolean isClosing;
private boolean isClosed;
protected ClientEntity(final String clientId, final ClientEntity parent, final Executor executor) {
protected ClientEntity(final String clientId, final ClientEntity parent, final ScheduledExecutorService executor) {
this.clientId = clientId;
this.parent = parent;
this.executor = executor;

Просмотреть файл

@ -8,18 +8,14 @@ import com.microsoft.azure.eventhubs.TransportType;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.reactor.Handshaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
// ServiceBus <-> ProtonReactor interaction handles all
@ -50,10 +46,6 @@ public class ConnectionHandler extends BaseHandler {
}
}
protected AmqpConnection getAmqpConnection() {
return this.amqpConnection;
}
private static SslDomain makeDomain(SslDomain.Mode mode) {
final SslDomain domain = Proton.sslDomain();
@ -64,14 +56,21 @@ public class ConnectionHandler extends BaseHandler {
return domain;
}
protected AmqpConnection getAmqpConnection() {
return this.amqpConnection;
}
@Override
public void onConnectionInit(Event event) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionInit hostname[%s]", this.amqpConnection.getHostName()));
}
final Connection connection = event.getConnection();
final String hostName = new StringBuilder(this.amqpConnection.getHostName())
.append(":")
.append(String.valueOf(this.getProtocolPort()))
.toString();
.append(":")
.append(String.valueOf(this.getProtocolPort()))
.toString();
connection.setHostname(hostName);
connection.setContainer(StringUtil.getRandomString());
@ -105,6 +104,7 @@ public class ConnectionHandler extends BaseHandler {
/**
* HostName to be used for socket creation.
* for ex: in case of proxy server - this could be proxy ip address
*
* @return host name
*/
public String getRemoteHostName() {
@ -114,6 +114,7 @@ public class ConnectionHandler extends BaseHandler {
/**
* port used to create socket.
* for ex: in case of talking to event hubs service via proxy - use proxy port
*
* @return port
*/
protected int getRemotePort() {
@ -122,6 +123,7 @@ public class ConnectionHandler extends BaseHandler {
/**
* Port used on connection open frame
*
* @return port
*/
protected int getProtocolPort() {
@ -134,6 +136,9 @@ public class ConnectionHandler extends BaseHandler {
@Override
public void onConnectionBound(Event event) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionBound hostname[%s]", this.amqpConnection.getHostName()));
}
final Transport transport = event.getTransport();
@ -145,8 +150,8 @@ public class ConnectionHandler extends BaseHandler {
final Connection connection = event.getConnection();
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
"onConnectionUnbound: hostname[" + connection.getHostname() + "], state[" + connection.getLocalState() + "], remoteState[" + connection.getRemoteState() + "]");
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionUnbound: hostname[%s], state[%s], remoteState[%s]",
connection.getHostname(), connection.getLocalState(), connection.getRemoteState()));
}
// if failure happened while establishing transport - nothing to free up.
@ -162,7 +167,9 @@ public class ConnectionHandler extends BaseHandler {
final ErrorCondition condition = transport.getCondition();
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn("onTransportClosed: hostname[" + (connection != null ? connection.getHostname() : "n/a") + "], error[" + (condition != null ? condition.getDescription() : "n/a") + "]");
TRACE_LOGGER.warn(String.format(Locale.US, "onTransportError: hostname[%s], error[%s]",
connection != null ? connection.getHostname() : "n/a",
condition != null ? condition.getDescription() : "n/a"));
}
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
@ -185,7 +192,8 @@ public class ConnectionHandler extends BaseHandler {
final ErrorCondition condition = transport.getCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("onTransportClosed: hostname[" + (connection != null ? connection.getHostname() : "n/a") + "], error[" + (condition != null ? condition.getDescription() : "n/a") + "]");
TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed: hostname[%s], error[%s]",
connection != null ? connection.getHostname() : "n/a", (condition != null ? condition.getDescription() : "n/a")));
}
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
@ -195,11 +203,25 @@ public class ConnectionHandler extends BaseHandler {
}
}
@Override
public void onConnectionLocalOpen(Event event) {
final Connection connection = event.getConnection();
final ErrorCondition error = connection.getCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalOpen: hostname[%s], errorCondition[%s], errorDescription[%s]",
connection.getHostname(),
error != null ? error.getCondition() : "n/a",
error != null ? error.getDescription() : "n/a"));
}
}
@Override
public void onConnectionRemoteOpen(Event event) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("onConnectionRemoteOpen: hostname[" + event.getConnection().getHostname() + ", " + event.getConnection().getRemoteContainer() + "]");
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteOpen: hostname[%s], remoteContainer[%s]",
event.getConnection().getHostname(), event.getConnection().getRemoteContainer()));
}
this.amqpConnection.onOpenComplete(null);
@ -209,13 +231,13 @@ public class ConnectionHandler extends BaseHandler {
public void onConnectionLocalClose(Event event) {
final Connection connection = event.getConnection();
final ErrorCondition error = connection.getCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("onConnectionLocalClose: hostname[" + connection.getHostname() +
(error != null
? "], errorCondition[" + error.getCondition() + ", " + error.getDescription() + "]"
: "]"));
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalClose: hostname[%s], errorCondition[%s], errorDescription[%s]",
connection.getHostname(),
error != null ? error.getCondition() : "n/a",
error != null ? error.getDescription() : "n/a"));
}
if (connection.getRemoteState() == EndpointState.CLOSED) {
@ -234,12 +256,25 @@ public class ConnectionHandler extends BaseHandler {
final ErrorCondition error = connection.getRemoteCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("onConnectionRemoteClose: hostname[" + connection.getHostname() +
(error != null
? "], errorCondition[" + error.getCondition() + ", " + error.getDescription() + "]"
: "]"));
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteClose: hostname[%s], errorCondition[%s], errorDescription[%s]",
connection.getHostname(),
error != null ? error.getCondition() : "n/a",
error != null ? error.getDescription() : "n/a"));
}
this.amqpConnection.onConnectionError(error);
}
@Override
public void onConnectionFinal(Event event) {
final Connection connection = event.getConnection();
final ErrorCondition error = connection.getCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "onConnectionFinal: hostname[%s], errorCondition[%s], errorDescription[%s]",
connection.getHostname(),
error != null ? error.getCondition() : "n/a",
error != null ? error.getDescription() : "n/a"));
}
}
}

Просмотреть файл

@ -4,15 +4,25 @@ import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.reactor.impl.IOHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Locale;
public class CustomIOHandler extends IOHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CustomIOHandler.class);
@Override
public void onTransportClosed(Event event) {
final Transport transport = event.getTransport();
final Connection connection = event.getConnection();
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed hostname[%s]",
(connection != null ? connection.getHostname() : "n/a")));
}
if (transport != null && connection != null && connection.getTransport() != null) {
transport.unbind();
}

Просмотреть файл

@ -17,7 +17,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
@ -37,15 +37,15 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl
private CompletableFuture<Void> createSender;
private EventHubClientImpl(final ConnectionStringBuilder connectionString, final Executor executor) {
super(StringUtil.getRandomString(), null, executor);
private EventHubClientImpl(final ConnectionStringBuilder connectionString, final ScheduledExecutorService executor) {
super("EventHubClientImpl".concat(StringUtil.getRandomString()), null, executor);
this.eventHubName = connectionString.getEventHubName();
this.senderCreateSync = new Object();
}
public static CompletableFuture<EventHubClient> create(
final String connectionString, final RetryPolicy retryPolicy, final Executor executor)
final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor)
throws EventHubException, IOException {
final ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString);
final EventHubClientImpl eventHubClient = new EventHubClientImpl(connStr, executor);
@ -220,7 +220,7 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl
if (!this.isSenderCreateStarted) {
synchronized (this.senderCreateSync) {
if (!this.isSenderCreateStarted) {
this.createSender = MessageSender.create(this.underlyingFactory, StringUtil.getRandomString(), this.eventHubName)
this.createSender = MessageSender.create(this.underlyingFactory, this.getClientId().concat("-InternalSender"), this.eventHubName)
.thenAcceptAsync(new Consumer<MessageSender>() {
public void accept(MessageSender a) {
EventHubClientImpl.this.sender = a;
@ -290,7 +290,7 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl
(long) rawData.get(ClientConstants.MANAGEMENT_RESULT_LAST_ENQUEUED_SEQUENCE_NUMBER),
(String) rawData.get(ClientConstants.MANAGEMENT_RESULT_LAST_ENQUEUED_OFFSET),
((Date) rawData.get(ClientConstants.MANAGEMENT_RESULT_LAST_ENQUEUED_TIME_UTC)).toInstant(),
(boolean)rawData.get(ClientConstants.MANAGEMENT_RESULT_PARTITION_IS_EMPTY)));
(boolean) rawData.get(ClientConstants.MANAGEMENT_RESULT_PARTITION_IS_EMPTY)));
return future2;
}
}, this.executor);
@ -349,7 +349,7 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl
public void run() {
final long timeLeft = this.timeoutTracker.remaining().toMillis();
final CompletableFuture<Map<String, Object>> intermediateFuture = this.mf.getManagementChannel()
.request(this.mf.getReactorScheduler(),
.request(this.mf.getReactorDispatcher(),
this.request,
timeLeft > 0 ? timeLeft : 0);

Просмотреть файл

@ -114,7 +114,8 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US,
"path[%s], linkName[%s] - Reschedule operation timer, current: [%s], remaining: [%s] secs",
"clientId[%s], path[%s], linkName[%s] - Reschedule operation timer, current: [%s], remaining: [%s] secs",
getClientId(),
receivePath,
receiveLink.getName(),
Instant.now(),
@ -142,7 +143,7 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
public void run() {
try {
underlyingFactory.getCBSChannel().sendToken(
underlyingFactory.getReactorScheduler(),
underlyingFactory.getReactorDispatcher(),
underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new OperationResult<Void, Exception>() {
@ -151,7 +152,8 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(
String.format(Locale.US,
"path[%s], linkName[%s] - token renewed", receivePath, receiveLink.getName()));
"clientId[%s], path[%s], linkName[%s] - token renewed",
getClientId(), receivePath, receiveLink.getName()));
}
}
@ -160,7 +162,8 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US,
"path[%s], linkName[%s], tokenRenewalFailure[%s]", receivePath, receiveLink.getName(), error.getMessage()));
"clientId[%s], path[%s], linkName[%s], tokenRenewalFailure[%s]",
getClientId(), receivePath, receiveLink.getName(), error.getMessage()));
}
}
});
@ -168,7 +171,8 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US,
"path[%s], linkName[%s], tokenRenewalScheduleFailure[%s]", receivePath, receiveLink.getName(), exception.getMessage()));
"clientId[%s], path[%s], linkName[%s], tokenRenewalScheduleFailure[%s]",
getClientId(), receivePath, receiveLink.getName(), exception.getMessage()));
}
}
}
@ -247,8 +251,8 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
if (maxMessageCount <= 0 || maxMessageCount > this.prefetchCount) {
onReceive.completeExceptionally(new IllegalArgumentException(String.format(
Locale.US,
"maxEventCount(%s) should be a positive number and should be less than prefetchCount(%s)",
maxMessageCount, this.prefetchCount)));
"Entity(%s): maxEventCount(%s) should be a positive number and should be less than prefetchCount(%s)",
this.receivePath, maxMessageCount, this.prefetchCount)));
return onReceive;
}
@ -256,9 +260,10 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US,
"path[%s], linkName[%s] - schedule operation timer, current: [%s], remaining: [%s] secs",
receivePath,
receiveLink.getName(),
"clientId[%s], path[%s], linkName[%s] - schedule operation timer, current: [%s], remaining: [%s] secs",
this.getClientId(),
this.receivePath,
this.receiveLink.getName(),
Instant.now(),
this.receiveTimeout.getSeconds()));
}
@ -282,17 +287,16 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
this.creatingLink = false;
if (exception == null) {
if (this.getIsClosingOrClosed()) {
this.receiveLink.close();
return;
}
if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
this.linkOpen.getWork().complete(this);
if (this.openTimer != null)
this.openTimer.cancel(false);
}
if (this.getIsClosingOrClosed()) {
return;
}
synchronized (this.errorConditionLock) {
this.lastKnownLinkError = null;
}
@ -303,8 +307,8 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
this.sendFlow(this.prefetchCount - this.prefetchedMessages.size());
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("receiverPath[%s], linkname[%s], updated-link-credit[%s], sentCredits[%s]",
this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount));
TRACE_LOGGER.info(String.format("clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s]",
this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount));
}
} else {
synchronized (this.errorConditionLock) {
@ -333,8 +337,8 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
} catch (IOException | RejectedExecutionException schedulerException) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(
String.format(Locale.US, "receiverPath[%s], scheduling createLink encountered error: %s",
this.receivePath, schedulerException.getLocalizedMessage()));
String.format(Locale.US, "clientId[%s], receiverPath[%s], scheduling createLink encountered error: %s",
this.getClientId(), this.receivePath, schedulerException.getLocalizedMessage()));
}
this.cancelOpen(schedulerException);
@ -388,7 +392,8 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
}
final Exception completionException = exception == null
? new EventHubException(true, "Client encountered transient error for unknown reasons, please retry the operation.")
? new EventHubException(true, String.format(Locale.US,
"Entity(%s): client encountered transient error for unknown reasons, please retry the operation.", this.receivePath))
: exception;
this.onOpenComplete(completionException);
@ -415,8 +420,9 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
recreateScheduled = false;
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(
String.format(Locale.US, "receiverPath[%s], linkName[%s], scheduling createLink encountered error: %s",
MessageReceiver.this.receivePath,
String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], scheduling createLink encountered error: %s",
this.getClientId(),
this.receivePath,
this.receiveLink.getName(), ignore.getLocalizedMessage()));
}
}
@ -515,7 +521,7 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
try {
this.underlyingFactory.getCBSChannel().sendToken(
this.underlyingFactory.getReactorScheduler(),
this.underlyingFactory.getReactorDispatcher(),
this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new OperationResult<Void, Exception>() {
@ -571,8 +577,8 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
this.nextCreditToFlow = 0;
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(String.format("receiverPath[%s], linkname[%s], updated-link-credit[%s], sentCredits[%s], ThreadId[%s]",
this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), tempFlow, Thread.currentThread().getId()));
TRACE_LOGGER.debug(String.format("clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s], ThreadId[%s]",
this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), tempFlow, Thread.currentThread().getId()));
}
}
}
@ -594,10 +600,11 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
String.format(Locale.US, "Open operation on entity(%s) timed out at %s.",
MessageReceiver.this.receivePath, ZonedDateTime.now()),
lastReportedLinkError);
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(
String.format(Locale.US, "receiverPath[%s], Open call timedout", MessageReceiver.this.receivePath),
operationTimedout);
String.format(Locale.US, "clientId[%s], receiverPath[%s], Open call timed out",
MessageReceiver.this.getClientId(), MessageReceiver.this.receivePath), operationTimedout);
}
ExceptionUtil.completeExceptionally(linkOpen.getWork(), operationTimedout, MessageReceiver.this);
@ -630,10 +637,13 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
link = MessageReceiver.this.receiveLink;
}
final Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", link.getName(), ZonedDateTime.now()));
final Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Close operation on Receive Link(%s) timed out at %s",
link.getName(), ZonedDateTime.now()));
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, link.getName(), "Close"),
String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], Close call timed out",
MessageReceiver.this.getClientId(), MessageReceiver.this.receivePath, link.getName()),
operationTimedout);
}
@ -708,8 +718,9 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
if (receiveLink != null && receiveLink.getLocalState() != EndpointState.CLOSED) {
receiveLink.close();
} else if (receiveLink == null || receiveLink.getRemoteState() == EndpointState.CLOSED) {
if (closeTimer != null)
if (closeTimer != null && !closeTimer.isCancelled()) {
closeTimer.cancel(false);
}
linkClose.complete(null);
}

Просмотреть файл

@ -40,7 +40,8 @@ import java.util.function.Consumer;
public final class MessageSender extends ClientEntity implements AmqpSender, ErrorContextProvider {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessageSender.class);
private static final String SEND_TIMED_OUT = "Send operation timed out";
// TestHooks for code injection
private static volatile Consumer<MessageSender> onOpenRetry = null;
private final MessagingFactory underlyingFactory;
private final String sendPath;
private final Duration operationTimeout;
@ -54,7 +55,6 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
private final String tokenAudience;
private final Object errorConditionLock;
private final Timer timer;
private volatile int maxMessageSize;
private volatile Sender sendLink;
private volatile CompletableFuture<MessageSender> linkFirstOpen;
@ -62,13 +62,9 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
private volatile boolean creatingLink;
private volatile CompletableFuture<?> closeTimer;
private volatile CompletableFuture<?> openTimer;
private Exception lastKnownLinkError;
private Instant lastKnownErrorReportedAt;
// TestHooks for code injection
private static volatile Consumer<MessageSender> onOpenRetry = null;
private MessageSender(final MessagingFactory factory, final String sendLinkName, final String senderPath) {
super(sendLinkName, factory, factory.executor);
@ -106,7 +102,7 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
public void run() {
try {
underlyingFactory.getCBSChannel().sendToken(
underlyingFactory.getReactorScheduler(),
underlyingFactory.getReactorDispatcher(),
underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new OperationResult<Void, Exception>() {
@ -114,7 +110,8 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
public void onComplete(Void result) {
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(String.format(Locale.US,
"path[%s], linkName[%s] - token renewed", sendPath, sendLink.getName()));
"clientId[%s], path[%s], linkName[%s] - token renewed",
getClientId(), sendPath, sendLink.getName()));
}
}
@ -122,14 +119,16 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
public void onError(Exception error) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US,
"path[%s], linkName[%s] - tokenRenewalFailure[%s]", sendPath, sendLink.getName(), error.getMessage()));
"clientId[%s], path[%s], linkName[%s] - tokenRenewalFailure[%s]",
getClientId(), sendPath, sendLink.getName(), error.getMessage()));
}
}
});
} catch (IOException | NoSuchAlgorithmException | InvalidKeyException | RuntimeException exception) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US,
"path[%s], linkName[%s] - tokenRenewalScheduleFailure[%s]", sendPath, sendLink.getName(), exception.getMessage()));
"clientId[%s], path[%s], linkName[%s] - tokenRenewalScheduleFailure[%s]",
getClientId(), sendPath, sendLink.getName(), exception.getMessage()));
}
}
}
@ -210,8 +209,9 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
(unUsed, exception) -> {
if (exception != null && !(exception instanceof CancellationException))
onSendFuture.completeExceptionally(
new OperationCancelledException("Send failed while dispatching to Reactor, see cause for more details.",
exception));
new OperationCancelledException(String.format(Locale.US,
"Entity(%s): send failed while dispatching to Reactor, see cause for more details.",
this.sendPath), exception));
return null;
}, this.executor);
@ -230,7 +230,9 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
this.underlyingFactory.scheduleOnReactorThread(this.sendWork);
} catch (IOException | RejectedExecutionException schedulerException) {
onSendFuture.completeExceptionally(
new OperationCancelledException("Send failed while dispatching to Reactor, see cause for more details.", schedulerException));
new OperationCancelledException(String.format(Locale.US,
"Entity(%s): send failed while dispatching to Reactor, see cause for more details.",
this.sendPath), schedulerException));
}
return onSendFuture;
@ -247,7 +249,8 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
public CompletableFuture<Void> send(final Iterable<Message> messages) {
if (messages == null || IteratorUtil.sizeEquals(messages, 0)) {
throw new IllegalArgumentException("Sending Empty batch of messages is not allowed.");
throw new IllegalArgumentException(String.format(Locale.US,
"Entity[%s}: sending Empty batch of messages is not allowed.", this.sendPath));
}
final Message firstMessage = messages.iterator().next();
@ -280,7 +283,9 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
encodedSize = messageWrappedByData.encode(bytes, byteArrayOffset, maxMessageSizeTemp - byteArrayOffset - 1);
} catch (BufferOverflowException exception) {
final CompletableFuture<Void> sendTask = new CompletableFuture<>();
sendTask.completeExceptionally(new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", maxMessageSizeTemp / 1024), exception));
sendTask.completeExceptionally(new PayloadSizeExceededException(String.format(Locale.US,
"Entity(%s): size of the payload exceeded Maximum message size: %s kb",
this.sendPath, maxMessageSizeTemp / 1024), exception));
return sendTask;
}
@ -302,7 +307,9 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
encodedSize = msg.encode(bytes, 0, allocationSize);
} catch (BufferOverflowException exception) {
final CompletableFuture<Void> sendTask = new CompletableFuture<Void>();
sendTask.completeExceptionally(new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", maxMessageSizeTemp / 1024), exception));
sendTask.completeExceptionally(new PayloadSizeExceededException(String.format(Locale.US,
"Entity(%s): size of the payload exceeded Maximum message size: %s kb",
this.sendPath, maxMessageSizeTemp / 1024), exception));
return sendTask;
}
@ -380,14 +387,14 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
} catch (IOException | RejectedExecutionException schedulerException) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(
String.format(Locale.US, "senderPath[%s], scheduling createLink encountered error: %s",
this.sendPath, schedulerException.getLocalizedMessage()));
String.format(Locale.US, "clientId[%s], senderPath[%s], scheduling createLink encountered error: %s",
this.getClientId(), this.sendPath, schedulerException.getLocalizedMessage()));
}
this.cancelOpen(schedulerException);
}
} else if (completionException instanceof EventHubException
&& !((EventHubException) completionException).getIsTransient()){
&& !((EventHubException) completionException).getIsTransient()) {
this.cancelOpen(completionException);
}
}
@ -419,7 +426,9 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
for (Map.Entry<String, ReplayableWorkItem<Void>> pendingSend : this.pendingSendsData.entrySet()) {
ExceptionUtil.completeExceptionally(pendingSend.getValue().getWork(),
completionException == null
? new OperationCancelledException("Send cancelled as the Sender instance is Closed before the sendOperation completed.")
? new OperationCancelledException(String.format(Locale.US,
"Entity(%s): send cancelled as the Sender instance is Closed before the sendOperation completed.",
this.sendPath))
: completionException,
this);
}
@ -438,7 +447,9 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
}
final Exception finalCompletionException = completionException == null
? new EventHubException(true, "Client encountered transient error for unknown reasons, please retry the operation.") : completionException;
? new EventHubException(true, String.format(Locale.US,
"Entity(%s): client encountered transient error for unknown reasons, please retry the operation.",
this.sendPath)) : completionException;
this.onOpenComplete(finalCompletionException);
@ -489,7 +500,8 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
TRACE_LOGGER.trace(
String.format(
Locale.US,
"path[%s], linkName[%s], deliveryTag[%s]", MessageSender.this.sendPath, this.sendLink.getName(), deliveryTag));
"clientId[%s], path[%s], linkName[%s], deliveryTag[%s]",
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag));
final ReplayableWorkItem<Void> pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag);
@ -542,7 +554,10 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
exception.initCause(schedulerException);
this.cleanupFailedSend(
pendingSendWorkItem,
new EventHubException(false, "Send operation failed while scheduling a retry on Reactor, see cause for more details.", schedulerException));
new EventHubException(false, String.format(Locale.US,
"Entity(%s): send operation failed while scheduling a retry on Reactor, see cause for more details.",
this.sendPath),
schedulerException));
}
}
} else if (outcome instanceof Released) {
@ -553,7 +568,8 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
} else {
if (TRACE_LOGGER.isDebugEnabled())
TRACE_LOGGER.debug(
String.format(Locale.US, "path[%s], linkName[%s], delivery[%s] - mismatch (or send timedout)", this.sendPath, this.sendLink.getName(), deliveryTag));
String.format(Locale.US, "clientId[%s]. path[%s], linkName[%s], delivery[%s] - mismatch (or send timed out)",
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag));
}
}
@ -613,7 +629,7 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
try {
this.underlyingFactory.getCBSChannel().sendToken(
this.underlyingFactory.getReactorScheduler(),
this.underlyingFactory.getReactorDispatcher(),
this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new OperationResult<Void, Exception>() {
@ -670,7 +686,8 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(
String.format(Locale.US, "path[%s], open call timedout", MessageSender.this.sendPath),
String.format(Locale.US, "clientId[%s], path[%s], open call timed out",
MessageSender.this.getClientId(), MessageSender.this.sendPath),
operationTimedout);
}
@ -724,8 +741,9 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
if (TRACE_LOGGER.isDebugEnabled()) {
int numberOfSendsWaitingforCredit = this.pendingSends.size();
TRACE_LOGGER.debug(String.format(Locale.US, "path[%s], linkName[%s], remoteLinkCredit[%s], pendingSendsWaitingForCredit[%s], pendingSendsWaitingDelivery[%s]",
this.sendPath, this.sendLink.getName(), creditIssued, numberOfSendsWaitingforCredit, this.pendingSendsData.size() - numberOfSendsWaitingforCredit));
TRACE_LOGGER.debug(String.format(Locale.US,
"clientId[%s], path[%s], linkName[%s], remoteLinkCredit[%s], pendingSendsWaitingForCredit[%s], pendingSendsWaitingDelivery[%s]",
this.getClientId(), this.sendPath, this.sendLink.getName(), creditIssued, numberOfSendsWaitingforCredit, this.pendingSendsData.size() - numberOfSendsWaitingforCredit));
}
this.sendWork.onEvent();
@ -791,8 +809,8 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
} else {
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(
String.format(Locale.US, "path[%s], linkName[%s], deliveryTag[%s], sentMessageSize[%s], payloadActualSize[%s] - sendlink advance failed",
this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()));
String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s], sentMessageSize[%s], payloadActualSize[%s] - sendlink advance failed",
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()));
}
if (delivery != null) {
@ -800,16 +818,17 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
}
sendData.getWork().completeExceptionally(sendException != null
? new OperationCancelledException("Send operation failed. Please see cause for more details", sendException)
? new OperationCancelledException(String.format(Locale.US,
"Entity(%s): send operation failed. Please see cause for more details", this.sendPath), sendException)
: new OperationCancelledException(
String.format(Locale.US, "Send operation failed while advancing delivery(tag: %s) on SendLink(path: %s).", this.sendPath, deliveryTag)));
String.format(Locale.US, "Entity(%s): send operation failed while advancing delivery(tag: %s).", this.sendPath, deliveryTag)));
}
} else {
if (deliveryTag != null) {
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(
String.format(Locale.US, "path[%s], linkName[%s], deliveryTag[%s] - sendData not found for this delivery.",
this.sendPath, this.sendLink.getName(), deliveryTag));
String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s] - sendData not found for this delivery.",
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag));
}
}
@ -840,7 +859,8 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
final boolean isClientSideTimeout = (cause == null || !(cause instanceof EventHubException));
final EventHubException exception = isClientSideTimeout
? new TimeoutException(String.format(Locale.US, "%s %s %s.", MessageSender.SEND_TIMED_OUT, " at ", ZonedDateTime.now()), cause)
? new TimeoutException(String.format(Locale.US, "Entity(%s): %s at %s.",
this.sendPath, MessageSender.SEND_TIMED_OUT, ZonedDateTime.now()), cause)
: (EventHubException) cause;
ExceptionUtil.completeExceptionally(pendingSendWork, exception, this);
@ -858,10 +878,11 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
}
final Exception operationTimedout = new TimeoutException(String.format(Locale.US,
"%s operation on Sender Link(%s) timed out at %s", "Close", link.getName(), ZonedDateTime.now()));
"Entity(%s): close operation timed out at %s", MessageSender.this.sendPath, ZonedDateTime.now()));
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US, "message sender(linkName: %s, path: %s) %s call timedout", link.getName(), MessageSender.this.sendPath, "Close"),
String.format(Locale.US, "clientId[%s], message sender(linkName: %s, path: %s) close call timed out",
MessageSender.this.getClientId(), link.getName(), MessageSender.this.sendPath),
operationTimedout);
}
@ -894,8 +915,9 @@ public final class MessageSender extends ClientEntity implements AmqpSender, Err
if (sendLink != null && sendLink.getLocalState() != EndpointState.CLOSED) {
sendLink.close();
} else if (sendLink == null || sendLink.getRemoteState() == EndpointState.CLOSED) {
if (closeTimer != null)
if (closeTimer != null && !closeTimer.isCancelled()) {
closeTimer.cancel(false);
}
linkClose.complete(null);
}

Просмотреть файл

@ -5,16 +5,11 @@
package com.microsoft.azure.eventhubs.impl;
import com.microsoft.azure.eventhubs.*;
import com.microsoft.azure.eventhubs.TimeoutException;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.engine.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,20 +19,10 @@ import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.CommunicationException;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.OperationCancelledException;
import com.microsoft.azure.eventhubs.RetryPolicy;
import com.microsoft.azure.eventhubs.TimeoutException;
/**
* Abstracts all amqp related details and exposes AmqpConnection object
* Manages connection life-cycle
@ -57,11 +42,10 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
private final ReactorFactory reactorFactory;
private Reactor reactor;
private ReactorDispatcher reactorScheduler;
private ReactorDispatcher reactorDispatcher;
private Connection connection;
private CBSChannel cbsChannel;
private ManagementChannel mgmtChannel;
private Duration operationTimeout;
private RetryPolicy retryPolicy;
private CompletableFuture<MessagingFactory> open;
@ -70,13 +54,12 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
MessagingFactory(final ConnectionStringBuilder builder,
final RetryPolicy retryPolicy,
final Executor executor,
final ScheduledExecutorService executor,
final ReactorFactory reactorFactory) {
super("MessagingFactory".concat(StringUtil.getRandomString()), null, executor);
this.hostName = builder.getEndpoint().getHost();
this.reactorFactory = reactorFactory;
this.operationTimeout = builder.getOperationTimeout();
this.retryPolicy = retryPolicy;
this.registeredLinks = new LinkedList<>();
@ -91,21 +74,21 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
this.closeTask = new CompletableFuture<>();
}
public static CompletableFuture<MessagingFactory> createFromConnectionString(final String connectionString, final Executor executor) throws IOException {
public static CompletableFuture<MessagingFactory> createFromConnectionString(final String connectionString, final ScheduledExecutorService executor) throws IOException {
return createFromConnectionString(connectionString, RetryPolicy.getDefault(), executor);
}
public static CompletableFuture<MessagingFactory> createFromConnectionString(
final String connectionString,
final RetryPolicy retryPolicy,
final Executor executor) throws IOException {
final ScheduledExecutorService executor) throws IOException {
return createFromConnectionString(connectionString, retryPolicy, executor, new ReactorFactory());
}
public static CompletableFuture<MessagingFactory> createFromConnectionString(
final String connectionString,
final RetryPolicy retryPolicy,
final Executor executor,
final ScheduledExecutorService executor,
final ReactorFactory reactorFactory) throws IOException {
final ConnectionStringBuilder builder = new ConnectionStringBuilder(connectionString);
final MessagingFactory messagingFactory = new MessagingFactory(builder,
@ -153,9 +136,9 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
}
}
public ReactorDispatcher getReactorScheduler() {
public ReactorDispatcher getReactorDispatcher() {
synchronized (this.reactorLock) {
return this.reactorScheduler;
return this.reactorDispatcher;
}
}
@ -165,26 +148,15 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
private void createConnection() throws IOException {
this.open = new CompletableFuture<>();
this.startReactor(new ReactorHandler() {
@Override
public void onReactorInit(Event e) {
super.onReactorInit(e);
final Reactor r = e.getReactor();
connection = r.connectionToHost(
connectionHandler.getRemoteHostName(),
connectionHandler.getRemotePort(),
connectionHandler);
}
});
this.startReactor(new ReactorHandlerWithConnection());
}
private void startReactor(final ReactorHandler reactorHandler) throws IOException {
final Reactor newReactor = this.reactorFactory.create(reactorHandler, this.connectionHandler.getMaxFrameSize());
synchronized (this.reactorLock) {
this.reactor = newReactor;
this.reactorScheduler = new ReactorDispatcher(newReactor);
reactorHandler.unsafeSetReactorDispatcher(this.reactorScheduler);
this.reactorDispatcher = new ReactorDispatcher(newReactor);
reactorHandler.unsafeSetReactorDispatcher(this.reactorDispatcher);
}
executor.execute(new RunReactor(newReactor, executor));
@ -226,7 +198,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
}
final Session session = this.connection.session();
BaseHandler.setHandler(session, new SessionHandler(path, onRemoteSessionOpen, onRemoteSessionOpenError));
BaseHandler.setHandler(session, new SessionHandler(path, onRemoteSessionOpen, onRemoteSessionOpenError, this.operationTimeout));
session.open();
return session;
@ -258,16 +230,35 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
@Override
public void onConnectionError(ErrorCondition error) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], error[%s]",
this.getClientId(),
this.hostName,
error != null ? error.getDescription() : "n/a"));
}
if (!this.open.isDone()) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], open hasn't complete, stopping the reactor",
this.getClientId(),
this.hostName));
}
this.getReactor().stop();
this.onOpenComplete(ExceptionUtil.toException(error));
} else {
final Connection currentConnection = this.connection;
final List<Link> registeredLinksCopy = new LinkedList<>(this.registeredLinks);
final Connection oldConnection = this.connection;
final List<Link> oldRegisteredLinksCopy = new LinkedList<>(this.registeredLinks);
final List<Link> closedLinks = new LinkedList<>();
for (Link link : registeredLinksCopy) {
for (Link link : oldRegisteredLinksCopy) {
if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], closing link [%s]",
this.getClientId(),
this.hostName, link.getName()));
}
link.close();
closedLinks.add(link);
}
@ -275,11 +266,18 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
// if proton-j detects transport error - onConnectionError is invoked, but, the connection state is not set to closed
// in connection recreation we depend on currentConnection state to evaluate need for recreation
if (currentConnection.getLocalState() != EndpointState.CLOSED) {
if (oldConnection.getLocalState() != EndpointState.CLOSED) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], closing current connection",
this.getClientId(),
this.hostName));
}
// this should ideally be done in Connectionhandler
// - but, since proton doesn't automatically emit close events
// for all child objects (links & sessions) we are doing it here
currentConnection.close();
oldConnection.setCondition(error);
oldConnection.close();
}
for (Link link : closedLinks) {
@ -300,19 +298,29 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
if (!this.open.isDone()) {
this.onOpenComplete(cause);
} else {
final Connection currentConnection = this.connection;
if (this.getIsClosingOrClosed()) {
return;
}
TRACE_LOGGER.warn(String.format(Locale.US, "onReactorError messagingFactory[%s], hostName[%s], error[%s]",
this.getClientId(), this.getHostName(),
cause.getMessage()));
final Connection oldConnection = this.connection;
final List<Link> oldRegisteredLinksCopy = new LinkedList<>(this.registeredLinks);
try {
if (this.getIsClosingOrClosed()) {
return;
} else {
this.startReactor(new ReactorHandler());
}
TRACE_LOGGER.info(String.format(Locale.US, "onReactorError messagingFactory[%s], hostName[%s], message[%s]",
this.getClientId(), this.getHostName(),
"starting new reactor"));
this.startReactor(new ReactorHandlerWithConnection());
} catch (IOException e) {
TRACE_LOGGER.error(String.format(Locale.US, "messagingFactory[%s], hostName[%s], error[%s]",
this.getClientId(), this.getHostName(),
ExceptionUtil.toStackTraceString(e, "Re-starting reactor failed with error")));
// TODO - stop retrying on the error after multiple attempts.
this.onReactorError(cause);
}
@ -320,12 +328,11 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
// below .close() calls (local closes).
// But, we still need to change the states of these to Closed - so that subsequent retries - will
// treat the links and connection as closed and re-establish them and continue running on new Reactor instance.
if (currentConnection.getLocalState() != EndpointState.CLOSED && currentConnection.getRemoteState() != EndpointState.CLOSED) {
currentConnection.close();
if (oldConnection.getLocalState() != EndpointState.CLOSED && oldConnection.getRemoteState() != EndpointState.CLOSED) {
oldConnection.close();
}
final List<Link> registeredLinksCopy = new LinkedList<>(this.registeredLinks);
for (final Link link : registeredLinksCopy) {
for (final Link link : oldRegisteredLinksCopy) {
if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) {
link.close();
}
@ -379,11 +386,11 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
}
public void scheduleOnReactorThread(final DispatchHandler handler) throws IOException, RejectedExecutionException {
this.getReactorScheduler().invoke(handler);
this.getReactorDispatcher().invoke(handler);
}
public void scheduleOnReactorThread(final int delay, final DispatchHandler handler) throws IOException, RejectedExecutionException {
this.getReactorScheduler().invoke(delay, handler);
this.getReactorDispatcher().invoke(delay, handler);
}
public static class ReactorFactory {
@ -396,15 +403,12 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
private class CloseWork extends DispatchHandler {
@Override
public void onEvent() {
final ReactorDispatcher dispatcher = getReactorScheduler();
final ReactorDispatcher dispatcher = getReactorDispatcher();
synchronized (cbsChannelCreateLock) {
if (cbsChannel != null) {
cbsChannel.close(
dispatcher,
new OperationResult<Void, Exception>() {
@Override
public void onComplete(Void result) {
if (TRACE_LOGGER.isInfoEnabled()) {
@ -416,7 +420,6 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
@Override
public void onError(Exception error) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US,
"messagingFactory[%s], hostName[%s], cbsChannelCloseError[%s]",
@ -428,12 +431,10 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
}
synchronized (mgmtChannelCreateLock) {
if (mgmtChannel != null) {
mgmtChannel.close(
dispatcher,
new OperationResult<Void, Exception>() {
@Override
public void onComplete(Void result) {
if (TRACE_LOGGER.isInfoEnabled()) {
@ -456,33 +457,34 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
}
}
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED && connection.getLocalState() != EndpointState.CLOSED)
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED && connection.getLocalState() != EndpointState.CLOSED) {
connection.close();
}
}
}
private class RunReactor implements Runnable {
final private Reactor rctr;
final private Executor executor;
final private ScheduledExecutorService executor;
volatile boolean hasStarted;
public RunReactor(final Reactor reactor, final Executor executor) {
public RunReactor(final Reactor reactor, final ScheduledExecutorService executor) {
this.rctr = reactor;
this.executor = executor;
this.hasStarted = false;
}
public void run() {
if (TRACE_LOGGER.isInfoEnabled() && !this.hasStarted) {
TRACE_LOGGER.info(String.format(Locale.US, "messagingFactory[%s], hostName[%s], info[%s]",
getClientId(), getHostName(), "starting reactor instance."));
}
boolean reScheduledReactor = false;
try {
if (!this.hasStarted) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "messagingFactory[%s], hostName[%s], info[%s]",
getClientId(), getHostName(), "starting reactor instance."));
}
this.rctr.start();
this.hasStarted = true;
}
@ -495,7 +497,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "messagingFactory[%s], hostName[%s], error[%s]",
getClientId(), getHostName(),
ExceptionUtil.toStackTraceString(exception, "scheduling reactor failed")));
ExceptionUtil.toStackTraceString(exception, "scheduling reactor failed because the executor has been shut down")));
}
this.rctr.attachments().set(RejectedExecutionException.class, RejectedExecutionException.class, exception);
@ -504,6 +506,12 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
return;
}
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "messagingFactory[%s], hostName[%s], message[%s]",
getClientId(), getHostName(),
"stopping the reactor because thread was interrupted or the reactor has no more events to process."));
}
this.rctr.stop();
} catch (HandlerException handlerException) {
Throwable cause = handlerException.getCause();
@ -543,15 +551,55 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
return;
}
this.rctr.free();
if (getIsClosingOrClosed() && !closeTask.isDone()) {
this.rctr.free();
closeTask.complete(null);
if (closeTimer != null)
if (closeTimer != null) {
closeTimer.cancel(false);
}
} else {
scheduleCompletePendingTasks();
}
}
}
private void scheduleCompletePendingTasks() {
this.executor.schedule(new Runnable() {
@Override
public void run() {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "messagingFactory[%s], hostName[%s], message[%s]",
getClientId(), getHostName(),
"Processing all pending tasks and closing old reactor."));
}
try {
rctr.stop();
rctr.process();
} catch (HandlerException e) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "messagingFactory[%s], hostName[%s], error[%s]",
getClientId(), getHostName(), ExceptionUtil.toStackTraceString(e,
"scheduleCompletePendingTasks - exception occurred while processing events.")));
}
} finally {
rctr.free();
}
}
}, MessagingFactory.this.getOperationTimeout().getSeconds(), TimeUnit.SECONDS);
}
}
private class ReactorHandlerWithConnection extends ReactorHandler {
@Override
public void onReactorInit(Event e) {
super.onReactorInit(e);
final Reactor r = e.getReactor();
connection = r.connectionToHost(
connectionHandler.getRemoteHostName(),
connectionHandler.getRemotePort(),
connectionHandler);
}
}
}

Просмотреть файл

@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
@ -46,8 +46,8 @@ final class PartitionReceiverImpl extends ClientEntity implements ReceiverSettin
final Long epoch,
final boolean isEpochReceiver,
final ReceiverOptions receiverOptions,
final Executor executor) {
super(null, null, executor);
final ScheduledExecutorService executor) {
super("PartitionReceiverImpl".concat(StringUtil.getRandomString()), null, executor);
this.underlyingFactory = factory;
this.eventHubName = eventHubName;
@ -72,8 +72,7 @@ final class PartitionReceiverImpl extends ClientEntity implements ReceiverSettin
final long epoch,
final boolean isEpochReceiver,
ReceiverOptions receiverOptions,
final Executor executor)
throws EventHubException {
final ScheduledExecutorService executor) {
if (epoch < NULL_EPOCH) {
throw new IllegalArgumentException("epoch cannot be a negative value. Please specify a zero or positive long value.");
}
@ -96,7 +95,7 @@ final class PartitionReceiverImpl extends ClientEntity implements ReceiverSettin
private CompletableFuture<Void> createInternalReceiver() {
return MessageReceiver.create(this.underlyingFactory,
StringUtil.getRandomString(),
this.getClientId().concat("-InternalReceiver"),
String.format("%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId),
this.receiverOptions.getPrefetchCount(), this)
.thenAcceptAsync(new Consumer<MessageReceiver>() {
@ -179,6 +178,8 @@ final class PartitionReceiverImpl extends ClientEntity implements ReceiverSettin
"Unexpected value for parameter 'receiveHandler'. PartitionReceiver was already registered with a PartitionReceiveHandler instance. Only 1 instance can be registered.");
this.receivePump = new ReceivePump(
this.eventHubName,
this.consumerGroupName,
new ReceivePump.IPartitionReceiver() {
@Override
public CompletableFuture<Iterable<EventData>> receive(int maxBatchSize) {

Просмотреть файл

@ -7,7 +7,7 @@ package com.microsoft.azure.eventhubs.impl;
import com.microsoft.azure.eventhubs.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
@ -18,8 +18,8 @@ final class PartitionSenderImpl extends ClientEntity implements PartitionSender
private volatile MessageSender internalSender;
private PartitionSenderImpl(final MessagingFactory factory, final String eventHubName, final String partitionId, final Executor executor) {
super(null, null, executor);
private PartitionSenderImpl(final MessagingFactory factory, final String eventHubName, final String partitionId, final ScheduledExecutorService executor) {
super("PartitionSenderImpl".concat(StringUtil.getRandomString()), null, executor);
this.partitionId = partitionId;
this.eventHubName = eventHubName;
@ -29,7 +29,7 @@ final class PartitionSenderImpl extends ClientEntity implements PartitionSender
static CompletableFuture<PartitionSender> Create(final MessagingFactory factory,
final String eventHubName,
final String partitionId,
final Executor executor) throws EventHubException {
final ScheduledExecutorService executor) throws EventHubException {
final PartitionSenderImpl sender = new PartitionSenderImpl(factory, eventHubName, partitionId, executor);
return sender.createInternalSender()
.thenApplyAsync(new Function<Void, PartitionSender>() {
@ -40,7 +40,7 @@ final class PartitionSenderImpl extends ClientEntity implements PartitionSender
}
private CompletableFuture<Void> createInternalSender() throws EventHubException {
return MessageSender.create(this.factory, StringUtil.getRandomString(),
return MessageSender.create(this.factory, this.getClientId().concat("-InternalSender"),
String.format("%s/Partitions/%s", this.eventHubName, this.partitionId))
.thenAcceptAsync(new Consumer<MessageSender>() {
public void accept(MessageSender a) {

Просмотреть файл

@ -80,7 +80,7 @@ public final class ReactorDispatcher {
// throw when the pipe is in closed state - in which case,
// signalling the new event-dispatch will fail
if (!this.ioSignal.source().isOpen() || !this.ioSignal.sink().isOpen()) {
if (!this.ioSignal.sink().isOpen()) {
throw new RejectedExecutionException("ReactorDispatcher instance is closed.");
}
}
@ -121,7 +121,7 @@ public final class ReactorDispatcher {
} catch (ClosedChannelException ignorePipeClosedDuringReactorShutdown) {
TRACE_LOGGER.info("ScheduleHandler.run() failed with an error", ignorePipeClosedDuringReactorShutdown);
} catch (IOException ioException) {
TRACE_LOGGER.info("ScheduleHandler.run() failed with an error", ioException);
TRACE_LOGGER.warn("ScheduleHandler.run() failed with an error", ioException);
throw new RuntimeException(ioException);
}
@ -135,17 +135,11 @@ public final class ReactorDispatcher {
private final class CloseHandler implements Callback {
@Override
public void run(Selectable selectable) {
try {
selectable.getChannel().close();
} catch (IOException ioException) {
TRACE_LOGGER.info("CloseHandler.run() failed with an error", ioException);
}
try {
if (ioSignal.sink().isOpen())
ioSignal.sink().close();
} catch (IOException ioException) {
TRACE_LOGGER.info("CloseHandler.run() failed with an error", ioException);
TRACE_LOGGER.info("CloseHandler.run() sink().close() failed with an error", ioException);
}
workScheduler.run(null);
@ -154,7 +148,7 @@ public final class ReactorDispatcher {
if (ioSignal.source().isOpen())
ioSignal.source().close();
} catch (IOException ioException) {
TRACE_LOGGER.info("CloseHandler.run() failed with an error", ioException);
TRACE_LOGGER.info("CloseHandler.run() source().close() failed with an error", ioException);
}
}
}

Просмотреть файл

@ -37,7 +37,7 @@ public final class ReceiveLinkHandler extends BaseLinkHandler {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format("linkName[%s], localSource[%s]", receiver.getName(), receiver.getSource()));
String.format("onLinkLocalOpen linkName[%s], localSource[%s]", receiver.getName(), receiver.getSource()));
}
}
}
@ -45,11 +45,12 @@ public final class ReceiveLinkHandler extends BaseLinkHandler {
@Override
public void onLinkRemoteOpen(Event event) {
Link link = event.getLink();
if (link != null && link instanceof Receiver) {
if (link instanceof Receiver) {
Receiver receiver = (Receiver) link;
if (link.getRemoteSource() != null) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "linkName[%s], remoteSource[%s]", receiver.getName(), link.getRemoteSource()));
TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteSource[%s]",
receiver.getName(), link.getRemoteSource()));
}
synchronized (this.firstResponse) {
@ -59,7 +60,8 @@ public final class ReceiveLinkHandler extends BaseLinkHandler {
} else {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US, "linkName[%s], remoteTarget[null], remoteSource[null], action[waitingForError]", receiver.getName()));
String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[null], " +
"remoteSource[null], action[waitingForError]", receiver.getName()));
}
}
}
@ -89,8 +91,9 @@ public final class ReceiveLinkHandler extends BaseLinkHandler {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(
receiveLink != null
? String.format(Locale.US, "linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], remoteCondition[%s], delivery.isSettled[%s]",
receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isSettled())
? String.format(Locale.US, "onDelivery linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " +
"remoteCondition[%s], delivery.isSettled[%s]",
receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isSettled())
: String.format(Locale.US, "delivery.isSettled[%s]", delivery.isSettled()));
}
} else {
@ -100,7 +103,8 @@ public final class ReceiveLinkHandler extends BaseLinkHandler {
if (TRACE_LOGGER.isTraceEnabled() && receiveLink != null) {
TRACE_LOGGER.trace(
String.format(Locale.US, "linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], remoteCondition[%s], delivery.isPartial[%s]",
String.format(Locale.US, "onDelivery linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " +
"remoteCondition[%s], delivery.isPartial[%s]",
receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isPartial()));
}
}

Просмотреть файл

@ -6,7 +6,6 @@ package com.microsoft.azure.eventhubs.impl;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,15 +24,21 @@ public class ReceivePump implements Runnable {
private final CompletableFuture<Void> stopPump;
private final Executor executor;
private final ProcessAndReschedule processAndReschedule;
private final String eventHubName;
private final String consumerGroupName;
private AtomicBoolean stopPumpRaised;
private volatile boolean isPumpHealthy = true;
public ReceivePump(
final String eventHubName,
final String consumerGroupName,
final IPartitionReceiver receiver,
final PartitionReceiveHandler receiveHandler,
final boolean invokeOnReceiveWithNoEvents,
final Executor executor) {
this.eventHubName = eventHubName;
this.consumerGroupName = consumerGroupName;
this.receiver = receiver;
this.onReceiveHandler = receiveHandler;
this.invokeOnTimeout = invokeOnReceiveWithNoEvents;
@ -51,8 +56,9 @@ public class ReceivePump implements Runnable {
} catch (final Exception exception) {
if (TRACE_LOGGER.isErrorEnabled()) {
TRACE_LOGGER.error(
String.format("Receive pump for partition (%s) encountered unrecoverable error and exited with exception %s.",
ReceivePump.this.receiver.getPartitionId(), exception.toString()));
String.format("Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " +
"encountered unrecoverable error and exited with exception %s.",
this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), exception.toString()));
}
throw exception;
@ -63,11 +69,11 @@ public class ReceivePump implements Runnable {
public void receiveAndProcess() {
if (this.shouldContinue()) {
this.receiver.receive(this.onReceiveHandler.getMaxEventCount())
.handleAsync(this.processAndReschedule, this.executor);
.handleAsync(this.processAndReschedule, this.executor);
} else {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("Stopping receive pump for partition (%s) as %s",
ReceivePump.this.receiver.getPartitionId(),
TRACE_LOGGER.info(String.format("Stopping receive pump for eventHub (%s), consumerGroup (%s), partition (%s) as %s",
this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(),
this.stopPumpRaised.get() ? "per the request." : "pump ran into errors."));
}
@ -84,13 +90,6 @@ public class ReceivePump implements Runnable {
return !this.stopPump.isDone();
}
// partition receiver contract against which this pump works
public interface IPartitionReceiver {
String getPartitionId();
CompletableFuture<Iterable<EventData>> receive(final int maxBatchSize);
}
private boolean shouldContinue() {
return this.isPumpHealthy && !this.stopPumpRaised.get();
}
@ -101,8 +100,8 @@ public class ReceivePump implements Runnable {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(
"Receive pump for partition (%s) exiting after receive exception %s",
this.receiver.getPartitionId(), clientException.toString()));
"Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) exiting after receive exception %s",
this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), clientException.toString()));
}
this.onReceiveHandler.onError(clientException);
@ -113,16 +112,17 @@ public class ReceivePump implements Runnable {
this.isPumpHealthy = false;
if (TRACE_LOGGER.isErrorEnabled()) {
TRACE_LOGGER.error(
String.format("Receive pump for partition (%s) exiting after user-code exception %s",
this.receiver.getPartitionId(), userCodeException.toString()));
String.format("Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " +
"exiting after user-code exception %s",
this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), userCodeException.toString()));
}
this.onReceiveHandler.onError(userCodeException);
if (userCodeException instanceof InterruptedException) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("Interrupting receive pump for partition (%s)",
this.receiver.getPartitionId()));
TRACE_LOGGER.info(String.format("Interrupting receive pump for eventHub (%s), consumerGroup (%s), partition (%s)",
this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId()));
}
Thread.currentThread().interrupt();
@ -137,14 +137,21 @@ public class ReceivePump implements Runnable {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(
"Receive pump for partition (%s) exiting with error: %s",
ReceivePump.this.receiver.getPartitionId(), rejectedException.toString()));
"Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) exiting with error: %s",
this.eventHubName, this.consumerGroupName, ReceivePump.this.receiver.getPartitionId(), rejectedException.toString()));
}
this.onReceiveHandler.onError(rejectedException);
}
}
// partition receiver contract against which this pump works
public interface IPartitionReceiver {
String getPartitionId();
CompletableFuture<Iterable<EventData>> receive(final int maxBatchSize);
}
private final class ProcessAndReschedule implements BiFunction<Iterable<EventData>, Throwable, Void> {
@Override
@ -156,7 +163,7 @@ public class ReceivePump implements Runnable {
// don't invoke user call back - if stop is already raised / pump is unhealthy
if (ReceivePump.this.shouldContinue() &&
(receivedEvents != null
|| (receivedEvents == null && ReceivePump.this.invokeOnTimeout))) {
|| (receivedEvents == null && ReceivePump.this.invokeOnTimeout))) {
ReceivePump.this.onReceiveHandler.onReceive(receivedEvents);
}
} catch (final Throwable userCodeError) {

Просмотреть файл

@ -2,5 +2,5 @@ package com.microsoft.azure.eventhubs.impl;
interface SchedulerProvider {
ReactorDispatcher getReactorScheduler();
ReactorDispatcher getReactorDispatcher();
}

Просмотреть файл

@ -27,14 +27,25 @@ public class SendLinkHandler extends BaseLinkHandler {
this.isFirstFlow = true;
}
@Override
public void onLinkLocalOpen(Event event) {
Link link = event.getLink();
if (link instanceof Sender) {
Sender sender = (Sender) link;
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("onLinkLocalOpen linkName[%s], localTarget[%s]", sender.getName(), sender.getTarget()));
}
}
}
@Override
public void onLinkRemoteOpen(Event event) {
Link link = event.getLink();
if (link != null && link instanceof Sender) {
if (link instanceof Sender) {
Sender sender = (Sender) link;
if (link.getRemoteTarget() != null) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "linkName[%s], remoteTarget[%s]", sender.getName(), link.getRemoteTarget()));
TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[%s]", sender.getName(), link.getRemoteTarget()));
}
synchronized (this.firstFlow) {
@ -44,7 +55,7 @@ public class SendLinkHandler extends BaseLinkHandler {
} else {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US, "linkName[%s], remoteTarget[null], remoteSource[null], action[waitingForError]", sender.getName()));
String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[null], remoteSource[null], action[waitingForError]", sender.getName()));
}
}
}
@ -59,7 +70,7 @@ public class SendLinkHandler extends BaseLinkHandler {
if (TRACE_LOGGER.isTraceEnabled()) {
TRACE_LOGGER.trace(
"linkName[" + sender.getName() +
"onDelivery linkName[" + sender.getName() +
"], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getRemoteCredit() + "], deliveryState[" + delivery.getRemoteState() +
"], delivery.isBuffered[" + delivery.isBuffered() + "], delivery.id[" + new String(delivery.getTag()) + "]");
}
@ -86,7 +97,7 @@ public class SendLinkHandler extends BaseLinkHandler {
this.msgSender.onFlow(sender.getRemoteCredit());
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug("linkName[" + sender.getName() + "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getCredit() + "]");
TRACE_LOGGER.debug("onLinkFlow linkName[" + sender.getName() + "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getCredit() + "]");
}
}
}

Просмотреть файл

@ -5,7 +5,6 @@
package com.microsoft.azure.eventhubs.impl;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.TimeoutException;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.reactor.Reactor;
@ -13,6 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.Locale;
import java.util.function.BiConsumer;
@ -24,18 +24,28 @@ public class SessionHandler extends BaseHandler {
private final String entityName;
private final Consumer<Session> onRemoteSessionOpen;
private final BiConsumer<ErrorCondition, Exception> onRemoteSessionOpenError;
private final Duration openTimeout;
private boolean sessionCreated = false;
private boolean sessionOpenErrorDispatched = false;
public SessionHandler(final String entityName, final Consumer<Session> onRemoteSessionOpen, final BiConsumer<ErrorCondition, Exception> onRemoteSessionOpenError) {
public SessionHandler(final String entityName,
final Consumer<Session> onRemoteSessionOpen,
final BiConsumer<ErrorCondition, Exception> onRemoteSessionOpenError,
final Duration openTimeout) {
this.entityName = entityName;
this.onRemoteSessionOpenError = onRemoteSessionOpenError;
this.onRemoteSessionOpen = onRemoteSessionOpen;
this.openTimeout = openTimeout;
}
@Override
public void onSessionLocalOpen(Event e) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "onSessionLocalOpen entityName[%s], condition[%s]", this.entityName,
e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString()));
}
if (this.onRemoteSessionOpenError != null) {
ReactorHandler reactorHandler = null;
@ -53,12 +63,11 @@ public class SessionHandler extends BaseHandler {
final Session session = e.getSession();
try {
reactorDispatcher.invoke(ClientConstants.SESSION_OPEN_TIMEOUT_IN_MS, new SessionTimeoutHandler(session));
reactorDispatcher.invoke((int) this.openTimeout.toMillis(), new SessionTimeoutHandler(session));
} catch (IOException ignore) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "entityName[%s], reactorDispatcherError[%s]", this.entityName, ignore.getMessage()));
TRACE_LOGGER.warn(String.format(Locale.US, "onSessionLocalOpen entityName[%s], reactorDispatcherError[%s]",
this.entityName, ignore.getMessage()));
}
session.close();
@ -66,8 +75,8 @@ public class SessionHandler extends BaseHandler {
null,
new EventHubException(
false,
String.format("underlying IO of reactorDispatcher faulted with error: %s", ignore.getMessage()),
ignore));
String.format("onSessionLocalOpen entityName[%s], underlying IO of reactorDispatcher faulted with error: %s",
this.entityName, ignore.getMessage()), ignore));
}
}
}
@ -89,7 +98,6 @@ public class SessionHandler extends BaseHandler {
this.onRemoteSessionOpen.accept(session);
}
@Override
public void onSessionLocalClose(Event e) {
if (TRACE_LOGGER.isInfoEnabled()) {
@ -132,32 +140,17 @@ public class SessionHandler extends BaseHandler {
@Override
public void onEvent() {
// It is supposed to close a local session to handle timeout exception.
// However, closing the session can result in NPE because of proton-j bug (https://issues.apache.org/jira/browse/PROTON-1939).
// And the bug will cause the reactor thread to stop processing pending tasks scheduled on the reactor and
// as a result task won't be completed at all.
// TODO: handle timeout error once the proton-j bug is fixed.
// notify - if connection or transport error'ed out before even session open completed
if (!sessionCreated && !sessionOpenErrorDispatched) {
final Connection connection = session.getConnection();
if (connection != null) {
if (connection.getRemoteCondition() != null && connection.getRemoteCondition().getCondition() != null) {
session.close();
onRemoteSessionOpenError.accept(connection.getRemoteCondition(), null);
return;
}
final Transport transport = connection.getTransport();
if (transport != null && transport.getCondition() != null && transport.getCondition().getCondition() != null) {
session.close();
onRemoteSessionOpenError.accept(transport.getCondition(), null);
return;
}
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "SessionTimeoutHandler.onEvent - session open timed out."));
}
session.close();
onRemoteSessionOpenError.accept(null, new TimeoutException("session creation timedout."));
}
}
}

Просмотреть файл

@ -24,7 +24,7 @@ final class Timer {
final ScheduledTask scheduledTask = new ScheduledTask(runnable);
final CompletableFuture<?> taskHandle = scheduledTask.getScheduledFuture();
try {
this.schedulerProvider.getReactorScheduler().invoke((int) runAfter.toMillis(), scheduledTask);
this.schedulerProvider.getReactorDispatcher().invoke((int) runAfter.toMillis(), scheduledTask);
} catch (IOException | RejectedExecutionException e) {
taskHandle.completeExceptionally(e);
}

Просмотреть файл

@ -14,7 +14,9 @@ import com.microsoft.azure.eventhubs.lib.TestContext;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class EventHubClientTest extends ApiTestBase {
@ -24,7 +26,7 @@ public class EventHubClientTest extends ApiTestBase {
final String consumerGroupName = TestContext.getConsumerGroupName();
final String partitionId = "0";
final int noOfClients = 4;
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
@SuppressWarnings("unchecked")
CompletableFuture<EventHubClient>[] createFutures = new CompletableFuture[noOfClients];

Просмотреть файл

@ -20,7 +20,7 @@ public class EventDataBatchTest extends ApiTestBase {
@Test(expected = PayloadSizeExceededException.class)
public void payloadExceededException() throws EventHubException, IOException {
final ConnectionStringBuilder connStrBuilder = TestContext.getConnectionString();
ehClient = EventHubClient.createSync(connStrBuilder.toString(), Executors.newSingleThreadExecutor());
ehClient = EventHubClient.createSync(connStrBuilder.toString(), Executors.newScheduledThreadPool(1));
final EventDataBatch batch = ehClient.createBatch();

Просмотреть файл

@ -22,16 +22,14 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
static ConnectionStringBuilder connStr;
@BeforeClass
public static void initialize() throws Exception {
public static void initialize() {
connStr = TestContext.getConnectionString();
}
@Test()
public void VerifyTaskQueueEmptyOnMsgFactoryGracefulClose() throws Exception {
final LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();
final ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 1, TimeUnit.MINUTES, blockingQueue);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
try {
final EventHubClient ehClient = EventHubClient.createSync(
TestContext.getConnectionString().toString(),
@ -49,8 +47,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
ehClient.closeSync();
Assert.assertEquals(blockingQueue.size(), 0);
Assert.assertEquals(executor.getTaskCount(), executor.getCompletedTaskCount());
Assert.assertEquals(((ScheduledThreadPoolExecutor) executor).getQueue().size(), 0);
} finally {
executor.shutdown();
}
@ -59,9 +56,8 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test()
public void VerifyTaskQueueEmptyOnMsgFactoryWithPumpGracefulClose() throws Exception {
final LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();
final ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 1, TimeUnit.MINUTES, blockingQueue);
final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
try {
final EventHubClient ehClient = EventHubClient.createSync(
TestContext.getConnectionString().toString(),
@ -100,8 +96,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
ehClient.closeSync();
Assert.assertEquals(blockingQueue.size(), 0);
Assert.assertEquals(executor.getTaskCount(), executor.getCompletedTaskCount());
Assert.assertEquals(((ScheduledThreadPoolExecutor) executor).getQueue().size(), 0);
} finally {
executor.shutdown();
}
@ -113,9 +108,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
final FaultInjectingReactorFactory networkOutageSimulator = new FaultInjectingReactorFactory();
networkOutageSimulator.setFaultType(FaultInjectingReactorFactory.FaultType.NetworkOutage);
final LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();
final ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 1, TimeUnit.MINUTES, blockingQueue);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
try {
final CompletableFuture<MessagingFactory> openFuture = MessagingFactory.createFromConnectionString(
@ -131,8 +124,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
Thread.sleep(1000); // for reactor to transition from cleanup to complete-stop
Assert.assertEquals(0, blockingQueue.size());
Assert.assertEquals(executor.getTaskCount(), executor.getCompletedTaskCount());
Assert.assertEquals(((ScheduledThreadPoolExecutor) executor).getQueue().size(), 0);
} finally {
executor.shutdown();
}
@ -140,7 +132,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test(expected = RejectedExecutionException.class)
public void SupplyClosedExecutorServiceToEventHubClient() throws Exception {
final ExecutorService testClosed = Executors.newWorkStealingPool();
final ScheduledExecutorService testClosed = new ScheduledThreadPoolExecutor(1);
testClosed.shutdown();
EventHubClient.createSync(
@ -150,7 +142,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test(expected = RejectedExecutionException.class)
public void SupplyClosedExecutorServiceToSendOperation() throws Exception {
final ExecutorService testClosed = Executors.newWorkStealingPool();
final ScheduledExecutorService testClosed = Executors.newScheduledThreadPool(1);
final EventHubClient temp = EventHubClient.createSync(
TestContext.getConnectionString().toString(),
@ -165,7 +157,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test(expected = RejectedExecutionException.class)
public void SupplyClosedExecutorServiceToReceiveOperation() throws Exception {
final ExecutorService testClosed = Executors.newWorkStealingPool();
final ScheduledExecutorService testClosed = new ScheduledThreadPoolExecutor(1);
final PartitionReceiver temp = EventHubClient.createSync(
TestContext.getConnectionString().toString(),
@ -180,7 +172,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test(expected = RejectedExecutionException.class)
public void SupplyClosedExecutorServiceToCreateLinkOperation() throws Exception {
final ExecutorService testClosed = Executors.newWorkStealingPool();
final ScheduledExecutorService testClosed = Executors.newScheduledThreadPool(1);
final EventHubClient temp = EventHubClient.createSync(
TestContext.getConnectionString().toString(),
@ -195,7 +187,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test(expected = RejectedExecutionException.class)
public void SupplyClosedExecutorServiceToCreateSenderOperation() throws Exception {
final ExecutorService testClosed = Executors.newWorkStealingPool();
final ScheduledExecutorService testClosed = new ScheduledThreadPoolExecutor(1);
final EventHubClient temp = EventHubClient.createSync(
TestContext.getConnectionString().toString(),
@ -209,7 +201,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test(expected = RejectedExecutionException.class)
public void SupplyClosedExecutorServiceToCreateReceiverOperation() throws Exception {
final ExecutorService testClosed = Executors.newWorkStealingPool();
final ScheduledExecutorService testClosed = Executors.newScheduledThreadPool(1);
final EventHubClient temp = EventHubClient.createSync(
TestContext.getConnectionString().toString(),
@ -223,7 +215,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test(expected = RejectedExecutionException.class)
public void SupplyClosedExecutorServiceThenMgmtOperation() throws Throwable {
final ExecutorService testClosed = Executors.newWorkStealingPool();
final ScheduledThreadPoolExecutor testClosed = new ScheduledThreadPoolExecutor(1);
final EventHubClient temp = EventHubClient.createSync(
TestContext.getConnectionString().toString(),
@ -241,7 +233,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test(expected = RejectedExecutionException.class)
public void SupplyClosedExecutorServiceThenFactoryCloseOperation() throws Exception {
final ExecutorService testClosed = Executors.newWorkStealingPool();
final ScheduledExecutorService testClosed = Executors.newScheduledThreadPool(1);
final EventHubClient temp = EventHubClient.createSync(
TestContext.getConnectionString().toString(),
@ -255,7 +247,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test(expected = RejectedExecutionException.class)
public void SupplyClosedExecutorServiceThenSenderCloseOperation() throws Exception {
final ExecutorService testClosed = Executors.newWorkStealingPool();
final ScheduledThreadPoolExecutor testClosed = new ScheduledThreadPoolExecutor(1);
final PartitionSender temp = EventHubClient.createSync(
TestContext.getConnectionString().toString(),
@ -269,7 +261,7 @@ public class MsgFactoryOpenCloseTest extends ApiTestBase {
@Test(expected = RejectedExecutionException.class)
public void SupplyClosedExecutorServiceThenReceiverCloseOperation() throws Exception {
final ExecutorService testClosed = Executors.newWorkStealingPool();
final ScheduledExecutorService testClosed = Executors.newScheduledThreadPool(1);
final PartitionReceiver temp = EventHubClient.createSync(
TestContext.getConnectionString().toString(),

Просмотреть файл

@ -12,6 +12,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ReactorFaultTest extends ApiTestBase {
@ -26,27 +27,38 @@ public class ReactorFaultTest extends ApiTestBase {
@Test()
public void VerifyReactorRestartsOnProtonBugs() throws Exception {
final EventHubClient eventHubClient = EventHubClient.createSync(connStr.toString(), TestContext.EXECUTOR_SERVICE);
try {
final PartitionReceiver partitionReceiver = eventHubClient.createEpochReceiverSync(
"$default", "0", EventPosition.fromStartOfStream(), System.currentTimeMillis());
partitionReceiver.receiveSync(100);
Executors.newScheduledThreadPool(1).schedule(new Runnable() {
@Override
public void run() {
try {
final Field factoryField = EventHubClientImpl.class.getDeclaredField("underlyingFactory");
factoryField.setAccessible(true);
final MessagingFactory underlyingFactory = (MessagingFactory) factoryField.get(eventHubClient);
final Field reactorField = MessagingFactory.class.getDeclaredField("reactor");
reactorField.setAccessible(true);
final Reactor reactor = (Reactor) reactorField.get(underlyingFactory);
org.apache.qpid.proton.engine.Handler handler = reactor.getHandler();
handler.add(new BaseHandler() {
@Override
public void handle(org.apache.qpid.proton.engine.Event e) {
throw new NullPointerException();
}
});
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
}, 2, TimeUnit.SECONDS);
try {
final Field factoryField = EventHubClientImpl.class.getDeclaredField("underlyingFactory");
factoryField.setAccessible(true);
final MessagingFactory underlyingFactory = (MessagingFactory) factoryField.get(eventHubClient);
final Field reactorField = MessagingFactory.class.getDeclaredField("reactor");
reactorField.setAccessible(true);
final Reactor reactor = (Reactor) reactorField.get(underlyingFactory);
org.apache.qpid.proton.engine.Handler handler = reactor.getHandler();
handler.add(new BaseHandler() {
@Override
public void handle(org.apache.qpid.proton.engine.Event e) {
throw new NullPointerException();
}
});
Thread.sleep(4000);
final Iterable<EventData> events = partitionReceiver.receiveSync(100);
Assert.assertTrue(events != null && events.iterator().hasNext());

Просмотреть файл

@ -6,11 +6,13 @@ package com.microsoft.azure.eventhubs.lib;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
public final class TestContext {
public final static ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
public final static ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
final static String EVENT_HUB_CONNECTION_STRING_ENV_NAME = "EVENT_HUB_CONNECTION_STRING";

Просмотреть файл

@ -32,6 +32,7 @@ public class ReceivePumpTest {
public void testPumpOnReceiveEventFlow() throws Exception {
final CompletableFuture<Void> pumpRun = new CompletableFuture<>();
final ReceivePump receivePump = new ReceivePump(
"eventhub1", "consumerGroup1",
new ReceivePump.IPartitionReceiver() {
@Override
public CompletableFuture<Iterable<EventData>> receive(int maxBatchSize) {
@ -82,6 +83,7 @@ public class ReceivePumpTest {
public void testPumpReceiveTransientErrorsPropagated() throws Exception {
final CompletableFuture<Void> pumpRun = new CompletableFuture<>();
final ReceivePump receivePump = new ReceivePump(
"eventhub1", "consumerGroup1",
new ReceivePump.IPartitionReceiver() {
@Override
public CompletableFuture<Iterable<EventData>> receive(int maxBatchSize) {
@ -128,6 +130,7 @@ public class ReceivePumpTest {
public void testPumpReceiveExceptionsPropagated() throws Exception {
final CompletableFuture<Void> pumpRun = new CompletableFuture<>();
final ReceivePump receivePump = new ReceivePump(
"eventhub1", "consumerGroup1",
new ReceivePump.IPartitionReceiver() {
@Override
public CompletableFuture<Iterable<EventData>> receive(int maxBatchSize) {
@ -175,6 +178,7 @@ public class ReceivePumpTest {
final String runtimeExceptionMsg = "random exception";
final CompletableFuture<Void> pumpRun = new CompletableFuture<>();
final ReceivePump receivePump = new ReceivePump(
"eventhub1", "consumerGroup1",
new ReceivePump.IPartitionReceiver() {
@Override
public CompletableFuture<Iterable<EventData>> receive(int maxBatchSize) {

Просмотреть файл

@ -50,7 +50,7 @@ public class RequestResponseTest extends ApiTestBase {
@Test()
public void testRequestResponse() throws Exception {
final ReactorDispatcher dispatcher = factory.getReactorScheduler();
final ReactorDispatcher dispatcher = factory.getReactorDispatcher();
final RequestResponseChannel requestResponseChannel = new RequestResponseChannel(
"reqresp",
ClientConstants.MANAGEMENT_ADDRESS,