[SDK][Bot-Azure] Add AzureQueueStorage component (#1033)

* Implement AzureQueueStorage

* Add and fix unit tests

* Improve createQueueIfNotExists handling

* Replace assertEmulator with runIfEmulator

* Apply Tracy's feedback

* Update libraries/bot-azure/src/test/java/com/microsoft/bot/azure/AzureQueueTests.java

Co-authored-by: Martin Battaglino <martinbatta32@gmail.com>

Co-authored-by: Martin Battaglino <martinbatta32@gmail.com>

* Remove double brace to standard initialization

Co-authored-by: Martin Battaglino <martinbatta32@gmail.com>
This commit is contained in:
Victor Grycuk 2021-03-23 16:28:25 -03:00 коммит произвёл GitHub
Родитель 2efc030728
Коммит b7c65cac38
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 383 добавлений и 2 удалений

Просмотреть файл

@ -73,6 +73,12 @@
<artifactId>bot-dialogs</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-queue</artifactId>
<version>12.8.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.bot</groupId>
<artifactId>bot-builder</artifactId>

Просмотреть файл

@ -0,0 +1,90 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure.queues;
import com.azure.storage.queue.QueueClient;
import com.azure.storage.queue.QueueClientBuilder;
import com.azure.storage.queue.models.SendMessageResult;
import com.microsoft.bot.builder.QueueStorage;
import com.microsoft.bot.restclient.serializer.JacksonAdapter;
import com.microsoft.bot.schema.Activity;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
/**
* Service used to add messages to an Azure.Storage.Queues.
*/
public class AzureQueueStorage extends QueueStorage {
private Boolean createQueueIfNotExists = true;
private final QueueClient queueClient;
/**
* Initializes a new instance of the {@link AzureQueueStorage} class.
* @param queuesStorageConnectionString Azure Storage connection string.
* @param queueName Name of the storage queue where entities will be queued.
*/
public AzureQueueStorage(String queuesStorageConnectionString, String queueName) {
if (StringUtils.isBlank(queuesStorageConnectionString)) {
throw new IllegalArgumentException("queuesStorageConnectionString is required.");
}
if (StringUtils.isBlank(queueName)) {
throw new IllegalArgumentException("queueName is required.");
}
queueClient = new QueueClientBuilder()
.connectionString(queuesStorageConnectionString)
.queueName(queueName)
.buildClient();
}
/**
* Queue an Activity to an Azure.Storage.Queues.QueueClient.
* The visibility timeout specifies how long the message should be invisible
* to Dequeue and Peek operations. The message content must be a UTF-8 encoded string that is up to 64KB in size.
* @param activity This is expected to be an {@link Activity} retrieved from a call to
* activity.GetConversationReference().GetContinuationActivity().
* This enables restarting the conversation using BotAdapter.ContinueConversationAsync.
* @param visibilityTimeout Default value of 0. Cannot be larger than 7 days.
* @param timeToLive Specifies the time-to-live interval for the message.
* @return {@link SendMessageResult} as a Json string, from the QueueClient SendMessageAsync operation.
*/
@Override
public CompletableFuture<String> queueActivity(Activity activity,
@Nullable Duration visibilityTimeout,
@Nullable Duration timeToLive) {
return CompletableFuture.supplyAsync(() -> {
if (createQueueIfNotExists) {
try {
queueClient.create();
} catch (Exception e) {
throw new RuntimeException(e);
}
// This is an optimization flag to check if the container creation call has been made.
// It is okay if this is called more than once.
createQueueIfNotExists = false;
}
try {
JacksonAdapter jacksonAdapter = new JacksonAdapter();
String serializedActivity = jacksonAdapter.serialize(activity);
byte[] encodedBytes = serializedActivity.getBytes(StandardCharsets.UTF_8);
String encodedString = Base64.getEncoder().encodeToString(encodedBytes);
SendMessageResult receipt = queueClient.sendMessage(encodedString);
return jacksonAdapter.serialize(receipt);
} catch (IOException e) {
e.printStackTrace();
}
return null;
});
}
}

Просмотреть файл

@ -0,0 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for
// license information.
/**
* This package contains the classes for bot-integration-core.
*/
package com.microsoft.bot.azure.queues;

Просмотреть файл

@ -0,0 +1,246 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure;
import com.azure.storage.queue.QueueClient;
import com.azure.storage.queue.QueueClientBuilder;
import com.azure.storage.queue.models.QueueMessageItem;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.microsoft.bot.azure.queues.AzureQueueStorage;
import com.microsoft.bot.builder.ConversationState;
import com.microsoft.bot.builder.MemoryStorage;
import com.microsoft.bot.builder.QueueStorage;
import com.microsoft.bot.builder.UserState;
import com.microsoft.bot.builder.adapters.TestAdapter;
import com.microsoft.bot.builder.adapters.TestFlow;
import com.microsoft.bot.dialogs.Dialog;
import com.microsoft.bot.dialogs.DialogContext;
import com.microsoft.bot.dialogs.DialogManager;
import com.microsoft.bot.dialogs.DialogTurnResult;
import com.microsoft.bot.schema.Activity;
import com.microsoft.bot.schema.ActivityEventNames;
import com.microsoft.bot.schema.ActivityTypes;
import com.microsoft.bot.schema.ConversationReference;
import org.apache.commons.codec.binary.Base64;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeParseException;
import java.util.Calendar;
import java.util.concurrent.CompletableFuture;
import com.microsoft.bot.restclient.serializer.JacksonAdapter;
public class AzureQueueTests {
private static final Integer DEFAULT_DELAY = 2000;
private static boolean emulatorIsRunning = false;
private final String connectionString = "UseDevelopmentStorage=true";
private static final String NO_EMULATOR_MESSAGE = "This test requires Azure STORAGE Emulator! Go to https://docs.microsoft.com/azure/storage/common/storage-use-emulator to download and install.";
@BeforeClass
public static void allTestsInit() throws IOException, InterruptedException {
Process p = Runtime.getRuntime().exec
("cmd /C \"" + System.getenv("ProgramFiles") + " (x86)\\Microsoft SDKs\\Azure\\Storage Emulator\\AzureStorageEmulator.exe\" start");
int result = p.waitFor();
// status = 0: the service was started.
// status = -5: the service is already started. Only one instance of the application
// can be run at the same time.
emulatorIsRunning = result == 0 || result == -5;
}
// These tests require Azure Storage Emulator v5.7
public QueueClient containerInit(String name) {
QueueClient queue = new QueueClientBuilder()
.connectionString(connectionString)
.queueName(name)
.buildClient();
queue.create();
queue.clearMessages();
return queue;
}
@Test
public void continueConversationLaterTests() {
if (runIfEmulator()) {
String queueName = "continueconversationlatertests";
QueueClient queue = containerInit(queueName);
ConversationReference cr = TestAdapter.createConversationReference("ContinueConversationLaterTests", "User1", "Bot");
TestAdapter adapter = new TestAdapter(cr)
.useStorage(new MemoryStorage())
.useBotState(new ConversationState(new MemoryStorage()), new UserState(new MemoryStorage()));
AzureQueueStorage queueStorage = new AzureQueueStorage(connectionString, queueName);
Calendar cal = Calendar.getInstance();
cal.add(Calendar.SECOND, 2);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
ContinueConversationLater ccl = new ContinueConversationLater();
ccl.setDate(sdf.format(cal.getTime()));
ccl.setValue("foo");
DialogManager dm = new DialogManager(ccl, "DialogStateProperty");
dm.getInitialTurnState().replace("QueueStorage", queueStorage);
new TestFlow(adapter, turnContext -> CompletableFuture.runAsync(() -> dm.onTurn(turnContext)))
.send("hi")
.startTest().join();
try {
Thread.sleep(DEFAULT_DELAY);
} catch (InterruptedException e) {
e.printStackTrace();
Assert.fail();
}
QueueMessageItem messages = queue.receiveMessage();
JacksonAdapter jacksonAdapter = new JacksonAdapter();
String messageJson = new String(Base64.decodeBase64(messages.getMessageText()));
Activity activity = null;
try {
activity = jacksonAdapter.deserialize(messageJson, Activity.class);
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
Assert.assertTrue(activity.isType(ActivityTypes.EVENT));
Assert.assertEquals(ActivityEventNames.CONTINUE_CONVERSATION, activity.getName());
Assert.assertEquals("foo", activity.getValue());
Assert.assertNotNull(activity.getRelatesTo());
ConversationReference cr2 = activity.getConversationReference();
cr.setActivityId(null);
cr2.setActivityId(null);
try {
Assert.assertEquals(jacksonAdapter.serialize(cr), jacksonAdapter.serialize(cr2));
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
}
}
private boolean runIfEmulator() {
if (!emulatorIsRunning) {
System.out.println(NO_EMULATOR_MESSAGE);
return false;
}
return true;
}
private class ContinueConversationLater extends Dialog {
@JsonProperty("disabled")
private Boolean disabled = false;
@JsonProperty("date")
private String date;
@JsonProperty("value")
private String value;
/**
* Initializes a new instance of the Dialog class.
*/
public ContinueConversationLater() {
super(ContinueConversationLater.class.getName());
}
@Override
public CompletableFuture<DialogTurnResult> beginDialog(DialogContext dc, Object options) {
if (this.disabled) {
return dc.endDialog();
}
String dateString = this.date;
LocalDateTime date = null;
try {
date = LocalDateTime.parse(dateString);
} catch (DateTimeParseException ex) {
throw new IllegalArgumentException("Date is invalid");
}
ZonedDateTime zonedDate = date.atZone(ZoneOffset.UTC);
ZonedDateTime now = LocalDateTime.now().atZone(ZoneOffset.UTC);
if (zonedDate.isBefore(now)) {
throw new IllegalArgumentException("Date must be in the future");
}
// create ContinuationActivity from the conversation reference.
Activity activity = dc.getContext().getActivity().getConversationReference().getContinuationActivity();
activity.setValue(this.value);
Duration visibility = Duration.between(zonedDate, now);
Duration ttl = visibility.plusMinutes(2);
QueueStorage queueStorage = dc.getContext().getTurnState().get("QueueStorage");
if (queueStorage == null) {
throw new NullPointerException("Unable to locate QueueStorage in HostContext");
}
return queueStorage.queueActivity(activity, visibility, ttl).thenCompose(receipt -> {
// return the receipt as the result
return dc.endDialog(receipt);
});
}
/**
* Gets an optional expression which if is true will disable this action.
* "user.age > 18".
* @return A boolean expression.
*/
public Boolean getDisabled() {
return disabled;
}
/**
* Sets an optional expression which if is true will disable this action.
* "user.age > 18".
* @param withDisabled A boolean expression.
*/
public void setDisabled(Boolean withDisabled) {
this.disabled = withDisabled;
}
/**
* Gets the expression which resolves to the date/time to continue the conversation.
* @return Date/time string in ISO 8601 format to continue conversation.
*/
public String getDate() {
return date;
}
/**
* Sets the expression which resolves to the date/time to continue the conversation.
* @param withDate Date/time string in ISO 8601 format to continue conversation.
*/
public void setDate(String withDate) {
this.date = withDate;
}
/**
* Gets an optional value to use for EventActivity.Value.
* @return The value to use for the EventActivity.Value payload.
*/
public String getValue() {
return value;
}
/**
* Sets an optional value to use for EventActivity.Value.
* @param withValue The value to use for the EventActivity.Value payload.
*/
public void setValue(String withValue) {
this.value = withValue;
}
}
}

Просмотреть файл

@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.builder;
import com.microsoft.bot.schema.Activity;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
/**
* A base class for enqueueing an Activity for later processing.
*/
public abstract class QueueStorage {
/**
* Enqueues an Activity for later processing. The visibility timeout specifies how long the message
* should be invisible to Dequeue and Peek operations.
* @param activity The {@link Activity} to be queued for later processing.
* @param visibilityTimeout Visibility timeout. Optional with a default value of 0. Cannot be larger than 7 days.
* @param timeToLive Specifies the time-to-live interval for the message.
* @return A result string.
*/
public abstract CompletableFuture<String> queueActivity(Activity activity,
@Nullable Duration visibilityTimeout,
@Nullable Duration timeToLive);
}

Просмотреть файл

@ -347,8 +347,10 @@ public class TestAdapter extends BotAdapter implements UserTokenProvider {
public Activity makeActivity(String withText) {
Integer next = nextId++;
String locale = !getLocale().isEmpty() ? getLocale() : "en-us";
Activity activity = new Activity(ActivityTypes.MESSAGE) {
{
setLocale(locale);
setFrom(conversationReference().getUser());
setRecipient(conversationReference().getBot());
setConversation(conversationReference().getConversation());
@ -416,8 +418,8 @@ public class TestAdapter extends BotAdapter implements UserTokenProvider {
reference.setChannelId("test");
reference.setServiceUrl("https://test.com");
reference.setConversation(new ConversationAccount(false, name, name, null, null, null, null));
reference.setUser(new ChannelAccount(user.toLowerCase(), user.toLowerCase()));
reference.setBot(new ChannelAccount(bot.toLowerCase(), bot.toLowerCase()));
reference.setUser(new ChannelAccount(user.toLowerCase(), user));
reference.setBot(new ChannelAccount(bot.toLowerCase(), bot));
reference.setLocale("en-us");
return reference;
}

Просмотреть файл

@ -79,6 +79,7 @@ public class ConversationReference {
activity.setConversation(getConversation());
activity.setRecipient(getBot());
activity.setLocale(getLocale());
activity.setServiceUrl(getServiceUrl());
activity.setFrom(getUser());
activity.setRelatesTo(this);
return activity;