Merge pull request #44 from Microsoft/servicebus

Add service bus topic and queue send and subscribe operation
This commit is contained in:
Warren Zhu 2018-07-06 11:31:59 +08:00 коммит произвёл GitHub
Родитель a18b4ffe3d 11a30e6892
Коммит b6896efe89
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
19 изменённых файлов: 849 добавлений и 0 удалений

Просмотреть файл

@ -23,6 +23,11 @@
<artifactId>azure-eventhubs-eph</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>

Просмотреть файл

@ -0,0 +1,44 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.lang.NonNull;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* Default implementation of {@link IMessageHandler} to handle Service Bus {@link IMessage}
*
* @author Warren Zhu
*/
public class ServiceBusMessageHandler implements IMessageHandler {
private static final Log LOGGER = LogFactory.getLog(ServiceBusMessageHandler.class);
private final Set<Consumer<Iterable<IMessage>>> consumers;
public ServiceBusMessageHandler(@NonNull Set<Consumer<Iterable<IMessage>>> consumers) {
this.consumers = consumers;
}
@Override
public CompletableFuture<Void> onMessageAsync(IMessage message) {
consumers.forEach(c -> c.accept(Collections.singleton(message)));
return CompletableFuture.completedFuture(null);
}
@Override
public void notifyException(Throwable exception, ExceptionPhase phase) {
LOGGER.error(String.format("Exception encountered in phase %s", phase), exception);
}
}

Просмотреть файл

@ -0,0 +1,25 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus;
import org.springframework.core.NestedRuntimeException;
/**
* The Azure Service Bus specific {@link NestedRuntimeException}.
*
* @author Warren Zhu
*/
public class ServiceBusRuntimeException extends NestedRuntimeException {
public ServiceBusRuntimeException(String msg) {
super(msg);
}
public ServiceBusRuntimeException(String msg, Throwable cause) {
super(msg, cause);
}
}

Просмотреть файл

@ -0,0 +1,59 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.spring.integration.core.PartitionSupplier;
import com.microsoft.azure.spring.integration.core.SendOperation;
import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusSenderFactory;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.concurrent.CompletableFuture;
/**
* Azure service bus send template to support send {@link IMessage} asynchronously
*
* @author Warren Zhu
*/
public class ServiceBusSendTemplate<T extends ServiceBusSenderFactory> implements SendOperation<IMessage> {
protected final T senderFactory;
public ServiceBusSendTemplate(@NonNull T senderFactory) {
this.senderFactory = senderFactory;
}
@Override
public CompletableFuture<Void> sendAsync(String destination, @NonNull IMessage message,
PartitionSupplier partitionSupplier) {
Assert.hasText(destination, "destination can't be null or empty");
String partitionKey = getPartitionKey(partitionSupplier);
if (StringUtils.hasText(partitionKey)) {
message.setPartitionKey(partitionKey);
}
return this.senderFactory.getSenderCreator().apply(destination).sendAsync(message);
}
private String getPartitionKey(PartitionSupplier partitionSupplier) {
if (partitionSupplier == null) {
return "";
}
if (StringUtils.hasText(partitionSupplier.getPartitionKey())) {
return partitionSupplier.getPartitionKey();
}
if (StringUtils.hasText(partitionSupplier.getPartitionId())) {
return partitionSupplier.getPartitionId();
}
return "";
}
}

Просмотреть файл

@ -0,0 +1,47 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.factory;
import com.microsoft.azure.management.servicebus.AuthorizationKeys;
import com.microsoft.azure.management.servicebus.AuthorizationRule;
import com.microsoft.azure.management.servicebus.ServiceBusNamespace;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.spring.cloud.context.core.AzureAdmin;
import com.microsoft.azure.spring.integration.core.Memoizer;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import java.util.function.Function;
/**
* Base class of service bus client factory to provide connection string
*
* @author Warren Zhu
*/
public abstract class AbstractServiceBusSenderFactory implements ServiceBusSenderFactory {
protected final AzureAdmin azureAdmin;
protected final ServiceBusNamespace namespace;
public AbstractServiceBusSenderFactory(@NonNull AzureAdmin azureAdmin, @NonNull String namespace) {
Assert.hasText(namespace, "namespace can't be null or empty");
this.azureAdmin = azureAdmin;
this.namespace = azureAdmin.getOrCreateServiceBusNamespace(namespace);
}
protected Function<String, String> getConnectionStringCreator() {
return Memoizer.memoize(this::getConnectionString);
}
private String getConnectionString(String name) {
return namespace.authorizationRules().list().stream().findFirst().map(AuthorizationRule::getKeys)
.map(AuthorizationKeys::primaryConnectionString)
.map(s -> new ConnectionStringBuilder(s, name).toString()).orElseThrow(
() -> new ServiceBusRuntimeException(
String.format("Service bus namespace '%s' key is empty", name), null));
}
}

Просмотреть файл

@ -0,0 +1,54 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.factory;
import com.microsoft.azure.servicebus.IQueueClient;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.cloud.context.core.AzureAdmin;
import com.microsoft.azure.spring.integration.core.Memoizer;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import java.util.function.Function;
/**
* Default implementation of {@link ServiceBusQueueClientFactory}.
* Client will be cached to improve performance
*
* @author Warren Zhu
*/
public class DefaultServiceBusQueueClientFactory extends AbstractServiceBusSenderFactory
implements ServiceBusQueueClientFactory {
private final Function<String, IQueueClient> queueClientCreator = Memoizer.memoize(this::createQueueClient);
public DefaultServiceBusQueueClientFactory(AzureAdmin azureAdmin, String namespace) {
super(azureAdmin, namespace);
}
@Override
public Function<String, IQueueClient> getQueueClientCreator() {
return queueClientCreator;
}
@Override
public Function<String, IQueueClient> getSenderCreator() {
return getQueueClientCreator();
}
private IQueueClient createQueueClient(String destination) {
azureAdmin.getOrCreateServiceBusQueue(namespace, destination);
try {
return new QueueClient(new ConnectionStringBuilder(getConnectionStringCreator().apply(destination)),
ReceiveMode.PEEKLOCK);
} catch (InterruptedException | ServiceBusException e) {
throw new ServiceBusRuntimeException("Failed to create service bus queue client", e);
}
}
}

Просмотреть файл

@ -0,0 +1,72 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.factory;
import com.microsoft.azure.management.servicebus.Topic;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.cloud.context.core.AzureAdmin;
import com.microsoft.azure.spring.cloud.context.core.Tuple;
import com.microsoft.azure.spring.integration.core.Memoizer;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import org.springframework.util.Assert;
import java.util.function.Function;
/**
* Default implementation of {@link ServiceBusTopicClientFactory}.
* Client will be cached to improve performance
*
* @author Warren Zhu
*/
public class DefaultServiceBusTopicClientFactory extends AbstractServiceBusSenderFactory
implements ServiceBusTopicClientFactory {
private final Function<Tuple<String, String>, ISubscriptionClient> subscriptionClientCreator =
Memoizer.memoize(this::createSubscriptionClient);
private final Function<String, ? extends IMessageSender> sendCreator = Memoizer.memoize(this::createTopicClient);
public DefaultServiceBusTopicClientFactory(AzureAdmin azureAdmin, String namespace) {
super(azureAdmin, namespace);
}
@Override
public Function<Tuple<String, String>, ISubscriptionClient> getSubscriptionClientCreator() {
return subscriptionClientCreator;
}
@Override
public Function<String, ? extends IMessageSender> getSenderCreator() {
return sendCreator;
}
private ISubscriptionClient createSubscriptionClient(Tuple<String, String> nameAndSubscription) {
Topic topic = azureAdmin.getServiceBusTopic(namespace, nameAndSubscription.getFirst());
Assert.notNull(topic,
() -> String.format("Service bus topic '%s' not existed", nameAndSubscription.getFirst()));
azureAdmin.getOrCreateServiceBusTopicSubscription(topic, nameAndSubscription.getSecond());
try {
return new SubscriptionClient(
new ConnectionStringBuilder(getConnectionStringCreator().apply(nameAndSubscription.getFirst())),
ReceiveMode.PEEKLOCK);
} catch (InterruptedException | ServiceBusException e) {
throw new ServiceBusRuntimeException("Failed to create service bus subscription client", e);
}
}
private IMessageSender createTopicClient(String destination) {
azureAdmin.getOrCreateServiceBusTopic(namespace, destination);
try {
return new TopicClient(new ConnectionStringBuilder(getConnectionStringCreator().apply(destination)));
} catch (InterruptedException | ServiceBusException e) {
throw new ServiceBusRuntimeException("Failed to create service bus topic client", e);
}
}
}

Просмотреть файл

@ -0,0 +1,23 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.factory;
import com.microsoft.azure.servicebus.IQueueClient;
import java.util.function.Function;
/**
* Factory to return functional creator of service bus queue client
*
* @author Warren Zhu
*/
public interface ServiceBusQueueClientFactory extends ServiceBusSenderFactory {
/**
* Return a function which accepts service bus queue name, then returns {@link IQueueClient}
*/
Function<String, IQueueClient> getQueueClientCreator();
}

Просмотреть файл

@ -0,0 +1,23 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.factory;
import com.microsoft.azure.servicebus.IMessageSender;
import java.util.function.Function;
/**
* Factory to return functional creator of service bus sender
*
* @author Warren Zhu
*/
public interface ServiceBusSenderFactory {
/**
* Return a function which accepts service bus topic or queue name, then returns {@link IMessageSender}
*/
Function<String, ? extends IMessageSender> getSenderCreator();
}

Просмотреть файл

@ -0,0 +1,24 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.factory;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import com.microsoft.azure.spring.cloud.context.core.Tuple;
import java.util.function.Function;
/**
* Factory to return functional creator of service bus topic and subscription client
*
* @author Warren Zhu
*/
public interface ServiceBusTopicClientFactory extends ServiceBusSenderFactory {
/**
* Return a function which accepts service bus topic and subscription name, then returns {@link ISubscriptionClient}
*/
Function<Tuple<String, String>, ISubscriptionClient> getSubscriptionClientCreator();
}

Просмотреть файл

@ -0,0 +1,109 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.inbound;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.spring.integration.core.AzureHeaders;
import com.microsoft.azure.spring.integration.core.SubscribeOperation;
import com.microsoft.azure.spring.integration.eventhub.inbound.CheckpointMode;
import com.microsoft.azure.spring.integration.eventhub.inbound.ListenerMode;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.StreamSupport;
public class ServiceBusQueueInboundChannelAdapter extends MessageProducerSupport {
private final String destination;
private final SubscribeOperation<IMessage> subscribeOperation;
private CheckpointMode checkpointMode = CheckpointMode.RECORD;
private ListenerMode listenerMode = ListenerMode.RECORD;
private MessageConverter messageConverter;
private Map<String, Object> commonHeaders = new HashMap<>();
public ServiceBusQueueInboundChannelAdapter(String destination,
@NonNull SubscribeOperation<IMessage> subscribeOperation) {
Assert.hasText(destination, "destination can't be null or empty");
this.destination = destination;
this.subscribeOperation = subscribeOperation;
}
@Override
protected void doStart() {
super.doStart();
this.subscribeOperation.subscribe(this.destination, this::receiveMessage);
if (this.checkpointMode == CheckpointMode.MANUAL) {
// Send the checkpointer downstream so user decides on when to checkpoint.
this.commonHeaders.put(AzureHeaders.CHECKPOINTER, subscribeOperation.getCheckpointer(this.destination));
}
}
public void receiveMessage(Iterable<IMessage> events) {
if (this.listenerMode == ListenerMode.BATCH) {
sendMessage(toMessage(events));
} else /* ListenerMode.RECORD */ {
StreamSupport.stream(events.spliterator(), false).forEach((e) -> {
sendMessage(toMessage(e.getBody()));
if (this.checkpointMode == CheckpointMode.RECORD) {
this.subscribeOperation.getCheckpointer(destination).checkpoint(e);
}
});
}
if (this.checkpointMode == CheckpointMode.BATCH) {
this.subscribeOperation.getCheckpointer(destination).checkpoint();
}
}
private Message<?> toMessage(Object payload) {
if (this.messageConverter == null) {
return MessageBuilder.withPayload(payload).copyHeaders(commonHeaders).build();
}
return this.messageConverter.toMessage(payload, new MessageHeaders(commonHeaders));
}
@Override
protected void doStop() {
this.subscribeOperation.unsubscribe(destination, this::receiveMessage);
super.doStop();
}
public CheckpointMode getCheckpointMode() {
return checkpointMode;
}
public void setCheckpointMode(CheckpointMode checkpointMode) {
this.checkpointMode = checkpointMode;
}
public ListenerMode getListenerMode() {
return listenerMode;
}
public void setListenerMode(ListenerMode listenerMode) {
this.listenerMode = listenerMode;
}
public MessageConverter getMessageConverter() {
return this.messageConverter;
}
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
}

Просмотреть файл

@ -0,0 +1,36 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.inbound;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.spring.integration.core.AbstractInboundChannelAdapter;
import com.microsoft.azure.spring.integration.core.SubscribeByGroupOperation;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
public class ServiceBusTopicInboundChannelAdapter extends AbstractInboundChannelAdapter<IMessage> {
public ServiceBusTopicInboundChannelAdapter(String destination,
SubscribeByGroupOperation<IMessage> subscribeByGroupOperation, String consumerGroup) {
super(destination, subscribeByGroupOperation, consumerGroup);
}
@Override
public Message<?> toMessage(IMessage message) {
Object payload = message.getBody();
if (this.messageConverter == null) {
return MessageBuilder.withPayload(payload).copyHeaders(commonHeaders).build();
}
return this.messageConverter.toMessage(payload, new MessageHeaders(commonHeaders));
}
@Override
protected Message<?> toMessage(Iterable<IMessage> data) {
throw new UnsupportedOperationException();
}
}

Просмотреть файл

@ -0,0 +1,56 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.outbound;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.spring.integration.core.AbstractAzureMessageHandler;
import com.microsoft.azure.spring.integration.core.SendOperation;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import java.nio.charset.Charset;
import java.util.UUID;
/**
* Outbound channel adapter to publish messages to Azure Service Bus topic and queue.
*
* @author Warren Zhu
*/
public class ServiceBusMessageHandler extends AbstractAzureMessageHandler<IMessage> {
public ServiceBusMessageHandler(String destination, SendOperation<IMessage> sendOperation) {
super(destination, sendOperation);
}
@Override
public IMessage toAzureMessage(Message<?> message) {
com.microsoft.azure.servicebus.Message serviceBusMessage;
Object payload = message.getPayload();
if (payload instanceof com.microsoft.azure.servicebus.Message) {
serviceBusMessage = (com.microsoft.azure.servicebus.Message) payload;
} else if (payload instanceof String) {
serviceBusMessage =
new com.microsoft.azure.servicebus.Message(((String) payload).getBytes(Charset.defaultCharset()));
} else if (payload instanceof byte[]) {
serviceBusMessage = new com.microsoft.azure.servicebus.Message((byte[]) payload);
} else {
serviceBusMessage = new com.microsoft.azure.servicebus.Message(String.valueOf(payload));
}
if (message.getHeaders().containsKey(MessageHeaders.CONTENT_TYPE)) {
serviceBusMessage.setContentType(message.getHeaders().get(MessageHeaders.CONTENT_TYPE, String.class));
}
if (message.getHeaders().containsKey(MessageHeaders.ID)) {
serviceBusMessage.setMessageId(message.getHeaders().get(MessageHeaders.ID, UUID.class).toString());
}
return serviceBusMessage;
}
}

Просмотреть файл

@ -0,0 +1,32 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.queue;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IQueueClient;
import com.microsoft.azure.spring.integration.core.Checkpointer;
import java.util.concurrent.CompletableFuture;
public class ServiceBusQueueCheckpointer implements Checkpointer<IMessage> {
private final IQueueClient queueClient;
public ServiceBusQueueCheckpointer(IQueueClient queueClient) {
this.queueClient = queueClient;
}
@Override
public CompletableFuture<Void> checkpoint() {
// Service bus unsupported
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<Void> checkpoint(IMessage iMessage) {
return this.queueClient.completeAsync(iMessage.getLockToken());
}
}

Просмотреть файл

@ -0,0 +1,19 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.queue;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.spring.integration.core.SendOperation;
import com.microsoft.azure.spring.integration.core.SubscribeOperation;
/**
* Azure service bus queue operation to support send {@link IMessage} asynchronously and subscribe
*
* @author Warren Zhu
*/
public interface ServiceBusQueueOperation extends SendOperation<IMessage>, SubscribeOperation<IMessage> {
}

Просмотреть файл

@ -0,0 +1,80 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.queue;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.integration.core.Checkpointer;
import com.microsoft.azure.spring.integration.core.Memoizer;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusSendTemplate;
import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Default implementation of {@link ServiceBusQueueOperation}.
*
* @author Warren Zhu
*/
public class ServiceBusQueueTemplate extends ServiceBusSendTemplate<ServiceBusQueueClientFactory>
implements ServiceBusQueueOperation {
private static final Log LOGGER = LogFactory.getLog(ServiceBusQueueTemplate.class);
private final Map<String, Set<Consumer<Iterable<IMessage>>>> consumersByName = new ConcurrentHashMap<>();
private final Function<String, Checkpointer<IMessage>> checkpointGetter =
Memoizer.memoize(this::createCheckpointer);
public ServiceBusQueueTemplate(ServiceBusQueueClientFactory clientFactory) {
super(clientFactory);
}
@Override
public synchronized boolean subscribe(String destination, @NonNull Consumer<Iterable<IMessage>> consumer) {
Assert.hasText(destination, "destination can't be null or empty");
consumersByName.putIfAbsent(destination, new CopyOnWriteArraySet<>());
boolean added = consumersByName.get(destination).add(consumer);
try {
this.senderFactory.getQueueClientCreator().apply(destination)
.registerMessageHandler(new ServiceBusMessageHandler(consumersByName.get(destination)));
} catch (ServiceBusException | InterruptedException e) {
LOGGER.error("Failed to register message handler", e);
throw new ServiceBusRuntimeException("Failed to register message handler", e);
}
return added;
}
@Override
public synchronized boolean unsubscribe(String destination, Consumer<Iterable<IMessage>> consumer) {
boolean existed = consumersByName.get(destination).remove(consumer);
//TODO: unregister message handler but service bus sdk unsupported
return existed;
}
@Override
public Checkpointer<IMessage> getCheckpointer(String destination) {
return checkpointGetter.apply(destination);
}
private Checkpointer<IMessage> createCheckpointer(String destination) {
return new ServiceBusQueueCheckpointer(this.senderFactory.getQueueClientCreator().apply(destination));
}
}

Просмотреть файл

@ -0,0 +1,32 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.topic;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import com.microsoft.azure.spring.integration.core.Checkpointer;
import java.util.concurrent.CompletableFuture;
public class ServiceBusTopicCheckpointer implements Checkpointer<IMessage> {
private final ISubscriptionClient subscriptionClient;
public ServiceBusTopicCheckpointer(ISubscriptionClient subscriptionClient) {
this.subscriptionClient = subscriptionClient;
}
@Override
public CompletableFuture<Void> checkpoint() {
// Service bus unsupported
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<Void> checkpoint(IMessage iMessage) {
return this.subscriptionClient.completeAsync(iMessage.getLockToken());
}
}

Просмотреть файл

@ -0,0 +1,21 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.topic;
import com.microsoft.azure.management.servicebus.ServiceBusSubscription;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.spring.integration.core.SendOperation;
import com.microsoft.azure.spring.integration.core.SubscribeByGroupOperation;
/**
* Azure service bus topic operation to support send {@link IMessage} asynchronously
* and subscribe by {@link ServiceBusSubscription} as consumer group
*
* @author Warren Zhu
*/
public interface ServiceBusTopicOperation extends SendOperation<IMessage>, SubscribeByGroupOperation<IMessage> {
}

Просмотреть файл

@ -0,0 +1,88 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.servicebus.topic;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.cloud.context.core.Tuple;
import com.microsoft.azure.spring.integration.core.Checkpointer;
import com.microsoft.azure.spring.integration.core.Memoizer;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusSendTemplate;
import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusTopicClientFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Default implementation of {@link ServiceBusTopicOperation}.
*
* @author Warren Zhu
*/
public class ServiceBusTopicTemplate extends ServiceBusSendTemplate<ServiceBusTopicClientFactory>
implements ServiceBusTopicOperation {
private static final Log LOGGER = LogFactory.getLog(ServiceBusTopicTemplate.class);
private final Map<Tuple<String, String>, Set<Consumer<Iterable<IMessage>>>> consumersByNameAndConsumerGroup =
new ConcurrentHashMap<>();
private final Function<Tuple<String, String>, Checkpointer<IMessage>> checkpointGetter =
Memoizer.memoize(this::createCheckpointer);
public ServiceBusTopicTemplate(ServiceBusTopicClientFactory clientFactory) {
super(clientFactory);
}
@Override
public synchronized boolean subscribe(String destination, @NonNull Consumer<Iterable<IMessage>> consumer,
@NonNull String consumerGroup) {
Assert.hasText(destination, "destination can't be null or empty");
Tuple<String, String> nameAndConsumerGroup = Tuple.of(destination, consumerGroup);
consumersByNameAndConsumerGroup.putIfAbsent(nameAndConsumerGroup, new CopyOnWriteArraySet<>());
boolean added = consumersByNameAndConsumerGroup.get(nameAndConsumerGroup).add(consumer);
try {
this.senderFactory.getSubscriptionClientCreator().apply(Tuple.of(destination, consumerGroup))
.registerMessageHandler(new ServiceBusMessageHandler(
consumersByNameAndConsumerGroup.get(nameAndConsumerGroup)));
} catch (ServiceBusException | InterruptedException e) {
LOGGER.error("Failed to register message handler", e);
throw new ServiceBusRuntimeException("Failed to register message handler", e);
}
return added;
}
@Override
public synchronized boolean unsubscribe(String destination, Consumer<Iterable<IMessage>> consumer,
String consumerGroup) {
boolean existed = consumersByNameAndConsumerGroup.get(Tuple.of(destination, consumerGroup)).remove(consumer);
//TODO: unregister message handler but service bus sdk unsupported
return existed;
}
@Override
public Checkpointer<IMessage> getCheckpointer(String destination, String consumerGroup) {
return this.checkpointGetter.apply(Tuple.of(destination, consumerGroup));
}
private Checkpointer<IMessage> createCheckpointer(Tuple<String, String> nameAndSubscription) {
return new ServiceBusTopicCheckpointer(this.senderFactory.getSubscriptionClientCreator().apply(Tuple
.of(nameAndSubscription.getFirst(), nameAndSubscription.getSecond())));
}
}