From b69510f10180fc69468cb2435df0ea491f563793 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Wed, 10 Feb 2016 20:16:49 -0800 Subject: [PATCH] Fix receiver stuck issue check ConsumerGroup Argument in createReceiver API Append TrackingID to link name --- .../azure/eventhubs/EventHubClient.java | 42 ++++++--- .../azure/eventhubs/PartitionReceiver.java | 5 + .../azure/servicebus/IteratorUtil.java | 11 +++ .../azure/servicebus/MessageReceiver.java | 93 +++++++++++-------- .../azure/servicebus/MessageSender.java | 11 ++- .../azure/servicebus/MessagingFactory.java | 42 +++++++-- .../azure/servicebus/TrackingUtil.java | 24 +++++ .../servicebus/amqp/ConnectionHandler.java | 19 +++- .../servicebus/amqp/ReceiveLinkHandler.java | 10 +- .../servicebus/amqp/SendLinkHandler.java | 4 +- .../exceptioncontracts/ReceiverRetryTest.java | 2 +- .../TimeoutExceptionTest.java | 2 +- 12 files changed, 187 insertions(+), 78 deletions(-) create mode 100644 java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/TrackingUtil.java diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java index d6c305b7..8c0c1961 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java @@ -15,7 +15,7 @@ public class EventHubClient extends ClientEntity { public static final String DefaultConsumerGroupName = "$Default"; - private final MessagingFactory underlyingFactory; + private MessagingFactory underlyingFactory; private final String eventHubName; private MessageSender sender; @@ -23,7 +23,6 @@ public class EventHubClient extends ClientEntity private EventHubClient(ConnectionStringBuilder connectionString) throws IOException, IllegalEntityException { super(UUID.randomUUID().toString()); - this.underlyingFactory = MessagingFactory.createFromConnectionString(connectionString.toString()); this.eventHubName = connectionString.getEntityPath(); } @@ -34,21 +33,40 @@ public class EventHubClient extends ClientEntity ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString); final EventHubClient eventHubClient = new EventHubClient(connStr); + if (isReceiveOnly) { - return CompletableFuture.completedFuture(eventHubClient); + return MessagingFactory.createFromConnectionString(connectionString.toString()) + .thenApplyAsync(new Function() + { + @Override + public EventHubClient apply(MessagingFactory factory) + { + eventHubClient.underlyingFactory = factory; + return eventHubClient; + } + }); } else { - return eventHubClient.createInternalSender() - .thenApplyAsync(new Function() - { - @Override - public EventHubClient apply(Void a) + return MessagingFactory.createFromConnectionString(connectionString.toString()) + .thenComposeAsync(new Function>() { - return eventHubClient; - } - }); + @Override + public CompletableFuture apply(MessagingFactory factory) + { + eventHubClient.underlyingFactory = factory; + return eventHubClient.createInternalSender() + .thenApplyAsync(new Function() + { + @Override + public EventHubClient apply(Void a) + { + return eventHubClient; + } + }); + } + }); } } @@ -66,7 +84,7 @@ public class EventHubClient extends ClientEntity return EventHubClient.createFromConnectionString(connectionString, false); } - CompletableFuture createInternalSender() throws IllegalEntityException + CompletableFuture createInternalSender() { return MessageSender.Create(this.underlyingFactory, UUID.randomUUID().toString(), this.eventHubName) .thenAcceptAsync(new Consumer() diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index 65ed3e2c..091226ce 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -60,6 +60,11 @@ public final class PartitionReceiver final boolean isEpochReceiver) throws ServiceBusException { + if (StringUtil.isNullOrWhiteSpace(consumerGroupName)) + { + throw new IllegalArgumentException("specify valid string for argument - 'consumerGroupName'"); + } + final PartitionReceiver receiver = new PartitionReceiver(factory, eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, dateTime, epoch, isEpochReceiver); return receiver.createInternalReceiver().thenApplyAsync(new Function() { diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/IteratorUtil.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/IteratorUtil.java index ba230f3d..262f5d8c 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/IteratorUtil.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/IteratorUtil.java @@ -24,4 +24,15 @@ public final class IteratorUtil return sizeEquals(iterator, expectedSize - 1); } } + + public static T getLast(Iterator iterator) + { + T last = null; + while(iterator.hasNext()) + { + last = iterator.next(); + } + + return last; + } } diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java index f097a455..293ced58 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java @@ -34,6 +34,7 @@ public class MessageReceiver extends ClientEntity private final String receivePath; private final Runnable onOperationTimedout; private final Duration operationTimeout; + private final Object prefetchedMessagesLock; private ConcurrentLinkedQueue prefetchedMessages; private Receiver receiveLink; @@ -74,7 +75,7 @@ public class MessageReceiver extends ClientEntity { MessageReceiver msgReceiver = new MessageReceiver(factory, name, recvPath, offset, offsetInclusive, dateTime, prefetchCount, epoch, isEpochReceiver); - ReceiveLinkHandler handler = new ReceiveLinkHandler(name, msgReceiver); + ReceiveLinkHandler handler = new ReceiveLinkHandler(msgReceiver); BaseHandler.setHandler(msgReceiver.receiveLink, handler); return msgReceiver.linkOpen; @@ -103,7 +104,8 @@ public class MessageReceiver extends ClientEntity this.linkCreateLock = new Object(); this.receiveHandlerLock = new Object(); this.linkClose = new CompletableFuture(); - + this.prefetchedMessagesLock = new Object(); + if (offset != null) { this.lastReceivedOffset = offset; @@ -174,15 +176,16 @@ public class MessageReceiver extends ClientEntity if (!this.prefetchedMessages.isEmpty()) { - synchronized (this.prefetchedMessages) + synchronized (this.prefetchedMessagesLock) { if (!this.prefetchedMessages.isEmpty()) { - // return all available msgs to application-layer and send 'link-flow' frame for prefetch - Collection returnMessages = this.prefetchedMessages; + Queue returnMessages = this.prefetchedMessages; this.prefetchedMessages = new ConcurrentLinkedQueue(); this.sendFlow(returnMessages.size()); - return CompletableFuture.completedFuture(returnMessages); + + this.lastReceivedOffset = IteratorUtil.getLast(returnMessages.iterator()).getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString(); + return CompletableFuture.completedFuture((Collection) returnMessages); } } } @@ -281,23 +284,25 @@ public class MessageReceiver extends ClientEntity } } else - { - synchronized (this.pendingReceives) + { + WorkItem> currentReceive = this.pendingReceives.poll(); + + if (currentReceive == null) { - if (this.pendingReceives.isEmpty()) + synchronized (this.prefetchedMessagesLock) { this.prefetchedMessages.addAll(messages); this.currentOperationTracker = null; } - else - { - WorkItem> currentReceive = this.pendingReceives.poll(); - this.currentOperationTracker = this.pendingReceives.peek() != null ? this.pendingReceives.peek().getTimeoutTracker() : null; - currentReceive.getWork().complete(messages); - this.sendFlow(messages.size()); - } - + } + else + { + WorkItem> topPendingReceive = this.pendingReceives.peek(); + this.currentOperationTracker = topPendingReceive != null ? topPendingReceive.getTimeoutTracker() : null; + this.sendFlow(messages.size()); + this.lastReceivedOffset = messages.getLast().getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString(); + currentReceive.getWork().complete(messages); } } @@ -341,7 +346,7 @@ public class MessageReceiver extends ClientEntity { TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "%s: LinkName (%s), receiverpath (%s): encountered Exception (%s) while receiving from ServiceBus service.", - Instant.now().toString(), this.getClientId(), this.receivePath, completionException.getClass())); + Instant.now().toString(), this.receiveLink.getName(), this.receivePath, completionException.getClass())); } this.receiveHandler.onError(completionException); @@ -405,9 +410,12 @@ public class MessageReceiver extends ClientEntity Map filterMap = Collections.singletonMap(AmqpConstants.StringFilter, filter); source.setFilter(filterMap); - Session ssn = this.underlyingFactory.getConnection().session(); + Connection connection = this.underlyingFactory.getConnection(); + Session ssn = connection.session(); - Receiver receiver = ssn.receiver(name); + String receiveLinkName = this.getClientId(); + receiveLinkName = receiveLinkName.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()); + Receiver receiver = ssn.receiver(receiveLinkName); receiver.setSource(source); receiver.setTarget(new Target()); @@ -433,24 +441,31 @@ public class MessageReceiver extends ClientEntity { if (this.receiveLink.getLocalState() == EndpointState.ACTIVE) { - synchronized(this.pingFlowCount) + if (this.pingFlowCount.get() != 0) { - if (this.pingFlowCount.get() < credits) + synchronized(this.pingFlowCount) { - this.receiveLink.flow(credits - this.pingFlowCount.get()); - this.pingFlowCount.set(0); - } - else - { - this.pingFlowCount.set(this.pingFlowCount.get() - credits); + if (this.pingFlowCount.get() < credits) + { + this.receiveLink.flow(credits - this.pingFlowCount.get()); + this.pingFlowCount.set(0); + } + else + { + this.pingFlowCount.set(this.pingFlowCount.get() - credits); + } } + + if(TRACE_LOGGER.isLoggable(Level.FINE)) + { + TRACE_LOGGER.log(Level.FINE, + String.format("MessageReceiver.sendFlow (linkname: %s), updated-link-credit: %s", this.receiveLink.getName(), this.receiveLink.getCredit())); + } + } + else + { + this.receiveLink.flow(credits); } - - if(TRACE_LOGGER.isLoggable(Level.FINE)) - { - TRACE_LOGGER.log(Level.FINE, - String.format("MessageReceiver.sendFlow (linkname: %s), updated-link-credit: %s", this.receiveLink.getName(), this.receiveLink.getCredit())); - } } else { @@ -499,7 +514,7 @@ public class MessageReceiver extends ClientEntity public void run() { MessageReceiver.this.receiveLink = MessageReceiver.this.createReceiveLink(); - ReceiveLinkHandler handler = new ReceiveLinkHandler(name, MessageReceiver.this); + ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this); BaseHandler.setHandler(MessageReceiver.this.receiveLink, handler); MessageReceiver.this.underlyingFactory.getRetryPolicy().incrementRetryCount(MessageReceiver.this.getClientId()); } @@ -521,11 +536,11 @@ public class MessageReceiver extends ClientEntity { if (!linkOpen.isDone()) { - Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", name, "Open")); + Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", MessageReceiver.this.receiveLink.getName(), "Open")); if (TRACE_LOGGER.isLoggable(Level.WARNING)) { TRACE_LOGGER.log(Level.WARNING, - String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", name, MessageReceiver.this.receivePath, "Open"), + String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, "Open"), operationTimedout); } @@ -550,11 +565,11 @@ public class MessageReceiver extends ClientEntity { if (!linkClose.isDone()) { - Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", name, "Close")); + Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", MessageReceiver.this.receiveLink.getName(), "Close")); if (TRACE_LOGGER.isLoggable(Level.WARNING)) { TRACE_LOGGER.log(Level.WARNING, - String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", name, MessageReceiver.this.receivePath, "Close"), + String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, "Close"), operationTimedout); } diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java index cae4069e..550ea5aa 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java @@ -51,10 +51,10 @@ public class MessageSender extends ClientEntity public static CompletableFuture Create( final MessagingFactory factory, final String sendLinkName, - final String senderPath) throws IllegalEntityException + final String senderPath) { MessageSender msgSender = new MessageSender(factory, sendLinkName, senderPath); - SendLinkHandler handler = new SendLinkHandler(sendLinkName, msgSender); + SendLinkHandler handler = new SendLinkHandler(msgSender); BaseHandler.setHandler(msgSender.sendLink, handler); return msgSender.linkOpen; } @@ -302,7 +302,7 @@ public class MessageSender extends ClientEntity public void run() { MessageSender.this.sendLink = MessageSender.this.createSendLink(); - SendLinkHandler handler = new SendLinkHandler(MessageSender.this.getClientId(), MessageSender.this); + SendLinkHandler handler = new SendLinkHandler(MessageSender.this); BaseHandler.setHandler(MessageSender.this.sendLink, handler); MessageSender.this.retryPolicy.incrementRetryCount(MessageSender.this.getClientId()); } @@ -342,7 +342,9 @@ public class MessageSender extends ClientEntity Session session = connection.session(); session.open(); - Sender sender = session.sender(this.getClientId()); + String sendLinkName = this.getClientId(); + sendLinkName = sendLinkName.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()); + Sender sender = session.sender(sendLinkName); Target target = new Target(); target.setAddress(this.sendPath); @@ -374,6 +376,7 @@ public class MessageSender extends ClientEntity { Exception operationTimedout = new TimeoutException( String.format(Locale.US, "Send Link(%s) open() timed out", MessageSender.this.getClientId())); + if (TRACE_LOGGER.isLoggable(Level.WARNING)) { TRACE_LOGGER.log(Level.WARNING, diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java index 8a0acabd..f8004647 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java @@ -1,6 +1,7 @@ package com.microsoft.azure.servicebus; import java.io.IOException; +import java.nio.channels.*; import java.time.Duration; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -35,6 +36,7 @@ public class MessagingFactory extends ClientEntity private Duration operationTimeout; private RetryPolicy retryPolicy; + private CompletableFuture open; /** * @param reactor parameter reactor is purely for testing purposes and the SDK code should always set it to null @@ -45,12 +47,11 @@ public class MessagingFactory extends ClientEntity this.startReactor(); - this.connection = this.createConnection(builder); this.operationTimeout = builder.getOperationTimeout(); this.retryPolicy = builder.getRetryPolicy(); } - private Connection createConnection(ConnectionStringBuilder builder) + private void createConnection(ConnectionStringBuilder builder) { synchronized (this.reactorLock) { @@ -58,7 +59,8 @@ public class MessagingFactory extends ClientEntity ConnectionHandler connectionHandler = new ConnectionHandler(this, builder.getEndpoint().getHost(), builder.getSasKeyName(), builder.getSasKey()); this.waitingConnectionOpen = true; - return reactor.connection(connectionHandler); + this.connection = reactor.connection(connectionHandler); + this.open = new CompletableFuture(); } } @@ -70,12 +72,13 @@ public class MessagingFactory extends ClientEntity { this.reactor = Proton.reactor(new ReactorHandler()); - this.reactorThread = new Thread(new RunReactor(this.reactor)); + this.reactorThread = new Thread(new RunReactor(this, this.reactor)); this.reactorThread.start(); } } } + // Todo: async Connection getConnection() { if (this.connection.getLocalState() != EndpointState.ACTIVE) @@ -106,18 +109,29 @@ public class MessagingFactory extends ClientEntity return this.retryPolicy; } - public static MessagingFactory createFromConnectionString(final String connectionString) throws IOException + public static CompletableFuture createFromConnectionString(final String connectionString) throws IOException { ConnectionStringBuilder builder = new ConnectionStringBuilder(connectionString); - return new MessagingFactory(builder); + MessagingFactory messagingFactory = new MessagingFactory(builder); + + messagingFactory.createConnection(builder); + return messagingFactory.open; } // Contract: ConnectionHandler - MessagingFactory - public void onOpenComplete() + public void onOpenComplete(Exception exception) { synchronized (this.connection) { this.waitingConnectionOpen = false; + if (exception == null) + { + this.open.complete(this); + } + else + { + this.open.completeExceptionally(exception); + } } } @@ -139,10 +153,12 @@ public class MessagingFactory extends ClientEntity public static class RunReactor implements Runnable { private Reactor r; + private MessagingFactory messagingFactory; - public RunReactor(Reactor r) + public RunReactor(MessagingFactory owner, Reactor r) { this.r = r; + this.messagingFactory = owner; } public void run() @@ -158,9 +174,17 @@ public class MessagingFactory extends ClientEntity } catch (HandlerException handlerException) { + if (handlerException.getCause() != null && handlerException.getCause() instanceof UnresolvedAddressException) + { + UnresolvedAddressException unresolvedAddressException = (UnresolvedAddressException) handlerException.getCause(); + this.messagingFactory.onOpenComplete(unresolvedAddressException); + return; + } + if(TRACE_LOGGER.isLoggable(Level.WARNING)) { - TRACE_LOGGER.log(Level.WARNING, "UnHandled exception while processing events in reactor:" + handlerException.toString()); + TRACE_LOGGER.log(Level.WARNING, "UnHandled exception while processing events in reactor: " + handlerException.toString()); + handlerException.printStackTrace(); } } } diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/TrackingUtil.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/TrackingUtil.java new file mode 100644 index 00000000..5537cc5f --- /dev/null +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/TrackingUtil.java @@ -0,0 +1,24 @@ +package com.microsoft.azure.servicebus; + +public final class TrackingUtil +{ + public static final String TRACKING_ID_TOKEN_SEPARATOR = "_"; + + private TrackingUtil() + { + } + + /** + * parses ServiceBus role identifiers from trackingId + * @return null if no roleIdentifier found + */ + public static String parseRoleIdentifier(final String trackingId) + { + if (StringUtil.isNullOrWhiteSpace(trackingId) || !trackingId.contains(TRACKING_ID_TOKEN_SEPARATOR)) + { + return null; + } + + return trackingId.substring(trackingId.indexOf(TRACKING_ID_TOKEN_SEPARATOR)); + } +} diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java index 00aa0b7c..78f1dbf4 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java @@ -34,7 +34,7 @@ public final class ConnectionHandler extends BaseHandler this.password = password; this.messagingFactory = messagingFactory; } - + @Override public void onConnectionBound(Event event) { @@ -46,6 +46,15 @@ public final class ConnectionHandler extends BaseHandler Sasl sasl = transport.sasl(); sasl.plain(this.username, this.password); } + + @Override + public void onConnectionUnbound(Event event) + { + if (TRACE_LOGGER.isLoggable(Level.WARNING)) + { + TRACE_LOGGER.log(Level.WARNING, "Connection.onConnectionUnbound: hostname[" + event.getConnection().getHostname() + "]"); + } + } @Override public void onTransportError(Event event) @@ -80,8 +89,12 @@ public final class ConnectionHandler extends BaseHandler @Override public void onConnectionRemoteOpen(Event event) { - Connection connection = event.getConnection(); - this.messagingFactory.onOpenComplete(); + if (TRACE_LOGGER.isLoggable(Level.FINE)) + { + TRACE_LOGGER.log(Level.FINE, "Connection.onConnectionRemoteOpen: hostname[" + event.getConnection().getHostname() + "]"); + } + + this.messagingFactory.onOpenComplete(null); } private static SslDomain makeDomain(SslDomain.Mode mode) diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java index ddcac0ef..4b1cfc5d 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java @@ -26,14 +26,12 @@ import com.microsoft.azure.servicebus.MessageReceiver; */ public final class ReceiveLinkHandler extends BaseLinkHandler { - private final String name; private final MessageReceiver msgReceiver; private final Object firstResponse; private boolean isFirstResponse; - public ReceiveLinkHandler(final String name, final MessageReceiver receiver) + public ReceiveLinkHandler(final MessageReceiver receiver) { - this.name = name; this.msgReceiver = receiver; this.firstResponse = new Object(); this.isFirstResponse = true; @@ -50,7 +48,7 @@ public final class ReceiveLinkHandler extends BaseLinkHandler if(TRACE_LOGGER.isLoggable(Level.FINE)) { TRACE_LOGGER.log(Level.FINE, - String.format("ReceiveLinkHandler(name: %s) initial credit: %s", this.name, receiver.getCredit())); + String.format("ReceiveLinkHandler(name: %s) initial credit: %s", receiver.getName(), receiver.getCredit())); } } } @@ -67,7 +65,7 @@ public final class ReceiveLinkHandler extends BaseLinkHandler if(TRACE_LOGGER.isLoggable(Level.FINE)) { TRACE_LOGGER.log(Level.FINE, - String.format("ReceiveLinkHandler(name: %s) RemoteSource: %s", this.name, link.getRemoteSource())); + String.format("ReceiveLinkHandler(name: %s) RemoteSource: %s", receiver.getName(), link.getRemoteSource())); } synchronized (this.firstResponse) @@ -81,7 +79,7 @@ public final class ReceiveLinkHandler extends BaseLinkHandler if(TRACE_LOGGER.isLoggable(Level.FINE)) { TRACE_LOGGER.log(Level.FINE, - String.format("ReceiveLinkHandler(name: %s): remote Target Source set to null. waiting for error.", this.name)); + String.format("ReceiveLinkHandler(name: %s): remote Target Source set to null. waiting for error.", receiver.getName())); } } diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/SendLinkHandler.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/SendLinkHandler.java index eee8747f..17196513 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/SendLinkHandler.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/SendLinkHandler.java @@ -10,14 +10,12 @@ import com.microsoft.azure.servicebus.MessageSender; public class SendLinkHandler extends BaseLinkHandler { - private final String name; private final MessageSender msgSender; private final Object firstFlow; private boolean isFirstFlow; - public SendLinkHandler(final String name, final MessageSender sender) + public SendLinkHandler(final MessageSender sender) { - this.name = name; this.msgSender = sender; this.firstFlow = new Object(); this.isFirstFlow = true; diff --git a/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReceiverRetryTest.java b/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReceiverRetryTest.java index 120c8995..040aa454 100644 --- a/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReceiverRetryTest.java +++ b/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReceiverRetryTest.java @@ -92,7 +92,7 @@ public class ReceiverRetryTest extends TestBase public void testRetryWhenReceiveFails() throws Exception { factory = MessagingFactory.createFromConnectionString( - new ConnectionStringBuilder("Endpoint=amqps://localhost;SharedAccessKeyName=somename;EntityPath=eventhub1;SharedAccessKey=somekey").toString()); + new ConnectionStringBuilder("Endpoint=amqps://localhost;SharedAccessKeyName=somename;EntityPath=eventhub1;SharedAccessKey=somekey").toString()).get(); MessageReceiver receiver = MessageReceiver.create(factory, "receiver1", "eventhub1/consumergroups/$default/partitions/0", "-1", false, null, 100, 0, false).get(); diff --git a/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/TimeoutExceptionTest.java b/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/TimeoutExceptionTest.java index ee5f05c2..4617b91f 100644 --- a/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/TimeoutExceptionTest.java +++ b/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/TimeoutExceptionTest.java @@ -17,7 +17,7 @@ public class TimeoutExceptionTest { MockServer server = MockServer.Create(null); MessagingFactory factory = MessagingFactory.createFromConnectionString( - new ConnectionStringBuilder("Endpoint=amqps://localhost;SharedAccessKeyName=somename;EntityPath=eventhub1;SharedAccessKey=somekey").toString()); + new ConnectionStringBuilder("Endpoint=amqps://localhost;SharedAccessKeyName=somename;EntityPath=eventhub1;SharedAccessKey=somekey").toString()).get(); try {