Fix Sender stuck & timeout issue

This commit is contained in:
Sreeram Garlapati 2016-02-16 17:31:07 -08:00
Родитель 8cd79a035c
Коммит 1713319c14
4 изменённых файлов: 98 добавлений и 30 удалений

Просмотреть файл

@ -40,7 +40,6 @@ public class MessageSender extends ClientEntity
private final String sendPath;
private final Duration operationTimeout;
private final RetryPolicy retryPolicy;
private final Object pendingSendWaitersLock;
private ConcurrentHashMap<byte[], ReplayableWorkItem<Void>> pendingSendWaiters;
private Sender sendLink;
@ -67,7 +66,6 @@ public class MessageSender extends ClientEntity
this.sendPath = senderPath;
this.underlyingFactory = factory;
this.operationTimeout = factory.getOperationTimeout();
this.pendingSendWaitersLock = new Object();
// clone ?
this.retryPolicy = factory.getRetryPolicy();
@ -230,7 +228,8 @@ public class MessageSender extends ClientEntity
unacknowledgedSends.forEachEntry(1, new Consumer<Map.Entry<byte[], ReplayableWorkItem<Void>>>()
{
@Override
public void accept(Entry<byte[], ReplayableWorkItem<Void>> sendWork) {
public void accept(Entry<byte[], ReplayableWorkItem<Void>> sendWork)
{
ReplayableWorkItem<Void> pendingSend = MessageSender.this.pendingSendWaiters.remove(sendWork.getKey());
if (pendingSend != null)
{
@ -319,22 +318,45 @@ public class MessageSender extends ClientEntity
TimerType.OneTimeRun);
}
public void onSendComplete(byte[] deliveryTag, DeliveryState outcome)
public void onSendComplete(final byte[] deliveryTag, final DeliveryState outcome)
{
ReplayableWorkItem<Void> pendingSendWorkItem = null;
synchronized(this.pendingSendWaitersLock)
{
pendingSendWorkItem = this.pendingSendWaiters.get(deliveryTag);
}
pendingSendWorkItem = this.pendingSendWaiters.get(deliveryTag);
if (pendingSendWorkItem != null)
{
CompletableFuture<Void> pendingSend = pendingSendWorkItem.getWork();
if (outcome == Accepted.getInstance())
if (outcome instanceof Accepted)
{
this.retryPolicy.resetRetryCount(this.getClientId());
pendingSend.complete(null);
}
else if (outcome instanceof Rejected)
{
Rejected rejected = (Rejected) outcome;
ErrorCondition error = rejected.getError();
Exception exception = ExceptionUtil.toException(error);
Duration retryInterval = this.retryPolicy.getNextRetryInterval(
this.getClientId(), exception, pendingSendWorkItem.getTimeoutTracker().remaining());
if (retryInterval == null)
{
pendingSend.completeExceptionally(exception);
}
else
{
Timer.schedule(new Runnable()
{
@Override
public void run()
{
MessageSender.this.reSend(deliveryTag);
}
}, retryInterval, TimerType.OneTimeRun);
return;
}
}
else
{
// TODO: enumerate all cases - if we ever return failed delivery from Service - do they translate to exceptions ?
@ -345,6 +367,27 @@ public class MessageSender extends ClientEntity
}
}
private void reSend(Object deliveryTag)
{
ReplayableWorkItem<Void> pendingSend = this.pendingSendWaiters.remove(deliveryTag);
if (pendingSend != null)
{
byte[] tag = String.valueOf(nextTag.incrementAndGet()).getBytes();
Delivery dlv = this.sendLink.delivery(tag);
dlv.setMessageFormat(pendingSend.getMessageFormat());
int sentMsgSize = this.sendLink.send(pendingSend.getMessage(), 0, pendingSend.getEncodedMessageSize());
assert sentMsgSize != pendingSend.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed";
CompletableFuture<Void> onSend = new CompletableFuture<Void>();
this.pendingSendWaiters.put(tag,
new ReplayableWorkItem<Void>(pendingSend.getMessage(),
pendingSend.getEncodedMessageSize(), pendingSend.getMessageFormat(), onSend, operationTimeout));
this.sendLink.advance();
}
}
private Sender createSendLink()
{
Connection connection = this.underlyingFactory.getConnection();
@ -403,7 +446,8 @@ public class MessageSender extends ClientEntity
}
@Override
public CompletableFuture<Void> closeAsync() {
public CompletableFuture<Void> closeAsync()
{
// TODO Auto-generated method stub
return null;
}

Просмотреть файл

@ -169,14 +169,22 @@ public class MessagingFactory extends ClientEntity
// dispatch the TransportError to all dependent registered links
for (Link link : this.links)
{
Receiver receiver = (Receiver) link;
if (receiver!=null)
if (link instanceof Receiver)
{
Handler handler = BaseHandler.getHandler(receiver);
Handler handler = BaseHandler.getHandler((Receiver) link);
if (handler != null && handler instanceof ReceiveLinkHandler)
{
ReceiveLinkHandler recvLinkHandler = (ReceiveLinkHandler) handler;
recvLinkHandler.processOnClose(receiver, error);
recvLinkHandler.processOnClose(link, error);
}
}
else if(link instanceof Sender)
{
Handler handler = BaseHandler.getHandler((Sender) link);
if (handler != null && handler instanceof ReceiveLinkHandler)
{
SendLinkHandler sendLinkHandler = (SendLinkHandler) handler;
sendLinkHandler.processOnClose(link, error);
}
}
}

Просмотреть файл

@ -96,11 +96,11 @@ public final class ReceiveLinkHandler extends BaseLinkHandler
if (link instanceof Receiver)
{
ErrorCondition condition = link.getRemoteCondition();
this.processOnClose((Receiver) link, condition);
this.processOnClose(link, condition);
}
}
public void processOnClose(Receiver link, ErrorCondition condition)
public void processOnClose(Link link, ErrorCondition condition)
{
link.close();

Просмотреть файл

@ -25,12 +25,16 @@ public class SendLinkHandler extends BaseLinkHandler
public void onDelivery(Event event)
{
Sender sender = (Sender) event.getLink();
Delivery delivery = event.getDelivery();
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, "sendLink.onDelivery: name["+sender.getName()+"] : unsettled[" + sender.getUnsettled() + "] : credit[" + sender.getCredit()+ "]");
TRACE_LOGGER.log(Level.FINE,
"linkName[" + sender.getName() +
"], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getCredit()+ "], deliveryState[" + delivery.getRemoteState() +
"], delivery.isBuffered[" + delivery.isBuffered() +"]");
}
Delivery delivery = event.getDelivery();
if (delivery != null)
{
msgSender.onSendComplete(delivery.getTag(), delivery.getRemoteState());
@ -56,7 +60,7 @@ public class SendLinkHandler extends BaseLinkHandler
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
Sender sender = (Sender) event.getLink();
TRACE_LOGGER.log(Level.FINE, "sendLink.onFlow: name[" + sender.getName() + "] : unsettled[" + sender.getUnsettled() + "] : credit[" + sender.getCredit()+ "]");
TRACE_LOGGER.log(Level.FINE, "linkName[" + sender.getName() + "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getCredit()+ "]");
}
}
@ -67,16 +71,28 @@ public class SendLinkHandler extends BaseLinkHandler
if (link instanceof Sender)
{
ErrorCondition condition = link.getRemoteCondition();
if (condition != null)
{
if(TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING, "sendLink.onLinkRemoteClose: name[" + link.getName() + "] : ErrorCondition[" + condition.getDescription() + "]");
}
}
link.close();
this.msgSender.onError(condition);
this.processOnClose(link, condition);
}
}
@Override
public void onLinkRemoteDetach(Event event)
{
this.onLinkRemoteClose(event);
}
// TODO: abstract this out to an interface - as a connection-child
public void processOnClose(Link link, ErrorCondition condition)
{
if (condition != null)
{
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, "linkName[" + link.getName() + "], ErrorCondition[" + condition.getDescription() + "]");
}
}
link.close();
this.msgSender.onError(condition);
}
}