address concurrency issue in EventHubClient #20

This commit is contained in:
Sreeram Garlapati 2016-01-31 19:36:57 -08:00
Родитель 03b09b5def
Коммит 8ade7fd75a
3 изменённых файлов: 97 добавлений и 14 удалений

Просмотреть файл

@ -3,8 +3,7 @@ package com.microsoft.azure.servicebus;
import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.*;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.*;
@ -23,6 +22,7 @@ public class MessagingFactory extends ClientEntity
public static final Duration DefaultOperationTimeout = Duration.ofSeconds(60);
private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.ServiceBusClientTrace);
private static final Object reactorLock = new Object();
// TODO: maintain refCount for reactor and close it if all MessagingFactory instances are closed
private static Reactor reactor;
@ -34,35 +34,50 @@ public class MessagingFactory extends ClientEntity
private Duration operationTimeout;
private RetryPolicy retryPolicy;
/**
* @param reactor parameter reactor is purely for testing purposes and the SDK code should always set it to null
*/
MessagingFactory(final ConnectionStringBuilder builder, final Reactor reactor) throws IOException
{
super("MessagingFactory" + UUID.randomUUID().toString());
if (reactor == null)
this.startReactor();
else if (MessagingFactory.reactor == null)
MessagingFactory.reactor = reactor;
/* else if (MessagingFactory.reactor != reactor)
throw new IllegalArgumentException("argument 'reactor' is unexpected"); */
else
{
synchronized (MessagingFactory.reactorLock)
{
if (MessagingFactory.reactor == null)
MessagingFactory.reactor = reactor;
}
}
this.connection = createConnection(builder);
this.connection = this.createConnection(builder);
this.operationTimeout = builder.getOperationTimeout();
this.retryPolicy = builder.getRetryPolicy();
}
private Connection createConnection(ConnectionStringBuilder builder)
{
ConnectionHandler connectionHandler = new ConnectionHandler(this, builder.getEndpoint().getHost(), builder.getSasKeyName(), builder.getSasKey());
this.waitingConnectionOpen = true;
return reactor.connection(connectionHandler);
synchronized (MessagingFactory.reactorLock)
{
assert MessagingFactory.reactor != null;
ConnectionHandler connectionHandler = new ConnectionHandler(this, builder.getEndpoint().getHost(), builder.getSasKeyName(), builder.getSasKey());
this.waitingConnectionOpen = true;
return reactor.connection(connectionHandler);
}
}
private void startReactor() throws IOException
{
if (reactor == null)
synchronized (MessagingFactory.reactorLock)
{
reactor = Proton.reactor();
new Thread(new RunReactor(reactor)).start();
if (MessagingFactory.reactor == null)
{
MessagingFactory.reactor = Proton.reactor();
new Thread(new RunReactor(MessagingFactory.reactor)).start();
}
}
}
@ -72,7 +87,9 @@ public class MessagingFactory extends ClientEntity
{
synchronized (this.connection)
{
if (this.connection.getLocalState() != EndpointState.ACTIVE && !this.waitingConnectionOpen)
if (this.connection.getLocalState() != EndpointState.ACTIVE &&
this.connection.getLocalState() != EndpointState.UNINITIALIZED &&
!this.waitingConnectionOpen)
{
this.connection.free();
this.connection = reactor.connection(connectionHandler);

Просмотреть файл

@ -0,0 +1,63 @@
package com.microsoft.azure.eventhubs.concurrency;
import java.io.IOException;
import java.util.concurrent.*;
import org.junit.*;
import com.microsoft.azure.eventhubs.*;
import com.microsoft.azure.eventhubs.lib.*;
import com.microsoft.azure.servicebus.*;
public class EventHubClientTests extends TestBase
{
@Test()
public void testParallelEventHubClients() throws ServiceBusException, InterruptedException, ExecutionException, IOException
{
Assume.assumeTrue(TestBase.isServiceRun());
TestEventHubInfo eventHubInfo = TestBase.checkoutTestEventHub();
String consumerGroupName = eventHubInfo.getRandomConsumerGroup();
String partitionId = "0";
CompletableFuture<EventHubClient>[] createFutures = new CompletableFuture[4];
try
{
ConnectionStringBuilder connectionString = TestBase.getConnectionString(eventHubInfo);
for (int i = 0; i < 4 ; i ++)
{
createFutures[i] = EventHubClient.createFromConnectionString(connectionString.toString());
}
CompletableFuture.allOf(createFutures).get();
boolean firstOne = true;
for (CompletableFuture<EventHubClient> createFuture: createFutures)
{
EventHubClient ehClient = createFuture.join();
if (firstOne)
{
TestBase.pushEventsToPartition(ehClient, partitionId, 10);
firstOne = false;
}
PartitionReceiver receiver = ehClient.createReceiver(consumerGroupName, partitionId).get();
receiver.receive().get();
}
}
finally
{
if (createFutures != null)
{
for (CompletableFuture<EventHubClient> createFuture: createFutures)
{
if (!createFuture.isCancelled() || !createFuture.isCompletedExceptionally())
{
EventHubClient ehClient = createFuture.join();
ehClient.close();
}
}
}
}
}
}

Просмотреть файл

@ -7,6 +7,7 @@ import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.*;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.message.*;
import org.apache.qpid.proton.reactor.Handshaker;
/**
* Sends 1 Msg on the first onLinkFlow event
@ -18,6 +19,8 @@ public class Sender1MsgOnLinkFlowHandler extends ServerTraceHandler
public Sender1MsgOnLinkFlowHandler()
{
add(new Handshaker());
this.firstFlow = new Object();
this.isFirstFlow = true;
}