Implement Spring cloud stream event hub binder

This commit is contained in:
Warren Zhu 2018-06-04 17:36:24 +08:00
Родитель 0e34d114c9
Коммит 85f26bfa15
11 изменённых файлов: 387 добавлений и 1 удалений

Просмотреть файл

@ -6,7 +6,6 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>spring-cloud-azure</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -24,6 +23,12 @@
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>spring-cloud-azure-eventhub</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
@ -35,5 +40,6 @@
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

Просмотреть файл

@ -0,0 +1,76 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.eventhub.stream.binder;
import com.microsoft.azure.eventhub.stream.binder.properties.EventHubConsumerProperties;
import com.microsoft.azure.eventhub.stream.binder.properties.EventHubExtendedBindingProperties;
import com.microsoft.azure.eventhub.stream.binder.properties.EventHubProducerProperties;
import com.microsoft.azure.eventhub.stream.binder.provisioning.EventHubChannelProvisioner;
import eventhub.core.EventHubOperation;
import eventhub.integration.inbound.CheckpointMode;
import eventhub.integration.inbound.EventHubInboundChannelAdapter;
import eventhub.integration.inbound.ListenerMode;
import eventhub.integration.outbound.EventHubMessageHandler;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
/**
* @author Warren Zhu
*/
public class EventHubMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<EventHubConsumerProperties>,
ExtendedProducerProperties<EventHubProducerProperties>, EventHubChannelProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, EventHubConsumerProperties, EventHubProducerProperties> {
private EventHubOperation eventHubOperation;
private EventHubExtendedBindingProperties bindingProperties = new EventHubExtendedBindingProperties();
public EventHubMessageChannelBinder(String[] headersToEmbed, EventHubChannelProvisioner provisioningProvider,
EventHubOperation eventHubOperation) {
super(headersToEmbed, provisioningProvider);
this.eventHubOperation = eventHubOperation;
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<EventHubProducerProperties> producerProperties,
MessageChannel errorChannel) {
return new EventHubMessageHandler(destination.getName(), this.eventHubOperation);
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<EventHubConsumerProperties> properties) {
EventHubInboundChannelAdapter inboundAdapter =
new EventHubInboundChannelAdapter(destination.getName(), this.eventHubOperation, group);
// Spring cloud stream only support record mode now
inboundAdapter.setListenerMode(ListenerMode.RECORD);
inboundAdapter.setCheckpointMode(CheckpointMode.RECORD);
return inboundAdapter;
}
@Override
public EventHubConsumerProperties getExtendedConsumerProperties(String channelName) {
return this.bindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public EventHubProducerProperties getExtendedProducerProperties(String channelName) {
return this.bindingProperties.getExtendedProducerProperties(channelName);
}
}

Просмотреть файл

@ -0,0 +1,57 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.eventhub.stream.binder.config;
import com.microsoft.azure.eventhub.stream.binder.EventHubMessageChannelBinder;
import com.microsoft.azure.eventhub.stream.binder.properties.EventHubExtendedBindingProperties;
import com.microsoft.azure.eventhub.stream.binder.provisioning.EventHubChannelProvisioner;
import eventhub.core.DefaultEventHubClientFactory;
import eventhub.core.EventHubClientFactory;
import eventhub.core.EventHubOperation;
import eventhub.core.EventHubTemplate;
import eventhub.integration.AzureAdmin;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Warren Zhu
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@EnableConfigurationProperties(EventHubExtendedBindingProperties.class)
public class EventHubBinderConfiguration {
@Bean
public EventHubClientFactory clientFactory(AzureAdmin azureAdmin,
EventHubExtendedBindingProperties bindingProperties) {
DefaultEventHubClientFactory clientFactory =
new DefaultEventHubClientFactory(azureAdmin, bindingProperties.getNamespace());
clientFactory.initCheckpointConnectionString(bindingProperties.getCheckpointStorageAccount());
clientFactory.setCheckpointStorageAccountContainer(bindingProperties.getCheckpointStorageAccountContainer());
return clientFactory;
}
@Bean
public EventHubChannelProvisioner eventHubChannelProvisioner(AzureAdmin azureAdmin,
EventHubExtendedBindingProperties bindingProperties) {
return new EventHubChannelProvisioner(azureAdmin, bindingProperties.getNamespace());
}
@Bean
public EventHubOperation eventHubOperation(EventHubClientFactory clientFactory) {
return new EventHubTemplate(clientFactory);
}
@Bean
public EventHubMessageChannelBinder eventHubBinder(EventHubChannelProvisioner eventHubChannelProvisioner,
EventHubOperation eventHubOperation) {
return new EventHubMessageChannelBinder(null, eventHubChannelProvisioner, eventHubOperation);
}
}

Просмотреть файл

@ -0,0 +1,31 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.eventhub.stream.binder.properties;
/**
* @author Warren Zhu
*/
public class EventHubBindingProperties {
private EventHubConsumerProperties consumer = new EventHubConsumerProperties();
private EventHubProducerProperties producer = new EventHubProducerProperties();
public EventHubConsumerProperties getConsumer() {
return this.consumer;
}
public void setConsumer(EventHubConsumerProperties consumer) {
this.consumer = consumer;
}
public EventHubProducerProperties getProducer() {
return this.producer;
}
public void setProducer(EventHubProducerProperties producer) {
this.producer = producer;
}
}

Просмотреть файл

@ -0,0 +1,14 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.eventhub.stream.binder.properties;
/**
* @author Warren Zhu
*/
public class EventHubConsumerProperties {
}

Просмотреть файл

@ -0,0 +1,61 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.eventhub.stream.binder.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Warren Zhu
*/
@ConfigurationProperties("spring.cloud.stream.eventhub")
public class EventHubExtendedBindingProperties
implements ExtendedBindingProperties<EventHubConsumerProperties, EventHubProducerProperties> {
private Map<String, EventHubBindingProperties> bindings = new ConcurrentHashMap<>();
private String namespace;
private String checkpointStorageAccount;
private String checkpointStorageAccountContainer;
public String getCheckpointStorageAccount() {
return checkpointStorageAccount;
}
public void setCheckpointStorageAccount(String checkpointStorageAccount) {
this.checkpointStorageAccount = checkpointStorageAccount;
}
public String getCheckpointStorageAccountContainer() {
return checkpointStorageAccountContainer;
}
public void setCheckpointStorageAccountContainer(String checkpointStorageAccountContainer) {
this.checkpointStorageAccountContainer = checkpointStorageAccountContainer;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
@Override
public EventHubConsumerProperties getExtendedConsumerProperties(String channelName) {
return this.bindings.computeIfAbsent(channelName, key -> new EventHubBindingProperties()).getConsumer();
}
@Override
public EventHubProducerProperties getExtendedProducerProperties(String channelName) {
return this.bindings.computeIfAbsent(channelName, key -> new EventHubBindingProperties()).getProducer();
}
}

Просмотреть файл

@ -0,0 +1,32 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.eventhub.stream.binder.properties;
/**
* @author Warren Zhu
*/
public class EventHubProducerProperties {
private boolean sync;
private long sendTimeout = 10000;
public boolean isSync() {
return this.sync;
}
public void setSync(boolean sync) {
this.sync = sync;
}
public long getSendTimeout() {
return sendTimeout;
}
public void setSendTimeout(long sendTimeout) {
this.sendTimeout = sendTimeout;
}
}

Просмотреть файл

@ -0,0 +1,51 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.eventhub.stream.binder.provisioning;
import com.microsoft.azure.eventhub.stream.binder.properties.EventHubConsumerProperties;
import com.microsoft.azure.eventhub.stream.binder.properties.EventHubProducerProperties;
import eventhub.integration.AzureAdmin;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.util.Assert;
/**
* @author Warren Zhu
*/
public class EventHubChannelProvisioner implements
ProvisioningProvider<ExtendedConsumerProperties<EventHubConsumerProperties>,
ExtendedProducerProperties<EventHubProducerProperties>> {
private final AzureAdmin azureAdmin;
private final String namespace;
public EventHubChannelProvisioner(AzureAdmin azureAdmin, String namespace) {
Assert.notNull(azureAdmin, "The azureAdmin can't be null.");
Assert.hasText(namespace, "The namespace can't be null or empty");
this.azureAdmin = azureAdmin;
this.namespace = namespace;
}
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<EventHubProducerProperties> properties) throws ProvisioningException {
this.azureAdmin.getOrCreateEventHub(namespace, name);
return new EventHubProducerDestination(name);
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<EventHubConsumerProperties> properties) throws ProvisioningException {
//TODO: create consumer group if not existed
return new EventHubConsumerDestination(name);
}
}

Просмотреть файл

@ -0,0 +1,26 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.eventhub.stream.binder.provisioning;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
/**
* @author Warren Zhu
*/
public class EventHubConsumerDestination implements ConsumerDestination {
private String name;
public EventHubConsumerDestination(String name) {
this.name = name;
}
@Override
public String getName() {
return this.name;
}
}

Просмотреть файл

@ -0,0 +1,31 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.eventhub.stream.binder.provisioning;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
/**
* @author Warren Zhu
*/
public class EventHubProducerDestination implements ProducerDestination {
private String name;
public EventHubProducerDestination(String name) {
this.name = name;
}
@Override
public String getName() {
return this.name;
}
@Override
public String getNameForPartition(int partition) {
return this.name + "-" + partition;
}
}

Просмотреть файл

@ -0,0 +1 @@
eventhub: com.microsoft.azure.eventhub.stream.binder.config.EventHubBinderConfiguration