Fix query parameters when forming the RemoteConnectionURI so factory can be created correctly with the intended options. (#50)

* Fix bug where Idletimeout and other amqp parameters are not being set correctly in azure-servicebus-jms library

* Add more tests to test the remoteConnection URI in many cases

---------

Co-authored-by: Vinay Suryanarayana <vinsu@microsoft.com>
This commit is contained in:
vinaysurya 2024-07-31 11:36:31 -07:00 коммит произвёл GitHub
Родитель 2417857ea1
Коммит 8a3d67bf6c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
5 изменённых файлов: 262 добавлений и 43 удалений

Просмотреть файл

@ -24,6 +24,7 @@ import jakarta.jms.TopicConnectionFactory;
import org.apache.qpid.jms.JmsConnectionExtensions;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
import com.azure.core.credential.TokenCredential;
import com.azure.servicebus.jms.jndi.JNDIStorable;
@ -50,6 +51,7 @@ public class ServiceBusJmsConnectionFactory extends JNDIStorable implements Conn
private String userName;
private String password;
private String host;
private String remoteConnectionUri;
/**
* Intended to be used by JNDI only. Users should not be actively calling this constructor to create a ServiceBusJmsConnectionFactory instance.
@ -145,16 +147,10 @@ public class ServiceBusJmsConnectionFactory extends JNDIStorable implements Conn
if (userName == null || password == null || host == null) {
throw new IllegalArgumentException("Authentication settings and host cannot be null for a Service Bus connection factory.");
}
String destinationUri = "amqps://" + host;
if (this.settings.shouldReconnect()) {
destinationUri = getReconnectUri(destinationUri, this.settings);
}
String serviceBusQuery = this.settings.getServiceBusQuery();
destinationUri += serviceBusQuery;
this.factory = new JmsConnectionFactory(userName, password, destinationUri);
this.remoteConnectionUri = this.getServiceBusRemoteConnectionUri();
this.factory = new JmsConnectionFactory(userName, password, this.remoteConnectionUri);
this.factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
Map<String, Object> properties = new HashMap<>();
properties.put(ServiceBusJmsConnectionFactorySettings.IsClientProvider, true);
@ -228,6 +224,20 @@ public class ServiceBusJmsConnectionFactory extends JNDIStorable implements Conn
return this.settings;
}
/*
* @return The RemoteUri set for this ConnectionFactory.
*/
public String getRemoteConnectionUri() {
return this.remoteConnectionUri;
}
/*
* @return The PrefetchPolicy set for this ConnectionFactory.
*/
public JmsPrefetchPolicy getPrefetchPolicy() {
return this.factory.getPrefetchPolicy();
}
@Override
public Connection createConnection() throws JMSException {
this.ensureInitialized();
@ -365,26 +375,57 @@ public class ServiceBusJmsConnectionFactory extends JNDIStorable implements Conn
this.customUserAgent = customUserAgent;
}
// Obtain the reconnect URI in the form that QPID could understand.
private String getServiceBusRemoteConnectionUri()
{
String hostUri = "amqps://" + this.host;
String amqpPerHostQuery = settings.getPerHostAmqpProviderQuery();
if (!amqpPerHostQuery.isEmpty()) {
hostUri += "?" + amqpPerHostQuery;
}
String jmsProviderQuery = settings.getGlobalJMSProviderQuery();
String remoteConnectionUri;
if (this.settings.shouldReconnect()) {
String failoverUri = this.getFailoverUri(hostUri, amqpPerHostQuery, this.settings);
// Append failover Provider options if any
String failoverOptionsQuery = settings.getGlobalFailoverProviderQuery();
remoteConnectionUri = failoverUri + (!failoverOptionsQuery.isEmpty() ? "?" + failoverOptionsQuery : "");
//Append jmsProvider options if any
if (!jmsProviderQuery.isEmpty()) {
remoteConnectionUri += (!failoverOptionsQuery.isEmpty() ? "&" : "?") + jmsProviderQuery;
}
}
else {
remoteConnectionUri = hostUri + (!jmsProviderQuery.isEmpty() ? (amqpPerHostQuery.isEmpty() ? "?" : "&") + jmsProviderQuery : "");
}
return remoteConnectionUri;
}
// Obtain the failover URI in the form that QPID could understand.
// Example: failover:(amqps://contoso.servicebus.windows.net?amqp.idleTimeout=30000,amqps://contoso2.servicebus.windows.net?amqp.idleTimeout=30000)?failover.maxReconnectAttempts=20
private String getReconnectUri(String originalHost, ServiceBusJmsConnectionFactorySettings settings) {
private String getFailoverUri(String hostUri, String amqpPerHostQuery, ServiceBusJmsConnectionFactorySettings settings) {
StringBuilder builder = new StringBuilder("failover:(");
builder.append(originalHost);
builder.append(hostUri);
String[] reconnectHosts = settings.getReconnectHosts();
if (settings.getReconnectHosts() != null) {
String serviceBusQuery = settings.getServiceBusQuery();
for (String reconnectHost: reconnectHosts) {
builder.append(",");
builder.append("amqps://");
builder.append(reconnectHost);
builder.append(serviceBusQuery);
if (!amqpPerHostQuery.isEmpty()) {
builder.append("?");
builder.append(amqpPerHostQuery);
}
}
}
builder.append(")");
builder.append(settings.getReconnectQuery());
return builder.toString();
}

Просмотреть файл

@ -314,16 +314,23 @@ public class ServiceBusJmsConnectionFactorySettings {
this.reconnectAmqpOpenServerListAction = amqpOpenServerListAction;
}
String getServiceBusQuery() {
String getPerHostAmqpProviderQuery()
{
StringBuilder builder = new StringBuilder();
if (connectionIdleTimeoutMS > 0) {
appendQuery(builder, "amqp.idleTimeout", String.valueOf(connectionIdleTimeoutMS));
if (this.connectionIdleTimeoutMS > 0) {
StringUtil.appendQuery(builder, "amqp.idleTimeout", String.valueOf(this.connectionIdleTimeoutMS));
}
if (traceFrames) {
appendQuery(builder, "amqp.traceFrames", "true");
if (this.traceFrames) {
StringUtil.appendQuery(builder, "amqp.traceFrames", "true");
}
return builder.toString();
}
String getGlobalJMSProviderQuery() {
StringBuilder builder = new StringBuilder();
if (this.configurationOptions == null) {
this.configurationOptions = new HashMap<>();
}
@ -333,68 +340,60 @@ public class ServiceBusJmsConnectionFactorySettings {
for (String defaultOption : DefaultConfigurationOptions.keySet()) {
configurationOptions.putIfAbsent(defaultOption, DefaultConfigurationOptions.get(defaultOption));
}
for (String option : configurationOptions.keySet()) {
appendQuery(builder, option, configurationOptions.get(option));
StringUtil.appendQuery(builder, option, configurationOptions.get(option));
}
return builder.toString();
}
String getReconnectQuery() {
String getGlobalFailoverProviderQuery() {
StringBuilder queryBuilder = new StringBuilder();
if (initialReconnectDelay != null) {
appendQuery(queryBuilder, "failover.initialReconnectDelay", String.valueOf(initialReconnectDelay));
StringUtil.appendQuery(queryBuilder, "failover.initialReconnectDelay", String.valueOf(initialReconnectDelay));
}
if (reconnectDelay != null) {
appendQuery(queryBuilder, "failover.reconnectDelay", String.valueOf(reconnectDelay));
StringUtil.appendQuery(queryBuilder, "failover.reconnectDelay", String.valueOf(reconnectDelay));
}
if (maxReconnectDelay != null) {
appendQuery(queryBuilder, "failover.maxReconnectDelay", String.valueOf(maxReconnectDelay));
StringUtil.appendQuery(queryBuilder, "failover.maxReconnectDelay", String.valueOf(maxReconnectDelay));
}
if (useReconnectBackOff != null) {
appendQuery(queryBuilder, "failover.useReconnectBackOff", String.valueOf(useReconnectBackOff));
StringUtil.appendQuery(queryBuilder, "failover.useReconnectBackOff", String.valueOf(useReconnectBackOff));
}
if (reconnectBackOffMultiplier != null) {
appendQuery(queryBuilder, "failover.reconnectBackOffMultiplier", String.valueOf(reconnectBackOffMultiplier));
StringUtil.appendQuery(queryBuilder, "failover.reconnectBackOffMultiplier", String.valueOf(reconnectBackOffMultiplier));
}
if (maxReconnectAttempts != null) {
appendQuery(queryBuilder, "failover.maxReconnectAttempts", String.valueOf(maxReconnectAttempts));
StringUtil.appendQuery(queryBuilder, "failover.maxReconnectAttempts", String.valueOf(maxReconnectAttempts));
}
if (startupMaxReconnectAttempts != null) {
appendQuery(queryBuilder, "failover.startupMaxReconnectAttempts", String.valueOf(startupMaxReconnectAttempts));
StringUtil.appendQuery(queryBuilder, "failover.startupMaxReconnectAttempts", String.valueOf(startupMaxReconnectAttempts));
}
if (warnAfterReconnectAttempts != null) {
appendQuery(queryBuilder, "failover.warnAfterReconnectAttempts", String.valueOf(warnAfterReconnectAttempts));
StringUtil.appendQuery(queryBuilder, "failover.warnAfterReconnectAttempts", String.valueOf(warnAfterReconnectAttempts));
}
if (reconnectRandomize != null) {
appendQuery(queryBuilder, "failover.randomize", String.valueOf(reconnectRandomize));
StringUtil.appendQuery(queryBuilder, "failover.randomize", String.valueOf(reconnectRandomize));
}
if (reconnectAmqpOpenServerListAction != null) {
appendQuery(queryBuilder, "failover.amqpOpenServerListAction", reconnectAmqpOpenServerListAction.name());
StringUtil.appendQuery(queryBuilder, "failover.amqpOpenServerListAction", reconnectAmqpOpenServerListAction.name());
}
return queryBuilder.toString();
}
private void appendQuery(StringBuilder builder, String key, String value) {
if (builder == null) {
builder = new StringBuilder();
}
builder.append((builder.length() == 0) ? "?" : "&").append(key).append("=").append(value);
}
private static Map<String, String> getDefaultConfigurationOptions() {
Map<String, String> defaultConfigurationOptions = new HashMap<>();
defaultConfigurationOptions.put("jms.prefetchPolicy.all", "0");

Просмотреть файл

@ -29,6 +29,14 @@ public final class StringUtil {
return true;
}
public static void appendQuery(StringBuilder builder, String key, String value) {
if (builder == null) {
builder = new StringBuilder();
}
builder.append((builder.length() == 0) ? "" : "&").append(key).append("=").append(value);
}
public static String getShortRandomString() {
return getRandomString().substring(0, 6);
}

Просмотреть файл

@ -74,6 +74,8 @@ public class TestInitialization
// Common
this.HOST = System.getenv("HOST_NAMESPACE");
this.SETTINGS = new ServiceBusJmsConnectionFactorySettings();
// Setting this just to make sure when these options are specified the URI and factory gets formed correctly.
this.SETTINGS.setConnectionIdleTimeoutMS(62000);
this.entityName = entityName;
}

Просмотреть файл

@ -0,0 +1,169 @@
package com.azure.servicebus.jms.connection.string;
import com.azure.servicebus.jms.ServiceBusJmsConnectionFactory;
import com.azure.servicebus.jms.ServiceBusJmsConnectionFactorySettings;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.junit.jupiter.api.Test;
public class ServiceBusJMSConnectionFactoryUriBuilderTest {
@Test
public void testSingleHostNoOptions() {
Map<String, String> configurationOptions = new HashMap<>();
configurationOptions.put("jms.prefetchPolicy.all", "20");
ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings(
configurationOptions);
String host = "foo.servicebus.windows.net";
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings);
// Passing no options but failover is true by default
String expectedUri = "failover:(amqps://foo.servicebus.windows.net)?jms.prefetchPolicy.all=20";
String actualUri = factory.getRemoteConnectionUri();
assertEquals(expectedUri, actualUri);
}
@Test
public void testSingleHostWithJmsPrefetch() {
Map<String, String> configurationOptions = new HashMap<>();
configurationOptions.put("jms.prefetchPolicy.all", "20");
ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings(
configurationOptions);
String host = "foo.servicebus.windows.net";
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings);
// Passing no options but failover is true by default
String expectedUri = "failover:(amqps://foo.servicebus.windows.net)?jms.prefetchPolicy.all=20";
String actualUri = factory.getRemoteConnectionUri();
assertEquals(expectedUri, actualUri);
JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) factory.getPrefetchPolicy();
assertEquals(20, prefetchPolicy.getQueuePrefetch());
}
@Test
public void testSingleHostNoOptionsWithoutDefaultReconnect() {
ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings();
settings.setShouldReconnect(false);
String host = "foo.servicebus.windows.net";
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings);
// Passing no options with reconnect disabled.
String expectedUri = "amqps://foo.servicebus.windows.net?jms.prefetchPolicy.all=0";
String actualUri = factory.getRemoteConnectionUri();
assertEquals(expectedUri, actualUri);
}
@Test
public void testSingleHostNoOptionsWithoutDefaultReconnectWithJMSPrefetch() {
Map<String, String> configurationOptions = new HashMap<>();
configurationOptions.put("jms.prefetchPolicy.all", "20");
ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings(
configurationOptions);
settings.setShouldReconnect(false);
String host = "foo.servicebus.windows.net";
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings);
// Passing no options with reconnect disabled.
String expectedUri = "amqps://foo.servicebus.windows.net?jms.prefetchPolicy.all=20";
String actualUri = factory.getRemoteConnectionUri();
assertEquals(expectedUri, actualUri);
}
@Test
public void testSingleHostWithAmqpOptionsWithDefaultReconnect() {
String host = "foo.servicebus.windows.net";
ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings();
settings.setConnectionIdleTimeoutMS(20000);
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings);
String expectedUri = "failover:(amqps://foo.servicebus.windows.net?amqp.idleTimeout=20000)?jms.prefetchPolicy.all=0";
String actualUri = factory.getRemoteConnectionUri();
assertEquals(expectedUri, actualUri);
}
@Test
public void testSingleHostWithAmqpOptionsWithoutDefaultReconnect() {
String host = "foo.servicebus.windows.net";
ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings();
settings.setShouldReconnect(false);
settings.setConnectionIdleTimeoutMS(20000);
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings);
String expectedUri = "amqps://foo.servicebus.windows.net?amqp.idleTimeout=20000&jms.prefetchPolicy.all=0";
String actualUri = factory.getRemoteConnectionUri();
assertEquals(expectedUri, actualUri);
}
@Test
public void testSingleHostWithAmqpOptionsWithDefaultReconnectAndFailoverOptions() {
String host = "foo.servicebus.windows.net";
ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings();
settings.setConnectionIdleTimeoutMS(20000);
settings.setMaxReconnectAttempts(3);
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings);
String expectedUri = "failover:(amqps://foo.servicebus.windows.net?amqp.idleTimeout=20000)?failover.maxReconnectAttempts=3&jms.prefetchPolicy.all=0";
String actualUri = factory.getRemoteConnectionUri();
assertEquals(expectedUri, actualUri);
}
@Test
public void testMultipleHostsWithAmqpOptionsWithDefaultReconnectAndFailoverOptions() {
String host = "foo.servicebus.windows.net";
ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings();
settings.setMaxReconnectAttempts(3);
settings.setConnectionIdleTimeoutMS(20000);
String[] reconnectHosts = { "bar.servicebus.windows.net" };
settings.setReconnectHosts(reconnectHosts);
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings);
String expectedUri = "failover:(amqps://foo.servicebus.windows.net?amqp.idleTimeout=20000,amqps://bar.servicebus.windows.net?amqp.idleTimeout=20000)?failover.maxReconnectAttempts=3&jms.prefetchPolicy.all=0";
String actualUri = factory.getRemoteConnectionUri();
assertEquals(expectedUri, actualUri);
}
@Test
public void testMultipleHostsWithJustJMSPrefetech() {
String host = "foo.servicebus.windows.net";
Map<String, String> configurationOptions = new HashMap<>();
configurationOptions.put("jms.prefetchPolicy.all", "20");
ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings(
configurationOptions);
String[] reconnectHosts = { "bar.servicebus.windows.net" };
settings.setReconnectHosts(reconnectHosts);
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings);
String expectedUri = "failover:(amqps://foo.servicebus.windows.net,amqps://bar.servicebus.windows.net)?jms.prefetchPolicy.all=20";
String actualUri = factory.getRemoteConnectionUri();
assertEquals(expectedUri, actualUri);
JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) factory.getPrefetchPolicy();
assertEquals(20, prefetchPolicy.getQueuePrefetch());
}
@Test
public void testMultipleHostsWithAmqpOptionsWithoutDefaultReconnectAndFailoverOptions() {
String host = "foo.servicebus.windows.net";
Map<String, String> configurationOptions = new HashMap<>();
configurationOptions.put("jms.prefetchPolicy.all", "20");
ServiceBusJmsConnectionFactorySettings settings = new ServiceBusJmsConnectionFactorySettings(
configurationOptions);
settings.setShouldReconnect(false);
settings.setConnectionIdleTimeoutMS(20000);
// Even though options like maxReconnectAttempts and reconnect hosts are
// specified, since reconnect is set to false, there should be no failover
// options
// added and only AmqpOptions and JMSProviderOptions should be formed.
settings.setMaxReconnectAttempts(3);
String[] reconnectHosts = { "bar.servicebus.windows.net" };
settings.setReconnectHosts(reconnectHosts);
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory("user", "pass", host, settings);
String expectedUri = "amqps://foo.servicebus.windows.net?amqp.idleTimeout=20000&jms.prefetchPolicy.all=20";
String actualUri = factory.getRemoteConnectionUri();
assertEquals(expectedUri, actualUri);
JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) factory.getPrefetchPolicy();
assertEquals(20, prefetchPolicy.getQueuePrefetch());
}
}