check ConsumerGroup Argument in createReceiver API
Append TrackingID to link name
This commit is contained in:
Sreeram Garlapati 2016-02-10 20:16:49 -08:00
Родитель 59650c71e8
Коммит b69510f101
12 изменённых файлов: 187 добавлений и 78 удалений

Просмотреть файл

@ -15,7 +15,7 @@ public class EventHubClient extends ClientEntity
{
public static final String DefaultConsumerGroupName = "$Default";
private final MessagingFactory underlyingFactory;
private MessagingFactory underlyingFactory;
private final String eventHubName;
private MessageSender sender;
@ -23,7 +23,6 @@ public class EventHubClient extends ClientEntity
private EventHubClient(ConnectionStringBuilder connectionString) throws IOException, IllegalEntityException
{
super(UUID.randomUUID().toString());
this.underlyingFactory = MessagingFactory.createFromConnectionString(connectionString.toString());
this.eventHubName = connectionString.getEntityPath();
}
@ -34,21 +33,40 @@ public class EventHubClient extends ClientEntity
ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString);
final EventHubClient eventHubClient = new EventHubClient(connStr);
if (isReceiveOnly)
{
return CompletableFuture.completedFuture(eventHubClient);
return MessagingFactory.createFromConnectionString(connectionString.toString())
.thenApplyAsync(new Function<MessagingFactory, EventHubClient>()
{
@Override
public EventHubClient apply(MessagingFactory factory)
{
eventHubClient.underlyingFactory = factory;
return eventHubClient;
}
});
}
else
{
return eventHubClient.createInternalSender()
.thenApplyAsync(new Function<Void, EventHubClient>()
{
@Override
public EventHubClient apply(Void a)
return MessagingFactory.createFromConnectionString(connectionString.toString())
.thenComposeAsync(new Function<MessagingFactory, CompletableFuture<EventHubClient>>()
{
return eventHubClient;
}
});
@Override
public CompletableFuture<EventHubClient> apply(MessagingFactory factory)
{
eventHubClient.underlyingFactory = factory;
return eventHubClient.createInternalSender()
.thenApplyAsync(new Function<Void, EventHubClient>()
{
@Override
public EventHubClient apply(Void a)
{
return eventHubClient;
}
});
}
});
}
}
@ -66,7 +84,7 @@ public class EventHubClient extends ClientEntity
return EventHubClient.createFromConnectionString(connectionString, false);
}
CompletableFuture<Void> createInternalSender() throws IllegalEntityException
CompletableFuture<Void> createInternalSender()
{
return MessageSender.Create(this.underlyingFactory, UUID.randomUUID().toString(), this.eventHubName)
.thenAcceptAsync(new Consumer<MessageSender>()

Просмотреть файл

@ -60,6 +60,11 @@ public final class PartitionReceiver
final boolean isEpochReceiver)
throws ServiceBusException
{
if (StringUtil.isNullOrWhiteSpace(consumerGroupName))
{
throw new IllegalArgumentException("specify valid string for argument - 'consumerGroupName'");
}
final PartitionReceiver receiver = new PartitionReceiver(factory, eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, dateTime, epoch, isEpochReceiver);
return receiver.createInternalReceiver().thenApplyAsync(new Function<Void, PartitionReceiver>()
{

Просмотреть файл

@ -24,4 +24,15 @@ public final class IteratorUtil
return sizeEquals(iterator, expectedSize - 1);
}
}
public static <T> T getLast(Iterator<T> iterator)
{
T last = null;
while(iterator.hasNext())
{
last = iterator.next();
}
return last;
}
}

Просмотреть файл

@ -34,6 +34,7 @@ public class MessageReceiver extends ClientEntity
private final String receivePath;
private final Runnable onOperationTimedout;
private final Duration operationTimeout;
private final Object prefetchedMessagesLock;
private ConcurrentLinkedQueue<Message> prefetchedMessages;
private Receiver receiveLink;
@ -74,7 +75,7 @@ public class MessageReceiver extends ClientEntity
{
MessageReceiver msgReceiver = new MessageReceiver(factory, name, recvPath, offset, offsetInclusive, dateTime, prefetchCount, epoch, isEpochReceiver);
ReceiveLinkHandler handler = new ReceiveLinkHandler(name, msgReceiver);
ReceiveLinkHandler handler = new ReceiveLinkHandler(msgReceiver);
BaseHandler.setHandler(msgReceiver.receiveLink, handler);
return msgReceiver.linkOpen;
@ -103,7 +104,8 @@ public class MessageReceiver extends ClientEntity
this.linkCreateLock = new Object();
this.receiveHandlerLock = new Object();
this.linkClose = new CompletableFuture<Void>();
this.prefetchedMessagesLock = new Object();
if (offset != null)
{
this.lastReceivedOffset = offset;
@ -174,15 +176,16 @@ public class MessageReceiver extends ClientEntity
if (!this.prefetchedMessages.isEmpty())
{
synchronized (this.prefetchedMessages)
synchronized (this.prefetchedMessagesLock)
{
if (!this.prefetchedMessages.isEmpty())
{
// return all available msgs to application-layer and send 'link-flow' frame for prefetch
Collection<Message> returnMessages = this.prefetchedMessages;
Queue<Message> returnMessages = this.prefetchedMessages;
this.prefetchedMessages = new ConcurrentLinkedQueue<Message>();
this.sendFlow(returnMessages.size());
return CompletableFuture.completedFuture(returnMessages);
this.lastReceivedOffset = IteratorUtil.getLast(returnMessages.iterator()).getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString();
return CompletableFuture.completedFuture((Collection<Message>) returnMessages);
}
}
}
@ -281,23 +284,25 @@ public class MessageReceiver extends ClientEntity
}
}
else
{
synchronized (this.pendingReceives)
{
WorkItem<Collection<Message>> currentReceive = this.pendingReceives.poll();
if (currentReceive == null)
{
if (this.pendingReceives.isEmpty())
synchronized (this.prefetchedMessagesLock)
{
this.prefetchedMessages.addAll(messages);
this.currentOperationTracker = null;
}
else
{
WorkItem<Collection<Message>> currentReceive = this.pendingReceives.poll();
this.currentOperationTracker = this.pendingReceives.peek() != null ? this.pendingReceives.peek().getTimeoutTracker() : null;
currentReceive.getWork().complete(messages);
this.sendFlow(messages.size());
}
}
else
{
WorkItem<Collection<Message>> topPendingReceive = this.pendingReceives.peek();
this.currentOperationTracker = topPendingReceive != null ? topPendingReceive.getTimeoutTracker() : null;
this.sendFlow(messages.size());
this.lastReceivedOffset = messages.getLast().getMessageAnnotations().getValue().get(AmqpConstants.Offset).toString();
currentReceive.getWork().complete(messages);
}
}
@ -341,7 +346,7 @@ public class MessageReceiver extends ClientEntity
{
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "%s: LinkName (%s), receiverpath (%s): encountered Exception (%s) while receiving from ServiceBus service.",
Instant.now().toString(), this.getClientId(), this.receivePath, completionException.getClass()));
Instant.now().toString(), this.receiveLink.getName(), this.receivePath, completionException.getClass()));
}
this.receiveHandler.onError(completionException);
@ -405,9 +410,12 @@ public class MessageReceiver extends ClientEntity
Map<Symbol, UnknownDescribedType> filterMap = Collections.singletonMap(AmqpConstants.StringFilter, filter);
source.setFilter(filterMap);
Session ssn = this.underlyingFactory.getConnection().session();
Connection connection = this.underlyingFactory.getConnection();
Session ssn = connection.session();
Receiver receiver = ssn.receiver(name);
String receiveLinkName = this.getClientId();
receiveLinkName = receiveLinkName.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer());
Receiver receiver = ssn.receiver(receiveLinkName);
receiver.setSource(source);
receiver.setTarget(new Target());
@ -433,24 +441,31 @@ public class MessageReceiver extends ClientEntity
{
if (this.receiveLink.getLocalState() == EndpointState.ACTIVE)
{
synchronized(this.pingFlowCount)
if (this.pingFlowCount.get() != 0)
{
if (this.pingFlowCount.get() < credits)
synchronized(this.pingFlowCount)
{
this.receiveLink.flow(credits - this.pingFlowCount.get());
this.pingFlowCount.set(0);
}
else
{
this.pingFlowCount.set(this.pingFlowCount.get() - credits);
if (this.pingFlowCount.get() < credits)
{
this.receiveLink.flow(credits - this.pingFlowCount.get());
this.pingFlowCount.set(0);
}
else
{
this.pingFlowCount.set(this.pingFlowCount.get() - credits);
}
}
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE,
String.format("MessageReceiver.sendFlow (linkname: %s), updated-link-credit: %s", this.receiveLink.getName(), this.receiveLink.getCredit()));
}
}
else
{
this.receiveLink.flow(credits);
}
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE,
String.format("MessageReceiver.sendFlow (linkname: %s), updated-link-credit: %s", this.receiveLink.getName(), this.receiveLink.getCredit()));
}
}
else
{
@ -499,7 +514,7 @@ public class MessageReceiver extends ClientEntity
public void run()
{
MessageReceiver.this.receiveLink = MessageReceiver.this.createReceiveLink();
ReceiveLinkHandler handler = new ReceiveLinkHandler(name, MessageReceiver.this);
ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this);
BaseHandler.setHandler(MessageReceiver.this.receiveLink, handler);
MessageReceiver.this.underlyingFactory.getRetryPolicy().incrementRetryCount(MessageReceiver.this.getClientId());
}
@ -521,11 +536,11 @@ public class MessageReceiver extends ClientEntity
{
if (!linkOpen.isDone())
{
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", name, "Open"));
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", MessageReceiver.this.receiveLink.getName(), "Open"));
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", name, MessageReceiver.this.receivePath, "Open"),
String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, "Open"),
operationTimedout);
}
@ -550,11 +565,11 @@ public class MessageReceiver extends ClientEntity
{
if (!linkClose.isDone())
{
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", name, "Close"));
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", MessageReceiver.this.receiveLink.getName(), "Close"));
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", name, MessageReceiver.this.receivePath, "Close"),
String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, "Close"),
operationTimedout);
}

Просмотреть файл

@ -51,10 +51,10 @@ public class MessageSender extends ClientEntity
public static CompletableFuture<MessageSender> Create(
final MessagingFactory factory,
final String sendLinkName,
final String senderPath) throws IllegalEntityException
final String senderPath)
{
MessageSender msgSender = new MessageSender(factory, sendLinkName, senderPath);
SendLinkHandler handler = new SendLinkHandler(sendLinkName, msgSender);
SendLinkHandler handler = new SendLinkHandler(msgSender);
BaseHandler.setHandler(msgSender.sendLink, handler);
return msgSender.linkOpen;
}
@ -302,7 +302,7 @@ public class MessageSender extends ClientEntity
public void run()
{
MessageSender.this.sendLink = MessageSender.this.createSendLink();
SendLinkHandler handler = new SendLinkHandler(MessageSender.this.getClientId(), MessageSender.this);
SendLinkHandler handler = new SendLinkHandler(MessageSender.this);
BaseHandler.setHandler(MessageSender.this.sendLink, handler);
MessageSender.this.retryPolicy.incrementRetryCount(MessageSender.this.getClientId());
}
@ -342,7 +342,9 @@ public class MessageSender extends ClientEntity
Session session = connection.session();
session.open();
Sender sender = session.sender(this.getClientId());
String sendLinkName = this.getClientId();
sendLinkName = sendLinkName.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer());
Sender sender = session.sender(sendLinkName);
Target target = new Target();
target.setAddress(this.sendPath);
@ -374,6 +376,7 @@ public class MessageSender extends ClientEntity
{
Exception operationTimedout = new TimeoutException(
String.format(Locale.US, "Send Link(%s) open() timed out", MessageSender.this.getClientId()));
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING,

Просмотреть файл

@ -1,6 +1,7 @@
package com.microsoft.azure.servicebus;
import java.io.IOException;
import java.nio.channels.*;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@ -35,6 +36,7 @@ public class MessagingFactory extends ClientEntity
private Duration operationTimeout;
private RetryPolicy retryPolicy;
private CompletableFuture<MessagingFactory> open;
/**
* @param reactor parameter reactor is purely for testing purposes and the SDK code should always set it to null
@ -45,12 +47,11 @@ public class MessagingFactory extends ClientEntity
this.startReactor();
this.connection = this.createConnection(builder);
this.operationTimeout = builder.getOperationTimeout();
this.retryPolicy = builder.getRetryPolicy();
}
private Connection createConnection(ConnectionStringBuilder builder)
private void createConnection(ConnectionStringBuilder builder)
{
synchronized (this.reactorLock)
{
@ -58,7 +59,8 @@ public class MessagingFactory extends ClientEntity
ConnectionHandler connectionHandler = new ConnectionHandler(this, builder.getEndpoint().getHost(), builder.getSasKeyName(), builder.getSasKey());
this.waitingConnectionOpen = true;
return reactor.connection(connectionHandler);
this.connection = reactor.connection(connectionHandler);
this.open = new CompletableFuture<MessagingFactory>();
}
}
@ -70,12 +72,13 @@ public class MessagingFactory extends ClientEntity
{
this.reactor = Proton.reactor(new ReactorHandler());
this.reactorThread = new Thread(new RunReactor(this.reactor));
this.reactorThread = new Thread(new RunReactor(this, this.reactor));
this.reactorThread.start();
}
}
}
// Todo: async
Connection getConnection()
{
if (this.connection.getLocalState() != EndpointState.ACTIVE)
@ -106,18 +109,29 @@ public class MessagingFactory extends ClientEntity
return this.retryPolicy;
}
public static MessagingFactory createFromConnectionString(final String connectionString) throws IOException
public static CompletableFuture<MessagingFactory> createFromConnectionString(final String connectionString) throws IOException
{
ConnectionStringBuilder builder = new ConnectionStringBuilder(connectionString);
return new MessagingFactory(builder);
MessagingFactory messagingFactory = new MessagingFactory(builder);
messagingFactory.createConnection(builder);
return messagingFactory.open;
}
// Contract: ConnectionHandler - MessagingFactory
public void onOpenComplete()
public void onOpenComplete(Exception exception)
{
synchronized (this.connection)
{
this.waitingConnectionOpen = false;
if (exception == null)
{
this.open.complete(this);
}
else
{
this.open.completeExceptionally(exception);
}
}
}
@ -139,10 +153,12 @@ public class MessagingFactory extends ClientEntity
public static class RunReactor implements Runnable
{
private Reactor r;
private MessagingFactory messagingFactory;
public RunReactor(Reactor r)
public RunReactor(MessagingFactory owner, Reactor r)
{
this.r = r;
this.messagingFactory = owner;
}
public void run()
@ -158,9 +174,17 @@ public class MessagingFactory extends ClientEntity
}
catch (HandlerException handlerException)
{
if (handlerException.getCause() != null && handlerException.getCause() instanceof UnresolvedAddressException)
{
UnresolvedAddressException unresolvedAddressException = (UnresolvedAddressException) handlerException.getCause();
this.messagingFactory.onOpenComplete(unresolvedAddressException);
return;
}
if(TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING, "UnHandled exception while processing events in reactor:" + handlerException.toString());
TRACE_LOGGER.log(Level.WARNING, "UnHandled exception while processing events in reactor: " + handlerException.toString());
handlerException.printStackTrace();
}
}
}

Просмотреть файл

@ -0,0 +1,24 @@
package com.microsoft.azure.servicebus;
public final class TrackingUtil
{
public static final String TRACKING_ID_TOKEN_SEPARATOR = "_";
private TrackingUtil()
{
}
/**
* parses ServiceBus role identifiers from trackingId
* @return null if no roleIdentifier found
*/
public static String parseRoleIdentifier(final String trackingId)
{
if (StringUtil.isNullOrWhiteSpace(trackingId) || !trackingId.contains(TRACKING_ID_TOKEN_SEPARATOR))
{
return null;
}
return trackingId.substring(trackingId.indexOf(TRACKING_ID_TOKEN_SEPARATOR));
}
}

Просмотреть файл

@ -34,7 +34,7 @@ public final class ConnectionHandler extends BaseHandler
this.password = password;
this.messagingFactory = messagingFactory;
}
@Override
public void onConnectionBound(Event event)
{
@ -46,6 +46,15 @@ public final class ConnectionHandler extends BaseHandler
Sasl sasl = transport.sasl();
sasl.plain(this.username, this.password);
}
@Override
public void onConnectionUnbound(Event event)
{
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING, "Connection.onConnectionUnbound: hostname[" + event.getConnection().getHostname() + "]");
}
}
@Override
public void onTransportError(Event event)
@ -80,8 +89,12 @@ public final class ConnectionHandler extends BaseHandler
@Override
public void onConnectionRemoteOpen(Event event)
{
Connection connection = event.getConnection();
this.messagingFactory.onOpenComplete();
if (TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, "Connection.onConnectionRemoteOpen: hostname[" + event.getConnection().getHostname() + "]");
}
this.messagingFactory.onOpenComplete(null);
}
private static SslDomain makeDomain(SslDomain.Mode mode)

Просмотреть файл

@ -26,14 +26,12 @@ import com.microsoft.azure.servicebus.MessageReceiver;
*/
public final class ReceiveLinkHandler extends BaseLinkHandler
{
private final String name;
private final MessageReceiver msgReceiver;
private final Object firstResponse;
private boolean isFirstResponse;
public ReceiveLinkHandler(final String name, final MessageReceiver receiver)
public ReceiveLinkHandler(final MessageReceiver receiver)
{
this.name = name;
this.msgReceiver = receiver;
this.firstResponse = new Object();
this.isFirstResponse = true;
@ -50,7 +48,7 @@ public final class ReceiveLinkHandler extends BaseLinkHandler
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE,
String.format("ReceiveLinkHandler(name: %s) initial credit: %s", this.name, receiver.getCredit()));
String.format("ReceiveLinkHandler(name: %s) initial credit: %s", receiver.getName(), receiver.getCredit()));
}
}
}
@ -67,7 +65,7 @@ public final class ReceiveLinkHandler extends BaseLinkHandler
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE,
String.format("ReceiveLinkHandler(name: %s) RemoteSource: %s", this.name, link.getRemoteSource()));
String.format("ReceiveLinkHandler(name: %s) RemoteSource: %s", receiver.getName(), link.getRemoteSource()));
}
synchronized (this.firstResponse)
@ -81,7 +79,7 @@ public final class ReceiveLinkHandler extends BaseLinkHandler
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE,
String.format("ReceiveLinkHandler(name: %s): remote Target Source set to null. waiting for error.", this.name));
String.format("ReceiveLinkHandler(name: %s): remote Target Source set to null. waiting for error.", receiver.getName()));
}
}

Просмотреть файл

@ -10,14 +10,12 @@ import com.microsoft.azure.servicebus.MessageSender;
public class SendLinkHandler extends BaseLinkHandler
{
private final String name;
private final MessageSender msgSender;
private final Object firstFlow;
private boolean isFirstFlow;
public SendLinkHandler(final String name, final MessageSender sender)
public SendLinkHandler(final MessageSender sender)
{
this.name = name;
this.msgSender = sender;
this.firstFlow = new Object();
this.isFirstFlow = true;

Просмотреть файл

@ -92,7 +92,7 @@ public class ReceiverRetryTest extends TestBase
public void testRetryWhenReceiveFails() throws Exception
{
factory = MessagingFactory.createFromConnectionString(
new ConnectionStringBuilder("Endpoint=amqps://localhost;SharedAccessKeyName=somename;EntityPath=eventhub1;SharedAccessKey=somekey").toString());
new ConnectionStringBuilder("Endpoint=amqps://localhost;SharedAccessKeyName=somename;EntityPath=eventhub1;SharedAccessKey=somekey").toString()).get();
MessageReceiver receiver = MessageReceiver.create(factory,
"receiver1", "eventhub1/consumergroups/$default/partitions/0", "-1", false, null, 100, 0, false).get();

Просмотреть файл

@ -17,7 +17,7 @@ public class TimeoutExceptionTest
{
MockServer server = MockServer.Create(null);
MessagingFactory factory = MessagingFactory.createFromConnectionString(
new ConnectionStringBuilder("Endpoint=amqps://localhost;SharedAccessKeyName=somename;EntityPath=eventhub1;SharedAccessKey=somekey").toString());
new ConnectionStringBuilder("Endpoint=amqps://localhost;SharedAccessKeyName=somename;EntityPath=eventhub1;SharedAccessKey=somekey").toString()).get();
try
{