[SDK][Bot-Azure] Add BlobsStorage component (#1066)

* Add azure-storage-blob in pom

* Migrate BlobsStorage and BlobsTranscriptStore

* Add tests for BlobsStorage

* Add statePersistsThroughMultiTurn in StorageBaseTests for parity

* Add TestPocoState model

* Improve precision for timeStamp adding NanoClockHelper

* Add delay of 500ms to separate activities in tests

* Remove double braces in GenerateAnswerUtils

* Migrate ensureActivityHasId method to improve parity

* Apply format changes

* Replace assertEmulator with runIfEmulator

* Apply Tracy feedback

* Rename afterTest to testCleanup

Co-authored-by: Martin Battaglino <martinbatta32@gmail.com>
Co-authored-by: Victor Grycuk <victor.grycuk@southworks.com>
Co-authored-by: Victor <victor@grycuk.net>
This commit is contained in:
Franco Alvarez 2021-03-23 16:56:56 -03:00 коммит произвёл GitHub
Родитель b7c65cac38
Коммит 5c00a74e28
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 1818 добавлений и 38 удалений

Просмотреть файл

@ -282,20 +282,18 @@ public class GenerateAnswerUtils {
QnAMakerOptions withOptions
) {
String knowledgeBaseId = this.endpoint.getKnowledgeBaseId();
QnAMakerTraceInfo traceInfo = new QnAMakerTraceInfo() {
{
setMessage(messageActivity);
setQueryResults(result);
setKnowledgeBaseId(knowledgeBaseId);
setScoreThreshold(withOptions.getScoreThreshold());
setTop(withOptions.getTop());
setStrictFilters(withOptions.getStrictFilters());
setContext(withOptions.getContext());
setQnAId(withOptions.getQnAId());
setIsTest(withOptions.getIsTest());
setRankerType(withOptions.getRankerType());
}
};
QnAMakerTraceInfo traceInfo = new QnAMakerTraceInfo();
traceInfo.setMessage(messageActivity);
traceInfo.setQueryResults(result);
traceInfo.setKnowledgeBaseId(knowledgeBaseId);
traceInfo.setScoreThreshold(withOptions.getScoreThreshold());
traceInfo.setTop(withOptions.getTop());
traceInfo.setStrictFilters(withOptions.getStrictFilters());
traceInfo.setContext(withOptions.getContext());
traceInfo.setQnAId(withOptions.getQnAId());
traceInfo.setIsTest(withOptions.getIsTest());
traceInfo.setRankerType(withOptions.getRankerType());
Activity traceActivity = Activity.createTraceActivity(
QnAMaker.QNA_MAKER_NAME,
QnAMaker.QNA_MAKER_TRACE_TYPE,

Просмотреть файл

@ -11,7 +11,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@ -126,21 +125,17 @@ public class QnAMakerTests {
Assert.assertTrue(results.length == 1);
Assert.assertEquals("BaseCamp: You can use a damp rag to clean around the Power Pack", results[0].getAnswer());
}
conversationId[0] = turnContext.getActivity().getConversation().getId();
Activity typingActivity = new Activity() {
{
setType(ActivityTypes.TYPING);
setRelatesTo(turnContext.getActivity().getRelatesTo());
}
};
turnContext.sendActivity(typingActivity).join();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// Empty error
delay(500);
conversationId[0] = turnContext.getActivity().getConversation().getId();
Activity typingActivity = new Activity() {
{
setType(ActivityTypes.TYPING);
setRelatesTo(turnContext.getActivity().getRelatesTo());
}
turnContext.sendActivity(String.format("echo:%s", turnContext.getActivity().getText())).join();
};
turnContext.sendActivity(typingActivity).join();
delay(500);
turnContext.sendActivity(String.format("echo:%s", turnContext.getActivity().getText())).join();
return CompletableFuture.completedFuture(null);
})
.send("how do I clean the stove?")
@ -2087,6 +2082,18 @@ public class QnAMakerTests {
.setBody(mockResponse));
}
/**
* Time period delay.
* @param milliseconds Time to delay.
*/
private void delay(int milliseconds) {
try {
Thread.sleep(milliseconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public class OverrideTelemetry extends QnAMaker {
public OverrideTelemetry(QnAMakerEndpoint endpoint, QnAMakerOptions options,

Просмотреть файл

@ -86,6 +86,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.10.0</version>
</dependency>
</dependencies>
<profiles>

Просмотреть файл

@ -0,0 +1,255 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure.blobs;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import com.microsoft.bot.builder.Storage;
import com.microsoft.bot.builder.StoreItem;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* Implements {@link Storage} using Azure Storage Blobs.
* This class uses a single Azure Storage Blob Container.
* Each entity or {@link StoreItem} is serialized into a JSON string and stored in an individual text blob.
* Each blob is named after the store item key, which is encoded so that it conforms a valid blob name.
* an entity is an {@link StoreItem}, the storage object will set the entity's {@link StoreItem}
* property value to the blob's ETag upon read. Afterward, an {@link BlobRequestConditions} with the ETag value
* will be generated during Write. New entities start with a null ETag.
*/
public class BlobsStorage implements Storage {
private ObjectMapper objectMapper;
private final BlobContainerClient containerClient;
private final Integer millisecondsTimeout = 2000;
private final Integer retryTimes = 8;
/**
* Initializes a new instance of the {@link BlobsStorage} class.
* @param dataConnectionString Azure Storage connection string.
* @param containerName Name of the Blob container where entities will be stored.
*/
public BlobsStorage(String dataConnectionString, String containerName) {
if (StringUtils.isBlank(dataConnectionString)) {
throw new IllegalArgumentException("dataConnectionString is required.");
}
if (StringUtils.isBlank(containerName)) {
throw new IllegalArgumentException("containerName is required.");
}
objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.findAndRegisterModules()
.enableDefaultTyping();
containerClient = new BlobContainerClientBuilder()
.connectionString(dataConnectionString)
.containerName(containerName)
.buildClient();
}
/**
* Deletes entity blobs from the configured container.
* @param keys An array of entity keys.
* @return A task that represents the work queued to execute.
*/
@Override
public CompletableFuture<Void> delete(String[] keys) {
if (keys == null) {
throw new IllegalArgumentException("The 'keys' parameter is required.");
}
for (String key: keys) {
String blobName = getBlobName(key);
BlobClient blobClient = containerClient.getBlobClient(blobName);
if (blobClient.exists()) {
try {
blobClient.delete();
} catch (BlobStorageException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
return CompletableFuture.completedFuture(null);
}
/**
* Retrieve entities from the configured blob container.
* @param keys An array of entity keys.
* @return A task that represents the work queued to execute.
*/
@Override
public CompletableFuture<Map<String, Object>> read(String[] keys) {
if (keys == null) {
throw new IllegalArgumentException("The 'keys' parameter is required.");
}
if (!containerClient.exists()) {
try {
containerClient.create();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
Map<String, Object> items = new HashMap<>();
for (String key : keys) {
String blobName = getBlobName(key);
BlobClient blobClient = containerClient.getBlobClient(blobName);
innerReadBlob(blobClient).thenAccept(value -> {
if (value != null) {
items.put(key, value);
}
});
}
return CompletableFuture.completedFuture(items);
}
/**
* Stores a new entity in the configured blob container.
* @param changes The changes to write to storage.
* @return A task that represents the work queued to execute.
*/
public CompletableFuture<Void> write(Map<String, Object> changes) {
if (changes == null) {
throw new IllegalArgumentException("The 'changes' parameter is required.");
}
if (!containerClient.exists()) {
try {
containerClient.create();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
for (Map.Entry<String, Object> keyValuePair : changes.entrySet()) {
Object newValue = keyValuePair.getValue();
StoreItem storeItem = newValue instanceof StoreItem ? (StoreItem) newValue : null;
// "*" eTag in StoreItem converts to null condition for AccessCondition
boolean isNullOrEmpty = storeItem == null || StringUtils.isBlank(storeItem.getETag())
|| storeItem.getETag().equals("*");
BlobRequestConditions accessCondition = !isNullOrEmpty
? new BlobRequestConditions().setIfMatch(storeItem.getETag())
: null;
String blobName = getBlobName(keyValuePair.getKey());
BlobClient blobReference = containerClient.getBlobClient(blobName);
try {
String json = objectMapper.writeValueAsString(newValue);
InputStream stream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
//verify the corresponding length
blobReference.uploadWithResponse(stream, stream.available(),
null, null,
null, null, accessCondition, null, Context.NONE);
} catch (HttpResponseException e) {
if (e.getResponse().getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
StringBuilder sb =
new StringBuilder("An error occurred while trying to write an object. The underlying ");
sb.append(BlobErrorCode.INVALID_BLOCK_LIST);
sb.append(" error is commonly caused due to "
+ "concurrently uploading an object larger than 128MB in size.");
throw new HttpResponseException(sb.toString(), e.getResponse());
}
} catch (IOException e) {
e.printStackTrace();
}
}
return CompletableFuture.completedFuture(null);
}
private static String getBlobName(String key) {
if (StringUtils.isBlank(key)) {
throw new IllegalArgumentException("The 'key' parameter is required.");
}
String blobName;
try {
blobName = URLEncoder.encode(key, StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException e) {
throw new IllegalArgumentException("The key could not be encoded");
}
return blobName;
}
private CompletableFuture<Object> innerReadBlob(BlobClient blobReference) {
Integer i = 0;
while (true) {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
blobReference.download(outputStream);
String contentString = outputStream.toString();
Object obj;
// We are doing this try/catch because we are receiving String or HashMap
try {
// We need to deserialize to an Object class since there are contentString which has an Object type
obj = objectMapper.readValue(contentString, Object.class);
} catch (MismatchedInputException ex) {
// In case of the contentString has the structure of a HashMap,
// we need to deserialize it to a HashMap object
obj = objectMapper.readValue(contentString, HashMap.class);
}
if (obj instanceof StoreItem) {
String eTag = blobReference.getProperties().getETag();
((StoreItem) obj).setETag(eTag);
}
return CompletableFuture.completedFuture(obj);
} catch (HttpResponseException e) {
if (e.getResponse().getStatusCode() == HttpStatus.SC_PRECONDITION_FAILED) {
// additional retry logic,
// even though this is a read operation blob storage can return 412 if there is contention
if (i++ < retryTimes) {
try {
TimeUnit.MILLISECONDS.sleep(millisecondsTimeout);
continue;
} catch (InterruptedException ex) {
break;
}
}
throw e;
} else {
break;
}
} catch (IOException e) {
e.printStackTrace();
break;
}
}
return CompletableFuture.completedFuture(null);
}
}

Просмотреть файл

@ -0,0 +1,502 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure.blobs;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.rest.PagedResponse;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.microsoft.bot.builder.BotAssert;
import com.microsoft.bot.builder.PagedResult;
import com.microsoft.bot.builder.TranscriptInfo;
import com.microsoft.bot.builder.TranscriptStore;
import com.microsoft.bot.schema.Activity;
import com.microsoft.bot.schema.ActivityTypes;
import com.microsoft.bot.schema.ChannelAccount;
import com.microsoft.bot.schema.Pair;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* The blobs transcript store stores transcripts in an Azure Blob container.
* Each activity is stored as json blob in structure of
* container/{channelId]/{conversationId}/{Timestamp.ticks}-{activity.id}.json.
*/
public class BlobsTranscriptStore implements TranscriptStore {
// Containers checked for creation.
private static final HashSet<String> CHECKED_CONTAINERS = new HashSet<String>();
private final Integer milisecondsTimeout = 2000;
private final Integer retryTimes = 3;
private final Integer longRadix = 16;
private final Integer multipleProductValue = 10_000_000;
private final ObjectMapper jsonSerializer;
private BlobContainerClient containerClient;
/**
* Initializes a new instance of the {@link BlobsTranscriptStore} class.
* @param dataConnectionString Azure Storage connection string.
* @param containerName Name of the Blob container where entities will be stored.
*/
public BlobsTranscriptStore(String dataConnectionString, String containerName) {
if (StringUtils.isBlank(dataConnectionString)) {
throw new IllegalArgumentException("dataConnectionString");
}
if (StringUtils.isBlank(containerName)) {
throw new IllegalArgumentException("containerName");
}
jsonSerializer = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.enable(SerializationFeature.INDENT_OUTPUT)
.findAndRegisterModules();
// Triggers a check for the existence of the container
containerClient = this.getContainerClient(dataConnectionString, containerName);
}
/**
* Log an activity to the transcript.
* @param activity Activity being logged.
* @return A CompletableFuture that represents the work queued to execute.
*/
public CompletableFuture<Void> logActivity(Activity activity) {
BotAssert.activityNotNull(activity);
switch (activity.getType()) {
case ActivityTypes.MESSAGE_UPDATE:
Activity updatedActivity = null;
try {
updatedActivity = jsonSerializer
.readValue(jsonSerializer.writeValueAsString(activity), Activity.class);
} catch (IOException ex) {
ex.printStackTrace();
}
updatedActivity.setType(ActivityTypes.MESSAGE); // fixup original type (should be Message)
Activity finalUpdatedActivity = updatedActivity;
innerReadBlob(activity).thenAccept(activityAndBlob -> {
if (activityAndBlob != null && activityAndBlob.getLeft() != null) {
finalUpdatedActivity.setLocalTimestamp(activityAndBlob.getLeft().getLocalTimestamp());
finalUpdatedActivity.setTimestamp(activityAndBlob.getLeft().getTimestamp());
logActivityToBlobClient(finalUpdatedActivity, activityAndBlob.getRight(), true)
.thenApply(task -> CompletableFuture.completedFuture(null));
} else {
// The activity was not found, so just add a record of this update.
this.innerLogActivity(finalUpdatedActivity)
.thenApply(task -> CompletableFuture.completedFuture(null));
}
});
return CompletableFuture.completedFuture(null);
case ActivityTypes.MESSAGE_DELETE:
innerReadBlob(activity).thenAccept(activityAndBlob -> {
if (activityAndBlob != null && activityAndBlob.getLeft() != null) {
ChannelAccount from = new ChannelAccount();
from.setId("deleted");
from.setRole(activityAndBlob.getLeft().getFrom().getRole());
ChannelAccount recipient = new ChannelAccount();
recipient.setId("deleted");
recipient.setRole(activityAndBlob.getLeft().getRecipient().getRole());
// tombstone the original message
Activity tombstonedActivity = new Activity(ActivityTypes.MESSAGE_DELETE);
tombstonedActivity.setId(activityAndBlob.getLeft().getId());
tombstonedActivity.setFrom(from);
tombstonedActivity.setRecipient(recipient);
tombstonedActivity.setLocale(activityAndBlob.getLeft().getLocale());
tombstonedActivity.setLocalTimestamp(activityAndBlob.getLeft().getTimestamp());
tombstonedActivity.setTimestamp(activityAndBlob.getLeft().getTimestamp());
tombstonedActivity.setChannelId(activityAndBlob.getLeft().getChannelId());
tombstonedActivity.setConversation(activityAndBlob.getLeft().getConversation());
tombstonedActivity.setServiceUrl(activityAndBlob.getLeft().getServiceUrl());
tombstonedActivity.setReplyToId(activityAndBlob.getLeft().getReplyToId());
logActivityToBlobClient(tombstonedActivity, activityAndBlob.getRight(), true)
.thenApply(task -> CompletableFuture.completedFuture(null));
}
});
return CompletableFuture.completedFuture(null);
default:
this.innerLogActivity(activity)
.thenApply(task -> CompletableFuture.completedFuture(null));
return CompletableFuture.completedFuture(null);
}
}
/**
* Get activities for a conversation (Aka the transcript).
* @param channelId The ID of the channel the conversation is in.
* @param conversationId The ID of the conversation.
* @param continuationToken The continuation token (if available).
* @param startDate A cutoff date. Activities older than this date are
* not included.
* @return PagedResult of activities.
*/
public CompletableFuture<PagedResult<Activity>> getTranscriptActivities(String channelId, String conversationId,
@Nullable String continuationToken,
OffsetDateTime startDate) {
if (startDate == null) {
startDate = OffsetDateTime.MIN;
}
final int pageSize = 20;
if (StringUtils.isBlank(channelId)) {
throw new IllegalArgumentException("Missing channelId");
}
if (StringUtils.isBlank(conversationId)) {
throw new IllegalArgumentException("Missing conversationId");
}
PagedResult<Activity> pagedResult = new PagedResult<Activity>();
String token = null;
List<BlobItem> blobs = new ArrayList<BlobItem>();
do {
String prefix = String.format("%s/%s/", sanitizeKey(channelId), sanitizeKey(conversationId));
Iterable<PagedResponse<BlobItem>> resultSegment = containerClient
.listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null)
.iterableByPage(token);
token = null;
for (PagedResponse<BlobItem> blobPage: resultSegment) {
for (BlobItem blobItem: blobPage.getValue()) {
OffsetDateTime parseDateTime = OffsetDateTime.parse(blobItem.getMetadata().get("Timestamp"));
if (parseDateTime.isAfter(startDate)
|| parseDateTime.isEqual(startDate)) {
if (continuationToken != null) {
if (blobItem.getName().equals(continuationToken)) {
// we found continuation token
continuationToken = null;
}
} else {
blobs.add(blobItem);
if (blobs.size() == pageSize) {
break;
}
}
}
}
// Get the continuation token and loop until it is empty.
token = blobPage.getContinuationToken();
}
} while (!StringUtils.isBlank(token) && blobs.size() < pageSize);
pagedResult.setItems(blobs
.stream()
.map(bl -> {
BlobClient blobClient = containerClient.getBlobClient(bl.getName());
return this.getActivityFromBlobClient(blobClient);
})
.map(t -> t.join())
.collect(Collectors.toList()));
if (pagedResult.getItems().size() == pageSize) {
pagedResult.setContinuationToken(blobs.get(blobs.size() - 1).getName());
}
return CompletableFuture.completedFuture(pagedResult);
}
/**
* List conversations in the channelId.
* @param channelId The ID of the channel.
* @param continuationToken The continuation token (if available).
* @return A CompletableFuture that represents the work queued to execute.
*/
public CompletableFuture<PagedResult<TranscriptInfo>> listTranscripts(String channelId,
@Nullable String continuationToken) {
final int pageSize = 20;
if (StringUtils.isBlank(channelId)) {
throw new IllegalArgumentException("Missing channelId");
}
String token = null;
List<TranscriptInfo> conversations = new ArrayList<TranscriptInfo>();
do {
String prefix = String.format("%s/", sanitizeKey(channelId));
Iterable<PagedResponse<BlobItem>> resultSegment = containerClient.
listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null)
.iterableByPage(token);
token = null;
for (PagedResponse<BlobItem> blobPage: resultSegment) {
for (BlobItem blobItem: blobPage.getValue()) {
// Unescape the Id we escaped when we saved it
String conversationId = new String();
String lastName = Arrays.stream(blobItem.getName().split("/"))
.reduce((first, second) -> second.length() > 0 ? second : first).get();
try {
conversationId = URLDecoder.decode(lastName, StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException ex) {
ex.printStackTrace();
}
TranscriptInfo conversation =
new TranscriptInfo(conversationId, channelId, blobItem.getProperties().getCreationTime());
if (continuationToken != null) {
if (StringUtils.equals(conversation.getId(), continuationToken)) {
// we found continuation token
continuationToken = null;
}
// skip record
} else {
conversations.add(conversation);
if (conversations.size() == pageSize) {
break;
}
}
}
}
} while (!StringUtils.isBlank(token) && conversations.size() < pageSize);
PagedResult<TranscriptInfo> pagedResult = new PagedResult<TranscriptInfo>();
pagedResult.setItems(conversations);
if (pagedResult.getItems().size() == pageSize) {
pagedResult.setContinuationToken(pagedResult.getItems().get(pagedResult.getItems().size() - 1).getId());
}
return CompletableFuture.completedFuture(pagedResult);
}
/**
* Delete a specific conversation and all of it's activities.
* @param channelId The ID of the channel the conversation is in.
* @param conversationId The ID of the conversation to delete.
* @return A CompletableFuture that represents the work queued to execute.
*/
public CompletableFuture<Void> deleteTranscript(String channelId, String conversationId) {
if (StringUtils.isBlank(channelId)) {
throw new IllegalArgumentException("Missing channelId");
}
if (StringUtils.isBlank(conversationId)) {
throw new IllegalArgumentException("Missing conversationId");
}
String token = null;
do {
String prefix = String.format("%s/%s/", sanitizeKey(channelId), sanitizeKey(conversationId));
Iterable<PagedResponse<BlobItem>> resultSegment = containerClient
.listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null).iterableByPage(token);
token = null;
for (PagedResponse<BlobItem> blobPage: resultSegment) {
for (BlobItem blobItem: blobPage.getValue()) {
BlobClient blobClient = containerClient.getBlobClient(blobItem.getName());
if (blobClient.exists()) {
try {
blobClient.delete();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
// Get the continuation token and loop until it is empty.
token = blobPage.getContinuationToken();
}
}
} while (!StringUtils.isBlank(token));
return CompletableFuture.completedFuture(null);
}
private CompletableFuture<Pair<Activity, BlobClient>> innerReadBlob(Activity activity) {
int i = 0;
while (true) {
try {
String token = null;
do {
String prefix = String.format("%s/%s/",
sanitizeKey(activity.getChannelId()), sanitizeKey(activity.getConversation().getId()));
Iterable<PagedResponse<BlobItem>> resultSegment = containerClient
.listBlobsByHierarchy("/",
this.getOptionsWithMetadata(prefix), null).iterableByPage(token);
token = null;
for (PagedResponse<BlobItem> blobPage: resultSegment) {
for (BlobItem blobItem: blobPage.getValue()) {
if (blobItem.getMetadata().get("Id").equals(activity.getId())) {
BlobClient blobClient = containerClient.getBlobClient(blobItem.getName());
return this.getActivityFromBlobClient(blobClient)
.thenApply(blobActivity ->
new Pair<Activity, BlobClient>(blobActivity, blobClient));
}
}
// Get the continuation token and loop until it is empty.
token = blobPage.getContinuationToken();
}
} while (!StringUtils.isBlank(token));
return CompletableFuture.completedFuture(null);
} catch (HttpResponseException ex) {
if (ex.getResponse().getStatusCode() == HttpStatus.SC_PRECONDITION_FAILED) {
// additional retry logic,
// even though this is a read operation blob storage can return 412 if there is contention
if (i++ < retryTimes) {
try {
TimeUnit.MILLISECONDS.sleep(milisecondsTimeout);
continue;
} catch (InterruptedException e) {
break;
}
}
throw ex;
}
// This break will finish the while when the catch if condition is false
break;
}
}
return CompletableFuture.completedFuture(null);
}
private CompletableFuture<Activity> getActivityFromBlobClient(BlobClient blobClient) {
ByteArrayOutputStream content = new ByteArrayOutputStream();
blobClient.download(content);
String contentString = new String(content.toByteArray());
try {
return CompletableFuture.completedFuture(jsonSerializer.readValue(contentString, Activity.class));
} catch (IOException ex) {
return CompletableFuture.completedFuture(null);
}
}
private CompletableFuture<Void> innerLogActivity(Activity activity) {
String blobName = this.getBlobName(activity);
BlobClient blobClient = containerClient.getBlobClient(blobName);
return logActivityToBlobClient(activity, blobClient, null);
}
private CompletableFuture<Void> logActivityToBlobClient(Activity activity, BlobClient blobClient,
Boolean overwrite) {
if (overwrite == null) {
overwrite = false;
}
String activityJson = null;
try {
activityJson = jsonSerializer.writeValueAsString(activity);
} catch (IOException ex) {
ex.printStackTrace();
}
InputStream data = new ByteArrayInputStream(activityJson.getBytes(StandardCharsets.UTF_8));
try {
blobClient.upload(data, data.available(), overwrite);
} catch (IOException ex) {
ex.printStackTrace();
}
Map<String, String> metaData = new HashMap<String, String>();
metaData.put("Id", activity.getId());
if (activity.getFrom() != null) {
metaData.put("FromId", activity.getFrom().getId());
}
if (activity.getRecipient() != null) {
metaData.put("RecipientId", activity.getRecipient().getId());
}
metaData.put("Timestamp", activity.getTimestamp().toString());
blobClient.setMetadata(metaData);
return CompletableFuture.completedFuture(null);
}
private String getBlobName(Activity activity) {
String blobName = String.format("%s/%s/%s-%s.json",
sanitizeKey(activity.getChannelId()), sanitizeKey(activity.getConversation().getId()),
this.formatTicks(activity.getTimestamp()), sanitizeKey(activity.getId()));
return blobName;
}
private String sanitizeKey(String key) {
// Blob Name rules: case-sensitive any url char
try {
return URLEncoder.encode(key, StandardCharsets.UTF_8.name());
} catch (Exception ex) {
ex.printStackTrace();
}
return "";
}
private BlobContainerClient getContainerClient(String dataConnectionString, String containerName) {
containerName = containerName.toLowerCase();
containerClient = new BlobContainerClientBuilder()
.connectionString(dataConnectionString)
.containerName(containerName)
.buildClient();
if (!CHECKED_CONTAINERS.contains(containerName)) {
CHECKED_CONTAINERS.add(containerName);
if (!containerClient.exists()) {
try {
containerClient.create();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
return containerClient;
}
/**
* Formats a timestamp in a way that is consistent with the C# SDK.
* @param dateTime The dateTime used to get the ticks
* @return The String representing the ticks.
*/
private String formatTicks(OffsetDateTime dateTime) {
final Instant begin = ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0,
ZoneOffset.UTC).toInstant();
final Instant end = dateTime.toInstant();
long secsDiff = Math.subtractExact(end.getEpochSecond(), begin.getEpochSecond());
long totalHundredNanos = Math.multiplyExact(secsDiff, multipleProductValue);
final Long ticks = Math.addExact(totalHundredNanos, (end.getNano() - begin.getNano()) / 100);
return Long.toString(ticks, longRadix);
}
private ListBlobsOptions getOptionsWithMetadata(String prefix) {
BlobListDetails details = new BlobListDetails();
details.setRetrieveMetadata(true);
ListBlobsOptions options = new ListBlobsOptions();
options.setDetails(details);
options.setPrefix(prefix);
return options;
}
}

Просмотреть файл

@ -0,0 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for
// license information.
/**
* This package contains the classes for bot-azure.
*/
package com.microsoft.bot.azure.blobs;

Просмотреть файл

@ -3,6 +3,6 @@
// license information.
/**
* This package contains the classes for bot-integration-core.
* This package contains the classes for bot-azure.
*/
package com.microsoft.bot.azure;

Просмотреть файл

@ -0,0 +1,572 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.bot.azure.blobs.BlobsTranscriptStore;
import com.microsoft.bot.builder.PagedResult;
import com.microsoft.bot.builder.TranscriptInfo;
import com.microsoft.bot.builder.TranscriptLoggerMiddleware;
import com.microsoft.bot.builder.TranscriptStore;
import com.microsoft.bot.builder.adapters.TestAdapter;
import com.microsoft.bot.builder.adapters.TestFlow;
import com.microsoft.bot.schema.Activity;
import com.microsoft.bot.schema.ActivityTypes;
import com.microsoft.bot.schema.ChannelAccount;
import com.microsoft.bot.schema.ConversationAccount;
import com.microsoft.bot.schema.ConversationReference;
import com.microsoft.bot.schema.ResourceResponse;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* These tests require Azure Storage Emulator v5.7
* The emulator must be installed at this path C:\Program Files (x86)\Microsoft SDKs\Azure\Storage Emulator\AzureStorageEmulator.exe
* More info: https://docs.microsoft.com/azure/storage/common/storage-use-emulator
*/
public class TranscriptStoreTests {
@Rule
public TestName TEST_NAME = new TestName();
protected String blobStorageEmulatorConnectionString =
"AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;DefaultEndpointsProtocol=http;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;";
private String channelId = "test";
private static final String[] CONVERSATION_IDS = {
"qaz", "wsx", "edc", "rfv", "tgb", "yhn", "ujm", "123", "456", "789",
"ZAQ", "XSW", "CDE", "VFR", "BGT", "NHY", "NHY", "098", "765", "432",
"zxc", "vbn", "mlk", "jhy", "yui", "kly", "asd", "asw", "aaa", "zzz",
};
private static final String[] CONVERSATION_SPECIAL_IDS = { "asd !&/#.'+:?\"", "ASD@123<>|}{][", "$%^;\\*()_" };
private String getContainerName() {
return String.format("blobstranscript%s", TEST_NAME.getMethodName().toLowerCase());
}
private TranscriptStore getTranscriptStore() {
return new BlobsTranscriptStore(blobStorageEmulatorConnectionString, getContainerName());
}
private static final String NO_EMULATOR_MESSAGE = "This test requires Azure STORAGE Emulator! Go to https://docs.microsoft.com/azure/storage/common/storage-use-emulator to download and install.";
@BeforeClass
public static void allTestsInit() throws IOException, InterruptedException {
assertEmulator();
}
@After
public void testCleanup() {
BlobContainerClient containerClient = new BlobContainerClientBuilder()
.connectionString(blobStorageEmulatorConnectionString)
.containerName(getContainerName())
.buildClient();
if (containerClient.exists()) {
containerClient.delete();
}
}
// These tests require Azure Storage Emulator v5.7
@Test
public void blobTranscriptParamTest() {
Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsTranscriptStore(null, getContainerName()));
Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsTranscriptStore(blobStorageEmulatorConnectionString, null));
Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsTranscriptStore(new String(), getContainerName()));
Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsTranscriptStore(blobStorageEmulatorConnectionString, new String()));
}
@Test
public void transcriptsEmptyTest() {
TranscriptStore transcriptStore = getTranscriptStore();
String unusedChannelId = UUID.randomUUID().toString();
PagedResult<TranscriptInfo> transcripts = transcriptStore.listTranscripts(unusedChannelId).join();
Assert.assertEquals(0, transcripts.getItems().size());
}
@Test
public void activityEmptyTest() {
TranscriptStore transcriptStore = getTranscriptStore();
for(String convoId: CONVERSATION_SPECIAL_IDS) {
PagedResult<Activity> activities = transcriptStore.getTranscriptActivities(channelId, convoId).join();
Assert.assertEquals(0, activities.getItems().size());
}
}
@Test
public void activityAddTest() {
TranscriptStore transcriptStore = getTranscriptStore();
Activity[] loggedActivities = new Activity[5];
List<Activity> activities = new ArrayList<Activity>();
for (int i = 0; i < 5; i++) {
Activity a = TranscriptStoreTests.createActivity(i, i, CONVERSATION_IDS);
transcriptStore.logActivity(a).join();
activities.add(a);
loggedActivities[i] = transcriptStore.getTranscriptActivities(channelId, CONVERSATION_IDS[i])
.join().getItems().get(0);
}
Assert.assertEquals(5, loggedActivities.length);
}
@Test
public void transcriptRemoveTest() {
TranscriptStore transcriptStore = getTranscriptStore();
for (int i = 0; i < 5; i++) {
Activity a = TranscriptStoreTests.createActivity(i, i, CONVERSATION_IDS);
transcriptStore.logActivity(a).join();
transcriptStore.deleteTranscript(a.getChannelId(), a.getConversation().getId()).join();
PagedResult<Activity> loggedActivities = transcriptStore
.getTranscriptActivities(channelId, CONVERSATION_IDS[i]).join();
Assert.assertEquals(0, loggedActivities.getItems().size());
}
}
@Test
public void activityAddSpecialCharsTest() {
TranscriptStore transcriptStore = getTranscriptStore();
Activity[] loggedActivities = new Activity[CONVERSATION_SPECIAL_IDS.length];
List<Activity> activities = new ArrayList<Activity>();
for (int i = 0; i < CONVERSATION_SPECIAL_IDS.length; i++) {
Activity a = TranscriptStoreTests.createActivity(i, i, CONVERSATION_SPECIAL_IDS);
transcriptStore.logActivity(a).join();
activities.add(a);
int pos = i;
transcriptStore.getTranscriptActivities(channelId, CONVERSATION_SPECIAL_IDS[i]).thenAccept(result -> {
loggedActivities[pos] = result.getItems().get(0);
});
}
Assert.assertEquals(activities.size(), loggedActivities.length);
}
@Test
public void transcriptRemoveSpecialCharsTest() {
TranscriptStore transcriptStore = getTranscriptStore();
for (int i = 0; i < CONVERSATION_SPECIAL_IDS.length; i++) {
Activity a = TranscriptStoreTests.createActivity(i, i, CONVERSATION_SPECIAL_IDS);
transcriptStore.deleteTranscript(a.getChannelId(), a.getConversation().getId()).join();
PagedResult<Activity> loggedActivities = transcriptStore.
getTranscriptActivities(channelId, CONVERSATION_SPECIAL_IDS[i]).join();
Assert.assertEquals(0, loggedActivities.getItems().size());
}
}
@Test
public void activityAddPagedResultTest() {
TranscriptStore transcriptStore = getTranscriptStore();
String cleanChannel = UUID.randomUUID().toString();
List<Activity> activities = new ArrayList<Activity>();
for (int i = 0; i < CONVERSATION_IDS.length; i++) {
Activity a = TranscriptStoreTests.createActivity(0, i, CONVERSATION_IDS);
a.setChannelId(cleanChannel);
transcriptStore.logActivity(a).join();
activities.add(a);
}
PagedResult<Activity> loggedPagedResult = transcriptStore.getTranscriptActivities(cleanChannel, CONVERSATION_IDS[0]).join();
String ct = loggedPagedResult.getContinuationToken();
Assert.assertEquals(20, loggedPagedResult.getItems().size());
Assert.assertNotNull(ct);
Assert.assertTrue(loggedPagedResult.getContinuationToken().length() > 0);
loggedPagedResult = transcriptStore.getTranscriptActivities(cleanChannel, CONVERSATION_IDS[0], ct).join();
ct = loggedPagedResult.getContinuationToken();
Assert.assertEquals(10, loggedPagedResult.getItems().size());
Assert.assertNull(ct);
}
@Test
public void transcriptRemovePagedTest() {
TranscriptStore transcriptStore = getTranscriptStore();
int i;
for (i = 0; i < CONVERSATION_SPECIAL_IDS.length; i++) {
Activity a = TranscriptStoreTests.createActivity(i ,i , CONVERSATION_IDS);
transcriptStore.deleteTranscript(a.getChannelId(), a.getConversation().getId()).join();
}
PagedResult<Activity> loggedActivities = transcriptStore.getTranscriptActivities(channelId, CONVERSATION_IDS[i]).join();
Assert.assertEquals(0, loggedActivities.getItems().size());
}
@Test
public void nullParameterTests() {
TranscriptStore store = getTranscriptStore();
Assert.assertThrows(IllegalArgumentException.class, () -> store.logActivity(null));
Assert.assertThrows(IllegalArgumentException.class,
() -> store.getTranscriptActivities(null, CONVERSATION_IDS[0]));
Assert.assertThrows(IllegalArgumentException.class, () -> store.getTranscriptActivities(channelId, null));
}
@Test
public void logActivities() {
TranscriptStore transcriptStore = getTranscriptStore();
ConversationReference conversation = TestAdapter
.createConversationReference(UUID.randomUUID().toString(), "User1", "Bot");
TestAdapter adapter = new TestAdapter(conversation)
.use(new TranscriptLoggerMiddleware(transcriptStore));
new TestFlow(adapter, turnContext -> {
delay(500);
Activity typingActivity = new Activity(ActivityTypes.TYPING);
typingActivity.setRelatesTo(turnContext.getActivity().getRelatesTo());
turnContext.sendActivity(typingActivity).join();
delay(500);
turnContext.sendActivity(String.format("echo:%s", turnContext.getActivity().getText())).join();
return CompletableFuture.completedFuture(null);
})
.send("foo")
.assertReply(activity ->
Assert.assertTrue(activity.isType(ActivityTypes.TYPING))
)
.assertReply("echo:foo")
.send("bar")
.assertReply(activity ->
Assert.assertTrue(activity.isType(ActivityTypes.TYPING))
)
.assertReply("echo:bar")
.startTest().join();
PagedResult<Activity> pagedResult = null;
try {
pagedResult = this.getPagedResult(conversation, 6, null).join();
} catch (TimeoutException ex) {
Assert.fail();
}
Assert.assertEquals(6, pagedResult.getItems().size());
Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("foo", pagedResult.getItems().get(0).getText());
Assert.assertNotNull(pagedResult.getItems().get(1));
Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.TYPING));
Assert.assertTrue(pagedResult.getItems().get(2).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("echo:foo", pagedResult.getItems().get(2).getText());
Assert.assertTrue(pagedResult.getItems().get(3).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("bar", pagedResult.getItems().get(3).getText());
Assert.assertNotNull(pagedResult.getItems().get(4));
Assert.assertTrue(pagedResult.getItems().get(4).isType(ActivityTypes.TYPING));
Assert.assertTrue(pagedResult.getItems().get(5).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("echo:bar", pagedResult.getItems().get(5).getText());
for (Activity activity: pagedResult.getItems()) {
Assert.assertTrue(!StringUtils.isBlank(activity.getId()));
Assert.assertTrue(activity.getTimestamp().isAfter(OffsetDateTime.MIN));
}
}
@Test
public void logUpdateActivities() {
TranscriptStore transcriptStore = getTranscriptStore();
ConversationReference conversation = TestAdapter
.createConversationReference(UUID.randomUUID().toString(), "User1", "Bot");
TestAdapter adapter = new TestAdapter(conversation)
.use(new TranscriptLoggerMiddleware(transcriptStore));
final Activity[] activityToUpdate = {null};
new TestFlow(adapter, turnContext -> {
delay(500);
if(turnContext.getActivity().getText().equals("update")) {
activityToUpdate[0].setText("new response");
turnContext.updateActivity(activityToUpdate[0]).join();
} else {
Activity activity = turnContext.getActivity().createReply("response");
ResourceResponse response = turnContext.sendActivity(activity).join();
activity.setId(response.getId());
ObjectMapper objectMapper = new ObjectMapper()
.findAndRegisterModules();
try {
// clone the activity, so we can use it to do an update
activityToUpdate[0] = objectMapper.readValue(objectMapper.writeValueAsString(activity), Activity.class);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
}
return CompletableFuture.completedFuture(null);
}).send("foo")
.send("update")
.assertReply("new response")
.startTest().join();
PagedResult<Activity> pagedResult = null;
try {
pagedResult = this.getPagedResult(conversation, 3, null).join();
} catch (TimeoutException ex) {
Assert.fail();
}
Assert.assertEquals(3, pagedResult.getItems().size());
Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("foo", pagedResult.getItems().get(0).getText());
Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("new response", pagedResult.getItems().get(1).getText());
Assert.assertTrue(pagedResult.getItems().get(2).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("update", pagedResult.getItems().get(2).getText());
}
@Test
public void logMissingUpdateActivity() {
TranscriptStore transcriptStore = getTranscriptStore();
ConversationReference conversation = TestAdapter
.createConversationReference(UUID.randomUUID().toString(), "User1", "Bot");
TestAdapter adapter = new TestAdapter(conversation)
.use(new TranscriptLoggerMiddleware(transcriptStore));
final String[] fooId = {new String()};
ObjectMapper objectMapper = new ObjectMapper()
.findAndRegisterModules();
new TestFlow(adapter, turnContext -> {
fooId[0] = turnContext.getActivity().getId();
Activity updateActivity = null;
try {
// clone the activity, so we can use it to do an update
updateActivity = objectMapper.readValue(objectMapper.writeValueAsString(turnContext.getActivity()), Activity.class);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
updateActivity.setText("updated response");
ResourceResponse response = turnContext.updateActivity(updateActivity).join();
return CompletableFuture.completedFuture(null);
}).send("foo")
.startTest().join();
delay(3000);
PagedResult<Activity> pagedResult = null;
try {
pagedResult = this.getPagedResult(conversation, 2, null).join();
} catch (TimeoutException ex) {
Assert.fail();
}
Assert.assertEquals(2, pagedResult.getItems().size());
Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE));
Assert.assertEquals(fooId[0], pagedResult.getItems().get(0).getId());
Assert.assertEquals("foo", pagedResult.getItems().get(0).getText());
Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.MESSAGE));
Assert.assertTrue(pagedResult.getItems().get(1).getId().startsWith("g_"));
Assert.assertEquals("updated response", pagedResult.getItems().get(1).getText());
}
@Test
public void testDateLogUpdateActivities() {
TranscriptStore transcriptStore = getTranscriptStore();
OffsetDateTime dateTimeStartOffset1 = OffsetDateTime.now();
ConversationReference conversation = TestAdapter
.createConversationReference(UUID.randomUUID().toString(), "User1", "Bot");
TestAdapter adapter = new TestAdapter(conversation)
.use(new TranscriptLoggerMiddleware(transcriptStore));
final Activity[] activityToUpdate = {null};
new TestFlow(adapter, turnContext -> {
if (turnContext.getActivity().getText().equals("update")) {
activityToUpdate[0].setText("new response");
turnContext.updateActivity(activityToUpdate[0]).join();
} else {
Activity activity = turnContext.getActivity().createReply("response");
ResourceResponse response = turnContext.sendActivity(activity).join();
activity.setId(response.getId());
ObjectMapper objectMapper = new ObjectMapper().findAndRegisterModules();
try {
// clone the activity, so we can use it to do an update
activityToUpdate[0] = objectMapper.readValue(objectMapper.writeValueAsString(activity), Activity.class);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
}
return CompletableFuture.completedFuture(null);
}).send("foo")
.send("update")
.assertReply("new response")
.startTest().join();
try {
TimeUnit.MILLISECONDS.sleep(5000);
} catch (InterruptedException e) {
// Empty error
}
// Perform some queries
PagedResult<Activity> pagedResult = transcriptStore.getTranscriptActivities(
conversation.getChannelId(),
conversation.getConversation().getId(),
null,
dateTimeStartOffset1).join();
Assert.assertEquals(3, pagedResult.getItems().size());
Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("foo", pagedResult.getItems().get(0).getText());
Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("new response", pagedResult.getItems().get(1).getText());
Assert.assertTrue(pagedResult.getItems().get(2).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("update", pagedResult.getItems().get(2).getText());
// Perform some queries
pagedResult = transcriptStore.getTranscriptActivities(
conversation.getChannelId(),
conversation.getConversation().getId(),
null,
OffsetDateTime.MIN).join();
Assert.assertEquals(3, pagedResult.getItems().size());
Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("foo", pagedResult.getItems().get(0).getText());
Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("new response", pagedResult.getItems().get(1).getText());
Assert.assertTrue(pagedResult.getItems().get(2).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("update", pagedResult.getItems().get(2).getText());
// Perform some queries
pagedResult = transcriptStore.getTranscriptActivities(
conversation.getChannelId(),
conversation.getConversation().getId(),
null,
OffsetDateTime.MAX).join();
Assert.assertEquals(0, pagedResult.getItems().size());
}
@Test
public void logDeleteActivities() {
TranscriptStore transcriptStore = getTranscriptStore();
ConversationReference conversation = TestAdapter
.createConversationReference(UUID.randomUUID().toString(), "User1", "Bot");
TestAdapter adapter = new TestAdapter(conversation)
.use(new TranscriptLoggerMiddleware(transcriptStore));
final String[] activityId = {null};
new TestFlow(adapter, turnContext -> {
delay(500);
if (turnContext.getActivity().getText().equals("deleteIt")) {
turnContext.deleteActivity(activityId[0]).join();
} else {
Activity activity = turnContext.getActivity().createReply("response");
ResourceResponse response = turnContext.sendActivity(activity).join();
activityId[0] = response.getId();
}
return CompletableFuture.completedFuture(null);
}).send("foo")
.assertReply("response")
.send("deleteIt")
.startTest().join();
PagedResult<Activity> pagedResult = null;
try {
pagedResult = this.getPagedResult(conversation, 3, null).join();
} catch (TimeoutException ex) {
Assert.fail();
}
Assert.assertEquals(3, pagedResult.getItems().size());
Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("foo", pagedResult.getItems().get(0).getText());
Assert.assertNotNull(pagedResult.getItems().get(1));
Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.MESSAGE_DELETE));
Assert.assertTrue(pagedResult.getItems().get(2).isType(ActivityTypes.MESSAGE));
Assert.assertEquals("deleteIt", pagedResult.getItems().get(2).getText());
}
protected static Activity createActivity(Integer i, Integer j, String[] CONVERSATION_IDS) {
return TranscriptStoreTests.createActivity(j, CONVERSATION_IDS[i]);
}
private static Activity createActivity(Integer j, String conversationId) {
ConversationAccount conversationAccount = new ConversationAccount();
conversationAccount.setId(conversationId);
Activity activity = new Activity(ActivityTypes.MESSAGE);
activity.setId(StringUtils.leftPad(String.valueOf(j + 1), 2, "0"));
activity.setChannelId("test");
activity.setText("test");
activity.setConversation(conversationAccount);
activity.setTimestamp(OffsetDateTime.now());
activity.setFrom(new ChannelAccount("testUser"));
activity.setRecipient(new ChannelAccount("testBot"));
return activity;
}
/**
* There are some async oddities within TranscriptLoggerMiddleware that make it difficult to set a short delay when
* running this tests that ensures
* the TestFlow completes while also logging transcripts. Some tests will not pass without longer delays,
* but this method minimizes the delay required.
* @param conversation ConversationReference to pass to GetTranscriptActivitiesAsync()
* that contains ChannelId and Conversation.Id.
* @param expectedLength Expected length of pagedResult array.
* @param maxTimeout Maximum time to wait to retrieve pagedResult.
* @return PagedResult.
* @throws TimeoutException
*/
private CompletableFuture<PagedResult<Activity>> getPagedResult(ConversationReference conversation,
Integer expectedLength, Integer maxTimeout) throws TimeoutException {
TranscriptStore transcriptStore = getTranscriptStore();
if (maxTimeout == null) {
maxTimeout = 5000;
}
PagedResult<Activity> pagedResult = null;
for (int timeout = 0; timeout < maxTimeout; timeout += 500) {
delay(500);
try {
pagedResult = transcriptStore
.getTranscriptActivities(conversation.getChannelId(), conversation.getConversation().getId()).join();
if (pagedResult.getItems().size() >= expectedLength) {
break;
}
} catch (NoSuchElementException ex) { }
catch (NullPointerException e) { }
}
if(pagedResult == null) {
throw new TimeoutException("Unable to retrieve pagedResult in time");
}
return CompletableFuture.completedFuture(pagedResult);
}
private static void assertEmulator() throws IOException, InterruptedException {
if (!checkEmulator()) {
Assert.fail(NO_EMULATOR_MESSAGE);
}
}
private static Boolean checkEmulator() throws IOException, InterruptedException {
Process p = Runtime.getRuntime().exec
("cmd /C \"" + System.getenv("ProgramFiles") + " (x86)\\Microsoft SDKs\\Azure\\Storage Emulator\\AzureStorageEmulator.exe\" start");
int result = p.waitFor();
// status = 0: the service was started.
// status = -5: the service is already started. Only one instance of the application
// can be run at the same time.
return result == 0 || result == -5;
}
/**
* Time period delay.
* @param delay Time to delay.
*/
private void delay(Integer delay) {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
// Empty error
}
}
}

Просмотреть файл

@ -0,0 +1,291 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure.blobs;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.microsoft.bot.builder.BotAdapter;
import com.microsoft.bot.builder.ConversationState;
import com.microsoft.bot.builder.StatePropertyAccessor;
import com.microsoft.bot.builder.Storage;
import com.microsoft.bot.builder.StorageBaseTests;
import com.microsoft.bot.builder.StoreItem;
import com.microsoft.bot.builder.TurnContext;
import com.microsoft.bot.builder.TurnContextImpl;
import com.microsoft.bot.schema.Activity;
import com.microsoft.bot.schema.ActivityTypes;
import com.microsoft.bot.schema.ConversationAccount;
import com.microsoft.bot.schema.ConversationReference;
import com.microsoft.bot.schema.ResourceResponse;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public class BlobsStorageTests extends StorageBaseTests {
@Rule
public TestName testName = new TestName();
private final String connectionString = "AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;DefaultEndpointsProtocol=http;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;";
private static boolean emulatorIsRunning = false;
private static final String NO_EMULATOR_MESSAGE = "This test requires Azure STORAGE Emulator! Go to https://docs.microsoft.com/azure/storage/common/storage-use-emulator to download and install.";
public String getContainerName() {
return "blobs" + testName.getMethodName().toLowerCase().replace("_", "");
}
@BeforeClass
public static void allTestsInit() throws IOException, InterruptedException {
Process p = Runtime.getRuntime().exec
("cmd /C \"" + System.getenv("ProgramFiles") + " (x86)\\Microsoft SDKs\\Azure\\Storage Emulator\\AzureStorageEmulator.exe\" start");
int result = p.waitFor();
// status = 0: the service was started.
// status = -5: the service is already started. Only one instance of the application
// can be run at the same time.
emulatorIsRunning = result == 0 || result == -5;
}
@After
public void testCleanup() {
BlobContainerClient containerClient = new BlobContainerClientBuilder()
.connectionString(connectionString)
.containerName(getContainerName())
.buildClient();
if (containerClient.exists()) {
containerClient.delete();
}
}
@Test
public void blobStorageParamTest() {
if (runIfEmulator()) {
Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsStorage(null, getContainerName()));
Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsStorage(connectionString, null));
Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsStorage(new String(), getContainerName()));
Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsStorage(connectionString, new String()));
}
}
@Test
public void testBlobStorageWriteRead()
{
if (runIfEmulator()) {
// Arrange
Storage storage = new BlobsStorage(connectionString, getContainerName());
Map<String, Object> changes = new HashMap();
changes.put("x", "hello");
changes.put("y", "world");
// Act
storage.write(changes).join();
Map<String, Object> result = storage.read(new String[] {"x", "y"}).join();
// Assert
Assert.assertEquals(2, result.size());
Assert.assertEquals("hello", result.get("x"));
Assert.assertEquals("world", result.get("y"));
}
}
@Test
public void testBlobStorageWriteDeleteRead()
{
if (runIfEmulator()) {
// Arrange
Storage storage = new BlobsStorage(connectionString, getContainerName());
Map<String, Object> changes = new HashMap();
changes.put("x", "hello");
changes.put("y", "world");
// Act
storage.write(changes).join();
storage.delete(new String[] { "x" }).join();
Map<String, Object> result = storage.read(new String[] {"x", "y"}).join();
// Assert
Assert.assertEquals(1, result.size());
Assert.assertEquals("world", result.get("y"));
}
}
@Test
public void testBlobStorageChanges() {
if (runIfEmulator()) {
// Arrange
Storage storage = new BlobsStorage(connectionString, getContainerName());
// Act
Map<String, Object> changes = new HashMap();
changes.put("a", "1.0");
changes.put("b", "2.0");
storage.write(changes).join();
changes.clear();
changes.put("c", "3.0");
storage.write(changes).join();
storage.delete(new String[] { "b" }).join();
changes.clear();
changes.put("a", "1.1");
storage.write(changes).join();
Map<String, Object> result = storage.read(new String[] { "a", "b", "c", "d", "e" }).join();
// Assert
Assert.assertEquals(2, result.size());
Assert.assertEquals("1.1", result.get("a"));
Assert.assertEquals("3.0", result.get("c"));
}
}
@Test
public void testConversationStateBlobStorage() {
if (runIfEmulator()) {
// Arrange
Storage storage = new BlobsStorage(connectionString, getContainerName());
ConversationState conversationState = new ConversationState(storage);
StatePropertyAccessor<Prop> propAccessor = conversationState.createProperty("prop");
TestStorageAdapter adapter = new TestStorageAdapter();
Activity activity = new Activity(ActivityTypes.MESSAGE);
activity.setChannelId("123");
ConversationAccount conversationAccount = new ConversationAccount();
conversationAccount.setId("abc");
activity.setConversation(conversationAccount);
// Act
TurnContext turnContext1 = new TurnContextImpl(adapter, activity);
Prop propValue1 = propAccessor.get(turnContext1, Prop::new).join();
propValue1.setX("hello");
propValue1.setY("world");
conversationState.saveChanges(turnContext1).join();
TurnContext turnContext2 = new TurnContextImpl(adapter, activity);
Prop propValue2 = propAccessor.get(turnContext2).join();
// Assert
Assert.assertEquals("hello", propValue2.getX());
Assert.assertEquals("world", propValue2.getY());
propAccessor.delete(turnContext1).join();
conversationState.saveChanges(turnContext1).join();
}
}
@Test
public void testConversationStateBlobStorage_TypeNameHandlingDefault() {
if (runIfEmulator()) {
Storage storage = new BlobsStorage(connectionString, getContainerName());
testConversationStateBlobStorage_Method(storage);
}
}
@Test
public void statePersistsThroughMultiTurn_TypeNameHandlingNone() {
if (runIfEmulator()) {
Storage storage = new BlobsStorage(connectionString, getContainerName());
statePersistsThroughMultiTurn(storage);
}
}
private void testConversationStateBlobStorage_Method(Storage blobs) {
if (runIfEmulator()) {
// Arrange
ConversationState conversationState = new ConversationState(blobs);
StatePropertyAccessor<Prop> propAccessor = conversationState.createProperty("prop");
TestStorageAdapter adapter = new TestStorageAdapter();
Activity activity = new Activity(ActivityTypes.MESSAGE);
activity.setChannelId("123");
ConversationAccount conversationAccount = new ConversationAccount();
conversationAccount.setId("abc");
activity.setConversation(conversationAccount);
// Act
TurnContext turnContext1 = new TurnContextImpl(adapter, activity);
Prop propValue1 = propAccessor.get(turnContext1, Prop::new).join();
propValue1.setX("hello");
propValue1.setY("world");
conversationState.saveChanges(turnContext1).join();
TurnContext turnContext2 = new TurnContextImpl(adapter, activity);
Prop propValue2 = propAccessor.get(turnContext2).join();
// Assert
Assert.assertEquals("hello", propValue2.getX());
Assert.assertEquals("world", propValue2.getY());
}
}
private boolean runIfEmulator() {
if (!emulatorIsRunning) {
System.out.println(NO_EMULATOR_MESSAGE);
return false;
}
return true;
}
private class TestStorageAdapter extends BotAdapter {
@Override
public CompletableFuture<ResourceResponse[]> sendActivities(TurnContext context, List<Activity> activities) {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<ResourceResponse> updateActivity(TurnContext context, Activity activity) {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<Void> deleteActivity(TurnContext context, ConversationReference reference) {
throw new UnsupportedOperationException();
}
}
private static class Prop {
private String X;
private String Y;
StoreItem storeItem;
public String getX() {
return X;
}
public void setX(String x) {
X = x;
}
public String getY() {
return Y;
}
public void setY(String y) {
Y = y;
}
public StoreItem getStoreItem() {
return storeItem;
}
public void setStoreItem(StoreItem storeItem) {
this.storeItem = storeItem;
}
}
}

Просмотреть файл

@ -3,10 +3,13 @@
package com.microsoft.bot.builder;
import com.microsoft.bot.builder.adapters.TestAdapter;
import com.microsoft.bot.builder.adapters.TestFlow;
import org.junit.Assert;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public class StorageBaseTests {
protected void readUnknownTest(Storage storage) {
@ -241,6 +244,32 @@ public class StorageBaseTests {
storage.delete(new String[] { "unknown_key" }).join();
}
protected void statePersistsThroughMultiTurn(Storage storage) {
UserState userState = new UserState(storage);
StatePropertyAccessor<TestPocoState> testProperty = userState.createProperty("test");
TestAdapter adapter = new TestAdapter()
.use(new AutoSaveStateMiddleware(userState));
new TestFlow(adapter, context -> {
TestPocoState state = testProperty.get(context, TestPocoState::new).join();
Assert.assertNotNull(state);
switch (context.getActivity().getText()) {
case "set value":
state.setValue("test");
context.sendActivity("value saved").join();
break;
case "get value":
context.sendActivity(state.getValue()).join();
break;
}
return CompletableFuture.completedFuture(null);
})
.test("set value", "value saved")
.test("get value", "test")
.startTest().join();
}
private static class PocoItem {
public PocoItem() {

Просмотреть файл

@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.builder;
public class TestPocoState {
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
private String value;
}

Просмотреть файл

@ -56,6 +56,7 @@ public class TranscriptMiddlewareTest {
final String[] conversationId = { null };
new TestFlow(adapter, (context) -> {
delay(500);
conversationId[0] = context.getActivity().getConversation().getId();
Activity typingActivity = new Activity(ActivityTypes.TYPING) {
{
@ -65,12 +66,7 @@ public class TranscriptMiddlewareTest {
context.sendActivity(typingActivity).join();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
Assert.fail();
}
delay(500);
context.sendActivity("echo:" + context.getActivity().getText()).join();
return CompletableFuture.completedFuture(null);
@ -109,6 +105,7 @@ public class TranscriptMiddlewareTest {
final String[] conversationId = { null };
final Activity[] activityToUpdate = { null };
new TestFlow(adapter, (context) -> {
delay(500);
conversationId[0] = context.getActivity().getConversation().getId();
if (context.getActivity().getText().equals("update")) {
activityToUpdate[0].setText("new response");
@ -152,6 +149,7 @@ public class TranscriptMiddlewareTest {
final String[] conversationId = { null };
final String[] activityId = { null };
new TestFlow(adapter, (context) -> {
delay(500);
conversationId[0] = context.getActivity().getConversation().getId();
if (context.getActivity().getText().equals("deleteIt")) {
context.deleteActivity(activityId[0]).join();
@ -210,6 +208,7 @@ public class TranscriptMiddlewareTest {
final String[] conversationId = { null };
final Activity[] activityToUpdate = { null };
new TestFlow(adapter, (context) -> {
delay(500);
conversationId[0] = context.getActivity().getConversation().getId();
if (context.getActivity().getText().equals("update")) {
activityToUpdate[0].setText("new response");
@ -278,6 +277,7 @@ public class TranscriptMiddlewareTest {
final String[] conversationId = { null };
new TestFlow(adapter, (context) -> {
delay(500);
// The next assert implicitly tests the immutability of the incoming
// message. As demonstrated by the asserts after this TestFlow block
// the role attribute is present on the activity as it is passed to
@ -300,4 +300,16 @@ public class TranscriptMiddlewareTest {
System.out.printf("Complete");
}
/**
* Time period delay.
* @param milliseconds Time to delay.
*/
private void delay(int milliseconds) {
try {
Thread.sleep(milliseconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Просмотреть файл

@ -19,14 +19,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.bot.schema.teams.TeamsMeetingInfo;
import org.apache.commons.lang3.StringUtils;
import java.time.Clock;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/**
@ -212,7 +213,8 @@ public class Activity {
* normally required.
*/
protected Activity() {
setTimestamp(OffsetDateTime.now(ZoneId.of("UTC")));
final Clock clock = new NanoClockHelper();
setTimestamp(OffsetDateTime.now(clock));
}
/**
@ -407,9 +409,26 @@ public class Activity {
clone.setProperties(entry.getKey(), entry.getValue());
}
clone = ensureActivityHasId(clone);
return clone;
}
private static Activity ensureActivityHasId(Activity activity) {
Activity activityWithId = activity;
if (activity == null) {
throw new IllegalArgumentException("Cannot check or add Id on a null Activity.");
}
if (activity.getId() == null) {
String generatedId = String.format("g_%s", UUID.randomUUID().toString());
activity.setId(generatedId);
}
return activityWithId;
}
/**
* Gets the {@link ActivityTypes} of the activity.
*

Просмотреть файл

@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.schema;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
/**
* A customized nanoseconds clock providing access to the current instant, date and time using a time-zone.
*/
public class NanoClockHelper extends Clock {
private final Clock clock;
private final long initialNanos;
private final Instant initialInstant;
/**
* Obtains a clock that returns the current instant using the best available
* system clock with nanoseconds.
*/
public NanoClockHelper() {
this(Clock.systemUTC());
}
/**
* Obtains a clock that returns the current instant using the best available
* system clock with nanoseconds.
* @param clock A {@link Clock}
*/
public NanoClockHelper(final Clock clock) {
this.clock = clock;
initialInstant = clock.instant();
initialNanos = getSystemNanos();
}
/**
* {@inheritDoc}
*/
@Override
public ZoneId getZone() {
return clock.getZone();
}
/**
* {@inheritDoc}
*/
@Override
public Instant instant() {
return initialInstant.plusNanos(getSystemNanos() - initialNanos);
}
/**
* {@inheritDoc}
*/
@Override
public Clock withZone(final ZoneId zone) {
return new NanoClockHelper(clock.withZone(zone));
}
private long getSystemNanos() {
return System.nanoTime();
}
}