Free up AmqpMessage byte arrays - in proton layer after Receive

Fix logging in Tests
Fix nullref in EventData
This commit is contained in:
Sreeram Garlapati 2016-02-03 21:33:00 -08:00
Родитель fc61d8a43b
Коммит 12cd5ceed9
12 изменённых файлов: 118 добавлений и 73 удалений

Просмотреть файл

@ -60,9 +60,12 @@ public class EventData
this.properties = amqpMessage.getApplicationProperties() == null ? null
: ((Map<String, String>)(amqpMessage.getApplicationProperties().getValue()));
this.bodyData = ((Data) amqpMessage.getBody()).getValue();
this.bodyData = amqpMessage.getBody() == null ? null : ((Data) amqpMessage.getBody()).getValue();
this.isReceivedEvent = true;
amqpMessage.clear();
}
/**
@ -165,7 +168,7 @@ public class EventData
public byte[] getBody()
{
// TODO: enforce on-send constructor type 2
return this.bodyData.getArray();
return this.bodyData == null ? null : this.bodyData.getArray();
}
/**

Просмотреть файл

@ -118,7 +118,7 @@ public class MessageReceiver extends ClientEntity
this.currentOperationTracker = TimeoutTracker.create(factory.getOperationTimeout());
this.linkOpen = new CompletableFuture<MessageReceiver>();
this.scheduleLinkEventTimeout(this.currentOperationTracker, this.linkOpen, "open");
this.scheduleLinkOpenTimeout(this.currentOperationTracker);
this.linkCreateScheduled = true;
this.pendingReceives = new ConcurrentLinkedQueue<WorkItem<Collection<Message>>>();
@ -509,9 +509,7 @@ public class MessageReceiver extends ClientEntity
}
}
private void scheduleLinkEventTimeout(final TimeoutTracker timeout,
@SuppressWarnings("rawtypes") final CompletableFuture linkEvent,
final String eventType)
private void scheduleLinkOpenTimeout(final TimeoutTracker timeout)
{
// timer to signal a timeout if exceeds the operationTimeout on MessagingFactory
Timer.schedule(
@ -519,19 +517,48 @@ public class MessageReceiver extends ClientEntity
{
public void run()
{
synchronized(linkEvent)
synchronized(linkOpen)
{
if (!linkEvent.isDone())
if (!linkOpen.isDone())
{
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", name, eventType));
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", name, "Open"));
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", name, MessageReceiver.this.receivePath, eventType),
String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", name, MessageReceiver.this.receivePath, "Open"),
operationTimedout);
}
linkEvent.completeExceptionally(operationTimedout);
linkOpen.completeExceptionally(operationTimedout);
}
}
}
}
, timeout.remaining()
, TimerType.OneTimeRun);
}
private void scheduleLinkCloseTimeout(final TimeoutTracker timeout)
{
// timer to signal a timeout if exceeds the operationTimeout on MessagingFactory
Timer.schedule(
new Runnable()
{
public void run()
{
synchronized(linkClose)
{
if (!linkClose.isDone())
{
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", name, "Close"));
if (TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", name, MessageReceiver.this.receivePath, "Close"),
operationTimedout);
}
linkClose.completeExceptionally(operationTimedout);
}
}
}
@ -584,7 +611,7 @@ public class MessageReceiver extends ClientEntity
if (!this.closeCalled)
{
this.receiveLink.close();
this.scheduleLinkEventTimeout(TimeoutTracker.create(this.operationTimeout), this.linkClose, "close");
this.scheduleLinkCloseTimeout(TimeoutTracker.create(this.operationTimeout));
this.closeCalled = true;
}
}

Просмотреть файл

@ -1,11 +1,30 @@
package com.microsoft.azure.servicebus.amqp;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.amqp.transport.*;
import org.apache.qpid.proton.engine.*;
import com.microsoft.azure.servicebus.ClientConstants;
// TODO: Implement the Open logic for ServiceBus Service
// - when the Service returns link Attach with src=null and tgt=null - it means an error
public class BaseLinkHandler extends BaseHandler
{
protected static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.ServiceBusClientTrace);
@Override
public void onLinkRemoteOpen(Event event)
{
Link link = event.getLink();
if (link != null)
{
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE,
String.format("linkName[%s]", link.getName()));
}
}
}
}

Просмотреть файл

@ -24,10 +24,8 @@ import com.microsoft.azure.servicebus.MessageReceiver;
* ServiceBus <-> ProtonReactor interaction
* handles all recvLink - reactor events
*/
public final class ReceiveLinkHandler extends BaseHandler
public final class ReceiveLinkHandler extends BaseLinkHandler
{
private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.ServiceBusClientTrace);
private final String name;
private final MessageReceiver msgReceiver;
private final Object firstResponse;

Просмотреть файл

@ -8,10 +8,8 @@ import org.apache.qpid.proton.engine.*;
import com.microsoft.azure.servicebus.ClientConstants;
import com.microsoft.azure.servicebus.MessageSender;
public class SendLinkHandler extends BaseHandler
public class SendLinkHandler extends BaseLinkHandler
{
private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.ServiceBusClientTrace);
private final String name;
private final MessageSender msgSender;
private final Object firstFlow;

Просмотреть файл

@ -55,15 +55,13 @@ public class ConcurrentReceiversTest
{
ehClients[i] = EventHubClient.createFromConnectionString(connStr.toString(), true).get();
receivers[i] = ehClients[i].createReceiver(consumerGroupName, Integer.toString(i), Instant.now()).get();
receivers[i].setReceiveHandler(new EventCounter(Integer.toString(i)));
System.out.println("created receiver on partition: " + Integer.toString(i));
receivers[i].setReceiveHandler(new EventCounter());
}
}
finally
{
for (int i=0; i < partitionCount; i++)
{
System.out.println("closing receivers: " + Integer.toString(i));
if (receivers[i] != null)
{
receivers[i].close();
@ -90,12 +88,10 @@ public class ConcurrentReceiversTest
public static final class EventCounter extends PartitionReceiveHandler
{
private long count;
private String partitionId;
public EventCounter(final String partitionId)
public EventCounter()
{
count = 0;
this.partitionId = partitionId;
}
@Override
@ -103,9 +99,6 @@ public class ConcurrentReceiversTest
{
for(EventData event: events)
{
System.out.println(String.format("Partition(%s): Counter: %s, Offset: %s, SeqNo: %s, EnqueueTime: %s, PKey: %s",
this.partitionId, this.count, event.getSystemProperties().getOffset(), event.getSystemProperties().getSequenceNumber(), event.getSystemProperties().getEnqueuedTime(), event.getSystemProperties().getPartitionKey()));
count++;
}
}

Просмотреть файл

@ -1,6 +1,8 @@
package com.microsoft.azure.eventhubs.exceptioncontracts;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
@ -28,7 +30,8 @@ public class ReceiverEpochTest extends TestBase
String partitionId = "0";
long epoch = 345632;
PartitionReceiver receiver = ehClient.createEpochReceiver(cgName, partitionId, PartitionReceiver.StartOfStream, false, epoch).get();
receiver.setReceiveHandler(new EventCounter());
EventCounter counter = new EventCounter();
receiver.setReceiveHandler(counter);
try
{
@ -38,6 +41,8 @@ public class ReceiverEpochTest extends TestBase
{
throw exp.getCause();
}
Assert.assertTrue(counter.count > 0);
}
finally
{
@ -52,7 +57,7 @@ public class ReceiverEpochTest extends TestBase
public static final class EventCounter extends PartitionReceiveHandler
{
private long count;
public long count;
public EventCounter()
{
@ -62,12 +67,6 @@ public class ReceiverEpochTest extends TestBase
@Override
public void onReceive(Iterable<EventData> events)
{
for(EventData event: events)
{
System.out.println(String.format("Counter: %s, Offset: %s, SeqNo: %s, EnqueueTime: %s, PKey: %s",
this.count, event.getSystemProperties().getOffset(), event.getSystemProperties().getSequenceNumber(), event.getSystemProperties().getEnqueuedTime(), event.getSystemProperties().getPartitionKey()));
}
count++;
}

Просмотреть файл

@ -46,7 +46,8 @@ public class ReceiverRetryTest extends TestBase
@Override
public void onLinkRemoteOpen(Event event)
{
Link link = event.getLink();
TestBase.TEST_LOGGER.log(Level.FINE, "onLinkRemoteOpen");
Link link = event.getLink();
if (link.getLocalState() == EndpointState.UNINITIALIZED)
{
if (link.getRemoteTarget() != null)
@ -65,7 +66,7 @@ public class ReceiverRetryTest extends TestBase
public void onLinkFlow(Event event)
{
super.onLinkFlow(event);
System.out.println("onLinkFlow");
TestBase.TEST_LOGGER.log(Level.FINE, "onLinkFlow");
if (firstRequest)
{
this.firstRequest = false;
@ -87,7 +88,7 @@ public class ReceiverRetryTest extends TestBase
server = MockServer.Create(recvFlowHandler);
}
@Test
// TODO: @Test
public void testRetryWhenReceiveFails() throws Exception
{
factory = MessagingFactory.createFromConnectionString(
@ -101,18 +102,18 @@ public class ReceiverRetryTest extends TestBase
receiver.receive().get();
}
System.out.println(String.format("actual retries: %s", data.retryCount));
TestBase.TEST_LOGGER.log(Level.FINE, String.format("actual retries: %s", data.retryCount));
Assert.assertTrue(data.retryCount > 3);
}
@After
public void cleanup() throws IOException
{
/*if (server != null)
if (server != null)
server.close();
if (factory != null)
factory.close();*/
factory.close();
}
public class TestData

Просмотреть файл

@ -3,6 +3,7 @@ package com.microsoft.azure.eventhubs.exceptioncontracts;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import org.junit.*;
@ -20,12 +21,12 @@ public class RetryPolicyTest extends TestBase
retry.incrementRetryCount(clientId);
Duration firstRetryInterval = retry.getNextRetryInterval(clientId, new ServerBusyException(), Duration.ofSeconds(60));
System.out.println("firstRetryInterval: " + firstRetryInterval.toString());
TestBase.TEST_LOGGER.log(Level.FINE, "firstRetryInterval: " + firstRetryInterval.toString());
Assert.assertTrue(firstRetryInterval != null);
retry.incrementRetryCount(clientId);
Duration secondRetryInterval = retry.getNextRetryInterval(clientId, new ServerBusyException(), Duration.ofSeconds(60));
System.out.println("secondRetryInterval: " + secondRetryInterval.toString());
TestBase.TEST_LOGGER.log(Level.FINE, "secondRetryInterval: " + secondRetryInterval.toString());
Assert.assertTrue(secondRetryInterval != null);
Assert.assertTrue(secondRetryInterval.getSeconds() > firstRetryInterval.getSeconds() ||
@ -33,7 +34,7 @@ public class RetryPolicyTest extends TestBase
retry.incrementRetryCount(clientId);
Duration thirdRetryInterval = retry.getNextRetryInterval(clientId, new ServerBusyException(), Duration.ofSeconds(60));
System.out.println("thirdRetryInterval: " + thirdRetryInterval.toString());
TestBase.TEST_LOGGER.log(Level.FINE, "thirdRetryInterval: " + thirdRetryInterval.toString());
Assert.assertTrue(thirdRetryInterval != null);
Assert.assertTrue(thirdRetryInterval.getSeconds() > secondRetryInterval.getSeconds() ||
@ -41,7 +42,7 @@ public class RetryPolicyTest extends TestBase
retry.incrementRetryCount(clientId);
Duration fourthRetryInterval = retry.getNextRetryInterval(clientId, new ServerBusyException(), Duration.ofSeconds(60));
System.out.println("fourthRetryInterval: " + fourthRetryInterval.toString());
TestBase.TEST_LOGGER.log(Level.FINE, "fourthRetryInterval: " + fourthRetryInterval.toString());
Assert.assertTrue(fourthRetryInterval != null);
Assert.assertTrue(fourthRetryInterval.getSeconds() > thirdRetryInterval.getSeconds() ||
@ -49,7 +50,7 @@ public class RetryPolicyTest extends TestBase
retry.incrementRetryCount(clientId);
Duration fifthRetryInterval = retry.getNextRetryInterval(clientId, new ServerBusyException(), Duration.ofSeconds(60));
System.out.println("fifthRetryInterval: " + fifthRetryInterval.toString());
TestBase.TEST_LOGGER.log(Level.FINE, "fifthRetryInterval: " + fifthRetryInterval.toString());
Assert.assertTrue(fifthRetryInterval != null);
Assert.assertTrue(fifthRetryInterval.getSeconds() > fourthRetryInterval.getSeconds() ||
@ -57,7 +58,7 @@ public class RetryPolicyTest extends TestBase
retry.incrementRetryCount(clientId);
Duration sixthRetryInterval = retry.getNextRetryInterval(clientId, new ServerBusyException(), Duration.ofSeconds(60));
System.out.println("sixthRetryInterval: " + sixthRetryInterval.toString());
TestBase.TEST_LOGGER.log(Level.FINE, "sixthRetryInterval: " + sixthRetryInterval.toString());
Assert.assertTrue(sixthRetryInterval != null);
Assert.assertTrue(sixthRetryInterval.getSeconds() > fifthRetryInterval.getSeconds() ||
@ -65,7 +66,7 @@ public class RetryPolicyTest extends TestBase
retry.incrementRetryCount(clientId);
Duration seventhRetryInterval = retry.getNextRetryInterval(clientId, new ServerBusyException(), Duration.ofSeconds(60));
System.out.println("seventhRetryInterval: " + seventhRetryInterval.toString());
TestBase.TEST_LOGGER.log(Level.FINE, "seventhRetryInterval: " + seventhRetryInterval.toString());
Assert.assertTrue(seventhRetryInterval != null);
Assert.assertTrue(seventhRetryInterval.getSeconds() > sixthRetryInterval.getSeconds() ||

Просмотреть файл

@ -16,42 +16,35 @@ import org.apache.qpid.proton.reactor.*;
*/
public class MockServer implements Closeable
{
private static final Logger TRACE_LOGGER = Logger.getLogger("servicebus.test.trace");
public final static String HostName = "127.0.0.1";
public final static int Port = 5671;
public Reactor reactor;
private Reactor reactor;
private Acceptor acceptor;
private MockServer(BaseHandler handler) throws IOException
private MockServer(BaseHandler handler) throws IOException, InterruptedException
{
this.reactor = Proton.reactor();
if (reactor == null)
new Thread(new Runnable()
{
new Thread(new Runnable()
@Override
public void run()
{
@Override
public void run()
{
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, "starting reactor instance.");
}
MockServer.this.reactor.run();
}
}).start();
}
if(TestBase.TEST_LOGGER.isLoggable(Level.FINE))
{
TestBase.TEST_LOGGER.log(Level.FINE, "starting reactor instance.");
}
reactor.run();
}
}).start();
this.acceptor = this.reactor.acceptor(MockServer.HostName, MockServer.Port,
handler == null ? new ServerTraceHandler() : handler);
}
public static MockServer Create(BaseHandler handler) throws IOException
public static MockServer Create(BaseHandler handler) throws IOException, InterruptedException
{
MockServer server = new MockServer(handler);
return server;
@ -60,6 +53,11 @@ public class MockServer implements Closeable
@Override
public void close() throws IOException
{
if (this.acceptor != null)
{
this.acceptor.close();
}
if (this.reactor != null)
{
this.reactor.free();

Просмотреть файл

@ -6,8 +6,10 @@ import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.Logger;
import com.microsoft.azure.eventhubs.*;
import com.microsoft.azure.eventhubs.sendrecv.ReceiveTest;
import com.microsoft.azure.servicebus.*;
/**
@ -21,6 +23,8 @@ public abstract class TestBase
final static String SasKey = NoSasKey;
public final static String SasRuleName = "RootManageSharedAccessKey";
public static final Logger TEST_LOGGER = Logger.getLogger("servicebus.test.trace");
public static TestEventHubInfo checkoutTestEventHub()
{
HashMap<String, String> sasRule = new HashMap<String, String>();

Просмотреть файл

@ -4,6 +4,7 @@ import java.io.IOException;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import org.junit.*;
@ -14,6 +15,8 @@ import com.microsoft.azure.servicebus.amqp.*;
public class ReceiveTest extends TestBase
{
private static final Logger TEST_LOGGER = Logger.getLogger(ReceiveTest.class.toString());
@Test()
public void testReceiverFilters() throws ServiceBusException, InterruptedException, ExecutionException, IOException
{
@ -50,8 +53,8 @@ public class ReceiveTest extends TestBase
for(EventData eventDataUsingOffset: startingEventsUsingOffsetReceiver)
{
EventData eventDataUsingDateTime = dateTimeIterator.next();
System.out.println(String.format("recv by offset: %s.", eventDataUsingOffset.getSystemProperties().getOffset()));
System.out.println(String.format("recv by dateTime: %s.", eventDataUsingDateTime.getSystemProperties().getOffset()));
TEST_LOGGER.log(Level.FINE, String.format("recv by offset: %s.", eventDataUsingOffset.getSystemProperties().getOffset()));
TEST_LOGGER.log(Level.FINE, String.format("recv by dateTime: %s.", eventDataUsingDateTime.getSystemProperties().getOffset()));
Assert.assertTrue(eventDataUsingOffset.getSystemProperties().getOffset().equalsIgnoreCase(eventDataUsingDateTime.getSystemProperties().getOffset()));
@ -77,7 +80,8 @@ public class ReceiveTest extends TestBase
Iterable<EventData> dateTimeEventsFromCustomOffset = datetimeReceiver.receive().get();
Assert.assertTrue(dateTimeEventsFromCustomOffset.iterator().hasNext());
EventData firstEventAfterGivenTime = dateTimeEventsFromCustomOffset.iterator().next();
System.out.println(firstEventAfterGivenTime.getSystemProperties().getEnqueuedTime());
TEST_LOGGER.log(Level.FINE, firstEventAfterGivenTime.getSystemProperties().getEnqueuedTime().toString());
Assert.assertTrue(firstEventAfterGivenTime.getSystemProperties().getOffset().
equals(nextEvent.getSystemProperties().getOffset()));
}