Support send message by partition key expression in event hub

This commit is contained in:
Warren Zhu 2018-06-05 13:48:28 +08:00
Родитель 2b20282f9b
Коммит 300184f0ad
2 изменённых файлов: 37 добавлений и 11 удалений

Просмотреть файл

@ -15,13 +15,12 @@ import eventhub.integration.inbound.CheckpointMode;
import eventhub.integration.inbound.EventHubInboundChannelAdapter;
import eventhub.integration.inbound.ListenerMode;
import eventhub.integration.outbound.EventHubMessageHandler;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
@ -48,11 +47,18 @@ public class EventHubMessageChannelBinder extends
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<EventHubProducerProperties> producerProperties,
MessageChannel errorChannel) {
EventHubMessageHandler messageHandler = new EventHubMessageHandler(destination.getName(), this
.eventHubOperation);
messageHandler.setSync(producerProperties.getExtension().isSync());
messageHandler.setSendTimeout(producerProperties.getExtension().getSendTimeout());
return messageHandler;
EventHubMessageHandler handler = new EventHubMessageHandler(destination.getName(), this.eventHubOperation);
handler.setSync(producerProperties.getExtension().isSync());
handler.setSendTimeout(producerProperties.getExtension().getSendTimeout());
if (producerProperties.isPartitioned()) {
handler.setPartitionKeyExpressionString(
"'partitionKey-' + headers['" + BinderHeaders.PARTITION_HEADER + "']");
} else {
handler
.setPartitionKeyExpression(new FunctionExpression<Message<?>>(m -> m.getPayload().hashCode()));
}
return handler;
}
@Override

Просмотреть файл

@ -11,6 +11,7 @@ import eventhub.core.EventHubOperation;
import eventhub.integration.EventHubHeaders;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.codec.CodecMessageConverter;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
@ -18,6 +19,7 @@ import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.nio.charset.Charset;
@ -44,6 +46,7 @@ public class EventHubMessageHandler extends AbstractMessageHandler {
private ListenableFutureCallback<Void> sendCallback;
private EvaluationContext evaluationContext;
private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);
private Expression partitionKeyExpression;
public EventHubMessageHandler(String eventHubName, EventHubOperation eventHubOperation) {
Assert.hasText(eventHubName, "eventHubName can't be null or empty");
@ -97,6 +100,18 @@ public class EventHubMessageHandler extends AbstractMessageHandler {
this.sendTimeoutExpression = sendTimeoutExpression;
}
public void setPartitionKey(String partitionKey) {
setPartitionKeyExpression(new LiteralExpression(partitionKey));
}
public void setPartitionKeyExpressionString(String partitionKeyExpression) {
setPartitionKeyExpression(EXPRESSION_PARSER.parseExpression(partitionKeyExpression));
}
public void setPartitionKeyExpression(Expression partitionKeyExpression) {
this.partitionKeyExpression = partitionKeyExpression;
}
public boolean isSync() {
return this.sync;
}
@ -148,8 +163,13 @@ public class EventHubMessageHandler extends AbstractMessageHandler {
private PartitionSupplier toPartitionSupplier(Message<?> message) {
PartitionSupplier partitionSupplier = new PartitionSupplier();
if (message.getHeaders().containsKey(EventHubHeaders.PARTITION_KEY)) {
partitionSupplier.setPartitionKey(message.getHeaders().get(EventHubHeaders.PARTITION_KEY, String.class));
String partitionKey = message.getHeaders().get(EventHubHeaders.PARTITION_KEY, String.class);
if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) {
partitionKey = this.partitionKeyExpression.getValue(this.evaluationContext, message, String.class);
}
if (StringUtils.hasText(partitionKey)) {
partitionSupplier.setPartitionKey(partitionKey);
}
if (message.getHeaders().containsKey(EventHubHeaders.PARTITION_ID)) {