Added new binding for Riak, based on the one created by Basho Technologies (available at https://github.com/basho-labs/YCSB/tree/master/riak).

Change made to the original version:

- Made compatible with the latest YCSB version.
- Removed unused Constants.java file.
- Added strong consistency test support.
- Done some refinements to the code.
- Removed many unused function in the RiakUtils.java file.
- Choosed to use executeAsync methods to resolve a bug that didn't allow the benchmark to end whenever one or more nodes of the cluster are killed.
- Added TIME_OUT status return code.

To do:
- Add different LOG errors (i.e. insufficient_vnodes_available, ring_not_ready, etc.) - for references, check page http://docs.basho.com/riak/latest/ops/running/recovery/errors/#Errors-and-Messages
- ...
This commit is contained in:
nygard_89 2016-04-06 23:36:46 +02:00
Родитель d4e3f60c29
Коммит ba72c9a342
8 изменённых файлов: 745 добавлений и 1 удалений

Просмотреть файл

@ -77,6 +77,7 @@ DATABASES = {
"nosqldb" : "com.yahoo.ycsb.db.NoSqlDbClient",
"orientdb" : "com.yahoo.ycsb.db.OrientDBClient",
"redis" : "com.yahoo.ycsb.db.RedisClient",
"riak" : "com.yahoo.ycsb.db.RiakKVClient",
"s3" : "com.yahoo.ycsb.db.S3Client",
"solr" : "com.yahoo.ycsb.db.SolrClient",
"tarantool" : "com.yahoo.ycsb.db.TarantoolClient",

Просмотреть файл

@ -91,6 +91,7 @@ LICENSE file.
<hypertable.version>0.9.5.6</hypertable.version>
<couchbase.version>1.4.10</couchbase.version>
<tarantool.version>1.6.5</tarantool.version>
<riak.version>2.0.5</riak.version>
<aerospike.version>3.1.2</aerospike.version>
<solr.version>5.4.0</solr.version>
</properties>
@ -105,7 +106,6 @@ LICENSE file.
<module>cassandra</module>
<module>cassandra2</module>
<module>couchbase</module>
<module>distribution</module>
<module>dynamodb</module>
<module>elasticsearch</module>
<module>geode</module>
@ -123,9 +123,11 @@ LICENSE file.
<module>nosqldb</module>
<module>orientdb</module>
<module>redis</module>
<module>riak</module>
<module>s3</module>
<module>solr</module>
<module>tarantool</module>
<module>distribution</module>
<!--<module>voldemort</module>-->
</modules>

51
riak/README.md Normal file
Просмотреть файл

@ -0,0 +1,51 @@
Riak KV Client for Yahoo! Cloud System Benchmark (YCSB)
--------------------------------------------------------
The Riak KV YCSB client is designed to work with the Yahoo! Cloud System Benchmark (YCSB) project (https://github.com/brianfrankcooper/YCSB) to support performance testing for the 2.0.X line of the Riak KV database.
Creating a <i>bucket type</i> to use with YCSB
----------------------------
Perform the following operations on your Riak cluster to configure it for the benchmarks.
Set the default backend for Riak to <i>LevelDB</i> in `riak.conf` (required to support <i>secondary indexes</i> used for the <b>scan</b> workloads):
```
storage_backend = leveldb
```
Create a bucket type named "ycsb"<sup id="a1">[1](#f1)</sup> by logging into one of the nodes in your cluster. Then if you want to use the
* <i>default consistency model</i> (i.e. eventual), run the following riak-admin commands:
```
riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false"}}'
riak-admin bucket-type activate ycsb
```
* <i>strong consistency model</i>, type:
```
riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false","consistent":true}}'
riak-admin bucket-type activate ycsb
```
Note that you may want to specify the number of replicas to create for each object. To do so, you can add `"n_val":N` to the list of properties shown above (by default `N` is set to 3).
Riak KV configuration parameters
----------------------------
You can either specify these configuration parameters via command line or set them in the `riak.properties` file.
* `riak.hosts` - <b>string list</b>, comma separated list of IPs or FQDNs. <newline>Example: `riak.hosts=127.0.0.1,127.0.0.2,127.0.0.3` or `riak.hosts=riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com`.
* `riak.port` - <b>int</b>, the port on which every node is listening. It must match the one specified in the `riak.conf` file at the line `listener.protobuf.internal`.
* `riak.bucket_type` - <b>string</b>, it must match the value of the bucket type created during setup (see section above).
* `riak.r_val` - <b>int</b>, the R value represents the number of Riak nodes that must return results for a read before the read is considered successful.
* `riak.w_val` - <b>int</b>, the W value represents the number of Riak nodes that must report success before an update is considered complete.
* `riak.read_retry_count` - <b>int</b>, the number of times the client will try to read a key from Riak.
* `riak.wait_time_before_retry` - <b>int</b>, the time (in milliseconds) before client attempts to perform another read if the previous one failed.
* `riak.transaction_time_limit` - <b>int</b>, the time (in seconds) the client waits before aborting the current transaction.
* `riak.strong_consistency` - <b>boolean</b>, indicates whether to use strong consistency (true) or eventual consistency (false).
* `riak.debug` - <b>boolean</b>, enables debug mode. This displays all the properties (specified or defaults) when a benchmark is started. Moreover, it shows error causes whenever these occur.
<b>Note</b>: For more information on workloads and how to run them please see: https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload
<b id="f1">1</b> As specified in the `riak.properties` file. See parameters configuration section for further info. [](#a1)

52
riak/pom.xml Normal file
Просмотреть файл

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2015-2016 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>riak-binding</artifactId>
<name>Riak KV Binding</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.basho.riak</groupId>
<artifactId>riak-client</artifactId>
<version>2.0.5</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
</project>

Просмотреть файл

@ -0,0 +1,493 @@
/*
* Copyright 2016 nygard_89
* Copyright 2014 Basho Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.ycsb.db;
import com.basho.riak.client.api.commands.kv.UpdateValue;
import com.basho.riak.client.core.RiakFuture;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.ByteIterator;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.cap.Quorum;
import com.basho.riak.client.api.commands.indexes.IntIndexQuery;
import com.basho.riak.client.api.commands.kv.DeleteValue;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.commands.kv.StoreValue.Option;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.RiakObject;
import com.basho.riak.client.core.query.indexes.LongIntIndex;
import com.basho.riak.client.core.util.BinaryValue;
import static com.yahoo.ycsb.db.RiakUtils.getKeyAsLong;
import static com.yahoo.ycsb.db.RiakUtils.serializeTable;
/**
* @author nygard_89
* @author Basho Technologies, Inc.
*
*/
public final class RiakKVClient extends DB {
private static final String HOST_PROPERTY = "riak.hosts";
private static final String PORT_PROPERTY = "riak.port";
private static final String BUCKET_TYPE_PROPERTY = "riak.bucket_type";
private static final String R_VALUE_PROPERTY = "riak.r_val";
private static final String W_VALUE_PROPERTY = "riak.w_val";
private static final String READ_RETRY_COUNT_PROPERTY = "riak.read_retry_count";
private static final String WAIT_TIME_BEFORE_RETRY_PROPERTY = "riak.wait_time_before_retry";
private static final String TRANSACTION_TIME_LIMIT_PROPERTY = "riak.transaction_time_limit";
private static final String STRONG_CONSISTENCY_PROPERTY = "riak.strong_consistency";
private static final String DEBUG_PROPERTY = "riak.debug";
private static final Status TIME_OUT = new Status("TIME_OUT", "Cluster didn't respond after maximum wait time " +
"for transaction indicated");
private String[] hosts;
private int port;
private String bucketType;
private Quorum rQuorumValue;
private Quorum wQuorumValue;
private int readRetryCount;
private int waitTimeBeforeRetry;
private int transactionTimeLimit;
private boolean strongConsistency;
private boolean debug;
private RiakClient riakClient;
private RiakCluster riakCluster;
private void loadDefaultProperties() {
InputStream propFile = RiakKVClient.class.getClassLoader().getResourceAsStream("riak.properties");
Properties propsPF = new Properties(System.getProperties());
try {
propsPF.load(propFile);
} catch (IOException e) {
e.printStackTrace();
}
hosts = propsPF.getProperty(HOST_PROPERTY).split(",");
port = Integer.parseInt(propsPF.getProperty(PORT_PROPERTY));
bucketType = propsPF.getProperty(BUCKET_TYPE_PROPERTY);
rQuorumValue = new Quorum(Integer.parseInt(propsPF.getProperty(R_VALUE_PROPERTY)));
wQuorumValue = new Quorum(Integer.parseInt(propsPF.getProperty(W_VALUE_PROPERTY)));
readRetryCount = Integer.parseInt(propsPF.getProperty(READ_RETRY_COUNT_PROPERTY));
waitTimeBeforeRetry = Integer.parseInt(propsPF.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY));
transactionTimeLimit = Integer.parseInt(propsPF.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY));
strongConsistency = Boolean.parseBoolean(propsPF.getProperty(STRONG_CONSISTENCY_PROPERTY));
debug = Boolean.parseBoolean(propsPF.getProperty(DEBUG_PROPERTY));
}
private void loadProperties() {
loadDefaultProperties();
Properties props = getProperties();
String portString = props.getProperty(PORT_PROPERTY);
if (portString != null) {
port = Integer.parseInt(portString);
}
String hostsString = props.getProperty(HOST_PROPERTY);
if (hostsString != null) {
hosts = hostsString.split(",");
}
String bucketTypeString = props.getProperty(BUCKET_TYPE_PROPERTY);
if (bucketTypeString != null) {
bucketType = bucketTypeString;
}
String rQuorumValueString = props.getProperty(R_VALUE_PROPERTY);
if (rQuorumValueString != null) {
rQuorumValue = new Quorum(Integer.parseInt(rQuorumValueString));
}
String wQuorumValueString = props.getProperty(W_VALUE_PROPERTY);
if (wQuorumValueString != null) {
wQuorumValue = new Quorum(Integer.parseInt(wQuorumValueString));
}
String readRetryCountString = props.getProperty(READ_RETRY_COUNT_PROPERTY);
if (readRetryCountString != null) {
readRetryCount = Integer.parseInt(readRetryCountString);
}
String waitTimeBeforeRetryString = props.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY);
if (waitTimeBeforeRetryString != null) {
waitTimeBeforeRetry = Integer.parseInt(waitTimeBeforeRetryString);
}
String transactionTimeLimitString = props.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY);
if (transactionTimeLimitString != null) {
transactionTimeLimit = Integer.parseInt(transactionTimeLimitString);
}
String strongConsistencyString = props.getProperty(STRONG_CONSISTENCY_PROPERTY);
if (strongConsistencyString != null) {
strongConsistency = Boolean.parseBoolean(strongConsistencyString);
}
String debugString = props.getProperty(DEBUG_PROPERTY);
if (debugString != null) {
debug = Boolean.parseBoolean(debugString);
}
}
public void init() throws DBException {
loadProperties();
if (debug) {
System.out.println("DEBUG ENABLED. Configuration parameters:");
System.out.println("-----------------------------------------");
System.out.println("Hosts: " + Arrays.toString(hosts));
System.out.println("Port: " + port);
System.out.println("Bucket Type: " + bucketType);
System.out.println("R Quorum Value: " + rQuorumValue.toString());
System.out.println("W Quorum Value: " + wQuorumValue.toString());
System.out.println("Read Retry Count: " + readRetryCount);
System.out.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms");
System.out.println("Transaction Time Limit: " + transactionTimeLimit + " s");
System.out.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual"));
}
RiakNode.Builder builder = new RiakNode.Builder().withRemotePort(port);
List<RiakNode> nodes = RiakNode.Builder.buildNodes(builder, Arrays.asList(hosts));
riakCluster = new RiakCluster.Builder(nodes).build();
try {
riakCluster.start();
riakClient = new RiakClient(riakCluster);
} catch (Exception e) {
System.err.println("Unable to properly start up the cluster. Reason: " + e.toString());
throw new DBException(e);
}
}
/**
* Read a record from the database. Each field/value pair from the result will be stored in a HashMap.
*
* @param table The name of the table (Riak bucket)
* @param key The record key of the record to read.
* @param fields The list of fields to read, or null for all of them
* @param result A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
Location location = new Location(new Namespace(bucketType, table), key);
FetchValue fv = new FetchValue.Builder(location).withOption(FetchValue.Option.R, rQuorumValue).build();
try {
FetchValue.Response response = fetch(fv);
if (response.isNotFound()) {
if (debug) {
System.err.println("Unable to read key " + key + ". Reason: NOT FOUND");
}
return Status.NOT_FOUND;
}
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to read key " + key + ". Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to read key " + key + ". Reason: " + e.toString());
}
return Status.ERROR;
}
return Status.OK;
}
/**
* Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in
* a HashMap.
* <p>
* Note: The scan operation requires the use of secondary indexes (2i) and LevelDB.
*
* @param table The name of the table (Riak bucket)
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status scan(String table, String startkey, int recordcount, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
Namespace ns = new Namespace(bucketType, table);
IntIndexQuery iiq = new IntIndexQuery
.Builder(ns, "key", getKeyAsLong(startkey), 999999999999999999L)
.withMaxResults(recordcount)
.withPaginationSort(true)
.build();
RiakFuture<IntIndexQuery.Response, IntIndexQuery> future = riakClient.executeAsync(iiq);
try {
IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
List<IntIndexQuery.Response.Entry> entries = response.getEntries();
for (IntIndexQuery.Response.Entry entry : entries) {
Location location = entry.getRiakObjectLocation();
FetchValue fv = new FetchValue.Builder(location)
.withOption(FetchValue.Option.R, rQuorumValue)
.build();
FetchValue.Response keyResponse = fetch(fv);
if (keyResponse.isNotFound()) {
if (debug) {
System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: NOT " +
"FOUND");
}
return Status.NOT_FOUND;
}
}
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: " +
e.toString());
}
return Status.ERROR;
}
return Status.OK;
}
/**
* Tries to perform a read and, whenever it fails, retries to do it. It actually does try as many time as indicated,
* even if the function riakClient.execute(fv) throws an exception. This is needed for those situation in which the
* cluster is unable to respond properly due to overload. Note however that if the cluster doesn't respond after
* transactionTimeLimit, the transaction is discarded immediately.
*
* @param fv The value to fetch from the cluster.
*/
private FetchValue.Response fetch(FetchValue fv) throws TimeoutException {
FetchValue.Response response = null;
for (int i = 0; i < readRetryCount; i++) {
RiakFuture<FetchValue.Response, Location> future = riakClient.executeAsync(fv);
try {
response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
if (!response.isNotFound()) {
break;
}
} catch (TimeoutException e) {
// Let the callee decide how to handle this exception...
throw new TimeoutException();
} catch (Exception e) {
// Sleep for a few ms before retrying...
try {
Thread.sleep(waitTimeBeforeRetry);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
return response;
}
/**
* Insert a record in the database. Any field/value pairs in the specified values HashMap
* will be written into the record with the specified record key. Also creates a
* secondary index (2i) for each record consisting of the key converted to long to be used
* for the scan operation
*
* @param table The name of the table (Riak bucket)
* @param key The record key of the record to insert.
* @param values A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
Location location = new Location(new Namespace(bucketType, table), key);
RiakObject object = new RiakObject();
object.setValue(BinaryValue.create(serializeTable(values)));
object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
StoreValue store = new StoreValue.Builder(object)
.withLocation(location)
.withOption(Option.W, wQuorumValue)
.build();
RiakFuture<StoreValue.Response, Location> future = riakClient.executeAsync(store);
try {
future.get(transactionTimeLimit, TimeUnit.SECONDS);
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
.getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
.getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: " + e.toString());
}
return Status.ERROR;
}
return Status.OK;
}
/**
* Define a class to permit object substitution within the update operation, following the same, identical steps
* made in an insert operation. This is needed for strong-consistent updates.
*/
private static final class UpdateEntity extends UpdateValue.Update<RiakObject> {
private final RiakObject object;
private UpdateEntity(RiakObject e) {
this.object = e;
}
/* Simply returns the object.
*/
@Override
public RiakObject apply(RiakObject original) {
return object;
}
}
/**
* Update a record in the database. Any field/value pairs in the specified values
* HashMap will be written into the record with the specified
* record key, overwriting any existing values with the same field name.
*
* @param table The name of the table (Riak bucket)
* @param key The record key of the record to write.
* @param values A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status update(String table, String key, HashMap<String, ByteIterator> values) {
if (!strongConsistency) {
return insert(table, key, values);
}
Location location = new Location(new Namespace(bucketType, table), key);
RiakObject object = new RiakObject();
object.setValue(BinaryValue.create(serializeTable(values)));
object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
UpdateValue update = new UpdateValue.Builder(location)
.withFetchOption(FetchValue.Option.DELETED_VCLOCK, true)
.withStoreOption(Option.W, wQuorumValue)
.withUpdate(new UpdateEntity(object))
.build();
RiakFuture<UpdateValue.Response, Location> future = riakClient.executeAsync(update);
try {
future.get(transactionTimeLimit, TimeUnit.SECONDS);
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to update key " + key + ". Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to update key " + key + ". Reason: " + e.toString());
}
return Status.ERROR;
}
return Status.OK;
}
/**
* Delete a record from the database.
*
* @param table The name of the table (Riak bucket)
* @param key The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status delete(String table, String key) {
Location location = new Location(new Namespace(bucketType, table), key);
DeleteValue dv = new DeleteValue.Builder(location).build();
RiakFuture<Void, Location> future = riakClient.executeAsync(dv);
try {
future.get(transactionTimeLimit, TimeUnit.SECONDS);
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to delete key " + key + ". Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to delete key " + key + ". Reason: " + e.toString());
}
return Status.ERROR;
}
return Status.OK;
}
public void cleanup() throws DBException {
try {
riakCluster.shutdown();
} catch (Exception e) {
System.err.println("Unable to properly shutdown the cluster. Reason: " + e.toString());
throw new DBException(e);
}
}
}

Просмотреть файл

@ -0,0 +1,85 @@
/*
* Copyright 2016 nygard_89
* Copyright 2014 Basho Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.ycsb.db;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Set;
import com.yahoo.ycsb.ByteIterator;
/**
* @author nygard_89
* @author Basho Technologies, Inc.
*
*/
final class RiakUtils {
private RiakUtils() {
super();
}
private static byte[] toBytes(final int anInteger) {
byte[] aResult = new byte[4];
aResult[0] = (byte) (anInteger >> 24);
aResult[1] = (byte) (anInteger >> 16);
aResult[2] = (byte) (anInteger >> 8);
aResult[3] = (byte) (anInteger /* >> 0 */);
return aResult;
}
private static void close(final OutputStream anOutputStream) {
try {
anOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
static byte[] serializeTable(Map<String, ByteIterator> aTable) {
final ByteArrayOutputStream anOutputStream = new ByteArrayOutputStream();
final Set<Map.Entry<String, ByteIterator>> theEntries = aTable.entrySet();
try {
for (final Map.Entry<String, ByteIterator> anEntry : theEntries) {
final byte[] aColumnName = anEntry.getKey().getBytes();
anOutputStream.write(toBytes(aColumnName.length));
anOutputStream.write(aColumnName);
final byte[] aColumnValue = anEntry.getValue().toArray();
anOutputStream.write(toBytes(aColumnValue.length));
anOutputStream.write(aColumnValue);
}
return anOutputStream.toByteArray();
} catch (IOException e) {
throw new IllegalStateException(e);
} finally {
close(anOutputStream);
}
}
static Long getKeyAsLong(String key) {
String keyString = key.replace("user", "").replaceFirst("^0*", "");
return Long.parseLong(keyString);
}
}

Просмотреть файл

@ -0,0 +1,21 @@
/*
* Copyright 2014 Basho Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* The YCSB binding for <a href="http://basho.com/products/riak-kv/">Riak KV</a>
* 2.0+.
*/
package com.yahoo.ycsb.db;

Просмотреть файл

@ -0,0 +1,39 @@
# RiakDBClient - Default Properties
# Note: Change the properties below to set the values to use for your test. You can set them either here or from the
# command line. Note that the latter choice overrides these settings.
# riak.hosts - string list, comma separated list of IPs or FQDNs.
# EX: 127.0.0.1,127.0.0.2,127.0.0.3 or riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com
riak.hosts=127.0.0.1
# riak.port - int, the port on which every node is listening. It must match the one specified in the riak.conf file
# at the line "listener.protobuf.internal".
riak.port=8087
# riak.bucket_type - string, must match value of bucket type created during setup. See readme.md for more information
riak.bucket_type=ycsb
# riak.r_val - int, the R value represents the number of Riak nodes that must return results for a read before the read
# is considered successful.
riak.r_val=2
# riak.w_val - int, the W value represents the number of Riak nodes that must report success before an update is
# considered complete.
riak.w_val=2
# riak.read_retry_count - int, number of times the client will try to read a key from Riak.
riak.read_retry_count=5
# riak.wait_time_before_retry - int, time (in milliseconds) the client waits before attempting to perform another
# read if the previous one failed.
riak.wait_time_before_retry=200
# riak.transaction_time_limit - int, time (in seconds) the client waits before aborting the current transaction.
riak.transaction_time_limit=10
# riak.strong_consistency - boolean, indicates whether to use strong consistency (true) or eventual consistency (false).
riak.strong_consistency=false
# riak.debug - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark
# is started.
riak.debug=false