[zookeeper]: add the zk binding (#1327)

This commit is contained in:
maoling 2020-12-07 11:47:29 +08:00 коммит произвёл GitHub
Родитель 261cc785f3
Коммит 7ddc8fe068
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 602 добавлений и 1 удалений

Просмотреть файл

@ -72,3 +72,4 @@ solr7:site.ycsb.db.solr7.SolrClient
tarantool:site.ycsb.db.TarantoolClient
tablestore:site.ycsb.db.tablestore.TableStoreClient
voltdb:site.ycsb.db.voltdb.VoltClient4
zookeeper:site.ycsb.db.zookeeper.ZKClient

Просмотреть файл

@ -100,7 +100,8 @@ DATABASES = {
"seaweedfs" : "site.ycsb.db.seaweed.SeaweedClient",
"solr7" : "site.ycsb.db.solr7.SolrClient",
"tarantool" : "site.ycsb.db.TarantoolClient",
"tablestore" : "site.ycsb.db.tablestore.TableStoreClient"
"tablestore" : "site.ycsb.db.tablestore.TableStoreClient",
"zookeeper" : "site.ycsb.db.zookeeper.ZKClient"
}
OPTIONS = {

Просмотреть файл

@ -254,6 +254,11 @@ LICENSE file.
<artifactId>voltdb-binding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>site.ycsb</groupId>
<artifactId>zookeeper-binding</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>

Просмотреть файл

@ -148,6 +148,7 @@ LICENSE file.
<thrift.version>0.8.0</thrift.version>
<tablestore.version>4.8.0</tablestore.version>
<voltdb.version>10.1.1</voltdb.version>
<zookeeper.version>3.5.8</zookeeper.version>
</properties>
<modules>
@ -199,6 +200,7 @@ LICENSE file.
<module>tarantool</module>
<module>tablestore</module>
<module>voltdb</module>
<module>zookeeper</module>
</modules>
<build>

91
zookeeper/README.md Normal file
Просмотреть файл

@ -0,0 +1,91 @@
<!--
Copyright (c) 2020 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
## Quick Start
This section describes how to run YCSB on ZooKeeper.
### 1. Start ZooKeeper Server(s)
### 2. Install Java and Maven
### 3. Set Up YCSB
Git clone YCSB and compile:
git clone http://github.com/brianfrankcooper/YCSB.git
# more details in the landing page for instructions on downloading YCSB(https://github.com/brianfrankcooper/YCSB#getting-started).
cd YCSB
mvn -pl site.ycsb:zookeeper-binding -am clean package -DskipTests
### 4. Provide ZooKeeper Connection Parameters
Set connectString, sessionTimeout, watchFlag in the workload you plan to run.
- `zookeeper.connectString`
- `zookeeper.sessionTimeout`
- `zookeeper.watchFlag`
* A parameter for enabling ZooKeeper's watch, optional values:true or false.the default value is false.
* This parameter cannot test the watch performance, but for testing what effect will take on the read/write requests when enabling the watch.
```bash
./bin/ycsb run zookeeper -s -P workloads/workloadb -p zookeeper.connectString=127.0.0.1:2181/benchmark -p zookeeper.watchFlag=true
```
Or, you can set configs with the shell command, EG:
# create a /benchmark namespace for sake of cleaning up the workspace after test.
# e.g the CLI:create /benchmark
./bin/ycsb run zookeeper -s -P workloads/workloadb -p zookeeper.connectString=127.0.0.1:2181/benchmark -p zookeeper.sessionTimeout=30000
### 5. Load data and run tests
Load the data:
# -p recordcount,the count of records/paths you want to insert
./bin/ycsb load zookeeper -s -P workloads/workloadb -p zookeeper.connectString=127.0.0.1:2181/benchmark -p recordcount=10000 > outputLoad.txt
Run the workload test:
# YCSB workloadb is the most suitable workload for read-heavy workload for the ZooKeeper in the real world.
# -p fieldlength, test the length of value/data-content took effect on performance
./bin/ycsb run zookeeper -s -P workloads/workloadb -p zookeeper.connectString=127.0.0.1:2181/benchmark -p fieldlength=1000
# -p fieldcount
./bin/ycsb run zookeeper -s -P workloads/workloadb -p zookeeper.connectString=127.0.0.1:2181/benchmark -p fieldcount=20
# -p hdrhistogram.percentiles,show the hdrhistogram benchmark result
./bin/ycsb run zookeeper -threads 1 -P workloads/workloadb -p zookeeper.connectString=127.0.0.1:2181/benchmark -p hdrhistogram.percentiles=10,25,50,75,90,95,99,99.9 -p histogram.buckets=500
# -threads: multi-clients test, increase the **maxClientCnxns** in the zoo.cfg to handle more connections.
./bin/ycsb run zookeeper -threads 10 -P workloads/workloadb -p zookeeper.connectString=127.0.0.1:2181/benchmark
# show the timeseries benchmark result
./bin/ycsb run zookeeper -threads 1 -P workloads/workloadb -p zookeeper.connectString=127.0.0.1:2181/benchmark -p measurementtype=timeseries -p timeseries.granularity=50
# cluster test
./bin/ycsb run zookeeper -P workloads/workloadb -p zookeeper.connectString=192.168.10.43:2181,192.168.10.45:2181,192.168.10.27:2181/benchmark
# test leader's read/write performance by setting zookeeper.connectString to leader's(192.168.10.43:2181)
./bin/ycsb run zookeeper -P workloads/workloadb -p zookeeper.connectString=192.168.10.43:2181/benchmark
# test for large znode(by default: jute.maxbuffer is 1048575 bytes/1 MB ). Notice:jute.maxbuffer should also be set the same value in all the zk servers.
./bin/ycsb run zookeeper -jvm-args="-Djute.maxbuffer=4194304" -s -P workloads/workloadc -p zookeeper.connectString=127.0.0.1:2181/benchmark
# Cleaning up the workspace after finishing the benchmark.
# e.g the CLI:deleteall /benchmark

78
zookeeper/pom.xml Normal file
Просмотреть файл

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>site.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.18.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>zookeeper-binding</artifactId>
<name>ZooKeeper DB Binding</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>site.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

Просмотреть файл

@ -0,0 +1,222 @@
/**
* Copyright (c) 2020 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
* <p>
* ZooKeeper client binding for YCSB.
* <p>
*/
package site.ycsb.db.zookeeper;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import site.ycsb.ByteIterator;
import site.ycsb.DB;
import site.ycsb.DBException;
import site.ycsb.Status;
import site.ycsb.StringByteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* YCSB binding for <a href="https://zookeeper.apache.org/">ZooKeeper</a>.
*
* See {@code zookeeper/README.md} for details.
*/
public class ZKClient extends DB {
private ZooKeeper zk;
private Watcher watcher;
private static final String CONNECT_STRING = "zookeeper.connectString";
private static final String DEFAULT_CONNECT_STRING = "127.0.0.1:2181";
private static final String SESSION_TIMEOUT_PROPERTY = "zookeeper.sessionTimeout";
private static final long DEFAULT_SESSION_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
private static final String WATCH_FLAG = "zookeeper.watchFlag";
private static final Charset UTF_8 = Charset.forName("UTF-8");
private static final Logger LOG = LoggerFactory.getLogger(ZKClient.class);
public void init() throws DBException {
Properties props = getProperties();
String connectString = props.getProperty(CONNECT_STRING);
if (connectString == null || connectString.length() == 0) {
connectString = DEFAULT_CONNECT_STRING;
}
if(Boolean.parseBoolean(props.getProperty(WATCH_FLAG))) {
watcher = new SimpleWatcher();
} else {
watcher = null;
}
long sessionTimeout;
String sessionTimeoutString = props.getProperty(SESSION_TIMEOUT_PROPERTY);
if (sessionTimeoutString != null) {
sessionTimeout = Integer.parseInt(sessionTimeoutString);
} else {
sessionTimeout = DEFAULT_SESSION_TIMEOUT;
}
try {
zk = new ZooKeeper(connectString, (int) sessionTimeout, new SimpleWatcher());
} catch (IOException e) {
throw new DBException("Creating connection failed.");
}
}
public void cleanup() throws DBException {
try {
zk.close();
} catch (InterruptedException e) {
throw new DBException("Closing connection failed.");
}
}
@Override
public Status read(String table, String key, Set<String> fields,
Map<String, ByteIterator> result) {
String path = getPath(key);
try {
byte[] data = zk.getData(path, watcher, null);
if (data == null || data.length == 0) {
return Status.NOT_FOUND;
}
deserializeValues(data, fields, result);
return Status.OK;
} catch (KeeperException | InterruptedException e) {
LOG.error("Error when reading a path:{},tableName:{}", path, table, e);
return Status.ERROR;
}
}
@Override
public Status insert(String table, String key,
Map<String, ByteIterator> values) {
String path = getPath(key);
String data = getJsonStrFromByteMap(values);
try {
zk.create(path, data.getBytes(UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
return Status.OK;
} catch (KeeperException.NodeExistsException e1) {
return Status.OK;
} catch (KeeperException | InterruptedException e2) {
LOG.error("Error when inserting a path:{},tableName:{}", path, table, e2);
return Status.ERROR;
}
}
@Override
public Status delete(String table, String key) {
String path = getPath(key);
try {
zk.delete(path, -1);
return Status.OK;
} catch (InterruptedException | KeeperException e) {
LOG.error("Error when deleting a path:{},tableName:{}", path, table, e);
return Status.ERROR;
}
}
@Override
public Status update(String table, String key,
Map<String, ByteIterator> values) {
String path = getPath(key);
try {
// we have to do a read operation here before setData to meet the YCSB's update semantics:
// update a single record in the database, adding or replacing the specified fields.
byte[] data = zk.getData(path, watcher, null);
if (data == null || data.length == 0) {
return Status.NOT_FOUND;
}
final Map<String, ByteIterator> result = new HashMap<>();
deserializeValues(data, null, result);
result.putAll(values);
// update
zk.setData(path, getJsonStrFromByteMap(result).getBytes(UTF_8), -1);
return Status.OK;
} catch (KeeperException | InterruptedException e) {
LOG.error("Error when updating a path:{},tableName:{}", path, table, e);
return Status.ERROR;
}
}
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
return Status.NOT_IMPLEMENTED;
}
private String getPath(String key) {
return key.startsWith("/") ? key : "/" + key;
}
/**
* converting the key:values map to JSON Strings.
*/
private static String getJsonStrFromByteMap(Map<String, ByteIterator> map) {
Map<String, String> stringMap = StringByteIterator.getStringMap(map);
return JSONValue.toJSONString(stringMap);
}
private Map<String, ByteIterator> deserializeValues(final byte[] data, final Set<String> fields,
final Map<String, ByteIterator> result) {
JSONObject jsonObject = (JSONObject)JSONValue.parse(new String(data, UTF_8));
Iterator<String> iterator = jsonObject.keySet().iterator();
while(iterator.hasNext()) {
String field = iterator.next();
String value = jsonObject.get(field).toString();
if(fields == null || fields.contains(field)) {
result.put(field, new StringByteIterator(value));
}
}
return result;
}
private static class SimpleWatcher implements Watcher {
public void process(WatchedEvent e) {
if (e.getType() == Event.EventType.None) {
return;
}
if (e.getState() == Event.KeeperState.SyncConnected) {
//do nothing
}
}
}
}

Просмотреть файл

@ -0,0 +1,22 @@
/*
* Copyright (c) 2020 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* The YCSB binding for <a href="https://zookeeper.apache.org/">ZooKeeper</a>.
*/
package site.ycsb.db.zookeeper;

Просмотреть файл

@ -0,0 +1,22 @@
# Copyright (c) 2020 YCSB contributors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
# Root logger option
log4j.rootLogger=INFO, stderr
# Direct log messages to stderr
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.Target=System.err
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

Просмотреть файл

@ -0,0 +1,157 @@
/**
* Copyright (c) 2020 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package site.ycsb.db.zookeeper;
import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import site.ycsb.ByteIterator;
import site.ycsb.Status;
import site.ycsb.StringByteIterator;
import site.ycsb.measurements.Measurements;
import site.ycsb.workloads.CoreWorkload;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.fail;
import static site.ycsb.workloads.CoreWorkload.TABLENAME_PROPERTY;
import static site.ycsb.workloads.CoreWorkload.TABLENAME_PROPERTY_DEFAULT;
/**
* Integration tests for the YCSB ZooKeeper client.
*/
public class ZKClientTest {
private static TestingServer zkTestServer;
private ZKClient client;
private String tableName;
private final static String path = "benchmark";
private static final int PORT = 2181;
@BeforeClass
public static void setUpClass() throws Exception {
zkTestServer = new TestingServer(PORT);
zkTestServer.start();
}
@AfterClass
public static void tearDownClass() throws Exception {
zkTestServer.stop();
}
@Before
public void setUp() throws Exception {
client = new ZKClient();
Properties p = new Properties();
p.setProperty("zookeeper.connectString", "127.0.0.1:" + String.valueOf(PORT));
Measurements.setProperties(p);
final CoreWorkload workload = new CoreWorkload();
workload.init(p);
tableName = p.getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
client.setProperties(p);
client.init();
}
@After
public void tearDown() throws Exception {
client.cleanup();
}
@Test
public void testZKClient() {
// insert
Map<String, String> m = new HashMap<>();
String field1 = "field_1";
String value1 = "value_1";
m.put(field1, value1);
Map<String, ByteIterator> result = StringByteIterator.getByteIteratorMap(m);
client.insert(tableName, path, result);
// read
result.clear();
Status status = client.read(tableName, path, null, result);
assertEquals(Status.OK, status);
assertEquals(1, result.size());
assertEquals(value1, result.get(field1).toString());
// update(the same field)
m.clear();
result.clear();
String newVal = "value_new";
m.put(field1, newVal);
result = StringByteIterator.getByteIteratorMap(m);
client.update(tableName, path, result);
assertEquals(1, result.size());
// Verify result
result.clear();
status = client.read(tableName, path, null, result);
assertEquals(Status.OK, status);
// here we only have one field: field_1
assertEquals(1, result.size());
assertEquals(newVal, result.get(field1).toString());
// update(two different field)
m.clear();
result.clear();
String field2 = "field_2";
String value2 = "value_2";
m.put(field2, value2);
result = StringByteIterator.getByteIteratorMap(m);
client.update(tableName, path, result);
assertEquals(1, result.size());
// Verify result
result.clear();
status = client.read(tableName, path, null, result);
assertEquals(Status.OK, status);
// here we have two field: field_1 and field_2
assertEquals(2, result.size());
assertEquals(value2, result.get(field2).toString());
assertEquals(newVal, result.get(field1).toString());
// delete
status = client.delete(tableName, path);
assertEquals(Status.OK, status);
// Verify result
result.clear();
status = client.read(tableName, path, null, result);
// NoNode return ERROR
assertEquals(Status.ERROR, status);
assertEquals(0, result.size());
}
@Test
@Ignore("Not yet implemented")
public void testScan() {
fail("Not yet implemented");
}
}