Merge pull request #25 from Microsoft/eventhub-starter
Implement event hub properties auto config and event hub starter
This commit is contained in:
Коммит
43fe03734c
|
@ -18,11 +18,18 @@
|
|||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-autoconfigure</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>spring-cloud-azure-context</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>spring-cloud-azure-eventhub</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -38,11 +38,11 @@ public class AzureRedisAutoConfiguration {
|
|||
@ConditionalOnMissingBean
|
||||
@Primary
|
||||
@Bean
|
||||
public RedisProperties redisProperties(Azure.Authenticated authenticated, AzureProperties azureProperties,
|
||||
public RedisProperties redisProperties(Azure azure, AzureProperties azureProperties,
|
||||
AzureRedisProperties azureRedisProperties) throws IOException {
|
||||
String cacheName = azureRedisProperties.getName();
|
||||
|
||||
RedisCache redisCache = authenticated.withDefaultSubscription().redisCaches()
|
||||
RedisCache redisCache = azure.redisCaches()
|
||||
.getByResourceGroup(azureProperties.getResourceGroup(), cacheName);
|
||||
|
||||
RedisProperties redisProperties = new RedisProperties();
|
||||
|
|
|
@ -16,6 +16,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
|
|||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Auto-config to provide default {@link CredentialsProvider} for all Azure services
|
||||
*
|
||||
|
@ -41,7 +43,7 @@ public class AzureContextAutoConfiguration {
|
|||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public Azure.Authenticated authenticated() {
|
||||
return Azure.authenticate(credentialsProvider().getCredentials());
|
||||
public Azure azure() throws IOException {
|
||||
return Azure.authenticate(credentialsProvider().getCredentials()).withDefaultSubscription();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,8 +13,16 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
|||
public class AzureProperties implements CredentialSupplier {
|
||||
|
||||
private String credentialFilePath;
|
||||
|
||||
private String resourceGroup;
|
||||
private String region;
|
||||
|
||||
public String getRegion() {
|
||||
return region;
|
||||
}
|
||||
|
||||
public void setRegion(String region) {
|
||||
this.region = region;
|
||||
}
|
||||
|
||||
public String getCredentialFilePath() {
|
||||
return this.credentialFilePath;
|
||||
|
|
|
@ -6,12 +6,16 @@
|
|||
|
||||
package com.microsoft.azure.spring.cloud.autoconfigure.eventhub;
|
||||
|
||||
import com.microsoft.azure.management.Azure;
|
||||
import com.microsoft.azure.management.eventhub.EventHub;
|
||||
import com.microsoft.azure.spring.cloud.autoconfigure.context.AzureContextAutoConfiguration;
|
||||
import com.microsoft.azure.spring.cloud.autoconfigure.context.AzureProperties;
|
||||
import eventhub.integration.AzureAdmin;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
|
@ -23,7 +27,10 @@ import org.springframework.context.annotation.Configuration;
|
|||
@AutoConfigureAfter(AzureContextAutoConfiguration.class)
|
||||
@ConditionalOnClass(EventHub.class)
|
||||
@EnableConfigurationProperties(AzureEventHubProperties.class)
|
||||
@ConditionalOnProperty("spring.cloud.azure.event.hub.namespace")
|
||||
public class AzureEventHubAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public AzureAdmin azureAdmin(Azure azure, AzureProperties azureProperties){
|
||||
return new AzureAdmin(azure, azureProperties.getResourceGroup(), azureProperties.getRegion());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,13 +13,5 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
|||
*/
|
||||
@ConfigurationProperties("spring.cloud.azure.event.hub")
|
||||
public class AzureEventHubProperties {
|
||||
private String namespace;
|
||||
|
||||
public String getNamespace() {
|
||||
return namespace;
|
||||
}
|
||||
|
||||
public void setNamespace(String namespace) {
|
||||
this.namespace = namespace;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,9 +8,9 @@ package com.microsoft.azure.spring.cloud.autoconfigure.storage;
|
|||
|
||||
import com.microsoft.azure.management.Azure;
|
||||
import com.microsoft.azure.management.storage.StorageAccount;
|
||||
import com.microsoft.azure.management.storage.StorageAccountKey;
|
||||
import com.microsoft.azure.spring.cloud.autoconfigure.context.AzureContextAutoConfiguration;
|
||||
import com.microsoft.azure.spring.cloud.autoconfigure.context.AzureProperties;
|
||||
import com.microsoft.azure.spring.cloud.context.core.AzureUtil;
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -22,13 +22,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
|
|||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* An auto-configuration for Azure Storage Account
|
||||
|
@ -45,49 +40,20 @@ public class AzureStorageAutoConfiguration {
|
|||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public CloudStorageAccount storage(Azure.Authenticated authenticated, AzureProperties azureProperties,
|
||||
AzureStorageProperties azureStorageProperties) throws IOException {
|
||||
public CloudStorageAccount storage(Azure azure, AzureProperties azureProperties,
|
||||
AzureStorageProperties azureStorageProperties) {
|
||||
String accountName = azureStorageProperties.getAccount();
|
||||
|
||||
StorageAccount storageAccount = authenticated.withDefaultSubscription().storageAccounts()
|
||||
.getByResourceGroup(azureProperties.getResourceGroup(), accountName);
|
||||
Optional<StorageAccountKey> key = storageAccount.getKeys().stream().findAny();
|
||||
StorageAccount storageAccount =
|
||||
azure.storageAccounts().getByResourceGroup(azureProperties.getResourceGroup(), accountName);
|
||||
|
||||
String connectionString = ConnectionStringBuilder.build(accountName, key.get().value());
|
||||
if (key.isPresent()) {
|
||||
try {
|
||||
return CloudStorageAccount.parse(connectionString);
|
||||
} catch (URISyntaxException | InvalidKeyException e) {
|
||||
LOGGER.error("Failed to parse connection string" + connectionString, e);
|
||||
}
|
||||
}
|
||||
String connectionString = AzureUtil.getConnectionString(storageAccount);
|
||||
|
||||
throw new RuntimeException("Storage account key is empty.");
|
||||
}
|
||||
|
||||
static class ConnectionStringBuilder {
|
||||
private static final String DEFAULT_PROTOCOL = "DefaultEndpointsProtocol";
|
||||
|
||||
private static final String ACCOUNT_NAME = "AccountName";
|
||||
|
||||
private static final String ACCOUNT_KEY = "AccountKey";
|
||||
|
||||
private static final String ENDPOINT_SUFFIX = "EndpointSuffix";
|
||||
|
||||
private static final String HTTP_PROTOCOL = "http";
|
||||
|
||||
private static final String DEFAULT_ENDPOINT_SUFFIX = "core.windows.net";
|
||||
|
||||
private static final String SEPARATOR = ";";
|
||||
|
||||
static String build(String accountName, String accountKey) {
|
||||
Map<String, String> map = new HashMap<>();
|
||||
map.put(DEFAULT_PROTOCOL, HTTP_PROTOCOL);
|
||||
map.put(ACCOUNT_NAME, accountName);
|
||||
map.put(ACCOUNT_KEY, accountKey);
|
||||
map.put(ENDPOINT_SUFFIX, DEFAULT_ENDPOINT_SUFFIX);
|
||||
|
||||
return map.entrySet().stream().map(Object::toString).collect(Collectors.joining(SEPARATOR));
|
||||
try {
|
||||
return CloudStorageAccount.parse(connectionString);
|
||||
} catch (URISyntaxException | InvalidKeyException e) {
|
||||
LOGGER.error("Failed to parse connection string" + connectionString, e);
|
||||
throw new RuntimeException("Failed to parse connection string" + connectionString, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
com.microsoft.azure.spring.cloud.autoconfigure.context.AzureContextAutoConfiguration,\
|
||||
com.microsoft.azure.spring.cloud.autoconfigure.cache.AzureRedisAutoConfiguration,\
|
||||
com.microsoft.azure.spring.cloud.autoconfigure.storage.AzureStorageAutoConfiguration
|
||||
com.microsoft.azure.spring.cloud.autoconfigure.storage.AzureStorageAutoConfiguration,\
|
||||
com.microsoft.azure.spring.cloud.autoconfigure.eventhub.AzureEventHubAutoConfiguration
|
||||
|
||||
|
||||
|
|
|
@ -49,8 +49,8 @@ public class AzureContextAutoConfigurationTest {
|
|||
}
|
||||
|
||||
@Bean
|
||||
Azure.Authenticated authenticated() {
|
||||
return mock(Azure.Authenticated.class);
|
||||
Azure azure() {
|
||||
return mock(Azure.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
mock-maker-inline
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
|
||||
package com.microsoft.azure.spring.cloud.context.core;
|
||||
|
||||
import com.microsoft.azure.management.storage.StorageAccount;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class AzureUtil {
|
||||
public static String getConnectionString(StorageAccount storageAccount){
|
||||
return storageAccount.getKeys().stream().findFirst().map(key -> ConnectionStringBuilder.build(storageAccount
|
||||
.name(), key.value())).orElseThrow(() -> new RuntimeException("Storage account key is empty."));
|
||||
}
|
||||
|
||||
private static class ConnectionStringBuilder {
|
||||
private static final String DEFAULT_PROTOCOL = "DefaultEndpointsProtocol";
|
||||
|
||||
private static final String ACCOUNT_NAME = "AccountName";
|
||||
|
||||
private static final String ACCOUNT_KEY = "AccountKey";
|
||||
|
||||
private static final String ENDPOINT_SUFFIX = "EndpointSuffix";
|
||||
|
||||
private static final String HTTP_PROTOCOL = "http";
|
||||
|
||||
private static final String DEFAULT_ENDPOINT_SUFFIX = "core.windows.net";
|
||||
|
||||
private static final String SEPARATOR = ";";
|
||||
|
||||
static String build(String accountName, String accountKey) {
|
||||
Map<String, String> map = new HashMap<>();
|
||||
map.put(DEFAULT_PROTOCOL, HTTP_PROTOCOL);
|
||||
map.put(ACCOUNT_NAME, accountName);
|
||||
map.put(ACCOUNT_KEY, accountKey);
|
||||
map.put(ENDPOINT_SUFFIX, DEFAULT_ENDPOINT_SUFFIX);
|
||||
|
||||
return map.entrySet().stream().map(Object::toString).collect(Collectors.joining(SEPARATOR));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -16,7 +16,7 @@
|
|||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>spring-cloud-azure-autoconfigure</artifactId>
|
||||
<artifactId>spring-cloud-azure-context</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
|
|
@ -11,12 +11,13 @@ import com.microsoft.azure.eventhubs.EventHubClient;
|
|||
import com.microsoft.azure.eventhubs.EventHubException;
|
||||
import com.microsoft.azure.eventhubs.PartitionSender;
|
||||
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
|
||||
import com.microsoft.azure.spring.cloud.autoconfigure.eventhub.AzureEventHubProperties;
|
||||
import com.microsoft.azure.management.eventhub.AuthorizationRule;
|
||||
import com.microsoft.azure.management.eventhub.EventHubAuthorizationKey;
|
||||
import com.microsoft.azure.spring.cloud.context.core.AzureUtil;
|
||||
import eventhub.integration.AzureAdmin;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
@ -29,7 +30,6 @@ import java.util.stream.Stream;
|
|||
*
|
||||
* @author Warren Zhu
|
||||
*/
|
||||
@Component
|
||||
public class DefaultEventHubClientFactory implements EventHubClientFactory, DisposableBean {
|
||||
private static final Log LOGGER = LogFactory.getLog(DefaultEventHubClientFactory.class);
|
||||
// eventHubName -> eventHubClient
|
||||
|
@ -46,8 +46,24 @@ public class DefaultEventHubClientFactory implements EventHubClientFactory, Disp
|
|||
private final ConcurrentHashMap<Tuple<String, String>, EventProcessorHost> processorHostMap =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
@Autowired
|
||||
private AzureEventHubProperties eventHubProperties;
|
||||
private final AzureAdmin azureAdmin;
|
||||
private final String namespace;
|
||||
private String checkpointStorageAccountContainer;
|
||||
private String checkpointStorageConnectionString;
|
||||
|
||||
public DefaultEventHubClientFactory(AzureAdmin azureAdmin, String namespace) {
|
||||
this.azureAdmin = azureAdmin;
|
||||
this.namespace = namespace;
|
||||
}
|
||||
|
||||
public void initCheckpointConnectionString(String checkpointStorageAccount) {
|
||||
this.checkpointStorageConnectionString =
|
||||
AzureUtil.getConnectionString(azureAdmin.getOrCreateStorageAccount(checkpointStorageAccount));
|
||||
}
|
||||
|
||||
public void setCheckpointStorageAccountContainer(String checkpointStorageAccountContainer) {
|
||||
this.checkpointStorageAccountContainer = checkpointStorageAccountContainer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventHubClient getOrCreateEventHubClient(String eventHubName) {
|
||||
|
@ -78,18 +94,20 @@ public class DefaultEventHubClientFactory implements EventHubClientFactory, Disp
|
|||
public EventProcessorHost getOrCreateEventProcessorHost(String eventHubName, String consumerGroup) {
|
||||
return this.processorHostMap.computeIfAbsent(new Tuple(eventHubName, consumerGroup),
|
||||
key -> new EventProcessorHost(EventProcessorHost.createHostName("hostNamePrefix"), eventHubName,
|
||||
consumerGroup, getOrCreateConnectionString(eventHubName), "storageConnectionString",
|
||||
"storageContainerName"));
|
||||
consumerGroup, getOrCreateConnectionString(eventHubName), checkpointStorageConnectionString,
|
||||
checkpointStorageAccountContainer));
|
||||
}
|
||||
|
||||
private String getOrCreateConnectionString(String eventHubName) {
|
||||
return this.connectionStringMap.computeIfAbsent(eventHubName, key -> {
|
||||
//TODO: get all properties from management api, pending on no way to get access way
|
||||
return new ConnectionStringBuilder().setNamespaceName(eventHubProperties.getNamespace())
|
||||
.setEventHubName(eventHubName)
|
||||
.setSasKeyName("-----SharedAccessSignatureKeyName-----")
|
||||
.setSasKey("---SharedAccessSignatureKey----").toString();
|
||||
});
|
||||
|
||||
return this.connectionStringMap.computeIfAbsent(eventHubName,
|
||||
name -> azureAdmin.getOrCreateEventHubNamespace(namespace).listAuthorizationRules().stream().findFirst()
|
||||
.map(AuthorizationRule::getKeys)
|
||||
.map(EventHubAuthorizationKey::primaryConnectionString)
|
||||
.map(s -> new ConnectionStringBuilder(s).setEventHubName(name).toString())
|
||||
.orElseThrow(() -> new EventHubRuntimeException(
|
||||
String.format("Failed to fetch connection string of '%s'", eventHubName),
|
||||
null)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -15,8 +15,6 @@ import eventhub.integration.inbound.Subscriber;
|
|||
import eventhub.integration.outbound.PartitionSupplier;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
|
@ -28,17 +26,19 @@ import java.util.concurrent.CompletableFuture;
|
|||
*
|
||||
* @author Warren Zhu
|
||||
*/
|
||||
@Component
|
||||
public class EventHubTemplate implements EventHubOperation {
|
||||
|
||||
private static final Log LOGGER = LogFactory.getLog(EventHubTemplate.class);
|
||||
|
||||
@Autowired
|
||||
private EventHubClientFactory clientFactory;
|
||||
private final EventHubClientFactory clientFactory;
|
||||
|
||||
public EventHubTemplate(EventHubClientFactory clientFactory) {
|
||||
this.clientFactory = clientFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> sendAsync(String eventHubName, EventData eventData,
|
||||
PartitionSupplier partitionSupplier) {
|
||||
PartitionSupplier partitionSupplier) {
|
||||
try {
|
||||
EventHubClient client = this.clientFactory.getOrCreateEventHubClient(eventHubName);
|
||||
|
||||
|
@ -46,7 +46,7 @@ public class EventHubTemplate implements EventHubOperation {
|
|||
return client.send(eventData);
|
||||
} else if (!Strings.isNullOrEmpty(partitionSupplier.getPartitionId())) {
|
||||
return this.clientFactory.getOrCreatePartitionSender(eventHubName, partitionSupplier.getPartitionId())
|
||||
.send(eventData);
|
||||
.send(eventData);
|
||||
} else if (!Strings.isNullOrEmpty(partitionSupplier.getPartitionKey())) {
|
||||
return client.send(eventData, partitionSupplier.getPartitionKey());
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
|
||||
package eventhub.integration;
|
||||
|
||||
import com.microsoft.azure.management.Azure;
|
||||
import com.microsoft.azure.management.eventhub.EventHub;
|
||||
import com.microsoft.azure.management.eventhub.EventHubNamespace;
|
||||
import com.microsoft.azure.management.storage.StorageAccount;
|
||||
|
||||
public class AzureAdmin {
|
||||
|
||||
private final Azure azure;
|
||||
private final String resourceGroup;
|
||||
private final String region;
|
||||
|
||||
public AzureAdmin(Azure azure, String resourceGroup, String region) {
|
||||
this.azure = azure;
|
||||
this.resourceGroup = resourceGroup;
|
||||
this.region = region;
|
||||
if (!this.azure.resourceGroups().contain(resourceGroup)) {
|
||||
this.azure.resourceGroups().define(resourceGroup).withRegion(region).create();
|
||||
}
|
||||
}
|
||||
|
||||
public EventHub getOrCreateEventHub(String namespace, String name) {
|
||||
EventHub eventHub = getEventHub(namespace, name);
|
||||
if (eventHub == null) {
|
||||
return createEventHub(namespace, name);
|
||||
}
|
||||
|
||||
return eventHub;
|
||||
}
|
||||
|
||||
public EventHub getEventHub(String namespace, String name) {
|
||||
return azure.eventHubs().getByName(resourceGroup, namespace, name);
|
||||
}
|
||||
|
||||
public EventHub createEventHub(String namespace, String name) {
|
||||
EventHubNamespace eventHubNamespace = getOrCreateEventHubNamespace(namespace);
|
||||
|
||||
return azure.eventHubs().define(name).withExistingNamespace(eventHubNamespace).create();
|
||||
}
|
||||
|
||||
public EventHubNamespace getOrCreateEventHubNamespace(String namespace) {
|
||||
try {
|
||||
return azure.eventHubNamespaces().getByResourceGroup(resourceGroup, namespace);
|
||||
} catch (NullPointerException e) {
|
||||
// azure management api has no way to determine whether an eventhub namespace exists
|
||||
// Workaround for this is by catching NPE
|
||||
return azure.eventHubNamespaces().define(namespace).withRegion(region)
|
||||
.withExistingResourceGroup(resourceGroup).create();
|
||||
}
|
||||
}
|
||||
|
||||
public StorageAccount getOrCreateStorageAccount(String name) {
|
||||
StorageAccount storageAccount = getStorageAccount(name);
|
||||
if (storageAccount == null) {
|
||||
return createStorageAccount(name);
|
||||
}
|
||||
|
||||
return storageAccount;
|
||||
}
|
||||
|
||||
public StorageAccount getStorageAccount(String name) {
|
||||
return azure.storageAccounts().getByResourceGroup(resourceGroup, name);
|
||||
}
|
||||
|
||||
public StorageAccount createStorageAccount(String name) {
|
||||
return azure.storageAccounts().define(name).withRegion(region).withExistingResourceGroup(resourceGroup)
|
||||
.create();
|
||||
}
|
||||
}
|
|
@ -6,15 +6,12 @@
|
|||
|
||||
package eventhub.integration.inbound;
|
||||
|
||||
import com.microsoft.azure.eventhubs.EventData;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* A callback to perform checkpoint.
|
||||
*
|
||||
* @param <T> message type parameter
|
||||
*
|
||||
* @author Warren Zhu
|
||||
*/
|
||||
public interface Checkpointer<T> {
|
||||
|
|
|
@ -24,14 +24,14 @@ public class EventHubInboundChannelAdapter extends MessageProducerSupport {
|
|||
private final String eventHubName;
|
||||
private final EventHubOperation eventHubOperation;
|
||||
private final String consumerGroup;
|
||||
private CheckpointMode checkpointMode = CheckpointMode.BATCH;
|
||||
private ListenerMode listenerMode = ListenerMode.BATCH;
|
||||
private CheckpointMode checkpointMode = CheckpointMode.RECORD;
|
||||
private ListenerMode listenerMode = ListenerMode.RECORD;
|
||||
private Subscriber<EventData> subscriber;
|
||||
private MessageConverter messageConverter;
|
||||
private Map<String, Object> commonHeaders = new HashMap<>();
|
||||
|
||||
public EventHubInboundChannelAdapter(String eventHubName, EventHubOperation eventHubOperation,
|
||||
String consumerGroup) {
|
||||
String consumerGroup) {
|
||||
this.eventHubName = eventHubName;
|
||||
this.eventHubOperation = eventHubOperation;
|
||||
this.consumerGroup = consumerGroup;
|
||||
|
@ -56,7 +56,7 @@ public class EventHubInboundChannelAdapter extends MessageProducerSupport {
|
|||
sendMessage(toMessage(events));
|
||||
} else /* ListenerMode.RECORD */ {
|
||||
StreamSupport.stream(events.spliterator(), false).forEach((e) -> {
|
||||
sendMessage(toMessage(e));
|
||||
sendMessage(toMessage(e.getBytes()));
|
||||
if (this.checkpointMode == checkpointMode.RECORD) {
|
||||
this.subscriber.getCheckpointer().checkpoint(e);
|
||||
}
|
||||
|
@ -85,6 +85,22 @@ public class EventHubInboundChannelAdapter extends MessageProducerSupport {
|
|||
super.doStop();
|
||||
}
|
||||
|
||||
public CheckpointMode getCheckpointMode() {
|
||||
return checkpointMode;
|
||||
}
|
||||
|
||||
public void setCheckpointMode(CheckpointMode checkpointMode) {
|
||||
this.checkpointMode = checkpointMode;
|
||||
}
|
||||
|
||||
public ListenerMode getListenerMode() {
|
||||
return listenerMode;
|
||||
}
|
||||
|
||||
public void setListenerMode(ListenerMode listenerMode) {
|
||||
this.listenerMode = listenerMode;
|
||||
}
|
||||
|
||||
public MessageConverter getMessageConverter() {
|
||||
return this.messageConverter;
|
||||
}
|
||||
|
|
|
@ -13,7 +13,6 @@ import java.util.function.Consumer;
|
|||
* {@link Checkpointer} callback to checkpoint the messages successfully processed
|
||||
*
|
||||
* @param <T> message type parameter
|
||||
*
|
||||
* @author Warren Zhu
|
||||
*/
|
||||
public interface Subscriber<T> {
|
||||
|
|
|
@ -44,11 +44,8 @@ public class EventHubMessageHandler extends AbstractMessageHandler {
|
|||
protected void handleMessageInternal(Message<?> message) throws Exception {
|
||||
|
||||
PartitionSupplier partitionSupplier = toPartitionSupplier(message);
|
||||
|
||||
String eventHubName = toEventHubName(message);
|
||||
|
||||
EventData eventData = toEventData(message);
|
||||
|
||||
CompletableFuture future = this.eventHubTemplate.sendAsync(eventHubName, eventData, partitionSupplier);
|
||||
|
||||
if (this.sync) {
|
||||
|
@ -115,9 +112,14 @@ public class EventHubMessageHandler extends AbstractMessageHandler {
|
|||
|
||||
private PartitionSupplier toPartitionSupplier(Message<?> message) {
|
||||
PartitionSupplier partitionSupplier = new PartitionSupplier();
|
||||
partitionSupplier.setPartitionKey(message.getHeaders().get(EventHubHeaders.PARTITION_KEY, String.class));
|
||||
partitionSupplier
|
||||
.setPartitionId(message.getHeaders().get(EventHubHeaders.PARTITION_ID, Integer.class).toString());
|
||||
if (message.getHeaders().containsKey(EventHubHeaders.PARTITION_KEY)) {
|
||||
partitionSupplier.setPartitionKey(message.getHeaders().get(EventHubHeaders.PARTITION_KEY, String.class));
|
||||
}
|
||||
|
||||
if (message.getHeaders().containsKey(EventHubHeaders.PARTITION_ID)) {
|
||||
partitionSupplier
|
||||
.setPartitionId(message.getHeaders().get(EventHubHeaders.PARTITION_ID, Integer.class).toString());
|
||||
}
|
||||
return partitionSupplier;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
<modules>
|
||||
<module>spring-cloud-azure-starter-cache</module>
|
||||
<module>spring-cloud-azure-starter-storage</module>
|
||||
<module>spring-cloud-azure-starter-eventhub</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>spring-cloud-azure-starters</artifactId>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<version>1.0.0.BUILD-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>spring-cloud-azure-starter-eventhub</artifactId>
|
||||
|
||||
<properties>
|
||||
<main.basedir>${basedir}/../..</main.basedir>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>spring-cloud-azure-autoconfigure</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>spring-cloud-azure-eventhub</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1 @@
|
|||
provides: spring-cloud-azure-eventhub, spring-cloud-azure-autoconfigure
|
Загрузка…
Ссылка в новой задаче