* Adding a large message test.
Adding duplicate tests for partitioned entities.
Minor tweaks to make message handler tests more reliable.

* Invalid merge.

* Removed merge conflict files

* Some test fixes.

* Adding ARM deployment template for CI (#218)

build/azuredeploy.json

* Test reliability fixes.
And RequestResponseLimitTest is modified to drastically reduce its runtime.

* More test reliability fixes.

* More tweaks.
This commit is contained in:
Vijaya Gopal Yarramneni 2018-11-30 15:34:15 -08:00 коммит произвёл GitHub
Родитель e866ccd8ca
Коммит 12e68c1cae
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
18 изменённых файлов: 385 добавлений и 51 удалений

Просмотреть файл

@ -41,6 +41,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.20</version>
<configuration>
<forkCount>0</forkCount> <!-- Speeds up test execution using multiple JVMs. Set it to 0 if you want to run test sequentially in the build vm itself -->
<reuseForks>true</reuseForks>
</configuration>
</plugin>
</plugins>
</build>

Просмотреть файл

@ -378,7 +378,7 @@ public class Util
}
catch(BufferOverflowException exception)
{
throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", length / 1024), exception);
throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s KB", length / 1024), exception);
}
}

Просмотреть файл

@ -186,7 +186,7 @@ public class ClientValidationTests
}
}, MessageAndSessionPumpTests.EXECUTOR_SERVICE);
Thread.sleep(1000); // Sleep for a second for the exception
Thread.sleep(2000); // Sleep for two seconds for the exception
Assert.assertTrue("QueueClient created to a subscription which shouldn't be allowed.", unsupportedExceptionOccured.get());
} finally {
qc.close();
@ -224,7 +224,7 @@ public class ClientValidationTests
}
}, MessageAndSessionPumpTests.EXECUTOR_SERVICE);
Thread.sleep(1000); // Sleep for a second for the exception
Thread.sleep(2000); // Sleep for two seconds for the exception
Assert.assertTrue("SubscriptionClient created to a queue which shouldn't be allowed.", unsupportedExceptionOccured.get());
} finally {
sc.close();

Просмотреть файл

@ -90,7 +90,7 @@ public class MessageAndSessionPumpTests {
sender.send(new Message("AMQPMessage"));
}
boolean autoComplete = true;
int sleepMinutes = 1; // This should be less than message lock duration of the queue or subscription
int sleepMinutes = 1; // This should be more than message lock duration of the queue or subscription
CountingMessageHandler messageHandler = new CountingMessageHandler(messagePump, !autoComplete, numMessages, false, Duration.ofMinutes(sleepMinutes));
messagePump.registerMessageHandler(messageHandler, new MessageHandlerOptions(numMessages, autoComplete, Duration.ofMinutes(10)), EXECUTOR_SERVICE);
int waitMinutes = 2 * sleepMinutes;
@ -309,7 +309,8 @@ public class MessageAndSessionPumpTests {
CountingMessageHandler(IMessageAndSessionPump messagePump, boolean completeMessages, int messageCount, boolean firstThrowException)
{
this(messagePump, completeMessages, messageCount, firstThrowException, Duration.ZERO);
// Let's make every onMessage sleep for sometime so we can reliably count max concurrent threads
this(messagePump, completeMessages, messageCount, firstThrowException, Duration.ofSeconds(2));
}
CountingMessageHandler(IMessageAndSessionPump messagePump, boolean completeMessages, int messageCount, boolean firstThrowException, Duration sleepDuration)
@ -391,7 +392,8 @@ public class MessageAndSessionPumpTests {
CountingSessionHandler(IMessageAndSessionPump sessionPump, boolean completeMessages, int totalMessageCount, boolean firstThrowException)
{
this(sessionPump, completeMessages, totalMessageCount, firstThrowException, Duration.ZERO);
// Let's make every onMessage sleep for sometime so we can reliably count max concurrent threads
this(sessionPump, completeMessages, totalMessageCount, firstThrowException, Duration.ofSeconds(2));
}
CountingSessionHandler(IMessageAndSessionPump sessionPump, boolean completeMessages, int totalMessageCount, boolean firstThrowException, Duration sleepDuration)

Просмотреть файл

@ -0,0 +1,18 @@
package com.microsoft.azure.servicebus;
public class PartitionedQueueClientSessionTests extends QueueClientSessionTests{
@Override
public String getEntityNamePrefix() {
return "PartitionedQueueClientSessionTests";
}
@Override
public boolean shouldCreateEntityForEveryTest() {
return TestUtils.shouldCreateEntityForEveryTest();
}
@Override
public boolean isEntityPartitioned() {
return true;
}
}

Просмотреть файл

@ -0,0 +1,18 @@
package com.microsoft.azure.servicebus;
public class PartitionedQueueClientTests extends QueueClientTests{
@Override
public String getEntityNamePrefix() {
return "PartitionedQueueClientTests";
}
@Override
public boolean shouldCreateEntityForEveryTest() {
return TestUtils.shouldCreateEntityForEveryTest();
}
@Override
public boolean isEntityPartitioned() {
return true;
}
}

Просмотреть файл

@ -0,0 +1,18 @@
package com.microsoft.azure.servicebus;
public class PartitionedQueueSendReceiveTests extends QueueSendReceiveTests{
@Override
public String getEntityNamePrefix() {
return "PartitionedQueueSendReceiveTests";
}
@Override
public boolean shouldCreateEntityForEveryTest() {
return TestUtils.shouldCreateEntityForEveryTest();
}
@Override
public boolean isEntityPartitioned() {
return true;
}
}

Просмотреть файл

@ -0,0 +1,19 @@
package com.microsoft.azure.servicebus;
public class PartitionedQueueSessionTests extends QueueSessionTests
{
@Override
public String getEntityNamePrefix() {
return "PartitionedQueueSessionTests";
}
@Override
public boolean shouldCreateEntityForEveryTest() {
return TestUtils.shouldCreateEntityForEveryTest();
}
@Override
public boolean isEntityPartitioned() {
return true;
}
}

Просмотреть файл

@ -0,0 +1,18 @@
package com.microsoft.azure.servicebus;
public class PartitionedSubscriptionClientSessionTests extends SubscriptionClientSessionTests{
@Override
public String getEntityNamePrefix() {
return "PartitionedSubscriptionClientSessionTests";
}
@Override
public boolean shouldCreateEntityForEveryTest() {
return TestUtils.shouldCreateEntityForEveryTest();
}
@Override
public boolean isEntityPartitioned() {
return true;
}
}

Просмотреть файл

@ -0,0 +1,18 @@
package com.microsoft.azure.servicebus;
public class PartitionedSubscriptionClientTests extends SubscriptionClientTests{
@Override
public String getEntityNamePrefix() {
return "PartitionedSubscriptionClientTests";
}
@Override
public boolean isEntityPartitioned() {
return true;
}
@Override
public boolean shouldCreateEntityForEveryTest() {
return TestUtils.shouldCreateEntityForEveryTest();
}
}

Просмотреть файл

@ -0,0 +1,19 @@
package com.microsoft.azure.servicebus;
public class PartitionedTopicSendReceiveTests extends TopicSendReceiveTests {
@Override
public String getEntityNamePrefix() {
return "PartitionedTopicSendReceiveTests";
}
@Override
public boolean isEntityPartitioned() {
return true;
}
@Override
public boolean shouldCreateEntityForEveryTest() {
return TestUtils.shouldCreateEntityForEveryTest();
}
}

Просмотреть файл

@ -0,0 +1,19 @@
package com.microsoft.azure.servicebus;
public class PartitionedTopicSessionTests extends TopicSessionTests {
@Override
public String getEntityNamePrefix() {
return "PartitionedTopicSessionTests";
}
@Override
public boolean isEntityPartitioned() {
return true;
}
@Override
public boolean shouldCreateEntityForEveryTest() {
return TestUtils.shouldCreateEntityForEveryTest();
}
}

Просмотреть файл

@ -117,11 +117,18 @@ public abstract class SendReceiveTests extends Tests {
TestCommons.testBasicReceiveAndDelete(this.sender, this.sessionId, this.receiver);
}
@Test
public void testBasicReceiveAndDeleteWithLargeMessage() throws InterruptedException, ServiceBusException, ExecutionException
{
this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE);
TestCommons.testBasicReceiveAndDeleteWithLargeMessage(this.sender, this.sessionId, this.receiver);
}
@Test
public void testBasicReceiveBatchAndDelete() throws InterruptedException, ServiceBusException, ExecutionException
{
this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE);
TestCommons.testBasicReceiveBatchAndDelete(this.sender, this.sessionId, this.receiver);
TestCommons.testBasicReceiveBatchAndDelete(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned());
}
@Test
@ -156,14 +163,14 @@ public abstract class SendReceiveTests extends Tests {
public void testBasicReceiveAndRenewLockBatch() throws InterruptedException, ServiceBusException, ExecutionException
{
this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK);
TestCommons.testBasicReceiveAndRenewLockBatch(this.sender, this.sessionId, this.receiver);
TestCommons.testBasicReceiveAndRenewLockBatch(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned());
}
@Test
public void testBasicReceiveBatchAndComplete() throws InterruptedException, ServiceBusException, ExecutionException
{
this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK);
TestCommons.testBasicReceiveBatchAndComplete(this.sender, this.sessionId, this.receiver);
TestCommons.testBasicReceiveBatchAndComplete(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned());
}
@Test
@ -191,7 +198,7 @@ public abstract class SendReceiveTests extends Tests {
public void testPeekMessageBatch() throws InterruptedException, ServiceBusException
{
this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK);
TestCommons.testPeekMessageBatch(this.sender, this.sessionId, this.receiver);
TestCommons.testPeekMessageBatch(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned());
}
@Test

Просмотреть файл

@ -3,6 +3,7 @@ package com.microsoft.azure.servicebus;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -125,12 +126,20 @@ public abstract class SessionTests extends Tests {
TestCommons.testBasicReceiveAndDelete(this.sender, sessionId, this.session);
}
@Test
public void testBasicReceiveAndDeleteWithLargeMessage() throws InterruptedException, ServiceBusException, ExecutionException
{
String sessionId = TestUtils.getRandomString();
this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE);
TestCommons.testBasicReceiveAndDeleteWithLargeMessage(this.sender, sessionId, this.session);
}
@Test
public void testBasicReceiveBatchAndDelete() throws InterruptedException, ServiceBusException, ExecutionException
{
String sessionId = TestUtils.getRandomString();
this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE);
TestCommons.testBasicReceiveBatchAndDelete(this.sender, sessionId, this.session);
TestCommons.testBasicReceiveBatchAndDelete(this.sender, sessionId, this.session, this.isEntityPartitioned());
}
@Test
@ -162,7 +171,7 @@ public abstract class SessionTests extends Tests {
{
String sessionId = TestUtils.getRandomString();
this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.PEEKLOCK);
TestCommons.testBasicReceiveBatchAndComplete(this.sender, sessionId, this.session);
TestCommons.testBasicReceiveBatchAndComplete(this.sender, sessionId, this.session, this.isEntityPartitioned());
}
@Test
@ -194,7 +203,7 @@ public abstract class SessionTests extends Tests {
{
String sessionId = TestUtils.getRandomString();
this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.PEEKLOCK);
TestCommons.testPeekMessageBatch(this.sender, sessionId, this.session);
TestCommons.testPeekMessageBatch(this.sender, sessionId, this.session, this.isEntityPartitioned());
}
@Test
@ -305,16 +314,20 @@ public abstract class SessionTests extends Tests {
}
@Test
public void testRequestResponseLinkRequestLimit() throws InterruptedException, ServiceBusException
public void testRequestResponseLinkRequestLimit() throws InterruptedException, ServiceBusException, ExecutionException
{
int limitToTest = 5000;
String sessionId = TestUtils.getRandomString();
this.session = ClientFactory.acceptSessionFromEntityPath(TestUtils.getNamespaceEndpointURI(), this.receiveEntityPath, sessionId, TestUtils.getClientSettings(), ReceiveMode.PEEKLOCK);
CompletableFuture[] futures = new CompletableFuture[limitToTest];
for(int i=0; i<limitToTest; i++)
{
this.session.renewSessionLock();
CompletableFuture<Void> future = this.session.renewSessionLockAsync();
futures[i] = future;
}
CompletableFuture.allOf(futures).get();
this.session.renewSessionLock();
}

Просмотреть файл

@ -8,7 +8,7 @@ public class SubscriptionClientSessionTests extends ClientSessionTests{
@Override
public boolean isEntityQueue() {
return true;
return false;
}
@Override

Просмотреть файл

@ -40,14 +40,29 @@ public class TestCommons {
}
public static void testBasicReceiveAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
{
testBasicReceiveAndDelete(sender, sessionId, receiver, 64);
}
public static void testBasicReceiveAndDeleteWithLargeMessage(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
{
testBasicReceiveAndDelete(sender, sessionId, receiver, 64 * 1024);
}
private static void testBasicReceiveAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver, int messageSize) throws InterruptedException, ServiceBusException, ExecutionException
{
String messageId = UUID.randomUUID().toString();
Message message = new Message("AMQP message");
Message message = new Message();
message.setMessageId(messageId);
if(sessionId != null)
{
message.setSessionId(sessionId);
}
byte[] body = new byte[messageSize];
Arrays.fill(body, (byte)127);
message.setBody(body);
sender.send(message);
IMessage receivedMessage = receiver.receive();
@ -57,9 +72,25 @@ public class TestCommons {
Assert.assertNull("Message received again", receivedMessage);
}
public static void testBasicReceiveBatchAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
public static void testBasicReceiveBatchAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException, ExecutionException
{
int numMessages = 10;
if(isEntityPartitioned)
{
for(int i=0; i<numMessages; i++)
{
Message message = new Message("AMQP message");
if(sessionId != null)
{
message.setSessionId(sessionId);
}
sender.send(message);
}
}
else
{
// Keep this batch send part as this test sendBatch API call
List<Message> messages = new ArrayList<Message>();
for(int i=0; i<numMessages; i++)
{
@ -71,6 +102,8 @@ public class TestCommons {
messages.add(message);
}
sender.sendBatch(messages);
}
int totalReceivedMessages = 0;
Collection<IMessage> receivedMessages = receiver.receiveBatch(numMessages);
@ -168,9 +201,24 @@ public class TestCommons {
receiver.complete(receivedMessage.getLockToken());
}
public static void testBasicReceiveAndRenewLockBatch(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
public static void testBasicReceiveAndRenewLockBatch(IMessageSender sender, String sessionId, IMessageReceiver receiver, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException, ExecutionException
{
int numMessages = 10;
if(isEntityPartitioned)
{
for(int i=0; i<numMessages; i++)
{
Message message = new Message("AMQP message");
if(sessionId != null)
{
message.setSessionId(sessionId);
}
sender.send(message);
}
}
else
{
// Keep this batch send part as this test sendBatch API call
List<Message> messages = new ArrayList<Message>();
for(int i=0; i<numMessages; i++)
{
@ -182,16 +230,24 @@ public class TestCommons {
messages.add(message);
}
sender.sendBatch(messages);
}
ArrayList<IMessage> totalReceivedMessages = new ArrayList<>();
Collection<IMessage> receivedMessages = receiver.receiveBatch(numMessages);
if(isEntityPartitioned)
{
Assert.assertTrue("Messages not received", receivedMessages != null && receivedMessages.size() > 0);
}
else
{
while(receivedMessages != null && receivedMessages.size() > 0 && totalReceivedMessages.size() < numMessages)
{
totalReceivedMessages.addAll(receivedMessages);
receivedMessages = receiver.receiveBatch(numMessages);
}
Assert.assertEquals("All messages not received", numMessages, totalReceivedMessages.size());
}
ArrayList<Instant> oldLockTimes = new ArrayList<Instant>();
for(IMessage message : totalReceivedMessages)
@ -214,9 +270,24 @@ public class TestCommons {
}
}
public static void testBasicReceiveBatchAndComplete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
public static void testBasicReceiveBatchAndComplete(IMessageSender sender, String sessionId, IMessageReceiver receiver, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException, ExecutionException
{
int numMessages = 10;
if(isEntityPartitioned)
{
for(int i=0; i<numMessages; i++)
{
Message message = new Message("AMQP message");
if(sessionId != null)
{
message.setSessionId(sessionId);
}
sender.send(message);
}
}
else
{
// Keep this batch send part as this test sendBatch API call
List<Message> messages = new ArrayList<Message>();
for(int i=0; i<numMessages; i++)
{
@ -228,6 +299,7 @@ public class TestCommons {
messages.add(message);
}
sender.sendBatch(messages);
}
int totalMessagesReceived = 0;
Collection<IMessage> receivedMessages = receiver.receiveBatch(numMessages);
@ -264,7 +336,16 @@ public class TestCommons {
sender.scheduleMessage(message1, Instant.now().plusSeconds(secondsToWaitBeforeScheduling));
sender.scheduleMessage(message2, Instant.now().plusSeconds(secondsToWaitBeforeScheduling));
if(sessionId == null)
{
Thread.sleep(secondsToWaitBeforeScheduling * 1000 * 2);
}
else
{
Thread.sleep(secondsToWaitBeforeScheduling * 1000);
((IMessageSession)receiver).renewSessionLock();
Thread.sleep(secondsToWaitBeforeScheduling * 1000);
}
Collection<IMessage> allReceivedMessages = new LinkedList<IMessage>();
Collection<IMessage> receivedMessages = receiver.receiveBatch(10);
@ -306,7 +387,16 @@ public class TestCommons {
sender.scheduleMessage(message1, Instant.now().plusSeconds(secondsToWaitBeforeScheduling));
long sequnceNumberMsg2 = sender.scheduleMessage(message2, Instant.now().plusSeconds(secondsToWaitBeforeScheduling));
sender.cancelScheduledMessage(sequnceNumberMsg2);
if(sessionId == null)
{
Thread.sleep(secondsToWaitBeforeScheduling * 1000 * 2);
}
else
{
Thread.sleep(secondsToWaitBeforeScheduling * 1000);
((IMessageSession)receiver).renewSessionLock();
Thread.sleep(secondsToWaitBeforeScheduling * 1000);
}
Collection<IMessage> allReceivedMessages = new LinkedList<IMessage>();
Collection<IMessage> receivedMessages = receiver.receiveBatch(10);
@ -345,19 +435,34 @@ public class TestCommons {
Assert.assertEquals("Peek with sequence number failed.", firstMessageSequenceNumber, peekedMessage5.getSequenceNumber());
}
public static void testPeekMessageBatch(IMessageSender sender, String sessionId, IMessageBrowser browser) throws InterruptedException, ServiceBusException
public static void testPeekMessageBatch(IMessageSender sender, String sessionId, IMessageBrowser browser, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException
{
String partitionKey = "pkey1";
Message message = new Message("AMQP Scheduled message");
if(sessionId != null)
{
message.setSessionId(sessionId);
}
else
{
if(isEntityPartitioned)
{
message.setPartitionKey(partitionKey);
}
}
sender.send(message);
message = new Message("AMQP Scheduled message2");
if(sessionId != null)
{
message.setSessionId(sessionId);
}
else
{
if(isEntityPartitioned)
{
message.setPartitionKey(partitionKey);
}
}
sender.send(message);
Thread.sleep(5000);
Collection<IMessage> peekedMessages = browser.peekBatch(10);

Просмотреть файл

@ -0,0 +1,18 @@
package com.microsoft.azure.servicebus.primitives;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ConnectionStringBuilderTests {
@Test
public void ConnectionStringBuilderTest() {
String connectionString = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessSignatureToken=SharedAccessSignature sr=amqp%3A%2F%2test.servicebus.windows.net%2topic";
ConnectionStringBuilder builder = new ConnectionStringBuilder(connectionString);
assertEquals("SharedAccessSignature sr=amqp%3A%2F%2test.servicebus.windows.net%2topic", builder.getSharedAccessSignatureToken());
assertEquals(connectionString, builder.toString());
}
}

38
build/azuredeploy.json Normal file
Просмотреть файл

@ -0,0 +1,38 @@
{
"$schema": "http://schema.management.azure.com/schemas/2014-04-01-preview/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"serviceBusNamespaceName": {
"type": "string",
"metadata": {
"description": "Name of the Service Bus namespace"
}
}
},
"variables": {
"location": "[resourceGroup().location]",
"sbVersion": "2015-08-01",
"defaultSASKeyName": "RootManageSharedAccessKey",
"authRuleResourceId": "[resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', parameters('serviceBusNamespaceName'), variables('defaultSASKeyName'))]"
},
"resources": [
{
"apiVersion": "[variables('sbVersion')]",
"name": "[parameters('serviceBusNamespaceName')]",
"type": "Microsoft.ServiceBus/Namespaces",
"location": "[variables('location')]",
"sku": {
"name": "Standard",
"tier": "Standard"
},
"resources": [
]
}
],
"outputs": {
"NamespaceConnectionString": {
"type": "string",
"value": "[listkeys(variables('authRuleResourceId'), variables('sbVersion')).primaryConnectionString]"
}
}
}