enable scheduled enqueue message in servicebus binder

This commit is contained in:
Yi Liu 2020-07-16 15:07:20 +08:00
Родитель 37221c4e49
Коммит 4a4eab82ef
3 изменённых файлов: 23 добавлений и 0 удалений

Просмотреть файл

@ -22,6 +22,8 @@ public class AzureHeaders {
public static final String NAME = PREFIX + "name";
public static final String SCHEDULED_ENQUEUE_MESSAGE = "x-delay";
/**
* The {@value CHECKPOINTER} header for checkpoint the specific message.
*/

Просмотреть файл

@ -6,6 +6,7 @@
package com.microsoft.azure.spring.integration.servicebus.converter;
import com.microsoft.azure.management.Azure;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.MessageBody;
@ -18,6 +19,8 @@ import org.springframework.util.InvalidMimeTypeException;
import org.springframework.util.MimeType;
import org.springframework.util.StringUtils;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -82,6 +85,11 @@ public class ServiceBusMessageConverter extends AbstractAzureMessageConverter<IM
serviceBusMessage.setReplyTo(headers.get(MessageHeaders.REPLY_CHANNEL, String.class));
}
if (headers.containsKey(AzureHeaders.SCHEDULED_ENQUEUE_MESSAGE)) {
serviceBusMessage.setScheduledEnqueueTimeUtc(Instant.now().plus(Duration.ofMillis(
headers.get(AzureHeaders.SCHEDULED_ENQUEUE_MESSAGE, Integer.class))));
}
headers.forEach((key, value) -> serviceBusMessage.getProperties().put(key, value.toString()));
}

Просмотреть файл

@ -17,7 +17,10 @@ import com.microsoft.azure.spring.integration.servicebus.converter.ServiceBusMes
import com.microsoft.azure.spring.integration.test.support.AzureMessageConverterTest;
import org.junit.Test;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.integration.support.MessageBuilder;
import java.util.HashMap;
import java.util.List;
import static java.util.Collections.singletonList;
@ -98,4 +101,14 @@ public class ServiceBusMessageConverterTest extends AzureMessageConverterTest<IM
convertedPayload.getPayload(),
new ObjectMapper().writeValueAsBytes(internalSequence));
}
@Test
public void shouldConvertSpringMessageHeaderIntoIMessage() {
org.springframework.messaging.Message<String> springMessage =
MessageBuilder.withPayload(payload).setHeader("x-delay", 5000).build();
IMessage servicebusMessage = getConverter().fromMessage(springMessage, IMessage.class);
assertNotNull(servicebusMessage);
assertNotNull(servicebusMessage.getScheduledEnqueueTimeUtc());
}
}