зеркало из https://github.com/Azure/YCSB.git
Accumulo db driver for ycsb.
Based on YCSB++ http://www.pdl.cmu.edu/ycsb++/
This commit is contained in:
Родитель
4791826f0b
Коммит
ff4342d86c
|
@ -0,0 +1,96 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.yahoo.ycsb</groupId>
|
||||
<artifactId>root</artifactId>
|
||||
<version>0.1.4</version>
|
||||
</parent>
|
||||
<artifactId>accumulo-binding</artifactId>
|
||||
<name>Accumulo DB Binding</name>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.accumulo</groupId>
|
||||
<artifactId>accumulo-core</artifactId>
|
||||
<version>${accumulo.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.thrift</groupId>
|
||||
<artifactId>thrift</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
<version>3.3.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
<version>1.2.16</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.8.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-core</artifactId>
|
||||
<version>0.20.203.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
<version>5.1.14</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>r08</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.yahoo.ycsb</groupId>
|
||||
<artifactId>core</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>${maven.assembly.version}</version>
|
||||
<configuration>
|
||||
<descriptorRefs>
|
||||
<descriptorRef>jar-with-dependencies</descriptorRef>
|
||||
</descriptorRefs>
|
||||
<appendAssemblyId>false</appendAssemblyId>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>apache</id>
|
||||
<url>http://repository.apache.org/snapshots</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
</project>
|
|
@ -0,0 +1,437 @@
|
|||
package com.yahoo.ycsb.db;
|
||||
|
||||
import com.yahoo.ycsb.ByteIterator;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Hashtable;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.Vector;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.accumulo.core.Constants;
|
||||
import org.apache.accumulo.core.client.AccumuloException;
|
||||
import org.apache.accumulo.core.client.AccumuloSecurityException;
|
||||
import org.apache.accumulo.core.client.Connector;
|
||||
import org.apache.accumulo.core.client.MutationsRejectedException;
|
||||
import org.apache.accumulo.core.client.TableNotFoundException;
|
||||
import org.apache.accumulo.core.client.ZooKeeperInstance;
|
||||
import org.apache.accumulo.core.client.Instance;
|
||||
import org.apache.accumulo.core.client.BatchWriter;
|
||||
import org.apache.accumulo.core.client.Scanner;
|
||||
import org.apache.accumulo.core.data.Key;
|
||||
import org.apache.accumulo.core.data.Mutation;
|
||||
import org.apache.accumulo.core.data.Range;
|
||||
import org.apache.accumulo.core.data.Value;
|
||||
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import com.yahoo.ycsb.DB;
|
||||
import com.yahoo.ycsb.DBException;
|
||||
import com.yahoo.ycsb.ByteArrayByteIterator;
|
||||
|
||||
public class AccumuloClient extends DB {
|
||||
// Error code constants.
|
||||
public static final int Ok = 0;
|
||||
public static final int ServerError = -1;
|
||||
public static final int HttpError = -2;
|
||||
public static final int NoMatchingRecord = -3;
|
||||
|
||||
private Connector _connector;
|
||||
private String _table = "";
|
||||
private BatchWriter _bw = null;
|
||||
private Text _colFam = new Text("");
|
||||
private Scanner _singleScanner = null; // A scanner for reads/deletes.
|
||||
private Scanner _scanScanner = null; // A scanner for use by scan()
|
||||
|
||||
private static final String PC_PRODUCER = "producer";
|
||||
private static final String PC_CONSUMER = "consumer";
|
||||
private String _PC_FLAG = "";
|
||||
private ZKProducerConsumer.Queue q = null;
|
||||
private static Hashtable<String,Long> hmKeyReads = null;
|
||||
private static Hashtable<String,Integer> hmKeyNumReads = null;
|
||||
private Random r = null;
|
||||
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
_colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
|
||||
|
||||
Instance inst = new ZooKeeperInstance(getProperties().getProperty("accumulo.instanceName"),
|
||||
getProperties().getProperty("accumulo.zooKeepers"));
|
||||
try {
|
||||
_connector = inst.getConnector(getProperties().getProperty("accumulo.username"),
|
||||
getProperties().getProperty("accumulo.password"));
|
||||
} catch (AccumuloException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
} catch (AccumuloSecurityException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
_PC_FLAG = getProperties().getProperty("accumulo.PC_FLAG","none");
|
||||
if (_PC_FLAG.equals(PC_PRODUCER) || _PC_FLAG.equals(PC_CONSUMER)) {
|
||||
System.out.println("*** YCSB Client is "+_PC_FLAG);
|
||||
String address = getProperties().getProperty("accumulo.PC_SERVER");
|
||||
String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK");
|
||||
System.out.println("*** PC_INFO(server:"+address+";root="+root+")");
|
||||
q = new ZKProducerConsumer.Queue(address, root);
|
||||
r = new Random();
|
||||
}
|
||||
|
||||
if (_PC_FLAG.equals(PC_CONSUMER)) {
|
||||
hmKeyReads = new Hashtable<String,Long>();
|
||||
hmKeyNumReads = new Hashtable<String,Integer>();
|
||||
keyNotification(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void cleanup() throws DBException
|
||||
{
|
||||
try {
|
||||
if (_bw != null) {
|
||||
try {
|
||||
//Thread.sleep(60000);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
_bw.close();
|
||||
}
|
||||
} catch (MutationsRejectedException e) {
|
||||
throw new DBException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Commonly repeated functionality: Before doing any operation, make sure
|
||||
* we're working on the correct table. If not, open the correct one.
|
||||
*
|
||||
* @param table
|
||||
*/
|
||||
public void checkTable(String table) throws TableNotFoundException {
|
||||
if (!_table.equals(table)) {
|
||||
getTable(table);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the user specifies a table that isn't the same as the
|
||||
* existing table. Connect to it and if necessary, close our current
|
||||
* connection.
|
||||
*
|
||||
* @param table
|
||||
*/
|
||||
public void getTable(String table) throws TableNotFoundException {
|
||||
if (_bw != null) { // Close the existing writer if necessary.
|
||||
try {
|
||||
_bw.close();
|
||||
} catch (MutationsRejectedException e) {
|
||||
// Couldn't spit out the mutations we wanted.
|
||||
// Ignore this for now.
|
||||
}
|
||||
}
|
||||
|
||||
long bwSize = Long.parseLong(getProperties().getProperty("accumulo.batchWriterSize", "100000"));
|
||||
long bwMaxLatency = Long.parseLong(getProperties().getProperty("accumulo.batchWriterMaxLatency", "30000"));
|
||||
_bw = _connector.createBatchWriter(table, bwSize, bwMaxLatency, 1);
|
||||
// Create our scanners
|
||||
_singleScanner = _connector.createScanner(table, Constants.NO_AUTHS);
|
||||
_scanScanner = _connector.createScanner(table, Constants.NO_AUTHS);
|
||||
|
||||
_table = table; // Store the name of the table we have open.
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a scanner from Accumulo over one row
|
||||
*
|
||||
* @param row
|
||||
* @return
|
||||
*/
|
||||
private Scanner getRow(Text row, Set<String> fields)
|
||||
{
|
||||
_singleScanner.clearColumns();
|
||||
_singleScanner.setRange(new Range(row));
|
||||
if (fields != null) {
|
||||
for(String field:fields)
|
||||
{
|
||||
_singleScanner.fetchColumn(_colFam, new Text(field));
|
||||
}
|
||||
}
|
||||
return _singleScanner;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(String table, String key, Set<String> fields,
|
||||
HashMap<String, ByteIterator> result) {
|
||||
|
||||
try {
|
||||
checkTable(table);
|
||||
} catch (TableNotFoundException e) {
|
||||
System.err.println("Error trying to connect to Accumulo table." + e);
|
||||
return ServerError;
|
||||
}
|
||||
|
||||
try {
|
||||
// Pick out the results we care about.
|
||||
for (Entry<Key, Value> entry : getRow(new Text(key), null)) {
|
||||
Value v = entry.getValue();
|
||||
byte[] buf = new byte[v.getSize()];
|
||||
v.copy(buf);
|
||||
result.put(entry.getKey().getColumnQualifier().toString(),
|
||||
new ByteArrayByteIterator(buf));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error trying to reading Accumulo table" + key + e);
|
||||
return ServerError;
|
||||
}
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int scan(String table, String startkey, int recordcount,
|
||||
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
|
||||
try {
|
||||
checkTable(table);
|
||||
} catch (TableNotFoundException e) {
|
||||
System.err.println("Error trying to connect to Accumulo table." + e);
|
||||
return ServerError;
|
||||
}
|
||||
|
||||
// There doesn't appear to be a way to create a range for a given
|
||||
// LENGTH. Just start and end keys. So we'll do this the hard way for now:
|
||||
// Just make the end 'infinity' and only read as much as we need.
|
||||
_scanScanner.clearColumns();
|
||||
_scanScanner.setRange(new Range(new Text(startkey), null));
|
||||
|
||||
// Batch size is how many key/values to try to get per call. Here, I'm
|
||||
// guessing that the number of keys in a row is equal to the number of fields
|
||||
// we're interested in.
|
||||
// We try to fetch one more so as to tell when we've run out of fields.
|
||||
|
||||
if (fields != null) {
|
||||
// And add each of them as fields we want.
|
||||
for(String field:fields)
|
||||
{
|
||||
_scanScanner.fetchColumn(_colFam, new Text(field));
|
||||
}
|
||||
} else {
|
||||
// If no fields are provided, we assume one column/row.
|
||||
}
|
||||
|
||||
String rowKey = "";
|
||||
HashMap<String, ByteIterator> currentHM = null;
|
||||
int count = 0;
|
||||
|
||||
// Begin the iteration.
|
||||
for (Entry<Key, Value> entry : _scanScanner) {
|
||||
// Check for a new row.
|
||||
if (!rowKey.equals(entry.getKey().getRow().toString())) {
|
||||
if (count++ == recordcount) { // Done reading the last row.
|
||||
break;
|
||||
}
|
||||
rowKey = entry.getKey().getRow().toString();
|
||||
if (fields != null) {
|
||||
// Initial Capacity for all keys.
|
||||
currentHM = new HashMap<String, ByteIterator>(fields.size());
|
||||
}
|
||||
else
|
||||
{
|
||||
// An empty result map.
|
||||
currentHM = new HashMap<String, ByteIterator>();
|
||||
}
|
||||
result.add(currentHM);
|
||||
}
|
||||
// Now add the key to the hashmap.
|
||||
Value v = entry.getValue();
|
||||
byte[] buf = new byte[v.getSize()];
|
||||
currentHM.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf));
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int update(String table, String key, HashMap<String, ByteIterator> values) {
|
||||
try {
|
||||
checkTable(table);
|
||||
} catch (TableNotFoundException e) {
|
||||
System.err.println("Error trying to connect to Accumulo table." + e);
|
||||
return ServerError;
|
||||
}
|
||||
|
||||
Mutation mutInsert = new Mutation(new Text(key));
|
||||
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
|
||||
mutInsert.put(_colFam, new Text(entry.getKey()), System
|
||||
.currentTimeMillis(),
|
||||
new Value(entry.getValue().toArray()));
|
||||
}
|
||||
|
||||
try {
|
||||
_bw.addMutation(mutInsert);
|
||||
// Distributed YCSB co-ordination: YCSB on a client produces the key to
|
||||
// be stored in the shared queue in ZooKeeper.
|
||||
if (_PC_FLAG.equals(PC_PRODUCER)) {
|
||||
if (r.nextFloat() < 0.01)
|
||||
keyNotification(key);
|
||||
}
|
||||
} catch (MutationsRejectedException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int insert(String table, String key, HashMap<String, ByteIterator> values) {
|
||||
return update(table, key, values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int delete(String table, String key) {
|
||||
try {
|
||||
checkTable(table);
|
||||
} catch (TableNotFoundException e) {
|
||||
System.err.println("Error trying to connect to Accumulo table." + e);
|
||||
return ServerError;
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
deleteRow(new Text(key));
|
||||
} catch (SecurityException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
} catch (TableNotFoundException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
} catch (Exception e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// These functions are adapted from RowOperations.java:
|
||||
private void deleteRow(Text row) throws TableNotFoundException {
|
||||
deleteRow(getRow(row, null));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Deletes a row, given a Scanner of JUST that row
|
||||
*
|
||||
*/
|
||||
private void deleteRow(Scanner scanner) {
|
||||
Mutation deleter = null;
|
||||
// iterate through the keys
|
||||
for (Entry<Key,Value> entry : scanner) {
|
||||
// create a mutation for the row
|
||||
if (deleter == null)
|
||||
deleter = new Mutation(entry.getKey().getRow());
|
||||
// the remove function adds the key with the delete flag set to true
|
||||
deleter.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
|
||||
}
|
||||
try {
|
||||
_bw.addMutation(deleter);
|
||||
} catch (MutationsRejectedException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void keyNotification(String key) {
|
||||
|
||||
if (_PC_FLAG.equals(PC_PRODUCER)) {
|
||||
try {
|
||||
q.produce(key);
|
||||
} catch (KeeperException e) {
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
}
|
||||
} else {
|
||||
//XXX: do something better to keep the loop going (while??)
|
||||
for (int i = 0; i < 10000000; i++) {
|
||||
try {
|
||||
String strKey = q.consume();
|
||||
|
||||
if ((hmKeyReads.containsKey(strKey) == false) &&
|
||||
(hmKeyNumReads.containsKey(strKey) == false)) {
|
||||
hmKeyReads.put(strKey, new Long(System.currentTimeMillis()));
|
||||
hmKeyNumReads.put(strKey, new Integer(1));
|
||||
}
|
||||
|
||||
//YCSB Consumer will read the key that was fetched from the
|
||||
//queue in ZooKeeper.
|
||||
//(current way is kind of ugly but works, i think)
|
||||
//TODO : Get table name from configuration or argument
|
||||
String table = "usertable";
|
||||
HashSet<String> fields = new HashSet<String>();
|
||||
for (int j=0; j<9; j++)
|
||||
fields.add("field"+j);
|
||||
HashMap<String,ByteIterator> result = new HashMap<String,ByteIterator>();
|
||||
|
||||
int retval = read(table, strKey, fields, result);
|
||||
//If the results are empty, the key is enqueued in Zookeeper
|
||||
//and tried again, until the results are found.
|
||||
if (result.size() == 0) {
|
||||
q.produce(strKey);
|
||||
int count = ((Integer)hmKeyNumReads.get(strKey)).intValue();
|
||||
hmKeyNumReads.put(strKey, new Integer(count+1));
|
||||
}
|
||||
else {
|
||||
if (((Integer)hmKeyNumReads.get(strKey)).intValue() > 1) {
|
||||
long currTime = System.currentTimeMillis();
|
||||
long writeTime = ((Long)hmKeyReads.get(strKey)).longValue();
|
||||
System.out.println("Key="+strKey+
|
||||
//";StartSearch="+writeTime+
|
||||
//";EndSearch="+currTime+
|
||||
";TimeLag="+(currTime-writeTime));
|
||||
}
|
||||
}
|
||||
|
||||
} catch (KeeperException e) {
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public int presplit(String table, String[] keys)
|
||||
{
|
||||
TreeSet<Text> splits = new TreeSet<Text>();
|
||||
for (int i = 0;i < keys.length; i ++)
|
||||
{
|
||||
splits.add(new Text(keys[i]));
|
||||
}
|
||||
try {
|
||||
_connector.tableOperations().addSplits(table, splits);
|
||||
} catch (TableNotFoundException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
} catch (AccumuloException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
} catch (AccumuloSecurityException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
package com.yahoo.ycsb.db;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.ZooDefs.Ids;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
// Implementing the PC Queue in ZooKeeper
|
||||
//
|
||||
public class ZKProducerConsumer implements Watcher {
|
||||
|
||||
static ZooKeeper zk = null;
|
||||
static Integer mutex;
|
||||
|
||||
String root;
|
||||
|
||||
// Constructor that takes tha address of the ZK server
|
||||
//
|
||||
ZKProducerConsumer(String address) {
|
||||
if(zk == null){
|
||||
try {
|
||||
System.out.println("Starting ZK:");
|
||||
zk = new ZooKeeper(address, 3000, this);
|
||||
mutex = new Integer(-1);
|
||||
System.out.println("Finished starting ZK: " + zk);
|
||||
} catch (IOException e) {
|
||||
System.out.println(e.toString());
|
||||
zk = null;
|
||||
}
|
||||
}
|
||||
//else mutex = new Integer(-1);
|
||||
}
|
||||
|
||||
synchronized public void process(WatchedEvent event) {
|
||||
synchronized (mutex) {
|
||||
//System.out.println("Process: " + event.getType());
|
||||
mutex.notify();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static public class QueueElement {
|
||||
public String key;
|
||||
public long writeTime;
|
||||
|
||||
QueueElement(String key, long writeTime) {
|
||||
this.key = key;
|
||||
this.writeTime = writeTime;
|
||||
}
|
||||
}
|
||||
|
||||
// Producer-Consumer queue
|
||||
static public class Queue extends ZKProducerConsumer {
|
||||
|
||||
// Constructor of producer-consumer queue
|
||||
Queue(String address, String name) {
|
||||
super(address);
|
||||
this.root = name;
|
||||
// Create ZK node name
|
||||
if (zk != null) {
|
||||
try {
|
||||
Stat s = zk.exists(root, false);
|
||||
if (s == null) {
|
||||
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
System.out
|
||||
.println("Keeper exception when instantiating queue: "
|
||||
+ e.toString());
|
||||
} catch (InterruptedException e) {
|
||||
System.out.println("Interrupted exception");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Producer calls this method to insert the key in the queue
|
||||
//
|
||||
boolean produce(String key) throws KeeperException, InterruptedException{
|
||||
byte[] value;
|
||||
value = key.getBytes();
|
||||
zk.create(root + "/key", value,
|
||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Consumer calls this method to "wait" for the key to the available
|
||||
//
|
||||
String consume() throws KeeperException, InterruptedException {
|
||||
String retvalue = null;
|
||||
Stat stat = null;
|
||||
|
||||
// Get the first element available
|
||||
while (true) {
|
||||
synchronized (mutex) {
|
||||
List<String> list = zk.getChildren(root, true);
|
||||
if (list.size() == 0) {
|
||||
System.out.println("Going to wait");
|
||||
mutex.wait();
|
||||
} else {
|
||||
String path = root+"/"+list.get(0);
|
||||
byte[] b = zk.getData(path, false, stat);
|
||||
retvalue = new String(b);
|
||||
zk.delete(path, -1);
|
||||
|
||||
return retvalue;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
1
bin/ycsb
1
bin/ycsb
|
@ -24,6 +24,7 @@ COMMANDS = {
|
|||
}
|
||||
|
||||
DATABASES = {
|
||||
"accumulo" : "com.yahoo.ycsb.db.AccumuloClient",
|
||||
"basic" : "com.yahoo.ycsb.BasicDB",
|
||||
"cassandra-7" : "com.yahoo.ycsb.db.CassandraClient7",
|
||||
"cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8",
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -45,6 +45,7 @@
|
|||
<properties>
|
||||
<maven.assembly.version>2.2.1</maven.assembly.version>
|
||||
<hbase.version>0.92.1</hbase.version>
|
||||
<accumulo.version>1.4.3</accumulo.version>
|
||||
<cassandra.version>0.7.0</cassandra.version>
|
||||
<infinispan.version>7.1.0.CR1</infinispan.version>
|
||||
<openjpa.jdbc.version>2.1.1</openjpa.jdbc.version>
|
||||
|
@ -64,6 +65,7 @@
|
|||
<module>core</module>
|
||||
<module>hbase</module>
|
||||
<module>hypertable</module>
|
||||
<module>accumulo</module>
|
||||
<module>dynamodb</module>
|
||||
<module>elasticsearch</module>
|
||||
<!--<module>gemfire</module>-->
|
||||
|
|
Загрузка…
Ссылка в новой задаче