Add binding based on MIT licensed client version for VoltDB
This commit is contained in:
David Rolfe 2019-09-17 12:29:23 +01:00 коммит произвёл Sean Busbey
Родитель 178e4e5b0b
Коммит cc165f2249
23 изменённых файлов: 1988 добавлений и 0 удалений

1
.gitignore поставляемый
Просмотреть файл

@ -22,3 +22,4 @@ output*
# ignore standard Mac OS X files/dirs
.DS_Store
/differentbin/

1
bin/bindings.properties Normal file → Executable file
Просмотреть файл

@ -79,3 +79,4 @@ solr:com.yahoo.ycsb.db.solr.SolrClient
solr6:com.yahoo.ycsb.db.solr6.SolrClient
tarantool:com.yahoo.ycsb.db.TarantoolClient
tablestore:com.yahoo.ycsb.db.tablestore.TableStoreClient
voltdb:com.yahoo.ycsb.db.voltdb.VoltClient4

0
bin/ycsb.bat Normal file → Executable file
Просмотреть файл

Просмотреть файл

@ -286,6 +286,12 @@ LICENSE file.
<version>${project.version}</version>
</dependency>
-->
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>voltdb-binding</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>

Просмотреть файл

@ -110,6 +110,7 @@ LICENSE file.
<thrift.version>0.8.0</thrift.version>
<voldemort.version>0.81</voldemort.version>
<tablestore.version>4.8.0</tablestore.version>
<voltdb.version>9.1.1</voltdb.version>
</properties>
<modules>
@ -168,6 +169,7 @@ LICENSE file.
<module>tarantool</module>
<module>tablestore</module>
<!--<module>voldemort</module>-->
<module>voltdb</module>
</modules>
<build>

86
voltdb/README.md Normal file
Просмотреть файл

@ -0,0 +1,86 @@
<!--
Copyright (c) 2010 Yahoo! Inc., 2012 - 2019 YCSB contributors.
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
# YCSB for VoltDB
![VoltDB Logo](https://www.voltdb.com/wp-content/uploads/2016/10/VoltDB-logo-320x132.jpg?ycsb=true "VoltDB")
## README
This README describes how to run YCSB on VoltDB.
Here at VoltDB we use 4 machines for testing - 1 client and a 3 node cluster.
### 1. Install Java on all the machines involved.
VoltDB uses Java. Either [Oracle Java](https://www.oracle.com/technetwork/java/javase/downloads/index.html) or [OpenJDK](https://openjdk.java.net/) will work.
### 3. Install and configure VoltDB
If you don't already have a copy of VoltDB you should [download](https://www.voltdb.com/try-voltdb/) and install it.
Make sure you know the hostnames/ip addresses of all the nodes in the cluster and that [port 21212](https://docs.voltdb.com/AdminGuide/HostConfigPortOpts.php) is open for your client.
A representative VoltDB cluster would have 3 nodes and a '[K factor](https://docs.voltdb.com/UsingVoltDB/KSafeEnable.php)' of 1. This configuration allows work to continue if a server dies.
Note: If you contact us we can give you access to AWS Cloudformation scripts and a matching AMI to create a cluster for you.
### 4. Install YCSB
Download the [latest YCSB](https://github.com/brianfrankcooper/YCSB/releases/latest) file. Follow the instructions.
### 5. Configure VoltDB parameters
Create a file called (for example) voltdb.properties:
recordcount=10000000
operationcount=10000000
voltdb.servers=localhost
threadcount=1
maxexecutiontime=300
Other possible entries would be:
- `voltdb.user`
- Username. Only needed if username/passwords enabled.
- `voltdb.password`
- Password. Only needed if username/passwords enabled.
- `voltdb.ratelimit`
- Maximum number of transactions allowed per second per 50 threads - e.g. 'voltdb.ratelimit=70000'. Note that as you increase the workload you eventually get to a point where throwing more and more transactions at a given configuration is counterproductive. For the three node configuration we mentioned above 70000 would be a good starting point for this value.
- `voltdb.scanall`
- When set to 'yes' uses a single query to return data for 'Scan' operations ('workload e') instead of a separate query per partition. The later is much more scalable but generates more network traffic.
### 6. Run YCSB
See: [Running a Workload](https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload)
Make sure you keep an eye on the voltdb GUI. If you installed VoltDB on the same computer you are running YCSB on it will be at:
http://localhost:8080/
First load the data:
bin/ycsb.sh load voltdb -P workloads/workloada -P voltdb.properties
Then run the different workloads - 'a' through 'e':
bin/ycsb.sh run voltdb -P workloads/workloada -P voltdb.properties
bin/ycsb.sh run voltdb -P workloads/workloadb -P voltdb.properties
bin/ycsb.sh run voltdb -P workloads/workloadc -P voltdb.properties
bin/ycsb.sh run voltdb -P workloads/workloadd -P voltdb.properties
bin/ycsb.sh run voltdb -P workloads/workloade -P voltdb.properties

74
voltdb/pom.xml Normal file
Просмотреть файл

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright (c) 2012 - 2016 YCSB contributors. All rights reserved. Licensed
under the Apache License, Version 2.0 (the "License"); you may not use this
file except in compliance with the License. You may obtain a copy of the
License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the specific language
governing permissions and limitations under the License. See accompanying
LICENSE file. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.17.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>voltdb-binding</artifactId>
<name>VoltDB Binding</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.28</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.voltdb/voltdbclient -->
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>voltdbclient</artifactId>
<version>9.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.voltdb/voltdb -->
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>voltdb</artifactId>
<version>9.1.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

Просмотреть файл

@ -0,0 +1,179 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/*
* VoltDB Connection Utility.
*/
package com.yahoo.ycsb.db.voltdb;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.voltdb.client.Client;
import org.voltdb.client.ClientConfig;
import org.voltdb.client.ClientFactory;
/**
* Help class to create VoltDB connections for YCSB benchmark.
*/
public final class ConnectionHelper {
/**
* Default port for VoltDB.
*/
private static final int VOLTDB_DEFAULT_PORT = 21212;
/**
* hidden constructor.
*/
private ConnectionHelper() {
}
/**
* Creates a factory used to connect to a VoltDB instance. (Note that if a
* corresponding connection exists, all parameters other than 'servers' are
* ignored)
*
* @param servers The comma separated list of VoltDB servers in
* hostname[:port] format that the instance will use.
* @param user The username for the connection
* @param password The password for the specified user
* @param ratelimit A limit on the number of transactions per second for the
* VoltDB instance
* @return The existing factory if a corresponding connection has already been
* created; the newly created one otherwise.
* @throws IOException Throws if a connection is already open with a
* different server string.
* @throws InterruptedException
*/
public static Client createConnection(String servers, String user, String password,
int ratelimit) throws IOException, InterruptedException {
ClientConfig config = new ClientConfig(user, password);
config.setMaxTransactionsPerSecond(ratelimit);
Client client = ClientFactory.createClient(config);
// Note that in VoltDB there is a distinction between creating an instance of a client
// and actually connecting to the DB...
connect(client, servers);
return client;
}
/**
* Connect to a single server with retry. Limited exponential backoff. No
* timeout. This will run until the process is killed if it's not able to
* connect.
*
* @param server hostname:port or just hostname (hostname can be ip).
*/
private static void connectToOneServerWithRetry(final Client client, String server) {
Logger logger = LoggerFactory.getLogger(ConnectionHelper.class);
int sleep = 1000;
while (true) {
try {
client.createConnection(server);
break;
} catch (Exception e) {
logger.error("Connection failed - retrying in %d second(s).\n", sleep / 1000);
try {
Thread.sleep(sleep);
} catch (java.lang.InterruptedException e2) {
logger.error(e2.getMessage());
}
if (sleep < 8000) {
sleep += sleep;
}
}
}
logger.info("Connected to VoltDB node at:" + server);
}
/**
* See if DB servers are present on the network.
*
* @return true or false
*/
public static boolean checkDBServers(String servernames) {
String[] serverNamesArray = servernames.split(",");
boolean dbThere = false;
Socket socket = null;
try {
// Connect
socket = new Socket(serverNamesArray[0], VOLTDB_DEFAULT_PORT);
dbThere = true;
} catch (IOException connectFailed) {
dbThere = false;
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException ignore) {
// Ignore.
}
}
socket = null;
}
return dbThere;
}
/**
* Connect to a set of servers in parallel. Each will retry until connection.
* This call will block until all have connected.
*
* @param servers A comma separated list of servers using the hostname:port
* syntax (where :port is optional).
* @throws InterruptedException if anything bad happens with the threads.
*/
private static void connect(final Client client, String servers) throws InterruptedException {
Logger logger = LoggerFactory.getLogger(ConnectionHelper.class);
logger.info("Connecting to VoltDB...");
String[] serverArray = servers.split(",");
final CountDownLatch connections = new CountDownLatch(serverArray.length);
// use a new thread to connect to each server
for (final String server : serverArray) {
new Thread(new Runnable() {
@Override
public void run() {
connectToOneServerWithRetry(client, server);
connections.countDown();
}
}).start();
}
// block until all have connected
connections.await();
}
}

Просмотреть файл

@ -0,0 +1,304 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/*
* This client provideds a wrapper layer for running the Yahoo Cloud Serving
* Benchmark (YCSB) against VoltDB. This benchmark runs a synchronous client
* with a mix of the operations provided below. YCSB is open-source, and may
* be found at https://github.com/brianfrankcooper/YCSB. The YCSB jar must be
* in your classpath to compile this client.
*/
package com.yahoo.ycsb.db.voltdb;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.voltdb.VoltTable;
import org.voltdb.client.Client;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ClientResponseWithPartitionKey;
import org.voltdb.client.NoConnectionsException;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.db.voltdb.sortedvolttable.VoltDBTableSortedMergeWrangler;
/**
* A client that can be used by YCSB to work with VoltDB.
*/
public class VoltClient4 extends DB {
private Client mclient;
private byte[] mworkingData;
private ByteBuffer mwriteBuf;
private boolean useScanAll = false;
private static final Charset UTF8 = Charset.forName("UTF-8");
private Logger logger = LoggerFactory.getLogger(VoltClient4.class);
private YCSBSchemaBuilder ysb = null;
@Override
public void init() throws DBException {
Properties props = getProperties();
String servers = props.getProperty("voltdb.servers", "localhost");
String user = props.getProperty("voltdb.user", "");
String password = props.getProperty("voltdb.password", "");
String strLimit = props.getProperty("voltdb.ratelimit");
String useScanAllParam = props.getProperty("voltdb.scanall", "no");
if (useScanAllParam.equalsIgnoreCase("YES")) {
useScanAll = true;
}
int ratelimit = strLimit != null ? Integer.parseInt(strLimit) : Integer.MAX_VALUE;
try {
mclient = ConnectionHelper.createConnection(servers, user, password, ratelimit);
ysb = StaticHolder.INSTANCE;
ysb.loadClassesAndDDLIfNeeded(mclient);
} catch (Exception e) {
logger.error("Error while creating connection: ", e);
throw new DBException(e.getMessage());
}
mworkingData = new byte[1024 * 1024];
mwriteBuf = ByteBuffer.wrap(mworkingData);
}
/**
* @return true if we have a live DB connection
*/
public boolean hasConnection() {
if (mclient != null && mclient.getConnectedHostList().size() > 0) {
return true;
}
return false;
}
@Override
public void cleanup() throws DBException {
// If VoltDB client exists and has a live connection...
if (mclient != null && mclient.getConnectedHostList().size() > 0) {
try {
mclient.drain();
mclient.close();
} catch (NoConnectionsException e) {
logger.error(e.getMessage(), e);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
mclient = null;
}
}
@Override
public Status delete(String keyspace, String key) {
try {
ClientResponse response = mclient.callProcedure("STORE.delete", key, keyspace.getBytes(UTF8));
return response.getStatus() == ClientResponse.SUCCESS ? Status.OK : Status.ERROR;
} catch (Exception e) {
logger.error("Error while deleting row", e);
return Status.ERROR;
}
}
@Override
public Status insert(String keyspace, String key, Map<String, ByteIterator> columns) {
return update(keyspace, key, columns);
}
@Override
public Status read(String keyspace, String key, Set<String> columns, Map<String, ByteIterator> result) {
try {
ClientResponse response = mclient.callProcedure("Get", keyspace.getBytes(UTF8), key);
if (response.getStatus() != ClientResponse.SUCCESS) {
return Status.ERROR;
}
VoltTable table = response.getResults()[0];
if (table.advanceRow()) {
unpackRowData(table, columns, result);
}
return Status.OK;
} catch (Exception e) {
logger.error("Error while GETing row", e);
return Status.ERROR;
}
}
@Override
public Status scan(String keyspace, String lowerBound, int recordCount, Set<String> columns,
Vector<HashMap<String, ByteIterator>> result) {
try {
if (useScanAll) {
byte[] ks = keyspace.getBytes(UTF8);
ClientResponse response = mclient.callProcedure("ScanAll", ks, lowerBound.getBytes(UTF8), recordCount);
if (response.getStatus() != ClientResponse.SUCCESS) {
return Status.ERROR;
}
result.ensureCapacity(recordCount);
VoltTable outputTable = response.getResults()[0];
outputTable.resetRowPosition();
while (outputTable.advanceRow()) {
result.add(unpackRowDataHashMap(outputTable, columns));
}
} else {
byte[] ks = keyspace.getBytes(UTF8);
ClientResponseWithPartitionKey[] response = mclient.callAllPartitionProcedure("Scan", ks,
lowerBound.getBytes(UTF8), recordCount);
for (int i = 0; i < response.length; i++) {
if (response[i].response.getStatus() != ClientResponse.SUCCESS) {
return Status.ERROR;
}
}
result.ensureCapacity(recordCount);
VoltDBTableSortedMergeWrangler smw = new VoltDBTableSortedMergeWrangler(response);
VoltTable outputTable = smw.getSortedTable(1, recordCount);
outputTable.resetRowPosition();
while (outputTable.advanceRow()) {
result.add(unpackRowDataHashMap(outputTable, columns));
}
}
return Status.OK;
} catch (Exception e) {
logger.error("Error while calling SCAN", e);
return Status.ERROR;
}
}
@Override
public Status update(String keyspace, String key, Map<String, ByteIterator> columns) {
try {
ClientResponse response = mclient.callProcedure("Put", keyspace.getBytes(UTF8), key, packRowData(columns));
return response.getStatus() == ClientResponse.SUCCESS ? Status.OK : Status.ERROR;
} catch (Exception e) {
logger.error("Error while calling Update", e);
return Status.ERROR;
}
}
private byte[] packRowData(Map<String, ByteIterator> columns) {
mwriteBuf.clear();
mwriteBuf.putInt(columns.size());
for (String key : columns.keySet()) {
byte[] k = key.getBytes(UTF8);
mwriteBuf.putInt(k.length);
mwriteBuf.put(k);
ByteIterator v = columns.get(key);
int len = (int) v.bytesLeft();
mwriteBuf.putInt(len);
v.nextBuf(mworkingData, mwriteBuf.position());
mwriteBuf.position(mwriteBuf.position() + len);
}
byte[] data = new byte[mwriteBuf.position()];
System.arraycopy(mworkingData, 0, data, 0, data.length);
return data;
}
private Map<String, ByteIterator> unpackRowData(VoltTable data, Set<String> fields,
Map<String, ByteIterator> result) {
byte[] rowData = data.getVarbinary(0);
ByteBuffer buf = ByteBuffer.wrap(rowData);
int nFields = buf.getInt();
return unpackRowData(rowData, buf, nFields, fields, result);
}
private Map<String, ByteIterator> unpackRowData(byte[] rowData, ByteBuffer buf, int nFields, Set<String> fields,
Map<String, ByteIterator> result) {
for (int i = 0; i < nFields; i++) {
int len = buf.getInt();
int off = buf.position();
String key = new String(rowData, off, len, UTF8);
buf.position(off + len);
len = buf.getInt();
off = buf.position();
if (fields == null || fields.contains(key)) {
result.put(key, new ByteArrayByteIterator(rowData, off, len));
}
buf.position(off + len);
}
return result;
}
private HashMap<String, ByteIterator> unpackRowDataHashMap(VoltTable data, Set<String> fields) {
byte[] rowData = data.getVarbinary(0);
ByteBuffer buf = ByteBuffer.wrap(rowData);
int nFields = buf.getInt();
int size = fields != null ? Math.min(fields.size(), nFields) : nFields;
HashMap<String, ByteIterator> res = new HashMap<String, ByteIterator>(size, (float) 1.25);
return unpackRowDataHashMap(rowData, buf, nFields, fields, res);
}
private HashMap<String, ByteIterator> unpackRowDataHashMap(byte[] rowData, ByteBuffer buf, int nFields,
Set<String> fields, HashMap<String, ByteIterator> result) {
for (int i = 0; i < nFields; i++) {
int len = buf.getInt();
int off = buf.position();
String key = new String(rowData, off, len, UTF8);
buf.position(off + len);
len = buf.getInt();
off = buf.position();
if (fields == null || fields.contains(key)) {
result.put(key, new ByteArrayByteIterator(rowData, off, len));
}
buf.position(off + len);
}
return result;
}
private static class StaticHolder {
static final YCSBSchemaBuilder INSTANCE = new YCSBSchemaBuilder();
}
}

Просмотреть файл

@ -0,0 +1,232 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.voltdb;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.voltdb.client.Client;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ProcCallException;
/**
* Utility class to build the YCSB schema.
*
*/
public final class YCSBSchemaBuilder {
private static final String PROCEDURE_GET_WAS_NOT_FOUND = "Procedure Get was not found";
private static final Charset UTF8 = Charset.forName("UTF-8");
private final String createTableDDL = "CREATE TABLE Store (keyspace VARBINARY(128) NOT NULL\n"
+ ", key VARCHAR(128) NOT NULL, value VARBINARY(2056) NOT NULL\n"
+ ", PRIMARY KEY (key, keyspace));";
private final String partitionTableDDL = "PARTITION TABLE Store ON COLUMN key;\n";
private final String createGetDDL = "CREATE PROCEDURE Get PARTITION ON TABLE Store COLUMN key PARAMETER 1\n"
+ "AS SELECT value FROM Store WHERE keyspace = ? AND key = ?;";
private final String createPutDDL = "CREATE PROCEDURE PARTITION ON TABLE Store COLUMN key PARAMETER 1\n"
+ "FROM CLASS com.yahoo.ycsb.db.voltdb.procs.Put;";
private final String createScanDDL = "CREATE PROCEDURE PARTITION ON TABLE Store COLUMN key \n"
+ "FROM CLASS com.yahoo.ycsb.db.voltdb.procs.Scan;";
private final String createScanAllDDL = "CREATE PROCEDURE \n" + "FROM CLASS com.yahoo.ycsb.db.voltdb.procs.ScanAll;";
private final String[] ddlStatements = {createTableDDL, partitionTableDDL };
private final String[] procStatements = {createGetDDL, createPutDDL, createScanDDL, createScanAllDDL };
private final String[] jarFiles = {"Put.class", "Scan.class", "ScanAll.class", "ByteWrapper.class" };
private final String jarFileName = "ycsb-procs.jar";
private Logger logger = LoggerFactory.getLogger(YCSBSchemaBuilder.class);
/**
* Utility class to build the YCSB schema.
*
* @author srmadscience / VoltDB
*
*/
YCSBSchemaBuilder() {
super();
}
/**
* See if we think YCSB Schema already exists...
*
* @return true if the 'Get' procedure exists and takes one string as a
* parameter.
*/
public boolean schemaExists(Client voltClient) {
final String testString = "Test";
boolean schemaExists = false;
try {
ClientResponse response = voltClient.callProcedure("Get", testString.getBytes(UTF8), testString);
if (response.getStatus() == ClientResponse.SUCCESS) {
// YCSB Database exists...
schemaExists = true;
} else {
// If we'd connected to a copy of VoltDB without the schema and tried to call Get
// we'd have got a ProcCallException
logger.error("Error while calling schemaExists(): " + response.getStatusString());
schemaExists = false;
}
} catch (ProcCallException pce) {
schemaExists = false;
// Sanity check: Make sure we've got the *right* ProcCallException...
if (!pce.getMessage().equals(PROCEDURE_GET_WAS_NOT_FOUND)) {
logger.error("Got unexpected Exception while calling schemaExists()", pce);
}
} catch (Exception e) {
logger.error("Error while creating classes.", e);
schemaExists = false;
}
return schemaExists;
}
/**
* Load classes and DDL required by YCSB.
*
* @throws Exception
*/
public synchronized void loadClassesAndDDLIfNeeded(Client voltClient) throws Exception {
if (schemaExists(voltClient)) {
return;
}
File tempDir = Files.createTempDirectory("voltdbYCSB").toFile();
if (!tempDir.canWrite()) {
throw new Exception("Temp Directory (from Files.createTempDirectory()) '"
+ tempDir.getAbsolutePath() + "' is not writable");
}
ClientResponse cr;
for (int i = 0; i < ddlStatements.length; i++) {
try {
cr = voltClient.callProcedure("@AdHoc", ddlStatements[i]);
if (cr.getStatus() != ClientResponse.SUCCESS) {
throw new Exception("Attempt to execute '" + ddlStatements[i] + "' failed:" + cr.getStatusString());
}
logger.info(ddlStatements[i]);
} catch (Exception e) {
if (e.getMessage().indexOf("object name already exists") > -1) {
// Someone else has done this...
return;
}
throw (e);
}
}
logger.info("Creating JAR file in " + tempDir + File.separator + jarFileName);
Manifest manifest = new Manifest();
manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
JarOutputStream newJarFile = new JarOutputStream(new FileOutputStream(tempDir + File.separator + jarFileName),
manifest);
for (int i = 0; i < jarFiles.length; i++) {
InputStream is = getClass().getResourceAsStream("/com/yahoo/ycsb/db/voltdb/procs/" + jarFiles[i]);
add("com/yahoo/ycsb/db/voltdb/procs/" + jarFiles[i], is, newJarFile);
}
newJarFile.close();
File file = new File(tempDir + File.separator + jarFileName);
byte[] jarFileContents = new byte[(int) file.length()];
FileInputStream fis = new FileInputStream(file);
fis.read(jarFileContents);
fis.close();
logger.info("Calling @UpdateClasses to load JAR file containing procedures");
cr = voltClient.callProcedure("@UpdateClasses", jarFileContents, null);
if (cr.getStatus() != ClientResponse.SUCCESS) {
throw new Exception("Attempt to execute UpdateClasses failed:" + cr.getStatusString());
}
for (int i = 0; i < procStatements.length; i++) {
logger.info(procStatements[i]);
cr = voltClient.callProcedure("@AdHoc", procStatements[i]);
if (cr.getStatus() != ClientResponse.SUCCESS) {
throw new Exception("Attempt to execute '" + procStatements[i] + "' failed:" + cr.getStatusString());
}
}
}
/**
* Add an entry to our JAR file.
*
* @param fileName
* @param source
* @param target
* @throws IOException
*/
private void add(String fileName, InputStream source, JarOutputStream target) throws IOException {
BufferedInputStream in = null;
try {
JarEntry entry = new JarEntry(fileName.replace("\\", "/"));
entry.setTime(System.currentTimeMillis());
target.putNextEntry(entry);
in = new BufferedInputStream(source);
byte[] buffer = new byte[1024];
while (true) {
int count = in.read(buffer);
if (count == -1) {
break;
}
target.write(buffer, 0, count);
}
target.closeEntry();
} finally {
if (in != null) {
in.close();
}
}
}
}

Просмотреть файл

@ -0,0 +1,21 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* VoltDB integration with YCSB.
*
*/
package com.yahoo.ycsb.db.voltdb;

Просмотреть файл

@ -0,0 +1,66 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.voltdb.procs;
/**
* Utility class to map data structures used by YCSB to a VoltDB VARBINARY column.
*/
class ByteWrapper {
private byte[] marr;
private int moff;
private int mlen;
ByteWrapper(byte[] arr, int off, int len) {
marr = arr;
moff = off;
mlen = len;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof ByteWrapper)) {
return false;
}
ByteWrapper that = (ByteWrapper) obj;
if (this.mlen != that.mlen) {
return false;
}
for (int i = 0; i < this.mlen; i++) {
if (this.marr[this.moff + i] != that.marr[that.moff + i]) {
return false;
}
}
return true;
}
@Override
public int hashCode() {
if (this.marr == null) {
return 0;
}
int res = 1;
for (int i = 0; i < mlen; i++) {
res = 31 * res + marr[moff + i];
}
return res;
}
}

Просмотреть файл

@ -0,0 +1,103 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.voltdb.procs;
import java.nio.ByteBuffer;
import java.util.HashSet;
import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;
/**
*
* Update a value in STORE.
*
*/
public class Put extends VoltProcedure {
private final SQLStmt selectStmt = new SQLStmt("SELECT value FROM Store WHERE keyspace = ? AND key = ?");
private final SQLStmt insertStmt = new SQLStmt("INSERT INTO Store VALUES (?, ?, ?)");
private final SQLStmt updateStmt = new SQLStmt("UPDATE Store SET value = ? WHERE keyspace = ? AND key = ?");
public long run(byte[] keyspace, String key, byte[] data) {
voltQueueSQL(selectStmt, keyspace, key);
VoltTable res = voltExecuteSQL()[0];
if (res.advanceRow()) {
voltQueueSQL(updateStmt, merge(res.getVarbinary(0), data), keyspace, key);
} else {
voltQueueSQL(insertStmt, keyspace, key, data);
}
voltExecuteSQL(true);
return 0L;
}
private byte[] merge(byte[] dest, byte[] src) {
HashSet<ByteWrapper> mergeSet = new HashSet<ByteWrapper>();
ByteBuffer buf = ByteBuffer.wrap(src);
int nSrc = buf.getInt();
for (int i = 0; i < nSrc; i++) {
int len = buf.getInt();
int off = buf.position();
mergeSet.add(new ByteWrapper(src, off, len));
buf.position(off + len);
len = buf.getInt();
buf.position(buf.position() + len);
}
byte[] merged = new byte[src.length + dest.length];
ByteBuffer out = ByteBuffer.wrap(merged);
buf = ByteBuffer.wrap(dest);
int nDest = buf.getInt();
int nFields = nSrc + nDest;
out.putInt(nFields);
int blockStart = 4;
int blockEnd = 4;
for (int i = 0; i < nDest; i++) {
int len = buf.getInt();
int off = buf.position();
boolean flushBlock = mergeSet.contains(new ByteWrapper(dest, off, len));
buf.position(off + len);
len = buf.getInt();
buf.position(buf.position() + len);
if (flushBlock) {
if (blockStart < blockEnd) {
out.put(dest, blockStart, blockEnd - blockStart);
}
nFields--;
blockStart = buf.position();
}
blockEnd = buf.position();
}
if (blockStart < blockEnd) {
out.put(dest, blockStart, blockEnd - blockStart);
}
out.put(src, 4, src.length - 4);
int length = out.position();
if (nFields != nSrc + nDest) {
out.position(0);
out.putInt(nFields);
}
byte[] res = new byte[length];
System.arraycopy(merged, 0, res, 0, length);
return res;
}
}

Просмотреть файл

@ -0,0 +1,44 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.voltdb.procs;
import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;
/**
*
* Query STORE using a single partition query.
*
*/
public class Scan extends VoltProcedure {
private final SQLStmt getBddStmt = new SQLStmt(
"SELECT value, key FROM Store WHERE keyspace = ? AND key >= ? ORDER BY key, keyspace LIMIT ?");
private final SQLStmt getUnbddStmt = new SQLStmt(
"SELECT value, key FROM Store WHERE keyspace = ? ORDER BY key, keyspace LIMIT ?");
public VoltTable[] run(String partKey, byte[] keyspace, byte[] rangeMin, int count) throws Exception {
if (rangeMin != null) {
voltQueueSQL(getBddStmt, keyspace, new String(rangeMin, "UTF-8"), count);
} else {
voltQueueSQL(getUnbddStmt, keyspace, count);
}
return voltExecuteSQL(true);
}
}

Просмотреть файл

@ -0,0 +1,44 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.voltdb.procs;
import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;
/**
* Query STORE using a multi partition query..
*
*/
public class ScanAll extends VoltProcedure {
private final SQLStmt getBddStmt = new SQLStmt(
"SELECT value, key FROM Store WHERE keyspace = ? AND key >= ? ORDER BY key, keyspace LIMIT ?");
private final SQLStmt getUnbddStmt = new SQLStmt(
"SELECT value, key FROM Store WHERE keyspace = ? ORDER BY key, keyspace LIMIT ?");
public VoltTable[] run(byte[] keyspace, byte[] rangeMin, int count) throws Exception {
if (rangeMin != null) {
voltQueueSQL(getBddStmt, keyspace, new String(rangeMin, "UTF-8"), count);
} else {
voltQueueSQL(getUnbddStmt, keyspace, count);
}
return voltExecuteSQL(true);
}
}

Просмотреть файл

@ -0,0 +1,25 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* VoltDB com.yahoo.ycsb.db.voltdb.procs for Put, Scan and ScanAll.
* Other com.yahoo.ycsb.db.voltdb.procs are defined using DDL.
*
* ByteWrapper is a utility class, not a procedure.
*/
package com.yahoo.ycsb.db.voltdb.procs;

Просмотреть файл

@ -0,0 +1,29 @@
package com.yahoo.ycsb.db.voltdb.sortedvolttable;
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
*/
@SuppressWarnings("serial")
public class ClientResponseIsBadException extends Exception {
public ClientResponseIsBadException(String string) {
super(string);
}
}

Просмотреть файл

@ -0,0 +1,29 @@
package com.yahoo.ycsb.db.voltdb.sortedvolttable;
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
*/
@SuppressWarnings("serial")
public class IncomingVoltTablesNeedToBeSortedException extends Exception {
public IncomingVoltTablesNeedToBeSortedException(String string) {
super(string);
}
}

Просмотреть файл

@ -0,0 +1,32 @@
package com.yahoo.ycsb.db.voltdb.sortedvolttable;
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
*/
@SuppressWarnings("serial")
public class NeedsToBeComparableException extends Exception {
/**
* @param string
*/
public NeedsToBeComparableException(String string) {
super(string);
}
}

Просмотреть файл

@ -0,0 +1,205 @@
package com.yahoo.ycsb.db.voltdb.sortedvolttable;
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
import org.voltdb.VoltTable;
import org.voltdb.VoltType;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ClientResponseWithPartitionKey;
/**
* VoltDBTableSortedMergeWrangler allows you to merge an array of VoltTable
* provided by callAllPartitionProcedure.
*
* The intended use case is for when you need to issue a multi partition query
* but would prefer not to, as you don't need perfect read consistency and would
* rather get the individual VoltDB partitions to issue the query independently
* and then somehow merge the results.
*
*/
public class VoltDBTableSortedMergeWrangler {
private ClientResponseWithPartitionKey[] theTables = null;
@SuppressWarnings("rawtypes")
private Comparable whatWeSelectedLastTime = null;
public VoltDBTableSortedMergeWrangler(ClientResponseWithPartitionKey[] response) {
super();
this.theTables = response;
}
/**
* Takes 'theTables' and merges them based on column 'columnId'. We assume that
* column 'columnId' in each element of 'theTables' is correctly sorted within
* itself.
*
* @param columnid
* @param limit How many rows we want
* @return A new VoltTable.
* @throws NeedsToBeComparableException - if column columnId doesn't implement Comparable.
* @throws IncomingVoltTablesNeedToBeSortedException - incoming data isn't already sorted.
* @throws ClientResponseIsBadException - The procedure worked but is complaining.
*/
public VoltTable getSortedTable(int columnid, int limit)
throws NeedsToBeComparableException, IncomingVoltTablesNeedToBeSortedException, ClientResponseIsBadException {
whatWeSelectedLastTime = null;
// Create an empty output table
VoltTable outputTable = new VoltTable(theTables[0].response.getResults()[0].getTableSchema());
// make sure our input tables are usable, and ready to be read from the
// start
for (int i = 0; i < theTables.length; i++) {
VoltTable currentTable = theTables[i].response.getResults()[0];
if (theTables[i].response.getStatus() != ClientResponse.SUCCESS) {
throw new ClientResponseIsBadException(i + " " + theTables[i].response.getStatusString());
}
currentTable.resetRowPosition();
currentTable.advanceRow();
}
// Find table with lowest value for columnId, which is supposed to be
// the sort key.
int lowestId = getLowestId(columnid);
// Loop until we run out of data or get 'limit' rows.
while (lowestId > -1 && outputTable.getRowCount() < limit) {
// having identified the lowest Table pull that row, add it to
// the output table, and then call 'advanceRow' so we can do this
// again...
VoltTable lowestTable = theTables[lowestId].response.getResults()[0];
outputTable.add(lowestTable.cloneRow());
lowestTable.advanceRow();
// Find table with lowest value for columnId
lowestId = getLowestId(columnid);
}
return outputTable;
}
/**
* This routine looks at column 'columnId' in an array of VoltTable and
* identifies which one is lowest. Note that as we call 'advanceRow' elsewhere
* this will change.
*
* @param columnid
* @return the VoltTable with the lowest value for column 'columnId'. or -1 if
* we've exhausted all the VoltTables.
* @throws NeedsToBeComparableException
* @throws IncomingVoltTablesNeedToBeSortedException
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private int getLowestId(int columnid) throws NeedsToBeComparableException, IncomingVoltTablesNeedToBeSortedException {
int lowestId = -1;
Comparable lowestObservedValue = null;
for (int i = 0; i < theTables.length; i++) {
VoltTable currentTable = theTables[i].response.getResults()[0];
int activeRowIndex = currentTable.getActiveRowIndex();
int rowCount = currentTable.getRowCount();
if (activeRowIndex > -1 && activeRowIndex < rowCount) {
if (lowestObservedValue == null) {
lowestId = i;
lowestObservedValue = getComparable(currentTable, columnid);
} else {
Comparable newObservedValue = getComparable(currentTable, columnid);
if (newObservedValue.compareTo(lowestObservedValue) <= 0) {
lowestId = i;
lowestObservedValue = getComparable(currentTable, columnid);
}
}
}
}
// If we found something make sure that the data in columnid was sorted
// properly when it was retrieved.
if (lowestId > -1) {
Comparable latestItemWeSelected = getComparable(theTables[lowestId].response.getResults()[0], columnid);
if (whatWeSelectedLastTime != null && latestItemWeSelected.compareTo(whatWeSelectedLastTime) < 0) {
throw new IncomingVoltTablesNeedToBeSortedException(
"Latest Item '" + latestItemWeSelected + "' is before last item '" + whatWeSelectedLastTime + "'");
}
whatWeSelectedLastTime = latestItemWeSelected;
}
return lowestId;
}
/**
* Get the value we're working with as a Comparable.
*
* @param theTable
* @param columnId
* @return a Comparable.
* @throws NeedsToBeComparableException
*/
@SuppressWarnings("rawtypes")
private Comparable getComparable(VoltTable theTable, int columnId) throws NeedsToBeComparableException {
Comparable c = null;
VoltType vt = theTable.getColumnType(columnId);
Object theValue = theTable.get(columnId, vt);
if (theValue instanceof Comparable) {
c = (Comparable) theValue;
} else {
throw new NeedsToBeComparableException(
theValue + ": Only Comparables are supported by VoltDBTableSortedMergeWrangler");
}
return c;
}
/**
* Do a comparison of byte arrays. Not used right now, but will be when we added
* support for VARBINARY.
*
* @param left
* @param right
* @return whether 'left' is <, >, or = 'right'
*/
private int compare(byte[] left, byte[] right) {
for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
int a = (left[i] & 0xff);
int b = (right[j] & 0xff);
if (a != b) {
return a - b;
}
}
return left.length - right.length;
}
}

Просмотреть файл

@ -0,0 +1,28 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
*
* VoltDBTableSortedMergeWrangler allows you to merge an array of VoltTable
* provided by callAllPartitionProcedure.
*
* The intended use case is for when you need to issue a multi partition query
* but would prefer not to, as you don't need perfect read consistency and would
* rather get the individual VoltDB partitions to issue the query independently
* and then somehow merge the results.
*
*/
package com.yahoo.ycsb.db.voltdb.sortedvolttable;

Просмотреть файл

@ -0,0 +1,455 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.voltdb.test;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeNoException;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.db.voltdb.ConnectionHelper;
import com.yahoo.ycsb.db.voltdb.VoltClient4;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Vector;
import java.util.Properties;
/**
* Test harness for YCSB / VoltDB. Note that not much happens if VoltDB isn't
* visible.
*
*/
public class VoltDBClientTest {
private static final String TABLE_NAME = "USERTABLE";
private static final int FIELD_LENGTH = 32;
private static final String FIELD_PREFIX = "FIELD";
private static final int NUM_FIELDS = 3;
private static final String INSERT_TEST_KEY = "InsertReadTest";
private static final String INSERT_DELETE_AND_READ_TEST_KEY = "InsertDeleteReadTest";
private static final String UPDATE_TEST_KEY = "UpdateTest";
private static final String NON_EXISTENT_KEY = "NonExistTest";
private static final String SCAN_KEY_PREFIX = "ScanKey_";
private static final int SCAN_RECORD_COUNT = 5000;
private static final String[] TEST_DATA_KEYS = { INSERT_TEST_KEY, INSERT_DELETE_AND_READ_TEST_KEY, UPDATE_TEST_KEY };
private static VoltClient4 voltClient = null;
private static boolean haveDb = false;
@BeforeClass
public static void setup() {
Properties p = new Properties();
String servers = p.getProperty("voltdb.servers", "localhost");
String user = p.getProperty("voltdb.user", "");
String password = p.getProperty("voltdb.password", "");
String strLimit = p.getProperty("voltdb.ratelimit", "70000");
p.setProperty("voltdb.servers", servers);
p.setProperty("voltdb.user", user);
p.setProperty("voltdb.password", password);
p.setProperty("voltdb.ratelimit", strLimit);
try {
voltClient = new VoltClient4();
voltClient.setProperties(p);
if (ConnectionHelper.checkDBServers(servers)) {
voltClient.init();
haveDb = true;
removeExistingData();
}
} catch (Exception e) {
// The call to checkDBServers above looks for activity on
// the ip and port we expect VoltDB to be on. If we get to this
// line it's because 'something' is running on localhost:21212,
// but whatever it is, it isn't a happy copy of VoltDB.
assumeNoException("Something was running on VoltDB's port but it wasn't a usable copy of VoltDB", e);
}
}
private static void removeExistingData() {
try {
for (int i = 0; i < TEST_DATA_KEYS.length; i++) {
voltClient.delete(TABLE_NAME, TEST_DATA_KEYS[i]);
}
for (int i = 0; i < SCAN_RECORD_COUNT; i++) {
voltClient.delete(TABLE_NAME, SCAN_KEY_PREFIX + i);
}
} catch (Exception e) {
Logger logger = LoggerFactory.getLogger(VoltDBClientTest.class);
logger.error("Error while calling 'removeExistingData()'", e);
fail("Failed removeExistingData");
}
}
@AfterClass
public static void teardown() {
try {
if (voltClient != null && haveDb) {
removeExistingData();
voltClient.cleanup();
}
} catch (DBException e) {
e.printStackTrace();
}
}
@Before
public void prepareTest() {
}
private boolean compareContents(HashMap<String, ByteIterator> inMsg, Map<String, ByteIterator> outMsg) {
if (inMsg == null) {
return false;
}
if (outMsg == null) {
return false;
}
if (inMsg.size() != outMsg.size()) {
return false;
}
@SuppressWarnings("rawtypes")
Iterator it = inMsg.entrySet().iterator();
while (it.hasNext()) {
@SuppressWarnings("rawtypes")
Map.Entry pair = (Map.Entry) it.next();
String key = (String) pair.getKey();
ByteIterator inPayload = inMsg.get(key);
inPayload.reset();
ByteIterator outPayload = outMsg.get(key);
outPayload.reset();
if (inPayload.bytesLeft() != outPayload.bytesLeft()) {
return false;
}
while (inPayload.hasNext()) {
byte inByte = inPayload.nextByte();
byte outByte = outPayload.nextByte();
if (inByte != outByte) {
return false;
}
}
it.remove();
}
return true;
}
@Test
public void insertAndReadTest() {
Assume.assumeTrue(haveDb);
try {
// Create some test data
final String insertKey = INSERT_TEST_KEY;
final Set<String> columns = getColumnNameMap();
// Insert row
HashMap<String, ByteIterator> insertMap = new HashMap<String, ByteIterator>();
for (int i = 0; i < NUM_FIELDS; i++) {
insertMap.put(FIELD_PREFIX + i, new StringByteIterator(buildDeterministicValue(insertKey, FIELD_PREFIX + i)));
}
voltClient.insert(TABLE_NAME, insertKey, insertMap);
// Create a object to put retrieved row in...
Map<String, ByteIterator> testResult = new HashMap<String, ByteIterator>();
// Read row...
Status s = voltClient.read(TABLE_NAME, insertKey, columns, testResult);
if (!s.equals(Status.OK)) {
fail("Didn't get OK on read.");
}
if (!compareContents(insertMap, testResult)) {
fail("Returned data not the same as inserted data");
}
} catch (Exception e) {
e.printStackTrace();
fail("Failed insertTest");
}
}
@Test
public void insertDeleteAndReadTest() {
Assume.assumeTrue(haveDb);
try {
// Create some test data
final String insertKey = INSERT_DELETE_AND_READ_TEST_KEY;
final Set<String> columns = getColumnNameMap();
// Insert row
HashMap<String, ByteIterator> insertMap = new HashMap<String, ByteIterator>();
for (int i = 0; i < NUM_FIELDS; i++) {
insertMap.put(FIELD_PREFIX + i, new StringByteIterator(buildDeterministicValue(insertKey, FIELD_PREFIX + i)));
}
voltClient.insert(TABLE_NAME, insertKey, insertMap);
// Create a object to put retrieved row in...
Map<String, ByteIterator> testResult = new HashMap<String, ByteIterator>();
// Read row...
Status s = voltClient.read(TABLE_NAME, insertKey, columns, testResult);
if (!s.equals(Status.OK)) {
fail("Didn't get OK on read.");
}
if (!compareContents(insertMap, testResult)) {
fail("Returned data not the same as inserted data");
}
voltClient.delete(TABLE_NAME, insertKey);
// Create another object to put retrieved row in...
Map<String, ByteIterator> testResultAfterDelete = new HashMap<String, ByteIterator>();
// Read row...
voltClient.read(TABLE_NAME, insertKey, columns, testResultAfterDelete);
if (testResultAfterDelete.size() > 0) {
fail("testResultAfterDelete has value.");
}
} catch (Exception e) {
e.printStackTrace();
fail("Failed insertDeleteAndReadTest");
}
}
@Test
public void deleteNonExistentRecordTest() {
Assume.assumeTrue(haveDb);
try {
// Create some test data
final String insertKey = NON_EXISTENT_KEY;
final Set<String> columns = getColumnNameMap();
// Create a object to put retrieved row in...
Map<String, ByteIterator> testResult = new HashMap<String, ByteIterator>();
// Read row...
voltClient.read(TABLE_NAME, insertKey, columns, testResult);
if (testResult.size() > 0) {
fail("testResult.size() > 0.");
}
} catch (Exception e) {
e.printStackTrace();
fail("Failed deleteNonExistentRecordTest");
}
}
@Test
public void scanReadTest() {
Assume.assumeTrue(haveDb);
try {
for (int z = 0; z < SCAN_RECORD_COUNT; z++) {
// Create some test data
final String insertKey = SCAN_KEY_PREFIX + z;
// Insert row
HashMap<String, ByteIterator> insertMap = new HashMap<String, ByteIterator>();
for (int i = 0; i < NUM_FIELDS; i++) {
insertMap.put(FIELD_PREFIX + i, new StringByteIterator("Data for " + SCAN_KEY_PREFIX + z + " element " + i));
}
voltClient.insert(TABLE_NAME, insertKey, insertMap);
}
final String firstInsertKey = SCAN_KEY_PREFIX + 0;
final String lastInsertKey = SCAN_KEY_PREFIX + (SCAN_RECORD_COUNT - 1);
final String beyondLastInsertKey = SCAN_KEY_PREFIX + (SCAN_RECORD_COUNT + 1);
final String oneHundredFromEndInsertKey = SCAN_KEY_PREFIX + (SCAN_RECORD_COUNT - 101);
final String fiftyFromEndInsertKey = SCAN_KEY_PREFIX + (SCAN_RECORD_COUNT - 101);
// test non existent records
singleScanReadTest(NON_EXISTENT_KEY, 1000, 0, NON_EXISTENT_KEY);
// test single record
singleScanReadTest(firstInsertKey, 1, 1, firstInsertKey);
// test scan of SCAN_RECORD_COUNT records
singleScanReadTest(firstInsertKey, SCAN_RECORD_COUNT, SCAN_RECORD_COUNT, lastInsertKey);
// test single record in middle
singleScanReadTest(oneHundredFromEndInsertKey, 1, 1, oneHundredFromEndInsertKey);
// test request of 100 starting 50 from end.
singleScanReadTest(fiftyFromEndInsertKey, 100, 50, lastInsertKey);
// test request of 100 starting beyond the end
singleScanReadTest(beyondLastInsertKey, 100, 0, lastInsertKey);
} catch (Exception e) {
e.printStackTrace();
fail("Failed scanReadTest");
}
}
private void singleScanReadTest(String startKey, int requestedCount, int expectedCount, String lastKey) {
Assume.assumeTrue(haveDb);
try {
final Set<String> columns = getColumnNameMap();
// Create a object to put retrieved row in...
Vector<HashMap<String, ByteIterator>> testResult = new Vector<HashMap<String, ByteIterator>>();
// Read row...
Status s = voltClient.scan(TABLE_NAME, startKey, expectedCount, columns, testResult);
if (!s.equals(Status.OK)) {
fail("Didn't get OK on read.");
}
if (testResult.size() != expectedCount) {
fail("Failed singleScanReadTest " + startKey + " " + expectedCount + " " + lastKey);
}
} catch (Exception e) {
e.printStackTrace();
fail("Failed singleScanReadTest " + startKey + ". Asked for " + requestedCount + ", expected " + expectedCount
+ " lastkey=" + lastKey);
}
}
@Test
public void updateTest() {
Assume.assumeTrue(haveDb);
try {
// Create some test data
final String insertKey = UPDATE_TEST_KEY;
// Insert row
// Insert row
HashMap<String, ByteIterator> insertThenUpdateMap = new HashMap<String, ByteIterator>();
for (int i = 0; i < NUM_FIELDS; i++) {
insertThenUpdateMap.put(FIELD_PREFIX + i,
new StringByteIterator(buildDeterministicValue(insertKey, FIELD_PREFIX + i)));
}
voltClient.insert(TABLE_NAME, insertKey, insertThenUpdateMap);
// Change the data we inserted...
for (int i = 0; i < NUM_FIELDS; i++) {
insertThenUpdateMap.put(FIELD_PREFIX + i, new StringByteIterator(FIELD_PREFIX + i + " has changed"));
}
// now do an update
voltClient.update(TABLE_NAME, insertKey, insertThenUpdateMap);
// Create a object to put retrieved row in...
final Set<String> columns = getColumnNameMap();
Map<String, ByteIterator> testResult = new HashMap<String, ByteIterator>();
// Read row...
Status s = voltClient.read(TABLE_NAME, insertKey, columns, testResult);
if (!s.equals(Status.OK)) {
fail("Didn't get OK on read.");
}
if (!compareContents(insertThenUpdateMap, testResult)) {
fail("Returned data not the same as inserted data");
}
} catch (Exception e) {
e.printStackTrace();
fail("Failed updateTest");
}
}
/**
* @return
*/
private Set<String> getColumnNameMap() {
Set<String> columns = new HashSet<String>();
for (int i = 0; i < NUM_FIELDS; i++) {
columns.add(FIELD_PREFIX + i);
}
return columns;
}
/*
* This is a copy of buildDeterministicValue() from
* core:com.yahoo.ycsb.workloads.CoreWorkload.java. That method is neither
* public nor static so we need a copy.
*/
private String buildDeterministicValue(String key, String fieldkey) {
int size = FIELD_LENGTH;
StringBuilder sb = new StringBuilder(size);
sb.append(key);
sb.append(':');
sb.append(fieldkey);
while (sb.length() < size) {
sb.append(':');
sb.append(sb.toString().hashCode());
}
sb.setLength(size);
return sb.toString();
}
}

Просмотреть файл

@ -0,0 +1,22 @@
/**
* Copyright (c) 2015-2019 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* YCSB binding for VoltDB.
*/
package com.yahoo.ycsb.db.voltdb.test;