Merge pull request #51 from SreeramGarlapati/javaClient

Java client
This commit is contained in:
Sreeram Garlapati 2016-02-18 00:42:16 -08:00
Родитель 3f6c8a605c a69351fae0
Коммит ccd0d7877f
14 изменённых файлов: 463 добавлений и 242 удалений

Просмотреть файл

@ -1,6 +1,7 @@
package com.microsoft.azure.eventhubs;
import java.io.*;
import java.nio.channels.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
@ -32,8 +33,7 @@ public class EventHubClient extends ClientEntity
{
ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString);
final EventHubClient eventHubClient = new EventHubClient(connStr);
if (isReceiveOnly)
{
return MessagingFactory.createFromConnectionString(connectionString.toString())
@ -133,6 +133,7 @@ public class EventHubClient extends ClientEntity
* @return
* @throws PayloadSizeExceededException if the total size of the {@link EventData} exceeds 256k bytes
* @throws ServiceBusException
* @throws UnresolvedAddressException if there are Client to Service network connectivity issues, if the Azure DNS resolution of the ServiceBus Namespace fails (ex: namespace deleted etc.)
* @see {@link #send(EventData, String)}
* @see {@link EventHubSender#send(EventData)}
*/
@ -168,12 +169,12 @@ public class EventHubClient extends ClientEntity
public final CompletableFuture<Void> send(Iterable<EventData> eventDatas)
throws ServiceBusException
{
if (eventDatas == null || IteratorUtil.sizeEquals(eventDatas.iterator(), 0))
if (eventDatas == null || IteratorUtil.sizeEquals(eventDatas, 0))
{
throw new IllegalArgumentException("Empty batch of EventData cannot be sent.");
}
return this.sender.send(EventDataUtil.toAmqpMessages(eventDatas), null);
return this.sender.send(EventDataUtil.toAmqpMessages(eventDatas));
}
/**
@ -248,7 +249,7 @@ public class EventHubClient extends ClientEntity
String.format(Locale.US, "PartitionKey exceeds the maximum allowed length of partitionKey: {0}", ClientConstants.MaxPartitionKeyLength));
}
return this.sender.send(EventDataUtil.toAmqpMessages(eventDatas, partitionKey), partitionKey);
return this.sender.send(EventDataUtil.toAmqpMessages(eventDatas, partitionKey));
}
public final CompletableFuture<PartitionReceiver> createReceiver(final String consumerGroupName, final String partitionId)

Просмотреть файл

@ -57,11 +57,11 @@ public final class EventHubSender
public final CompletableFuture<Void> send(Iterable<EventData> eventDatas)
throws ServiceBusException
{
if (eventDatas == null || IteratorUtil.sizeEquals(eventDatas.iterator(), 0))
if (eventDatas == null || IteratorUtil.sizeEquals(eventDatas, 0))
{
throw new IllegalArgumentException("EventData batch cannot be empty.");
}
return this.internalSender.send(EventDataUtil.toAmqpMessages(eventDatas), null);
return this.internalSender.send(EventDataUtil.toAmqpMessages(eventDatas));
}
}

Просмотреть файл

@ -33,4 +33,6 @@ public final class ClientConstants
public final static String ServiceBusClientTrace = "servicebus.trace";
public final static int AmqpLinkDetachTimeoutInMin = 8;
public final static boolean DefaultIsTransient = true;
}

Просмотреть файл

@ -81,6 +81,6 @@ final class ExceptionUtil
return ServiceBusException.create(false, new AmqpException(errorCondition));
}
return ServiceBusException.create(true, errorCondition.getDescription());
return ServiceBusException.create(ClientConstants.DefaultIsTransient, errorCondition.getDescription());
}
}

Просмотреть файл

@ -8,21 +8,26 @@ public final class IteratorUtil
{
}
public static boolean sizeEquals(Iterator iterator, int expectedSize)
public static <T> boolean sizeEquals(Iterable<T> iterable, int expectedSize)
{
if (expectedSize == 0)
Iterator<T> iterator = iterable.iterator();
int currentSize = 0;
while(iterator.hasNext())
{
return !iterator.hasNext();
}
else if (!iterator.hasNext())
{
return false;
}
else
{
iterator.next();
return sizeEquals(iterator, expectedSize - 1);
if (expectedSize > currentSize)
{
currentSize++;
iterator.next();
continue;
}
else
{
return false;
}
}
return true;
}
public static <T> T getLast(Iterator<T> iterator)

Просмотреть файл

@ -1,6 +1,5 @@
package com.microsoft.azure.servicebus;
import java.io.IOException;
import java.time.*;
import java.time.temporal.*;
import java.util.*;
@ -56,6 +55,7 @@ public class MessageReceiver extends ClientEntity
private boolean linkCreateScheduled;
private Object linkCreateLock;
private Exception lastKnownLinkError;
private MessageReceiver(final MessagingFactory factory,
final String name,
@ -80,6 +80,7 @@ public class MessageReceiver extends ClientEntity
this.linkCreateLock = new Object();
this.receiveHandlerLock = new Object();
this.linkClose = new CompletableFuture<Void>();
this.lastKnownLinkError = null;
if (offset != null)
{
@ -149,11 +150,17 @@ public class MessageReceiver extends ClientEntity
private CompletableFuture<MessageReceiver> createLink()
{
this.receiveLink = this.createReceiveLink(true);
this.linkOpen = new WorkItem<MessageReceiver>(new CompletableFuture<MessageReceiver>(), this.operationTimeout);
this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker());
this.linkCreateScheduled = true;
Timer.schedule(new Runnable() {
@Override
public void run()
{
MessageReceiver.this.receiveLink = MessageReceiver.this.createReceiveLink();
}}, Duration.ofSeconds(0), TimerType.OneTimeRun);
return this.linkOpen.getWork();
}
@ -221,22 +228,25 @@ public class MessageReceiver extends ClientEntity
public void onOpenComplete(Exception exception)
{
synchronized (this.linkOpen)
if (exception == null)
{
if (exception == null)
{
this.lastCommunicatedAt = Instant.now();
this.lastCommunicatedAt = Instant.now();
if (this.linkOpen != null && !this.linkOpen.getWork().isDone())
this.linkOpen.getWork().complete(this);
this.underlyingFactory.links.add(this.receiveLink);
}
else
{
this.linkOpen.getWork().completeExceptionally(exception);
}
this.offsetInclusive = false; // re-open link always starts from the last received offset
this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());
this.underlyingFactory.links.add(this.receiveLink);
this.lastKnownLinkError = null;
}
else
{
if (this.linkOpen != null && !this.linkOpen.getWork().isDone())
this.linkOpen.getWork().completeExceptionally(exception);
this.lastKnownLinkError = exception;
}
this.offsetInclusive = false; // re-open link always starts from the last received offset
this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());
synchronized (this.linkCreateLock)
{
@ -309,7 +319,11 @@ public class MessageReceiver extends ClientEntity
public void onError(ErrorCondition error)
{
Exception completionException = ExceptionUtil.toException(error);
this.onError(completionException);
}
public void onError(Exception exception)
{
WorkItem<Collection<Message>> currentReceive = this.pendingReceives.peek();
TimeoutTracker currentOperationTracker = currentReceive != null
@ -320,7 +334,7 @@ public class MessageReceiver extends ClientEntity
: (currentOperationTracker.elapsed().compareTo(this.operationTimeout) > 0)
? Duration.ofSeconds(0)
: this.operationTimeout.minus(currentOperationTracker.elapsed());
Duration retryInterval = this.underlyingFactory.getRetryPolicy().getNextRetryInterval(this.getClientId(), completionException, remainingTime);
Duration retryInterval = this.underlyingFactory.getRetryPolicy().getNextRetryInterval(this.getClientId(), exception, remainingTime);
if (retryInterval != null)
{
@ -328,13 +342,9 @@ public class MessageReceiver extends ClientEntity
return;
}
if (!this.linkOpen.getWork().isDone())
{
this.onOpenComplete(completionException);
return;
}
this.onOpenComplete(exception);
if (completionException != null && this.receiveHandler != null)
if (exception != null && this.receiveHandler != null)
{
synchronized (this.receiveHandlerLock)
{
@ -344,10 +354,10 @@ public class MessageReceiver extends ClientEntity
{
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "%s: LinkName (%s), receiverpath (%s): encountered Exception (%s) while receiving from ServiceBus service.",
Instant.now().toString(), this.receiveLink.getName(), this.receivePath, completionException.getClass()));
Instant.now().toString(), this.receiveLink.getName(), this.receivePath, exception.getClass()));
}
this.receiveHandler.onError(completionException);
this.receiveHandler.onError(exception);
}
}
}
@ -357,13 +367,13 @@ public class MessageReceiver extends ClientEntity
while ((workItem = this.pendingReceives.poll()) != null)
{
CompletableFuture<Collection<Message>> future = workItem.getWork();
if (completionException instanceof ServiceBusException && ((ServiceBusException) completionException).getIsTransient())
if (exception instanceof ServiceBusException && ((ServiceBusException) exception).getIsTransient())
{
future.complete(null);
}
else
{
future.completeExceptionally(completionException);
future.completeExceptionally(exception);
}
}
}
@ -377,7 +387,7 @@ public class MessageReceiver extends ClientEntity
}
}
private Receiver createReceiveLink(boolean isConnectionAsync)
private Receiver createReceiveLink()
{
Source source = new Source();
source.setAddress(receivePath);
@ -410,17 +420,21 @@ public class MessageReceiver extends ClientEntity
Connection connection = null;
try {
connection = !isConnectionAsync ? this.underlyingFactory.getConnection()
: this.underlyingFactory.getConnectionAsync().get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
try
{
connection = this.underlyingFactory.getConnectionAsync().get();
}
catch (InterruptedException|ExecutionException exception)
{
Throwable throwable = exception.getCause();
if (throwable != null && throwable instanceof Exception)
{
this.onError((Exception) exception.getCause());
}
return null;
}
Session ssn = connection.session();
String receiveLinkName = this.getClientId();
@ -531,7 +545,12 @@ public class MessageReceiver extends ClientEntity
return;
}
MessageReceiver.this.receiveLink = MessageReceiver.this.createReceiveLink(true);
Receiver receiver = MessageReceiver.this.createReceiveLink();
if (receiver != null)
{
MessageReceiver.this.underlyingFactory.links.remove(MessageReceiver.this.receiveLink);
MessageReceiver.this.receiveLink = receiver;
}
synchronized (MessageReceiver.this.linkCreateLock)
{
@ -555,7 +574,11 @@ public class MessageReceiver extends ClientEntity
{
if (!linkOpen.getWork().isDone())
{
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", MessageReceiver.this.receiveLink.getName(), "Open"));
Exception cause = MessageReceiver.this.lastKnownLinkError;
Exception operationTimedout = ServiceBusException.create(
cause != null && cause instanceof ServiceBusException ? ((ServiceBusException) cause).getIsTransient() : ClientConstants.DefaultIsTransient,
String.format(Locale.US, "ReceiveLink(%s) %s() on path(%s) timed out", MessageReceiver.this.receiveLink.getName(), "Open", MessageReceiver.this.receivePath),
cause);
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING,

Просмотреть файл

@ -20,7 +20,6 @@ import org.apache.qpid.proton.amqp.transport.*;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.message.Message;
import org.omg.PortableInterceptor.ACTIVE;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.Timer;
@ -40,25 +39,35 @@ public class MessageSender extends ClientEntity
private final String sendPath;
private final Duration operationTimeout;
private final RetryPolicy retryPolicy;
private final Object pendingSendWaitersLock;
private final Runnable operationTimer;
private final Duration timerTimeout;
private ConcurrentHashMap<byte[], ReplayableWorkItem<Void>> pendingSendWaiters;
private Sender sendLink;
private CompletableFuture<MessageSender> linkOpen;
private CompletableFuture<MessageSender> linkFirstOpen;
private AtomicLong nextTag;
private TimeoutTracker currentOperationTracker;
private TimeoutTracker openLinkTracker;
private boolean linkCreateScheduled;
private Object linkCreateLock;
private Exception lastKnownLinkError;
public static CompletableFuture<MessageSender> Create(
final MessagingFactory factory,
final String sendLinkName,
final String senderPath)
{
MessageSender msgSender = new MessageSender(factory, sendLinkName, senderPath);
SendLinkHandler handler = new SendLinkHandler(msgSender);
BaseHandler.setHandler(msgSender.sendLink, handler);
return msgSender.linkOpen;
final MessageSender msgSender = new MessageSender(factory, sendLinkName, senderPath);
msgSender.openLinkTracker = TimeoutTracker.create(factory.getOperationTimeout());
msgSender.initializeLinkOpen(msgSender.openLinkTracker);
msgSender.linkCreateScheduled = true;
Timer.schedule(new Runnable() {
@Override
public void run()
{
msgSender.sendLink = msgSender.createSendLink();
}
}, Duration.ofSeconds(0), TimerType.OneTimeRun);
return msgSender.linkFirstOpen;
}
private MessageSender(final MessagingFactory factory, final String sendLinkName, final String senderPath)
@ -67,19 +76,50 @@ public class MessageSender extends ClientEntity
this.sendPath = senderPath;
this.underlyingFactory = factory;
this.operationTimeout = factory.getOperationTimeout();
this.pendingSendWaitersLock = new Object();
this.timerTimeout = this.operationTimeout.getSeconds() > 9 ? this.operationTimeout.dividedBy(3) : Duration.ofSeconds(5);
this.lastKnownLinkError = null;
// clone ?
this.retryPolicy = factory.getRetryPolicy();
this.sendLink = this.createSendLink();
this.currentOperationTracker = TimeoutTracker.create(factory.getOperationTimeout());
this.initializeLinkOpen(this.currentOperationTracker);
this.linkCreateScheduled = true;
this.pendingSendWaiters = new ConcurrentHashMap<byte[], ReplayableWorkItem<Void>>();
this.nextTag = new AtomicLong(0);
this.linkCreateLock = new Object();
this.operationTimer = new Runnable()
{
@Override
public void run()
{
if (MessageSender.this.pendingSendWaiters != null)
{
Iterator<Entry<byte[], ReplayableWorkItem<Void>>> pendingDeliveries = MessageSender.this.pendingSendWaiters.entrySet().iterator();
while(pendingDeliveries.hasNext())
{
Entry<byte[], ReplayableWorkItem<Void>> pendingSend = pendingDeliveries.next();
if (pendingSend == null)
{
break;
}
ReplayableWorkItem<Void> pendingSendWork = pendingSend.getValue();
if (pendingSendWork.getTimeoutTracker().remaining().compareTo(ClientConstants.TimerTolerance) < 0)
{
pendingDeliveries.remove();
Exception cause = pendingSendWork.getLastKnownException() == null
? MessageSender.this.lastKnownLinkError : pendingSendWork.getLastKnownException();
pendingSendWork.getWork().completeExceptionally(
ServiceBusException.create(
cause != null && cause instanceof ServiceBusException ? ((ServiceBusException) cause).getIsTransient() : ClientConstants.DefaultIsTransient,
String.format(Locale.US, "Send operation on entity(%s), link(%s) timed out."
, MessageSender.this.getSendPath()
, MessageSender.this.sendLink.getName()),
cause));
}
}
}
}
};
}
public String getSendPath()
@ -87,83 +127,78 @@ public class MessageSender extends ClientEntity
return this.sendPath;
}
public CompletableFuture<Void> send(Message msg, int messageFormat) throws PayloadSizeExceededException
public CompletableFuture<Void> send(byte[] bytes, int arrayOffset, int messageFormat)
{
if (this.sendLink.getRemoteState() == EndpointState.CLOSED)
{
this.scheduleRecreate(Duration.ofSeconds(0));
}
// TODO: fix allocation per call - use BufferPool
byte[] bytes = new byte[MaxMessageLength];
int encodedSize;
try
{
encodedSize = msg.encode(bytes, 0, MaxMessageLength);
}
catch(BufferOverflowException exception)
{
throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s", MaxMessageLength), exception);
}
byte[] tag = String.valueOf(nextTag.incrementAndGet()).getBytes();
Delivery dlv = this.sendLink.delivery(tag);
dlv.setMessageFormat(messageFormat);
int sentMsgSize = this.sendLink.send(bytes, 0, encodedSize);
assert sentMsgSize != encodedSize : "Contract of the ProtonJ library for Sender.Send API changed";
CompletableFuture<Void> onSend = new CompletableFuture<Void>();
this.pendingSendWaiters.put(tag, new ReplayableWorkItem<Void>(bytes, sentMsgSize, messageFormat, onSend, this.operationTimeout));
this.sendLink.advance();
return onSend;
return this.send(bytes, arrayOffset, messageFormat, null, null);
}
// accepts even if PartitionKey is null - and hence, the layer above this api is supposed to enforce
public CompletableFuture<Void> send(final Iterable<Message> messages, final String partitionKey)
public CompletableFuture<Void> send(
final byte[] bytes,
final int arrayOffset,
final int messageFormat,
final CompletableFuture<Void> onSend,
final TimeoutTracker tracker)
{
byte[] tag = String.valueOf(this.nextTag.incrementAndGet()).getBytes();
if (this.sendLink.getLocalState() == EndpointState.CLOSED)
{
this.scheduleRecreate(Duration.ofSeconds(0));
}
else
{
Delivery dlv = this.sendLink.delivery(tag);
dlv.setMessageFormat(messageFormat);
int sentMsgSize = this.sendLink.send(bytes, 0, arrayOffset);
assert sentMsgSize != arrayOffset : "Contract of the ProtonJ library for Sender.Send API changed";
this.sendLink.advance();
}
CompletableFuture<Void> onSendFuture = (onSend == null) ? new CompletableFuture<Void>() : onSend;
this.pendingSendWaiters.put(
tag,
new ReplayableWorkItem<Void>(
bytes,
arrayOffset,
messageFormat,
onSendFuture,
tracker == null ? this.operationTimeout : tracker.remaining()));
return onSendFuture;
}
/**
* accepts even if PartitionKey is null - and hence, the code consuming this api is supposed to enforce
*/
public CompletableFuture<Void> send(final Iterable<Message> messages)
throws ServiceBusException
{
if (messages == null || IteratorUtil.sizeEquals(messages.iterator(), 0))
if (messages == null || IteratorUtil.sizeEquals(messages, 0))
{
throw new IllegalArgumentException("Sending Empty batch of messages is not allowed.");
}
if (IteratorUtil.sizeEquals(messages.iterator(), 1))
Message firstMessage = messages.iterator().next();
if (IteratorUtil.sizeEquals(messages, 1))
{
Message firstMessage = messages.iterator().next();
return this.send(firstMessage);
}
if (this.sendLink.getRemoteState() == EndpointState.CLOSED)
{
this.scheduleRecreate(Duration.ofSeconds(0));
}
// proton-j doesn't support multiple dataSections to be part of AmqpMessage
// here's the alternate approach provided by them: https://github.com/apache/qpid-proton/pull/54
Message batchMessage = Proton.message();
MessageAnnotations messageAnnotations = batchMessage.getMessageAnnotations() == null ? new MessageAnnotations(new HashMap<Symbol, Object>())
: batchMessage.getMessageAnnotations();
messageAnnotations.getValue().put(AmqpConstants.PartitionKey, partitionKey);
batchMessage.setMessageAnnotations(messageAnnotations);
// TODO: fix allocation per call - use BufferPool
batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());
byte[] bytes = new byte[MaxMessageLength];
int encodedSize = batchMessage.encode(bytes, 0, MaxMessageLength);
int byteArrayOffset = encodedSize;
byte[] tag = String.valueOf(nextTag.incrementAndGet()).getBytes();
Delivery dlv = this.sendLink.delivery(tag);
dlv.setMessageFormat(AmqpConstants.AmqpBatchMessageFormat);
for(Message amqpMessage: messages)
{
Message messageWrappedByData = Proton.message();
// TODO: essential optimization
byte[] messageBytes = new byte[MaxMessageLength];
int messageSizeBytes = amqpMessage.encode(messageBytes, 0, MaxMessageLength);
messageWrappedByData.setBody(new Data(new Binary(messageBytes, 0, messageSizeBytes)));
@ -174,28 +209,29 @@ public class MessageSender extends ClientEntity
}
catch(BufferOverflowException exception)
{
// TODO: is it intended for this purpose - else compute msg. size before hand.
dlv.clear();
throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s", MaxMessageLength), exception);
}
byteArrayOffset = byteArrayOffset + encodedSize;
}
int sentMsgSize = this.sendLink.send(bytes, 0, byteArrayOffset);
assert sentMsgSize != byteArrayOffset : "Contract of the ProtonJ library for Sender.Send API changed";
CompletableFuture<Void> onSend = new CompletableFuture<Void>();
this.pendingSendWaiters.put(tag,
new ReplayableWorkItem<Void>(bytes, sentMsgSize, AmqpConstants.AmqpBatchMessageFormat, onSend, this.operationTimeout));
this.sendLink.advance();
return onSend;
return this.send(bytes, byteArrayOffset, AmqpConstants.AmqpBatchMessageFormat);
}
public CompletableFuture<Void> send(Message msg) throws ServiceBusException
{
return this.send(msg, DeliveryImpl.DEFAULT_MESSAGE_FORMAT);
byte[] bytes = new byte[MaxMessageLength];
int encodedSize = 0;
try
{
encodedSize = msg.encode(bytes, 0, MaxMessageLength);
}
catch(BufferOverflowException exception)
{
throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s", MaxMessageLength), exception);
}
return this.send(bytes, encodedSize, DeliveryImpl.DEFAULT_MESSAGE_FORMAT);
}
public void close()
@ -208,18 +244,17 @@ public class MessageSender extends ClientEntity
public void onOpenComplete(Exception completionException)
{
synchronized(this.linkCreateLock)
{
this.linkCreateScheduled = false;
}
if (completionException == null)
{
this.currentOperationTracker = null;
this.openLinkTracker = null;
this.retryPolicy.resetRetryCount(this.getClientId());
if (!this.linkOpen.isDone())
this.underlyingFactory.links.add(this.sendLink);
this.lastKnownLinkError = null;
if (!this.linkFirstOpen.isDone())
{
this.linkOpen.complete(this);
this.linkFirstOpen.complete(this);
Timer.schedule(this.operationTimer, this.timerTimeout, TimerType.RepeatRun);
}
else if (!this.pendingSendWaiters.isEmpty())
{
@ -230,21 +265,16 @@ public class MessageSender extends ClientEntity
unacknowledgedSends.forEachEntry(1, new Consumer<Map.Entry<byte[], ReplayableWorkItem<Void>>>()
{
@Override
public void accept(Entry<byte[], ReplayableWorkItem<Void>> sendWork) {
public void accept(Entry<byte[], ReplayableWorkItem<Void>> sendWork)
{
ReplayableWorkItem<Void> pendingSend = MessageSender.this.pendingSendWaiters.remove(sendWork.getKey());
if (pendingSend != null)
{
byte[] tag = String.valueOf(nextTag.incrementAndGet()).getBytes();
Delivery dlv = sendLink.delivery(tag);
dlv.setMessageFormat(pendingSend.getMessageFormat());
int sentMsgSize = sendLink.send(pendingSend.getMessage(), 0, pendingSend.getEncodedMessageSize());
assert sentMsgSize != pendingSend.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed";
CompletableFuture<Void> onSend = new CompletableFuture<Void>();
pendingSendWaiters.put(tag, new ReplayableWorkItem<Void>(pendingSend.getMessage(), pendingSend.getEncodedMessageSize(), pendingSend.getMessageFormat(), onSend, operationTimeout));
sendLink.advance();
MessageSender.this.send(pendingSend.getMessage(),
pendingSend.getEncodedMessageSize(),
pendingSend.getMessageFormat(),
pendingSend.getWork(),
pendingSend.getTimeoutTracker());
}
}
});
@ -254,36 +284,42 @@ public class MessageSender extends ClientEntity
}
else
{
this.linkOpen.completeExceptionally(completionException);
this.linkFirstOpen.completeExceptionally(completionException);
}
synchronized(this.linkCreateLock)
{
this.linkCreateScheduled = false;
}
}
public void onError(ErrorCondition error)
{
Exception completionException = ExceptionUtil.toException(error);
// if CurrentOpTracker is null - no operation is in progress
Duration remainingTime = this.currentOperationTracker == null
? Duration.ofSeconds(0)
: (this.currentOperationTracker.elapsed().compareTo(this.operationTimeout) > 0)
this.onError(completionException);
}
public void onError(Exception completionException)
{
Duration remainingTime = this.openLinkTracker == null
? this.operationTimeout
: (this.openLinkTracker.elapsed().compareTo(this.operationTimeout) > 0)
? Duration.ofSeconds(0)
: this.operationTimeout.minus(this.currentOperationTracker.elapsed());
: this.operationTimeout.minus(this.openLinkTracker.elapsed());
Duration retryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), completionException, remainingTime);
if (completionException != null)
{
this.lastKnownLinkError = completionException;
}
if (retryInterval != null)
{
this.scheduleRecreate(retryInterval);
return;
}
synchronized (this.linkOpen)
{
if (!this.linkOpen.isDone())
{
this.onOpenComplete(completionException);
return;
}
}
this.onOpenComplete(completionException);
}
private void scheduleRecreate(Duration runAfter)
@ -304,14 +340,18 @@ public class MessageSender extends ClientEntity
@Override
public void run()
{
if (MessageSender.this.sendLink.getRemoteState() != EndpointState.CLOSED)
if (MessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED)
{
return;
}
MessageSender.this.sendLink = MessageSender.this.createSendLink();
SendLinkHandler handler = new SendLinkHandler(MessageSender.this);
BaseHandler.setHandler(MessageSender.this.sendLink, handler);
Sender sender = MessageSender.this.createSendLink();
if (sender != null)
{
MessageSender.this.underlyingFactory.links.remove(MessageSender.this.sendLink);
MessageSender.this.sendLink = sender;
}
MessageSender.this.retryPolicy.incrementRetryCount(MessageSender.this.getClientId());
}
},
@ -319,35 +359,93 @@ public class MessageSender extends ClientEntity
TimerType.OneTimeRun);
}
public void onSendComplete(byte[] deliveryTag, DeliveryState outcome)
public void onSendComplete(final byte[] deliveryTag, final DeliveryState outcome)
{
ReplayableWorkItem<Void> pendingSendWorkItem = null;
synchronized(this.pendingSendWaitersLock)
{
pendingSendWorkItem = this.pendingSendWaiters.get(deliveryTag);
}
TRACE_LOGGER.log(Level.FINE, String.format("linkName[%s]", this.sendLink.getName()));
ReplayableWorkItem<Void> pendingSendWorkItem = this.pendingSendWaiters.get(deliveryTag);
if (pendingSendWorkItem != null)
{
CompletableFuture<Void> pendingSend = pendingSendWorkItem.getWork();
if (outcome == Accepted.getInstance())
if (outcome instanceof Accepted)
{
this.retryPolicy.resetRetryCount(this.getClientId());
this.pendingSendWaiters.remove(deliveryTag);
pendingSend.complete(null);
}
else if (outcome instanceof Rejected)
{
Rejected rejected = (Rejected) outcome;
ErrorCondition error = rejected.getError();
Exception exception = ExceptionUtil.toException(error);
Duration retryInterval = this.retryPolicy.getNextRetryInterval(
this.getClientId(), exception, pendingSendWorkItem.getTimeoutTracker().remaining());
if (retryInterval == null)
{
this.pendingSendWaiters.remove(deliveryTag);
pendingSend.completeExceptionally(exception);
}
else
{
pendingSendWorkItem.setLastKnownException(exception);
Timer.schedule(new Runnable()
{
@Override
public void run()
{
MessageSender.this.reSend(deliveryTag);
}
}, retryInterval, TimerType.OneTimeRun);
}
}
else
{
// TODO: enumerate all cases - if we ever return failed delivery from Service - do they translate to exceptions ?
this.pendingSendWaiters.remove(deliveryTag);
pendingSend.completeExceptionally(ServiceBusException.create(false, outcome.toString()));
}
this.pendingSendWaiters.remove(deliveryTag);
}
}
private void reSend(Object deliveryTag)
{
ReplayableWorkItem<Void> pendingSend = this.pendingSendWaiters.remove(deliveryTag);
if (pendingSend != null)
{
byte[] tag = String.valueOf(nextTag.incrementAndGet()).getBytes();
Delivery dlv = this.sendLink.delivery(tag);
dlv.setMessageFormat(pendingSend.getMessageFormat());
int sentMsgSize = this.sendLink.send(pendingSend.getMessage(), 0, pendingSend.getEncodedMessageSize());
assert sentMsgSize != pendingSend.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed";
CompletableFuture<Void> onSend = new CompletableFuture<Void>();
this.pendingSendWaiters.put(tag,
new ReplayableWorkItem<Void>(pendingSend.getMessage(),
pendingSend.getEncodedMessageSize(), pendingSend.getMessageFormat(), onSend, this.operationTimeout));
this.sendLink.advance();
}
}
private Sender createSendLink()
{
Connection connection = this.underlyingFactory.getConnection();
Connection connection = null;
try
{
// TODO throw it on the appropriate operation
connection = this.underlyingFactory.getConnectionAsync().get();
}
catch (InterruptedException|ExecutionException exception)
{
Throwable throwable = exception.getCause();
if (throwable != null && throwable instanceof Exception)
{
this.onError((Exception) throwable);
return null;
}
}
Session session = connection.session();
session.open();
@ -364,6 +462,9 @@ public class MessageSender extends ClientEntity
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
SendLinkHandler handler = new SendLinkHandler(this);
BaseHandler.setHandler(sender, handler);
sender.open();
return sender;
}
@ -371,7 +472,7 @@ public class MessageSender extends ClientEntity
// TODO: consolidate common-code written for timeouts in Sender/Receiver
private void initializeLinkOpen(TimeoutTracker timeout)
{
this.linkOpen = new CompletableFuture<MessageSender>();
this.linkFirstOpen = new CompletableFuture<MessageSender>();
// timer to signal a timeout if exceeds the operationTimeout on MessagingFactory
Timer.schedule(
@ -379,22 +480,20 @@ public class MessageSender extends ClientEntity
{
public void run()
{
synchronized(MessageSender.this.linkOpen)
if (!MessageSender.this.linkFirstOpen.isDone())
{
if (!MessageSender.this.linkOpen.isDone())
{
Exception operationTimedout = new TimeoutException(
String.format(Locale.US, "Send Link(%s) open() timed out", MessageSender.this.getClientId()));
Exception operationTimedout = ServiceBusException.create(true,
String.format(Locale.US, "SendLink(%s).open() on Entity(%s) timed out",
MessageSender.this.sendLink.getName(), MessageSender.this.getSendPath()));
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "message Sender(linkName: %s, path: %s) open call timedout", MessageSender.this.getClientId(), MessageSender.this.sendPath),
operationTimedout);
}
MessageSender.this.linkOpen.completeExceptionally(operationTimedout);
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "message Sender(linkName: %s, path: %s) open call timedout", MessageSender.this.getClientId(), MessageSender.this.sendPath),
operationTimedout);
}
MessageSender.this.linkFirstOpen.completeExceptionally(operationTimedout);
}
}
}
@ -403,8 +502,13 @@ public class MessageSender extends ClientEntity
}
@Override
public CompletableFuture<Void> closeAsync() {
// TODO Auto-generated method stub
return null;
public CompletableFuture<Void> closeAsync()
{
if (this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED)
{
this.sendLink.close();
}
return CompletableFuture.completedFuture(null);
}
}

Просмотреть файл

@ -3,6 +3,7 @@ package com.microsoft.azure.servicebus;
import java.io.IOException;
import java.nio.channels.*;
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.*;
@ -86,7 +87,8 @@ public class MessagingFactory extends ClientEntity
&& !this.waitingConnectionOpen)
{
this.connection.free();
try {
try
{
this.startReactor(new ReactorHandler() {
@Override
public void onReactorInit(Event e)
@ -166,17 +168,28 @@ public class MessagingFactory extends ClientEntity
{
this.connection.close();
// TODO: Abstract out processOnClose on all links-connections
// dispatch the TransportError to all dependent registered links
for (Link link : this.links)
Iterator<Link> literator = this.links.iterator();
while (literator.hasNext())
{
Receiver receiver = (Receiver) link;
if (receiver!=null)
Link link = literator.next();
if (link instanceof Receiver)
{
Handler handler = BaseHandler.getHandler(receiver);
Handler handler = BaseHandler.getHandler((Receiver) link);
if (handler != null && handler instanceof ReceiveLinkHandler)
{
ReceiveLinkHandler recvLinkHandler = (ReceiveLinkHandler) handler;
recvLinkHandler.processOnClose(receiver, error);
recvLinkHandler.processOnClose(link, error);
}
}
else if(link instanceof Sender)
{
Handler handler = BaseHandler.getHandler((Sender) link);
if (handler != null && handler instanceof SendLinkHandler)
{
SendLinkHandler sendLinkHandler = (SendLinkHandler) handler;
sendLinkHandler.processOnClose(link, error);
}
}
}

Просмотреть файл

@ -11,6 +11,8 @@ public class ReplayableWorkItem<T> extends WorkItem<T>
final private int messageFormat;
final private int encodedMessageSize;
private Exception lastKnownException;
public ReplayableWorkItem(final byte[] amqpMessage, final int encodedMessageSize, final int messageFormat, final CompletableFuture<T> completableFuture, final Duration timeout)
{
super(completableFuture, timeout);
@ -33,4 +35,14 @@ public class ReplayableWorkItem<T> extends WorkItem<T>
{
return this.messageFormat;
}
public Exception getLastKnownException()
{
return this.lastKnownException;
}
public void setLastKnownException(Exception exception)
{
this.lastKnownException = exception;
}
}

Просмотреть файл

@ -1,7 +1,5 @@
package com.microsoft.azure.servicebus;
import java.util.*;
public abstract class ServiceBusException extends Exception
{
private static final long serialVersionUID = -3654294093967132325L;
@ -34,29 +32,40 @@ public abstract class ServiceBusException extends Exception
*/
static ServiceBusException create(final boolean isTransient, final String message)
{
return new InternalServiceBusException(isTransient, message);
return (ServiceBusException) new InternalServiceBusException(isTransient, message);
}
static ServiceBusException create(final boolean isTransient, Exception exception)
static ServiceBusException create(final boolean isTransient, Throwable cause)
{
return new InternalServiceBusException(isTransient, exception);
return (ServiceBusException) new InternalServiceBusException(isTransient, cause);
}
static ServiceBusException create(final boolean isTransient, final String message, final Throwable cause)
{
return (ServiceBusException) new InternalServiceBusException(isTransient, message, cause);
}
private static final class InternalServiceBusException extends ServiceBusException
{
boolean isTransient;
public InternalServiceBusException(final boolean isTransient, final String message)
private InternalServiceBusException(final boolean isTransient, final String message)
{
super(message);
this.isTransient = isTransient;
}
public InternalServiceBusException(final boolean isTransient, Exception exception)
private InternalServiceBusException(final boolean isTransient, final Throwable exception)
{
super(exception);
this.isTransient = isTransient;
}
private InternalServiceBusException(final boolean isTransient, final String message, final Throwable cause)
{
super(message, cause);
this.isTransient = isTransient;
}
@Override
public boolean getIsTransient()

Просмотреть файл

@ -14,12 +14,16 @@ public final class Timer
{
}
public static void schedule(Runnable runnable, Duration runAfter, TimerType timerType)
public static void schedule(Runnable runnable, Duration runFrequency, TimerType timerType)
{
switch (timerType)
{
case OneTimeRun:
executor.schedule(runnable, runAfter.getSeconds(), TimeUnit.SECONDS);
executor.schedule(runnable, runFrequency.getSeconds(), TimeUnit.SECONDS);
break;
case RepeatRun:
executor.scheduleWithFixedDelay(runnable, runFrequency.getSeconds(), runFrequency.getSeconds(), TimeUnit.SECONDS);
break;
default:

Просмотреть файл

@ -74,14 +74,14 @@ public final class ConnectionHandler extends BaseHandler
{
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING, "Connection.onTransportError: hostname[" + event.getConnection().getHostname() + "Error: " + condition.getDescription());
TRACE_LOGGER.log(Level.WARNING, "Connection.onTransportError: hostname[" + event.getConnection().getHostname() + "], error[" + condition.getDescription() + "]");
}
}
else
{
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING, "Connection.onTransportError: hostname[" + event.getConnection().getHostname() + "Error (no description returned).");
TRACE_LOGGER.log(Level.WARNING, "Connection.onTransportError: hostname[" + event.getConnection().getHostname() + "], error[no description returned]");
}
}

Просмотреть файл

@ -96,11 +96,11 @@ public final class ReceiveLinkHandler extends BaseLinkHandler
if (link instanceof Receiver)
{
ErrorCondition condition = link.getRemoteCondition();
this.processOnClose((Receiver) link, condition);
this.processOnClose(link, condition);
}
}
public void processOnClose(Receiver link, ErrorCondition condition)
public void processOnClose(Link link, ErrorCondition condition)
{
link.close();

Просмотреть файл

@ -1,5 +1,6 @@
package com.microsoft.azure.servicebus.amqp;
import java.util.Locale;
import java.util.logging.*;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@ -20,21 +21,56 @@ public class SendLinkHandler extends BaseLinkHandler
this.firstFlow = new Object();
this.isFirstFlow = true;
}
@Override
public void onLinkRemoteOpen(Event event)
{
Link link = event.getLink();
if (link != null && link instanceof Sender)
{
Sender sender = (Sender) link;
if (link.getRemoteTarget() != null)
{
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, String.format(Locale.US, "linkName[%s], remoteTarget[%s]", sender.getName(), link.getRemoteTarget()));
}
synchronized (this.firstFlow)
{
this.isFirstFlow = false;
this.msgSender.onOpenComplete(null);
}
}
else
{
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE,
String.format(Locale.US, "linkName[%s], remoteTarget[null], remoteSource[null], action[waitingForError]", sender.getName()));
}
}
}
}
@Override
public void onDelivery(Event event)
{
Sender sender = (Sender) event.getLink();
Delivery delivery = event.getDelivery();
Sender sender = (Sender) delivery.getLink();
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, "sendLink.onDelivery: name["+sender.getName()+"] : unsettled[" + sender.getUnsettled() + "] : credit[" + sender.getCredit()+ "]");
TRACE_LOGGER.log(Level.FINE,
"linkName[" + sender.getName() +
"], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getCredit()+ "], deliveryState[" + delivery.getRemoteState() +
"], delivery.isBuffered[" + delivery.isBuffered() +"], delivery.id[" + delivery.getTag() + "]");
}
Delivery delivery = event.getDelivery();
if (delivery != null)
while (delivery != null)
{
msgSender.onSendComplete(delivery.getTag(), delivery.getRemoteState());
delivery.settle();
delivery = sender.current();
}
}
@ -56,7 +92,7 @@ public class SendLinkHandler extends BaseLinkHandler
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
Sender sender = (Sender) event.getLink();
TRACE_LOGGER.log(Level.FINE, "sendLink.onFlow: name[" + sender.getName() + "] : unsettled[" + sender.getUnsettled() + "] : credit[" + sender.getCredit()+ "]");
TRACE_LOGGER.log(Level.FINE, "linkName[" + sender.getName() + "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getCredit()+ "]");
}
}
@ -67,16 +103,28 @@ public class SendLinkHandler extends BaseLinkHandler
if (link instanceof Sender)
{
ErrorCondition condition = link.getRemoteCondition();
if (condition != null)
{
if(TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING, "sendLink.onLinkRemoteClose: name[" + link.getName() + "] : ErrorCondition[" + condition.getDescription() + "]");
}
}
link.close();
this.msgSender.onError(condition);
this.processOnClose(link, condition);
}
}
@Override
public void onLinkRemoteDetach(Event event)
{
this.onLinkRemoteClose(event);
}
// TODO: abstract this out to an interface - as a connection-child
public void processOnClose(Link link, ErrorCondition condition)
{
if (condition != null)
{
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, "linkName[" + link.getName() + "], ErrorCondition[" + condition.getDescription() + "]");
}
}
link.close();
this.msgSender.onError(condition);
}
}