From f7cf851cb713fdd3651be5bb4d416e842cc84b7b Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Sat, 13 Feb 2016 01:12:44 -0800 Subject: [PATCH] Get synchronization right --- .../azure/servicebus/MessageReceiver.java | 312 +++++++++--------- .../azure/servicebus/MessageSender.java | 106 +++--- .../azure/servicebus/MessagingFactory.java | 24 +- .../azure/servicebus/RetryPolicy.java | 21 +- .../azure/servicebus/amqp/AmqpErrorCode.java | 2 + .../servicebus/amqp/ConnectionHandler.java | 1 - .../servicebus/amqp/ReceiveLinkHandler.java | 8 +- .../servicebus/amqp/SendLinkHandler.java | 1 + .../exceptioncontracts/ReceiverRetryTest.java | 8 +- .../azure/eventhubs/lib/MockServer.java | 6 - .../lib/Sender1MsgOnLinkFlowHandler.java | 7 + java/pom.xml | 2 +- 12 files changed, 245 insertions(+), 253 deletions(-) diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java index 293ced58..f8f9c8e2 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java @@ -34,11 +34,10 @@ public class MessageReceiver extends ClientEntity private final String receivePath; private final Runnable onOperationTimedout; private final Duration operationTimeout; - private final Object prefetchedMessagesLock; private ConcurrentLinkedQueue prefetchedMessages; private Receiver receiveLink; - private CompletableFuture linkOpen; + private WorkItem linkOpen; private CompletableFuture linkClose; private boolean closeCalled; @@ -50,7 +49,6 @@ public class MessageReceiver extends ClientEntity private Instant dateTime; private boolean offsetInclusive; - private TimeoutTracker currentOperationTracker; private String lastReceivedOffset; private AtomicInteger pingFlowCount; private Instant lastCommunicatedAt; @@ -78,7 +76,7 @@ public class MessageReceiver extends ClientEntity ReceiveLinkHandler handler = new ReceiveLinkHandler(msgReceiver); BaseHandler.setHandler(msgReceiver.receiveLink, handler); - return msgReceiver.linkOpen; + return msgReceiver.linkOpen.getWork(); } private MessageReceiver(final MessagingFactory factory, @@ -104,7 +102,6 @@ public class MessageReceiver extends ClientEntity this.linkCreateLock = new Object(); this.receiveHandlerLock = new Object(); this.linkClose = new CompletableFuture(); - this.prefetchedMessagesLock = new Object(); if (offset != null) { @@ -117,10 +114,9 @@ public class MessageReceiver extends ClientEntity } this.receiveLink = this.createReceiveLink(); - this.currentOperationTracker = TimeoutTracker.create(factory.getOperationTimeout()); - this.linkOpen = new CompletableFuture(); - this.scheduleLinkOpenTimeout(this.currentOperationTracker); + this.linkOpen = new WorkItem(new CompletableFuture(), this.operationTimeout); + this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker()); this.linkCreateScheduled = true; this.pendingReceives = new ConcurrentLinkedQueue>>(); @@ -130,31 +126,23 @@ public class MessageReceiver extends ClientEntity { public void run() { - while(MessageReceiver.this.pendingReceives.peek() != null) + WorkItem> topWorkItem = null; + while((topWorkItem = MessageReceiver.this.pendingReceives.peek()) != null) { - WorkItem> topWorkItem = MessageReceiver.this.pendingReceives.peek(); - if (topWorkItem.getTimeoutTracker().remaining().getSeconds() < ClientConstants.TimerTolerance.getSeconds()) + if (topWorkItem.getTimeoutTracker().remaining().getSeconds() <= 0) { - synchronized (MessageReceiver.this.pendingReceives) + WorkItem> dequedWorkItem = MessageReceiver.this.pendingReceives.poll(); + if (dequedWorkItem == topWorkItem) { - WorkItem> dequedWorkItem = MessageReceiver.this.pendingReceives.poll(); - if (dequedWorkItem != null) - { - dequedWorkItem.getWork().complete(null); - } + dequedWorkItem.getWork().complete(null); } } - else + else { - WorkItem> topWorkItemToBeTimedOut = MessageReceiver.this.pendingReceives.peek(); - MessageReceiver.this.currentOperationTracker = topWorkItemToBeTimedOut.getTimeoutTracker(); - MessageReceiver.this.scheduleOperationTimer(); - + MessageReceiver.this.scheduleOperationTimer(topWorkItem.getTimeoutTracker()); return; } } - - MessageReceiver.this.currentOperationTracker = null; } }; } @@ -169,43 +157,45 @@ public class MessageReceiver extends ClientEntity */ public CompletableFuture> receive() { - if (this.receiveLink.getLocalState() != EndpointState.ACTIVE) + if (this.receiveLink.getRemoteState() == EndpointState.CLOSED) { this.scheduleRecreate(Duration.ofSeconds(0)); } - if (!this.prefetchedMessages.isEmpty()) + List returnMessages = null; + Message currentMessage = null; + Message lastMessage = null; + while ((currentMessage = this.prefetchedMessages.poll()) != null) { - synchronized (this.prefetchedMessagesLock) + if (returnMessages == null) { - if (!this.prefetchedMessages.isEmpty()) - { - Queue returnMessages = this.prefetchedMessages; - this.prefetchedMessages = new ConcurrentLinkedQueue(); - this.sendFlow(returnMessages.size()); - - this.lastReceivedOffset = IteratorUtil.getLast(returnMessages.iterator()).getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString(); - return CompletableFuture.completedFuture((Collection) returnMessages); - } + returnMessages = new LinkedList(); } + + returnMessages.add(currentMessage); + lastMessage = currentMessage; + } + + if (returnMessages != null) + { + this.sendFlow(returnMessages.size()); + + this.lastReceivedOffset = lastMessage.getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString(); + return CompletableFuture.completedFuture((Collection) returnMessages); + } + + if (this.pendingReceives.isEmpty()) + { + this.scheduleOperationTimer(TimeoutTracker.create(this.operationTimeout)); } CompletableFuture> onReceive = new CompletableFuture>(); this.pendingReceives.offer(new WorkItem>(onReceive, this.operationTimeout)); - if (this.currentOperationTracker == null && this.pendingReceives.peek() != null) + WorkItem> topWorkItem = this.pendingReceives.peek(); + if (topWorkItem != null) { - synchronized (this.pendingReceives) - { - WorkItem> topWorkItem = this.pendingReceives.peek(); - if (topWorkItem != null) - { - this.sendPingFlow(); - - this.currentOperationTracker = topWorkItem.getTimeoutTracker(); - this.scheduleOperationTimer(); - } - } + this.sendPingFlow(); } return onReceive; @@ -226,15 +216,14 @@ public class MessageReceiver extends ClientEntity if (exception == null) { this.lastCommunicatedAt = Instant.now(); - this.linkOpen.complete(this); + this.linkOpen.getWork().complete(this); } else { - this.linkOpen.completeExceptionally(exception); + this.linkOpen.getWork().completeExceptionally(exception); } this.offsetInclusive = false; // re-open link always starts from the last received offset - this.currentOperationTracker = null; this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId()); } @@ -251,61 +240,58 @@ public class MessageReceiver extends ClientEntity if (this.receiveHandler != null) { + ReceiveHandler localReceiveHandler = null; synchronized (this.receiveHandlerLock) { - if (this.receiveHandler != null) - { - assert messages != null && messages.size() > 0; - - try - { - this.receiveHandler.onReceiveMessages(messages); - this.lastReceivedOffset = messages.getLast().getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString(); - } - catch (RuntimeException exception) - { - throw exception; - } - catch (Exception exception) - { - if (TRACE_LOGGER.isLoggable(Level.WARNING)) - { - TRACE_LOGGER.log(Level.WARNING, - String.format(Locale.US, "%s: LinkName (%s), receiverpath (%s): encountered Exception (%s) while running user-code", - Instant.now().toString(), this.name, this.receivePath, exception.getClass())); - } - - this.receiveHandler.onError(exception); - } - - this.currentOperationTracker = TimeoutTracker.create(this.operationTimeout); - this.sendFlow(messages.size()); - } + localReceiveHandler = this.receiveHandler; } - } - else - { - WorkItem> currentReceive = this.pendingReceives.poll(); - if (currentReceive == null) + if (localReceiveHandler != null) { - synchronized (this.prefetchedMessagesLock) + assert messages != null && messages.size() > 0; + + try { - this.prefetchedMessages.addAll(messages); - this.currentOperationTracker = null; + localReceiveHandler.onReceiveMessages(messages); + this.lastReceivedOffset = messages.getLast().getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString(); + this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId()); } - } - else - { - WorkItem> topPendingReceive = this.pendingReceives.peek(); - this.currentOperationTracker = topPendingReceive != null ? topPendingReceive.getTimeoutTracker() : null; + catch (RuntimeException exception) + { + throw exception; + } + catch (Exception exception) + { + if (TRACE_LOGGER.isLoggable(Level.WARNING)) + { + TRACE_LOGGER.log(Level.WARNING, + String.format(Locale.US, "%s: LinkName (%s), receiverpath (%s): encountered Exception (%s) while running user-code", + Instant.now().toString(), this.name, this.receivePath, exception.getClass())); + } + + localReceiveHandler.onError(exception); + } + this.sendFlow(messages.size()); - - this.lastReceivedOffset = messages.getLast().getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString(); - currentReceive.getWork().complete(messages); - } - } + return; + } + } + + WorkItem> currentReceive = this.pendingReceives.poll(); + if (currentReceive == null) + { + this.prefetchedMessages.addAll(messages); + } + else + { + this.sendFlow(messages.size()); + + this.lastReceivedOffset = messages.getLast().getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString(); + CompletableFuture> future = currentReceive.getWork(); + future.complete(messages); + } + this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId()); } @@ -313,12 +299,16 @@ public class MessageReceiver extends ClientEntity { Exception completionException = ExceptionUtil.toException(error); - // if CurrentOpTracker is null - no operation is in progress - Duration remainingTime = this.currentOperationTracker == null + WorkItem> currentReceive = this.pendingReceives.peek(); + + TimeoutTracker currentOperationTracker = currentReceive != null + ? currentReceive.getTimeoutTracker() + : (this.linkOpen.getWork().isDone() ? null : this.linkOpen.getTimeoutTracker()); + Duration remainingTime = currentOperationTracker == null ? Duration.ofSeconds(0) - : (this.currentOperationTracker.elapsed().compareTo(this.operationTimeout) > 0) + : (currentOperationTracker.elapsed().compareTo(this.operationTimeout) > 0) ? Duration.ofSeconds(0) - : this.operationTimeout.minus(this.currentOperationTracker.elapsed()); + : this.operationTimeout.minus(currentOperationTracker.elapsed()); Duration retryInterval = this.underlyingFactory.getRetryPolicy().getNextRetryInterval(this.getClientId(), completionException, remainingTime); if (retryInterval != null) @@ -327,13 +317,10 @@ public class MessageReceiver extends ClientEntity return; } - synchronized (this.linkOpen) + if (!this.linkOpen.getWork().isDone()) { - if (!this.linkOpen.isDone()) - { - this.onOpenComplete(completionException); - return; - } + this.onOpenComplete(completionException); + return; } if (completionException != null && this.receiveHandler != null) @@ -353,30 +340,30 @@ public class MessageReceiver extends ClientEntity } } } - else if (this.pendingReceives != null && !this.pendingReceives.isEmpty()) + else { - synchronized (this.pendingReceives) + WorkItem> workItem = null; + while ((workItem = this.pendingReceives.poll()) != null) { - if (this.pendingReceives != null && !this.pendingReceives.isEmpty()) - while (this.pendingReceives.peek() != null) - { - WorkItem> workItem = this.pendingReceives.poll(); - if (completionException instanceof ServiceBusException && ((ServiceBusException) completionException).getIsTransient()) - { - workItem.getWork().complete(null); - } - else - { - workItem.getWork().completeExceptionally(completionException); - } - } + CompletableFuture> future = workItem.getWork(); + if (completionException instanceof ServiceBusException && ((ServiceBusException) completionException).getIsTransient()) + { + future.complete(null); + } + else + { + future.completeExceptionally(completionException); + } } } } - private void scheduleOperationTimer() + private void scheduleOperationTimer(TimeoutTracker tracker) { - Timer.schedule(this.onOperationTimedout, this.currentOperationTracker.remaining(), TimerType.OneTimeRun); + if (tracker != null) + { + Timer.schedule(this.onOperationTimedout, tracker.remaining(), TimerType.OneTimeRun); + } } private Receiver createReceiveLink() @@ -439,21 +426,19 @@ public class MessageReceiver extends ClientEntity */ private void sendFlow(int credits) { - if (this.receiveLink.getLocalState() == EndpointState.ACTIVE) + if (this.receiveLink.getRemoteState() != EndpointState.CLOSED) { - if (this.pingFlowCount.get() != 0) + int currentPingFlow = this.pingFlowCount.get(); + if (currentPingFlow > 0) { - synchronized(this.pingFlowCount) + if (currentPingFlow < credits) { - if (this.pingFlowCount.get() < credits) - { - this.receiveLink.flow(credits - this.pingFlowCount.get()); - this.pingFlowCount.set(0); - } - else - { - this.pingFlowCount.set(this.pingFlowCount.get() - credits); - } + this.receiveLink.flow(credits - currentPingFlow); + this.pingFlowCount.set(0); + } + else + { + this.pingFlowCount.set(currentPingFlow - credits); } if(TRACE_LOGGER.isLoggable(Level.FINE)) @@ -475,7 +460,7 @@ public class MessageReceiver extends ClientEntity private void sendPingFlow() { - if (this.receiveLink.getLocalState() == EndpointState.ACTIVE) + if (this.receiveLink.getRemoteState() != EndpointState.CLOSED) { if (Instant.now().isAfter(this.lastCommunicatedAt.plus(ClientConstants.AmqpLinkDetachTimeoutInMin, ChronoUnit.DAYS)) && this.pingFlowCount.get() < MessageReceiver.PingFlowThreshold) @@ -507,21 +492,27 @@ public class MessageReceiver extends ClientEntity } this.linkCreateScheduled = true; - Timer.schedule( - new Runnable() - { - @Override - public void run() - { - MessageReceiver.this.receiveLink = MessageReceiver.this.createReceiveLink(); - ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this); - BaseHandler.setHandler(MessageReceiver.this.receiveLink, handler); - MessageReceiver.this.underlyingFactory.getRetryPolicy().incrementRetryCount(MessageReceiver.this.getClientId()); - } - }, - runAfter, - TimerType.OneTimeRun); } + + Timer.schedule( + new Runnable() + { + @Override + public void run() + { + if (MessageReceiver.this.receiveLink.getRemoteState() != EndpointState.CLOSED) + { + return; + } + + MessageReceiver.this.receiveLink = MessageReceiver.this.createReceiveLink(); + ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this); + BaseHandler.setHandler(MessageReceiver.this.receiveLink, handler); + MessageReceiver.this.underlyingFactory.getRetryPolicy().incrementRetryCount(MessageReceiver.this.getClientId()); + } + }, + runAfter, + TimerType.OneTimeRun); } private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) @@ -532,20 +523,17 @@ public class MessageReceiver extends ClientEntity { public void run() { - synchronized(linkOpen) + if (!linkOpen.getWork().isDone()) { - if (!linkOpen.isDone()) + Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", MessageReceiver.this.receiveLink.getName(), "Open")); + if (TRACE_LOGGER.isLoggable(Level.WARNING)) { - Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", MessageReceiver.this.receiveLink.getName(), "Open")); - if (TRACE_LOGGER.isLoggable(Level.WARNING)) - { - TRACE_LOGGER.log(Level.WARNING, - String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, "Open"), - operationTimedout); - } - - linkOpen.completeExceptionally(operationTimedout); + TRACE_LOGGER.log(Level.WARNING, + String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, "Open"), + operationTimedout); } + + linkOpen.getWork().completeExceptionally(operationTimedout); } } } diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java index 25c9a607..b30cce9a 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java @@ -3,6 +3,7 @@ package com.microsoft.azure.servicebus; import java.nio.BufferOverflowException; import java.time.*; import java.util.*; +import java.util.Map.Entry; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.function.*; @@ -88,7 +89,7 @@ public class MessageSender extends ClientEntity public CompletableFuture send(Message msg, int messageFormat) throws PayloadSizeExceededException { - if (this.sendLink.getLocalState() != EndpointState.ACTIVE) + if (this.sendLink.getRemoteState() == EndpointState.CLOSED) { this.scheduleRecreate(Duration.ofSeconds(0)); } @@ -115,11 +116,8 @@ public class MessageSender extends ClientEntity CompletableFuture onSend = new CompletableFuture(); - synchronized (this.pendingSendWaiters) - { - this.pendingSendWaiters.put(tag, new ReplayableWorkItem(bytes, sentMsgSize, messageFormat, onSend, this.operationTimeout)); - } - + this.pendingSendWaiters.put(tag, new ReplayableWorkItem(bytes, sentMsgSize, messageFormat, onSend, this.operationTimeout)); + this.sendLink.advance(); return onSend; } @@ -139,7 +137,7 @@ public class MessageSender extends ClientEntity return this.send(firstMessage); } - if (this.sendLink.getLocalState() != EndpointState.ACTIVE) + if (this.sendLink.getRemoteState() == EndpointState.CLOSED) { this.scheduleRecreate(Duration.ofSeconds(0)); } @@ -188,11 +186,8 @@ public class MessageSender extends ClientEntity assert sentMsgSize != byteArrayOffset : "Contract of the ProtonJ library for Sender.Send API changed"; CompletableFuture onSend = new CompletableFuture(); - synchronized (this.pendingSendWaitersLock) - { - this.pendingSendWaiters.put(tag, + this.pendingSendWaiters.put(tag, new ReplayableWorkItem(bytes, sentMsgSize, AmqpConstants.AmqpBatchMessageFormat, onSend, this.operationTimeout)); - } this.sendLink.advance(); return onSend; @@ -228,31 +223,31 @@ public class MessageSender extends ClientEntity } else if (!this.pendingSendWaiters.isEmpty()) { - ConcurrentHashMap> unacknowledgedSends = null; - synchronized(this.pendingSendWaitersLock) - { - unacknowledgedSends = this.pendingSendWaiters; - this.pendingSendWaiters = new ConcurrentHashMap>(); - } - - unacknowledgedSends.forEachValue(1, new Consumer>() - { - @Override - public void accept(ReplayableWorkItem sendWork) + ConcurrentHashMap> unacknowledgedSends = new ConcurrentHashMap<>(); + unacknowledgedSends.putAll(this.pendingSendWaiters); + + if (unacknowledgedSends.size() > 0) + unacknowledgedSends.forEachEntry(1, new Consumer>>() { - byte[] tag = String.valueOf(nextTag.incrementAndGet()).getBytes(); - Delivery dlv = sendLink.delivery(tag); - dlv.setMessageFormat(sendWork.getMessageFormat()); - - int sentMsgSize = sendLink.send(sendWork.getMessage(), 0, sendWork.getEncodedMessageSize()); - assert sentMsgSize != sendWork.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed"; - - CompletableFuture onSend = new CompletableFuture(); - pendingSendWaiters.put(tag, new ReplayableWorkItem(sendWork.getMessage(), sendWork.getEncodedMessageSize(), sendWork.getMessageFormat(), onSend, operationTimeout)); - - sendLink.advance(); - } - }); + @Override + public void accept(Entry> sendWork) { + ReplayableWorkItem pendingSend = MessageSender.this.pendingSendWaiters.remove(sendWork.getKey()); + if (pendingSend != null) + { + byte[] tag = String.valueOf(nextTag.incrementAndGet()).getBytes(); + Delivery dlv = sendLink.delivery(tag); + dlv.setMessageFormat(pendingSend.getMessageFormat()); + + int sentMsgSize = sendLink.send(pendingSend.getMessage(), 0, pendingSend.getEncodedMessageSize()); + assert sentMsgSize != pendingSend.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed"; + + CompletableFuture onSend = new CompletableFuture(); + pendingSendWaiters.put(tag, new ReplayableWorkItem(pendingSend.getMessage(), pendingSend.getEncodedMessageSize(), pendingSend.getMessageFormat(), onSend, operationTimeout)); + + sendLink.advance(); + } + } + }); unacknowledgedSends.clear(); } @@ -295,26 +290,33 @@ public class MessageSender extends ClientEntity { synchronized(this.linkCreateLock) { - if (!this.linkCreateScheduled) + if (this.linkCreateScheduled) { - this.linkCreateScheduled = true; - - Timer.schedule( - new Runnable() - { - @Override - public void run() - { - MessageSender.this.sendLink = MessageSender.this.createSendLink(); - SendLinkHandler handler = new SendLinkHandler(MessageSender.this); - BaseHandler.setHandler(MessageSender.this.sendLink, handler); - MessageSender.this.retryPolicy.incrementRetryCount(MessageSender.this.getClientId()); - } - }, - runAfter, - TimerType.OneTimeRun); + return; } + + this.linkCreateScheduled = true; } + + Timer.schedule( + new Runnable() + { + @Override + public void run() + { + if (MessageSender.this.sendLink.getRemoteState() != EndpointState.CLOSED) + { + return; + } + + MessageSender.this.sendLink = MessageSender.this.createSendLink(); + SendLinkHandler handler = new SendLinkHandler(MessageSender.this); + BaseHandler.setHandler(MessageSender.this.sendLink, handler); + MessageSender.this.retryPolicy.incrementRetryCount(MessageSender.this.getClientId()); + } + }, + runAfter, + TimerType.OneTimeRun); } public void onSendComplete(byte[] deliveryTag, DeliveryState outcome) diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java index f8004647..6e9d017c 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java @@ -29,6 +29,7 @@ public class MessagingFactory extends ClientEntity private Reactor reactor; private Thread reactorThread; private final Object reactorLock = new Object(); + private final Object connectionLock = new Object(); private ConnectionHandler connectionHandler; private Connection connection; @@ -81,13 +82,11 @@ public class MessagingFactory extends ClientEntity // Todo: async Connection getConnection() { - if (this.connection.getLocalState() != EndpointState.ACTIVE) + if (this.connection.getLocalState() == EndpointState.CLOSED) { - synchronized (this.connection) + synchronized (this.connectionLock) { - if (this.connection.getLocalState() != EndpointState.ACTIVE && - this.connection.getLocalState() != EndpointState.UNINITIALIZED && - !this.waitingConnectionOpen) + if (this.connection.getLocalState() == EndpointState.CLOSED && !this.waitingConnectionOpen) { this.connection.free(); this.connection = reactor.connection(connectionHandler); @@ -121,7 +120,7 @@ public class MessagingFactory extends ClientEntity // Contract: ConnectionHandler - MessagingFactory public void onOpenComplete(Exception exception) { - synchronized (this.connection) + synchronized (this.connectionLock) { this.waitingConnectionOpen = false; if (exception == null) @@ -137,9 +136,13 @@ public class MessagingFactory extends ClientEntity public void close() { - if (this.connection != null && this.connection.getLocalState() != EndpointState.CLOSED) + if (this.connection != null) { - this.connection.close(); + if (this.connection.getLocalState() != EndpointState.CLOSED) + { + this.connection.close(); + } + this.connection.free(); } } @@ -147,7 +150,10 @@ public class MessagingFactory extends ClientEntity @Override public CompletableFuture closeAsync() { - return null; + this.close(); + + // TODO - hook up onRemoteClose & timeout + return CompletableFuture.completedFuture(null); } public static class RunReactor implements Runnable diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/RetryPolicy.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/RetryPolicy.java index 5eccdd73..80e1ae90 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/RetryPolicy.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/RetryPolicy.java @@ -17,11 +17,8 @@ public abstract class RetryPolicy public void incrementRetryCount(String clientId) { - synchronized (clientId) - { - Integer retryCount = this.retryCounts.get(clientId); - this.retryCounts.put(clientId, retryCount == null ? 1 : retryCount + 1); - } + Integer retryCount = this.retryCounts.get(clientId); + this.retryCounts.put(clientId, retryCount == null ? 1 : retryCount + 1); } public void resetRetryCount(String clientId) @@ -29,10 +26,7 @@ public abstract class RetryPolicy Integer currentRetryCount = this.retryCounts.get(clientId); if (currentRetryCount != null && currentRetryCount != 0) { - synchronized (clientId) - { - this.retryCounts.put(clientId, 0); - } + this.retryCounts.put(clientId, 0); } } @@ -61,13 +55,10 @@ public abstract class RetryPolicy protected int getRetryCount(String clientId) { - synchronized(clientId) - { - Integer retryCount = this.retryCounts.get(clientId); - return retryCount == null ? 0 : retryCount; - } + Integer retryCount = this.retryCounts.get(clientId); + return retryCount == null ? 0 : retryCount; } - + /** * return returns 'null' Duration when not Allowed */ diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java index 66cadf6e..eee1a9d1 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java @@ -18,4 +18,6 @@ public final class AmqpErrorCode public static final Symbol PayloadSizeExceeded = Symbol.getSymbol("amqp:link:message-size-exceeded"); public static final Symbol AmqpLinkDetachForced = Symbol.getSymbol("amqp:link:detach-forced"); + // connection errors + public static final Symbol ConnectionForced = Symbol.getSymbol("amqp:connection:forced"); } diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java index 78f1dbf4..4cdf4aac 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java @@ -71,7 +71,6 @@ public final class ConnectionHandler extends BaseHandler { if (TRACE_LOGGER.isLoggable(Level.WARNING)) { - System.err.println(""); TRACE_LOGGER.log(Level.WARNING, "Connection.onTransportError: hostname[" + event.getConnection().getHostname() + "Error (no description returned)."); } } diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java index 4b1cfc5d..66d2a02e 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java @@ -96,6 +96,8 @@ public final class ReceiveLinkHandler extends BaseLinkHandler Link link = event.getLink(); if (link instanceof Receiver) { + link.close(); + ErrorCondition condition = link.getRemoteCondition(); if (condition != null) { @@ -116,7 +118,7 @@ public final class ReceiveLinkHandler extends BaseLinkHandler } } - this.msgReceiver.onError(condition); + this.msgReceiver.onError(condition); } } @@ -133,9 +135,9 @@ public final class ReceiveLinkHandler extends BaseLinkHandler if (TRACE_LOGGER.isLoggable(Level.WARNING)) TRACE_LOGGER.log(Level.WARNING, "recvLink.onLinkRemoteDetach: name["+link.getName()+"] : ErrorCondition[" + condition.getCondition() + ", " + condition.getDescription() + "]"); } - - this.msgReceiver.onError(condition); + link.close(); + this.msgReceiver.onError(condition); } } diff --git a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/SendLinkHandler.java b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/SendLinkHandler.java index 17196513..1289cbd6 100644 --- a/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/SendLinkHandler.java +++ b/java/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/SendLinkHandler.java @@ -75,6 +75,7 @@ public class SendLinkHandler extends BaseLinkHandler } } + link.close(); this.msgSender.onError(condition); } } diff --git a/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReceiverRetryTest.java b/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReceiverRetryTest.java index 040aa454..1c764aac 100644 --- a/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReceiverRetryTest.java +++ b/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReceiverRetryTest.java @@ -88,7 +88,7 @@ public class ReceiverRetryTest extends TestBase server = MockServer.Create(recvFlowHandler); } - // TODO: @Test + @Test public void testRetryWhenReceiveFails() throws Exception { factory = MessagingFactory.createFromConnectionString( @@ -109,11 +109,11 @@ public class ReceiverRetryTest extends TestBase @After public void cleanup() throws IOException { - if (server != null) - server.close(); - if (factory != null) factory.close(); + + if (server != null) + server.close(); } public class TestData diff --git a/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/MockServer.java b/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/MockServer.java index fdf9b341..3cd47efa 100644 --- a/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/MockServer.java +++ b/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/MockServer.java @@ -57,11 +57,5 @@ public class MockServer implements Closeable { this.acceptor.close(); } - - if (this.reactor != null) - { - this.reactor.free(); - this.reactor = null; - } } } diff --git a/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/Sender1MsgOnLinkFlowHandler.java b/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/Sender1MsgOnLinkFlowHandler.java index 259a2746..ae2859ab 100644 --- a/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/Sender1MsgOnLinkFlowHandler.java +++ b/java/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/Sender1MsgOnLinkFlowHandler.java @@ -4,11 +4,14 @@ import java.util.*; import java.util.logging.Level; import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.*; import org.apache.qpid.proton.engine.*; import org.apache.qpid.proton.message.*; import org.apache.qpid.proton.reactor.Handshaker; +import com.microsoft.azure.servicebus.amqp.AmqpConstants; + /** * Sends 1 Msg on the first onLinkFlow event */ @@ -39,6 +42,10 @@ public class Sender1MsgOnLinkFlowHandler extends ServerTraceHandler Map properties = new HashMap(); properties.put("testkey", "testvalue"); msg.setApplicationProperties(new ApplicationProperties(properties)); + Map annotations = new HashMap(); + annotations.put(AmqpConstants.Offset, "11111111"); + MessageAnnotations msgAnnotation = new MessageAnnotations(annotations); + msg.setMessageAnnotations(msgAnnotation); int length = msg.encode(bytes, 0, 4 * 1024); byte[] tag = String.valueOf(1).getBytes(); diff --git a/java/pom.xml b/java/pom.xml index a50170a5..a139e780 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -20,7 +20,7 @@ qpid protonj - https://repository.apache.org/content/repositories/orgapacheqpid-1061/ + https://repository.apache.org/content/repositories/orgapacheqpid-1063/