зеркало из https://github.com/Azure/YCSB.git
Azure Cosmos [DB] (#1264)
* Adding Azure Cosmos Driver. Still some improvements to make like automatically creating the YCSB database and usertable collection. But this does bring in the all the latest SDKs / etc.
This commit is contained in:
Родитель
e6bd7393fa
Коммит
2f03a2f83a
|
@ -0,0 +1,120 @@
|
|||
<!--
|
||||
Copyright (c) 2018 YCSB contributors.
|
||||
All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
may not use this file except in compliance with the License. You
|
||||
may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied. See the License for the specific language governing
|
||||
permissions and limitations under the License. See accompanying
|
||||
LICENSE file.
|
||||
-->
|
||||
|
||||
## Azure Cosmos Quick Start
|
||||
|
||||
This section describes how to run YCSB on Azure Cosmos.
|
||||
|
||||
For more information on Azure Cosmos see
|
||||
https://azure.microsoft.com/services/cosmos-db/
|
||||
|
||||
### 1. Setup
|
||||
This benchmark expects you to have pre-created the database "ycsb" and
|
||||
collection "usertable" before running the benchmark commands. You can
|
||||
override the default database name with the azurecosmos.databaseName
|
||||
configuration value.
|
||||
|
||||
You must set the uri and the primaryKey in the azurecosmos.properties file in the commands below.
|
||||
$YCSB_HOME/bin/ycsb load azurecosmos -P workloads/workloada -P azurecosmos/conf/azurecosmos.properties
|
||||
$YCSB_HOME/bin/ycsb run azurecosmos -P workloads/workloada -P azurecosmos/conf/azurecosmos.properties
|
||||
|
||||
Optionally you can set the uri and primaryKey as follows:
|
||||
$YCSB_HOME/bin/ycsb load azurecosmos -P workloads/workloada -p azurecosmos.primaryKey=<key from the portal> -p azurecosmos.uri=<uri from the portal>
|
||||
|
||||
### 2. DocumenDB Configuration Parameters
|
||||
|
||||
#### Required parameters
|
||||
|
||||
- azurecosmos.uri < uri string > :
|
||||
- Obtained from the portal and gives a path to your azurecosmos database
|
||||
account. It will look like the following:
|
||||
https://<your account name>.documents.azure.com:443/
|
||||
|
||||
- azurecosmos.primaryKey < key string > :
|
||||
- Obtained from the portal and is the key to use for benchmarking. The
|
||||
primary key is used to allow both read & write operations. If you are
|
||||
doing read only workloads you can substitute the readonly key from the
|
||||
portal.
|
||||
|
||||
#### Options parameters
|
||||
|
||||
- azurecosmos.databaseName < name string > :
|
||||
- Name of the database to use.
|
||||
- Default: ycsb
|
||||
|
||||
- azurecosmos.useSinglePartitionCollection (true | false):
|
||||
- It should be true if you created the collection with a single parition. If
|
||||
you created the collection with a partitioning key this value should be set
|
||||
to false.
|
||||
- Default: true
|
||||
|
||||
- azurecosmos.useUpsert (true | false):
|
||||
- Set to true to allow inserts to update existing documents. If this is
|
||||
false and a document already exists the insert will fail.
|
||||
- Default: false
|
||||
|
||||
- azurecosmos.connectionMode (DirectHttps | Gateway):
|
||||
- Some java operations only work when connecting via the gateway. However
|
||||
the best performance for basic operations like those used by YCSB are
|
||||
obtained by using direct more where the client will connect directly to the
|
||||
master server thats is managing the database and collection.
|
||||
- Default: DirectHttps
|
||||
|
||||
- azurecosmos.consistencyLevel (Strong | BoundedStaleness | Session | Eventual):
|
||||
- This setting defined the level on consistency you want for reads/scans
|
||||
following inserts/updates.
|
||||
- Default: Session
|
||||
|
||||
- azurecosmos.maxRetryAttemptsOnThrottledRequests < integer >
|
||||
- Sets the maximum number of retry attempts for throttled requests
|
||||
- Default: uses default value of azurecosmos Java SDK
|
||||
|
||||
- azurecosmos.maxRetryWaitTimeInSeconds < integer >
|
||||
- Sets the maximum timeout to for retry in seconds
|
||||
- Default: uses default value of azurecosmos Java SDK
|
||||
|
||||
- azurecosmos.useHashQueryForScan (true | false):
|
||||
- This setting indicates whether SCAN operation should use hash query instead of range query.
|
||||
Range query: SELECT * FROM root r WHERE r.id = @startkey
|
||||
Hash query: SELECT TOP @recordcount * FROM root r WHERE r.id >= @startkey
|
||||
- Default: false
|
||||
|
||||
- azurecosmos.maxDegreeOfParallelismForQuery < integer >
|
||||
- Sets the maximum degree of parallelism for the FeedOptions used in Query operation
|
||||
- Default: 0
|
||||
|
||||
- azurecosmos.includeExceptionStackInLog (true | false):
|
||||
- Determines if the full stack when and error happens should be included in the log.
|
||||
The default is false to reduce a lot of log spew.
|
||||
|
||||
- azurecosmos.maxConnectionPoolSize < integer >
|
||||
- This is the number of connections maintained for operations.
|
||||
- See the JAVA SDK documentation for ConnectionPolicy.getMaxPoolSize
|
||||
|
||||
- azurecosmos.idleConnectionTimeout < integer >
|
||||
- This value is in seconds and determines how quickly a connection is recycled.
|
||||
- See the JAVA SDK documentation for ConnectionPolicy.setIdleConnectionTimeout.
|
||||
|
||||
These parameters are also defined in a template configuration file in the
|
||||
following location:
|
||||
$YCSB_HOME/azurecosmos/conf/azurecosmos.properties
|
||||
|
||||
### 3. FAQs
|
||||
|
||||
### 4. Example command
|
||||
./bin/ycsb run azurecosmos -s -P workloads/workloadb -p azurecosmos.primaryKey=<your key eg:45fgt...==> -p azurecosmos.uri=https://<your account>.documents.azure.com:443/ -p recordcount=100 -p operationcount=100
|
|
@ -0,0 +1,56 @@
|
|||
# Copyright (c) 2018 YCSB contributors. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
# may not use this file except in compliance with the License. You
|
||||
# may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied. See the License for the specific language governing
|
||||
# permissions and limitations under the License. See accompanying
|
||||
# LICENSE file.
|
||||
|
||||
# Azure Cosmos host uri (ex: https://p3rf.documents.azure.com:443/) and primary key
|
||||
#azurecosmos.primaryKey =
|
||||
#azurecosmos.uri =
|
||||
|
||||
# Databse to be used, if not specified 'ycsb' will be used
|
||||
#azurecosmos.databaseName = ycsb
|
||||
|
||||
# Enable/disable the use of single collection, if not specified a single collection will be used by default
|
||||
# "true" or "false"
|
||||
#azurecosmos.useSinglePartitionCollection = true
|
||||
|
||||
# Specify if upsert should be used instead of createDocument
|
||||
# If not specified, createDocument will be used by default
|
||||
# "true" or "false"
|
||||
#azurecosmos.useUpsert = false
|
||||
|
||||
# Specify if connection policy should use gateway or not
|
||||
# If not specified, direct connectivity with better performance will be used by default
|
||||
# Value can be DirectHttps or Gateway.
|
||||
#azurecosmos.connectionMode = DirectHttps
|
||||
|
||||
# Specify consistency level, values can be Strong, BoundedStaleness, Session or Eventual
|
||||
# If not specified, Session will be used by default
|
||||
azurecosmos.consistencyLevel = Session
|
||||
|
||||
# Specify retry options to use in case of throttled request.
|
||||
# If not specified, default values will be used
|
||||
#azurecosmos.maxRetryAttemptsOnThrottledRequests = 9
|
||||
#azurecosmos.maxRetryWaitTimeInSeconds = 30
|
||||
|
||||
# Specify if hash query should be used in SCAN operation instead of range query.
|
||||
# If not specified, range query will be used by default.
|
||||
#azurecosmos.useHashQueryForScan = true
|
||||
|
||||
# Specify if the 'id' property should be used in SCAN operation.
|
||||
# If not specified, the 'docid' property will be used by default.
|
||||
#azurecosmos.useIdPropertyForScan = true
|
||||
|
||||
# Specify the maximum degree of parallelism for the FeedOptions used in Query operation.
|
||||
# If not specified it will take 0 as the default value.
|
||||
#azurecosmos.maxDegreeOfParallelismForQuery = 0
|
|
@ -0,0 +1,88 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Copyright (c) 2018 YCSB contributors. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
may not use this file except in compliance with the License. You
|
||||
may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied. See the License for the specific language governing
|
||||
permissions and limitations under the License. See accompanying
|
||||
LICENSE file.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.yahoo.ycsb</groupId>
|
||||
<artifactId>binding-parent</artifactId>
|
||||
<version>0.16.0-SNAPSHOT</version>
|
||||
<relativePath>../binding-parent</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>azurecosmos-binding</artifactId>
|
||||
<name>Azure Cosmos Binding</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<checkstyle.failOnViolation>false</checkstyle.failOnViolation>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>azure-documentdb</artifactId>
|
||||
<version>${azurecosmos.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.5</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<version>1.7.5</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
<version>1.2.17</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.yahoo.ycsb</groupId>
|
||||
<artifactId>core</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>2.15</version>
|
||||
<configuration>
|
||||
<consoleOutput>true</consoleOutput>
|
||||
<configLocation>../checkstyle.xml</configLocation>
|
||||
<failOnViolation>true</failOnViolation>
|
||||
<failsOnError>true</failsOnError>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>validate</id>
|
||||
<phase>validate</phase>
|
||||
<goals>
|
||||
<goal>checkstyle</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -0,0 +1,454 @@
|
|||
/*
|
||||
* Copyright (c) 2018 YCSB contributors. All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
* may not use this file except in compliance with the License. You
|
||||
* may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License. See accompanying
|
||||
* LICENSE file.
|
||||
*/
|
||||
|
||||
// Authors: Anthony F. Voellm and Khoa Dang
|
||||
|
||||
package com.yahoo.ycsb.db;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.microsoft.azure.documentdb.*;
|
||||
|
||||
import com.yahoo.ycsb.ByteIterator;
|
||||
import com.yahoo.ycsb.DB;
|
||||
import com.yahoo.ycsb.DBException;
|
||||
import com.yahoo.ycsb.Status;
|
||||
import com.yahoo.ycsb.StringByteIterator;
|
||||
|
||||
/**
|
||||
* Azure Cosmos DB v1.00 client for YCSB.
|
||||
*/
|
||||
|
||||
public class AzureCosmosClient extends DB {
|
||||
// Document uri naming
|
||||
private static final String DATABASES_PATH_SEGMENT = "dbs";
|
||||
private static final String COLLECTIONS_PATH_SEGMENT = "colls";
|
||||
private static final String DOCUMENTS_PATH_SEGMENT = "docs";
|
||||
|
||||
// Default configuration values
|
||||
private static final String DEFAULT_CONSISTENCY_LEVEL = "Session";
|
||||
private static final String DEFAULT_DATABASE_NAME = "ycsb";
|
||||
private static final String DEFAULT_CONNECTION_MODE = "DirectHttps";
|
||||
private static final boolean DEFAULT_USE_SINGLE_PARTITION_COLLECTION = true;
|
||||
private static final boolean DEFAULT_USE_UPSERT = false;
|
||||
private static final boolean DEFAULT_USE_HASH_QUERY_FOR_SCAN = false;
|
||||
private static final int DEFAULT_MAX_DEGREE_OF_PARALLELISM_FOR_QUERY = 0;
|
||||
private static final boolean DEFAULT_INCLUDE_EXCEPTION_STACK_IN_LOG = false;
|
||||
|
||||
private static final int NS_IN_US = 1000;
|
||||
private static final String NA_STRING = "N/A";
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AzureCosmosClient.class);
|
||||
|
||||
/**
|
||||
* Count the number of times initialized to teardown on the last
|
||||
* {@link #cleanup()}.
|
||||
*/
|
||||
private static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
|
||||
|
||||
private static DocumentClient client;
|
||||
|
||||
private String queryText;
|
||||
private String databaseName;
|
||||
private boolean useSinglePartitionCollection;
|
||||
private boolean useUpsert;
|
||||
private boolean useHashQueryForScan;
|
||||
private int maxDegreeOfParallelismForQuery;
|
||||
private boolean includeExceptionStackInLog;
|
||||
|
||||
@Override
|
||||
public synchronized void init() throws DBException {
|
||||
INIT_COUNT.incrementAndGet();
|
||||
if (client != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
initAzureCosmosClient();
|
||||
}
|
||||
|
||||
private void initAzureCosmosClient() throws DBException {
|
||||
// Connection properties
|
||||
String primaryKey = this.getStringProperty("azurecosmos.primaryKey", null);
|
||||
if (StringUtils.isEmpty(primaryKey)) {
|
||||
throw new DBException("Missing primary key required to connect to the database.");
|
||||
}
|
||||
|
||||
String uri = getProperties().getProperty("azurecosmos.uri", null);
|
||||
if (StringUtils.isEmpty(uri)) {
|
||||
throw new DBException("Missing uri required to connect to the database.");
|
||||
}
|
||||
|
||||
this.useSinglePartitionCollection = this.getBooleanProperty(
|
||||
"azurecosmos.useSinglePartitionCollection",
|
||||
DEFAULT_USE_SINGLE_PARTITION_COLLECTION);
|
||||
|
||||
this.useUpsert = this.getBooleanProperty(
|
||||
"azurecosmos.useUpsert",
|
||||
DEFAULT_USE_UPSERT);
|
||||
|
||||
this.useHashQueryForScan = this.getBooleanProperty(
|
||||
"azurecosmos.useHashQueryForScan",
|
||||
DEFAULT_USE_HASH_QUERY_FOR_SCAN);
|
||||
|
||||
this.databaseName = this.getStringProperty(
|
||||
"azurecosmos.databaseName",
|
||||
DEFAULT_DATABASE_NAME);
|
||||
|
||||
this.maxDegreeOfParallelismForQuery = this.getIntProperty(
|
||||
"azurecosmos.maxDegreeOfParallelismForQuery",
|
||||
DEFAULT_MAX_DEGREE_OF_PARALLELISM_FOR_QUERY);
|
||||
|
||||
this.includeExceptionStackInLog = this.getBooleanProperty(
|
||||
"azurecosmos.includeExceptionStackInLog",
|
||||
DEFAULT_INCLUDE_EXCEPTION_STACK_IN_LOG);
|
||||
|
||||
ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(this.getStringProperty(
|
||||
"azurecosmos.consistencyLevel",
|
||||
DEFAULT_CONSISTENCY_LEVEL));
|
||||
String connectionModeString = this.getStringProperty(
|
||||
"azurecosmos.connectionMode",
|
||||
DEFAULT_CONNECTION_MODE);
|
||||
|
||||
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
|
||||
connectionPolicy.setEnableEndpointDiscovery(false);
|
||||
connectionPolicy.setConnectionMode(ConnectionMode.valueOf(connectionModeString));
|
||||
connectionPolicy.setMaxPoolSize(this.getIntProperty("azurecosmos.maxConnectionPoolSize",
|
||||
connectionPolicy.getMaxPoolSize()));
|
||||
connectionPolicy.setIdleConnectionTimeout(this.getIntProperty("azurecosmos.idleConnectionTimeout",
|
||||
connectionPolicy.getIdleConnectionTimeout()));
|
||||
|
||||
RetryOptions retryOptions = new RetryOptions();
|
||||
retryOptions.setMaxRetryAttemptsOnThrottledRequests(this.getIntProperty(
|
||||
"azurecosmos.maxRetryAttemptsOnThrottledRequests",
|
||||
retryOptions.getMaxRetryAttemptsOnThrottledRequests()));
|
||||
retryOptions.setMaxRetryWaitTimeInSeconds(this.getIntProperty(
|
||||
"azurecosmos.maxRetryWaitTimeInSeconds",
|
||||
retryOptions.getMaxRetryWaitTimeInSeconds()));
|
||||
connectionPolicy.setRetryOptions(retryOptions);
|
||||
|
||||
// Query text
|
||||
this.queryText = this.getQueryText();
|
||||
|
||||
try {
|
||||
LOGGER.info("Creating azurecosmos client {}.. connectivityMode={}, consistencyLevel={},"
|
||||
+ " maxRetryAttemptsOnThrottledRequests={}, maxRetryWaitTimeInSeconds={}"
|
||||
+ " useSinglePartitionCollection={}, useUpsert={}, useHashQueryForScan={}, "
|
||||
+ "queryText={}",
|
||||
uri,
|
||||
connectionPolicy.getConnectionMode(),
|
||||
consistencyLevel.toString(),
|
||||
connectionPolicy.getRetryOptions().getMaxRetryAttemptsOnThrottledRequests(),
|
||||
connectionPolicy.getRetryOptions().getMaxRetryWaitTimeInSeconds(),
|
||||
this.useSinglePartitionCollection,
|
||||
this.useUpsert,
|
||||
this.useHashQueryForScan,
|
||||
this.queryText);
|
||||
AzureCosmosClient.client = new DocumentClient(uri, primaryKey, connectionPolicy, consistencyLevel);
|
||||
LOGGER.info("Azure Cosmos connection created: {}", uri);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new DBException("Illegal argument passed in. Check the format of your parameters.", e);
|
||||
}
|
||||
|
||||
// Verify the database exists
|
||||
try {
|
||||
AzureCosmosClient.client.readDatabase(getDatabaseLink(this.databaseName), new RequestOptions());
|
||||
} catch (DocumentClientException e) {
|
||||
throw new DBException("Invalid database name (" + this.databaseName + ") or failed to read database.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getStringProperty(String propertyName, String defaultValue) {
|
||||
return getProperties().getProperty(propertyName, defaultValue);
|
||||
}
|
||||
|
||||
private boolean getBooleanProperty(String propertyName, boolean defaultValue) {
|
||||
String stringVal = getProperties().getProperty(propertyName, null);
|
||||
if (stringVal == null) {
|
||||
return defaultValue;
|
||||
}
|
||||
return Boolean.parseBoolean(stringVal);
|
||||
}
|
||||
|
||||
private int getIntProperty(String propertyName, int defaultValue) {
|
||||
String stringVal = getProperties().getProperty(propertyName, null);
|
||||
if (stringVal == null) {
|
||||
return defaultValue;
|
||||
}
|
||||
try {
|
||||
return Integer.parseInt(stringVal);
|
||||
} catch (NumberFormatException e) {
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
|
||||
private String getQueryText() {
|
||||
return this.useHashQueryForScan ?
|
||||
"SELECT * FROM root r WHERE r.id = @startkey" :
|
||||
"SELECT TOP @recordcount * FROM root r WHERE r.myid >= @startkey";
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup any state for this DB. Called once per DB instance; there is one DB
|
||||
* instance per client thread.
|
||||
*/
|
||||
@Override
|
||||
public void cleanup() throws DBException {
|
||||
if (INIT_COUNT.decrementAndGet() == 0) {
|
||||
try {
|
||||
AzureCosmosClient.client.close();
|
||||
} catch (Exception e) {
|
||||
if (!this.includeExceptionStackInLog) {
|
||||
e = null;
|
||||
}
|
||||
LOGGER.error("Could not close DocumentClient", e);
|
||||
} finally {
|
||||
client = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) {
|
||||
String documentLink = getDocumentLink(this.databaseName, table, key);
|
||||
|
||||
ResourceResponse<Document> readResource = null;
|
||||
Document document;
|
||||
long startTime = System.nanoTime();
|
||||
|
||||
try {
|
||||
readResource = AzureCosmosClient.client.readDocument(documentLink, getRequestOptions(key));
|
||||
document = readResource.getResource();
|
||||
} catch (DocumentClientException e) {
|
||||
if (!this.includeExceptionStackInLog) {
|
||||
e = null;
|
||||
}
|
||||
LOGGER.error("Failed to read key {} in collection {} in database {}", key, table, this.databaseName, e);
|
||||
return Status.ERROR;
|
||||
} finally {
|
||||
long elapsed = (System.nanoTime() - startTime) / NS_IN_US;
|
||||
LOGGER.debug("Read key {} in {}us - ActivityID: {}", key, elapsed,
|
||||
readResource != null ? readResource.getActivityId() : NA_STRING);
|
||||
}
|
||||
|
||||
if (null != document) {
|
||||
result.putAll(extractResult(document));
|
||||
LOGGER.trace("Read result: {}", document);
|
||||
}
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status scan(String table, String startkey, int recordcount, Set<String> fields,
|
||||
Vector<HashMap<String, ByteIterator>> result) {
|
||||
long startTime = System.nanoTime();
|
||||
List<Document> documents;
|
||||
FeedResponse<Document> feedResponse = null;
|
||||
try {
|
||||
feedResponse = AzureCosmosClient.client.queryDocuments(getDocumentCollectionLink(this.databaseName, table),
|
||||
new SqlQuerySpec(queryText,
|
||||
new SqlParameterCollection(new SqlParameter("@recordcount", recordcount),
|
||||
new SqlParameter("@startkey", startkey))),
|
||||
getFeedOptions(startkey));
|
||||
documents = feedResponse.getQueryIterable().toList();
|
||||
} catch (Exception e) {
|
||||
if (!this.includeExceptionStackInLog) {
|
||||
e = null;
|
||||
}
|
||||
LOGGER.error("Failed to scan with startKey={}, recordCount={}", startkey, recordcount, e);
|
||||
return Status.ERROR;
|
||||
} finally {
|
||||
long elapsed = (System.nanoTime() - startTime) / NS_IN_US;
|
||||
LOGGER.debug("Queried {} records for key {} in {}us - ActivityID: {}",
|
||||
recordcount, startkey, elapsed,
|
||||
feedResponse != null ? feedResponse.getActivityId() : NA_STRING);
|
||||
}
|
||||
|
||||
if (documents != null) {
|
||||
for (Document document : documents) {
|
||||
result.add(this.extractResult(document));
|
||||
}
|
||||
}
|
||||
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status update(String table, String key, Map<String, ByteIterator> values) {
|
||||
String documentLink = getDocumentLink(this.databaseName, table, key);
|
||||
Document document = getDocumentDefinition(key, values);
|
||||
|
||||
RequestOptions reqOptions = getRequestOptions(key);
|
||||
if (reqOptions == null) {
|
||||
reqOptions = new RequestOptions();
|
||||
}
|
||||
AccessCondition accessCondition = new AccessCondition();
|
||||
accessCondition.setCondition(document.getETag());
|
||||
accessCondition.setType(AccessConditionType.IfMatch);
|
||||
reqOptions.setAccessCondition(accessCondition);
|
||||
|
||||
ResourceResponse<Document> updatedResource = null;
|
||||
long startTime = System.nanoTime();
|
||||
try {
|
||||
updatedResource = AzureCosmosClient.client.replaceDocument(documentLink, document, reqOptions);
|
||||
} catch (DocumentClientException e) {
|
||||
if (!this.includeExceptionStackInLog) {
|
||||
e = null;
|
||||
}
|
||||
LOGGER.error("Failed to update key {}", key, e);
|
||||
return Status.ERROR;
|
||||
} finally {
|
||||
long elapsed = (System.nanoTime() - startTime) / NS_IN_US;
|
||||
LOGGER.debug("Updated key {} in {}us - ActivityID: {}", key, elapsed,
|
||||
updatedResource != null ? updatedResource.getActivityId() : NA_STRING);
|
||||
}
|
||||
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status insert(String table, String key, Map<String, ByteIterator> values) {
|
||||
Document documentDefinition = getDocumentDefinition(key, values);
|
||||
ResourceResponse<Document> resourceResponse = null;
|
||||
long startTime = System.nanoTime();
|
||||
try {
|
||||
if (this.useUpsert) {
|
||||
resourceResponse = AzureCosmosClient.client.upsertDocument(getDocumentCollectionLink(this.databaseName, table),
|
||||
documentDefinition,
|
||||
getRequestOptions(key),
|
||||
true);
|
||||
} else {
|
||||
resourceResponse = AzureCosmosClient.client.createDocument(getDocumentCollectionLink(this.databaseName, table),
|
||||
documentDefinition,
|
||||
getRequestOptions(key),
|
||||
true);
|
||||
}
|
||||
|
||||
} catch (DocumentClientException e) {
|
||||
if (!this.includeExceptionStackInLog) {
|
||||
e = null;
|
||||
}
|
||||
LOGGER.error("Failed to insert key {} to collection {} in database {}", key, table, this.databaseName, e);
|
||||
return Status.ERROR;
|
||||
} finally {
|
||||
long elapsed = (System.nanoTime() - startTime) / NS_IN_US;
|
||||
LOGGER.debug("Inserted key {} in {}us - ActivityID: {}", key, elapsed,
|
||||
resourceResponse != null ? resourceResponse.getActivityId() : NA_STRING);
|
||||
}
|
||||
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status delete(String table, String key) {
|
||||
ResourceResponse<Document> deletedResource = null;
|
||||
long startTime = System.nanoTime();
|
||||
try {
|
||||
deletedResource = AzureCosmosClient.client.deleteDocument(getDocumentLink(this.databaseName, table, key),
|
||||
getRequestOptions(key));
|
||||
} catch (DocumentClientException e) {
|
||||
if (!this.includeExceptionStackInLog) {
|
||||
e = null;
|
||||
}
|
||||
LOGGER.error("Failed to delete key {} in collection {} in database {}", key, table, this.databaseName, e);
|
||||
return Status.ERROR;
|
||||
} finally {
|
||||
long elapsed = (System.nanoTime() - startTime) / NS_IN_US;
|
||||
LOGGER.debug("Deleted key {} in {}us - ActivityID: {}", key, elapsed,
|
||||
deletedResource != null ? deletedResource.getActivityId() : NA_STRING);
|
||||
}
|
||||
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
private HashMap<String, ByteIterator> extractResult(Document item) {
|
||||
if (null == item) {
|
||||
return null;
|
||||
}
|
||||
HashMap<String, ByteIterator> rItems = new HashMap<>(item.getHashMap().size());
|
||||
|
||||
for (Entry<String, Object> attr : item.getHashMap().entrySet()) {
|
||||
LOGGER.trace("Result- key: {}, value: {}", attr.getKey(), attr.getValue().toString());
|
||||
rItems.put(attr.getKey(), new StringByteIterator(attr.getValue().toString()));
|
||||
}
|
||||
return rItems;
|
||||
}
|
||||
|
||||
private FeedOptions getFeedOptions(String key) {
|
||||
if (useSinglePartitionCollection) {
|
||||
return null;
|
||||
}
|
||||
FeedOptions feedOptions = new FeedOptions();
|
||||
if (this.useHashQueryForScan) {
|
||||
feedOptions.setEnableCrossPartitionQuery(false);
|
||||
feedOptions.setPartitionKey(new PartitionKey(key));
|
||||
} else {
|
||||
feedOptions.setEnableCrossPartitionQuery(true);
|
||||
feedOptions.setMaxDegreeOfParallelism(this.maxDegreeOfParallelismForQuery);
|
||||
}
|
||||
return feedOptions;
|
||||
}
|
||||
|
||||
private RequestOptions getRequestOptions(String key) {
|
||||
RequestOptions requestOptions = new RequestOptions();
|
||||
requestOptions.setPartitionKey(new PartitionKey(key));
|
||||
return requestOptions;
|
||||
}
|
||||
|
||||
private static String getDatabaseLink(String databaseName) {
|
||||
return String.format("%s/%s", DATABASES_PATH_SEGMENT, databaseName);
|
||||
}
|
||||
|
||||
private static String getDocumentCollectionLink(String databaseName, String table) {
|
||||
return String.format("%s/%s/%s",
|
||||
getDatabaseLink(databaseName),
|
||||
COLLECTIONS_PATH_SEGMENT,
|
||||
table);
|
||||
}
|
||||
|
||||
private static String getDocumentLink(String databaseName, String table, String key) {
|
||||
return String.format("%s/%s/%s",
|
||||
getDocumentCollectionLink(databaseName, table),
|
||||
DOCUMENTS_PATH_SEGMENT,
|
||||
key);
|
||||
}
|
||||
|
||||
private Document getDocumentDefinition(String key, Map<String, ByteIterator> values) {
|
||||
Document document = new Document();
|
||||
document.set("id", key);
|
||||
if (!this.useHashQueryForScan) {
|
||||
// This field is only needed for range scans.
|
||||
// Even if this field is present in the document you
|
||||
// should still partition on id for simplicity of config.
|
||||
document.set("myid", key);
|
||||
}
|
||||
for (Entry<String, ByteIterator> entry : values.entrySet()) {
|
||||
document.set(entry.getKey(), entry.getValue().toString());
|
||||
}
|
||||
return document;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Copyright 2018 YCSB Contributors. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
* may not use this file except in compliance with the License. You
|
||||
* may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License. See accompanying
|
||||
* LICENSE file.
|
||||
*/
|
||||
|
||||
/**
|
||||
* The YCSB binding for <a href="https://azure.microsoft.com/services/cosmos-db/">Azure Cosmos</a>.
|
||||
*/
|
||||
package com.yahoo.ycsb.db;
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
# Copyright (c) 2018 YCSB contributors. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
# may not use this file except in compliance with the License. You
|
||||
# may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied. See the License for the specific language governing
|
||||
# permissions and limitations under the License. See accompanying
|
||||
# LICENSE file.
|
||||
|
||||
#define the console appender
|
||||
log4j.appender.consoleAppender = org.apache.log4j.ConsoleAppender
|
||||
|
||||
# now define the layout for the appender
|
||||
log4j.appender.consoleAppender.layout = org.apache.log4j.PatternLayout
|
||||
log4j.appender.consoleAppender.layout.ConversionPattern=%-4r [%t] %-5p %c %x -%m%n
|
||||
|
||||
# now map our console appender as a root logger, means all log messages will go
|
||||
# to this appender
|
||||
log4j.rootLogger = INFO, consoleAppender
|
||||
|
||||
# Set HTTP components' logger to INFO
|
||||
log4j.category.org.apache.http=INFO
|
||||
log4j.category.org.apache.http.wire=INFO
|
||||
log4j.category.org.apache.http.headers=INFO
|
|
@ -33,6 +33,7 @@ aerospike:com.yahoo.ycsb.db.AerospikeClient
|
|||
asynchbase:com.yahoo.ycsb.db.AsyncHBaseClient
|
||||
arangodb:com.yahoo.ycsb.db.arangodb.ArangoDBClient
|
||||
arangodb3:com.yahoo.ycsb.db.arangodb.ArangoDBClient
|
||||
azurecosmos:com.yahoo.ycsb.db.AzureCosmosClient
|
||||
azuredocumentdb:com.yahoo.ycsb.db.azuredocumentdb.AzureDocumentDBClient
|
||||
azuretablestorage:com.yahoo.ycsb.db.azuretablestorage.AzureClient
|
||||
basic:com.yahoo.ycsb.BasicDB
|
||||
|
|
1
bin/ycsb
1
bin/ycsb
|
@ -59,6 +59,7 @@ DATABASES = {
|
|||
"arangodb" : "com.yahoo.ycsb.db.arangodb.ArangoDBClient",
|
||||
"arangodb3" : "com.yahoo.ycsb.db.arangodb.ArangoDBClient",
|
||||
"asynchbase" : "com.yahoo.ycsb.db.AsyncHBaseClient",
|
||||
"azurecosmos" : "com.yahoo.ycsb.db.AzureCosmosClient",
|
||||
"azuredocumentdb" : "com.yahoo.ycsb.db.azuredocumentdb.AzureDocumentDBClient",
|
||||
"azuretablestorage" : "com.yahoo.ycsb.db.azuretablestorage.AzureClient",
|
||||
"basic" : "com.yahoo.ycsb.BasicDB",
|
||||
|
|
|
@ -89,6 +89,11 @@ LICENSE file.
|
|||
<artifactId>couchbase2-binding</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.yahoo.ycsb</groupId>
|
||||
<artifactId>azurecosmos-binding</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.yahoo.ycsb</groupId>
|
||||
<artifactId>azuredocumentdb-binding</artifactId>
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -72,6 +72,7 @@ LICENSE file.
|
|||
<aerospike.version>3.1.2</aerospike.version>
|
||||
<arangodb.version>4.4.1</arangodb.version>
|
||||
<asynchbase.version>1.7.1</asynchbase.version>
|
||||
<azurecosmos.version>2.0.0</azurecosmos.version>
|
||||
<azuredocumentdb.version>1.8.1</azuredocumentdb.version>
|
||||
<azurestorage.version>4.0.0</azurestorage.version>
|
||||
<cassandra.cql.version>3.0.0</cassandra.cql.version>
|
||||
|
@ -121,6 +122,7 @@ LICENSE file.
|
|||
<module>aerospike</module>
|
||||
<module>arangodb</module>
|
||||
<module>asynchbase</module>
|
||||
<module>azurecosmos</module>
|
||||
<module>azuredocumentdb</module>
|
||||
<module>azuretablestorage</module>
|
||||
<module>cassandra</module>
|
||||
|
|
Загрузка…
Ссылка в новой задаче