diff --git a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/EventHubMessageChannelBinder.java b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/EventHubMessageChannelBinder.java index dd56cd3e..202deed9 100644 --- a/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/EventHubMessageChannelBinder.java +++ b/spring-cloud-azure-eventhub-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/EventHubMessageChannelBinder.java @@ -15,13 +15,12 @@ import eventhub.integration.inbound.CheckpointMode; import eventhub.integration.inbound.EventHubInboundChannelAdapter; import eventhub.integration.inbound.ListenerMode; import eventhub.integration.outbound.EventHubMessageHandler; -import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; -import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; -import org.springframework.cloud.stream.binder.ExtendedProducerProperties; -import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; +import org.springframework.cloud.stream.binder.*; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.expression.FunctionExpression; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.util.Assert; @@ -48,11 +47,18 @@ public class EventHubMessageChannelBinder extends protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, MessageChannel errorChannel) { - EventHubMessageHandler messageHandler = new EventHubMessageHandler(destination.getName(), this - .eventHubOperation); - messageHandler.setSync(producerProperties.getExtension().isSync()); - messageHandler.setSendTimeout(producerProperties.getExtension().getSendTimeout()); - return messageHandler; + EventHubMessageHandler handler = new EventHubMessageHandler(destination.getName(), this.eventHubOperation); + handler.setSync(producerProperties.getExtension().isSync()); + handler.setSendTimeout(producerProperties.getExtension().getSendTimeout()); + if (producerProperties.isPartitioned()) { + handler.setPartitionKeyExpressionString( + "'partitionKey-' + headers['" + BinderHeaders.PARTITION_HEADER + "']"); + } else { + handler + .setPartitionKeyExpression(new FunctionExpression>(m -> m.getPayload().hashCode())); + } + + return handler; } @Override diff --git a/spring-cloud-azure-eventhub/src/main/java/eventhub/integration/outbound/EventHubMessageHandler.java b/spring-cloud-azure-eventhub/src/main/java/eventhub/integration/outbound/EventHubMessageHandler.java index 3579a6a8..0df1693b 100644 --- a/spring-cloud-azure-eventhub/src/main/java/eventhub/integration/outbound/EventHubMessageHandler.java +++ b/spring-cloud-azure-eventhub/src/main/java/eventhub/integration/outbound/EventHubMessageHandler.java @@ -11,6 +11,7 @@ import eventhub.core.EventHubOperation; import eventhub.integration.EventHubHeaders; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; +import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.codec.CodecMessageConverter; import org.springframework.integration.expression.ExpressionUtils; import org.springframework.integration.expression.ValueExpression; @@ -18,6 +19,7 @@ import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; import org.springframework.util.concurrent.ListenableFutureCallback; import java.nio.charset.Charset; @@ -44,6 +46,7 @@ public class EventHubMessageHandler extends AbstractMessageHandler { private ListenableFutureCallback sendCallback; private EvaluationContext evaluationContext; private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT); + private Expression partitionKeyExpression; public EventHubMessageHandler(String eventHubName, EventHubOperation eventHubOperation) { Assert.hasText(eventHubName, "eventHubName can't be null or empty"); @@ -97,6 +100,18 @@ public class EventHubMessageHandler extends AbstractMessageHandler { this.sendTimeoutExpression = sendTimeoutExpression; } + public void setPartitionKey(String partitionKey) { + setPartitionKeyExpression(new LiteralExpression(partitionKey)); + } + + public void setPartitionKeyExpressionString(String partitionKeyExpression) { + setPartitionKeyExpression(EXPRESSION_PARSER.parseExpression(partitionKeyExpression)); + } + + public void setPartitionKeyExpression(Expression partitionKeyExpression) { + this.partitionKeyExpression = partitionKeyExpression; + } + public boolean isSync() { return this.sync; } @@ -148,8 +163,13 @@ public class EventHubMessageHandler extends AbstractMessageHandler { private PartitionSupplier toPartitionSupplier(Message message) { PartitionSupplier partitionSupplier = new PartitionSupplier(); - if (message.getHeaders().containsKey(EventHubHeaders.PARTITION_KEY)) { - partitionSupplier.setPartitionKey(message.getHeaders().get(EventHubHeaders.PARTITION_KEY, String.class)); + String partitionKey = message.getHeaders().get(EventHubHeaders.PARTITION_KEY, String.class); + if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) { + partitionKey = this.partitionKeyExpression.getValue(this.evaluationContext, message, String.class); + } + + if (StringUtils.hasText(partitionKey)) { + partitionSupplier.setPartitionKey(partitionKey); } if (message.getHeaders().containsKey(EventHubHeaders.PARTITION_ID)) {