This commit is contained in:
Sreeram Garlapati 2015-12-31 13:52:13 -08:00
Родитель 52968da1d5
Коммит 4abe07996e
9 изменённых файлов: 168 добавлений и 66 удалений

Просмотреть файл

@ -174,7 +174,7 @@ public class EventData implements AutoCloseable
return this.properties;
}
public void setProperties(Map applicationProperties)
public void setProperties(Map<String, String> applicationProperties)
{
this.properties = applicationProperties;
}

Просмотреть файл

@ -13,7 +13,7 @@ import com.microsoft.azure.servicebus.*;
/**
* Anchor class - all EventHub client operations STARTS here.
*/
public class EventHubClient
public class EventHubClient extends ClientEntity
{
public static final String DefaultConsumerGroupName = "$Default";
@ -24,6 +24,7 @@ public class EventHubClient
private EventHubClient(ConnectionStringBuilder connectionString) throws IOException, EntityNotFoundException
{
super(UUID.randomUUID().toString());
this.underlyingFactory = MessagingFactory.createFromConnectionString(connectionString.toString());
this.eventHubName = connectionString.getEntityPath();
}

Просмотреть файл

@ -6,7 +6,8 @@ public final class AmqpConstants {
private AmqpConstants() { }
public static final String Vendor = "com.microsoft";
public static final String Apache = "apache.org";
public static final String Vendor = "com.microsoft";
public static final String OffsetName = "x-opt-offset";
@ -14,4 +15,7 @@ public final class AmqpConstants {
public static final Symbol Offset = Symbol.getSymbol(AmqpConstants.OffsetName);
public static final Symbol SequenceNumber = Symbol.getSymbol("x-opt-sequence-number");
public static final Symbol EnqueuedTimeUtc = Symbol.getSymbol("x-opt-enqueued-time");
public static final Symbol StringFilter = Symbol.valueOf(AmqpConstants.Apache + ":selector-filter:string");
public static final Symbol Epoch = Symbol.valueOf(AmqpConstants.Vendor + ":epoch");
}

Просмотреть файл

@ -2,8 +2,18 @@ package com.microsoft.azure.servicebus;
// TODO: contract for all client entities with Open-Close/Abort state m/c
// main-purpose: closeAll related entities
public abstract class ClientEntity {
public abstract class ClientEntity
{
private String clientId;
protected ClientEntity(final String clientId)
{
this.clientId = clientId;
}
public abstract void close();
public String getClientId()
{
return this.clientId;
}
}

Просмотреть файл

@ -18,8 +18,8 @@ import org.apache.qpid.proton.reactor.Reactor;
* translates event-driven reactor model into async receive Api
* Manage reconnect? - store currentConsumedOffset
*/
public class MessageReceiver extends ClientEntity {
public class MessageReceiver extends ClientEntity
{
private final int prefetchCount;
private final ConcurrentLinkedQueue<CompletableFuture<Collection<Message>>> pendingReceives;
private final MessagingFactory underlyingFactory;
@ -63,7 +63,9 @@ public class MessageReceiver extends ClientEntity {
final int prefetchCount,
final Long epoch,
final boolean isEpochReceiver,
final MessageReceiveHandler receiveHandler){
final MessageReceiveHandler receiveHandler)
{
super(name);
this.underlyingFactory = factory;
this.prefetchCount = prefetchCount;
this.epoch = epoch;
@ -72,10 +74,15 @@ public class MessageReceiver extends ClientEntity {
this.receiveLink = this.createReceiveLink(factory.getConnection(), name, recvPath, offset, offsetInclusive);
this.linkOpen = new CompletableFuture<MessageReceiver>();
Timer.schedule(new Runnable() {
public void run() {
synchronized(MessageReceiver.this.linkOpen) {
if (!MessageReceiver.this.linkOpen.isDone()) {
// timer to signal a timeout if exceeds the operationTimeout on MessagingFactory
Timer.schedule(new Runnable()
{
public void run()
{
synchronized(MessageReceiver.this.linkOpen)
{
if (!MessageReceiver.this.linkOpen.isDone())
{
MessageReceiver.this.linkOpen.completeExceptionally(new OperationTimedOutException());
}
}
@ -86,17 +93,22 @@ public class MessageReceiver extends ClientEntity {
this.receiveHandler = receiveHandler;
}
public int getPrefetchCount() {
public int getPrefetchCount()
{
return this.prefetchCount;
}
/*
* if ReceiveHandler is passed to the Constructor - this receive shouldn't be invoked
*/
public CompletableFuture<Collection<Message>> receive(){
if (!this.prefetchedMessages.isEmpty()) {
synchronized (this.prefetchedMessages) {
if (!this.prefetchedMessages.isEmpty()) {
public CompletableFuture<Collection<Message>> receive()
{
if (!this.prefetchedMessages.isEmpty())
{
synchronized (this.prefetchedMessages)
{
if (!this.prefetchedMessages.isEmpty())
{
Collection<Message> returnMessages = this.prefetchedMessages;
this.prefetchedMessages = new ConcurrentLinkedQueue<Message>();
return CompletableFuture.completedFuture(returnMessages);
@ -109,62 +121,76 @@ public class MessageReceiver extends ClientEntity {
return onReceive;
}
void onOpenComplete(ErrorCondition condition){
synchronized (this.linkOpen) {
if (!this.linkOpen.isDone()) {
if (condition == null) {
void onOpenComplete(Exception exception)
{
synchronized (this.linkOpen)
{
if (!this.linkOpen.isDone())
{
if (exception == null)
{
this.linkOpen.complete(this);
}
else {
this.linkOpen.completeExceptionally(ExceptionUtil.toException(condition));
else
{
this.linkOpen.completeExceptionally(exception);
}
}
}
}
// intended to be called by proton reactor handler
void onDelivery(Collection<Message> messages) {
if (this.receiveHandler != null) {
void onDelivery(Collection<Message> messages)
{
if (this.receiveHandler != null)
{
this.receiveHandler.onReceiveMessages(messages);
return;
}
synchronized (this.pendingReceives) {
if (this.pendingReceives.isEmpty()) {
synchronized (this.pendingReceives)
{
if (this.pendingReceives.isEmpty())
{
this.prefetchedMessages.addAll(messages);
}
else {
else
{
this.pendingReceives.poll().complete(messages);
}
}
}
// TODO: Map to appropriate exception based on ErrMsg
void onError(ErrorCondition error) {
void onError(ErrorCondition error)
{
Exception completionException = ExceptionUtil.toException(error);
// TODO: apply retryPolicy here - recreate link - "preserve offset"
synchronized (this.linkOpen) {
if (!this.linkOpen.isDone()) {
this.onOpenComplete(error);
synchronized (this.linkOpen)
{
if (!this.linkOpen.isDone())
{
this.onOpenComplete(completionException);
return;
}
}
if (this.pendingReceives != null && !this.pendingReceives.isEmpty()) {
synchronized (this.pendingReceives) {
for(CompletableFuture<Collection<Message>> future: this.pendingReceives) {
future.completeExceptionally(ExceptionUtil.toException(error));
if (this.pendingReceives != null && !this.pendingReceives.isEmpty())
{
synchronized (this.pendingReceives)
{
for(CompletableFuture<Collection<Message>> future: this.pendingReceives)
{
future.completeExceptionally(completionException);
}
}
}
}
@Override
public void close() {
if (this.receiveLink != null && this.receiveLink.getLocalState() == EndpointState.ACTIVE) {
public void close()
{
if (this.receiveLink != null && this.receiveLink.getLocalState() == EndpointState.ACTIVE)
{
this.receiveLink.close();
}
}
@ -172,15 +198,15 @@ public class MessageReceiver extends ClientEntity {
private Receiver createReceiveLink(
final Connection connection,
final String name,
final String recvPath,
final String receivePath,
final String offset,
final boolean offsetInclusive)
{
Source source = new Source();
source.setAddress(recvPath);
source.setAddress(receivePath);
source.setFilter(Collections.singletonMap(
Symbol.valueOf("apache.org:selector-filter:string"),
new UnknownDescribedType(Symbol.valueOf("apache.org:selector-filter:string"),
AmqpConstants.StringFilter,
new UnknownDescribedType(AmqpConstants.StringFilter,
String.format("amqp.annotation.%s >%s '%s'", AmqpConstants.OffsetName, offsetInclusive ? "=" : StringUtil.EMPTY, offset))));
Session ssn = connection.session();
@ -191,10 +217,10 @@ public class MessageReceiver extends ClientEntity {
// use explicit settlement via dispositions (not pre-settled)
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
// receiver.setContext(Collections.singletonMap(Symbol.valueOf("com.microsoft:epoch"), 122));
if (this.isEpochReceiver) {
receiver.setProperties(Collections.singletonMap(Symbol.valueOf("com.microsoft:epoch"), (Object) this.epoch));
if (this.isEpochReceiver)
{
receiver.setProperties(Collections.singletonMap(AmqpConstants.Epoch, (Object) this.epoch));
}
ssn.open();

Просмотреть файл

@ -42,6 +42,7 @@ public class MessageSender extends ClientEntity {
}
private MessageSender(final MessagingFactory factory, final String sendLinkName, final String senderPath) {
super(sendLinkName);
this.sendPath = senderPath;
this.underlyingFactory = factory;
this.sendLink = MessageSender.createSendLink(factory.getConnection(), sendLinkName, senderPath);

Просмотреть файл

@ -6,13 +6,14 @@ import java.util.*;
/**
* RetryPolicy implementation where the delay between retries will grow in a staggered exponential manner.
* RetryPolicy can be set on the client operations using {@link ServiceBusConnectionStringBuilder}.
* RetryIntervals will be computed using a retryFactor which is a function of deltaBackOff (MaximumBackoff - MinimumBackoff) and MaximumRetryCount
*/
public final class RetryExponential extends RetryPolicy
{
private Duration minimumBackoff;
private Duration maximumBackoff;
private int maximumRetryCount;
private double retryFactor;
private final Duration minimumBackoff;
private final Duration maximumBackoff;
private final int maximumRetryCount;
private final double retryFactor;
public RetryExponential(Duration minimumBackoff, Duration maximumBackoff, int maximumRetryCount)
{
@ -23,7 +24,7 @@ public final class RetryExponential extends RetryPolicy
}
@Override
public boolean isNextRetryAllowed(String clientId, Exception lastException, Duration remainingTime, Duration retryAfter)
public Duration getNextRetryInterval(String clientId, Exception lastException, Duration remainingTime)
{
// TODO: does string inturn'ing effect sync logic ?
synchronized (clientId)
@ -32,32 +33,35 @@ public final class RetryExponential extends RetryPolicy
if (currentRetryCount >= this.maximumRetryCount)
{
return false;
return null;
}
// TODO: Given the current implementation evaluate the need for the extra wait for ServerBusyException
if (!RetryPolicy.isRetryableException(lastException))
{
return false;
return null;
}
long nextRetryInterval = (long) Math.pow(2, (double)this.maximumRetryCount + this.retryFactor);
if (remainingTime.getSeconds() < nextRetryInterval) {
return false;
double nextRetryInterval = Math.pow(this.retryFactor, (double)currentRetryCount);
long nextRetryIntervalSeconds = (long) nextRetryInterval ;
long nextRetryIntervalNano = (long)((nextRetryInterval - (double)nextRetryIntervalSeconds) * 1000000000);
if (remainingTime.getSeconds() < nextRetryInterval + 5)
{
return null;
}
retryAfter.plusSeconds(nextRetryInterval);
return true;
Duration retryAfter = this.minimumBackoff.plus(Duration.ofSeconds(nextRetryIntervalSeconds, nextRetryIntervalNano));
return retryAfter;
}
}
private double computeRetryFactor()
{
long deltaBackoff = this.maximumBackoff.getSeconds() - this.minimumBackoff.getSeconds();
long deltaBackoff = this.maximumBackoff.minus(this.minimumBackoff).getSeconds();
if (deltaBackoff <= 0 || this.maximumRetryCount <= 0) {
return 0;
}
return (Math.log(deltaBackoff / (Math.pow(2, this.maximumRetryCount) - 1))) / (Math.log(2));
return (Math.log(deltaBackoff) / Math.log(this.maximumRetryCount));
}
}

Просмотреть файл

@ -14,7 +14,7 @@ public abstract class RetryPolicy
this.retryCounts = new ConcurrentHashMap<String, Integer>();
}
void incrementRetryCount(String clientId)
public void incrementRetryCount(String clientId)
{
synchronized (clientId)
{
@ -23,6 +23,14 @@ public abstract class RetryPolicy
}
}
public void resetRetryCount(String clientId)
{
synchronized (clientId)
{
this.retryCounts.put(clientId, 0);
}
}
public static boolean isRetryableException(Exception exception)
{
if (exception == null)
@ -56,7 +64,7 @@ public abstract class RetryPolicy
}
/**
* @param retryAfter "out" parameter. Pass retryAfter with value = 0 - this method will increment the value. Don't new-up this parameter in the implementations of {@link RetryPolicy#isNextRetryAllowed(Duration)}.
* return returns 'null' Duration when not Allowed
*/
public abstract boolean isNextRetryAllowed(String clientId, Exception lastException, Duration remainingTime, Duration retryAfter);
public abstract Duration getNextRetryInterval(String clientId, Exception lastException, Duration remainingTime);
}

Просмотреть файл

@ -0,0 +1,48 @@
package com.microsoft.azure.eventhubs.exceptioncontracts;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import org.junit.*;
import com.microsoft.azure.eventhubs.*;
import com.microsoft.azure.eventhubs.lib.*;
import com.microsoft.azure.servicebus.*;
public class RetryPolicyTests extends TestBase
{
@Test
public void testRetryPolicy() throws Exception
{
String clientId = "someClientEntity";
RetryPolicy retry = RetryPolicy.getDefault();
retry.incrementRetryCount(clientId);
Duration firstRetryInterval = retry.getNextRetryInterval(clientId, new IntermittentException(), Duration.ofSeconds(60));
Assert.assertTrue(firstRetryInterval != null);
retry.incrementRetryCount(clientId);
Duration secondRetryInterval = retry.getNextRetryInterval(clientId, new IntermittentException(), Duration.ofSeconds(60));
Assert.assertTrue(secondRetryInterval != null);
Assert.assertTrue(secondRetryInterval.getSeconds() > firstRetryInterval.getSeconds() ||
(secondRetryInterval.getSeconds() == firstRetryInterval.getSeconds() && secondRetryInterval.getNano() > firstRetryInterval.getNano()));
retry.incrementRetryCount(clientId);
Duration nextRetryInterval = retry.getNextRetryInterval(clientId, new AuthorizationFailedException("authorizationerror"), Duration.ofSeconds(60));
Assert.assertTrue(nextRetryInterval == null);
retry.resetRetryCount(clientId);
retry.incrementRetryCount(clientId);
Duration firstRetryIntervalAfterReset = retry.getNextRetryInterval(clientId, new IntermittentException(), Duration.ofSeconds(60));
Assert.assertTrue(firstRetryInterval.equals(firstRetryIntervalAfterReset));
}
public static class IntermittentException extends ServiceBusException
{
@Override
public boolean getIsTransient() {
return true;
}
}
}