[kudu] Support range partitioning (#1331)

This commit is contained in:
Zhang Yifan 2020-11-30 09:39:59 +08:00 committed by GitHub
Parent e31b5039d2
Commit c4082f4554
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 25 deletions

View file

@ -371,6 +371,19 @@ public class CoreWorkload extends Workload {
private Measurements measurements = Measurements.getMeasurements();
/**
 * Builds a record key for the given key number.
 *
 * @param keynum the key number to encode; hashed first unless ordered inserts are requested
 * @param zeropadding minimum number of digits in the numeric part; shorter values are
 *        left-padded with '0' so keys sort lexicographically in numeric order
 * @param orderedinserts if false, {@code keynum} is hashed to spread keys across the keyspace
 * @return the key: the literal prefix "user" followed by the (possibly zero-padded) number
 */
public static String buildKeyName(long keynum, int zeropadding, boolean orderedinserts) {
  if (!orderedinserts) {
    keynum = Utils.hash(keynum);
  }
  String value = Long.toString(keynum);
  // Build with StringBuilder instead of repeated String concatenation in the loop,
  // which would copy the whole prefix on every iteration.
  StringBuilder key = new StringBuilder("user");
  for (int fill = zeropadding - value.length(); fill > 0; fill--) {
    key.append('0');
  }
  return key.append(value).toString();
}
protected static NumberGenerator getFieldLengthGenerator(Properties p) throws WorkloadException {
NumberGenerator fieldlengthgenerator;
String fieldlengthdistribution = p.getProperty(
@ -531,19 +544,6 @@ public class CoreWorkload extends Workload {
INSERTION_RETRY_INTERVAL, INSERTION_RETRY_INTERVAL_DEFAULT));
}
/**
 * Builds a record key for the given key number using this workload's configured
 * {@code zeropadding} and {@code orderedinserts} settings.
 *
 * @param keynum the key number to encode; hashed first unless ordered inserts are configured
 * @return the key: the literal prefix "user" followed by the (possibly zero-padded) number
 */
protected String buildKeyName(long keynum) {
  if (!orderedinserts) {
    keynum = Utils.hash(keynum);
  }
  String value = Long.toString(keynum);
  // Build with StringBuilder instead of repeated String concatenation in the loop,
  // which would copy the whole prefix on every iteration.
  StringBuilder key = new StringBuilder("user");
  for (int fill = zeropadding - value.length(); fill > 0; fill--) {
    key.append('0');
  }
  return key.append(value).toString();
}
/**
* Builds a value for a randomly chosen field.
*/
@ -609,7 +609,7 @@ public class CoreWorkload extends Workload {
@Override
public boolean doInsert(DB db, Object threadstate) {
int keynum = keysequence.nextValue().intValue();
String dbkey = buildKeyName(keynum);
String dbkey = CoreWorkload.buildKeyName(keynum, zeropadding, orderedinserts);
HashMap<String, ByteIterator> values = buildValues(dbkey);
Status status;
@ -720,7 +720,7 @@ public class CoreWorkload extends Workload {
// choose a random key
long keynum = nextKeynum();
String keyname = buildKeyName(keynum);
String keyname = CoreWorkload.buildKeyName(keynum, zeropadding, orderedinserts);
HashSet<String> fields = null;
@ -747,7 +747,7 @@ public class CoreWorkload extends Workload {
// choose a random key
long keynum = nextKeynum();
String keyname = buildKeyName(keynum);
String keyname = CoreWorkload.buildKeyName(keynum, zeropadding, orderedinserts);
HashSet<String> fields = null;
@ -794,7 +794,7 @@ public class CoreWorkload extends Workload {
// choose a random key
long keynum = nextKeynum();
String startkeyname = buildKeyName(keynum);
String startkeyname = CoreWorkload.buildKeyName(keynum, zeropadding, orderedinserts);
// choose a random scan length
int len = scanlength.nextValue().intValue();
@ -816,7 +816,7 @@ public class CoreWorkload extends Workload {
// choose a random key
long keynum = nextKeynum();
String keyname = buildKeyName(keynum);
String keyname = CoreWorkload.buildKeyName(keynum, zeropadding, orderedinserts);
HashMap<String, ByteIterator> values;
@ -836,7 +836,7 @@ public class CoreWorkload extends Workload {
long keynum = transactioninsertkeysequence.nextValue();
try {
String dbkey = buildKeyName(keynum);
String dbkey = CoreWorkload.buildKeyName(keynum, zeropadding, orderedinserts);
HashMap<String, ByteIterator> values = buildValues(dbkey);
db.insert(table, dbkey, values);

View file

@ -41,6 +41,8 @@ Additional configurations:
default is true.
* `kudu_block_size`: The data block size used to configure columns. The default
is 4096 bytes.
* `kudu_partition_schema`: The partition schema used to create the table. It can be
either 'hashPartition' or 'rangePartition'; the default is 'hashPartition'.
Then, you can run the workload:

View file

@ -38,8 +38,14 @@ import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import static site.ycsb.Client.DEFAULT_RECORD_COUNT;
import static site.ycsb.Client.RECORD_COUNT_PROPERTY;
import static site.ycsb.workloads.CoreWorkload.INSERT_ORDER_PROPERTY;
import static site.ycsb.workloads.CoreWorkload.INSERT_ORDER_PROPERTY_DEFAULT;
import static site.ycsb.workloads.CoreWorkload.TABLENAME_PROPERTY;
import static site.ycsb.workloads.CoreWorkload.TABLENAME_PROPERTY_DEFAULT;
import static site.ycsb.workloads.CoreWorkload.ZERO_PADDING_PROPERTY;
import static site.ycsb.workloads.CoreWorkload.ZERO_PADDING_PROPERTY_DEFAULT;
import static org.apache.kudu.Type.STRING;
import static org.apache.kudu.client.KuduPredicate.ComparisonOp.EQUAL;
import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER_EQUAL;
@ -71,6 +77,7 @@ public class KuduYCSBClient extends site.ycsb.DB {
private static final long DEFAULT_SLEEP = 60000;
private static final int DEFAULT_NUM_CLIENTS = 1;
private static final int DEFAULT_NUM_REPLICAS = 3;
private static final String DEFAULT_PARTITION_SCHEMA = "hashPartition";
private static final String SYNC_OPS_OPT = "kudu_sync_ops";
private static final String BUFFER_NUM_OPS_OPT = "kudu_buffer_num_ops";
@ -79,6 +86,7 @@ public class KuduYCSBClient extends site.ycsb.DB {
private static final String BLOCK_SIZE_OPT = "kudu_block_size";
private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses";
private static final String NUM_CLIENTS_OPT = "kudu_num_clients";
private static final String PARTITION_SCHEMA_OPT = "kudu_partition_schema";
private static final int BLOCK_SIZE_DEFAULT = 4096;
private static final int BUFFER_NUM_OPS_DEFAULT = 2000;
@ -92,10 +100,21 @@ public class KuduYCSBClient extends site.ycsb.DB {
private String tableName;
private KuduSession session;
private KuduTable kuduTable;
private String partitionSchema;
private int zeropadding;
private boolean orderedinserts;
@Override
public void init() throws DBException {
this.tableName = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
Properties prop = getProperties();
this.tableName = prop.getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
this.partitionSchema = prop.getProperty(PARTITION_SCHEMA_OPT, DEFAULT_PARTITION_SCHEMA);
this.zeropadding = Integer.parseInt(prop.getProperty(ZERO_PADDING_PROPERTY, ZERO_PADDING_PROPERTY_DEFAULT));
if (prop.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed") == 0) {
this.orderedinserts = false;
} else {
this.orderedinserts = true;
}
initClient();
this.session = client.newSession();
if (getProperties().getProperty(SYNC_OPS_OPT) != null
@ -150,7 +169,6 @@ public class KuduYCSBClient extends site.ycsb.DB {
setupTable();
}
private void setupTable() throws DBException {
Properties prop = getProperties();
synchronized (KuduYCSBClient.class) {
@ -163,6 +181,10 @@ public class KuduYCSBClient extends site.ycsb.DB {
+ ") must be equal " + "or below " + MAX_TABLETS);
}
int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, DEFAULT_NUM_REPLICAS);
long recordCount = Long.parseLong(prop.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT));
if (recordCount == 0) {
recordCount = Integer.MAX_VALUE;
}
int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT);
int fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY,
Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
@ -187,10 +209,47 @@ public class KuduYCSBClient extends site.ycsb.DB {
schema = new Schema(columns);
CreateTableOptions builder = new CreateTableOptions();
builder.setRangePartitionColumns(new ArrayList<String>());
List<String> hashPartitionColumns = new ArrayList<>();
hashPartitionColumns.add(KEY);
builder.addHashPartitions(hashPartitionColumns, numTablets);
if (partitionSchema.equals("hashPartition")) {
builder.setRangePartitionColumns(new ArrayList<String>());
List<String> hashPartitionColumns = new ArrayList<>();
hashPartitionColumns.add(KEY);
builder.addHashPartitions(hashPartitionColumns, numTablets);
} else if (partitionSchema.equals("rangePartition")) {
if (!orderedinserts) {
// We need to use ordered keys to determine how to split range partitions.
throw new DBException("Must specify `insertorder=ordered` if using rangePartition schema.");
}
String maxKeyValue = String.valueOf(recordCount);
if (zeropadding < maxKeyValue.length()) {
throw new DBException(String.format("Invalid zeropadding value: %d, zeropadding needs to be larger "
+ "or equal to number of digits in the record number: %d.", zeropadding, maxKeyValue.length()));
}
List<String> rangePartitionColumns = new ArrayList<>();
rangePartitionColumns.add(KEY);
builder.setRangePartitionColumns(rangePartitionColumns);
// Add rangePartitions
long lowerNum = 0;
long upperNum = 0;
int remainder = (int) recordCount % numTablets;
for (int i = 0; i < numTablets; i++) {
lowerNum = upperNum;
upperNum = lowerNum + recordCount / numTablets;
if (i < remainder) {
++upperNum;
}
PartialRow lower = schema.newPartialRow();
lower.addString(KEY, CoreWorkload.buildKeyName(lowerNum, zeropadding, orderedinserts));
PartialRow upper = schema.newPartialRow();
upper.addString(KEY, CoreWorkload.buildKeyName(upperNum, zeropadding, orderedinserts));
builder.addRangePartition(lower, upper);
}
} else {
throw new DBException("Invalid partition_schema specified: " + partitionSchema
+ ", must specify `partition_schema=hashPartition` or `partition_schema=rangePartition`");
}
builder.setNumReplicas(numReplicas);
try {