This commit is contained in:
Sreeram Garlapati 2015-12-30 12:30:31 -08:00
Родитель 8e2714c940
Коммит e178d7658e
14 изменённых файлов: 149 добавлений и 54 удалений

Просмотреть файл

@ -29,7 +29,7 @@ public class EventHubClient
}
public static CompletableFuture<EventHubClient> createFromConnectionString(final String connectionString)
throws EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, IOException
throws EntityNotFoundException, ServerBusyException, AuthorizationFailedException, IOException
{
ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString);
final EventHubClient eventHubClient = new EventHubClient(connStr);
@ -50,20 +50,20 @@ public class EventHubClient
}
public final CompletableFuture<PartitionSender> createPartitionSender(final String partitionId)
throws EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException
throws EntityNotFoundException, ServerBusyException, AuthorizationFailedException
{
return PartitionSender.Create(this.underlyingFactory, this.eventHubName, partitionId);
}
// TODO: return partitionInfo
public final String getPartitionInfo()
throws EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException
throws EntityNotFoundException, ServerBusyException, AuthorizationFailedException
{
throw new UnsupportedOperationException("TODO: Implement over http");
}
public final CompletableFuture<Void> send(EventData data)
throws MessagingCommunicationException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, PayloadExceededException
throws MessagingCommunicationException, ServerBusyException, AuthorizationFailedException, PayloadExceededException
{
if (data == null) {
// TODO: TRACE
@ -74,13 +74,13 @@ public class EventHubClient
}
public final CompletableFuture<Void> send(Iterable<EventData> data)
throws MessagingCommunicationException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, PayloadExceededException
throws MessagingCommunicationException, ServerBusyException, AuthorizationFailedException, PayloadExceededException
{
throw new UnsupportedOperationException("TODO Implement Send Batch");
}
public final CompletableFuture<Void> send(EventData data, String partitionKey)
throws MessagingCommunicationException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, PayloadExceededException
throws MessagingCommunicationException, ServerBusyException, AuthorizationFailedException, PayloadExceededException
{
if (data == null) {
throw new IllegalArgumentException("EventData cannot be null.");
@ -99,23 +99,23 @@ public class EventHubClient
}
public final CompletableFuture<Void> send(Iterable<EventData> data, String partitionKey)
throws MessagingCommunicationException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, PayloadExceededException
throws MessagingCommunicationException, ServerBusyException, AuthorizationFailedException, PayloadExceededException
{
throw new UnsupportedOperationException("TODO: Implement Send Batch");
}
public final CompletableFuture<PartitionReceiver> createReceiver(final String consumerGroupName, final String partitionId)
throws ReceiverDisconnectedException, EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, InterruptedException, ExecutionException {
throws ReceiverDisconnectedException, EntityNotFoundException, ServerBusyException, AuthorizationFailedException, InterruptedException, ExecutionException {
return this.createReceiver(consumerGroupName, partitionId, PartitionReceiver.StartOfStream, false);
}
public final CompletableFuture<PartitionReceiver> createReceiver(final String consumerGroupName, final String partitionId, final String startingOffset)
throws ReceiverDisconnectedException, EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, InterruptedException, ExecutionException {
throws ReceiverDisconnectedException, EntityNotFoundException, ServerBusyException, AuthorizationFailedException, InterruptedException, ExecutionException {
return this.createReceiver(consumerGroupName, partitionId, startingOffset, false);
}
public final CompletableFuture<PartitionReceiver> createReceiver(final String consumerGroupName, final String partitionId, final String startingOffset, boolean offsetInclusive)
throws ReceiverDisconnectedException, EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException
throws ReceiverDisconnectedException, EntityNotFoundException, ServerBusyException, AuthorizationFailedException
{
return PartitionReceiver.Create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, PartitionReceiver.NullEpoch, false, null);
}
@ -125,19 +125,19 @@ public class EventHubClient
}
public final CompletableFuture<PartitionReceiver> createEpochReceiver(final String consumerGroupName, final String partitionId, final long epoch)
throws EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, ReceiverDisconnectedException
throws EntityNotFoundException, ServerBusyException, AuthorizationFailedException, ReceiverDisconnectedException
{
return this.createEpochReceiver(consumerGroupName, partitionId, PartitionReceiver.StartOfStream, epoch);
}
public final CompletableFuture<PartitionReceiver> createEpochReceiver(final String consumerGroupName, final String partitionId, final String startingOffset, final long epoch)
throws EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, ReceiverDisconnectedException
throws EntityNotFoundException, ServerBusyException, AuthorizationFailedException, ReceiverDisconnectedException
{
return this.createEpochReceiver(consumerGroupName, partitionId, startingOffset, false, epoch);
}
public final CompletableFuture<PartitionReceiver> createEpochReceiver(final String consumerGroupName, final String partitionId, final String startingOffset, boolean offsetInclusive, final long epoch)
throws EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, ReceiverDisconnectedException
throws EntityNotFoundException, ServerBusyException, AuthorizationFailedException, ReceiverDisconnectedException
{
return PartitionReceiver.Create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, epoch, true, null);
}
@ -147,17 +147,28 @@ public class EventHubClient
throw new UnsupportedOperationException("TODO: Implement datetime receiver");
}
/*
* Built for EventProcessorHost scenario.
* - Implement ReceiveHandler to process events.
/**
* Use Epoch receiver to ensure that there is only *one* Receiver active on the EventHub Partition for this consumer group.
* EventHubs Service will ensure that the Receiver with highest epoch Value owns the Partition.
* This overload of CreateEpochReceiver is built to support EventProcessorHost pattern. Implement ReceiveHandler to process events.
* @param consumerGroupName consumer group name
* @param partitionId partition id of the EventHub
* @param startingOffset starting offset to read the Stream from. By default Start of EventHub Stream is {@link PartitionReceiver#StartOfStream}.
* @param offsetInclusive should the first event to be read include the offset ?
* @param epoch to make sure that there is only one Receiver
* @param receiveHandler the implementation of {@link ReceiveHandler} which can process the received events
* @return
* @throws EntityNotFoundException
* @throws ServerBusyException
* @throws AuthorizationFailedException
* @throws ReceiverDisconnectedException
*/
public final CompletableFuture<PartitionReceiver> createEpochReceiver(final String consumerGroupName, final String partitionId, final String startingOffset, boolean offsetInclusive, final long epoch, ReceiveHandler receiveHandler)
throws EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, ReceiverDisconnectedException {
throws EntityNotFoundException, ServerBusyException, AuthorizationFailedException, ReceiverDisconnectedException {
return PartitionReceiver.Create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, epoch, true, receiveHandler);
}
public final void close()
{
}
}

Просмотреть файл

@ -37,7 +37,7 @@ public final class PartitionReceiver
final Long epoch,
final boolean isEpochReceiver,
final ReceiveHandler receiveHandler)
throws ReceiverDisconnectedException, EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException {
throws ReceiverDisconnectedException, EntityNotFoundException, ServerBusyException, AuthorizationFailedException {
this.underlyingFactory = factory;
this.eventHubName = eventHubName;
this.consumerGroupName = consumerGroupName;
@ -58,7 +58,7 @@ public final class PartitionReceiver
final long epoch,
final boolean isEpochReceiver,
final ReceiveHandler receiveHandler)
throws ReceiverDisconnectedException, EntityNotFoundException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException {
throws ReceiverDisconnectedException, EntityNotFoundException, ServerBusyException, AuthorizationFailedException {
final PartitionReceiver receiver = new PartitionReceiver(factory, eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, epoch, isEpochReceiver, receiveHandler);
return receiver.createInternalReceiver().thenApplyAsync(new Function<Void, PartitionReceiver>() {
public PartitionReceiver apply(Void a){
@ -107,13 +107,13 @@ public final class PartitionReceiver
}
public CompletableFuture<Collection<EventData>> receive()
throws ServerBusyException, AuthorizationFailedException, InternalServerErrorException
throws ServerBusyException, AuthorizationFailedException
{
return this.receive(this.underlyingFactory.getOperationTimeout());
}
public CompletableFuture<Collection<EventData>> receive(Duration waittime)
throws ServerBusyException, AuthorizationFailedException, InternalServerErrorException
throws ServerBusyException, AuthorizationFailedException
{
if (this.receiveHandler != null) {
throw new IllegalStateException("Receive and onReceive cannot be performed side-by-side on a single instance of Receiver.");

Просмотреть файл

@ -41,13 +41,13 @@ public final class PartitionSender
}
public final CompletableFuture<Void> send(EventData data)
throws MessagingCommunicationException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, PayloadExceededException, EntityNotFoundException
throws MessagingCommunicationException, ServerBusyException, AuthorizationFailedException, PayloadExceededException, EntityNotFoundException
{
return this.internalSender.send(data.toAmqpMessage());
}
public final void send(Iterable<EventData> eventDatas)
throws MessagingCommunicationException, ServerBusyException, InternalServerErrorException, AuthorizationFailedException, PayloadExceededException, EntityNotFoundException
throws MessagingCommunicationException, ServerBusyException, AuthorizationFailedException, PayloadExceededException, EntityNotFoundException
{
throw new UnsupportedOperationException("TODO: Implement Send Batch");
}

Просмотреть файл

@ -4,6 +4,10 @@ import org.apache.qpid.proton.amqp.Symbol;
public final class AmqpConstants {
private AmqpConstants() { }
public static final String Vendor = "com.microsoft";
public static final String OffsetName = "x-opt-offset";
public static final Symbol PartitionKey = Symbol.getSymbol("x-opt-partition-key");

Просмотреть файл

@ -5,5 +5,12 @@ import org.apache.qpid.proton.amqp.Symbol;
public final class AmqpErrorCode {
public static final Symbol NotFound = Symbol.getSymbol("amqp:not-found");
public static final Symbol UnauthorizedAccess = Symbol.getSymbol("amqp:unauthorized-access");
public static final Symbol ResourceLimitExceeded = Symbol.getSymbol("amqp:resource-limit-exceeded");
public static final Symbol NotAllowed = Symbol.getSymbol("amqp:not-allowed");
public static final Symbol InternalError = Symbol.getSymbol("amqp:internal-error");
// link errors
public static final Symbol Stolen = Symbol.getSymbol("amqp:link:stolen");
}

Просмотреть файл

@ -3,6 +3,10 @@ package com.microsoft.azure.servicebus;
public class AuthorizationFailedException extends ServiceBusException {
private static final long serialVersionUID = -1106827749824999989L;
public AuthorizationFailedException(String description) {
super(description);
}
@Override
public boolean getIsTransient(){
return false;

Просмотреть файл

@ -0,0 +1,10 @@
package com.microsoft.azure.servicebus;
import org.apache.qpid.proton.amqp.*;
public final class ClientConstants {
private ClientConstants() { }
public final static Symbol ServerBusyError = Symbol.getSymbol(AmqpConstants.Vendor + ":server-busy");
}

Просмотреть файл

@ -17,6 +17,12 @@ final class ExceptionUtil {
else if (errorCondition.getCondition() == AmqpErrorCode.Stolen) {
return new ReceiverDisconnectedException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == AmqpErrorCode.UnauthorizedAccess) {
return new AuthorizationFailedException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == ClientConstants.ServerBusyError) {
return new ServerBusyException(errorCondition.getDescription());
}
// enumerate all ExceptionTypes
return null;

Просмотреть файл

@ -1,10 +0,0 @@
package com.microsoft.azure.servicebus;
public class InternalServerErrorException extends ServiceBusException {
private static final long serialVersionUID = -1106827749824999989L;
@Override public boolean getIsTransient(){
// basically tell the client to try after sometime
return true;
}
}

Просмотреть файл

@ -71,7 +71,7 @@ public class SendLinkHandler extends BaseHandler {
if (condition != null) {
if(TRACE_LOGGER.isLoggable(Level.SEVERE))
{
TRACE_LOGGER.log(Level.SEVERE, "sendLink.onLinkRemoteClose: name["+link.getName()+"] : ErrorCondition[" + condition.getDescription() + "]");
TRACE_LOGGER.log(Level.SEVERE, "sendLink.onLinkRemoteClose: name[" + link.getName() + "] : ErrorCondition[" + condition.getDescription() + "]");
}
}

Просмотреть файл

@ -4,14 +4,13 @@ public class ServerBusyException extends ServiceBusException
{
private static final long serialVersionUID = -1106827749824999989L;
// return what kind of ServerBusyException this is - Throttle or not
// PLEASE DONOT START DISCUSS: DO WE WANT A SUBCLASS or ERRORCODE
public int getErrorCode(){
return 2;
ServerBusyException(String message) {
super(message);
}
@Override
public boolean getIsTransient(){
return true;
}
public boolean getIsTransient()
{
return false;
}
}

Просмотреть файл

@ -0,0 +1,57 @@
package com.microsoft.azure.eventhubs.exceptioncontracts;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
import com.microsoft.azure.eventhubs.*;
import com.microsoft.azure.eventhubs.lib.*;
import com.microsoft.azure.servicebus.*;
public class ReceiverEpoch extends TestBase {
@Test (expected = ReceiverDisconnectedException.class)
public void testEpochReceiver() throws Throwable
{
TestEventHubInfo eventHubInfo = TestBase.checkoutTestEventHub();
try {
ConnectionStringBuilder connectionString = TestBase.getConnectionString(eventHubInfo);
EventHubClient ehClient = EventHubClient.createFromConnectionString(connectionString.toString()).get();
String cgName = eventHubInfo.getRandomConsumerGroup();
String partitionId = "0";
long epoch = 345632;
ehClient.createEpochReceiver(cgName, partitionId, PartitionReceiver.StartOfStream, false, epoch, new EventCounter()).get();
try {
ehClient.createEpochReceiver(cgName, partitionId, epoch - 10).get();
}
catch(ExecutionException exp) {
throw exp.getCause();
}
}
finally {
TestBase.checkinTestEventHub(eventHubInfo.getName());
}
}
public static final class EventCounter extends ReceiveHandler {
private long count;
public EventCounter(){
count = 0;
}
@Override
public void onReceive(Collection<EventData> events) {
for(EventData event: events) {
System.out.println(String.format("Counter: %s, Offset: %s, SeqNo: %s, EnqueueTime: %s, PKey: %s",
this.count, event.getSystemProperties().getOffset(), event.getSystemProperties().getSequenceNumber(), event.getSystemProperties().getEnqueuedTimeUtc(), event.getSystemProperties().getPartitionKey()));
}
count++;
}
}
}

Просмотреть файл

@ -20,14 +20,19 @@ public class SecurityExceptions extends TestBase
public void testEventHubClientUnAuthorizedAccess() throws Throwable
{
TestEventHubInfo eventHubInfo = TestBase.checkoutTestEventHub();
ConnectionStringBuilder connectionString = new ConnectionStringBuilder(eventHubInfo.getNamespaceName(), eventHubInfo.getName(), TestBase.SasRuleName, "wrongvalue");
try {
ConnectionStringBuilder connectionString = new ConnectionStringBuilder(eventHubInfo.getNamespaceName(), eventHubInfo.getName(), TestBase.SasRuleName, "wrongvalue");
try
{
EventHubClient.createFromConnectionString(connectionString.toString()).get();
try
{
EventHubClient.createFromConnectionString(connectionString.toString()).get();
}
catch(ExecutionException exp) {
throw exp.getCause();
}
}
catch(ExecutionException exp) {
throw exp.getCause();
finally {
TestBase.checkinTestEventHub(eventHubInfo.getName());
}
}
}

Просмотреть файл

@ -1,9 +1,7 @@
package com.microsoft.azure.eventhubs.lib;
import static org.junit.Assert.*;
import java.util.*;
import com.microsoft.azure.servicebus.*;
/**
@ -17,7 +15,7 @@ public abstract class TestBase
public static TestEventHubInfo checkoutTestEventHub() {
HashMap sasRule = new HashMap<String, String>();
sasRule.put(TestBase.SasRuleName, "-------SasKEY--------");
sasRule.put(TestBase.SasRuleName, "LHbmplGdVC7Lo7A1RAXXDgeHSM9WHIRvZmIt7m1y5w0=");
return new TestEventHubInfo("gojavago", "firstehub-ns", null, sasRule);
}
@ -25,4 +23,8 @@ public abstract class TestBase
Map.Entry<String, String> sasRule = eventHubInfo.getSasRule();
return new ConnectionStringBuilder(eventHubInfo.getNamespaceName(), eventHubInfo.getName(), sasRule.getKey(), sasRule.getValue());
}
public static void checkinTestEventHub(String name) {
// TODO: Implement Checkin-Checkout functionality
}
}