[SDK][Bot-Azure] Add CosmosDbPartitionedStorage component (#1024)

* CosmosDBPartitionedStorage

* Corrected Cosmos DB Emulator status check

* Update CosmosDbPartitionedStorage classes

* Update CosmosDbPartitionStorage tests

* Normalize test names

* Clean imports

* Remove unnecesary dlls

Co-authored-by: tracyboehrer <tracyboehrer@users.noreply.github.com>
Co-authored-by: Martin Battaglino <martinbatta32@gmail.com>
This commit is contained in:
Victor Grycuk 2021-02-26 18:39:53 -03:00 коммит произвёл GitHub
Родитель 6e249103d3
Коммит 5066b5747b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 1343 добавлений и 0 удалений

Просмотреть файл

@ -53,10 +53,33 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-documentdb</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>com.microsoft.bot</groupId>
<artifactId>bot-builder</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.bot</groupId>
<artifactId>bot-integration-core</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.bot</groupId>
<artifactId>bot-dialogs</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.bot</groupId>
<artifactId>bot-builder</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

Просмотреть файл

@ -0,0 +1,142 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Helper class to escape CosmosDB keys.
*/
public final class CosmosDbKeyEscape {
private CosmosDbKeyEscape() {
// not called
}
private static final Integer ESCAPE_LENGTH = 3;
/**
* Older libraries had a max key length of 255. The limit is now 1023. In this
* library, 255 remains the default for backwards compat. To override this
* behavior, and use the longer limit set
* CosmosDbPartitionedStorageOptions.CompatibilityMode to false.
* https://docs.microsoft.com/en-us/azure/cosmos-db/concepts-limits#per-item-limits.
*/
public static final Integer MAX_KEY_LENGTH = 255;
/**
* The list of illegal characters for Cosmos DB Keys comes from this list on the
* CosmostDB docs:
* https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.resource.id?view=azure-dotnet#remarks
*
* Note: We are also escaping the "*" character, as that what we're using as our
* escape character.
*
* Note: The Java version escapes more than .NET since otherwise it errors out.
* The additional characters are quote, single quote, semi-colon.
*/
private static final char[] ILLEGAL_KEYS = new char[] {'\\', '?', '/', '#', '*', ';', '\"', '\''};
/**
* We are escaping illegal characters using a "*{AsciiCodeInHex}" pattern. This
* means a key of "?test?" would be escaped as "*3ftest*3f".
*/
private static final Map<Character, String> ILLEGAL_KEY_CHARACTER_REPLACEMENT_MAP = Arrays
.stream(ArrayUtils.toObject(ILLEGAL_KEYS))
.collect(Collectors.toMap(c -> c, c -> "*" + String.format("%02x", (int) c)));
/**
* Converts the key into a DocumentID that can be used safely with Cosmos DB.
*
* @param key The key to escape.
* @return An escaped key that can be used safely with CosmosDB.
*
* @see #ILLEGAL_KEYS
*/
public static String escapeKey(String key) {
return escapeKey(key, new String(), true);
}
/**
* Converts the key into a DocumentID that can be used safely with Cosmos DB.
*
* @param key The key to escape.
* @param suffix The string to add at the end of all row keys.
* @param compatibilityMode True if running in compatability mode and keys
* should be truncated in order to support previous
* CosmosDb max key length of 255. This behavior can be
* overridden by setting
* {@link CosmosDbPartitionedStorage.compatibilityMode}
* to false. *
* @return An escaped key that can be used safely with CosmosDB.
*/
public static String escapeKey(String key, String suffix, Boolean compatibilityMode) {
if (StringUtils.isBlank(key)) {
throw new IllegalArgumentException("key");
}
suffix = suffix == null ? new String() : suffix;
Integer firstIllegalCharIndex = StringUtils.indexOfAny(key, new String(ILLEGAL_KEYS));
// If there are no illegal characters, and the key is within length costraints,
// return immediately and avoid any further processing/allocations
if (firstIllegalCharIndex == -1) {
return truncateKeyIfNeeded(key.concat(suffix), compatibilityMode);
}
// Allocate a builder that assumes that all remaining characters might be
// replaced
// to avoid any extra allocations
StringBuilder sanitizedKeyBuilder = new StringBuilder(
key.length() + ((key.length() - firstIllegalCharIndex) * ESCAPE_LENGTH));
// Add all good characters up to the first bad character to the builder first
for (Integer index = 0; index < firstIllegalCharIndex; index++) {
sanitizedKeyBuilder.append(key.charAt(index));
}
Map<Character, String> illegalCharacterReplacementMap = ILLEGAL_KEY_CHARACTER_REPLACEMENT_MAP;
// Now walk the remaining characters, starting at the first known bad character,
// replacing any bad ones with
// their designated replacement value from the
for (Integer index = firstIllegalCharIndex; index < key.length(); index++) {
Character ch = key.charAt(index);
// Check if this next character is considered illegal and, if so, append its
// replacement;
// otherwise just append the good character as is
if (illegalCharacterReplacementMap.containsKey(ch)) {
sanitizedKeyBuilder.append(illegalCharacterReplacementMap.get(ch));
} else {
sanitizedKeyBuilder.append(ch);
}
}
if (StringUtils.isNotBlank(key)) {
sanitizedKeyBuilder.append(suffix);
}
return truncateKeyIfNeeded(sanitizedKeyBuilder.toString(), compatibilityMode);
}
private static String truncateKeyIfNeeded(String key, Boolean truncateKeysForCompatibility) {
if (!truncateKeysForCompatibility) {
return key;
}
if (key.length() > MAX_KEY_LENGTH) {
String hash = String.format("%x", key.hashCode());
key = key.substring(0, MAX_KEY_LENGTH - hash.length()) + hash;
}
return key;
}
}

Просмотреть файл

@ -0,0 +1,487 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure;
import com.codepoetics.protonpack.collectors.CompletableFutures;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.microsoft.azure.documentdb.AccessCondition;
import com.microsoft.azure.documentdb.AccessConditionType;
import com.microsoft.azure.documentdb.Database;
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.PartitionKey;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.bot.builder.Storage;
import com.microsoft.bot.builder.StoreItem;
import com.microsoft.bot.connector.ExecutorFactory;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
/**
* Implements an CosmosDB based storage provider using partitioning for a bot.
*/
public class CosmosDbPartitionedStorage implements Storage {
private Logger logger = LoggerFactory.getLogger(CosmosDbPartitionedStorage.class);
private CosmosDbPartitionedStorageOptions cosmosDbStorageOptions;
private ObjectMapper objectMapper;
private final Object cacheSync = new Object();
private DocumentClient client;
private Database databaseCache;
private DocumentCollection collectionCache;
/**
* Initializes a new instance of the CosmosDbPartitionedStorage class. using the
* provided CosmosDB credentials, database ID, and container ID.
*
* @param withCosmosDbStorageOptions Cosmos DB partitioned storage configuration
* options.
*/
public CosmosDbPartitionedStorage(CosmosDbPartitionedStorageOptions withCosmosDbStorageOptions) {
if (withCosmosDbStorageOptions == null) {
throw new IllegalArgumentException("CosmosDbPartitionStorageOptions is required.");
}
if (withCosmosDbStorageOptions.getCosmosDbEndpoint() == null) {
throw new IllegalArgumentException("Service EndPoint for CosmosDB is required: cosmosDbEndpoint");
}
if (StringUtils.isBlank(withCosmosDbStorageOptions.getAuthKey())) {
throw new IllegalArgumentException("AuthKey for CosmosDB is required: authKey");
}
if (StringUtils.isBlank(withCosmosDbStorageOptions.getDatabaseId())) {
throw new IllegalArgumentException("DatabaseId is required: databaseId");
}
if (StringUtils.isBlank(withCosmosDbStorageOptions.getContainerId())) {
throw new IllegalArgumentException("ContainerId is required: containerId");
}
Boolean compatibilityMode = withCosmosDbStorageOptions.getCompatibilityMode();
if (compatibilityMode == null) {
withCosmosDbStorageOptions.setCompatibilityMode(true);
}
if (StringUtils.isNotBlank(withCosmosDbStorageOptions.getKeySuffix())) {
if (withCosmosDbStorageOptions.getCompatibilityMode()) {
throw new IllegalArgumentException(
"CompatibilityMode cannot be 'true' while using a KeySuffix: withCosmosDbStorageOptions");
}
// In order to reduce key complexity, we do not allow invalid characters in a
// KeySuffix
// If the KeySuffix has invalid characters, the EscapeKey will not match
String suffixEscaped = CosmosDbKeyEscape.escapeKey(withCosmosDbStorageOptions.getKeySuffix());
if (!withCosmosDbStorageOptions.getKeySuffix().equals(suffixEscaped)) {
throw new IllegalArgumentException(String.format("Cannot use invalid Row Key characters: %s %s",
withCosmosDbStorageOptions.getKeySuffix(), "withCosmosDbStorageOptions"));
}
}
cosmosDbStorageOptions = withCosmosDbStorageOptions;
objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.findAndRegisterModules().enableDefaultTyping();
client = new DocumentClient(cosmosDbStorageOptions.getCosmosDbEndpoint(), cosmosDbStorageOptions.getAuthKey(),
cosmosDbStorageOptions.getConnectionPolicy(), cosmosDbStorageOptions.getConsistencyLevel());
}
/**
* Reads storage items from storage.
*
* @param keys A collection of Ids for each item to be retrieved.
* @return A dictionary containing the retrieved items.
*/
@Override
public CompletableFuture<Map<String, Object>> read(String[] keys) {
if (keys == null) {
throw new IllegalArgumentException("keys");
}
if (keys.length == 0) {
// No keys passed in, no result to return.
return CompletableFuture.completedFuture(new HashMap<>());
}
return getCollection().thenApplyAsync(collection -> {
// Issue all of the reads at once
List<CompletableFuture<Document>> documentFutures = new ArrayList<>();
for (String key : keys) {
documentFutures.add(getDocumentById(CosmosDbKeyEscape.escapeKey(key,
cosmosDbStorageOptions.getKeySuffix(), cosmosDbStorageOptions.getCompatibilityMode())));
}
// Map each returned Document to it's original value.
Map<String, Object> storeItems = new HashMap<>();
documentFutures.forEach(documentFuture -> {
Document document = documentFuture.join();
if (document != null) {
try {
// We store everything in a DocumentStoreItem. Get that.
JsonNode stateNode = objectMapper.readTree(document.toJson());
DocumentStoreItem storeItem = objectMapper.treeToValue(stateNode, DocumentStoreItem.class);
// DocumentStoreItem contains the original object.
JsonNode dataNode = objectMapper.readTree(storeItem.getDocument());
Object item = objectMapper.treeToValue(dataNode, Class.forName(storeItem.getType()));
if (item instanceof StoreItem) {
((StoreItem) item).setETag(storeItem.getETag());
}
storeItems.put(storeItem.getReadId(), item);
} catch (IOException | ClassNotFoundException e) {
logger.warn("Error reading from container", e);
}
}
});
return storeItems;
}, ExecutorFactory.getExecutor());
}
/**
* Inserts or updates one or more items into the Cosmos DB container.
*
* @param changes A dictionary of items to be inserted or updated. The
* dictionary item key is used as the ID for the inserted /
* updated item.
* @return A task that represents the work queued to execute.
*/
@Override
public CompletableFuture<Void> write(Map<String, Object> changes) {
if (changes == null) {
throw new IllegalArgumentException("changes");
}
if (changes.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return getCollection().thenApplyAsync(collection -> {
for (Map.Entry<String, Object> change : changes.entrySet()) {
try {
ObjectNode node = objectMapper.valueToTree(change.getValue());
// Remove etag from JSON object that was copied from StoreItem.
// The ETag information is updated as an _etag attribute in the document
// metadata.
node.remove("eTag");
DocumentStoreItem documentChange = new DocumentStoreItem() {
{
setId(CosmosDbKeyEscape.escapeKey(change.getKey(), cosmosDbStorageOptions.getKeySuffix(),
cosmosDbStorageOptions.getCompatibilityMode()));
setReadId(change.getKey());
setDocument(node.toString());
setType(change.getValue().getClass().getTypeName());
}
};
Document document = new Document(objectMapper.writeValueAsString(documentChange));
RequestOptions options = new RequestOptions();
options.setPartitionKey(new PartitionKey(documentChange.partitionKey()));
if (change.getValue() instanceof StoreItem) {
String etag = ((StoreItem) change.getValue()).getETag();
if (!StringUtils.isEmpty(etag)) {
// if we have an etag, do opt. concurrency replace
AccessCondition condition = new AccessCondition();
condition.setType(AccessConditionType.IfMatch);
condition.setCondition(etag);
options.setAccessCondition(condition);
} else if (etag != null) {
logger.warn("write change, empty eTag: " + change.getKey());
continue;
}
}
client.upsertDocument(collection.getSelfLink(), document, options, true);
} catch (JsonProcessingException | DocumentClientException e) {
logger.warn("Error upserting document: " + change.getKey(), e);
}
}
return null;
});
}
/**
* Deletes one or more items from the Cosmos DB container.
*
* @param keys An array of Ids for the items to be deleted.
* @return A task that represents the work queued to execute.
*/
@Override
public CompletableFuture<Void> delete(String[] keys) {
if (keys == null) {
throw new IllegalArgumentException("keys");
}
// issue the deletes in parallel
return getCollection().thenCompose(collection -> Arrays.stream(keys).map(key -> {
String escapedKey = CosmosDbKeyEscape.escapeKey(key, cosmosDbStorageOptions.getKeySuffix(),
cosmosDbStorageOptions.getCompatibilityMode());
return getDocumentById(escapedKey).thenApplyAsync(document -> {
if (document != null) {
try {
RequestOptions options = new RequestOptions();
options.setPartitionKey(new PartitionKey(escapedKey));
client.deleteDocument(document.getSelfLink(), options);
} catch (DocumentClientException e) {
logger.warn("Unable to delete document", e);
throw new CompletionException(e);
}
}
return null;
}, ExecutorFactory.getExecutor());
}).collect(CompletableFutures.toFutureList()).thenApply(deleteResponses -> null));
}
private Database getDatabase() {
if (databaseCache == null) {
// Get the database if it exists
List<Database> databaseList = client
.queryDatabases("SELECT * FROM root r WHERE r.id='" + cosmosDbStorageOptions.getDatabaseId() + "'",
null)
.getQueryIterable().toList();
if (databaseList.size() > 0) {
// Cache the database object so we won't have to query for it
// later to retrieve the selfLink.
databaseCache = databaseList.get(0);
} else {
// Create the database if it doesn't exist.
try {
Database databaseDefinition = new Database();
databaseDefinition.setId(cosmosDbStorageOptions.getDatabaseId());
databaseCache = client.createDatabase(databaseDefinition, null).getResource();
} catch (DocumentClientException e) {
// able to query or create the collection.
// Verify your connection, endpoint, and key.
logger.error("getDatabase", e);
throw new RuntimeException(e);
}
}
}
return databaseCache;
}
private CompletableFuture<DocumentCollection> getCollection() {
if (collectionCache != null) {
return CompletableFuture.completedFuture(collectionCache);
}
synchronized (cacheSync) {
if (collectionCache != null) {
return CompletableFuture.completedFuture(collectionCache);
}
return CompletableFuture.supplyAsync(() -> {
// Get the collection if it exists.
List<DocumentCollection> collectionList = client.queryCollections(getDatabase().getSelfLink(),
"SELECT * FROM root r WHERE r.id='" + cosmosDbStorageOptions.getContainerId() + "'", null)
.getQueryIterable().toList();
if (collectionList.size() > 0) {
// Cache the collection object so we won't have to query for it
// later to retrieve the selfLink.
collectionCache = collectionList.get(0);
} else {
// Create the collection if it doesn't exist.
try {
DocumentCollection collectionDefinition = new DocumentCollection();
collectionDefinition.setId(cosmosDbStorageOptions.getContainerId());
PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition();
partitionKeyDefinition.setPaths(Collections.singleton(DocumentStoreItem.PARTITION_KEY_PATH));
collectionDefinition.setPartitionKey(partitionKeyDefinition);
RequestOptions options = new RequestOptions() {
{
setOfferThroughput(cosmosDbStorageOptions.getContainerThroughput());
}
};
collectionCache = client
.createCollection(getDatabase().getSelfLink(), collectionDefinition, options)
.getResource();
} catch (DocumentClientException e) {
// able to query or create the collection.
// Verify your connection, endpoint, and key.
logger.error("getCollection", e);
throw new RuntimeException("getCollection", e);
}
}
return collectionCache;
}, ExecutorFactory.getExecutor());
}
}
private CompletableFuture<Document> getDocumentById(String id) {
return getCollection().thenApplyAsync(collection -> {
// Retrieve the document using the DocumentClient.
List<Document> documentList = client
.queryDocuments(collection.getSelfLink(), "SELECT * FROM root r WHERE r.id='" + id + "'", null)
.getQueryIterable().toList();
if (documentList.size() > 0) {
return documentList.get(0);
} else {
return null;
}
}, ExecutorFactory.getExecutor());
}
/**
* Internal data structure for storing items in a CosmosDB Collection.
*/
private static class DocumentStoreItem implements StoreItem {
// PartitionKey path to be used for this document type
public static final String PARTITION_KEY_PATH = "/id";
@JsonProperty(value = "id")
private String id;
@JsonProperty(value = "realId")
private String readId;
@JsonProperty(value = "document")
private String document;
@JsonProperty(value = "_etag")
private String eTag;
@JsonProperty(value = "type")
private String type;
/**
* Gets the sanitized Id/Key used as PrimaryKey.
*
* @return The ID.
*/
public String getId() {
return id;
}
/**
* Sets the sanitized Id/Key used as PrimaryKey.
*
* @param withId The ID.
*/
public void setId(String withId) {
id = withId;
}
/**
* Gets the un-sanitized Id/Key.
*
* @return The ID.
*/
public String getReadId() {
return readId;
}
/**
* Sets the un-sanitized Id/Key.
*
* @param withReadId The ID.
*/
public void setReadId(String withReadId) {
readId = withReadId;
}
/**
* Gets the persisted object.
*
* @return The item data.
*/
public String getDocument() {
return document;
}
/**
* Sets the persisted object.
*
* @param withDocument The item data.
*/
public void setDocument(String withDocument) {
document = withDocument;
}
/**
* Get ETag information for handling optimistic concurrency updates.
*
* @return The eTag value.
*/
@Override
public String getETag() {
return eTag;
}
/**
* Set ETag information for handling optimistic concurrency updates.
*
* @param withETag The eTag value.
*/
@Override
public void setETag(String withETag) {
eTag = withETag;
}
/**
* The type of the document data.
*
* @return The class name of the data being stored.
*/
public String getType() {
return type;
}
/**
* The fully qualified class name of the data being stored.
*
* @param withType The class name of the data.
*/
public void setType(String withType) {
type = withType;
}
/**
* The value used for the PartitionKey.
*
* @return In this case, the id field.
*/
public String partitionKey() {
return id;
}
}
}

Просмотреть файл

@ -0,0 +1,255 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure;
import com.microsoft.azure.documentdb.ConnectionPolicy;
import com.microsoft.azure.documentdb.ConsistencyLevel;
import com.microsoft.bot.integration.Configuration;
/**
* Cosmos DB Partitioned Storage Options.
*/
public class CosmosDbPartitionedStorageOptions {
private static final Integer DEFAULT_THROUGHPUT = 400;
private static final ConsistencyLevel DEFAULT_CONSISTENCY = ConsistencyLevel.Session;
private String cosmosDbEndpoint;
private String authKey;
private String databaseId;
private String containerId;
private String keySuffix;
private Integer containerThroughput;
private ConnectionPolicy connectionPolicy;
private ConsistencyLevel consistencyLevel;
private Boolean compatibilityMode;
/**
* Constructs an empty options object.
*/
public CosmosDbPartitionedStorageOptions() {
connectionPolicy = ConnectionPolicy.GetDefault();
consistencyLevel = DEFAULT_CONSISTENCY;
containerThroughput = DEFAULT_THROUGHPUT;
}
/**
* Construct with properties from Configuration.
*
* @param configuration The Configuration object to read properties from.
*/
public CosmosDbPartitionedStorageOptions(Configuration configuration) {
cosmosDbEndpoint = configuration.getProperty("cosmosdb.dbEndpoint");
authKey = configuration.getProperty("cosmosdb.authKey");
databaseId = configuration.getProperty("cosmosdb.databaseId");
containerId = configuration.getProperty("cosmosdb.containerId");
// will likely need to expand this to read policy settings from Configuration.
connectionPolicy = ConnectionPolicy.GetDefault();
// will likely need to read consistency level from config.
consistencyLevel = DEFAULT_CONSISTENCY;
try {
containerThroughput = Integer.parseInt(configuration.getProperty("cosmosdb.throughput"));
} catch (NumberFormatException e) {
containerThroughput = DEFAULT_THROUGHPUT;
}
}
/**
* Gets the CosmosDB endpoint.
*
* @return The DB endpoint.
*/
public String getCosmosDbEndpoint() {
return cosmosDbEndpoint;
}
/**
* Sets the CosmosDB endpoint.
*
* @param withCosmosDbEndpoint The DB endpoint to use.
*/
public void setCosmosDbEndpoint(String withCosmosDbEndpoint) {
cosmosDbEndpoint = withCosmosDbEndpoint;
}
/**
* Gets the authentication key for Cosmos DB.
*
* @return The auth key for the DB.
*/
public String getAuthKey() {
return authKey;
}
/**
* Sets the authentication key for Cosmos DB.
*
* @param withAuthKey The auth key to use.
*/
public void setAuthKey(String withAuthKey) {
authKey = withAuthKey;
}
/**
* Gets the database identifier for Cosmos DB instance.
*
* @return The CosmosDB DB id.
*/
public String getDatabaseId() {
return databaseId;
}
/**
* Sets the database identifier for Cosmos DB instance.
*
* @param withDatabaseId The CosmosDB id.
*/
public void setDatabaseId(String withDatabaseId) {
databaseId = withDatabaseId;
}
/**
* Gets the container identifier.
*
* @return The container/collection ID.
*/
public String getContainerId() {
return containerId;
}
/**
* Sets the container identifier.
*
* @param withContainerId The container/collection ID.
*/
public void setContainerId(String withContainerId) {
containerId = withContainerId;
}
/**
* Gets the ConnectionPolicy for the CosmosDB.
*
* @return The ConnectionPolicy settings.
*/
public ConnectionPolicy getConnectionPolicy() {
return connectionPolicy;
}
/**
* Sets the ConnectionPolicy for the CosmosDB.
*
* @param withConnectionPolicy The ConnectionPolicy settings.
*/
public void setConnectionPolicy(ConnectionPolicy withConnectionPolicy) {
connectionPolicy = withConnectionPolicy;
}
/**
* Represents the consistency levels supported for Azure Cosmos DB client
* operations in the Azure Cosmos DB database service.
*
* The requested ConsistencyLevel must match or be weaker than that provisioned
* for the database account. Consistency levels by order of strength are Strong,
* BoundedStaleness, Session and Eventual.
*
* @return The ConsistencyLevel
*/
public ConsistencyLevel getConsistencyLevel() {
return consistencyLevel;
}
/**
* Represents the consistency levels supported for Azure Cosmos DB client
* operations in the Azure Cosmos DB database service.
*
* The requested ConsistencyLevel must match or be weaker than that provisioned
* for the database account. Consistency levels by order of strength are Strong,
* BoundedStaleness, Session and Eventual.
*
* @param withConsistencyLevel The ConsistencyLevel to use.
*/
public void setConsistencyLevel(ConsistencyLevel withConsistencyLevel) {
consistencyLevel = withConsistencyLevel;
}
/**
* Gets the throughput set when creating the Container. Defaults to 400.
*
* @return The container throughput.
*/
public Integer getContainerThroughput() {
return containerThroughput;
}
/**
* Sets the throughput set when creating the Container. Defaults to 400.
*
* @param withContainerThroughput The desired thoughput.
*/
public void setContainerThroughput(Integer withContainerThroughput) {
containerThroughput = withContainerThroughput;
}
/**
* Gets a value indicating whether or not to run in Compatibility Mode. Early
* versions of CosmosDb had a key length limit of 255. Keys longer than this
* were truncated in CosmosDbKeyEscape. This remains the default behavior, but
* can be overridden by setting CompatibilityMode to false. This setting will
* also allow for using older collections where no PartitionKey was specified.
*
* Note: CompatibilityMode cannot be 'true' if KeySuffix is used.
* @return The compatibilityMode
*/
public Boolean getCompatibilityMode() {
return compatibilityMode;
}
/**
* Sets a value indicating whether or not to run in Compatibility Mode. Early
* versions of CosmosDb had a key length limit of 255. Keys longer than this
* were truncated in CosmosDbKeyEscape. This remains the default behavior, but
* can be overridden by setting CompatibilityMode to false. This setting will
* also allow for using older collections where no PartitionKey was specified.
*
* Note: CompatibilityMode cannot be 'true' if KeySuffix is used.
*
* @param withCompatibilityMode Currently, max key length for cosmosdb is 1023:
* https://docs.microsoft.com/en-us/azure/cosmos-db/concepts-limits#per-item-limits
* The default for backwards compatibility is 255,
* CosmosDbKeyEscape.MaxKeyLength.
*/
public void setCompatibilityMode(Boolean withCompatibilityMode) {
this.compatibilityMode = withCompatibilityMode;
}
/**
* Gets the suffix to be added to every key. See
* CosmosDbKeyEscape.EscapeKey(string). Note:CompatibilityMode must be set to
* 'false' to use a KeySuffix. When KeySuffix is used, keys will NOT be
* truncated but an exception will be thrown if the key length is longer than
* allowed by CosmosDb.
*
* @return String containing only valid CosmosDb key characters. (e.g. not:
* '\\', '?', '/', '#', '*').
*/
public String getKeySuffix() {
return keySuffix;
}
/**
* Sets the suffix to be added to every key. See
* CosmosDbKeyEscape.EscapeKey(string). Note:CompatibilityMode must be set to
* 'false' to use a KeySuffix. When KeySuffix is used, keys will NOT be
* truncated but an exception will be thrown if the key length is longer than
* allowed by CosmosDb.
*
* @param withKeySuffix String containing only valid CosmosDb key characters.
* (e.g. not: '\\', '?', '/', '#', '*').
*/
public void setKeySuffix(String withKeySuffix) {
this.keySuffix = withKeySuffix;
}
}

Просмотреть файл

@ -0,0 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for
// license information.
/**
* This package contains the classes for bot-integration-core.
*/
package com.microsoft.bot.azure;

Просмотреть файл

@ -0,0 +1,156 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure;
import org.junit.Assert;
import org.junit.Test;
public class CosmosDBKeyEscapeTests {
@Test(expected = IllegalArgumentException.class)
public void sanitizeKeyShouldFailWithNullKey() {
// Null key should throw
CosmosDbKeyEscape.escapeKey(null);
}
@Test(expected = IllegalArgumentException.class)
public void sanitizeKeyShouldFailWithEmptyKey() {
// Empty string should throw
CosmosDbKeyEscape.escapeKey(new String());
}
@Test(expected = IllegalArgumentException.class)
public void sanitizeKeyShouldFailWithWhitespaceKey() {
// Whitespace key should throw
CosmosDbKeyEscape.escapeKey(" ");
}
@Test
public void sanitizeKeyShouldNotChangeAValidKey() {
String validKey = "Abc12345";
String sanitizedKey = CosmosDbKeyEscape.escapeKey(validKey);
Assert.assertEquals(validKey, sanitizedKey);
}
@Test
public void longKeyShouldBeTruncated() {
StringBuilder tooLongKey = new StringBuilder();
for (int i = 0; i < CosmosDbKeyEscape.MAX_KEY_LENGTH + 1; i++) {
tooLongKey.append("a");
}
String sanitizedKey = CosmosDbKeyEscape.escapeKey(tooLongKey.toString());
Assert.assertTrue(sanitizedKey.length() <= CosmosDbKeyEscape.MAX_KEY_LENGTH);
// The resulting key should be:
String hash = String.format("%x", tooLongKey.toString().hashCode());
String correctKey = sanitizedKey.substring(0, CosmosDbKeyEscape.MAX_KEY_LENGTH - hash.length()) + hash;
Assert.assertEquals(correctKey, sanitizedKey);
}
@Test
public void longKeyWithIllegalCharactersShouldBeTruncated() {
StringBuilder tooLongKey = new StringBuilder();
for (int i = 0; i < CosmosDbKeyEscape.MAX_KEY_LENGTH + 1; i++) {
tooLongKey.append("a");
}
String tooLongKeyWithIllegalCharacters = "?test?" + tooLongKey.toString();
String sanitizedKey = CosmosDbKeyEscape.escapeKey(tooLongKeyWithIllegalCharacters);
// Verify the key ws truncated
Assert.assertTrue(sanitizedKey.length() <= CosmosDbKeyEscape.MAX_KEY_LENGTH);
// Make sure the escaping still happened
Assert.assertTrue(sanitizedKey.startsWith("*3ftest*3f"));
}
@Test
public void sanitizeKeyShouldEscapeIllegalCharacter()
{
// Ascii code of "?" is "3f".
String sanitizedKey = CosmosDbKeyEscape.escapeKey("?test?");
Assert.assertEquals("*3ftest*3f", sanitizedKey);
// Ascii code of "/" is "2f".
String sanitizedKey2 = CosmosDbKeyEscape.escapeKey("/test/");
Assert.assertEquals("*2ftest*2f", sanitizedKey2);
// Ascii code of "\" is "5c".
String sanitizedKey3 = CosmosDbKeyEscape.escapeKey("\\test\\");
Assert.assertEquals("*5ctest*5c", sanitizedKey3);
// Ascii code of "#" is "23".
String sanitizedKey4 = CosmosDbKeyEscape.escapeKey("#test#");
Assert.assertEquals("*23test*23", sanitizedKey4);
// Ascii code of "*" is "2a".
String sanitizedKey5 = CosmosDbKeyEscape.escapeKey("*test*");
Assert.assertEquals("*2atest*2a", sanitizedKey5);
// Check a compound key
String compoundSanitizedKey = CosmosDbKeyEscape.escapeKey("?#/");
Assert.assertEquals("*3f*23*2f", compoundSanitizedKey);
}
@Test
public void collisionsShouldNotHappen()
{
String validKey = "*2atest*2a";
String validKey2 = "*test*";
// If we failed to esacpe the "*", then validKey2 would
// escape to the same value as validKey. To prevent this
// we makes sure to escape the *.
// Ascii code of "*" is "2a".
String escaped1 = CosmosDbKeyEscape.escapeKey(validKey);
String escaped2 = CosmosDbKeyEscape.escapeKey(validKey2);
Assert.assertNotEquals(escaped1, escaped2);
}
@Test
public void longKeyShouldNotBeTruncatedWithFalseCompatibilityMode() {
StringBuilder tooLongKey = new StringBuilder();
for (int i = 0; i < CosmosDbKeyEscape.MAX_KEY_LENGTH + 1; i++) {
tooLongKey.append("a");
}
String sanitizedKey = CosmosDbKeyEscape.escapeKey(tooLongKey.toString(), new String(), false);
Assert.assertEquals(CosmosDbKeyEscape.MAX_KEY_LENGTH + 1, sanitizedKey.length());
// The resulting key should be identical
Assert.assertEquals(tooLongKey.toString(), sanitizedKey);
}
@Test
public void longKeyWithIllegalCharactersShouldNotBeTruncatedWithFalseCompatibilityMode()
{
StringBuilder tooLongKey = new StringBuilder();
for (int i = 0; i < CosmosDbKeyEscape.MAX_KEY_LENGTH + 1; i++) {
tooLongKey.append("a");
}
String longKeyWithIllegalCharacters = "?test?" + tooLongKey.toString();
String sanitizedKey = CosmosDbKeyEscape.escapeKey(longKeyWithIllegalCharacters, new String(), false);
// Verify the key was NOT truncated
Assert.assertEquals(longKeyWithIllegalCharacters.length() + 4, sanitizedKey.length());
// Make sure the escaping still happened
Assert.assertTrue(sanitizedKey.startsWith("*3ftest*3f"));
}
@Test
public void keySuffixIsAddedToEndOfKey()
{
String suffix = "test suffix";
String key = "this is a test";
String sanitizedKey = CosmosDbKeyEscape.escapeKey(key, suffix, false);
// Verify the suffix was added to the end of the key
Assert.assertEquals(sanitizedKey, key + suffix);
}
}

Просмотреть файл

@ -0,0 +1,272 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.azure;
import com.microsoft.azure.documentdb.ConnectionPolicy;
import com.microsoft.azure.documentdb.ConsistencyLevel;
import com.microsoft.azure.documentdb.Database;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.bot.builder.Storage;
import com.microsoft.bot.builder.StorageBaseTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* The CosmosDB tests require the CosmosDB Emulator to be installed and running.
*
* More info at: https://aka.ms/documentdb-emulator-docs
*
* Also... Java requires the CosmosDB Emulator cert to be installed. See "Export the SSL certificate" in
* the link above to export the cert. Then import the cert into the Java JDK using:
*
* https://docs.microsoft.com/en-us/azure/java/java-sdk-add-certificate-ca-store?view=azure-java-stable
*
* Note: Don't ignore the first step of "At an administrator command prompt, navigate to your JDK's jdk\jre\lib\security folder"
*/
public class CosmosDbPartitionStorageTests extends StorageBaseTests {
private static boolean emulatorIsRunning = false;
private static final String NO_EMULATOR_MESSAGE = "This test requires CosmosDB Emulator! go to https://aka.ms/documentdb-emulator-docs to download and install.";
private static String CosmosServiceEndpoint = "https://localhost:8081";
private static String CosmosAuthKey = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
private static String CosmosDatabaseName = "test-db";
private static String CosmosCollectionName = "bot-storage";
private Storage storage;
@BeforeClass
public static void allTestsInit() throws IOException, InterruptedException, DocumentClientException {
Process p = Runtime.getRuntime().exec
("cmd /C \"" + System.getenv("ProgramFiles") + "\\Azure Cosmos DB Emulator\\CosmosDB.Emulator.exe\" /GetStatus");
int result = p.waitFor();
if (result == 2) {
emulatorIsRunning = true;
DocumentClient client = new DocumentClient(
CosmosServiceEndpoint,
CosmosAuthKey,
ConnectionPolicy.GetDefault(),
ConsistencyLevel.Session);
createDatabaseIfNeeded(client);
}
}
@AfterClass
public static void allTestCleanup() throws DocumentClientException {
if (emulatorIsRunning) {
DocumentClient client = new DocumentClient(
CosmosServiceEndpoint,
CosmosAuthKey,
ConnectionPolicy.GetDefault(),
ConsistencyLevel.Session);
List<Database> databaseList = client
.queryDatabases(
"SELECT * FROM root r WHERE r.id='" + CosmosDatabaseName
+ "'", null).getQueryIterable().toList();
if (databaseList.size() > 0) {
client.deleteDatabase(databaseList.get(0).getSelfLink(), null);
}
}
}
@Before
public void testInit() {
storage = new CosmosDbPartitionedStorage(new CosmosDbPartitionedStorageOptions() {{
setAuthKey(CosmosAuthKey);
setContainerId(CosmosCollectionName);
setCosmosDbEndpoint(CosmosServiceEndpoint);
setDatabaseId(CosmosDatabaseName);
}});
}
@After
public void testCleanup() {
storage = null;
}
@Test
public void constructorShouldThrowOnInvalidOptions() {
try {
new CosmosDbPartitionedStorage(null);
Assert.fail("should have thrown for null options");
} catch(IllegalArgumentException e) {
// all good
}
try {
new CosmosDbPartitionedStorage(new CosmosDbPartitionedStorageOptions() {{
setAuthKey("test");
setContainerId("testId");
setDatabaseId("testDb");
}});
Assert.fail("should have thrown for missing end point");
} catch (IllegalArgumentException e) {
}
try {
new CosmosDbPartitionedStorage(new CosmosDbPartitionedStorageOptions() {{
setAuthKey(null);
setContainerId("testId");
setDatabaseId("testDb");
setCosmosDbEndpoint("testEndpoint");
}});
Assert.fail("should have thrown for missing auth key");
} catch (IllegalArgumentException e) {
}
try {
new CosmosDbPartitionedStorage(new CosmosDbPartitionedStorageOptions() {{
setAuthKey("testAuthKey");
setContainerId("testId");
setDatabaseId(null);
setCosmosDbEndpoint("testEndpoint");
}});
Assert.fail("should have thrown for missing db id");
} catch (IllegalArgumentException e) {
}
try {
new CosmosDbPartitionedStorage(new CosmosDbPartitionedStorageOptions() {{
setAuthKey("testAuthKey");
setContainerId(null);
setDatabaseId("testDb");
setCosmosDbEndpoint("testEndpoint");
}});
Assert.fail("should have thrown for missing collection id");
} catch (IllegalArgumentException e) {
}
try {
new CosmosDbPartitionedStorage(new CosmosDbPartitionedStorageOptions() {{
setAuthKey("testAuthKey");
setContainerId("testId");
setDatabaseId("testDb");
setCosmosDbEndpoint("testEndpoint");
setKeySuffix("?#*test");
setCompatibilityMode(false);
}});
Assert.fail("should have thrown for invalid Row Key characters in KeySuffix");
} catch (IllegalArgumentException e) {
}
try {
new CosmosDbPartitionedStorage(new CosmosDbPartitionedStorageOptions() {{
setAuthKey("testAuthKey");
setContainerId("testId");
setDatabaseId("testDb");
setCosmosDbEndpoint("testEndpoint");
setKeySuffix("thisisatest");
setCompatibilityMode(true);
}});
Assert.fail("should have thrown for CompatibilityMode 'true' while using a KeySuffix");
} catch (IllegalArgumentException e) {
}
}
// NOTE: THESE TESTS REQUIRE THAT THE COSMOS DB EMULATOR IS INSTALLED AND STARTED !!!!!!!!!!!!!!!!!
@Test
public void createObjectCosmosDBPartitionTest() {
assertEmulator();
super.createObjectTest(storage);
}
// NOTE: THESE TESTS REQUIRE THAT THE COSMOS DB EMULATOR IS INSTALLED AND STARTED !!!!!!!!!!!!!!!!!
@Test
public void readUnknownCosmosDBPartitionTest() {
assertEmulator();
super.readUnknownTest(storage);
}
// NOTE: THESE TESTS REQUIRE THAT THE COSMOS DB EMULATOR IS INSTALLED AND STARTED !!!!!!!!!!!!!!!!!
@Test
public void updateObjectCosmosDBPartitionTest() {
assertEmulator();
super.updateObjectTest(storage);
}
// NOTE: THESE TESTS REQUIRE THAT THE COSMOS DB EMULATOR IS INSTALLED AND STARTED !!!!!!!!!!!!!!!!!
@Test
public void deleteObjectCosmosDBPartitionTest() {
assertEmulator();
super.deleteObjectTest(storage);
}
// NOTE: THESE TESTS REQUIRE THAT THE COSMOS DB EMULATOR IS INSTALLED AND STARTED !!!!!!!!!!!!!!!!!
@Test
public void deleteUnknownObjectCosmosDBPartitionTest() {
assertEmulator();
storage.delete(new String[] {"unknown_delete"});
}
// NOTE: THESE TESTS REQUIRE THAT THE COSMOS DB EMULATOR IS INSTALLED AND STARTED !!!!!!!!!!!!!!!!!
@Test
public void handleCrazyKeysCosmosDBPartition() {
assertEmulator();
super.handleCrazyKeys(storage);
}
@Test
public void readingEmptyKeysReturnsEmptyDictionary() {
Map<String, Object> state = storage.read(new String[] {}).join();
Assert.assertNotNull(state);
Assert.assertEquals(0, state.size());
}
@Test(expected = IllegalArgumentException.class)
public void readingNullKeysThrowException() {
storage.read(null).join();
}
@Test(expected = IllegalArgumentException.class)
public void writingNullStoreItemsThrowException() {
storage.write(null);
}
@Test
public void writingNoStoreItemsDoesntThrow() {
storage.write(new HashMap<>());
}
private static void createDatabaseIfNeeded(DocumentClient client) throws DocumentClientException {
// Get the database if it exists
List<Database> databaseList = client
.queryDatabases(
"SELECT * FROM root r WHERE r.id='" + CosmosDatabaseName
+ "'", null).getQueryIterable().toList();
if (databaseList.size() == 0) {
// Create the database if it doesn't exist.
Database databaseDefinition = new Database();
databaseDefinition.setId(CosmosDatabaseName);
client.createDatabase(
databaseDefinition, null).getResource();
}
}
private void assertEmulator() {
if (!emulatorIsRunning) {
Assert.fail(NO_EMULATOR_MESSAGE);
}
}
}