[riak] New YCSB binding for Riak.

The binding is based on the one created by Basho Technologies (available at https://github.com/basho-labs/YCSB/tree/master/riak).
[riak] Made compatible with the latest YCSB version.
[riak] Added strong consistency test support and made it default.
[riak] Added missing results return based on provided fields in scan/read functions.
[riak] Added TIME_OUT status return code for those transactions which take too long to complete.
[riak] Added flexible test parameter specification via customizable configuration file and command line options support.
[riak] Added test unit.
[riak] Removed unused Constants.java file.
[riak] Removed many unused function in the RiakUtils.java file.
[riak] Choosed to use executeAsync methods to resolve a bug that didn't allow the benchmark to end whenever one or more nodes of the cluster are killed.
[riak] Fixed license issues in all files.
[riak] Done some refinements to the code.
This commit is contained in:
nygard_89 2016-04-09 23:29:46 +02:00
Родитель ad34f6f227
Коммит 7492500497
8 изменённых файлов: 232 добавлений и 87 удалений

Просмотреть файл

@ -139,6 +139,11 @@ LICENSE file.
<artifactId>redis-binding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>riak-binding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>s3-binding</artifactId>

Просмотреть файл

@ -106,6 +106,7 @@ LICENSE file.
<module>cassandra</module>
<module>cassandra2</module>
<module>couchbase</module>
<module>distribution</module>
<module>dynamodb</module>
<module>elasticsearch</module>
<module>geode</module>
@ -127,7 +128,6 @@ LICENSE file.
<module>s3</module>
<module>solr</module>
<module>tarantool</module>
<module>distribution</module>
<!--<module>voldemort</module>-->
</modules>

Просмотреть файл

@ -1,3 +1,21 @@
<!--
Copyright (c) 2016 YCSB contributors. All rights reserved.
Copyright 2014 Basho Technologies, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
Riak KV Client for Yahoo! Cloud System Benchmark (YCSB)
--------------------------------------------------------
@ -8,27 +26,30 @@ Creating a <i>bucket type</i> to use with YCSB
Perform the following operations on your Riak cluster to configure it for the benchmarks.
Set the default backend for Riak to <i>LevelDB</i> in `riak.conf` (required to support <i>secondary indexes</i> used for the <b>scan</b> workloads):
Set the default backend for Riak to <i>LevelDB</i> in the `riak.conf` file of every node of your cluster. This is required to support <i>secondary indexes</i>, which are used for the `scan` transactions. You can do this by modifying the proper line as shown below.
```
storage_backend = leveldb
```
Create a bucket type named "ycsb"<sup id="a1">[1](#f1)</sup> by logging into one of the nodes in your cluster. Then if you want to use the
Create a bucket type named "ycsb"<sup id="a1">[1](#f1)</sup> by logging into one of the nodes in your cluster.
Then, if you want to use the <i>strong consistency model</i> (default), you have to follow the next two steps.
* <i>default consistency model</i> (i.e. eventual), run the following riak-admin commands:
1) In every `riak.conf` file, search for the `##strong_consistency=on` line and uncomment it. It is important that you do this <b>before you start your cluster</b>!
2) Run the following riak-admin commands:
```
riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false","consistent":true}}'
riak-admin bucket-type activate ycsb
```
If instead you want to use the <i>eventual consistency model</i> implemented in Riak, then type:
```
riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false"}}'
riak-admin bucket-type activate ycsb
```
* <i>strong consistency model</i>, type:
```
riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false","consistent":true}}'
riak-admin bucket-type activate ycsb
```
Note that you may want to specify the number of replicas to create for each object. To do so, you can add `"n_val":N` to the list of properties shown above (by default `N` is set to 3).
Riak KV configuration parameters

Просмотреть файл

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2015-2016 YCSB contributors. All rights reserved.
Copyright (c) 2016 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
@ -47,6 +47,12 @@ LICENSE file.
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

Просмотреть файл

@ -1,27 +1,26 @@
/*
* Copyright 2016 nygard_89
/**
* Copyright (c) 2016 YCSB contributors All rights reserved.
* Copyright 2014 Basho Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.basho.riak.client.api.commands.kv.UpdateValue;
import com.basho.riak.client.core.RiakFuture;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.*;
import java.io.IOException;
import java.io.InputStream;
@ -44,13 +43,12 @@ import com.basho.riak.client.core.query.RiakObject;
import com.basho.riak.client.core.query.indexes.LongIntIndex;
import com.basho.riak.client.core.util.BinaryValue;
import static com.yahoo.ycsb.db.RiakUtils.deserializeTable;
import static com.yahoo.ycsb.db.RiakUtils.getKeyAsLong;
import static com.yahoo.ycsb.db.RiakUtils.serializeTable;
/**
* @author nygard_89
* @author Basho Technologies, Inc.
* Riak KV 2.0.x client for YCSB framework.
*
*/
public final class RiakKVClient extends DB {
@ -71,8 +69,8 @@ public final class RiakKVClient extends DB {
private String[] hosts;
private int port;
private String bucketType;
private Quorum rQuorumValue;
private Quorum wQuorumValue;
private Quorum rvalue;
private Quorum wvalue;
private int readRetryCount;
private int waitTimeBeforeRetry;
private int transactionTimeLimit;
@ -95,8 +93,8 @@ public final class RiakKVClient extends DB {
hosts = propsPF.getProperty(HOST_PROPERTY).split(",");
port = Integer.parseInt(propsPF.getProperty(PORT_PROPERTY));
bucketType = propsPF.getProperty(BUCKET_TYPE_PROPERTY);
rQuorumValue = new Quorum(Integer.parseInt(propsPF.getProperty(R_VALUE_PROPERTY)));
wQuorumValue = new Quorum(Integer.parseInt(propsPF.getProperty(W_VALUE_PROPERTY)));
rvalue = new Quorum(Integer.parseInt(propsPF.getProperty(R_VALUE_PROPERTY)));
wvalue = new Quorum(Integer.parseInt(propsPF.getProperty(W_VALUE_PROPERTY)));
readRetryCount = Integer.parseInt(propsPF.getProperty(READ_RETRY_COUNT_PROPERTY));
waitTimeBeforeRetry = Integer.parseInt(propsPF.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY));
transactionTimeLimit = Integer.parseInt(propsPF.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY));
@ -124,14 +122,14 @@ public final class RiakKVClient extends DB {
bucketType = bucketTypeString;
}
String rQuorumValueString = props.getProperty(R_VALUE_PROPERTY);
if (rQuorumValueString != null) {
rQuorumValue = new Quorum(Integer.parseInt(rQuorumValueString));
String rValueString = props.getProperty(R_VALUE_PROPERTY);
if (rValueString != null) {
rvalue = new Quorum(Integer.parseInt(rValueString));
}
String wQuorumValueString = props.getProperty(W_VALUE_PROPERTY);
if (wQuorumValueString != null) {
wQuorumValue = new Quorum(Integer.parseInt(wQuorumValueString));
String wValueString = props.getProperty(W_VALUE_PROPERTY);
if (wValueString != null) {
wvalue = new Quorum(Integer.parseInt(wValueString));
}
String readRetryCountString = props.getProperty(READ_RETRY_COUNT_PROPERTY);
@ -164,17 +162,17 @@ public final class RiakKVClient extends DB {
loadProperties();
if (debug) {
System.out.println("DEBUG ENABLED. Configuration parameters:");
System.out.println("-----------------------------------------");
System.out.println("Hosts: " + Arrays.toString(hosts));
System.out.println("Port: " + port);
System.out.println("Bucket Type: " + bucketType);
System.out.println("R Quorum Value: " + rQuorumValue.toString());
System.out.println("W Quorum Value: " + wQuorumValue.toString());
System.out.println("Read Retry Count: " + readRetryCount);
System.out.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms");
System.out.println("Transaction Time Limit: " + transactionTimeLimit + " s");
System.out.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual"));
System.err.println("DEBUG ENABLED. Configuration parameters:");
System.err.println("-----------------------------------------");
System.err.println("Hosts: " + Arrays.toString(hosts));
System.err.println("Port: " + port);
System.err.println("Bucket Type: " + bucketType);
System.err.println("R Val: " + rvalue.toString());
System.err.println("W Val: " + wvalue.toString());
System.err.println("Read Retry Count: " + readRetryCount);
System.err.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms");
System.err.println("Transaction Time Limit: " + transactionTimeLimit + " s");
System.err.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual"));
}
RiakNode.Builder builder = new RiakNode.Builder().withRemotePort(port);
@ -202,10 +200,11 @@ public final class RiakKVClient extends DB {
@Override
public Status read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
Location location = new Location(new Namespace(bucketType, table), key);
FetchValue fv = new FetchValue.Builder(location).withOption(FetchValue.Option.R, rQuorumValue).build();
FetchValue fv = new FetchValue.Builder(location).withOption(FetchValue.Option.R, rvalue).build();
FetchValue.Response response;
try {
FetchValue.Response response = fetch(fv);
response = fetch(fv);
if (response.isNotFound()) {
if (debug) {
@ -228,10 +227,11 @@ public final class RiakKVClient extends DB {
return Status.ERROR;
}
result.put(key, getFields(fields, response));
return Status.OK;
}
/**
* Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in
* a HashMap.
@ -251,7 +251,7 @@ public final class RiakKVClient extends DB {
Namespace ns = new Namespace(bucketType, table);
IntIndexQuery iiq = new IntIndexQuery
.Builder(ns, "key", getKeyAsLong(startkey), 999999999999999999L)
.Builder(ns, "key", getKeyAsLong(startkey), Long.MAX_VALUE)
.withMaxResults(recordcount)
.withPaginationSort(true)
.build();
@ -264,31 +264,37 @@ public final class RiakKVClient extends DB {
for (IntIndexQuery.Response.Entry entry : entries) {
Location location = entry.getRiakObjectLocation();
FetchValue fv = new FetchValue.Builder(location)
.withOption(FetchValue.Option.R, rQuorumValue)
.withOption(FetchValue.Option.R, rvalue)
.build();
FetchValue.Response keyResponse = fetch(fv);
if (keyResponse.isNotFound()) {
if (debug) {
System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: NOT " +
"FOUND");
System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
"Reason: NOT FOUND");
}
return Status.NOT_FOUND;
}
HashMap<String, ByteIterator> partialResult = new HashMap<>();
partialResult.put(location.getKeyAsString(), getFields(fields, keyResponse));
result.add(partialResult);
}
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: TIME OUT");
System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
"Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: " +
e.toString());
System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
"Reason: " + e.toString());
}
return Status.ERROR;
@ -333,6 +339,49 @@ public final class RiakKVClient extends DB {
return response;
}
/**
* Function that retrieves all the fields searched within a read or scan operation.
*
* @param fields The list of fields to read, or null for all of them
* @param response A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
* @return A ByteIterator containing all the values that correspond to the fields provided.
*/
private ByteIterator getFields(Set<String> fields, FetchValue.Response response) {
// If everything went fine, then a result must be given. Such an object is an hash table containing the (key,
// value) pairs based on the requested fields. Note that in a read operation, ONLY ONE OBJECT IS RETRIEVED!
byte[] responseFieldsAndValues = response.getValues().get(0).getValue().getValue();
ByteIterator valuesToPut;
// If only specific field are requested, then only these should be put in the result object!
if (fields != null) {
HashMap<String, ByteIterator> deserializedTable = new HashMap<>();
deserializeTable(responseFieldsAndValues, deserializedTable);
// Instantiate a new HashMap for returning only the requested fields.
HashMap<String, ByteIterator> returnMap = new HashMap<>();
// Build the return HashMap to provide as result.
for (Object field : fields.toArray()) {
// Comparison between a requested field and the ones retrieved: if they're equal, then proceed to store the
// couple in the returnMap.
ByteIterator value = deserializedTable.get(field);
if (value != null) {
returnMap.put((String) field, value);
}
}
// Finally, convert the returnMap to a byte array.
valuesToPut = new ByteArrayByteIterator(serializeTable(returnMap));
} else {
// If, instead, no field is specified, then all the ones retrieved must be provided as result.
valuesToPut = new ByteArrayByteIterator(responseFieldsAndValues);
}
// Results.
return valuesToPut;
}
/**
* Insert a record in the database. Any field/value pairs in the specified values HashMap
* will be written into the record with the specified record key. Also creates a
@ -354,7 +403,7 @@ public final class RiakKVClient extends DB {
StoreValue store = new StoreValue.Builder(object)
.withLocation(location)
.withOption(Option.W, wQuorumValue)
.withOption(Option.W, wvalue)
.build();
RiakFuture<StoreValue.Response, Location> future = riakClient.executeAsync(store);
@ -423,7 +472,7 @@ public final class RiakKVClient extends DB {
UpdateValue update = new UpdateValue.Builder(location)
.withFetchOption(FetchValue.Option.DELETED_VCLOCK, true)
.withStoreOption(Option.W, wQuorumValue)
.withStoreOption(Option.W, wvalue)
.withUpdate(new UpdateEntity(object))
.build();

Просмотреть файл

@ -1,32 +1,34 @@
/*
* Copyright 2016 nygard_89
/**
* Copyright (c) 2016 YCSB contributors All rights reserved.
* Copyright 2014 Basho Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.*;
import java.util.Map;
import java.util.Set;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import static com.google.common.base.Preconditions.checkArgument;
/**
* @author nygard_89
* @author Basho Technologies, Inc.
* Utility class for Riak KV Client.
*
*/
final class RiakUtils {
@ -46,6 +48,12 @@ final class RiakUtils {
return aResult;
}
private static int fromBytes(final byte[] aByteArray) {
checkArgument(aByteArray.length == 4);
return (aByteArray[0] << 24) | (aByteArray[1] & 0xFF) << 16 | (aByteArray[2] & 0xFF) << 8 | (aByteArray[3] & 0xFF);
}
private static void close(final OutputStream anOutputStream) {
try {
anOutputStream.close();
@ -54,6 +62,14 @@ final class RiakUtils {
}
}
private static void close(final InputStream anInputStream) {
try {
anInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
static byte[] serializeTable(Map<String, ByteIterator> aTable) {
final ByteArrayOutputStream anOutputStream = new ByteArrayOutputStream();
final Set<Map.Entry<String, ByteIterator>> theEntries = aTable.entrySet();
@ -78,8 +94,36 @@ final class RiakUtils {
}
}
static void deserializeTable(final byte[] aValue, final Map<String, ByteIterator> theResult) {
final ByteArrayInputStream anInputStream = new ByteArrayInputStream(aValue);
byte[] aSizeBuffer = new byte[4];
try {
while (anInputStream.available() > 0) {
anInputStream.read(aSizeBuffer);
final int aColumnNameLength = fromBytes(aSizeBuffer);
final byte[] aColumnNameBuffer = new byte[aColumnNameLength];
anInputStream.read(aColumnNameBuffer);
anInputStream.read(aSizeBuffer);
final int aColumnValueLength = fromBytes(aSizeBuffer);
final byte[] aColumnValue = new byte[aColumnValueLength];
anInputStream.read(aColumnValue);
theResult.put(new String(aColumnNameBuffer), new ByteArrayByteIterator(aColumnValue));
}
} catch (Exception e) {
throw new IllegalStateException(e);
} finally {
close(anInputStream);
}
}
static Long getKeyAsLong(String key) {
String keyString = key.replace("user", "").replaceFirst("^0*", "");
String keyString = key.replaceFirst("[a-zA-Z]*", "");
return Long.parseLong(keyString);
}
}

Просмотреть файл

@ -1,21 +1,23 @@
/*
/**
* Copyright (c) 2016 YCSB contributors All rights reserved.
* Copyright 2014 Basho Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* The YCSB binding for <a href="http://basho.com/products/riak-kv/">Riak KV</a>
* 2.0+.
* 2.0.x.
*/
package com.yahoo.ycsb.db;

Просмотреть файл

@ -1,4 +1,22 @@
# RiakDBClient - Default Properties
##
# Copyright (c) 2016 YCSB contributors All rights reserved.
# Copyright 2014 Basho Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
#
# RiakKVClient - Default Properties
# Note: Change the properties below to set the values to use for your test. You can set them either here or from the
# command line. Note that the latter choice overrides these settings.
@ -32,7 +50,7 @@ riak.wait_time_before_retry=200
riak.transaction_time_limit=10
# riak.strong_consistency - boolean, indicates whether to use strong consistency (true) or eventual consistency (false).
riak.strong_consistency=false
riak.strong_consistency=true
# riak.debug - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark
# is started.