Merge pull request #723 from nygard89/master

[riak] Added a workaround to allow strong-consistent scan transactions.
This commit is contained in:
Sean Busbey 2016-05-03 09:35:46 -05:00
Родитель 6834e6bcd9 f593bad58f
Коммит a372340319
4 изменённых файлов: 186 добавлений и 93 удалений

Просмотреть файл

@ -21,7 +21,7 @@ Riak KV Client for Yahoo! Cloud System Benchmark (YCSB)
The Riak KV YCSB client is designed to work with the Yahoo! Cloud System Benchmark (YCSB) project (https://github.com/brianfrankcooper/YCSB) to support performance testing for the 2.x.y line of the Riak KV database.
Creating a <i>bucket type</i> to use with YCSB
Creating a <i>bucket-type</i> to use with YCSB
----------------------------
Perform the following operations on your Riak cluster to configure it for the benchmarks.
@ -31,8 +31,11 @@ Set the default backend for Riak to <i>LevelDB</i> in the `riak.conf` file of ev
```
storage_backend = leveldb
```
After this, create a bucket type named "ycsb"<sup id="a1">[1](#f1)</sup> by logging into one of the nodes in your cluster. Now you're ready to set up the cluster to operate using one between strong and eventual consistency model as shown in the next two subsections.
Now, create a bucket type named "ycsb"<sup id="a1">[1](#f1)</sup> by logging into one of the nodes in your cluster. Then, to use the <i>strong consistency model</i><sup id="a2">[2](#f2)</sup> (default), you need to follow the next two steps.
###Strong consistency model
To use the <i>strong consistency model</i> (default), you need to follow the next two steps.
1. In every `riak.conf` file, search for the `##strong_consistency=on` line and uncomment it. It's important that you do this <b>before you start your cluster</b>!
2. Run the following `riak-admin` commands:
@ -42,9 +45,24 @@ Now, create a bucket type named "ycsb"<sup id="a1">[1](#f1)</sup> by logging int
riak-admin bucket-type activate ycsb
```
Note that when using the strong consistency model, you **may have to specify the number of replicas to create for each object**. The *R* and *W* parameters (see next section) will in fact be ignored. The only information needed by this consistency model is how many nodes the system has to successfully query to consider a transaction completed. To set this parameter, you can add `"n_val":N` to the list of properties shown above (by default `N` is set to 3).
When using this model, you **may want to specify the number of replicas to create for each object**<sup id="a2">[2](#f2)</sup>: the *R* and *W* parameters (see next section) will in fact be ignored. The only information needed by this consistency model is how many nodes the system has to successfully query to consider a transaction completed. To set this parameter, you can add `"n_val":N` to the list of properties shown above (by default `N` is set to 3).
If instead you want to use the <i>eventual consistency model</i> implemented in Riak, then type:
####A note on the scan transactions
Currently, `scan` transactions are not _directly_ supported, as there is no suitable mean to perform them properly. This will not cause the benchmark to fail, it simply won't perform any scan transaction at all (these will immediately return with a `Status.NOT_IMPLEMENTED` code).
However, a possible workaround has been provided: considering that Riak doesn't allow strong-consistent bucket-types to use secondary indexes, we can create an eventually consistent one just to store (*key*, *2i indexes*) pairs. This will be later used only to obtain the keys where the objects are located, which will be then used to retrieve the actual objects from the strong-consistent bucket. If you want to use this workaround, then you have to create and activate a "_fake bucket-type_" using the following commands:
```
riak-admin bucket-type create fakeBucketType '{"props":{"allow_mult":"false","n_val":1,"dvv_enabled":false,"last_write_wins":true}}'
riak-admin bucket-type activate fakeBucketType
```
A bucket-type so defined isn't allowed to _create siblings_ (`allow_mult":"false"`), it'll have just _one replica_ (`"n_val":1`) which'll store the _last value provided_ (`"last_write_wins":true`) and _vector clocks_ will be used instead of _dotted version vectors_ (`"dvv_enabled":false`). Note that setting `"n_val":1` means that the `scan` transactions won't be much *fault-tolerant*, considering that if a node fails then a lot of them could potentially fail. You may indeed increase this value, but this choice will necessarily load the cluster with more work. So, the choice is yours to make!
Then you have to set the `riak.strong_consistent_scans_bucket_type` property (see next section) equal to the name you gave to the aforementioned "fake bucket-type" (e.g. `fakeBucketType` in this case).
Please note that this workaround involves a **double store operation for each insert transaction**, one to store the actual object and another one to save the corresponding 2i index. In practice, the client won't notice any difference, as the latter operation is performed asynchronously. However, the cluster will be obviously loaded more, and this is why the proposed "fake bucket-type" to create is as less _resource-demanding_ as possible.
###Eventual consistency model
If you want to use the <i>eventual consistency model</i> implemented in Riak, you have just to type:
```
riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false"}}'
riak-admin bucket-type activate ycsb
@ -63,10 +81,12 @@ You can either specify these configuration parameters via command line or set th
* `riak.wait_time_before_retry` - <b>int</b>, the time (in milliseconds) before the client attempts to perform another read if the previous one failed.
* `riak.transaction_time_limit` - <b>int</b>, the time (in seconds) the client waits before aborting the current transaction.
* `riak.strong_consistency` - <b>boolean</b>, indicates whether to use *strong consistency* (true) or *eventual consistency* (false).
* `riak.strong_consistent_scans_bucket_type` - **string**, indicates the bucket-type to use to allow scans transactions when using strong consistency mode.
* `riak.debug` - <b>boolean</b>, enables debug mode. This displays all the properties (specified or defaults) when a benchmark is started. Moreover, it shows error causes whenever these occur.
<b>Note</b>: For more information on workloads and how to run them please see: https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload
<b id="f1">1</b> As specified in the `riak.properties` file. See parameters configuration section for further info. [](#a1)
<b id="f2">2</b> <b>IMPORTANT NOTE:</b> Currently the `scan` transactions are <b>NOT SUPPORTED</b> for the benchmarks which use the strong consistency model! However this will not cause the benchmark to fail, it simply won't perform any scan transaction at all. These latter will immediately return with a `Status.NOT_IMPLEMENTED` code. [](#a2)
<b id="f2">2</b> More info about properly setting up a fault-tolerant cluster can be found at http://docs.basho.com/riak/kv/2.1.4/configuring/strong-consistency/#enabling-strong-consistency.[↩](#a2)

Просмотреть файл

@ -19,8 +19,12 @@
package com.yahoo.ycsb.db.riak;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.commands.kv.UpdateValue;
import com.basho.riak.client.core.RiakFuture;
import com.basho.riak.client.core.query.RiakObject;
import com.basho.riak.client.core.query.indexes.LongIntIndex;
import com.basho.riak.client.core.util.BinaryValue;
import com.yahoo.ycsb.*;
import java.io.IOException;
@ -34,17 +38,12 @@ import com.basho.riak.client.api.cap.Quorum;
import com.basho.riak.client.api.commands.indexes.IntIndexQuery;
import com.basho.riak.client.api.commands.kv.DeleteValue;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.commands.kv.StoreValue.Option;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.RiakObject;
import com.basho.riak.client.core.query.indexes.LongIntIndex;
import com.basho.riak.client.core.util.BinaryValue;
import static com.yahoo.ycsb.db.riak.RiakUtils.deserializeTable;
import static com.yahoo.ycsb.db.riak.RiakUtils.createResultHashMap;
import static com.yahoo.ycsb.db.riak.RiakUtils.getKeyAsLong;
import static com.yahoo.ycsb.db.riak.RiakUtils.serializeTable;
@ -52,7 +51,7 @@ import static com.yahoo.ycsb.db.riak.RiakUtils.serializeTable;
* Riak KV 2.x.y client for YCSB framework.
*
*/
public final class RiakKVClient extends DB {
public class RiakKVClient extends DB {
private static final String HOST_PROPERTY = "riak.hosts";
private static final String PORT_PROPERTY = "riak.port";
private static final String BUCKET_TYPE_PROPERTY = "riak.bucket_type";
@ -62,6 +61,7 @@ public final class RiakKVClient extends DB {
private static final String WAIT_TIME_BEFORE_RETRY_PROPERTY = "riak.wait_time_before_retry";
private static final String TRANSACTION_TIME_LIMIT_PROPERTY = "riak.transaction_time_limit";
private static final String STRONG_CONSISTENCY_PROPERTY = "riak.strong_consistency";
private static final String STRONG_CONSISTENT_SCANS_BUCKET_TYPE_PROPERTY = "riak.strong_consistent_scans_bucket_type";
private static final String DEBUG_PROPERTY = "riak.debug";
private static final Status TIME_OUT = new Status("TIME_OUT", "Cluster didn't respond after maximum wait time.");
@ -69,12 +69,15 @@ public final class RiakKVClient extends DB {
private String[] hosts;
private int port;
private String bucketType;
private String bucketType2i;
private Quorum rvalue;
private Quorum wvalue;
private int readRetryCount;
private int waitTimeBeforeRetry;
private int transactionTimeLimit;
private boolean strongConsistency;
private String strongConsistentScansBucketType;
private boolean performStrongConsistentScans;
private boolean debug;
private RiakClient riakClient;
@ -99,12 +102,15 @@ public final class RiakKVClient extends DB {
waitTimeBeforeRetry = Integer.parseInt(propsPF.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY));
transactionTimeLimit = Integer.parseInt(propsPF.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY));
strongConsistency = Boolean.parseBoolean(propsPF.getProperty(STRONG_CONSISTENCY_PROPERTY));
strongConsistentScansBucketType = propsPF.getProperty(STRONG_CONSISTENT_SCANS_BUCKET_TYPE_PROPERTY);
debug = Boolean.parseBoolean(propsPF.getProperty(DEBUG_PROPERTY));
}
private void loadProperties() {
// First, load the default properties...
loadDefaultProperties();
// ...then, check for some props set at command line!
Properties props = getProperties();
String portString = props.getProperty(PORT_PROPERTY);
@ -152,6 +158,11 @@ public final class RiakKVClient extends DB {
strongConsistency = Boolean.parseBoolean(strongConsistencyString);
}
String strongConsistentScansBucketTypeString = props.getProperty(STRONG_CONSISTENT_SCANS_BUCKET_TYPE_PROPERTY);
if (strongConsistentScansBucketTypeString != null) {
strongConsistentScansBucketType = strongConsistentScansBucketTypeString;
}
String debugString = props.getProperty(DEBUG_PROPERTY);
if (debugString != null) {
debug = Boolean.parseBoolean(debugString);
@ -161,6 +172,32 @@ public final class RiakKVClient extends DB {
public void init() throws DBException {
loadProperties();
RiakNode.Builder builder = new RiakNode.Builder().withRemotePort(port);
List<RiakNode> nodes = RiakNode.Builder.buildNodes(builder, Arrays.asList(hosts));
riakCluster = new RiakCluster.Builder(nodes).build();
try {
riakCluster.start();
riakClient = new RiakClient(riakCluster);
} catch (Exception e) {
System.err.println("Unable to properly start up the cluster. Reason: " + e.toString());
throw new DBException(e);
}
// If strong consistency is in use, we need to change the bucket-type where the 2i indexes will be stored.
if (strongConsistency && !strongConsistentScansBucketType.isEmpty()) {
// The 2i indexes have to be stored in the appositely created strongConsistentScansBucketType: this however has
// to be done only if the user actually created it! So, if the latter doesn't exist, then the scan transactions
// will not be performed at all.
bucketType2i = strongConsistentScansBucketType;
performStrongConsistentScans = true;
} else {
// If instead eventual consistency is in use, then the 2i indexes have to be stored in the bucket-type
// indicated with the bucketType variable.
bucketType2i = bucketType;
performStrongConsistentScans = false;
}
if (debug) {
System.err.println("DEBUG ENABLED. Configuration parameters:");
System.err.println("-----------------------------------------");
@ -173,18 +210,11 @@ public final class RiakKVClient extends DB {
System.err.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms");
System.err.println("Transaction Time Limit: " + transactionTimeLimit + " s");
System.err.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual"));
}
RiakNode.Builder builder = new RiakNode.Builder().withRemotePort(port);
List<RiakNode> nodes = RiakNode.Builder.buildNodes(builder, Arrays.asList(hosts));
riakCluster = new RiakCluster.Builder(nodes).build();
try {
riakCluster.start();
riakClient = new RiakClient(riakCluster);
} catch (Exception e) {
System.err.println("Unable to properly start up the cluster. Reason: " + e.toString());
throw new DBException(e);
if (strongConsistency) {
System.err.println("Strong Consistent Scan Transactions " + (performStrongConsistentScans ? "" : "NOT ") +
"allowed.");
}
}
}
@ -237,8 +267,6 @@ public final class RiakKVClient extends DB {
* Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in
* a HashMap.
* Note: The scan operation requires the use of secondary indexes (2i) and LevelDB.
* IMPORTANT NOTE: the 2i queries DO NOT WORK in conjunction with strong consistency (ref: http://docs.basho
* .com/riak/kv/2.1.4/developing/usage/secondary-indexes/)!
*
* @param table The name of the table (Riak bucket)
* @param startkey The record key of the first record to read.
@ -250,29 +278,44 @@ public final class RiakKVClient extends DB {
@Override
public Status scan(String table, String startkey, int recordcount, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
// As of 2.1.4 Riak KV version, strong consistency does not support any suitable mean capable of searching
// consecutive stored keys, as requested by a scan transaction. So, the latter WILL NOT BE PERFORMED AT ALL!
// More info at http://docs.basho.com/riak/kv/2.1.4/developing/app-guide/strong-consistency/
if (strongConsistency) {
if (strongConsistency && !performStrongConsistentScans) {
return Status.NOT_IMPLEMENTED;
}
Namespace ns = new Namespace(bucketType, table);
// The strong consistent bucket-type is not capable of storing 2i indexes. So, we need to read them from the fake
// one (which we use only to store indexes). This is why, when using such a consistency model, the bucketType2i
// variable is set to FAKE_BUCKET_TYPE.
IntIndexQuery iiq = new IntIndexQuery
.Builder(ns, "key", getKeyAsLong(startkey), Long.MAX_VALUE)
.Builder(new Namespace(bucketType2i, table), "key", getKeyAsLong(startkey), Long.MAX_VALUE)
.withMaxResults(recordcount)
.withPaginationSort(true)
.build();
Location location;
RiakFuture<IntIndexQuery.Response, IntIndexQuery> future = riakClient.executeAsync(iiq);
try {
IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
List<IntIndexQuery.Response.Entry> entries = response.getEntries();
// If no entries were retrieved, then something bad happened...
if (entries.size() == 0) {
if (debug) {
System.err.println("Unable to scan any record starting from key " + startkey + ", aborting transaction. " +
"Reason: NOT FOUND");
}
return Status.NOT_FOUND;
}
for (IntIndexQuery.Response.Entry entry : entries) {
Location location = entry.getRiakObjectLocation();
// If strong consistency is in use, then the actual location of the object we want to read is obtained by
// fetching the key from the one retrieved with the 2i indexes search operation.
if (strongConsistency) {
location = new Location(new Namespace(bucketType, table), entry.getRiakObjectLocation().getKeyAsString());
} else {
location = entry.getRiakObjectLocation();
}
FetchValue fv = new FetchValue.Builder(location)
.withOption(FetchValue.Option.R, rvalue)
@ -282,21 +325,22 @@ public final class RiakKVClient extends DB {
if (keyResponse.isNotFound()) {
if (debug) {
System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
"Reason: NOT FOUND");
System.err.println("Unable to scan all requested records starting from key " + startkey + ", aborting " +
"transaction. Reason: NOT FOUND");
}
return Status.NOT_FOUND;
}
// Create the partial result to add to the result vector.
HashMap<String, ByteIterator> partialResult = new HashMap<>();
createResultHashMap(fields, keyResponse, partialResult);
result.add(partialResult);
}
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
"Reason: TIME OUT");
System.err.println("Unable to scan all requested records starting from key " + startkey + ", aborting " +
"transaction. Reason: TIME OUT");
}
return TIME_OUT;
@ -348,45 +392,6 @@ public final class RiakKVClient extends DB {
return response;
}
/**
* Function that retrieves all the fields searched within a read or scan operation and puts them in the result
* HashMap.
*
* @param fields The list of fields to read, or null for all of them.
* @param response A Vector of HashMaps, where each HashMap is a set field/value pairs for one record.
* @param resultHashMap The HashMap to return as result.
*/
private void createResultHashMap(Set<String> fields, FetchValue.Response response, HashMap<String, ByteIterator>
resultHashMap) {
// If everything went fine, then a result must be given. Such an object is a hash table containing the (field,
// value) pairs based on the requested fields. Note that in a read operation, ONLY ONE OBJECT IS RETRIEVED!
// The following line retrieves the previously serialized table which was store with an insert transaction.
byte[] responseFieldsAndValues = response.getValues().get(0).getValue().getValue();
// Deserialize the stored response table.
HashMap<String, ByteIterator> deserializedTable = new HashMap<>();
deserializeTable(responseFieldsAndValues, deserializedTable);
// If only specific fields are requested, then only these should be put in the result object!
if (fields != null) {
// Populate the HashMap to provide as result.
for (Object field : fields.toArray()) {
// Comparison between a requested field and the ones retrieved. If they're equal (i.e. the get() operation
// DOES NOT return a null value), then proceed to store the pair in the resultHashMap.
ByteIterator value = deserializedTable.get(field);
if (value != null) {
resultHashMap.put((String) field, value);
}
}
} else {
// If, instead, no field is specified, then all the ones retrieved must be provided as result.
for (String field : deserializedTable.keySet()) {
resultHashMap.put(field, deserializedTable.get(field));
}
}
}
/**
* Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the
* record with the specified record key. Also creates a secondary index (2i) for each record consisting of the key
@ -402,12 +407,40 @@ public final class RiakKVClient extends DB {
Location location = new Location(new Namespace(bucketType, table), key);
RiakObject object = new RiakObject();
// Strong consistency doesn't support secondary indexing, but eventually consistent model does. So, we can mock a
// 2i usage by creating a fake object stored in an eventually consistent bucket-type with the SAME KEY THAT THE
// ACTUAL OBJECT HAS. This latter is obviously stored in the strong consistent bucket-type indicated with the
// riak.bucket_type property.
if (strongConsistency && performStrongConsistentScans) {
// Create a fake object to store in the default bucket-type just to keep track of the 2i indices.
Location fakeLocation = new Location(new Namespace(strongConsistentScansBucketType, table), key);
// Obviously, we want the fake object to contain as less data as possible. We can't create a void object, so
// we have to choose the minimum data size allowed: it is one byte.
RiakObject fakeObject = new RiakObject();
fakeObject.setValue(BinaryValue.create(new byte[]{0x00}));
fakeObject.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
StoreValue fakeStore = new StoreValue.Builder(fakeObject)
.withLocation(fakeLocation)
.build();
// We don't mind whether the operation is finished or not, because waiting for it to complete would slow down the
// client and make our solution too heavy to be seen as a valid compromise. This will obviously mean that under
// heavy load conditions a scan operation could fail due to an unfinished "fakeStore".
riakClient.executeAsync(fakeStore);
} else if (!strongConsistency) {
// The next operation is useless when using strong consistency model, so it's ok to perform it only when using
// eventual consistency.
object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
}
// Store proper values into the object.
object.setValue(BinaryValue.create(serializeTable(values)));
object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
StoreValue store = new StoreValue.Builder(object)
.withOption(StoreValue.Option.W, wvalue)
.withLocation(location)
.withOption(Option.W, wvalue)
.build();
RiakFuture<StoreValue.Response, Location> future = riakClient.executeAsync(store);
@ -416,15 +449,13 @@ public final class RiakKVClient extends DB {
future.get(transactionTimeLimit, TimeUnit.SECONDS);
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
.getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: TIME OUT");
System.err.println("Unable to insert key " + key + ". Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
.getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: " + e.toString());
System.err.println("Unable to insert key " + key + ". Reason: " + e.toString());
}
return Status.ERROR;
@ -462,18 +493,15 @@ public final class RiakKVClient extends DB {
*/
@Override
public Status update(String table, String key, HashMap<String, ByteIterator> values) {
// If eventual consistency model is in use, then an update operation is pratically equivalent to an insert one.
if (!strongConsistency) {
return insert(table, key, values);
}
Location location = new Location(new Namespace(bucketType, table), key);
RiakObject object = new RiakObject();
object.setValue(BinaryValue.create(serializeTable(values)));
object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
UpdateValue update = new UpdateValue.Builder(location)
.withUpdate(new UpdateEntity(object))
.withUpdate(new UpdateEntity(new RiakObject().setValue(BinaryValue.create(serializeTable(values)))))
.build();
RiakFuture<UpdateValue.Response, Location> future = riakClient.executeAsync(update);
@ -503,7 +531,6 @@ public final class RiakKVClient extends DB {
return Status.OK;
}
/**
* Delete a record from the database.
*
@ -548,14 +575,15 @@ public final class RiakKVClient extends DB {
/**
* Auxiliary function needed for testing. It configures the default bucket-type to take care of the consistency
* problem by disallowing the siblings creation. Moreover, it disables strong consistency, as the scan transaction
* test would otherwise fail.
* problem by disallowing the siblings creation. Moreover, it disables strong consistency, because we don't have
* the possibility to create a proper bucket-type to use to fake 2i indexes usage.
*
* @param bucket The bucket name.
* @throws Exception Thrown if something bad happens.
*/
void setTestEnvironment(String bucket) throws Exception {
bucketType = "default";
bucketType2i = bucketType;
strongConsistency = false;
Namespace ns = new Namespace(bucketType, bucket);

Просмотреть файл

@ -19,9 +19,11 @@
package com.yahoo.ycsb.db.riak;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
@ -101,12 +103,12 @@ final class RiakUtils {
}
/**
* Deserializes an input byte array, transforming it into a list of (String, ByteIterator) couples (i.e. a Map).
* Deserializes an input byte array, transforming it into a list of (String, ByteIterator) pairs (i.e. a Map).
*
* @param aValue A byte array containing the table to deserialize.
* @param theResult A Map containing the deserialized table.
*/
static void deserializeTable(final byte[] aValue, final Map<String, ByteIterator> theResult) {
private static void deserializeTable(final byte[] aValue, final Map<String, ByteIterator> theResult) {
final ByteArrayInputStream anInputStream = new ByteArrayInputStream(aValue);
byte[] aSizeBuffer = new byte[4];
@ -144,4 +146,43 @@ final class RiakUtils {
return Long.parseLong(keyString);
}
/**
* Function that retrieves all the fields searched within a read or scan operation and puts them in the result
* HashMap.
*
* @param fields The list of fields to read, or null for all of them.
* @param response A Vector of HashMaps, where each HashMap is a set field/value pairs for one record.
* @param resultHashMap The HashMap to return as result.
*/
static void createResultHashMap(Set<String> fields, FetchValue.Response response,
HashMap<String, ByteIterator>resultHashMap) {
// If everything went fine, then a result must be given. Such an object is a hash table containing the (field,
// value) pairs based on the requested fields. Note that in a read operation, ONLY ONE OBJECT IS RETRIEVED!
// The following line retrieves the previously serialized table which was store with an insert transaction.
byte[] responseFieldsAndValues = response.getValues().get(0).getValue().getValue();
// Deserialize the stored response table.
HashMap<String, ByteIterator> deserializedTable = new HashMap<>();
deserializeTable(responseFieldsAndValues, deserializedTable);
// If only specific fields are requested, then only these should be put in the result object!
if (fields != null) {
// Populate the HashMap to provide as result.
for (Object field : fields.toArray()) {
// Comparison between a requested field and the ones retrieved. If they're equal (i.e. the get() operation
// DOES NOT return a null value), then proceed to store the pair in the resultHashMap.
ByteIterator value = deserializedTable.get(field);
if (value != null) {
resultHashMap.put((String) field, value);
}
}
} else {
// If, instead, no field is specified, then all those retrieved must be provided as result.
for (String field : deserializedTable.keySet()) {
resultHashMap.put(field, deserializedTable.get(field));
}
}
}
}

Просмотреть файл

@ -52,6 +52,10 @@ riak.transaction_time_limit=10
# riak.strong_consistency - boolean, indicates whether to use strong consistency (true) or eventual consistency (false).
riak.strong_consistency=true
# riak.strong_consistent_scans_bucket_type - string, indicates the bucket-type to use to allow scans transactions
# when using strong consistency mode. Example: fakeBucketType.
riak.strong_consistent_scans_bucket_type=
# riak.debug - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark
# is started.
riak.debug=false
riak.debug=false