Added an additional parameter to turn on deterministic creation of values for
specified keys. When running a workload, data that is read is checked against the
deterministic value, and any inconsistencies are exported to the measurements object.

Discovered a flaw in the multithreaded YCSB 'latest' workload distribution:
data may be read by one thread before it has been written by another.
This commit is contained in:
nixon 2012-08-15 15:57:46 -07:00
Родитель 0caa77fe44
Коммит 5134d46db6
1 изменённых файлов: 150 добавлений и 31 удалений

Просмотреть файл

@ -37,6 +37,9 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Vector;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
/**
* The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The relative
@ -88,6 +91,8 @@ public class CoreWorkload extends Workload
int fieldcount;
private List<String> fieldnames;
/**
* The name of the property for the field length distribution. Options are "uniform", "zipfian" (favoring short records), "constant", and "histogram".
*
@ -148,6 +153,33 @@ public class CoreWorkload extends Workload
boolean writeallfields;
/**
 * The name of the property that decides whether all returned data is
 * checked against the deterministic value template to ensure data integrity.
 */
public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity";
/**
 * The default value for the dataintegrity property (integrity checking disabled).
 */
public static final String DATA_INTEGRITY_PROPERTY_DEFAULT = "false";
/**
 * Set to true to check the correctness of reads. It must also be set to
 * true during the loading phase, so that the stored values are the
 * deterministic ones the check expects. When enabled, initialization
 * requires the field length distribution to be "constant" and exits
 * otherwise.
 */
private boolean dataintegrity;
/**
 * Response values for data integrity checks, reported to the measurements
 * object under the "VERIFY" label by verifyRow.
 * They need to be multiples of 1000 to match the bucket offsets of
 * measurements/OneMeasurementHistogram.java.
 *
 * Declared static because they are compile-time constants shared by every
 * workload instance; there is no reason to store a copy per object.
 */
/** The data that was read matches the expected deterministic value. */
private static final int DATA_INT_MATCH = 0;
/** The data that was read differs from the expected deterministic value. */
private static final int DATA_INT_DEVIATE = 1000;
/** No data was returned although some data was expected. */
private static final int DATA_INT_UNEXPECTED_NULL = 2000;
/**
* The name of the property for the proportion of transactions that are reads.
*/
@ -306,6 +338,10 @@ public class CoreWorkload extends Workload
table = p.getProperty(TABLENAME_PROPERTY,TABLENAME_PROPERTY_DEFAULT);
fieldcount=Integer.parseInt(p.getProperty(FIELD_COUNT_PROPERTY,FIELD_COUNT_PROPERTY_DEFAULT));
fieldnames = new ArrayList<String>();
for (int i = 0; i < fieldcount; i++) {
fieldnames.add("field" + i);
}
fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p);
double readproportion=Double.parseDouble(p.getProperty(READ_PROPORTION_PROPERTY,READ_PROPORTION_PROPERTY_DEFAULT));
@ -323,6 +359,15 @@ public class CoreWorkload extends Workload
readallfields=Boolean.parseBoolean(p.getProperty(READ_ALL_FIELDS_PROPERTY,READ_ALL_FIELDS_PROPERTY_DEFAULT));
writeallfields=Boolean.parseBoolean(p.getProperty(WRITE_ALL_FIELDS_PROPERTY,WRITE_ALL_FIELDS_PROPERTY_DEFAULT));
dataintegrity = Boolean.parseBoolean(p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT));
//Confirm that fieldlengthgenerator returns a constant if data
//integrity check requested.
if (dataintegrity && !(p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant"))
{
System.err.println("Must have constant field size to check data integrity.");
System.exit(-1);
}
if (p.getProperty(INSERT_ORDER_PROPERTY,INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed")==0)
{
orderedinserts=false;
@ -427,25 +472,62 @@ public class CoreWorkload extends Workload
}
return "user"+keynum;
}
/**
 * Builds a full record: one entry per field, named "field0" up to
 * "field(fieldcount-1)", each filled with random bytes whose length is
 * drawn from the field length generator.
 *
 * @return a new map from field name to its random value
 */
HashMap<String, ByteIterator> buildValues() {
  final HashMap<String, ByteIterator> record = new HashMap<String, ByteIterator>();
  int fieldIndex = 0;
  while (fieldIndex < fieldcount) {
    // One length is drawn per field, in field order.
    final int length = fieldlengthgenerator.nextInt();
    record.put("field" + fieldIndex, new RandomByteIterator(length));
    fieldIndex++;
  }
  return record;
}
/**
 * Builds an update that touches a single field. The field is picked by the
 * field chooser and its new value is random bytes whose length is drawn
 * from the field length generator.
 *
 * @return a new single-entry map from the chosen field name to its value
 */
HashMap<String, ByteIterator> buildUpdate() {
  final HashMap<String, ByteIterator> update = new HashMap<String, ByteIterator>();
  // The field is chosen first, then the length is drawn.
  final String chosenField = "field" + fieldchooser.nextString();
  update.put(chosenField, new RandomByteIterator(fieldlengthgenerator.nextInt()));
  return update;
}
/**
 * Builds a single-entry value map for one field picked by the field chooser.
 * With data integrity checking enabled the value is the deterministic string
 * derived from the record key and field name; otherwise it is random bytes
 * whose length is drawn from the field length generator.
 *
 * @param key the record key the value belongs to
 * @return a new map holding exactly one field name and its value
 */
private HashMap<String, ByteIterator> buildSingleValue(String key) {
  // The field chooser yields the index of the field as a decimal string.
  final int fieldIndex = Integer.parseInt(fieldchooser.nextString());
  final String chosenField = fieldnames.get(fieldIndex);
  final ByteIterator payload = dataintegrity
      ? new StringByteIterator(buildDeterministicValue(key, chosenField))
      : new RandomByteIterator(fieldlengthgenerator.nextInt());
  final HashMap<String, ByteIterator> single = new HashMap<String, ByteIterator>();
  single.put(chosenField, payload);
  return single;
}
/**
 * Builds a value for every field of the record identified by the given key.
 * With data integrity checking enabled each value is the deterministic string
 * derived from the key and field name; otherwise each value is random bytes
 * whose length is drawn from the field length generator.
 *
 * @param key the record key the values belong to
 * @return a new map from every field name to its value
 */
private HashMap<String, ByteIterator> buildValues(String key) {
  final HashMap<String, ByteIterator> record = new HashMap<String, ByteIterator>();
  final int total = fieldnames.size();
  for (int idx = 0; idx < total; idx++) {
    final String name = fieldnames.get(idx);
    if (!dataintegrity) {
      // No verification requested: random content is sufficient.
      record.put(name, new RandomByteIterator(fieldlengthgenerator.nextInt()));
    } else {
      record.put(name, new StringByteIterator(buildDeterministicValue(key, name)));
    }
  }
  return record;
}
/**
 * Builds a value that is fully determined by the record key, the field name
 * and the length drawn from the field length generator.
 *
 * The value starts as "key:fieldkey". While it is shorter than the requested
 * length, a ':' followed by the hash code of the current contents is
 * appended. The result is finally cut (or padded, per StringBuilder.setLength)
 * to exactly the requested length.
 *
 * @param key the record key
 * @param fieldkey the field name
 * @return the deterministic value for that key and field
 */
private String buildDeterministicValue(String key, String fieldkey) {
  final int targetLength = fieldlengthgenerator.nextInt();
  final StringBuilder buffer = new StringBuilder(targetLength);
  buffer.append(key).append(':').append(fieldkey);
  while (buffer.length() < targetLength) {
    // The separator is appended first, so it is part of the hashed text.
    buffer.append(':');
    final int digest = buffer.toString().hashCode();
    buffer.append(digest);
  }
  buffer.setLength(targetLength);
  return buffer.toString();
}
/**
* Do one insert operation. Because it will be called concurrently from multiple client threads, this
@ -457,7 +539,7 @@ public class CoreWorkload extends Workload
{
int keynum=keysequence.nextInt();
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues();
HashMap<String, ByteIterator> values = buildValues(dbkey);
if (db.insert(table,dbkey,values) == 0)
return true;
else
@ -498,6 +580,30 @@ public class CoreWorkload extends Workload
return true;
}
/**
 * Checks the cells returned by a read against the deterministic values
 * expected for the given key and records the outcome.
 *
 * Results are reported in the first three buckets of the histogram under
 * the label "VERIFY".
 * Bucket 0 means the expected data was returned.
 * Bucket 1 means incorrect data was returned.
 * Bucket 2 means null data was returned when some data was expected.
 *
 * @param key the record key that was read
 * @param cells the field name to value map filled in by the read
 */
protected void verifyRow(String key, HashMap<String,ByteIterator> cells) {
  final int outcome;
  if (cells.isEmpty()) {
    // This assumes that null data is never valid.
    outcome = DATA_INT_UNEXPECTED_NULL;
  } else {
    boolean allMatch = true;
    for (Map.Entry<String, ByteIterator> cell : cells.entrySet()) {
      final String actual = cell.getValue().toString();
      final String expected = buildDeterministicValue(key, cell.getKey());
      if (!actual.equals(expected)) {
        // The first mismatch decides the outcome; stop comparing.
        allMatch = false;
        break;
      }
    }
    outcome = allMatch ? DATA_INT_MATCH : DATA_INT_DEVIATE;
  }
  Measurements.getMeasurements().measure("VERIFY", outcome);
}
int nextKeynum() {
int keynum;
if(keychooser instanceof ExponentialGenerator) {
@ -528,13 +634,19 @@ public class CoreWorkload extends Workload
if (!readallfields)
{
//read a random field
String fieldname="field"+fieldchooser.nextString();
String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields=new HashSet<String>();
fields.add(fieldname);
}
db.read(table,keyname,fields,new HashMap<String,ByteIterator>());
HashMap<String,ByteIterator> cells =
new HashMap<String,ByteIterator>();
db.read(table,keyname,fields,cells);
if (dataintegrity) {
verifyRow(keyname, cells);
}
}
public void doTransactionReadModifyWrite(DB db)
@ -549,7 +661,7 @@ public class CoreWorkload extends Workload
if (!readallfields)
{
//read a random field
String fieldname="field"+fieldchooser.nextString();
String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields=new HashSet<String>();
fields.add(fieldname);
@ -560,24 +672,31 @@ public class CoreWorkload extends Workload
if (writeallfields)
{
//new data for all the fields
values = buildValues();
values = buildValues(keyname);
}
else
{
//update a random field
values = buildUpdate();
values = buildSingleValue(keyname);
}
//do the transaction
HashMap<String,ByteIterator> cells =
new HashMap<String,ByteIterator>();
long st=System.nanoTime();
db.read(table,keyname,fields,new HashMap<String,ByteIterator>());
db.read(table,keyname,fields,cells);
db.update(table,keyname,values);
long en=System.nanoTime();
if (dataintegrity) {
verifyRow(keyname, cells);
}
Measurements.getMeasurements().measure("READ-MODIFY-WRITE", (int)((en-st)/1000));
}
@ -596,7 +715,7 @@ public class CoreWorkload extends Workload
if (!readallfields)
{
//read a random field
String fieldname="field"+fieldchooser.nextString();
String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields=new HashSet<String>();
fields.add(fieldname);
@ -617,12 +736,12 @@ public class CoreWorkload extends Workload
if (writeallfields)
{
//new data for all the fields
values = buildValues();
values = buildValues(keyname);
}
else
{
//update a random field
values = buildUpdate();
values = buildSingleValue(keyname);
}
db.update(table,keyname,values);
@ -635,7 +754,7 @@ public class CoreWorkload extends Workload
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues();
HashMap<String, ByteIterator> values = buildValues(dbkey);
db.insert(table,dbkey,values);
}
}