Get synchronization right
This commit is contained in:
Родитель
d21dce8926
Коммит
f7cf851cb7
|
@ -34,11 +34,10 @@ public class MessageReceiver extends ClientEntity
|
|||
private final String receivePath;
|
||||
private final Runnable onOperationTimedout;
|
||||
private final Duration operationTimeout;
|
||||
private final Object prefetchedMessagesLock;
|
||||
|
||||
private ConcurrentLinkedQueue<Message> prefetchedMessages;
|
||||
private Receiver receiveLink;
|
||||
private CompletableFuture<MessageReceiver> linkOpen;
|
||||
private WorkItem<MessageReceiver> linkOpen;
|
||||
private CompletableFuture<Void> linkClose;
|
||||
private boolean closeCalled;
|
||||
|
||||
|
@ -50,7 +49,6 @@ public class MessageReceiver extends ClientEntity
|
|||
private Instant dateTime;
|
||||
private boolean offsetInclusive;
|
||||
|
||||
private TimeoutTracker currentOperationTracker;
|
||||
private String lastReceivedOffset;
|
||||
private AtomicInteger pingFlowCount;
|
||||
private Instant lastCommunicatedAt;
|
||||
|
@ -78,7 +76,7 @@ public class MessageReceiver extends ClientEntity
|
|||
ReceiveLinkHandler handler = new ReceiveLinkHandler(msgReceiver);
|
||||
BaseHandler.setHandler(msgReceiver.receiveLink, handler);
|
||||
|
||||
return msgReceiver.linkOpen;
|
||||
return msgReceiver.linkOpen.getWork();
|
||||
}
|
||||
|
||||
private MessageReceiver(final MessagingFactory factory,
|
||||
|
@ -104,7 +102,6 @@ public class MessageReceiver extends ClientEntity
|
|||
this.linkCreateLock = new Object();
|
||||
this.receiveHandlerLock = new Object();
|
||||
this.linkClose = new CompletableFuture<Void>();
|
||||
this.prefetchedMessagesLock = new Object();
|
||||
|
||||
if (offset != null)
|
||||
{
|
||||
|
@ -117,10 +114,9 @@ public class MessageReceiver extends ClientEntity
|
|||
}
|
||||
|
||||
this.receiveLink = this.createReceiveLink();
|
||||
this.currentOperationTracker = TimeoutTracker.create(factory.getOperationTimeout());
|
||||
|
||||
this.linkOpen = new CompletableFuture<MessageReceiver>();
|
||||
this.scheduleLinkOpenTimeout(this.currentOperationTracker);
|
||||
this.linkOpen = new WorkItem<MessageReceiver>(new CompletableFuture<MessageReceiver>(), this.operationTimeout);
|
||||
this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker());
|
||||
this.linkCreateScheduled = true;
|
||||
|
||||
this.pendingReceives = new ConcurrentLinkedQueue<WorkItem<Collection<Message>>>();
|
||||
|
@ -130,31 +126,23 @@ public class MessageReceiver extends ClientEntity
|
|||
{
|
||||
public void run()
|
||||
{
|
||||
while(MessageReceiver.this.pendingReceives.peek() != null)
|
||||
WorkItem<Collection<Message>> topWorkItem = null;
|
||||
while((topWorkItem = MessageReceiver.this.pendingReceives.peek()) != null)
|
||||
{
|
||||
WorkItem<Collection<Message>> topWorkItem = MessageReceiver.this.pendingReceives.peek();
|
||||
if (topWorkItem.getTimeoutTracker().remaining().getSeconds() < ClientConstants.TimerTolerance.getSeconds())
|
||||
if (topWorkItem.getTimeoutTracker().remaining().getSeconds() <= 0)
|
||||
{
|
||||
synchronized (MessageReceiver.this.pendingReceives)
|
||||
WorkItem<Collection<Message>> dequedWorkItem = MessageReceiver.this.pendingReceives.poll();
|
||||
if (dequedWorkItem == topWorkItem)
|
||||
{
|
||||
WorkItem<Collection<Message>> dequedWorkItem = MessageReceiver.this.pendingReceives.poll();
|
||||
if (dequedWorkItem != null)
|
||||
{
|
||||
dequedWorkItem.getWork().complete(null);
|
||||
}
|
||||
dequedWorkItem.getWork().complete(null);
|
||||
}
|
||||
}
|
||||
else
|
||||
else
|
||||
{
|
||||
WorkItem<Collection<Message>> topWorkItemToBeTimedOut = MessageReceiver.this.pendingReceives.peek();
|
||||
MessageReceiver.this.currentOperationTracker = topWorkItemToBeTimedOut.getTimeoutTracker();
|
||||
MessageReceiver.this.scheduleOperationTimer();
|
||||
|
||||
MessageReceiver.this.scheduleOperationTimer(topWorkItem.getTimeoutTracker());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
MessageReceiver.this.currentOperationTracker = null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -169,43 +157,45 @@ public class MessageReceiver extends ClientEntity
|
|||
*/
|
||||
public CompletableFuture<Collection<Message>> receive()
|
||||
{
|
||||
if (this.receiveLink.getLocalState() != EndpointState.ACTIVE)
|
||||
if (this.receiveLink.getRemoteState() == EndpointState.CLOSED)
|
||||
{
|
||||
this.scheduleRecreate(Duration.ofSeconds(0));
|
||||
}
|
||||
|
||||
if (!this.prefetchedMessages.isEmpty())
|
||||
List<Message> returnMessages = null;
|
||||
Message currentMessage = null;
|
||||
Message lastMessage = null;
|
||||
while ((currentMessage = this.prefetchedMessages.poll()) != null)
|
||||
{
|
||||
synchronized (this.prefetchedMessagesLock)
|
||||
if (returnMessages == null)
|
||||
{
|
||||
if (!this.prefetchedMessages.isEmpty())
|
||||
{
|
||||
Queue<Message> returnMessages = this.prefetchedMessages;
|
||||
this.prefetchedMessages = new ConcurrentLinkedQueue<Message>();
|
||||
this.sendFlow(returnMessages.size());
|
||||
|
||||
this.lastReceivedOffset = IteratorUtil.getLast(returnMessages.iterator()).getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString();
|
||||
return CompletableFuture.completedFuture((Collection<Message>) returnMessages);
|
||||
}
|
||||
returnMessages = new LinkedList<Message>();
|
||||
}
|
||||
|
||||
returnMessages.add(currentMessage);
|
||||
lastMessage = currentMessage;
|
||||
}
|
||||
|
||||
if (returnMessages != null)
|
||||
{
|
||||
this.sendFlow(returnMessages.size());
|
||||
|
||||
this.lastReceivedOffset = lastMessage.getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString();
|
||||
return CompletableFuture.completedFuture((Collection<Message>) returnMessages);
|
||||
}
|
||||
|
||||
if (this.pendingReceives.isEmpty())
|
||||
{
|
||||
this.scheduleOperationTimer(TimeoutTracker.create(this.operationTimeout));
|
||||
}
|
||||
|
||||
CompletableFuture<Collection<Message>> onReceive = new CompletableFuture<Collection<Message>>();
|
||||
this.pendingReceives.offer(new WorkItem<Collection<Message>>(onReceive, this.operationTimeout));
|
||||
|
||||
if (this.currentOperationTracker == null && this.pendingReceives.peek() != null)
|
||||
WorkItem<Collection<Message>> topWorkItem = this.pendingReceives.peek();
|
||||
if (topWorkItem != null)
|
||||
{
|
||||
synchronized (this.pendingReceives)
|
||||
{
|
||||
WorkItem<Collection<Message>> topWorkItem = this.pendingReceives.peek();
|
||||
if (topWorkItem != null)
|
||||
{
|
||||
this.sendPingFlow();
|
||||
|
||||
this.currentOperationTracker = topWorkItem.getTimeoutTracker();
|
||||
this.scheduleOperationTimer();
|
||||
}
|
||||
}
|
||||
this.sendPingFlow();
|
||||
}
|
||||
|
||||
return onReceive;
|
||||
|
@ -226,15 +216,14 @@ public class MessageReceiver extends ClientEntity
|
|||
if (exception == null)
|
||||
{
|
||||
this.lastCommunicatedAt = Instant.now();
|
||||
this.linkOpen.complete(this);
|
||||
this.linkOpen.getWork().complete(this);
|
||||
}
|
||||
else
|
||||
{
|
||||
this.linkOpen.completeExceptionally(exception);
|
||||
this.linkOpen.getWork().completeExceptionally(exception);
|
||||
}
|
||||
|
||||
this.offsetInclusive = false; // re-open link always starts from the last received offset
|
||||
this.currentOperationTracker = null;
|
||||
this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());
|
||||
}
|
||||
|
||||
|
@ -251,61 +240,58 @@ public class MessageReceiver extends ClientEntity
|
|||
|
||||
if (this.receiveHandler != null)
|
||||
{
|
||||
ReceiveHandler localReceiveHandler = null;
|
||||
synchronized (this.receiveHandlerLock)
|
||||
{
|
||||
if (this.receiveHandler != null)
|
||||
{
|
||||
assert messages != null && messages.size() > 0;
|
||||
|
||||
try
|
||||
{
|
||||
this.receiveHandler.onReceiveMessages(messages);
|
||||
this.lastReceivedOffset = messages.getLast().getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString();
|
||||
}
|
||||
catch (RuntimeException exception)
|
||||
{
|
||||
throw exception;
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
if (TRACE_LOGGER.isLoggable(Level.WARNING))
|
||||
{
|
||||
TRACE_LOGGER.log(Level.WARNING,
|
||||
String.format(Locale.US, "%s: LinkName (%s), receiverpath (%s): encountered Exception (%s) while running user-code",
|
||||
Instant.now().toString(), this.name, this.receivePath, exception.getClass()));
|
||||
}
|
||||
|
||||
this.receiveHandler.onError(exception);
|
||||
}
|
||||
|
||||
this.currentOperationTracker = TimeoutTracker.create(this.operationTimeout);
|
||||
this.sendFlow(messages.size());
|
||||
}
|
||||
localReceiveHandler = this.receiveHandler;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
WorkItem<Collection<Message>> currentReceive = this.pendingReceives.poll();
|
||||
|
||||
if (currentReceive == null)
|
||||
if (localReceiveHandler != null)
|
||||
{
|
||||
synchronized (this.prefetchedMessagesLock)
|
||||
assert messages != null && messages.size() > 0;
|
||||
|
||||
try
|
||||
{
|
||||
this.prefetchedMessages.addAll(messages);
|
||||
this.currentOperationTracker = null;
|
||||
localReceiveHandler.onReceiveMessages(messages);
|
||||
this.lastReceivedOffset = messages.getLast().getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString();
|
||||
this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
WorkItem<Collection<Message>> topPendingReceive = this.pendingReceives.peek();
|
||||
this.currentOperationTracker = topPendingReceive != null ? topPendingReceive.getTimeoutTracker() : null;
|
||||
catch (RuntimeException exception)
|
||||
{
|
||||
throw exception;
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
if (TRACE_LOGGER.isLoggable(Level.WARNING))
|
||||
{
|
||||
TRACE_LOGGER.log(Level.WARNING,
|
||||
String.format(Locale.US, "%s: LinkName (%s), receiverpath (%s): encountered Exception (%s) while running user-code",
|
||||
Instant.now().toString(), this.name, this.receivePath, exception.getClass()));
|
||||
}
|
||||
|
||||
localReceiveHandler.onError(exception);
|
||||
}
|
||||
|
||||
this.sendFlow(messages.size());
|
||||
|
||||
this.lastReceivedOffset = messages.getLast().getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString();
|
||||
currentReceive.getWork().complete(messages);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
WorkItem<Collection<Message>> currentReceive = this.pendingReceives.poll();
|
||||
|
||||
if (currentReceive == null)
|
||||
{
|
||||
this.prefetchedMessages.addAll(messages);
|
||||
}
|
||||
else
|
||||
{
|
||||
this.sendFlow(messages.size());
|
||||
|
||||
this.lastReceivedOffset = messages.getLast().getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString();
|
||||
CompletableFuture<Collection<Message>> future = currentReceive.getWork();
|
||||
future.complete(messages);
|
||||
}
|
||||
|
||||
this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());
|
||||
}
|
||||
|
||||
|
@ -313,12 +299,16 @@ public class MessageReceiver extends ClientEntity
|
|||
{
|
||||
Exception completionException = ExceptionUtil.toException(error);
|
||||
|
||||
// if CurrentOpTracker is null - no operation is in progress
|
||||
Duration remainingTime = this.currentOperationTracker == null
|
||||
WorkItem<Collection<Message>> currentReceive = this.pendingReceives.peek();
|
||||
|
||||
TimeoutTracker currentOperationTracker = currentReceive != null
|
||||
? currentReceive.getTimeoutTracker()
|
||||
: (this.linkOpen.getWork().isDone() ? null : this.linkOpen.getTimeoutTracker());
|
||||
Duration remainingTime = currentOperationTracker == null
|
||||
? Duration.ofSeconds(0)
|
||||
: (this.currentOperationTracker.elapsed().compareTo(this.operationTimeout) > 0)
|
||||
: (currentOperationTracker.elapsed().compareTo(this.operationTimeout) > 0)
|
||||
? Duration.ofSeconds(0)
|
||||
: this.operationTimeout.minus(this.currentOperationTracker.elapsed());
|
||||
: this.operationTimeout.minus(currentOperationTracker.elapsed());
|
||||
Duration retryInterval = this.underlyingFactory.getRetryPolicy().getNextRetryInterval(this.getClientId(), completionException, remainingTime);
|
||||
|
||||
if (retryInterval != null)
|
||||
|
@ -327,13 +317,10 @@ public class MessageReceiver extends ClientEntity
|
|||
return;
|
||||
}
|
||||
|
||||
synchronized (this.linkOpen)
|
||||
if (!this.linkOpen.getWork().isDone())
|
||||
{
|
||||
if (!this.linkOpen.isDone())
|
||||
{
|
||||
this.onOpenComplete(completionException);
|
||||
return;
|
||||
}
|
||||
this.onOpenComplete(completionException);
|
||||
return;
|
||||
}
|
||||
|
||||
if (completionException != null && this.receiveHandler != null)
|
||||
|
@ -353,30 +340,30 @@ public class MessageReceiver extends ClientEntity
|
|||
}
|
||||
}
|
||||
}
|
||||
else if (this.pendingReceives != null && !this.pendingReceives.isEmpty())
|
||||
else
|
||||
{
|
||||
synchronized (this.pendingReceives)
|
||||
WorkItem<Collection<Message>> workItem = null;
|
||||
while ((workItem = this.pendingReceives.poll()) != null)
|
||||
{
|
||||
if (this.pendingReceives != null && !this.pendingReceives.isEmpty())
|
||||
while (this.pendingReceives.peek() != null)
|
||||
{
|
||||
WorkItem<Collection<Message>> workItem = this.pendingReceives.poll();
|
||||
if (completionException instanceof ServiceBusException && ((ServiceBusException) completionException).getIsTransient())
|
||||
{
|
||||
workItem.getWork().complete(null);
|
||||
}
|
||||
else
|
||||
{
|
||||
workItem.getWork().completeExceptionally(completionException);
|
||||
}
|
||||
}
|
||||
CompletableFuture<Collection<Message>> future = workItem.getWork();
|
||||
if (completionException instanceof ServiceBusException && ((ServiceBusException) completionException).getIsTransient())
|
||||
{
|
||||
future.complete(null);
|
||||
}
|
||||
else
|
||||
{
|
||||
future.completeExceptionally(completionException);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduleOperationTimer()
|
||||
private void scheduleOperationTimer(TimeoutTracker tracker)
|
||||
{
|
||||
Timer.schedule(this.onOperationTimedout, this.currentOperationTracker.remaining(), TimerType.OneTimeRun);
|
||||
if (tracker != null)
|
||||
{
|
||||
Timer.schedule(this.onOperationTimedout, tracker.remaining(), TimerType.OneTimeRun);
|
||||
}
|
||||
}
|
||||
|
||||
private Receiver createReceiveLink()
|
||||
|
@ -439,21 +426,19 @@ public class MessageReceiver extends ClientEntity
|
|||
*/
|
||||
private void sendFlow(int credits)
|
||||
{
|
||||
if (this.receiveLink.getLocalState() == EndpointState.ACTIVE)
|
||||
if (this.receiveLink.getRemoteState() != EndpointState.CLOSED)
|
||||
{
|
||||
if (this.pingFlowCount.get() != 0)
|
||||
int currentPingFlow = this.pingFlowCount.get();
|
||||
if (currentPingFlow > 0)
|
||||
{
|
||||
synchronized(this.pingFlowCount)
|
||||
if (currentPingFlow < credits)
|
||||
{
|
||||
if (this.pingFlowCount.get() < credits)
|
||||
{
|
||||
this.receiveLink.flow(credits - this.pingFlowCount.get());
|
||||
this.pingFlowCount.set(0);
|
||||
}
|
||||
else
|
||||
{
|
||||
this.pingFlowCount.set(this.pingFlowCount.get() - credits);
|
||||
}
|
||||
this.receiveLink.flow(credits - currentPingFlow);
|
||||
this.pingFlowCount.set(0);
|
||||
}
|
||||
else
|
||||
{
|
||||
this.pingFlowCount.set(currentPingFlow - credits);
|
||||
}
|
||||
|
||||
if(TRACE_LOGGER.isLoggable(Level.FINE))
|
||||
|
@ -475,7 +460,7 @@ public class MessageReceiver extends ClientEntity
|
|||
|
||||
private void sendPingFlow()
|
||||
{
|
||||
if (this.receiveLink.getLocalState() == EndpointState.ACTIVE)
|
||||
if (this.receiveLink.getRemoteState() != EndpointState.CLOSED)
|
||||
{
|
||||
if (Instant.now().isAfter(this.lastCommunicatedAt.plus(ClientConstants.AmqpLinkDetachTimeoutInMin, ChronoUnit.DAYS))
|
||||
&& this.pingFlowCount.get() < MessageReceiver.PingFlowThreshold)
|
||||
|
@ -507,21 +492,27 @@ public class MessageReceiver extends ClientEntity
|
|||
}
|
||||
|
||||
this.linkCreateScheduled = true;
|
||||
Timer.schedule(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
MessageReceiver.this.receiveLink = MessageReceiver.this.createReceiveLink();
|
||||
ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this);
|
||||
BaseHandler.setHandler(MessageReceiver.this.receiveLink, handler);
|
||||
MessageReceiver.this.underlyingFactory.getRetryPolicy().incrementRetryCount(MessageReceiver.this.getClientId());
|
||||
}
|
||||
},
|
||||
runAfter,
|
||||
TimerType.OneTimeRun);
|
||||
}
|
||||
|
||||
Timer.schedule(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
if (MessageReceiver.this.receiveLink.getRemoteState() != EndpointState.CLOSED)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
MessageReceiver.this.receiveLink = MessageReceiver.this.createReceiveLink();
|
||||
ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this);
|
||||
BaseHandler.setHandler(MessageReceiver.this.receiveLink, handler);
|
||||
MessageReceiver.this.underlyingFactory.getRetryPolicy().incrementRetryCount(MessageReceiver.this.getClientId());
|
||||
}
|
||||
},
|
||||
runAfter,
|
||||
TimerType.OneTimeRun);
|
||||
}
|
||||
|
||||
private void scheduleLinkOpenTimeout(final TimeoutTracker timeout)
|
||||
|
@ -532,20 +523,17 @@ public class MessageReceiver extends ClientEntity
|
|||
{
|
||||
public void run()
|
||||
{
|
||||
synchronized(linkOpen)
|
||||
if (!linkOpen.getWork().isDone())
|
||||
{
|
||||
if (!linkOpen.isDone())
|
||||
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", MessageReceiver.this.receiveLink.getName(), "Open"));
|
||||
if (TRACE_LOGGER.isLoggable(Level.WARNING))
|
||||
{
|
||||
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", MessageReceiver.this.receiveLink.getName(), "Open"));
|
||||
if (TRACE_LOGGER.isLoggable(Level.WARNING))
|
||||
{
|
||||
TRACE_LOGGER.log(Level.WARNING,
|
||||
String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, "Open"),
|
||||
operationTimedout);
|
||||
}
|
||||
|
||||
linkOpen.completeExceptionally(operationTimedout);
|
||||
TRACE_LOGGER.log(Level.WARNING,
|
||||
String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, "Open"),
|
||||
operationTimedout);
|
||||
}
|
||||
|
||||
linkOpen.getWork().completeExceptionally(operationTimedout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package com.microsoft.azure.servicebus;
|
|||
import java.nio.BufferOverflowException;
|
||||
import java.time.*;
|
||||
import java.util.*;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.*;
|
||||
import java.util.function.*;
|
||||
|
@ -88,7 +89,7 @@ public class MessageSender extends ClientEntity
|
|||
|
||||
public CompletableFuture<Void> send(Message msg, int messageFormat) throws PayloadSizeExceededException
|
||||
{
|
||||
if (this.sendLink.getLocalState() != EndpointState.ACTIVE)
|
||||
if (this.sendLink.getRemoteState() == EndpointState.CLOSED)
|
||||
{
|
||||
this.scheduleRecreate(Duration.ofSeconds(0));
|
||||
}
|
||||
|
@ -115,11 +116,8 @@ public class MessageSender extends ClientEntity
|
|||
|
||||
CompletableFuture<Void> onSend = new CompletableFuture<Void>();
|
||||
|
||||
synchronized (this.pendingSendWaiters)
|
||||
{
|
||||
this.pendingSendWaiters.put(tag, new ReplayableWorkItem<Void>(bytes, sentMsgSize, messageFormat, onSend, this.operationTimeout));
|
||||
}
|
||||
|
||||
this.pendingSendWaiters.put(tag, new ReplayableWorkItem<Void>(bytes, sentMsgSize, messageFormat, onSend, this.operationTimeout));
|
||||
|
||||
this.sendLink.advance();
|
||||
return onSend;
|
||||
}
|
||||
|
@ -139,7 +137,7 @@ public class MessageSender extends ClientEntity
|
|||
return this.send(firstMessage);
|
||||
}
|
||||
|
||||
if (this.sendLink.getLocalState() != EndpointState.ACTIVE)
|
||||
if (this.sendLink.getRemoteState() == EndpointState.CLOSED)
|
||||
{
|
||||
this.scheduleRecreate(Duration.ofSeconds(0));
|
||||
}
|
||||
|
@ -188,11 +186,8 @@ public class MessageSender extends ClientEntity
|
|||
assert sentMsgSize != byteArrayOffset : "Contract of the ProtonJ library for Sender.Send API changed";
|
||||
|
||||
CompletableFuture<Void> onSend = new CompletableFuture<Void>();
|
||||
synchronized (this.pendingSendWaitersLock)
|
||||
{
|
||||
this.pendingSendWaiters.put(tag,
|
||||
this.pendingSendWaiters.put(tag,
|
||||
new ReplayableWorkItem<Void>(bytes, sentMsgSize, AmqpConstants.AmqpBatchMessageFormat, onSend, this.operationTimeout));
|
||||
}
|
||||
|
||||
this.sendLink.advance();
|
||||
return onSend;
|
||||
|
@ -228,31 +223,31 @@ public class MessageSender extends ClientEntity
|
|||
}
|
||||
else if (!this.pendingSendWaiters.isEmpty())
|
||||
{
|
||||
ConcurrentHashMap<byte[], ReplayableWorkItem<Void>> unacknowledgedSends = null;
|
||||
synchronized(this.pendingSendWaitersLock)
|
||||
{
|
||||
unacknowledgedSends = this.pendingSendWaiters;
|
||||
this.pendingSendWaiters = new ConcurrentHashMap<byte[], ReplayableWorkItem<Void>>();
|
||||
}
|
||||
|
||||
unacknowledgedSends.forEachValue(1, new Consumer<ReplayableWorkItem<Void>>()
|
||||
{
|
||||
@Override
|
||||
public void accept(ReplayableWorkItem<Void> sendWork)
|
||||
ConcurrentHashMap<byte[], ReplayableWorkItem<Void>> unacknowledgedSends = new ConcurrentHashMap<>();
|
||||
unacknowledgedSends.putAll(this.pendingSendWaiters);
|
||||
|
||||
if (unacknowledgedSends.size() > 0)
|
||||
unacknowledgedSends.forEachEntry(1, new Consumer<Map.Entry<byte[], ReplayableWorkItem<Void>>>()
|
||||
{
|
||||
byte[] tag = String.valueOf(nextTag.incrementAndGet()).getBytes();
|
||||
Delivery dlv = sendLink.delivery(tag);
|
||||
dlv.setMessageFormat(sendWork.getMessageFormat());
|
||||
|
||||
int sentMsgSize = sendLink.send(sendWork.getMessage(), 0, sendWork.getEncodedMessageSize());
|
||||
assert sentMsgSize != sendWork.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed";
|
||||
|
||||
CompletableFuture<Void> onSend = new CompletableFuture<Void>();
|
||||
pendingSendWaiters.put(tag, new ReplayableWorkItem<Void>(sendWork.getMessage(), sendWork.getEncodedMessageSize(), sendWork.getMessageFormat(), onSend, operationTimeout));
|
||||
|
||||
sendLink.advance();
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void accept(Entry<byte[], ReplayableWorkItem<Void>> sendWork) {
|
||||
ReplayableWorkItem<Void> pendingSend = MessageSender.this.pendingSendWaiters.remove(sendWork.getKey());
|
||||
if (pendingSend != null)
|
||||
{
|
||||
byte[] tag = String.valueOf(nextTag.incrementAndGet()).getBytes();
|
||||
Delivery dlv = sendLink.delivery(tag);
|
||||
dlv.setMessageFormat(pendingSend.getMessageFormat());
|
||||
|
||||
int sentMsgSize = sendLink.send(pendingSend.getMessage(), 0, pendingSend.getEncodedMessageSize());
|
||||
assert sentMsgSize != pendingSend.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed";
|
||||
|
||||
CompletableFuture<Void> onSend = new CompletableFuture<Void>();
|
||||
pendingSendWaiters.put(tag, new ReplayableWorkItem<Void>(pendingSend.getMessage(), pendingSend.getEncodedMessageSize(), pendingSend.getMessageFormat(), onSend, operationTimeout));
|
||||
|
||||
sendLink.advance();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
unacknowledgedSends.clear();
|
||||
}
|
||||
|
@ -295,26 +290,33 @@ public class MessageSender extends ClientEntity
|
|||
{
|
||||
synchronized(this.linkCreateLock)
|
||||
{
|
||||
if (!this.linkCreateScheduled)
|
||||
if (this.linkCreateScheduled)
|
||||
{
|
||||
this.linkCreateScheduled = true;
|
||||
|
||||
Timer.schedule(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
MessageSender.this.sendLink = MessageSender.this.createSendLink();
|
||||
SendLinkHandler handler = new SendLinkHandler(MessageSender.this);
|
||||
BaseHandler.setHandler(MessageSender.this.sendLink, handler);
|
||||
MessageSender.this.retryPolicy.incrementRetryCount(MessageSender.this.getClientId());
|
||||
}
|
||||
},
|
||||
runAfter,
|
||||
TimerType.OneTimeRun);
|
||||
return;
|
||||
}
|
||||
|
||||
this.linkCreateScheduled = true;
|
||||
}
|
||||
|
||||
Timer.schedule(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
if (MessageSender.this.sendLink.getRemoteState() != EndpointState.CLOSED)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
MessageSender.this.sendLink = MessageSender.this.createSendLink();
|
||||
SendLinkHandler handler = new SendLinkHandler(MessageSender.this);
|
||||
BaseHandler.setHandler(MessageSender.this.sendLink, handler);
|
||||
MessageSender.this.retryPolicy.incrementRetryCount(MessageSender.this.getClientId());
|
||||
}
|
||||
},
|
||||
runAfter,
|
||||
TimerType.OneTimeRun);
|
||||
}
|
||||
|
||||
public void onSendComplete(byte[] deliveryTag, DeliveryState outcome)
|
||||
|
|
|
@ -29,6 +29,7 @@ public class MessagingFactory extends ClientEntity
|
|||
private Reactor reactor;
|
||||
private Thread reactorThread;
|
||||
private final Object reactorLock = new Object();
|
||||
private final Object connectionLock = new Object();
|
||||
|
||||
private ConnectionHandler connectionHandler;
|
||||
private Connection connection;
|
||||
|
@ -81,13 +82,11 @@ public class MessagingFactory extends ClientEntity
|
|||
// Todo: async
|
||||
Connection getConnection()
|
||||
{
|
||||
if (this.connection.getLocalState() != EndpointState.ACTIVE)
|
||||
if (this.connection.getLocalState() == EndpointState.CLOSED)
|
||||
{
|
||||
synchronized (this.connection)
|
||||
synchronized (this.connectionLock)
|
||||
{
|
||||
if (this.connection.getLocalState() != EndpointState.ACTIVE &&
|
||||
this.connection.getLocalState() != EndpointState.UNINITIALIZED &&
|
||||
!this.waitingConnectionOpen)
|
||||
if (this.connection.getLocalState() == EndpointState.CLOSED && !this.waitingConnectionOpen)
|
||||
{
|
||||
this.connection.free();
|
||||
this.connection = reactor.connection(connectionHandler);
|
||||
|
@ -121,7 +120,7 @@ public class MessagingFactory extends ClientEntity
|
|||
// Contract: ConnectionHandler - MessagingFactory
|
||||
public void onOpenComplete(Exception exception)
|
||||
{
|
||||
synchronized (this.connection)
|
||||
synchronized (this.connectionLock)
|
||||
{
|
||||
this.waitingConnectionOpen = false;
|
||||
if (exception == null)
|
||||
|
@ -137,9 +136,13 @@ public class MessagingFactory extends ClientEntity
|
|||
|
||||
public void close()
|
||||
{
|
||||
if (this.connection != null && this.connection.getLocalState() != EndpointState.CLOSED)
|
||||
if (this.connection != null)
|
||||
{
|
||||
this.connection.close();
|
||||
if (this.connection.getLocalState() != EndpointState.CLOSED)
|
||||
{
|
||||
this.connection.close();
|
||||
}
|
||||
|
||||
this.connection.free();
|
||||
}
|
||||
}
|
||||
|
@ -147,7 +150,10 @@ public class MessagingFactory extends ClientEntity
|
|||
@Override
|
||||
public CompletableFuture<Void> closeAsync()
|
||||
{
|
||||
return null;
|
||||
this.close();
|
||||
|
||||
// TODO - hook up onRemoteClose & timeout
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
public static class RunReactor implements Runnable
|
||||
|
|
|
@ -17,11 +17,8 @@ public abstract class RetryPolicy
|
|||
|
||||
public void incrementRetryCount(String clientId)
|
||||
{
|
||||
synchronized (clientId)
|
||||
{
|
||||
Integer retryCount = this.retryCounts.get(clientId);
|
||||
this.retryCounts.put(clientId, retryCount == null ? 1 : retryCount + 1);
|
||||
}
|
||||
Integer retryCount = this.retryCounts.get(clientId);
|
||||
this.retryCounts.put(clientId, retryCount == null ? 1 : retryCount + 1);
|
||||
}
|
||||
|
||||
public void resetRetryCount(String clientId)
|
||||
|
@ -29,10 +26,7 @@ public abstract class RetryPolicy
|
|||
Integer currentRetryCount = this.retryCounts.get(clientId);
|
||||
if (currentRetryCount != null && currentRetryCount != 0)
|
||||
{
|
||||
synchronized (clientId)
|
||||
{
|
||||
this.retryCounts.put(clientId, 0);
|
||||
}
|
||||
this.retryCounts.put(clientId, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,13 +55,10 @@ public abstract class RetryPolicy
|
|||
|
||||
protected int getRetryCount(String clientId)
|
||||
{
|
||||
synchronized(clientId)
|
||||
{
|
||||
Integer retryCount = this.retryCounts.get(clientId);
|
||||
return retryCount == null ? 0 : retryCount;
|
||||
}
|
||||
Integer retryCount = this.retryCounts.get(clientId);
|
||||
return retryCount == null ? 0 : retryCount;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* return returns 'null' Duration when not Allowed
|
||||
*/
|
||||
|
|
|
@ -18,4 +18,6 @@ public final class AmqpErrorCode
|
|||
public static final Symbol PayloadSizeExceeded = Symbol.getSymbol("amqp:link:message-size-exceeded");
|
||||
public static final Symbol AmqpLinkDetachForced = Symbol.getSymbol("amqp:link:detach-forced");
|
||||
|
||||
// connection errors
|
||||
public static final Symbol ConnectionForced = Symbol.getSymbol("amqp:connection:forced");
|
||||
}
|
||||
|
|
|
@ -71,7 +71,6 @@ public final class ConnectionHandler extends BaseHandler
|
|||
{
|
||||
if (TRACE_LOGGER.isLoggable(Level.WARNING))
|
||||
{
|
||||
System.err.println("");
|
||||
TRACE_LOGGER.log(Level.WARNING, "Connection.onTransportError: hostname[" + event.getConnection().getHostname() + "Error (no description returned).");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,6 +96,8 @@ public final class ReceiveLinkHandler extends BaseLinkHandler
|
|||
Link link = event.getLink();
|
||||
if (link instanceof Receiver)
|
||||
{
|
||||
link.close();
|
||||
|
||||
ErrorCondition condition = link.getRemoteCondition();
|
||||
if (condition != null)
|
||||
{
|
||||
|
@ -116,7 +118,7 @@ public final class ReceiveLinkHandler extends BaseLinkHandler
|
|||
}
|
||||
}
|
||||
|
||||
this.msgReceiver.onError(condition);
|
||||
this.msgReceiver.onError(condition);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,9 +135,9 @@ public final class ReceiveLinkHandler extends BaseLinkHandler
|
|||
if (TRACE_LOGGER.isLoggable(Level.WARNING))
|
||||
TRACE_LOGGER.log(Level.WARNING, "recvLink.onLinkRemoteDetach: name["+link.getName()+"] : ErrorCondition[" + condition.getCondition() + ", " + condition.getDescription() + "]");
|
||||
}
|
||||
|
||||
this.msgReceiver.onError(condition);
|
||||
|
||||
link.close();
|
||||
this.msgReceiver.onError(condition);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -75,6 +75,7 @@ public class SendLinkHandler extends BaseLinkHandler
|
|||
}
|
||||
}
|
||||
|
||||
link.close();
|
||||
this.msgSender.onError(condition);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,7 +88,7 @@ public class ReceiverRetryTest extends TestBase
|
|||
server = MockServer.Create(recvFlowHandler);
|
||||
}
|
||||
|
||||
// TODO: @Test
|
||||
@Test
|
||||
public void testRetryWhenReceiveFails() throws Exception
|
||||
{
|
||||
factory = MessagingFactory.createFromConnectionString(
|
||||
|
@ -109,11 +109,11 @@ public class ReceiverRetryTest extends TestBase
|
|||
@After
|
||||
public void cleanup() throws IOException
|
||||
{
|
||||
if (server != null)
|
||||
server.close();
|
||||
|
||||
if (factory != null)
|
||||
factory.close();
|
||||
|
||||
if (server != null)
|
||||
server.close();
|
||||
}
|
||||
|
||||
public class TestData
|
||||
|
|
|
@ -57,11 +57,5 @@ public class MockServer implements Closeable
|
|||
{
|
||||
this.acceptor.close();
|
||||
}
|
||||
|
||||
if (this.reactor != null)
|
||||
{
|
||||
this.reactor.free();
|
||||
this.reactor = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,11 +4,14 @@ import java.util.*;
|
|||
import java.util.logging.Level;
|
||||
|
||||
import org.apache.qpid.proton.Proton;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.*;
|
||||
import org.apache.qpid.proton.engine.*;
|
||||
import org.apache.qpid.proton.message.*;
|
||||
import org.apache.qpid.proton.reactor.Handshaker;
|
||||
|
||||
import com.microsoft.azure.servicebus.amqp.AmqpConstants;
|
||||
|
||||
/**
|
||||
* Sends 1 Msg on the first onLinkFlow event
|
||||
*/
|
||||
|
@ -39,6 +42,10 @@ public class Sender1MsgOnLinkFlowHandler extends ServerTraceHandler
|
|||
Map<String, String> properties = new HashMap<String, String>();
|
||||
properties.put("testkey", "testvalue");
|
||||
msg.setApplicationProperties(new ApplicationProperties(properties));
|
||||
Map<Symbol, Object> annotations = new HashMap<Symbol, Object>();
|
||||
annotations.put(AmqpConstants.Offset, "11111111");
|
||||
MessageAnnotations msgAnnotation = new MessageAnnotations(annotations);
|
||||
msg.setMessageAnnotations(msgAnnotation);
|
||||
int length = msg.encode(bytes, 0, 4 * 1024);
|
||||
|
||||
byte[] tag = String.valueOf(1).getBytes();
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
<repositories>
|
||||
<repository>
|
||||
<id>qpid protonj</id>
|
||||
<url>https://repository.apache.org/content/repositories/orgapacheqpid-1061/</url>
|
||||
<url>https://repository.apache.org/content/repositories/orgapacheqpid-1063/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче