diff --git a/spring-cloud-azure-eventhub-stream-binder/pom.xml b/spring-cloud-azure-eventhub-stream-binder/pom.xml index 5afbed80..6d70ca1f 100644 --- a/spring-cloud-azure-eventhub-stream-binder/pom.xml +++ b/spring-cloud-azure-eventhub-stream-binder/pom.xml @@ -6,7 +6,6 @@ com.microsoft.azure spring-cloud-azure 1.0.0.BUILD-SNAPSHOT - ../pom.xml 4.0.0 @@ -24,6 +23,12 @@ spring-cloud-stream + + com.microsoft.azure + spring-cloud-azure-eventhub + ${project.version} + + org.springframework.boot spring-boot-configuration-processor @@ -35,5 +40,6 @@ spring-cloud-stream-binder-test test + diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/EventHubMessageChannelBinder.java b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/EventHubMessageChannelBinder.java new file mode 100644 index 00000000..dcb203a7 --- /dev/null +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/EventHubMessageChannelBinder.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for + * license information. + */ + +package com.microsoft.azure.eventhub.stream.binder; + +import com.microsoft.azure.eventhub.stream.binder.properties.EventHubConsumerProperties; +import com.microsoft.azure.eventhub.stream.binder.properties.EventHubExtendedBindingProperties; +import com.microsoft.azure.eventhub.stream.binder.properties.EventHubProducerProperties; +import com.microsoft.azure.eventhub.stream.binder.provisioning.EventHubChannelProvisioner; +import eventhub.core.EventHubOperation; +import eventhub.integration.inbound.CheckpointMode; +import eventhub.integration.inbound.EventHubInboundChannelAdapter; +import eventhub.integration.inbound.ListenerMode; +import eventhub.integration.outbound.EventHubMessageHandler; +import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.integration.core.MessageProducer; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.util.Assert; + +/** + * @author Warren Zhu + */ +public class EventHubMessageChannelBinder extends + AbstractMessageChannelBinder, + ExtendedProducerProperties, EventHubChannelProvisioner> + implements ExtendedPropertiesBinder { + + private EventHubOperation eventHubOperation; + + private EventHubExtendedBindingProperties bindingProperties = new EventHubExtendedBindingProperties(); + + public EventHubMessageChannelBinder(String[] headersToEmbed, EventHubChannelProvisioner provisioningProvider, + EventHubOperation eventHubOperation) { + super(headersToEmbed, provisioningProvider); + this.eventHubOperation = eventHubOperation; + } + + @Override + protected MessageHandler createProducerMessageHandler(ProducerDestination destination, + ExtendedProducerProperties producerProperties, + MessageChannel errorChannel) { + return new EventHubMessageHandler(destination.getName(), this.eventHubOperation); + } + + @Override + protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, + ExtendedConsumerProperties properties) { + EventHubInboundChannelAdapter inboundAdapter = + new EventHubInboundChannelAdapter(destination.getName(), this.eventHubOperation, group); + // Spring cloud stream only support record mode now + inboundAdapter.setListenerMode(ListenerMode.RECORD); + inboundAdapter.setCheckpointMode(CheckpointMode.RECORD); + + return inboundAdapter; + } + + @Override + public EventHubConsumerProperties getExtendedConsumerProperties(String channelName) { + return this.bindingProperties.getExtendedConsumerProperties(channelName); + } + + @Override + public EventHubProducerProperties getExtendedProducerProperties(String channelName) { + return this.bindingProperties.getExtendedProducerProperties(channelName); + } + +} diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/config/EventHubBinderConfiguration.java b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/config/EventHubBinderConfiguration.java new file mode 100644 index 00000000..cb0f4ee6 --- /dev/null +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/config/EventHubBinderConfiguration.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for + * license information. + */ + +package com.microsoft.azure.eventhub.stream.binder.config; + +import com.microsoft.azure.eventhub.stream.binder.EventHubMessageChannelBinder; +import com.microsoft.azure.eventhub.stream.binder.properties.EventHubExtendedBindingProperties; +import com.microsoft.azure.eventhub.stream.binder.provisioning.EventHubChannelProvisioner; +import eventhub.core.DefaultEventHubClientFactory; +import eventhub.core.EventHubClientFactory; +import eventhub.core.EventHubOperation; +import eventhub.core.EventHubTemplate; +import eventhub.integration.AzureAdmin; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.stream.binder.Binder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Warren Zhu + */ +@Configuration +@ConditionalOnMissingBean(Binder.class) +@EnableConfigurationProperties(EventHubExtendedBindingProperties.class) +public class EventHubBinderConfiguration { + + @Bean + public EventHubClientFactory clientFactory(AzureAdmin azureAdmin, + EventHubExtendedBindingProperties bindingProperties) { + DefaultEventHubClientFactory clientFactory = + new DefaultEventHubClientFactory(azureAdmin, bindingProperties.getNamespace()); + clientFactory.initCheckpointConnectionString(bindingProperties.getCheckpointStorageAccount()); + clientFactory.setCheckpointStorageAccountContainer(bindingProperties.getCheckpointStorageAccountContainer()); + return clientFactory; + } + + @Bean + public EventHubChannelProvisioner eventHubChannelProvisioner(AzureAdmin azureAdmin, + EventHubExtendedBindingProperties bindingProperties) { + return new EventHubChannelProvisioner(azureAdmin, bindingProperties.getNamespace()); + } + + @Bean + public EventHubOperation eventHubOperation(EventHubClientFactory clientFactory) { + return new EventHubTemplate(clientFactory); + } + + @Bean + public EventHubMessageChannelBinder eventHubBinder(EventHubChannelProvisioner eventHubChannelProvisioner, + EventHubOperation eventHubOperation) { + return new EventHubMessageChannelBinder(null, eventHubChannelProvisioner, eventHubOperation); + } +} diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubBindingProperties.java b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubBindingProperties.java new file mode 100644 index 00000000..206998d7 --- /dev/null +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubBindingProperties.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for + * license information. + */ + +package com.microsoft.azure.eventhub.stream.binder.properties; + +/** + * @author Warren Zhu + */ +public class EventHubBindingProperties { + private EventHubConsumerProperties consumer = new EventHubConsumerProperties(); + private EventHubProducerProperties producer = new EventHubProducerProperties(); + + public EventHubConsumerProperties getConsumer() { + return this.consumer; + } + + public void setConsumer(EventHubConsumerProperties consumer) { + this.consumer = consumer; + } + + public EventHubProducerProperties getProducer() { + return this.producer; + } + + public void setProducer(EventHubProducerProperties producer) { + this.producer = producer; + } +} diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubConsumerProperties.java b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubConsumerProperties.java new file mode 100644 index 00000000..196796a4 --- /dev/null +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubConsumerProperties.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for + * license information. + */ + +package com.microsoft.azure.eventhub.stream.binder.properties; + +/** + * @author Warren Zhu + */ +public class EventHubConsumerProperties { + +} diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubExtendedBindingProperties.java b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubExtendedBindingProperties.java new file mode 100644 index 00000000..c285f6c1 --- /dev/null +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubExtendedBindingProperties.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for + * license information. + */ + +package com.microsoft.azure.eventhub.stream.binder.properties; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.stream.binder.ExtendedBindingProperties; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author Warren Zhu + */ +@ConfigurationProperties("spring.cloud.stream.eventhub") +public class EventHubExtendedBindingProperties + implements ExtendedBindingProperties { + + private Map bindings = new ConcurrentHashMap<>(); + private String namespace; + private String checkpointStorageAccount; + private String checkpointStorageAccountContainer; + + public String getCheckpointStorageAccount() { + return checkpointStorageAccount; + } + + public void setCheckpointStorageAccount(String checkpointStorageAccount) { + this.checkpointStorageAccount = checkpointStorageAccount; + } + + public String getCheckpointStorageAccountContainer() { + return checkpointStorageAccountContainer; + } + + public void setCheckpointStorageAccountContainer(String checkpointStorageAccountContainer) { + this.checkpointStorageAccountContainer = checkpointStorageAccountContainer; + } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + @Override + public EventHubConsumerProperties getExtendedConsumerProperties(String channelName) { + return this.bindings.computeIfAbsent(channelName, key -> new EventHubBindingProperties()).getConsumer(); + } + + @Override + public EventHubProducerProperties getExtendedProducerProperties(String channelName) { + return this.bindings.computeIfAbsent(channelName, key -> new EventHubBindingProperties()).getProducer(); + } +} diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubProducerProperties.java b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubProducerProperties.java new file mode 100644 index 00000000..a6c959d5 --- /dev/null +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubProducerProperties.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for + * license information. + */ + +package com.microsoft.azure.eventhub.stream.binder.properties; + +/** + * @author Warren Zhu + */ +public class EventHubProducerProperties { + private boolean sync; + + private long sendTimeout = 10000; + + public boolean isSync() { + return this.sync; + } + + public void setSync(boolean sync) { + this.sync = sync; + } + + public long getSendTimeout() { + return sendTimeout; + } + + public void setSendTimeout(long sendTimeout) { + this.sendTimeout = sendTimeout; + } +} diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubChannelProvisioner.java b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubChannelProvisioner.java new file mode 100644 index 00000000..e88c15dd --- /dev/null +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubChannelProvisioner.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for + * license information. + */ + +package com.microsoft.azure.eventhub.stream.binder.provisioning; + +import com.microsoft.azure.eventhub.stream.binder.properties.EventHubConsumerProperties; +import com.microsoft.azure.eventhub.stream.binder.properties.EventHubProducerProperties; +import eventhub.integration.AzureAdmin; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.cloud.stream.provisioning.ProvisioningException; +import org.springframework.cloud.stream.provisioning.ProvisioningProvider; +import org.springframework.util.Assert; + +/** + * @author Warren Zhu + */ +public class EventHubChannelProvisioner implements + ProvisioningProvider, + ExtendedProducerProperties> { + + private final AzureAdmin azureAdmin; + private final String namespace; + + public EventHubChannelProvisioner(AzureAdmin azureAdmin, String namespace) { + Assert.notNull(azureAdmin, "The azureAdmin can't be null."); + Assert.hasText(namespace, "The namespace can't be null or empty"); + this.azureAdmin = azureAdmin; + this.namespace = namespace; + } + + @Override + public ProducerDestination provisionProducerDestination(String name, + ExtendedProducerProperties properties) throws ProvisioningException { + this.azureAdmin.getOrCreateEventHub(namespace, name); + + return new EventHubProducerDestination(name); + } + + @Override + public ConsumerDestination provisionConsumerDestination(String name, String group, + ExtendedConsumerProperties properties) throws ProvisioningException { + //TODO: create consumer group if not existed + return new EventHubConsumerDestination(name); + } +} diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubConsumerDestination.java b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubConsumerDestination.java new file mode 100644 index 00000000..b129d177 --- /dev/null +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubConsumerDestination.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for + * license information. + */ + +package com.microsoft.azure.eventhub.stream.binder.provisioning; + +import org.springframework.cloud.stream.provisioning.ConsumerDestination; + +/** + * @author Warren Zhu + */ +public class EventHubConsumerDestination implements ConsumerDestination { + + private String name; + + public EventHubConsumerDestination(String name) { + this.name = name; + } + + @Override + public String getName() { + return this.name; + } +} diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubProducerDestination.java b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubProducerDestination.java new file mode 100644 index 00000000..ab92c5b8 --- /dev/null +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubProducerDestination.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for + * license information. + */ + +package com.microsoft.azure.eventhub.stream.binder.provisioning; + +import org.springframework.cloud.stream.provisioning.ProducerDestination; + +/** + * @author Warren Zhu + */ +public class EventHubProducerDestination implements ProducerDestination { + + private String name; + + public EventHubProducerDestination(String name) { + this.name = name; + } + + @Override + public String getName() { + return this.name; + } + + @Override + public String getNameForPartition(int partition) { + return this.name + "-" + partition; + } +} diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/resources/META-INF/spring.binders b/spring-cloud-azure-eventhub-stream-binder/src/main/resources/META-INF/spring.binders new file mode 100644 index 00000000..4f388e15 --- /dev/null +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/resources/META-INF/spring.binders @@ -0,0 +1 @@ +eventhub: com.microsoft.azure.eventhub.stream.binder.config.EventHubBinderConfiguration