[dynamodb] Checkstyle updates for DynamoDB.

This commit is contained in:
Robert J. Moore 2015-11-08 14:04:18 -05:00 коммит произвёл Kevin Risden
Родитель d329ed4cb5
Коммит a15b364361
3 изменённых файлов: 326 добавлений и 294 удалений

Просмотреть файл

@ -50,4 +50,28 @@ LICENSE file.
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.15</version>
<configuration>
<consoleOutput>true</consoleOutput>
<configLocation>../checkstyle.xml</configLocation>
<failOnViolation>true</failOnViolation>
<failsOnError>true</failsOnError>
</configuration>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>checkstyle</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Просмотреть файл

@ -1,6 +1,6 @@
/*
* Copyright 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2015-2016 YCSB Contributors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
@ -22,21 +22,8 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.GetItemResult;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.amazonaws.services.dynamodbv2.model.*;
import com.yahoo.ycsb.*;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@ -48,308 +35,307 @@ import java.util.Set;
import java.util.Vector;
/**
* DynamoDB v1.10.48 client for YCSB
* DynamoDB v1.10.48 client for YCSB.
*/
public class DynamoDBClient extends DB {
/**
* Defines the primary key type used in this particular DB instance.
*
* By default, the primary key type is "HASH". Optionally, the user can
* choose to use hash_and_range key type. See documentation in the
* DynamoDB.Properties file for more details.
*/
private enum PrimaryKeyType {
HASH,
HASH_AND_RANGE
/**
* Defines the primary key type used in this particular DB instance.
* <p>
* By default, the primary key type is "HASH". Optionally, the user can
* choose to use hash_and_range key type. See documentation in the
* DynamoDB.Properties file for more details.
*/
private enum PrimaryKeyType {
HASH,
HASH_AND_RANGE
}
private AmazonDynamoDBClient dynamoDB;
private String primaryKeyName;
private PrimaryKeyType primaryKeyType = PrimaryKeyType.HASH;
// If the user choose to use HASH_AND_RANGE as primary key type, then
// the following two variables become relevant. See documentation in the
// DynamoDB.Properties file for more details.
private String hashKeyValue;
private String hashKeyName;
private boolean consistentRead = false;
private String endpoint = "http://dynamodb.us-east-1.amazonaws.com";
private int maxConnects = 50;
private static final Logger LOGGER = Logger.getLogger(DynamoDBClient.class);
private static final Status CLIENT_ERROR = new Status("CLIENT_ERROR", "An error occurred on the client.");
private static final String DEFAULT_HASH_KEY_VALUE = "YCSB_0";
@Override
public void init() throws DBException {
String debug = getProperties().getProperty("dynamodb.debug", null);
if (null != debug && "true".equalsIgnoreCase(debug)) {
LOGGER.setLevel(Level.DEBUG);
}
private AmazonDynamoDBClient dynamoDB;
private String primaryKeyName;
private PrimaryKeyType primaryKeyType = PrimaryKeyType.HASH;
String configuredEndpoint = getProperties().getProperty("dynamodb.endpoint", null);
String credentialsFile = getProperties().getProperty("dynamodb.awsCredentialsFile", null);
String primaryKey = getProperties().getProperty("dynamodb.primaryKey", null);
String primaryKeyTypeString = getProperties().getProperty("dynamodb.primaryKeyType", null);
String consistentReads = getProperties().getProperty("dynamodb.consistentReads", null);
String connectMax = getProperties().getProperty("dynamodb.connectMax", null);
// If the user choose to use HASH_AND_RANGE as primary key type, then
// the following two variables become relevant. See documentation in the
// DynamoDB.Properties file for more details.
private String hashKeyValue;
private String hashKeyName;
private boolean debug = false;
private boolean consistentRead = false;
private String endpoint = "http://dynamodb.us-east-1.amazonaws.com";
private int maxConnects = 50;
private static Logger logger = Logger.getLogger(DynamoDBClient.class);
private static final Status CLIENT_ERROR = new Status("CLIENT_ERROR",
"An error occurred on the client.");
private static final String DEFAULT_HASH_KEY_VALUE = "YCSB_0";
/**
* Initialize any state for this DB. Called once per DB instance; there is
* one DB instance per client thread.
*/
@Override
public void init() throws DBException {
// initialize DynamoDb driver & table.
String debug = getProperties().getProperty("dynamodb.debug", null);
if (null != debug && "true".equalsIgnoreCase(debug)) {
logger.setLevel(Level.DEBUG);
}
String endpoint = getProperties().getProperty("dynamodb.endpoint", null);
String credentialsFile = getProperties().getProperty("dynamodb.awsCredentialsFile", null);
String primaryKey = getProperties().getProperty("dynamodb.primaryKey", null);
String primaryKeyTypeString = getProperties().getProperty("dynamodb.primaryKeyType", null);
String consistentReads = getProperties().getProperty("dynamodb.consistentReads", null);
String connectMax = getProperties().getProperty("dynamodb.connectMax", null);
if (null != connectMax) {
this.maxConnects = Integer.parseInt(connectMax);
}
if (null != consistentReads && "true".equalsIgnoreCase(consistentReads)) {
this.consistentRead = true;
}
if (null != endpoint) {
this.endpoint = endpoint;
}
if (null == primaryKey || primaryKey.length() < 1) {
String errMsg = "Missing primary key attribute name, cannot continue";
logger.error(errMsg);
}
if (null != primaryKeyTypeString) {
try {
this.primaryKeyType = PrimaryKeyType.valueOf(
primaryKeyTypeString.trim().toUpperCase());
} catch (IllegalArgumentException e) {
throw new DBException("Invalid primary key mode specified: " +
primaryKeyTypeString + ". Expecting HASH or HASH_AND_RANGE.");
}
}
if (this.primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
// When the primary key type is HASH_AND_RANGE, keys used by YCSB
// are range keys so we can benchmark performance of individual hash
// partitions. In this case, the user must specify the hash key's name
// and optionally can designate a value for the hash key.
String hashKeyName = getProperties().getProperty("dynamodb.hashKeyName", null);
if (null == hashKeyName || hashKeyName.isEmpty()) {
throw new DBException("Must specify a non-empty hash key name " +
"when the primary key type is HASH_AND_RANGE.");
}
this.hashKeyName = hashKeyName;
this.hashKeyValue = getProperties().getProperty(
"dynamodb.hashKeyValue", DEFAULT_HASH_KEY_VALUE);
}
try {
AWSCredentials credentials = new PropertiesCredentials(new File(credentialsFile));
ClientConfiguration cconfig = new ClientConfiguration();
cconfig.setMaxConnections(maxConnects);
dynamoDB = new AmazonDynamoDBClient(credentials, cconfig);
dynamoDB.setEndpoint(this.endpoint);
primaryKeyName = primaryKey;
logger.info("dynamodb connection created with " + this.endpoint);
} catch (Exception e1) {
String errMsg = "DynamoDBClient.init(): Could not initialize DynamoDB client: " + e1.getMessage();
logger.error(errMsg);
}
if (null != connectMax) {
this.maxConnects = Integer.parseInt(connectMax);
}
@Override
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
logger.debug("readkey: " + key + " from table: " + table);
GetItemRequest req = new GetItemRequest(table, createPrimaryKey(key));
req.setAttributesToGet(fields);
req.setConsistentRead(consistentRead);
GetItemResult res = null;
try {
res = dynamoDB.getItem(req);
}catch (AmazonServiceException ex) {
logger.error(ex.getMessage());
return Status.ERROR;
}catch (AmazonClientException ex){
logger.error(ex.getMessage());
return CLIENT_ERROR;
}
if (null != res.getItem()) {
result.putAll(extractResult(res.getItem()));
logger.debug("Result: " + res.toString());
}
return Status.OK;
if (null != consistentReads && "true".equalsIgnoreCase(consistentReads)) {
this.consistentRead = true;
}
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
logger.debug("scan " + recordcount + " records from key: " + startkey + " on table: " + table);
/*
* on DynamoDB's scan, startkey is *exclusive* so we need to
* getItem(startKey) and then use scan for the res
*/
GetItemRequest greq = new GetItemRequest(table, createPrimaryKey(startkey));
greq.setAttributesToGet(fields);
GetItemResult gres = null;
try {
gres = dynamoDB.getItem(greq);
}catch (AmazonServiceException ex) {
logger.error(ex.getMessage());
return Status.ERROR;
}catch (AmazonClientException ex){
logger.error(ex.getMessage());
return CLIENT_ERROR;
}
if (null != gres.getItem()) {
result.add(extractResult(gres.getItem()));
}
int count = 1; // startKey is done, rest to go.
Map<String, AttributeValue> startKey = createPrimaryKey(startkey);
ScanRequest req = new ScanRequest(table);
req.setAttributesToGet(fields);
while (count < recordcount) {
req.setExclusiveStartKey(startKey);
req.setLimit(recordcount - count);
ScanResult res = null;
try {
res = dynamoDB.scan(req);
}catch (AmazonServiceException ex) {
logger.error(ex.getMessage());
ex.printStackTrace();
return Status.ERROR;
}catch (AmazonClientException ex){
logger.error(ex.getMessage());
ex.printStackTrace();
return CLIENT_ERROR;
}
count += res.getCount();
for (Map<String, AttributeValue> items : res.getItems()) {
result.add(extractResult(items));
}
startKey = res.getLastEvaluatedKey();
}
return Status.OK;
if (null != configuredEndpoint) {
this.endpoint = configuredEndpoint;
}
@Override
public Status update(String table, String key, HashMap<String, ByteIterator> values) {
logger.debug("updatekey: " + key + " from table: " + table);
Map<String, AttributeValueUpdate> attributes = new HashMap<String, AttributeValueUpdate>(
values.size());
for (Entry<String, ByteIterator> val : values.entrySet()) {
AttributeValue v = new AttributeValue(val.getValue().toString());
attributes.put(val.getKey(), new AttributeValueUpdate()
.withValue(v).withAction("PUT"));
}
UpdateItemRequest req = new UpdateItemRequest(table, createPrimaryKey(key), attributes);
try {
dynamoDB.updateItem(req);
}catch (AmazonServiceException ex) {
logger.error(ex.getMessage());
return Status.ERROR;
}catch (AmazonClientException ex){
logger.error(ex.getMessage());
return CLIENT_ERROR;
}
return Status.OK;
if (null == primaryKey || primaryKey.length() < 1) {
throw new DBException("Missing primary key attribute name, cannot continue");
}
@Override
public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
logger.debug("insertkey: " + primaryKeyName + "-" + key + " from table: " + table);
Map<String, AttributeValue> attributes = createAttributes(values);
// adding primary key
attributes.put(primaryKeyName, new AttributeValue(key));
if (primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
// If the primary key type is HASH_AND_RANGE, then what has been put
// into the attributes map above is the range key part of the primary
// key, we still need to put in the hash key part here.
attributes.put(hashKeyName, new AttributeValue(hashKeyValue));
}
PutItemRequest putItemRequest = new PutItemRequest(table, attributes);
try {
dynamoDB.putItem(putItemRequest);
}catch (AmazonServiceException ex) {
logger.error(ex.getMessage());
return Status.ERROR;
}catch (AmazonClientException ex){
logger.error(ex.getMessage());
return CLIENT_ERROR;
}
return Status.OK;
if (null != primaryKeyTypeString) {
try {
this.primaryKeyType = PrimaryKeyType.valueOf(primaryKeyTypeString.trim().toUpperCase());
} catch (IllegalArgumentException e) {
throw new DBException("Invalid primary key mode specified: " + primaryKeyTypeString +
". Expecting HASH or HASH_AND_RANGE.");
}
}
@Override
public Status delete(String table, String key) {
logger.debug("deletekey: " + key + " from table: " + table);
DeleteItemRequest req = new DeleteItemRequest(table, createPrimaryKey(key));
if (this.primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
// When the primary key type is HASH_AND_RANGE, keys used by YCSB
// are range keys so we can benchmark performance of individual hash
// partitions. In this case, the user must specify the hash key's name
// and optionally can designate a value for the hash key.
try {
dynamoDB.deleteItem(req);
}catch (AmazonServiceException ex) {
logger.error(ex.getMessage());
return Status.ERROR;
}catch (AmazonClientException ex){
logger.error(ex.getMessage());
return CLIENT_ERROR;
}
return Status.OK;
String configuredHashKeyName = getProperties().getProperty("dynamodb.hashKeyName", null);
if (null == configuredHashKeyName || configuredHashKeyName.isEmpty()) {
throw new DBException("Must specify a non-empty hash key name when the primary key type is HASH_AND_RANGE.");
}
this.hashKeyName = configuredHashKeyName;
this.hashKeyValue = getProperties().getProperty("dynamodb.hashKeyValue", DEFAULT_HASH_KEY_VALUE);
}
private static Map<String, AttributeValue> createAttributes(
HashMap<String, ByteIterator> values) {
Map<String, AttributeValue> attributes = new HashMap<String, AttributeValue>(
values.size() + 1); //leave space for the PrimaryKey
for (Entry<String, ByteIterator> val : values.entrySet()) {
attributes.put(val.getKey(), new AttributeValue(val.getValue()
.toString()));
}
return attributes;
try {
AWSCredentials credentials = new PropertiesCredentials(new File(credentialsFile));
ClientConfiguration cconfig = new ClientConfiguration();
cconfig.setMaxConnections(maxConnects);
dynamoDB = new AmazonDynamoDBClient(credentials, cconfig);
dynamoDB.setEndpoint(this.endpoint);
primaryKeyName = primaryKey;
LOGGER.info("dynamodb connection created with " + this.endpoint);
} catch (Exception e1) {
LOGGER.error("DynamoDBClient.init(): Could not initialize DynamoDB client.", e1);
}
}
@Override
public Status read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("readkey: " + key + " from table: " + table);
}
private HashMap<String, ByteIterator> extractResult(Map<String, AttributeValue> item) {
if(null == item)
return null;
HashMap<String, ByteIterator> rItems = new HashMap<String, ByteIterator>(item.size());
GetItemRequest req = new GetItemRequest(table, createPrimaryKey(key));
req.setAttributesToGet(fields);
req.setConsistentRead(consistentRead);
GetItemResult res;
for (Entry<String, AttributeValue> attr : item.entrySet()) {
logger.debug(String.format("Result- key: %s, value: %s", attr.getKey(), attr.getValue()));
rItems.put(attr.getKey(), new StringByteIterator(attr.getValue().getS()));
}
return rItems;
try {
res = dynamoDB.getItem(req);
} catch (AmazonServiceException ex) {
LOGGER.error(ex);
return Status.ERROR;
} catch (AmazonClientException ex) {
LOGGER.error(ex);
return CLIENT_ERROR;
}
private Map<String, AttributeValue> createPrimaryKey(String key) {
Map<String, AttributeValue> k = new HashMap<String, AttributeValue>();
if (primaryKeyType == PrimaryKeyType.HASH) {
k.put(primaryKeyName, new AttributeValue().withS(key));
} else if (primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
k.put(hashKeyName, new AttributeValue().withS(hashKeyValue));
k.put(primaryKeyName, new AttributeValue().withS(key));
} else {
throw new RuntimeException("Assertion Error: impossible primary key"
+ " type");
}
return k;
if (null != res.getItem()) {
result.putAll(extractResult(res.getItem()));
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Result: " + res.toString());
}
}
return Status.OK;
}
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("scan " + recordcount + " records from key: " + startkey + " on table: " + table);
}
/*
* on DynamoDB's scan, startkey is *exclusive* so we need to
* getItem(startKey) and then use scan for the res
*/
GetItemRequest greq = new GetItemRequest(table, createPrimaryKey(startkey));
greq.setAttributesToGet(fields);
GetItemResult gres;
try {
gres = dynamoDB.getItem(greq);
} catch (AmazonServiceException ex) {
LOGGER.error(ex);
return Status.ERROR;
} catch (AmazonClientException ex) {
LOGGER.error(ex);
return CLIENT_ERROR;
}
if (null != gres.getItem()) {
result.add(extractResult(gres.getItem()));
}
int count = 1; // startKey is done, rest to go.
Map<String, AttributeValue> startKey = createPrimaryKey(startkey);
ScanRequest req = new ScanRequest(table);
req.setAttributesToGet(fields);
while (count < recordcount) {
req.setExclusiveStartKey(startKey);
req.setLimit(recordcount - count);
ScanResult res;
try {
res = dynamoDB.scan(req);
} catch (AmazonServiceException ex) {
LOGGER.error(ex);
return Status.ERROR;
} catch (AmazonClientException ex) {
LOGGER.error(ex);
return CLIENT_ERROR;
}
count += res.getCount();
for (Map<String, AttributeValue> items : res.getItems()) {
result.add(extractResult(items));
}
startKey = res.getLastEvaluatedKey();
}
return Status.OK;
}
@Override
public Status update(String table, String key, HashMap<String, ByteIterator> values) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("updatekey: " + key + " from table: " + table);
}
Map<String, AttributeValueUpdate> attributes = new HashMap<>(values.size());
for (Entry<String, ByteIterator> val : values.entrySet()) {
AttributeValue v = new AttributeValue(val.getValue().toString());
attributes.put(val.getKey(), new AttributeValueUpdate().withValue(v).withAction("PUT"));
}
UpdateItemRequest req = new UpdateItemRequest(table, createPrimaryKey(key), attributes);
try {
dynamoDB.updateItem(req);
} catch (AmazonServiceException ex) {
LOGGER.error(ex);
return Status.ERROR;
} catch (AmazonClientException ex) {
LOGGER.error(ex);
return CLIENT_ERROR;
}
return Status.OK;
}
@Override
public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("insertkey: " + primaryKeyName + "-" + key + " from table: " + table);
}
Map<String, AttributeValue> attributes = createAttributes(values);
// adding primary key
attributes.put(primaryKeyName, new AttributeValue(key));
if (primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
// If the primary key type is HASH_AND_RANGE, then what has been put
// into the attributes map above is the range key part of the primary
// key, we still need to put in the hash key part here.
attributes.put(hashKeyName, new AttributeValue(hashKeyValue));
}
PutItemRequest putItemRequest = new PutItemRequest(table, attributes);
try {
dynamoDB.putItem(putItemRequest);
} catch (AmazonServiceException ex) {
LOGGER.error(ex);
return Status.ERROR;
} catch (AmazonClientException ex) {
LOGGER.error(ex);
return CLIENT_ERROR;
}
return Status.OK;
}
@Override
public Status delete(String table, String key) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("deletekey: " + key + " from table: " + table);
}
DeleteItemRequest req = new DeleteItemRequest(table, createPrimaryKey(key));
try {
dynamoDB.deleteItem(req);
} catch (AmazonServiceException ex) {
LOGGER.error(ex);
return Status.ERROR;
} catch (AmazonClientException ex) {
LOGGER.error(ex);
return CLIENT_ERROR;
}
return Status.OK;
}
private static Map<String, AttributeValue> createAttributes(HashMap<String, ByteIterator> values) {
//leave space for the PrimaryKey
Map<String, AttributeValue> attributes = new HashMap<>(values.size() + 1);
for (Entry<String, ByteIterator> val : values.entrySet()) {
attributes.put(val.getKey(), new AttributeValue(val.getValue().toString()));
}
return attributes;
}
private HashMap<String, ByteIterator> extractResult(Map<String, AttributeValue> item) {
if (null == item) {
return null;
}
HashMap<String, ByteIterator> rItems = new HashMap<>(item.size());
for (Entry<String, AttributeValue> attr : item.entrySet()) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Result- key: %s, value: %s", attr.getKey(), attr.getValue()));
}
rItems.put(attr.getKey(), new StringByteIterator(attr.getValue().getS()));
}
return rItems;
}
private Map<String, AttributeValue> createPrimaryKey(String key) {
Map<String, AttributeValue> k = new HashMap<>();
if (primaryKeyType == PrimaryKeyType.HASH) {
k.put(primaryKeyName, new AttributeValue().withS(key));
} else if (primaryKeyType == PrimaryKeyType.HASH_AND_RANGE) {
k.put(hashKeyName, new AttributeValue().withS(hashKeyValue));
k.put(primaryKeyName, new AttributeValue().withS(key));
} else {
throw new RuntimeException("Assertion Error: impossible primary key type");
}
return k;
}
}

Просмотреть файл

@ -0,0 +1,22 @@
/*
* Copyright 2015-2016 YCSB Contributors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* The YCSB binding for <a href="https://aws.amazon.com/dynamodb/">DynamoDB</a>.
*/
package com.yahoo.ycsb.db;