[cassandra2] Add Cassandra 2 CQL client

This commit is contained in:
Connor McCoy 2015-10-15 09:23:18 -07:00
Родитель 4551273e5e
Коммит 09dc27a289
4 изменённых файлов: 485 добавлений и 0 удалений

Просмотреть файл

@ -51,6 +51,7 @@ DATABASES = {
"cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8", "cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8",
"cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10", "cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10",
"cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
"cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
"couchbase" : "com.yahoo.ycsb.db.CouchbaseClient", "couchbase" : "com.yahoo.ycsb.db.CouchbaseClient",
"dynamodb" : "com.yahoo.ycsb.db.DynamoDBClient", "dynamodb" : "com.yahoo.ycsb.db.DynamoDBClient",
"elasticsearch": "com.yahoo.ycsb.db.ElasticSearchClient", "elasticsearch": "com.yahoo.ycsb.db.ElasticSearchClient",

49
cassandra2/pom.xml Normal file
Просмотреть файл

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2012-2015 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.5.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>cassandra2-binding</artifactId>
<name>Cassandra 2.1+ DB Binding</name>
<packaging>jar</packaging>
<dependencies>
<!-- CQL driver -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra2.cql.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

Просмотреть файл

@ -0,0 +1,433 @@
/**
* Copyright (c) 2013-2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License. See accompanying LICENSE file.
*
* Submitted by Chrisjan Matser on 10/11/2010.
*/
package com.yahoo.ycsb.db;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Cassandra 2.x CQL client.
*
* See {@code cassandra2/README.md} for details.
*
* @author cmatser
*/
public class CassandraCQLClient extends DB {
protected static Cluster cluster = null;
protected static Session session = null;
private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE;
private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE;
public static final int OK = 0;
public static final int ERR = -1;
public static final int NOT_FOUND = -3;
public static final String YCSB_KEY = "y_id";
public static final String KEYSPACE_PROPERTY = "cassandra.keyspace";
public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb";
public static final String USERNAME_PROPERTY = "cassandra.username";
public static final String PASSWORD_PROPERTY = "cassandra.password";
public static final String HOSTS_PROPERTY = "hosts";
public static final String PORT_PROPERTY = "port";
public static final String READ_CONSISTENCY_LEVEL_PROPERTY = "cassandra.readconsistencylevel";
public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = "cassandra.writeconsistencylevel";
public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
/** Count the number of times initialized to teardown on the last {@link #cleanup()}. */
private static final AtomicInteger initCount = new AtomicInteger(0);
private static boolean _debug = false;
/**
* Initialize any state for this DB. Called once per DB instance; there is
* one DB instance per client thread.
*/
@Override
public void init() throws DBException {
//Keep track of number of calls to init (for later cleanup)
initCount.incrementAndGet();
//Synchronized so that we only have a single
// cluster/session instance for all the threads.
synchronized (initCount) {
//Check if the cluster has already been initialized
if (cluster != null) {
return;
}
try {
_debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false"));
String host = getProperties().getProperty(HOSTS_PROPERTY);
if (host == null) {
throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", HOSTS_PROPERTY));
}
String hosts[] = host.split(",");
String port = getProperties().getProperty("port", "9042");
if (port == null) {
throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", PORT_PROPERTY));
}
String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, KEYSPACE_PROPERTY_DEFAULT);
readConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
writeConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
// public void connect(String node) {}
if ((username != null) && !username.isEmpty()) {
cluster = Cluster.builder()
.withCredentials(username, password)
.withPort(Integer.valueOf(port))
.addContactPoints(hosts).build();
}
else {
cluster = Cluster.builder()
.withPort(Integer.valueOf(port))
.addContactPoints(hosts).build();
}
//Update number of connections based on threads
int threadcount = Integer.parseInt(getProperties().getProperty("threadcount","1"));
cluster.getConfiguration().getPoolingOptions().setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount);
//Set connection timeout 3min (default is 5s)
cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(3*60*1000);
//Set read (execute) timeout 3min (default is 12s)
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(3*60*1000);
Metadata metadata = cluster.getMetadata();
System.err.printf("Connected to cluster: %s\n", metadata.getClusterName());
for (Host discoveredHost : metadata.getAllHosts()) {
System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
discoveredHost.getDatacenter(),
discoveredHost.getAddress(),
discoveredHost.getRack());
}
session = cluster.connect(keyspace);
} catch (Exception e) {
throw new DBException(e);
}
}//synchronized
}
/**
* Cleanup any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public void cleanup() throws DBException {
synchronized(initCount) {
final int curInitCount = initCount.decrementAndGet();
if (curInitCount <= 0) {
session.close();
cluster.close();
cluster = null;
session = null;
}
if (curInitCount < 0) {
// This should never happen.
throw new DBException(
String.format("initCount is negative: %d", curInitCount));
}
}
}
/**
* Read a record from the database. Each field/value pair from the result
* will be stored in a HashMap.
*
* @param table The name of the table
* @param key The record key of the record to read.
* @param fields The list of fields to read, or null for all of them
* @param result A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
@Override
public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
try {
Statement stmt;
Select.Builder selectBuilder;
if (fields == null) {
selectBuilder = QueryBuilder.select().all();
}
else {
selectBuilder = QueryBuilder.select();
for (String col : fields) {
((Select.Selection) selectBuilder).column(col);
}
}
stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key)).limit(1);
stmt.setConsistencyLevel(readConsistencyLevel);
if (_debug) {
System.out.println(stmt.toString());
}
ResultSet rs = session.execute(stmt);
if (rs.isExhausted()) {
return NOT_FOUND;
}
//Should be only 1 row
Row row = rs.one();
ColumnDefinitions cd = row.getColumnDefinitions();
for (ColumnDefinitions.Definition def : cd) {
ByteBuffer val = row.getBytesUnsafe(def.getName());
if (val != null) {
result.put(def.getName(),
new ByteArrayByteIterator(val.array()));
}
else {
result.put(def.getName(), null);
}
}
return OK;
} catch (Exception e) {
e.printStackTrace();
System.out.println("Error reading key: " + key);
return ERR;
}
}
/**
* Perform a range scan for a set of records in the database. Each
* field/value pair from the result will be stored in a HashMap.
*
* Cassandra CQL uses "token" method for range scan which doesn't always
* yield intuitive results.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set
* field/value pairs for one record
* @return Zero on success, a non-zero error code on error
*/
@Override
public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try {
Statement stmt;
Select.Builder selectBuilder;
if (fields == null) {
selectBuilder = QueryBuilder.select().all();
}
else {
selectBuilder = QueryBuilder.select();
for (String col : fields) {
((Select.Selection) selectBuilder).column(col);
}
}
stmt = selectBuilder.from(table);
//The statement builder is not setup right for tokens.
// So, we need to build it manually.
String initialStmt = stmt.toString();
StringBuilder scanStmt = new StringBuilder();
scanStmt.append(
initialStmt.substring(0, initialStmt.length()-1));
scanStmt.append(" WHERE ");
scanStmt.append(QueryBuilder.token(YCSB_KEY));
scanStmt.append(" >= ");
scanStmt.append("token('");
scanStmt.append(startkey);
scanStmt.append("')");
scanStmt.append(" LIMIT ");
scanStmt.append(recordcount);
stmt = new SimpleStatement(scanStmt.toString());
stmt.setConsistencyLevel(readConsistencyLevel);
if (_debug) {
System.out.println(stmt.toString());
}
ResultSet rs = session.execute(stmt);
HashMap<String, ByteIterator> tuple;
while (!rs.isExhausted()) {
Row row = rs.one();
tuple = new HashMap<String, ByteIterator> ();
ColumnDefinitions cd = row.getColumnDefinitions();
for (ColumnDefinitions.Definition def : cd) {
ByteBuffer val = row.getBytesUnsafe(def.getName());
if (val != null) {
tuple.put(def.getName(),
new ByteArrayByteIterator(val.array()));
}
else {
tuple.put(def.getName(), null);
}
}
result.add(tuple);
}
return OK;
} catch (Exception e) {
e.printStackTrace();
System.out.println("Error scanning with startkey: " + startkey);
return ERR;
}
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table The name of the table
* @param key The record key of the record to write.
* @param values A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public int update(String table, String key, HashMap<String, ByteIterator> values) {
//Insert and updates provide the same functionality
return insert(table, key, values);
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table The name of the table
* @param key The record key of the record to insert.
* @param values A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public int insert(String table, String key, HashMap<String, ByteIterator> values) {
try {
Insert insertStmt = QueryBuilder.insertInto(table);
//Add key
insertStmt.value(YCSB_KEY, key);
//Add fields
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
Object value;
ByteIterator byteIterator = entry.getValue();
value = byteIterator.toString();
insertStmt.value(entry.getKey(), value);
}
insertStmt.setConsistencyLevel(writeConsistencyLevel).enableTracing();
if (_debug) {
System.out.println(insertStmt.toString());
}
ResultSet rs = session.execute(insertStmt);
return OK;
} catch (Exception e) {
e.printStackTrace();
}
return ERR;
}
/**
* Delete a record from the database.
*
* @param table The name of the table
* @param key The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
@Override
public int delete(String table, String key) {
try {
Statement stmt;
stmt = QueryBuilder.delete().from(table).where(QueryBuilder.eq(YCSB_KEY, key));
stmt.setConsistencyLevel(writeConsistencyLevel);
if (_debug) {
System.out.println(stmt.toString());
}
ResultSet rs = session.execute(stmt);
return OK;
} catch (Exception e) {
e.printStackTrace();
System.out.println("Error deleting key: " + key);
}
return ERR;
}
}

Просмотреть файл

@ -75,6 +75,7 @@ LICENSE file.
<accumulo.version>1.6.0</accumulo.version> <accumulo.version>1.6.0</accumulo.version>
<cassandra.version>1.2.9</cassandra.version> <cassandra.version>1.2.9</cassandra.version>
<cassandra.cql.version>1.0.3</cassandra.cql.version> <cassandra.cql.version>1.0.3</cassandra.cql.version>
<cassandra2.cql.version>2.1.8</cassandra2.cql.version>
<gemfire.version>8.1.0</gemfire.version> <gemfire.version>8.1.0</gemfire.version>
<infinispan.version>7.2.2.Final</infinispan.version> <infinispan.version>7.2.2.Final</infinispan.version>
<kudu.version>0.5.0</kudu.version> <kudu.version>0.5.0</kudu.version>
@ -101,6 +102,7 @@ LICENSE file.
<module>accumulo</module> <module>accumulo</module>
<module>aerospike</module> <module>aerospike</module>
<module>cassandra</module> <module>cassandra</module>
<module>cassandra2</module>
<module>couchbase</module> <module>couchbase</module>
<module>distribution</module> <module>distribution</module>
<module>dynamodb</module> <module>dynamodb</module>