From 8a3d67bf6c7f25af1f277aebb31edf04d7ec743c Mon Sep 17 00:00:00 2001 From: vinaysurya Date: Wed, 31 Jul 2024 11:36:31 -0700 Subject: [PATCH] Fix query parameters when forming the RemoteConnectionURI so factory can be created correctly with the intended options. (#50) * Fix bug where Idletimeout and other amqp parameters are not being set correctly in azure-servicebus-jms library * Add more tests to test the remoteConnection URI in many cases --------- Co-authored-by: Vinay Suryanarayana --- .../jms/ServiceBusJmsConnectionFactory.java | 75 ++++++-- ...erviceBusJmsConnectionFactorySettings.java | 51 +++--- .../com/azure/servicebus/jms/StringUtil.java | 8 + .../jms/aad/TestInitialization.java | 2 + ...BusJMSConnectionFactoryUriBuilderTest.java | 169 ++++++++++++++++++ 5 files changed, 262 insertions(+), 43 deletions(-) create mode 100644 src/test/java/com/azure/servicebus/jms/connection/string/ServiceBusJMSConnectionFactoryUriBuilderTest.java diff --git a/src/main/java/com/azure/servicebus/jms/ServiceBusJmsConnectionFactory.java b/src/main/java/com/azure/servicebus/jms/ServiceBusJmsConnectionFactory.java index 594304c..fbed9ae 100644 --- a/src/main/java/com/azure/servicebus/jms/ServiceBusJmsConnectionFactory.java +++ b/src/main/java/com/azure/servicebus/jms/ServiceBusJmsConnectionFactory.java @@ -24,6 +24,7 @@ import jakarta.jms.TopicConnectionFactory; import org.apache.qpid.jms.JmsConnectionExtensions; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.policy.JmsPrefetchPolicy; import com.azure.core.credential.TokenCredential; import com.azure.servicebus.jms.jndi.JNDIStorable; @@ -50,6 +51,7 @@ public class ServiceBusJmsConnectionFactory extends JNDIStorable implements Conn private String userName; private String password; private String host; + private String remoteConnectionUri; /** * Intended to be used by JNDI only. Users should not be actively calling this constructor to create a ServiceBusJmsConnectionFactory instance. @@ -145,16 +147,10 @@ public class ServiceBusJmsConnectionFactory extends JNDIStorable implements Conn if (userName == null || password == null || host == null) { throw new IllegalArgumentException("Authentication settings and host cannot be null for a Service Bus connection factory."); } - - String destinationUri = "amqps://" + host; - if (this.settings.shouldReconnect()) { - destinationUri = getReconnectUri(destinationUri, this.settings); - } - - String serviceBusQuery = this.settings.getServiceBusQuery(); - destinationUri += serviceBusQuery; - this.factory = new JmsConnectionFactory(userName, password, destinationUri); - + + this.remoteConnectionUri = this.getServiceBusRemoteConnectionUri(); + this.factory = new JmsConnectionFactory(userName, password, this.remoteConnectionUri); + this.factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> { Map properties = new HashMap<>(); properties.put(ServiceBusJmsConnectionFactorySettings.IsClientProvider, true); @@ -228,6 +224,20 @@ public class ServiceBusJmsConnectionFactory extends JNDIStorable implements Conn return this.settings; } + /* + * @return The RemoteUri set for this ConnectionFactory. + */ + public String getRemoteConnectionUri() { + return this.remoteConnectionUri; + } + + /* + * @return The PrefetchPolicy set for this ConnectionFactory. + */ + public JmsPrefetchPolicy getPrefetchPolicy() { + return this.factory.getPrefetchPolicy(); + } + @Override public Connection createConnection() throws JMSException { this.ensureInitialized(); @@ -365,26 +375,57 @@ public class ServiceBusJmsConnectionFactory extends JNDIStorable implements Conn this.customUserAgent = customUserAgent; } - // Obtain the reconnect URI in the form that QPID could understand. + private String getServiceBusRemoteConnectionUri() + { + String hostUri = "amqps://" + this.host; + String amqpPerHostQuery = settings.getPerHostAmqpProviderQuery(); + + if (!amqpPerHostQuery.isEmpty()) { + hostUri += "?" + amqpPerHostQuery; + } + + String jmsProviderQuery = settings.getGlobalJMSProviderQuery(); + + String remoteConnectionUri; + if (this.settings.shouldReconnect()) { + String failoverUri = this.getFailoverUri(hostUri, amqpPerHostQuery, this.settings); + + // Append failover Provider options if any + String failoverOptionsQuery = settings.getGlobalFailoverProviderQuery(); + remoteConnectionUri = failoverUri + (!failoverOptionsQuery.isEmpty() ? "?" + failoverOptionsQuery : ""); + + //Append jmsProvider options if any + if (!jmsProviderQuery.isEmpty()) { + remoteConnectionUri += (!failoverOptionsQuery.isEmpty() ? "&" : "?") + jmsProviderQuery; + } + } + else { + remoteConnectionUri = hostUri + (!jmsProviderQuery.isEmpty() ? (amqpPerHostQuery.isEmpty() ? "?" : "&") + jmsProviderQuery : ""); + } + + return remoteConnectionUri; + } + + // Obtain the failover URI in the form that QPID could understand. // Example: failover:(amqps://contoso.servicebus.windows.net?amqp.idleTimeout=30000,amqps://contoso2.servicebus.windows.net?amqp.idleTimeout=30000)?failover.maxReconnectAttempts=20 - private String getReconnectUri(String originalHost, ServiceBusJmsConnectionFactorySettings settings) { + private String getFailoverUri(String hostUri, String amqpPerHostQuery, ServiceBusJmsConnectionFactorySettings settings) { StringBuilder builder = new StringBuilder("failover:("); - builder.append(originalHost); + builder.append(hostUri); String[] reconnectHosts = settings.getReconnectHosts(); if (settings.getReconnectHosts() != null) { - String serviceBusQuery = settings.getServiceBusQuery(); - for (String reconnectHost: reconnectHosts) { builder.append(","); builder.append("amqps://"); builder.append(reconnectHost); - builder.append(serviceBusQuery); + if (!amqpPerHostQuery.isEmpty()) { + builder.append("?"); + builder.append(amqpPerHostQuery); + } } } builder.append(")"); - builder.append(settings.getReconnectQuery()); return builder.toString(); } diff --git a/src/main/java/com/azure/servicebus/jms/ServiceBusJmsConnectionFactorySettings.java b/src/main/java/com/azure/servicebus/jms/ServiceBusJmsConnectionFactorySettings.java index 28a4b34..0e34331 100644 --- a/src/main/java/com/azure/servicebus/jms/ServiceBusJmsConnectionFactorySettings.java +++ b/src/main/java/com/azure/servicebus/jms/ServiceBusJmsConnectionFactorySettings.java @@ -314,16 +314,23 @@ public class ServiceBusJmsConnectionFactorySettings { this.reconnectAmqpOpenServerListAction = amqpOpenServerListAction; } - String getServiceBusQuery() { + String getPerHostAmqpProviderQuery() + { StringBuilder builder = new StringBuilder(); - if (connectionIdleTimeoutMS > 0) { - appendQuery(builder, "amqp.idleTimeout", String.valueOf(connectionIdleTimeoutMS)); + if (this.connectionIdleTimeoutMS > 0) { + StringUtil.appendQuery(builder, "amqp.idleTimeout", String.valueOf(this.connectionIdleTimeoutMS)); } - if (traceFrames) { - appendQuery(builder, "amqp.traceFrames", "true"); + if (this.traceFrames) { + StringUtil.appendQuery(builder, "amqp.traceFrames", "true"); } + return builder.toString(); + } + + String getGlobalJMSProviderQuery() { + StringBuilder builder = new StringBuilder(); + if (this.configurationOptions == null) { this.configurationOptions = new HashMap<>(); } @@ -333,68 +340,60 @@ public class ServiceBusJmsConnectionFactorySettings { for (String defaultOption : DefaultConfigurationOptions.keySet()) { configurationOptions.putIfAbsent(defaultOption, DefaultConfigurationOptions.get(defaultOption)); } - + for (String option : configurationOptions.keySet()) { - appendQuery(builder, option, configurationOptions.get(option)); + StringUtil.appendQuery(builder, option, configurationOptions.get(option)); } return builder.toString(); } - String getReconnectQuery() { + String getGlobalFailoverProviderQuery() { StringBuilder queryBuilder = new StringBuilder(); if (initialReconnectDelay != null) { - appendQuery(queryBuilder, "failover.initialReconnectDelay", String.valueOf(initialReconnectDelay)); + StringUtil.appendQuery(queryBuilder, "failover.initialReconnectDelay", String.valueOf(initialReconnectDelay)); } if (reconnectDelay != null) { - appendQuery(queryBuilder, "failover.reconnectDelay", String.valueOf(reconnectDelay)); + StringUtil.appendQuery(queryBuilder, "failover.reconnectDelay", String.valueOf(reconnectDelay)); } if (maxReconnectDelay != null) { - appendQuery(queryBuilder, "failover.maxReconnectDelay", String.valueOf(maxReconnectDelay)); + StringUtil.appendQuery(queryBuilder, "failover.maxReconnectDelay", String.valueOf(maxReconnectDelay)); } if (useReconnectBackOff != null) { - appendQuery(queryBuilder, "failover.useReconnectBackOff", String.valueOf(useReconnectBackOff)); + StringUtil.appendQuery(queryBuilder, "failover.useReconnectBackOff", String.valueOf(useReconnectBackOff)); } if (reconnectBackOffMultiplier != null) { - appendQuery(queryBuilder, "failover.reconnectBackOffMultiplier", String.valueOf(reconnectBackOffMultiplier)); + StringUtil.appendQuery(queryBuilder, "failover.reconnectBackOffMultiplier", String.valueOf(reconnectBackOffMultiplier)); } if (maxReconnectAttempts != null) { - appendQuery(queryBuilder, "failover.maxReconnectAttempts", String.valueOf(maxReconnectAttempts)); + StringUtil.appendQuery(queryBuilder, "failover.maxReconnectAttempts", String.valueOf(maxReconnectAttempts)); } if (startupMaxReconnectAttempts != null) { - appendQuery(queryBuilder, "failover.startupMaxReconnectAttempts", String.valueOf(startupMaxReconnectAttempts)); + StringUtil.appendQuery(queryBuilder, "failover.startupMaxReconnectAttempts", String.valueOf(startupMaxReconnectAttempts)); } if (warnAfterReconnectAttempts != null) { - appendQuery(queryBuilder, "failover.warnAfterReconnectAttempts", String.valueOf(warnAfterReconnectAttempts)); + StringUtil.appendQuery(queryBuilder, "failover.warnAfterReconnectAttempts", String.valueOf(warnAfterReconnectAttempts)); } if (reconnectRandomize != null) { - appendQuery(queryBuilder, "failover.randomize", String.valueOf(reconnectRandomize)); + StringUtil.appendQuery(queryBuilder, "failover.randomize", String.valueOf(reconnectRandomize)); } if (reconnectAmqpOpenServerListAction != null) { - appendQuery(queryBuilder, "failover.amqpOpenServerListAction", reconnectAmqpOpenServerListAction.name()); + StringUtil.appendQuery(queryBuilder, "failover.amqpOpenServerListAction", reconnectAmqpOpenServerListAction.name()); } return queryBuilder.toString(); } - private void appendQuery(StringBuilder builder, String key, String value) { - if (builder == null) { - builder = new StringBuilder(); - } - - builder.append((builder.length() == 0) ? "?" : "&").append(key).append("=").append(value); - } - private static Map getDefaultConfigurationOptions() { Map defaultConfigurationOptions = new HashMap<>(); defaultConfigurationOptions.put("jms.prefetchPolicy.all", "0"); diff --git a/src/main/java/com/azure/servicebus/jms/StringUtil.java b/src/main/java/com/azure/servicebus/jms/StringUtil.java index b734640..e73ce34 100644 --- a/src/main/java/com/azure/servicebus/jms/StringUtil.java +++ b/src/main/java/com/azure/servicebus/jms/StringUtil.java @@ -29,6 +29,14 @@ public final class StringUtil { return true; } + public static void appendQuery(StringBuilder builder, String key, String value) { + if (builder == null) { + builder = new StringBuilder(); + } + + builder.append((builder.length() == 0) ? "" : "&").append(key).append("=").append(value); + } + public static String getShortRandomString() { return getRandomString().substring(0, 6); } diff --git a/src/test/java/com/azure/servicebus/jms/aad/TestInitialization.java b/src/test/java/com/azure/servicebus/jms/aad/TestInitialization.java index 5dfbd3a..c127b40 100644 --- a/src/test/java/com/azure/servicebus/jms/aad/TestInitialization.java +++ b/src/test/java/com/azure/servicebus/jms/aad/TestInitialization.java @@ -74,6 +74,8 @@ public class TestInitialization // Common this.HOST = System.getenv("HOST_NAMESPACE"); this.SETTINGS = new ServiceBusJmsConnectionFactorySettings(); + // Setting this just to make sure when these options are specified the URI and factory gets formed correctly. + this.SETTINGS.setConnectionIdleTimeoutMS(62000); this.entityName = entityName; } diff --git a/src/test/java/com/azure/servicebus/jms/connection/string/ServiceBusJMSConnectionFactoryUriBuilderTest.java b/src/test/java/com/azure/servicebus/jms/connection/string/ServiceBusJMSConnectionFactoryUriBuilderTest.java new file mode 100644 index 0000000..67a3889 --- /dev/null +++ b/src/test/java/com/azure/servicebus/jms/connection/string/ServiceBusJMSConnectionFactoryUriBuilderTest.java @@ -0,0 +1,169 @@ +package com.azure.servicebus.jms.connection.string; + +import com.azure.servicebus.jms.ServiceBusJmsConnectionFactory; +import com.azure.servicebus.jms.ServiceBusJmsConnectionFactorySettings; +import java.util.HashMap; +import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertEquals; +import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; +import org.junit.jupiter.api.Test; + +public class ServiceBusJMSConnectionFactoryUriBuilderTest { + + @Test + public void testSingleHostNoOptions() { + Map configurationOptions = new HashMap<>(); + configurationOptions.put("jms.prefetchPolicy.all", "20"); + ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings( + configurationOptions); + String host = "foo.servicebus.windows.net"; + ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings); + + // Passing no options but failover is true by default + String expectedUri = "failover:(amqps://foo.servicebus.windows.net)?jms.prefetchPolicy.all=20"; + String actualUri = factory.getRemoteConnectionUri(); + assertEquals(expectedUri, actualUri); + } + + @Test + public void testSingleHostWithJmsPrefetch() { + Map configurationOptions = new HashMap<>(); + configurationOptions.put("jms.prefetchPolicy.all", "20"); + ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings( + configurationOptions); + String host = "foo.servicebus.windows.net"; + ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings); + + // Passing no options but failover is true by default + String expectedUri = "failover:(amqps://foo.servicebus.windows.net)?jms.prefetchPolicy.all=20"; + String actualUri = factory.getRemoteConnectionUri(); + assertEquals(expectedUri, actualUri); + JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) factory.getPrefetchPolicy(); + assertEquals(20, prefetchPolicy.getQueuePrefetch()); + } + + @Test + public void testSingleHostNoOptionsWithoutDefaultReconnect() { + ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings(); + settings.setShouldReconnect(false); + String host = "foo.servicebus.windows.net"; + ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings); + + // Passing no options with reconnect disabled. + String expectedUri = "amqps://foo.servicebus.windows.net?jms.prefetchPolicy.all=0"; + String actualUri = factory.getRemoteConnectionUri(); + assertEquals(expectedUri, actualUri); + } + + @Test + public void testSingleHostNoOptionsWithoutDefaultReconnectWithJMSPrefetch() { + Map configurationOptions = new HashMap<>(); + configurationOptions.put("jms.prefetchPolicy.all", "20"); + ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings( + configurationOptions); + settings.setShouldReconnect(false); + String host = "foo.servicebus.windows.net"; + ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings); + + // Passing no options with reconnect disabled. + String expectedUri = "amqps://foo.servicebus.windows.net?jms.prefetchPolicy.all=20"; + String actualUri = factory.getRemoteConnectionUri(); + assertEquals(expectedUri, actualUri); + } + + @Test + public void testSingleHostWithAmqpOptionsWithDefaultReconnect() { + String host = "foo.servicebus.windows.net"; + ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings(); + settings.setConnectionIdleTimeoutMS(20000); + ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings); + + String expectedUri = "failover:(amqps://foo.servicebus.windows.net?amqp.idleTimeout=20000)?jms.prefetchPolicy.all=0"; + String actualUri = factory.getRemoteConnectionUri(); + assertEquals(expectedUri, actualUri); + } + + @Test + public void testSingleHostWithAmqpOptionsWithoutDefaultReconnect() { + String host = "foo.servicebus.windows.net"; + ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings(); + settings.setShouldReconnect(false); + settings.setConnectionIdleTimeoutMS(20000); + ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings); + + String expectedUri = "amqps://foo.servicebus.windows.net?amqp.idleTimeout=20000&jms.prefetchPolicy.all=0"; + String actualUri = factory.getRemoteConnectionUri(); + assertEquals(expectedUri, actualUri); + } + + @Test + public void testSingleHostWithAmqpOptionsWithDefaultReconnectAndFailoverOptions() { + String host = "foo.servicebus.windows.net"; + ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings(); + settings.setConnectionIdleTimeoutMS(20000); + settings.setMaxReconnectAttempts(3); + ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings); + + String expectedUri = "failover:(amqps://foo.servicebus.windows.net?amqp.idleTimeout=20000)?failover.maxReconnectAttempts=3&jms.prefetchPolicy.all=0"; + String actualUri = factory.getRemoteConnectionUri(); + assertEquals(expectedUri, actualUri); + } + + @Test + public void testMultipleHostsWithAmqpOptionsWithDefaultReconnectAndFailoverOptions() { + String host = "foo.servicebus.windows.net"; + ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings(); + settings.setMaxReconnectAttempts(3); + settings.setConnectionIdleTimeoutMS(20000); + String[] reconnectHosts = { "bar.servicebus.windows.net" }; + settings.setReconnectHosts(reconnectHosts); + ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings); + + String expectedUri = "failover:(amqps://foo.servicebus.windows.net?amqp.idleTimeout=20000,amqps://bar.servicebus.windows.net?amqp.idleTimeout=20000)?failover.maxReconnectAttempts=3&jms.prefetchPolicy.all=0"; + String actualUri = factory.getRemoteConnectionUri(); + assertEquals(expectedUri, actualUri); + } + + @Test + public void testMultipleHostsWithJustJMSPrefetech() { + String host = "foo.servicebus.windows.net"; + Map configurationOptions = new HashMap<>(); + configurationOptions.put("jms.prefetchPolicy.all", "20"); + ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings( + configurationOptions); + String[] reconnectHosts = { "bar.servicebus.windows.net" }; + settings.setReconnectHosts(reconnectHosts); + ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings); + + String expectedUri = "failover:(amqps://foo.servicebus.windows.net,amqps://bar.servicebus.windows.net)?jms.prefetchPolicy.all=20"; + String actualUri = factory.getRemoteConnectionUri(); + assertEquals(expectedUri, actualUri); + JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) factory.getPrefetchPolicy(); + assertEquals(20, prefetchPolicy.getQueuePrefetch()); + } + + @Test + public void testMultipleHostsWithAmqpOptionsWithoutDefaultReconnectAndFailoverOptions() { + String host = "foo.servicebus.windows.net"; + Map configurationOptions = new HashMap<>(); + configurationOptions.put("jms.prefetchPolicy.all", "20"); + ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings( + configurationOptions); + settings.setShouldReconnect(false); + settings.setConnectionIdleTimeoutMS(20000); + // Even though options like maxReconnectAttempts and reconnect hosts are + // specified, since reconnect is set to false, there should be no failover + // options + // added and only AmqpOptions and JMSProviderOptions should be formed. + settings.setMaxReconnectAttempts(3); + String[] reconnectHosts = { "bar.servicebus.windows.net" }; + settings.setReconnectHosts(reconnectHosts); + ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings); + + String expectedUri = "amqps://foo.servicebus.windows.net?amqp.idleTimeout=20000&jms.prefetchPolicy.all=20"; + String actualUri = factory.getRemoteConnectionUri(); + assertEquals(expectedUri, actualUri); + JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) factory.getPrefetchPolicy(); + assertEquals(20, prefetchPolicy.getQueuePrefetch()); + } +}