зеркало из https://github.com/Azure/YCSB.git
[asynchbase] Add AsyncHBase as an alternative to the default HBase client.
Note the Guava classes in the test directory for AsyncHBase. These are there as AsyncHBase uses Guava >= 18 while HBase and the mini cluster use Guava 12.
This commit is contained in:
Родитель
4e37e502d0
Коммит
b6ae89901e
|
@ -0,0 +1,59 @@
|
|||
<!--
|
||||
Copyright (c) 2016 YCSB contributors. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
may not use this file except in compliance with the License. You
|
||||
may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied. See the License for the specific language governing
|
||||
permissions and limitations under the License. See accompanying
|
||||
LICENSE file.
|
||||
-->
|
||||
|
||||
# AsyncHBase Driver for YCSB
|
||||
|
||||
This driver provides a YCSB workload binding for Apache HBase using an alternative to the included HBase client. AsyncHBase is completely asynchronous for all operations and is particularly useful for write heavy workloads. Note that it supports a subset of the HBase client APIs but supports all public released versions of HBase.
|
||||
|
||||
## Quickstart
|
||||
|
||||
### 1. Setup Hbase
|
||||
|
||||
Follow directions 1 to 3 from ``hbase098``'s readme.
|
||||
|
||||
### 2. Load a Workload
|
||||
|
||||
Switch to the root of the YCSB repo and choose the workload you want to run and `load` it first. With the CLI you must provide the column family at a minimum if HBase is running on localhost. Otherwise you must provide connection properties via CLI or the path to a config file. Additional configuration parameters are available below.
|
||||
|
||||
```
|
||||
bin/ycsb load asynchbase -p columnfamily=cf -P workloads/workloada
|
||||
|
||||
```
|
||||
|
||||
The `load` step only executes inserts into the datastore. After loading data, run the same workload to mix reads with writes.
|
||||
|
||||
```
|
||||
bin/ycsb run asynchbase -p columnfamily=cf -P workloads/workloada
|
||||
|
||||
```
|
||||
|
||||
## Configuration Options
|
||||
|
||||
The following options can be configured using CLI (using the `-p` parameter) or via a JAVA style properties configuration file.. Check the [AsyncHBase Configuration](http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html) project for additional tuning parameters.
|
||||
|
||||
* `columnfamily`: (Required) The column family to target.
|
||||
* `config`: Optional full path to a configuration file with AsyncHBase options.
|
||||
* `hbase.zookeeper.quorum`: Zookeeper quorum list.
|
||||
* `hbase.zookeeper.znode.parent`: Path used by HBase in Zookeeper. Default is "/hbase".
|
||||
* `debug`: If true, prints debug information to standard out. The default is false.
|
||||
* `clientbuffering`: Whether or not to use client side buffering and batching of write operations. This can significantly improve performance and defaults to true.
|
||||
* `durable`: When set to false, writes and deletes bypass the WAL for quicker responses. Default is true.
|
||||
* `jointimeout`: A timeout value, in milliseconds, for waiting on operations synchronously before an error is thrown.
|
||||
* `prefetchmeta`: Whether or not to read meta for all regions in the table and connect to the proper region servers before starting operations. Defaults to false.
|
||||
|
||||
|
||||
Note: This module includes some Google Guava source files from version 12 that were later removed but are still required by HBase's test modules for setting up the mini cluster during integration testing.
|
|
@ -0,0 +1,77 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Copyright (c) 2016 YCSB contributors. All rights reserved.
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
may not use this file except in compliance with the License. You
|
||||
may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied. See the License for the specific language governing
|
||||
permissions and limitations under the License. See accompanying
|
||||
LICENSE file.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.yahoo.ycsb</groupId>
|
||||
<artifactId>binding-parent</artifactId>
|
||||
<version>0.9.0-SNAPSHOT</version>
|
||||
<relativePath>../binding-parent/</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>asynchbase-binding</artifactId>
|
||||
<name>AsyncHBase Client Binding for Apache HBase</name>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.hbase</groupId>
|
||||
<artifactId>asynchbase</artifactId>
|
||||
<version>${asynchbase.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.yahoo.ycsb</groupId>
|
||||
<artifactId>core</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.12</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-testing-util</artifactId>
|
||||
<version>${hbase10.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
<version>${hbase10.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
<version>1.2.17</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>log4j-over-slf4j</artifactId>
|
||||
<version>1.7.7</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,409 @@
|
|||
/**
|
||||
* Copyright (c) 2016 YCSB contributors. All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
* may not use this file except in compliance with the License. You
|
||||
* may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License. See accompanying
|
||||
* LICENSE file.
|
||||
*/
|
||||
package com.yahoo.ycsb.db;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.Vector;
|
||||
|
||||
import org.hbase.async.Bytes;
|
||||
import org.hbase.async.Config;
|
||||
import org.hbase.async.DeleteRequest;
|
||||
import org.hbase.async.GetRequest;
|
||||
import org.hbase.async.HBaseClient;
|
||||
import org.hbase.async.KeyValue;
|
||||
import org.hbase.async.PutRequest;
|
||||
import org.hbase.async.Scanner;
|
||||
|
||||
import com.yahoo.ycsb.ByteArrayByteIterator;
|
||||
import com.yahoo.ycsb.ByteIterator;
|
||||
import com.yahoo.ycsb.DBException;
|
||||
import com.yahoo.ycsb.Status;
|
||||
|
||||
/**
|
||||
* Alternative Java client for Apache HBase.
|
||||
*
|
||||
* This client provides a subset of the main HBase client and uses a completely
|
||||
* asynchronous pipeline for all calls. It is particularly useful for write heavy
|
||||
* workloads. It is also compatible with all production versions of HBase.
|
||||
*/
|
||||
public class AsyncHBaseClient extends com.yahoo.ycsb.DB {
|
||||
public static final Charset UTF8_CHARSET = Charset.forName("UTF8");
|
||||
private static final String CLIENT_SIDE_BUFFERING_PROPERTY = "clientbuffering";
|
||||
private static final String DURABILITY_PROPERTY = "durability";
|
||||
private static final String PREFETCH_META_PROPERTY = "prefetchmeta";
|
||||
private static final String CONFIG_PROPERTY = "config";
|
||||
private static final String COLUMN_FAMILY_PROPERTY = "columnfamily";
|
||||
private static final String JOIN_TIMEOUT_PROPERTY = "jointimeout";
|
||||
private static final String JOIN_TIMEOUT_PROPERTY_DEFAULT = "30000";
|
||||
|
||||
/** Mutex for instantiating a single instance of the client. */
|
||||
private static final Object MUTEX = new Object();
|
||||
|
||||
/** Use for tracking running thread counts so we know when to shutdown the client. */
|
||||
private static int threadCount = 0;
|
||||
|
||||
/** The client that's used for all threads. */
|
||||
private static HBaseClient client;
|
||||
|
||||
/** Print debug information to standard out. */
|
||||
private boolean debug = false;
|
||||
|
||||
/** The column family use for the workload. */
|
||||
private byte[] columnFamilyBytes;
|
||||
|
||||
/** Cache for the last table name/ID to avoid byte conversions. */
|
||||
private String lastTable = "";
|
||||
private byte[] lastTableBytes;
|
||||
|
||||
private long joinTimeout;
|
||||
|
||||
/** Whether or not to bypass the WAL for puts and deletes. */
|
||||
private boolean durability = true;
|
||||
|
||||
/**
|
||||
* If true, buffer mutations on the client. This is the default behavior for
|
||||
* AsyncHBase. For measuring insert/update/delete latencies, client side
|
||||
* buffering should be disabled.
|
||||
*
|
||||
* A single instance of this
|
||||
*/
|
||||
private boolean clientSideBuffering = false;
|
||||
|
||||
@Override
|
||||
public void init() throws DBException {
|
||||
if (getProperties().getProperty(CLIENT_SIDE_BUFFERING_PROPERTY, "false")
|
||||
.toLowerCase().equals("true")) {
|
||||
clientSideBuffering = true;
|
||||
}
|
||||
if (getProperties().getProperty(DURABILITY_PROPERTY, "true")
|
||||
.toLowerCase().equals("false")) {
|
||||
durability = false;
|
||||
}
|
||||
final String columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY);
|
||||
if (columnFamily == null || columnFamily.isEmpty()) {
|
||||
System.err.println("Error, must specify a columnfamily for HBase table");
|
||||
throw new DBException("No columnfamily specified");
|
||||
}
|
||||
columnFamilyBytes = columnFamily.getBytes();
|
||||
|
||||
if ((getProperties().getProperty("debug") != null)
|
||||
&& (getProperties().getProperty("debug").compareTo("true") == 0)) {
|
||||
debug = true;
|
||||
}
|
||||
|
||||
joinTimeout = Integer.parseInt(getProperties().getProperty(
|
||||
JOIN_TIMEOUT_PROPERTY, JOIN_TIMEOUT_PROPERTY_DEFAULT));
|
||||
|
||||
final boolean prefetchMeta = getProperties()
|
||||
.getProperty(PREFETCH_META_PROPERTY, "false")
|
||||
.toLowerCase().equals("true") ? true : false;
|
||||
try {
|
||||
synchronized (MUTEX) {
|
||||
++threadCount;
|
||||
if (client == null) {
|
||||
final String configPath = getProperties().getProperty(CONFIG_PROPERTY);
|
||||
final Config config;
|
||||
if (configPath == null || configPath.isEmpty()) {
|
||||
config = new Config();
|
||||
final Iterator<Entry<Object, Object>> iterator = getProperties()
|
||||
.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
final Entry<Object, Object> property = iterator.next();
|
||||
config.overrideConfig((String)property.getKey(),
|
||||
(String)property.getValue());
|
||||
}
|
||||
} else {
|
||||
config = new Config(configPath);
|
||||
}
|
||||
client = new HBaseClient(config);
|
||||
|
||||
// Terminate right now if table does not exist, since the client
|
||||
// will not propagate this error upstream once the workload
|
||||
// starts.
|
||||
String table = com.yahoo.ycsb.workloads.CoreWorkload.table;
|
||||
try {
|
||||
client.ensureTableExists(table).join(joinTimeout);
|
||||
} catch (InterruptedException e1) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
throw new DBException(e);
|
||||
}
|
||||
|
||||
if (prefetchMeta) {
|
||||
try {
|
||||
if (debug) {
|
||||
System.out.println("Starting meta prefetch for table " + table);
|
||||
}
|
||||
client.prefetchMeta(table).join(joinTimeout);
|
||||
if (debug) {
|
||||
System.out.println("Completed meta prefetch for table " + table);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
System.err.println("Interrupted during prefetch");
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
throw new DBException("Failed prefetch", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new DBException("Failed instantiation of client", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() throws DBException {
|
||||
synchronized (MUTEX) {
|
||||
--threadCount;
|
||||
if (client != null && threadCount < 1) {
|
||||
try {
|
||||
if (debug) {
|
||||
System.out.println("Shutting down client");
|
||||
}
|
||||
client.shutdown().joinUninterruptibly(joinTimeout);
|
||||
} catch (Exception e) {
|
||||
System.err.println("Failed to shutdown the AsyncHBase client "
|
||||
+ "properly: " + e.getMessage());
|
||||
}
|
||||
client = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status read(String table, String key, Set<String> fields,
|
||||
HashMap<String, ByteIterator> result) {
|
||||
setTable(table);
|
||||
|
||||
final GetRequest get = new GetRequest(
|
||||
lastTableBytes, key.getBytes(), columnFamilyBytes);
|
||||
if (fields != null) {
|
||||
get.qualifiers(getQualifierList(fields));
|
||||
}
|
||||
|
||||
try {
|
||||
if (debug) {
|
||||
System.out.println("Doing read from HBase columnfamily " +
|
||||
Bytes.pretty(columnFamilyBytes));
|
||||
System.out.println("Doing read for key: " + key);
|
||||
}
|
||||
|
||||
final ArrayList<KeyValue> row = client.get(get).join(joinTimeout);
|
||||
if (row == null || row.isEmpty()) {
|
||||
return Status.NOT_FOUND;
|
||||
}
|
||||
|
||||
// got something so populate the results
|
||||
for (final KeyValue column : row) {
|
||||
result.put(new String(column.qualifier()),
|
||||
// TODO - do we need to clone this array? YCSB may keep it in memory
|
||||
// for a while which would mean the entire KV would hang out and won't
|
||||
// be GC'd.
|
||||
new ByteArrayByteIterator(column.value()));
|
||||
|
||||
if (debug) {
|
||||
System.out.println(
|
||||
"Result for field: " + Bytes.pretty(column.qualifier())
|
||||
+ " is: " + Bytes.pretty(column.value()));
|
||||
}
|
||||
}
|
||||
return Status.OK;
|
||||
} catch (InterruptedException e) {
|
||||
System.err.println("Thread interrupted");
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Failure reading from row with key " + key +
|
||||
": " + e.getMessage());
|
||||
return Status.ERROR;
|
||||
}
|
||||
return Status.ERROR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status scan(String table, String startkey, int recordcount,
|
||||
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
|
||||
setTable(table);
|
||||
|
||||
final Scanner scanner = client.newScanner(lastTableBytes);
|
||||
scanner.setFamily(columnFamilyBytes);
|
||||
scanner.setStartKey(startkey.getBytes(UTF8_CHARSET));
|
||||
// No end key... *sniff*
|
||||
if (fields != null) {
|
||||
scanner.setQualifiers(getQualifierList(fields));
|
||||
}
|
||||
|
||||
// no filters? *sniff*
|
||||
ArrayList<ArrayList<KeyValue>> rows = null;
|
||||
try {
|
||||
int numResults = 0;
|
||||
while ((rows = scanner.nextRows().join(joinTimeout)) != null) {
|
||||
for (final ArrayList<KeyValue> row : rows) {
|
||||
final HashMap<String, ByteIterator> rowResult =
|
||||
new HashMap<String, ByteIterator>(row.size());
|
||||
for (final KeyValue column : row) {
|
||||
rowResult.put(new String(column.qualifier()),
|
||||
// TODO - do we need to clone this array? YCSB may keep it in memory
|
||||
// for a while which would mean the entire KV would hang out and won't
|
||||
// be GC'd.
|
||||
new ByteArrayByteIterator(column.value()));
|
||||
if (debug) {
|
||||
System.out.println("Got scan result for key: " +
|
||||
Bytes.pretty(column.key()));
|
||||
}
|
||||
}
|
||||
result.add(rowResult);
|
||||
numResults++;
|
||||
|
||||
if (numResults >= recordcount) {// if hit recordcount, bail out
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
scanner.close().join(joinTimeout);
|
||||
return Status.OK;
|
||||
} catch (InterruptedException e) {
|
||||
System.err.println("Thread interrupted");
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Failure reading from row with key " + startkey +
|
||||
": " + e.getMessage());
|
||||
return Status.ERROR;
|
||||
}
|
||||
|
||||
return Status.ERROR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status update(String table, String key,
|
||||
HashMap<String, ByteIterator> values) {
|
||||
setTable(table);
|
||||
|
||||
if (debug) {
|
||||
System.out.println("Setting up put for key: " + key);
|
||||
}
|
||||
|
||||
final byte[][] qualifiers = new byte[values.size()][];
|
||||
final byte[][] byteValues = new byte[values.size()][];
|
||||
|
||||
int idx = 0;
|
||||
for (final Entry<String, ByteIterator> entry : values.entrySet()) {
|
||||
qualifiers[idx] = entry.getKey().getBytes();
|
||||
byteValues[idx++] = entry.getValue().toArray();
|
||||
if (debug) {
|
||||
System.out.println("Adding field/value " + entry.getKey() + "/"
|
||||
+ Bytes.pretty(entry.getValue().toArray()) + " to put request");
|
||||
}
|
||||
}
|
||||
|
||||
final PutRequest put = new PutRequest(lastTableBytes, key.getBytes(),
|
||||
columnFamilyBytes, qualifiers, byteValues);
|
||||
if (!durability) {
|
||||
put.setDurable(false);
|
||||
}
|
||||
if (!clientSideBuffering) {
|
||||
put.setBufferable(false);
|
||||
try {
|
||||
client.put(put).join(joinTimeout);
|
||||
} catch (InterruptedException e) {
|
||||
System.err.println("Thread interrupted");
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Failure reading from row with key " + key +
|
||||
": " + e.getMessage());
|
||||
return Status.ERROR;
|
||||
}
|
||||
} else {
|
||||
// hooray! Asynchronous write. But without a callback and an async
|
||||
// YCSB call we don't know whether it succeeded or not
|
||||
client.put(put);
|
||||
}
|
||||
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status insert(String table, String key,
|
||||
HashMap<String, ByteIterator> values) {
|
||||
return update(table, key, values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status delete(String table, String key) {
|
||||
setTable(table);
|
||||
|
||||
if (debug) {
|
||||
System.out.println("Doing delete for key: " + key);
|
||||
}
|
||||
|
||||
final DeleteRequest delete = new DeleteRequest(
|
||||
lastTableBytes, key.getBytes(), columnFamilyBytes);
|
||||
if (!durability) {
|
||||
delete.setDurable(false);
|
||||
}
|
||||
if (!clientSideBuffering) {
|
||||
delete.setBufferable(false);
|
||||
try {
|
||||
client.delete(delete).join(joinTimeout);
|
||||
} catch (InterruptedException e) {
|
||||
System.err.println("Thread interrupted");
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Failure reading from row with key " + key +
|
||||
": " + e.getMessage());
|
||||
return Status.ERROR;
|
||||
}
|
||||
} else {
|
||||
// hooray! Asynchronous write. But without a callback and an async
|
||||
// YCSB call we don't know whether it succeeded or not
|
||||
client.delete(delete);
|
||||
}
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
/**
|
||||
* Little helper to set the table byte array. If it's different than the last
|
||||
* table we reset the byte array. Otherwise we just use the existing array.
|
||||
* @param table The table we're operating against
|
||||
*/
|
||||
private void setTable(final String table) {
|
||||
if (!lastTable.equals(table)) {
|
||||
lastTable = table;
|
||||
lastTableBytes = table.getBytes();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Little helper to build a qualifier byte array from a field set.
|
||||
* @param fields The fields to fetch.
|
||||
* @return The column qualifier byte arrays.
|
||||
*/
|
||||
private byte[][] getQualifierList(final Set<String> fields) {
|
||||
final byte[][] qualifiers = new byte[fields.size()][];
|
||||
int idx = 0;
|
||||
for (final String field : fields) {
|
||||
qualifiers[idx++] = field.getBytes();
|
||||
}
|
||||
return qualifiers;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
/**
|
||||
* Copyright (c) 2016 YCSB contributors. All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
* may not use this file except in compliance with the License. You
|
||||
* may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License. See accompanying
|
||||
* LICENSE file.
|
||||
*/
|
||||
|
||||
/**
|
||||
* The YCSB binding for HBase using the AsyncHBase client.
|
||||
*/
|
||||
package com.yahoo.ycsb.db;
|
|
@ -0,0 +1,278 @@
|
|||
/*
|
||||
* Copyright (C) 2008 The Guava Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.google.common.base;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static java.util.concurrent.TimeUnit.MICROSECONDS;
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.annotations.GwtCompatible;
|
||||
import com.google.common.annotations.GwtIncompatible;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* An object that measures elapsed time in nanoseconds. It is useful to measure
|
||||
* elapsed time using this class instead of direct calls to {@link
|
||||
* System#nanoTime} for a few reasons:
|
||||
*
|
||||
* <ul>
|
||||
* <li>An alternate time source can be substituted, for testing or performance
|
||||
* reasons.
|
||||
* <li>As documented by {@code nanoTime}, the value returned has no absolute
|
||||
* meaning, and can only be interpreted as relative to another timestamp
|
||||
* returned by {@code nanoTime} at a different time. {@code Stopwatch} is a
|
||||
* more effective abstraction because it exposes only these relative values,
|
||||
* not the absolute ones.
|
||||
* </ul>
|
||||
*
|
||||
* <p>Basic usage:
|
||||
* <pre>
|
||||
* Stopwatch stopwatch = Stopwatch.{@link #createStarted createStarted}();
|
||||
* doSomething();
|
||||
* stopwatch.{@link #stop stop}(); // optional
|
||||
*
|
||||
* long millis = stopwatch.elapsed(MILLISECONDS);
|
||||
*
|
||||
* log.info("that took: " + stopwatch); // formatted string like "12.3 ms"
|
||||
* </pre>
|
||||
*
|
||||
* <p>Stopwatch methods are not idempotent; it is an error to start or stop a
|
||||
* stopwatch that is already in the desired state.
|
||||
*
|
||||
* <p>When testing code that uses this class, use the {@linkplain
|
||||
* #Stopwatch(Ticker) alternate constructor} to supply a fake or mock ticker.
|
||||
* <!-- TODO(kevinb): restore the "such as" --> This allows you to
|
||||
* simulate any valid behavior of the stopwatch.
|
||||
*
|
||||
* <p><b>Note:</b> This class is not thread-safe.
|
||||
*
|
||||
* @author Kevin Bourrillion
|
||||
* @since 10.0
|
||||
*/
|
||||
@Beta
|
||||
@GwtCompatible(emulated = true)
|
||||
public final class Stopwatch {
|
||||
private final Ticker ticker;
|
||||
private boolean isRunning;
|
||||
private long elapsedNanos;
|
||||
private long startTick;
|
||||
|
||||
/**
|
||||
* Creates (but does not start) a new stopwatch using {@link System#nanoTime}
|
||||
* as its time source.
|
||||
*
|
||||
* @since 15.0
|
||||
*/
|
||||
public static Stopwatch createUnstarted() {
|
||||
return new Stopwatch();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates (but does not start) a new stopwatch, using the specified time
|
||||
* source.
|
||||
*
|
||||
* @since 15.0
|
||||
*/
|
||||
public static Stopwatch createUnstarted(Ticker ticker) {
|
||||
return new Stopwatch(ticker);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates (and starts) a new stopwatch using {@link System#nanoTime}
|
||||
* as its time source.
|
||||
*
|
||||
* @since 15.0
|
||||
*/
|
||||
public static Stopwatch createStarted() {
|
||||
return new Stopwatch().start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates (and starts) a new stopwatch, using the specified time
|
||||
* source.
|
||||
*
|
||||
* @since 15.0
|
||||
*/
|
||||
public static Stopwatch createStarted(Ticker ticker) {
|
||||
return new Stopwatch(ticker).start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates (but does not start) a new stopwatch using {@link System#nanoTime}
|
||||
* as its time source.
|
||||
*
|
||||
* @deprecated Use {@link Stopwatch#createUnstarted()} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public Stopwatch() {
|
||||
this(Ticker.systemTicker());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates (but does not start) a new stopwatch, using the specified time
|
||||
* source.
|
||||
*
|
||||
* @deprecated Use {@link Stopwatch#createUnstarted(Ticker)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public Stopwatch(Ticker ticker) {
|
||||
this.ticker = checkNotNull(ticker, "ticker");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if {@link #start()} has been called on this stopwatch,
|
||||
* and {@link #stop()} has not been called since the last call to {@code
|
||||
* start()}.
|
||||
*/
|
||||
public boolean isRunning() {
|
||||
return isRunning;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the stopwatch.
|
||||
*
|
||||
* @return this {@code Stopwatch} instance
|
||||
* @throws IllegalStateException if the stopwatch is already running.
|
||||
*/
|
||||
public Stopwatch start() {
|
||||
checkState(!isRunning, "This stopwatch is already running.");
|
||||
isRunning = true;
|
||||
startTick = ticker.read();
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the stopwatch. Future reads will return the fixed duration that had
|
||||
* elapsed up to this point.
|
||||
*
|
||||
* @return this {@code Stopwatch} instance
|
||||
* @throws IllegalStateException if the stopwatch is already stopped.
|
||||
*/
|
||||
public Stopwatch stop() {
|
||||
long tick = ticker.read();
|
||||
checkState(isRunning, "This stopwatch is already stopped.");
|
||||
isRunning = false;
|
||||
elapsedNanos += tick - startTick;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the elapsed time for this stopwatch to zero,
|
||||
* and places it in a stopped state.
|
||||
*
|
||||
* @return this {@code Stopwatch} instance
|
||||
*/
|
||||
public Stopwatch reset() {
|
||||
elapsedNanos = 0;
|
||||
isRunning = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
private long elapsedNanos() {
|
||||
return isRunning ? ticker.read() - startTick + elapsedNanos : elapsedNanos;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current elapsed time shown on this stopwatch, expressed
|
||||
* in the desired time unit, with any fraction rounded down.
|
||||
*
|
||||
* <p>Note that the overhead of measurement can be more than a microsecond, so
|
||||
* it is generally not useful to specify {@link TimeUnit#NANOSECONDS}
|
||||
* precision here.
|
||||
*
|
||||
* @since 14.0 (since 10.0 as {@code elapsedTime()})
|
||||
*/
|
||||
public long elapsed(TimeUnit desiredUnit) {
|
||||
return desiredUnit.convert(elapsedNanos(), NANOSECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current elapsed time shown on this stopwatch, expressed
|
||||
* in the desired time unit, with any fraction rounded down.
|
||||
*
|
||||
* <p>Note that the overhead of measurement can be more than a microsecond, so
|
||||
* it is generally not useful to specify {@link TimeUnit#NANOSECONDS}
|
||||
* precision here.
|
||||
*
|
||||
* @deprecated Use {@link Stopwatch#elapsed(TimeUnit)} instead. This method is
|
||||
* scheduled to be removed in Guava release 16.0.
|
||||
*/
|
||||
@Deprecated
|
||||
public long elapsedTime(TimeUnit desiredUnit) {
|
||||
return elapsed(desiredUnit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current elapsed time shown on this stopwatch, expressed
|
||||
* in milliseconds, with any fraction rounded down. This is identical to
|
||||
* {@code elapsed(TimeUnit.MILLISECONDS)}.
|
||||
*
|
||||
* @deprecated Use {@code stopwatch.elapsed(MILLISECONDS)} instead. This
|
||||
* method is scheduled to be removed in Guava release 16.0.
|
||||
*/
|
||||
@Deprecated
|
||||
public long elapsedMillis() {
|
||||
return elapsed(MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a string representation of the current elapsed time.
|
||||
*/
|
||||
@GwtIncompatible("String.format()")
|
||||
@Override public String toString() {
|
||||
long nanos = elapsedNanos();
|
||||
|
||||
TimeUnit unit = chooseUnit(nanos);
|
||||
double value = (double) nanos / NANOSECONDS.convert(1, unit);
|
||||
|
||||
// Too bad this functionality is not exposed as a regular method call
|
||||
return String.format("%.4g %s", value, abbreviate(unit));
|
||||
}
|
||||
|
||||
private static TimeUnit chooseUnit(long nanos) {
|
||||
if (SECONDS.convert(nanos, NANOSECONDS) > 0) {
|
||||
return SECONDS;
|
||||
}
|
||||
if (MILLISECONDS.convert(nanos, NANOSECONDS) > 0) {
|
||||
return MILLISECONDS;
|
||||
}
|
||||
if (MICROSECONDS.convert(nanos, NANOSECONDS) > 0) {
|
||||
return MICROSECONDS;
|
||||
}
|
||||
return NANOSECONDS;
|
||||
}
|
||||
|
||||
private static String abbreviate(TimeUnit unit) {
|
||||
switch (unit) {
|
||||
case NANOSECONDS:
|
||||
return "ns";
|
||||
case MICROSECONDS:
|
||||
return "\u03bcs"; // μs
|
||||
case MILLISECONDS:
|
||||
return "ms";
|
||||
case SECONDS:
|
||||
return "s";
|
||||
default:
|
||||
throw new AssertionError();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
* Copyright (C) 2007 The Guava Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.google.common.io;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Utility methods for working with {@link Closeable} objects.
|
||||
*
|
||||
* @author Michael Lancaster
|
||||
* @since 1.0
|
||||
*/
|
||||
@Beta
|
||||
public final class Closeables {
|
||||
@VisibleForTesting static final Logger logger
|
||||
= Logger.getLogger(Closeables.class.getName());
|
||||
|
||||
private Closeables() {}
|
||||
|
||||
/**
|
||||
* Closes a {@link Closeable}, with control over whether an
|
||||
* {@code IOException} may be thrown. This is primarily useful in a
|
||||
* finally block, where a thrown exception needs to be logged but not
|
||||
* propagated (otherwise the original exception will be lost).
|
||||
*
|
||||
* <p>If {@code swallowIOException} is true then we never throw
|
||||
* {@code IOException} but merely log it.
|
||||
*
|
||||
* <p>Example:
|
||||
*
|
||||
* <p><pre>public void useStreamNicely() throws IOException {
|
||||
* SomeStream stream = new SomeStream("foo");
|
||||
* boolean threw = true;
|
||||
* try {
|
||||
* // Some code which does something with the Stream. May throw a
|
||||
* // Throwable.
|
||||
* threw = false; // No throwable thrown.
|
||||
* } finally {
|
||||
* // Close the stream.
|
||||
* // If an exception occurs, only rethrow it if (threw==false).
|
||||
* Closeables.close(stream, threw);
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* @param closeable the {@code Closeable} object to be closed, or null,
|
||||
* in which case this method does nothing
|
||||
* @param swallowIOException if true, don't propagate IO exceptions
|
||||
* thrown by the {@code close} methods
|
||||
* @throws IOException if {@code swallowIOException} is false and
|
||||
* {@code close} throws an {@code IOException}.
|
||||
*/
|
||||
public static void close(@Nullable Closeable closeable,
|
||||
boolean swallowIOException) throws IOException {
|
||||
if (closeable == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
closeable.close();
|
||||
} catch (IOException e) {
|
||||
if (swallowIOException) {
|
||||
logger.log(Level.WARNING,
|
||||
"IOException thrown while closing Closeable.", e);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent to calling {@code close(closeable, true)}, but with no
|
||||
* IOException in the signature.
|
||||
* @param closeable the {@code Closeable} object to be closed, or null, in
|
||||
* which case this method does nothing
|
||||
*/
|
||||
public static void closeQuietly(@Nullable Closeable closeable) {
|
||||
try {
|
||||
close(closeable, true);
|
||||
} catch (IOException e) {
|
||||
logger.log(Level.SEVERE, "IOException should not have been thrown.", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
* Copyright (C) 2007 The Guava Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.google.common.io;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
* An InputStream that limits the number of bytes which can be read.
|
||||
*
|
||||
* @author Charles Fry
|
||||
* @since 1.0
|
||||
*/
|
||||
@Beta
|
||||
public final class LimitInputStream extends FilterInputStream {
|
||||
|
||||
private long left;
|
||||
private long mark = -1;
|
||||
|
||||
/**
|
||||
* Wraps another input stream, limiting the number of bytes which can be read.
|
||||
*
|
||||
* @param in the input stream to be wrapped
|
||||
* @param limit the maximum number of bytes to be read
|
||||
*/
|
||||
public LimitInputStream(InputStream in, long limit) {
|
||||
super(in);
|
||||
Preconditions.checkNotNull(in);
|
||||
Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
|
||||
left = limit;
|
||||
}
|
||||
|
||||
@Override public int available() throws IOException {
|
||||
return (int) Math.min(in.available(), left);
|
||||
}
|
||||
|
||||
@Override public synchronized void mark(int readlimit) {
|
||||
in.mark(readlimit);
|
||||
mark = left;
|
||||
// it's okay to mark even if mark isn't supported, as reset won't work
|
||||
}
|
||||
|
||||
@Override public int read() throws IOException {
|
||||
if (left == 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int result = in.read();
|
||||
if (result != -1) {
|
||||
--left;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override public int read(byte[] b, int off, int len) throws IOException {
|
||||
if (left == 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
len = (int) Math.min(len, left);
|
||||
int result = in.read(b, off, len);
|
||||
if (result != -1) {
|
||||
left -= result;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override public synchronized void reset() throws IOException {
|
||||
if (!in.markSupported()) {
|
||||
throw new IOException("Mark not supported");
|
||||
}
|
||||
if (mark == -1) {
|
||||
throw new IOException("Mark not set");
|
||||
}
|
||||
|
||||
in.reset();
|
||||
left = mark;
|
||||
}
|
||||
|
||||
@Override public long skip(long n) throws IOException {
|
||||
n = Math.min(n, left);
|
||||
long skipped = in.skip(n);
|
||||
left -= skipped;
|
||||
return skipped;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,211 @@
|
|||
/**
|
||||
* Copyright (c) 2016 YCSB contributors. All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
* may not use this file except in compliance with the License. You
|
||||
* may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License. See accompanying
|
||||
* LICENSE file.
|
||||
*/
|
||||
package com.yahoo.ycsb.db;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
import com.yahoo.ycsb.ByteIterator;
|
||||
import com.yahoo.ycsb.Status;
|
||||
import com.yahoo.ycsb.StringByteIterator;
|
||||
import com.yahoo.ycsb.db.AsyncHBaseClient;
|
||||
import com.yahoo.ycsb.measurements.Measurements;
|
||||
import com.yahoo.ycsb.workloads.CoreWorkload;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Vector;
|
||||
|
||||
/**
|
||||
* Integration tests for the YCSB AsyncHBase client, using an HBase minicluster.
|
||||
* These are the same as those for the hbase10 client.
|
||||
*/
|
||||
public class AsyncHBaseTest {
|
||||
|
||||
private final static String COLUMN_FAMILY = "cf";
|
||||
|
||||
private static HBaseTestingUtility testingUtil;
|
||||
private AsyncHBaseClient client;
|
||||
private Table table = null;
|
||||
|
||||
private static boolean isWindows() {
|
||||
final String os = System.getProperty("os.name");
|
||||
return os.startsWith("Windows");
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a mini-cluster for use in these tests.
|
||||
*
|
||||
* This is a heavy-weight operation, so invoked only once for the test class.
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void setUpClass() throws Exception {
|
||||
// Minicluster setup fails on Windows with an UnsatisfiedLinkError.
|
||||
// Skip if windows.
|
||||
assumeTrue(!isWindows());
|
||||
testingUtil = HBaseTestingUtility.createLocalHTU();
|
||||
testingUtil.startMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tears down mini-cluster.
|
||||
*/
|
||||
@AfterClass
|
||||
public static void tearDownClass() throws Exception {
|
||||
if (testingUtil != null) {
|
||||
testingUtil.shutdownMiniCluster();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the mini-cluster for testing.
|
||||
*
|
||||
* We re-create the table for each test.
|
||||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Properties p = new Properties();
|
||||
p.setProperty("columnfamily", COLUMN_FAMILY);
|
||||
|
||||
Measurements.setProperties(p);
|
||||
final CoreWorkload workload = new CoreWorkload();
|
||||
workload.init(p);
|
||||
|
||||
table = testingUtil.createTable(TableName.valueOf(CoreWorkload.table), Bytes.toBytes(COLUMN_FAMILY));
|
||||
|
||||
final String zkQuorum = "127.0.0.1:" + testingUtil.getZkCluster().getClientPort();
|
||||
p.setProperty("hbase.zookeeper.quorum", zkQuorum);
|
||||
client = new AsyncHBaseClient();
|
||||
client.setProperties(p);
|
||||
client.init();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
table.close();
|
||||
testingUtil.deleteTable(CoreWorkload.table);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRead() throws Exception {
|
||||
final String rowKey = "row1";
|
||||
final Put p = new Put(Bytes.toBytes(rowKey));
|
||||
p.addColumn(Bytes.toBytes(COLUMN_FAMILY),
|
||||
Bytes.toBytes("column1"), Bytes.toBytes("value1"));
|
||||
p.addColumn(Bytes.toBytes(COLUMN_FAMILY),
|
||||
Bytes.toBytes("column2"), Bytes.toBytes("value2"));
|
||||
table.put(p);
|
||||
|
||||
final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
|
||||
final Status status = client.read(CoreWorkload.table, rowKey, null, result);
|
||||
assertEquals(Status.OK, status);
|
||||
assertEquals(2, result.size());
|
||||
assertEquals("value1", result.get("column1").toString());
|
||||
assertEquals("value2", result.get("column2").toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadMissingRow() throws Exception {
|
||||
final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
|
||||
final Status status = client.read(CoreWorkload.table, "Missing row", null, result);
|
||||
assertEquals(Status.NOT_FOUND, status);
|
||||
assertEquals(0, result.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScan() throws Exception {
|
||||
// Fill with data
|
||||
final String colStr = "row_number";
|
||||
final byte[] col = Bytes.toBytes(colStr);
|
||||
final int n = 10;
|
||||
final List<Put> puts = new ArrayList<Put>(n);
|
||||
for(int i = 0; i < n; i++) {
|
||||
final byte[] key = Bytes.toBytes(String.format("%05d", i));
|
||||
final byte[] value = java.nio.ByteBuffer.allocate(4).putInt(i).array();
|
||||
final Put p = new Put(key);
|
||||
p.addColumn(Bytes.toBytes(COLUMN_FAMILY), col, value);
|
||||
puts.add(p);
|
||||
}
|
||||
table.put(puts);
|
||||
|
||||
// Test
|
||||
final Vector<HashMap<String, ByteIterator>> result =
|
||||
new Vector<HashMap<String, ByteIterator>>();
|
||||
|
||||
// Scan 5 records, skipping the first
|
||||
client.scan(CoreWorkload.table, "00001", 5, null, result);
|
||||
|
||||
assertEquals(5, result.size());
|
||||
for(int i = 0; i < 5; i++) {
|
||||
final HashMap<String, ByteIterator> row = result.get(i);
|
||||
assertEquals(1, row.size());
|
||||
assertTrue(row.containsKey(colStr));
|
||||
final byte[] bytes = row.get(colStr).toArray();
|
||||
final ByteBuffer buf = ByteBuffer.wrap(bytes);
|
||||
final int rowNum = buf.getInt();
|
||||
assertEquals(i + 1, rowNum);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdate() throws Exception{
|
||||
final String key = "key";
|
||||
final HashMap<String, String> input = new HashMap<String, String>();
|
||||
input.put("column1", "value1");
|
||||
input.put("column2", "value2");
|
||||
final Status status = client.insert(CoreWorkload.table, key, StringByteIterator.getByteIteratorMap(input));
|
||||
assertEquals(Status.OK, status);
|
||||
|
||||
// Verify result
|
||||
final Get get = new Get(Bytes.toBytes(key));
|
||||
final Result result = this.table.get(get);
|
||||
assertFalse(result.isEmpty());
|
||||
assertEquals(2, result.size());
|
||||
for(final java.util.Map.Entry<String, String> entry : input.entrySet()) {
|
||||
assertEquals(entry.getValue(),
|
||||
new String(result.getValue(Bytes.toBytes(COLUMN_FAMILY),
|
||||
Bytes.toBytes(entry.getKey()))));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Not yet implemented")
|
||||
public void testDelete() {
|
||||
fail("Not yet implemented");
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Copyright (c) 2016 YCSB contributors. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
may not use this file except in compliance with the License. You
|
||||
may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied. See the License for the specific language governing
|
||||
permissions and limitations under the License. See accompanying
|
||||
LICENSE file.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
<property>
|
||||
<name>hbase.master.info.port</name>
|
||||
<value>-1</value>
|
||||
<description>The port for the hbase master web UI
|
||||
Set to -1 if you do not want the info server to run.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.regionserver.info.port</name>
|
||||
<value>-1</value>
|
||||
<description>The port for the hbase regionserver web UI
|
||||
Set to -1 if you do not want the info server to run.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,28 @@
|
|||
#
|
||||
# Copyright (c) 2015 YCSB contributors. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||
# may not use this file except in compliance with the License. You
|
||||
# may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied. See the License for the specific language governing
|
||||
# permissions and limitations under the License. See accompanying
|
||||
# LICENSE file.
|
||||
#
|
||||
|
||||
# Root logger option
|
||||
log4j.rootLogger=WARN, stderr
|
||||
|
||||
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stderr.target=System.err
|
||||
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stderr.layout.conversionPattern=%d{yyyy/MM/dd HH:mm:ss} %-5p %c %x - %m%n
|
||||
|
||||
# Suppress messages from ZKTableStateManager: Creates a large number of table
|
||||
# state change messages.
|
||||
log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKTableStateManager=ERROR
|
1
bin/ycsb
1
bin/ycsb
|
@ -51,6 +51,7 @@ COMMANDS = {
|
|||
DATABASES = {
|
||||
"accumulo" : "com.yahoo.ycsb.db.accumulo.AccumuloClient",
|
||||
"aerospike" : "com.yahoo.ycsb.db.AerospikeClient",
|
||||
"asynchbase" : "com.yahoo.ycsb.db.AsyncHBaseClient",
|
||||
"basic" : "com.yahoo.ycsb.BasicDB",
|
||||
"cassandra-7" : "com.yahoo.ycsb.db.CassandraClient7",
|
||||
"cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8",
|
||||
|
|
|
@ -49,6 +49,11 @@ LICENSE file.
|
|||
<artifactId>aerospike-binding</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.yahoo.ycsb</groupId>
|
||||
<artifactId>asynchbase-binding</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.yahoo.ycsb</groupId>
|
||||
<artifactId>cassandra-binding</artifactId>
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -68,6 +68,7 @@ LICENSE file.
|
|||
<properties>
|
||||
<maven.assembly.version>2.5.5</maven.assembly.version>
|
||||
<maven.dependency.version>2.10</maven.dependency.version>
|
||||
<asynchbase.version>1.7.1</asynchbase.version>
|
||||
<hbase094.version>0.94.27</hbase094.version>
|
||||
<hbase098.version>0.98.14-hadoop2</hbase098.version>
|
||||
<hbase10.version>1.0.2</hbase10.version>
|
||||
|
@ -105,6 +106,7 @@ LICENSE file.
|
|||
<!-- all the datastore bindings, lex sorted please -->
|
||||
<module>accumulo</module>
|
||||
<module>aerospike</module>
|
||||
<module>asynchbase</module>
|
||||
<module>cassandra</module>
|
||||
<module>cassandra2</module>
|
||||
<module>couchbase</module>
|
||||
|
|
Загрузка…
Ссылка в новой задаче