зеркало из https://github.com/Azure/YCSB.git
Merge pull request #647 from risdenk/pr-481
[dynamodb] Checkstyle updates for DynamoDB.
This commit is contained in:
Коммит
afb90c3019
|
@ -50,4 +50,28 @@ LICENSE file.
|
|||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>2.15</version>
|
||||
<configuration>
|
||||
<consoleOutput>true</consoleOutput>
|
||||
<configLocation>../checkstyle.xml</configLocation>
|
||||
<failOnViolation>true</failOnViolation>
|
||||
<failsOnError>true</failsOnError>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>validate</id>
|
||||
<phase>validate</phase>
|
||||
<goals>
|
||||
<goal>checkstyle</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
/*
|
||||
|
||||
* Copyright 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2015-2016 YCSB Contributors. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
@ -22,21 +22,8 @@ import com.amazonaws.ClientConfiguration;
|
|||
import com.amazonaws.auth.AWSCredentials;
|
||||
import com.amazonaws.auth.PropertiesCredentials;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
|
||||
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
|
||||
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
|
||||
import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest;
|
||||
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
|
||||
import com.amazonaws.services.dynamodbv2.model.GetItemResult;
|
||||
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
|
||||
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
|
||||
import com.amazonaws.services.dynamodbv2.model.ScanResult;
|
||||
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
|
||||
import com.yahoo.ycsb.ByteIterator;
|
||||
import com.yahoo.ycsb.DB;
|
||||
import com.yahoo.ycsb.DBException;
|
||||
import com.yahoo.ycsb.Status;
|
||||
import com.yahoo.ycsb.StringByteIterator;
|
||||
|
||||
import com.amazonaws.services.dynamodbv2.model.*;
|
||||
import com.yahoo.ycsb.*;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
|
@ -48,308 +35,307 @@ import java.util.Set;
|
|||
import java.util.Vector;
|
||||
|
||||
/**
|
||||
* DynamoDB v1.10.48 client for YCSB
|
||||
* DynamoDB v1.10.48 client for YCSB.
|
||||
*/
|
||||
|
||||
public class DynamoDBClient extends DB {
|
||||
|
||||
/**
|
||||
* Defines the primary key type used in this particular DB instance.
|
||||
*
|
||||
* By default, the primary key type is "HASH". Optionally, the user can
|
||||
* choose to use hash_and_range key type. See documentation in the
|
||||
* DynamoDB.Properties file for more details.
|
||||
*/
|
||||
private enum PrimaryKeyType {
|
||||
HASH,
|
||||
HASH_AND_RANGE
|
||||
/**
|
||||
* Defines the primary key type used in this particular DB instance.
|
||||
* <p>
|
||||
* By default, the primary key type is "HASH". Optionally, the user can
|
||||
* choose to use hash_and_range key type. See documentation in the
|
||||
* DynamoDB.Properties file for more details.
|
||||
*/
|
||||
private enum PrimaryKeyType {
|
||||
HASH,
|
||||
HASH_AND_RANGE
|
||||
}
|
||||
|
||||
private AmazonDynamoDBClient dynamoDB;
|
||||
private String primaryKeyName;
|
||||
private PrimaryKeyType primaryKeyType = PrimaryKeyType.HASH;
|
||||
|
||||
// If the user choose to use HASH_AND_RANGE as primary key type, then
|
||||
// the following two variables become relevant. See documentation in the
|
||||
// DynamoDB.Properties file for more details.
|
||||
private String hashKeyValue;
|
||||
private String hashKeyName;
|
||||
|
||||
private boolean consistentRead = false;
|
||||
private String endpoint = "http://dynamodb.us-east-1.amazonaws.com";
|
||||
private int maxConnects = 50;
|
||||
private static final Logger LOGGER = Logger.getLogger(DynamoDBClient.class);
|
||||
private static final Status CLIENT_ERROR = new Status("CLIENT_ERROR", "An error occurred on the client.");
|
||||
private static final String DEFAULT_HASH_KEY_VALUE = "YCSB_0";
|
||||
|
||||
@Override
|
||||
public void init() throws DBException {
|
||||
String debug = getProperties().getProperty("dynamodb.debug", null);
|
||||
|
||||
if (null != debug && "true".equalsIgnoreCase(debug)) {
|
||||
LOGGER.setLevel(Level.DEBUG);
|
||||
}
|
||||
|
||||
private AmazonDynamoDBClient dynamoDB;
|
||||
private String primaryKeyName;
|
||||
private PrimaryKeyType primaryKeyType = PrimaryKeyType.HASH;
|
||||
String configuredEndpoint = getProperties().getProperty("dynamodb.endpoint", null);
|
||||
String credentialsFile = getProperties().getProperty("dynamodb.awsCredentialsFile", null);
|
||||
String primaryKey = getProperties().getProperty("dynamodb.primaryKey", null);
|
||||
String primaryKeyTypeString = getProperties().getProperty("dynamodb.primaryKeyType", null);
|
||||
String consistentReads = getProperties().getProperty("dynamodb.consistentReads", null);
|
||||
String connectMax = getProperties().getProperty("dynamodb.connectMax", null);
|
||||
|
||||
// If the user choose to use HASH_AND_RANGE as primary key type, then
|
||||
// the following two variables become relevant. See documentation in the
|
||||
// DynamoDB.Properties file for more details.
|
||||
private String hashKeyValue;
|
||||
private String hashKeyName;
|
||||
|
||||
private boolean debug = false;
|
||||
private boolean consistentRead = false;
|
||||
private String endpoint = "http://dynamodb.us-east-1.amazonaws.com";
|
||||
private int maxConnects = 50;
|
||||
private static Logger logger = Logger.getLogger(DynamoDBClient.class);
|
||||
private static final Status CLIENT_ERROR = new Status("CLIENT_ERROR",
|
||||
"An error occurred on the client.");
|
||||
private static final String DEFAULT_HASH_KEY_VALUE = "YCSB_0";
|
||||
|
||||
/**
|
||||
* Initialize any state for this DB. Called once per DB instance; there is
|
||||
* one DB instance per client thread.
|
||||
*/
|
||||
@Override
|
||||
public void init() throws DBException {
|
||||
// initialize DynamoDb driver & table.
|
||||
String debug = getProperties().getProperty("dynamodb.debug", null);
|
||||
|
||||
if (null != debug && "true".equalsIgnoreCase(debug)) {
|
||||
logger.setLevel(Level.DEBUG);
|
||||
}
|
||||
|
||||
String endpoint = getProperties().getProperty("dynamodb.endpoint", null);
|
||||
String credentialsFile = getProperties().getProperty("dynamodb.awsCredentialsFile", null);
|
||||
String primaryKey = getProperties().getProperty("dynamodb.primaryKey", null);
|
||||
String primaryKeyTypeString = getProperties().getProperty("dynamodb.primaryKeyType", null);
|
||||
String consistentReads = getProperties().getProperty("dynamodb.consistentReads", null);
|
||||
String connectMax = getProperties().getProperty("dynamodb.connectMax", null);
|
||||
|
||||
if (null != connectMax) {
|
||||
this.maxConnects = Integer.parseInt(connectMax);
|
||||
}
|
||||
|
||||
if (null != consistentReads && "true".equalsIgnoreCase(consistentReads)) {
|
||||
this.consistentRead = true;
|
||||
}
|
||||
|
||||
if (null != endpoint) {
|
||||
this.endpoint = endpoint;
|
||||
}
|
||||
|
||||
if (null == primaryKey || primaryKey.length() < 1) {
|
||||
String errMsg = "Missing primary key attribute name, cannot continue";
|
||||
logger.error(errMsg);
|
||||
}
|
||||
|
||||
if (null != primaryKeyTypeString) {
|
||||
try {
|
||||
this.primaryKeyType = PrimaryKeyType.valueOf(
|
||||
primaryKeyTypeString.trim().toUpperCase());
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new DBException("Invalid primary key mode specified: " +
|
||||
primaryKeyTypeString + ". Expecting HASH or HASH_AND_RANGE.");
|
||||
}
|
||||
}
|
||||
|
||||
if (this.primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
|
||||
// When the primary key type is HASH_AND_RANGE, keys used by YCSB
|
||||
// are range keys so we can benchmark performance of individual hash
|
||||
// partitions. In this case, the user must specify the hash key's name
|
||||
// and optionally can designate a value for the hash key.
|
||||
|
||||
String hashKeyName = getProperties().getProperty("dynamodb.hashKeyName", null);
|
||||
if (null == hashKeyName || hashKeyName.isEmpty()) {
|
||||
throw new DBException("Must specify a non-empty hash key name " +
|
||||
"when the primary key type is HASH_AND_RANGE.");
|
||||
}
|
||||
this.hashKeyName = hashKeyName;
|
||||
this.hashKeyValue = getProperties().getProperty(
|
||||
"dynamodb.hashKeyValue", DEFAULT_HASH_KEY_VALUE);
|
||||
}
|
||||
|
||||
try {
|
||||
AWSCredentials credentials = new PropertiesCredentials(new File(credentialsFile));
|
||||
ClientConfiguration cconfig = new ClientConfiguration();
|
||||
cconfig.setMaxConnections(maxConnects);
|
||||
dynamoDB = new AmazonDynamoDBClient(credentials, cconfig);
|
||||
dynamoDB.setEndpoint(this.endpoint);
|
||||
primaryKeyName = primaryKey;
|
||||
logger.info("dynamodb connection created with " + this.endpoint);
|
||||
} catch (Exception e1) {
|
||||
String errMsg = "DynamoDBClient.init(): Could not initialize DynamoDB client: " + e1.getMessage();
|
||||
logger.error(errMsg);
|
||||
}
|
||||
if (null != connectMax) {
|
||||
this.maxConnects = Integer.parseInt(connectMax);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status read(String table, String key, Set<String> fields,
|
||||
HashMap<String, ByteIterator> result) {
|
||||
|
||||
logger.debug("readkey: " + key + " from table: " + table);
|
||||
GetItemRequest req = new GetItemRequest(table, createPrimaryKey(key));
|
||||
req.setAttributesToGet(fields);
|
||||
req.setConsistentRead(consistentRead);
|
||||
GetItemResult res = null;
|
||||
|
||||
try {
|
||||
res = dynamoDB.getItem(req);
|
||||
}catch (AmazonServiceException ex) {
|
||||
logger.error(ex.getMessage());
|
||||
return Status.ERROR;
|
||||
}catch (AmazonClientException ex){
|
||||
logger.error(ex.getMessage());
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
|
||||
if (null != res.getItem()) {
|
||||
result.putAll(extractResult(res.getItem()));
|
||||
logger.debug("Result: " + res.toString());
|
||||
}
|
||||
return Status.OK;
|
||||
if (null != consistentReads && "true".equalsIgnoreCase(consistentReads)) {
|
||||
this.consistentRead = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status scan(String table, String startkey, int recordcount,
|
||||
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
|
||||
logger.debug("scan " + recordcount + " records from key: " + startkey + " on table: " + table);
|
||||
/*
|
||||
* on DynamoDB's scan, startkey is *exclusive* so we need to
|
||||
* getItem(startKey) and then use scan for the res
|
||||
*/
|
||||
GetItemRequest greq = new GetItemRequest(table, createPrimaryKey(startkey));
|
||||
greq.setAttributesToGet(fields);
|
||||
|
||||
GetItemResult gres = null;
|
||||
|
||||
try {
|
||||
gres = dynamoDB.getItem(greq);
|
||||
}catch (AmazonServiceException ex) {
|
||||
logger.error(ex.getMessage());
|
||||
return Status.ERROR;
|
||||
}catch (AmazonClientException ex){
|
||||
logger.error(ex.getMessage());
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
|
||||
if (null != gres.getItem()) {
|
||||
result.add(extractResult(gres.getItem()));
|
||||
}
|
||||
|
||||
int count = 1; // startKey is done, rest to go.
|
||||
|
||||
Map<String, AttributeValue> startKey = createPrimaryKey(startkey);
|
||||
ScanRequest req = new ScanRequest(table);
|
||||
req.setAttributesToGet(fields);
|
||||
while (count < recordcount) {
|
||||
req.setExclusiveStartKey(startKey);
|
||||
req.setLimit(recordcount - count);
|
||||
ScanResult res = null;
|
||||
try {
|
||||
res = dynamoDB.scan(req);
|
||||
}catch (AmazonServiceException ex) {
|
||||
logger.error(ex.getMessage());
|
||||
ex.printStackTrace();
|
||||
return Status.ERROR;
|
||||
}catch (AmazonClientException ex){
|
||||
logger.error(ex.getMessage());
|
||||
ex.printStackTrace();
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
|
||||
count += res.getCount();
|
||||
for (Map<String, AttributeValue> items : res.getItems()) {
|
||||
result.add(extractResult(items));
|
||||
}
|
||||
startKey = res.getLastEvaluatedKey();
|
||||
|
||||
}
|
||||
|
||||
return Status.OK;
|
||||
if (null != configuredEndpoint) {
|
||||
this.endpoint = configuredEndpoint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status update(String table, String key, HashMap<String, ByteIterator> values) {
|
||||
logger.debug("updatekey: " + key + " from table: " + table);
|
||||
|
||||
Map<String, AttributeValueUpdate> attributes = new HashMap<String, AttributeValueUpdate>(
|
||||
values.size());
|
||||
for (Entry<String, ByteIterator> val : values.entrySet()) {
|
||||
AttributeValue v = new AttributeValue(val.getValue().toString());
|
||||
attributes.put(val.getKey(), new AttributeValueUpdate()
|
||||
.withValue(v).withAction("PUT"));
|
||||
}
|
||||
|
||||
UpdateItemRequest req = new UpdateItemRequest(table, createPrimaryKey(key), attributes);
|
||||
|
||||
try {
|
||||
dynamoDB.updateItem(req);
|
||||
}catch (AmazonServiceException ex) {
|
||||
logger.error(ex.getMessage());
|
||||
return Status.ERROR;
|
||||
}catch (AmazonClientException ex){
|
||||
logger.error(ex.getMessage());
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
return Status.OK;
|
||||
if (null == primaryKey || primaryKey.length() < 1) {
|
||||
throw new DBException("Missing primary key attribute name, cannot continue");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
|
||||
logger.debug("insertkey: " + primaryKeyName + "-" + key + " from table: " + table);
|
||||
Map<String, AttributeValue> attributes = createAttributes(values);
|
||||
// adding primary key
|
||||
attributes.put(primaryKeyName, new AttributeValue(key));
|
||||
if (primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
|
||||
// If the primary key type is HASH_AND_RANGE, then what has been put
|
||||
// into the attributes map above is the range key part of the primary
|
||||
// key, we still need to put in the hash key part here.
|
||||
attributes.put(hashKeyName, new AttributeValue(hashKeyValue));
|
||||
}
|
||||
|
||||
PutItemRequest putItemRequest = new PutItemRequest(table, attributes);
|
||||
try {
|
||||
dynamoDB.putItem(putItemRequest);
|
||||
}catch (AmazonServiceException ex) {
|
||||
logger.error(ex.getMessage());
|
||||
return Status.ERROR;
|
||||
}catch (AmazonClientException ex){
|
||||
logger.error(ex.getMessage());
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
return Status.OK;
|
||||
if (null != primaryKeyTypeString) {
|
||||
try {
|
||||
this.primaryKeyType = PrimaryKeyType.valueOf(primaryKeyTypeString.trim().toUpperCase());
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new DBException("Invalid primary key mode specified: " + primaryKeyTypeString +
|
||||
". Expecting HASH or HASH_AND_RANGE.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status delete(String table, String key) {
|
||||
logger.debug("deletekey: " + key + " from table: " + table);
|
||||
DeleteItemRequest req = new DeleteItemRequest(table, createPrimaryKey(key));
|
||||
if (this.primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
|
||||
// When the primary key type is HASH_AND_RANGE, keys used by YCSB
|
||||
// are range keys so we can benchmark performance of individual hash
|
||||
// partitions. In this case, the user must specify the hash key's name
|
||||
// and optionally can designate a value for the hash key.
|
||||
|
||||
try {
|
||||
dynamoDB.deleteItem(req);
|
||||
}catch (AmazonServiceException ex) {
|
||||
logger.error(ex.getMessage());
|
||||
return Status.ERROR;
|
||||
}catch (AmazonClientException ex){
|
||||
logger.error(ex.getMessage());
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
return Status.OK;
|
||||
String configuredHashKeyName = getProperties().getProperty("dynamodb.hashKeyName", null);
|
||||
if (null == configuredHashKeyName || configuredHashKeyName.isEmpty()) {
|
||||
throw new DBException("Must specify a non-empty hash key name when the primary key type is HASH_AND_RANGE.");
|
||||
}
|
||||
this.hashKeyName = configuredHashKeyName;
|
||||
this.hashKeyValue = getProperties().getProperty("dynamodb.hashKeyValue", DEFAULT_HASH_KEY_VALUE);
|
||||
}
|
||||
|
||||
private static Map<String, AttributeValue> createAttributes(
|
||||
HashMap<String, ByteIterator> values) {
|
||||
Map<String, AttributeValue> attributes = new HashMap<String, AttributeValue>(
|
||||
values.size() + 1); //leave space for the PrimaryKey
|
||||
for (Entry<String, ByteIterator> val : values.entrySet()) {
|
||||
attributes.put(val.getKey(), new AttributeValue(val.getValue()
|
||||
.toString()));
|
||||
}
|
||||
return attributes;
|
||||
try {
|
||||
AWSCredentials credentials = new PropertiesCredentials(new File(credentialsFile));
|
||||
ClientConfiguration cconfig = new ClientConfiguration();
|
||||
cconfig.setMaxConnections(maxConnects);
|
||||
dynamoDB = new AmazonDynamoDBClient(credentials, cconfig);
|
||||
dynamoDB.setEndpoint(this.endpoint);
|
||||
primaryKeyName = primaryKey;
|
||||
LOGGER.info("dynamodb connection created with " + this.endpoint);
|
||||
} catch (Exception e1) {
|
||||
LOGGER.error("DynamoDBClient.init(): Could not initialize DynamoDB client.", e1);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
|
||||
if(LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("readkey: " + key + " from table: " + table);
|
||||
}
|
||||
|
||||
private HashMap<String, ByteIterator> extractResult(Map<String, AttributeValue> item) {
|
||||
if(null == item)
|
||||
return null;
|
||||
HashMap<String, ByteIterator> rItems = new HashMap<String, ByteIterator>(item.size());
|
||||
GetItemRequest req = new GetItemRequest(table, createPrimaryKey(key));
|
||||
req.setAttributesToGet(fields);
|
||||
req.setConsistentRead(consistentRead);
|
||||
GetItemResult res;
|
||||
|
||||
for (Entry<String, AttributeValue> attr : item.entrySet()) {
|
||||
logger.debug(String.format("Result- key: %s, value: %s", attr.getKey(), attr.getValue()));
|
||||
rItems.put(attr.getKey(), new StringByteIterator(attr.getValue().getS()));
|
||||
}
|
||||
return rItems;
|
||||
try {
|
||||
res = dynamoDB.getItem(req);
|
||||
} catch (AmazonServiceException ex) {
|
||||
LOGGER.error(ex);
|
||||
return Status.ERROR;
|
||||
} catch (AmazonClientException ex) {
|
||||
LOGGER.error(ex);
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
|
||||
private Map<String, AttributeValue> createPrimaryKey(String key) {
|
||||
Map<String, AttributeValue> k = new HashMap<String, AttributeValue>();
|
||||
if (primaryKeyType == PrimaryKeyType.HASH) {
|
||||
k.put(primaryKeyName, new AttributeValue().withS(key));
|
||||
} else if (primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
|
||||
k.put(hashKeyName, new AttributeValue().withS(hashKeyValue));
|
||||
k.put(primaryKeyName, new AttributeValue().withS(key));
|
||||
} else {
|
||||
throw new RuntimeException("Assertion Error: impossible primary key"
|
||||
+ " type");
|
||||
}
|
||||
return k;
|
||||
if (null != res.getItem()) {
|
||||
result.putAll(extractResult(res.getItem()));
|
||||
if(LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("Result: " + res.toString());
|
||||
}
|
||||
}
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status scan(String table, String startkey, int recordcount,
|
||||
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
|
||||
|
||||
if(LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("scan " + recordcount + " records from key: " + startkey + " on table: " + table);
|
||||
}
|
||||
|
||||
/*
|
||||
* on DynamoDB's scan, startkey is *exclusive* so we need to
|
||||
* getItem(startKey) and then use scan for the res
|
||||
*/
|
||||
GetItemRequest greq = new GetItemRequest(table, createPrimaryKey(startkey));
|
||||
greq.setAttributesToGet(fields);
|
||||
|
||||
GetItemResult gres;
|
||||
|
||||
try {
|
||||
gres = dynamoDB.getItem(greq);
|
||||
} catch (AmazonServiceException ex) {
|
||||
LOGGER.error(ex);
|
||||
return Status.ERROR;
|
||||
} catch (AmazonClientException ex) {
|
||||
LOGGER.error(ex);
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
|
||||
if (null != gres.getItem()) {
|
||||
result.add(extractResult(gres.getItem()));
|
||||
}
|
||||
|
||||
int count = 1; // startKey is done, rest to go.
|
||||
|
||||
Map<String, AttributeValue> startKey = createPrimaryKey(startkey);
|
||||
ScanRequest req = new ScanRequest(table);
|
||||
req.setAttributesToGet(fields);
|
||||
while (count < recordcount) {
|
||||
req.setExclusiveStartKey(startKey);
|
||||
req.setLimit(recordcount - count);
|
||||
ScanResult res;
|
||||
try {
|
||||
res = dynamoDB.scan(req);
|
||||
} catch (AmazonServiceException ex) {
|
||||
LOGGER.error(ex);
|
||||
return Status.ERROR;
|
||||
} catch (AmazonClientException ex) {
|
||||
LOGGER.error(ex);
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
|
||||
count += res.getCount();
|
||||
for (Map<String, AttributeValue> items : res.getItems()) {
|
||||
result.add(extractResult(items));
|
||||
}
|
||||
startKey = res.getLastEvaluatedKey();
|
||||
|
||||
}
|
||||
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status update(String table, String key, HashMap<String, ByteIterator> values) {
|
||||
if(LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("updatekey: " + key + " from table: " + table);
|
||||
}
|
||||
|
||||
Map<String, AttributeValueUpdate> attributes = new HashMap<>(values.size());
|
||||
for (Entry<String, ByteIterator> val : values.entrySet()) {
|
||||
AttributeValue v = new AttributeValue(val.getValue().toString());
|
||||
attributes.put(val.getKey(), new AttributeValueUpdate().withValue(v).withAction("PUT"));
|
||||
}
|
||||
|
||||
UpdateItemRequest req = new UpdateItemRequest(table, createPrimaryKey(key), attributes);
|
||||
|
||||
try {
|
||||
dynamoDB.updateItem(req);
|
||||
} catch (AmazonServiceException ex) {
|
||||
LOGGER.error(ex);
|
||||
return Status.ERROR;
|
||||
} catch (AmazonClientException ex) {
|
||||
LOGGER.error(ex);
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
|
||||
if(LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("insertkey: " + primaryKeyName + "-" + key + " from table: " + table);
|
||||
}
|
||||
|
||||
Map<String, AttributeValue> attributes = createAttributes(values);
|
||||
// adding primary key
|
||||
attributes.put(primaryKeyName, new AttributeValue(key));
|
||||
if (primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
|
||||
// If the primary key type is HASH_AND_RANGE, then what has been put
|
||||
// into the attributes map above is the range key part of the primary
|
||||
// key, we still need to put in the hash key part here.
|
||||
attributes.put(hashKeyName, new AttributeValue(hashKeyValue));
|
||||
}
|
||||
|
||||
PutItemRequest putItemRequest = new PutItemRequest(table, attributes);
|
||||
try {
|
||||
dynamoDB.putItem(putItemRequest);
|
||||
} catch (AmazonServiceException ex) {
|
||||
LOGGER.error(ex);
|
||||
return Status.ERROR;
|
||||
} catch (AmazonClientException ex) {
|
||||
LOGGER.error(ex);
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status delete(String table, String key) {
|
||||
if(LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("deletekey: " + key + " from table: " + table);
|
||||
}
|
||||
|
||||
DeleteItemRequest req = new DeleteItemRequest(table, createPrimaryKey(key));
|
||||
|
||||
try {
|
||||
dynamoDB.deleteItem(req);
|
||||
} catch (AmazonServiceException ex) {
|
||||
LOGGER.error(ex);
|
||||
return Status.ERROR;
|
||||
} catch (AmazonClientException ex) {
|
||||
LOGGER.error(ex);
|
||||
return CLIENT_ERROR;
|
||||
}
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
private static Map<String, AttributeValue> createAttributes(HashMap<String, ByteIterator> values) {
|
||||
//leave space for the PrimaryKey
|
||||
Map<String, AttributeValue> attributes = new HashMap<>(values.size() + 1);
|
||||
for (Entry<String, ByteIterator> val : values.entrySet()) {
|
||||
attributes.put(val.getKey(), new AttributeValue(val.getValue().toString()));
|
||||
}
|
||||
return attributes;
|
||||
}
|
||||
|
||||
private HashMap<String, ByteIterator> extractResult(Map<String, AttributeValue> item) {
|
||||
if (null == item) {
|
||||
return null;
|
||||
}
|
||||
HashMap<String, ByteIterator> rItems = new HashMap<>(item.size());
|
||||
|
||||
for (Entry<String, AttributeValue> attr : item.entrySet()) {
|
||||
if(LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug(String.format("Result- key: %s, value: %s", attr.getKey(), attr.getValue()));
|
||||
}
|
||||
rItems.put(attr.getKey(), new StringByteIterator(attr.getValue().getS()));
|
||||
}
|
||||
return rItems;
|
||||
}
|
||||
|
||||
private Map<String, AttributeValue> createPrimaryKey(String key) {
|
||||
Map<String, AttributeValue> k = new HashMap<>();
|
||||
if (primaryKeyType == PrimaryKeyType.HASH) {
|
||||
k.put(primaryKeyName, new AttributeValue().withS(key));
|
||||
} else if (primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
|
||||
k.put(hashKeyName, new AttributeValue().withS(hashKeyValue));
|
||||
k.put(primaryKeyName, new AttributeValue().withS(key));
|
||||
} else {
|
||||
throw new RuntimeException("Assertion Error: impossible primary key type");
|
||||
}
|
||||
return k;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Copyright 2015-2016 YCSB Contributors. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
* may not use this file except in compliance with the License. You
|
||||
* may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License. See accompanying
|
||||
* LICENSE file.
|
||||
*/
|
||||
|
||||
/**
|
||||
* The YCSB binding for <a href="https://aws.amazon.com/dynamodb/">DynamoDB</a>.
|
||||
*/
|
||||
package com.yahoo.ycsb.db;
|
||||
|
Загрузка…
Ссылка в новой задаче