[jdbc] Separate use of JDBC batch update APIs from autoCommit

Tried to simplify the distinction between using JDBC's addBatch()/executeBatch()
API calls and "batching" updates via autoCommit=false and a manual commit() after
a given number of updates.

Breaks out flavors into their own package to reduce bloat in JdbcDBClient.

Encompasses changes from Enis Soztutar.
This commit is contained in:
Josh Elser 2016-06-08 17:51:01 -07:00 коммит произвёл Josh Elser
Родитель 74b5080086
Коммит 5dc57b77e0
8 изменённых файлов: 427 добавлений и 252 удалений

Просмотреть файл

@ -101,6 +101,8 @@ db.passwd=admin # Password for the connection.
db.batchsize=1000 # The batch size for doing batched inserts. Defaults to 0. Set to >0 to use batching.
jdbc.fetchsize=10 # The JDBC fetch size hinted to the driver.
jdbc.autocommit=true # The JDBC connection auto-commit property for the driver.
jdbc.batchupdateapi=false # Use addBatch()/executeBatch() JDBC methods instead of executeUpdate() for writes (default: false)
db.batchsize=1000 # The number of rows to be batched before commit (or executeBatch() when jdbc.batchupdateapi=true)
```
Please refer to https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties for all other YCSB core properties.

Просмотреть файл

@ -26,6 +26,7 @@ import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.yahoo.ycsb.db.flavors.DBFlavor;
/**
* A class that wraps a JDBC compliant database to allow it to be interfaced
@ -64,6 +65,8 @@ public class JdbcDBClient extends DB {
/** The JDBC connection auto-commit property for the driver. */
public static final String JDBC_AUTO_COMMIT = "jdbc.autocommit";
public static final String JDBC_BATCH_UPDATES = "jdbc.batchupdateapi";
/** The name of the property for the number of fields in a record. */
public static final String FIELD_COUNT_PROPERTY = "fieldcount";
@ -84,6 +87,8 @@ public class JdbcDBClient extends DB {
private Properties props;
private int jdbcFetchSize;
private int batchSize;
private boolean autoCommit;
private boolean batchUpdates;
private static final String DEFAULT_PROP = "";
private ConcurrentMap<StatementType, PreparedStatement> cachedStatements;
private long numRowsInBatch = 0;
@ -112,136 +117,6 @@ public class JdbcDBClient extends DB {
}
}
/**
* DBFlavor captures minor differences in syntax and behavior among JDBC implementations and SQL
* dialects.
*/
private abstract static class DBFlavor {
enum DBName {
DEFAULT,
PHOENIX
}
private final DBName dbName;
public DBFlavor(DBName dbName) {
this.dbName = dbName;
}
public static DBFlavor fromJdbcUrl(String url) {
if (url.startsWith("jdbc:phoenix")) {
return new PhoenixDBFlavor();
}
return new DefaultDBFlavor();
}
/**
* Create and return a SQL statement for inserting data.
*/
abstract String createInsertStatement(StatementType insertType, String key);
/**
* Create and return a SQL statement for reading data.
*/
abstract String createReadStatement(StatementType readType, String key);
/**
* Create and return a SQL statement for deleting data.
*/
abstract String createDeleteStatement(StatementType deleteType, String key);
/**
* Create and return a SQL statement for updating data.
*/
abstract String createUpdateStatement(StatementType updateType, String key);
/**
* Create and return a SQL statement for scanning data.
*/
abstract String createScanStatement(StatementType scanType, String key);
}
/**
* The statement type for the prepared statements.
*/
private static class StatementType {
enum Type {
INSERT(1), DELETE(2), READ(3), UPDATE(4), SCAN(5);
private final int internalType;
private Type(int type) {
internalType = type;
}
int getHashCode() {
final int prime = 31;
int result = 1;
result = prime * result + internalType;
return result;
}
}
private Type type;
private int shardIndex;
private int numFields;
private String tableName;
private String fieldString;
StatementType(Type type, String tableName, int numFields, String fieldString, int shardIndex) {
this.type = type;
this.tableName = tableName;
this.numFields = numFields;
this.fieldString = fieldString;
this.shardIndex = shardIndex;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + numFields + 100 * shardIndex;
result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
result = prime * result + ((type == null) ? 0 : type.getHashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
StatementType other = (StatementType) obj;
if (numFields != other.numFields) {
return false;
}
if (shardIndex != other.shardIndex) {
return false;
}
if (tableName == null) {
if (other.tableName != null) {
return false;
}
} else if (!tableName.equals(other.tableName)) {
return false;
}
if (type != other.type) {
return false;
}
if (!fieldString.equals(other.fieldString)) {
return false;
}
return true;
}
}
/**
* For the given key, returns what shard contains data for this key.
*
@ -266,6 +141,9 @@ public class JdbcDBClient extends DB {
private void cleanupAllConnections() throws SQLException {
for (Connection conn : conns) {
if (!autoCommit) {
conn.commit();
}
conn.close();
}
}
@ -308,7 +186,8 @@ public class JdbcDBClient extends DB {
this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE);
this.batchSize = getIntProperty(props, DB_BATCH_SIZE);
boolean autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true);
this.autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true);
this.batchUpdates = getBoolProperty(props, JDBC_BATCH_UPDATES, false);
try {
if (driver != null) {
@ -316,7 +195,7 @@ public class JdbcDBClient extends DB {
}
int shardCount = 0;
conns = new ArrayList<Connection>(3);
String[] urlArr = urls.split(",");
final String[] urlArr = urls.split(",");
for (String url : urlArr) {
System.out.println("Adding shard node URL: " + url);
Connection conn = DriverManager.getConnection(url, user, passwd);
@ -432,104 +311,6 @@ public class JdbcDBClient extends DB {
return stmt;
}
private static class DefaultDBFlavor extends DBFlavor {
public DefaultDBFlavor() {
super(DBName.DEFAULT);
}
public DefaultDBFlavor(DBName dbName) {
super(dbName);
}
@Override
String createInsertStatement(StatementType insertType, String key) {
StringBuilder insert = new StringBuilder("INSERT INTO ");
insert.append(insertType.tableName);
insert.append(" (" + PRIMARY_KEY + "," + insertType.fieldString + ")");
insert.append(" VALUES(?");
for (int i = 0; i < insertType.numFields; i++) {
insert.append(",?");
}
insert.append(")");
return insert.toString();
}
@Override
String createReadStatement(StatementType readType, String key) {
StringBuilder read = new StringBuilder("SELECT * FROM ");
read.append(readType.tableName);
read.append(" WHERE ");
read.append(PRIMARY_KEY);
read.append(" = ");
read.append("?");
return read.toString();
}
@Override
String createDeleteStatement(StatementType deleteType, String key) {
StringBuilder delete = new StringBuilder("DELETE FROM ");
delete.append(deleteType.tableName);
delete.append(" WHERE ");
delete.append(PRIMARY_KEY);
delete.append(" = ?");
return delete.toString();
}
@Override
String createUpdateStatement(StatementType updateType, String key) {
String[] fieldKeys = updateType.fieldString.split(",");
StringBuilder update = new StringBuilder("UPDATE ");
update.append(updateType.tableName);
update.append(" SET ");
for (int i = 0; i < fieldKeys.length; i++) {
update.append(fieldKeys[i]);
update.append("=?");
if (i < fieldKeys.length - 1) {
update.append(", ");
}
}
update.append(" WHERE ");
update.append(PRIMARY_KEY);
update.append(" = ?");
return update.toString();
}
@Override
String createScanStatement(StatementType scanType, String key) {
StringBuilder select = new StringBuilder("SELECT * FROM ");
select.append(scanType.tableName);
select.append(" WHERE ");
select.append(PRIMARY_KEY);
select.append(" >= ?");
select.append(" ORDER BY ");
select.append(PRIMARY_KEY);
select.append(" LIMIT ?");
return select.toString();
}
}
/**
* Database flavor for Apache Phoenix. Captures syntax differences used by Phoenix.
*/
private static class PhoenixDBFlavor extends DefaultDBFlavor {
public PhoenixDBFlavor() {
super(DBName.PHOENIX);
}
@Override
String createInsertStatement(StatementType insertType, String key) {
// Phoenix uses UPSERT syntax
StringBuilder insert = new StringBuilder("UPSERT INTO ");
insert.append(insertType.tableName);
insert.append(" (" + PRIMARY_KEY + "," + insertType.fieldString + ")");
insert.append(" VALUES(?");
for (int i = 0; i < insertType.numFields; i++) {
insert.append(",?");
}
insert.append(")");
return insert.toString();
}
}
@Override
public Status read(String tableName, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
try {
@ -631,24 +412,49 @@ public class JdbcDBClient extends DB {
for (String value: fieldInfo.getFieldValues()) {
insertStatement.setString(index++, value);
}
int result;
if (batchSize > 0) {
// Using the batch insert API
if (batchUpdates) {
insertStatement.addBatch();
if (++numRowsInBatch % batchSize == 0) {
int[] results = insertStatement.executeBatch();
for (int r : results) {
if (r != 1) {
return Status.ERROR;
// Check for a sane batch size
if (batchSize > 0) {
// Commit the batch after it grows beyond the configured size
if (++numRowsInBatch % batchSize == 0) {
int[] results = insertStatement.executeBatch();
for (int r : results) {
if (r != 1) {
return Status.ERROR;
}
}
}
return Status.OK;
}
// If autoCommit is off, make sure we commit the batch
if (!autoCommit) {
getShardConnectionByKey(key).commit();
}
return Status.OK;
} // else, the default value of -1 or a nonsense. Treat it as an infinitely large batch.
} // else, we let the batch accumulate
// Added element to the batch, potentially committing the batch too.
return Status.BATCHED_OK;
} else {
result = insertStatement.executeUpdate();
}
if (result == 1) {
return Status.OK;
// Normal update
int result = insertStatement.executeUpdate();
// If we are not autoCommit, we might have to commit now
if (!autoCommit) {
// Let updates be batcher locally
if (batchSize > 0) {
if (++numRowsInBatch % batchSize == 0) {
// Send the batch of updates
getShardConnectionByKey(key).commit();
}
// uhh
return Status.OK;
} else {
// Commit each update
getShardConnectionByKey(key).commit();
}
}
if (result == 1) {
return Status.OK;
}
}
return Status.UNEXPECTED_STATE;
} catch (SQLException e) {
@ -679,7 +485,7 @@ public class JdbcDBClient extends DB {
private OrderedFieldInfo getFieldInfo(HashMap<String, ByteIterator> values) {
String fieldKeys = "";
List<String> fieldValues = new ArrayList();
List<String> fieldValues = new ArrayList<>();
int count = 0;
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
fieldKeys += entry.getKey();

Просмотреть файл

@ -0,0 +1,110 @@
/**
* Copyright (c) 2010 Yahoo! Inc., 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
/**
* The statement type for the prepared statements.
*/
public class StatementType {
enum Type {
INSERT(1), DELETE(2), READ(3), UPDATE(4), SCAN(5);
private final int internalType;
private Type(int type) {
internalType = type;
}
int getHashCode() {
final int prime = 31;
int result = 1;
result = prime * result + internalType;
return result;
}
}
private Type type;
private int shardIndex;
private int numFields;
private String tableName;
private String fieldString;
public StatementType(Type type, String tableName, int numFields, String fieldString, int shardIndex) {
this.type = type;
this.tableName = tableName;
this.numFields = numFields;
this.fieldString = fieldString;
this.shardIndex = shardIndex;
}
public String getTableName() {
return tableName;
}
public String getFieldString() {
return fieldString;
}
public int getNumFields() {
return numFields;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + numFields + 100 * shardIndex;
result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
result = prime * result + ((type == null) ? 0 : type.getHashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
StatementType other = (StatementType) obj;
if (numFields != other.numFields) {
return false;
}
if (shardIndex != other.shardIndex) {
return false;
}
if (tableName == null) {
if (other.tableName != null) {
return false;
}
} else if (!tableName.equals(other.tableName)) {
return false;
}
if (type != other.type) {
return false;
}
if (!fieldString.equals(other.fieldString)) {
return false;
}
return true;
}
}

Просмотреть файл

@ -0,0 +1,69 @@
/**
* Copyright (c) 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.flavors;
import com.yahoo.ycsb.db.StatementType;
/**
* DBFlavor captures minor differences in syntax and behavior among JDBC implementations and SQL
* dialects. This class also acts as a factory to instantiate concrete flavors based on the JDBC URL.
*/
public abstract class DBFlavor {
enum DBName {
DEFAULT,
PHOENIX
}
private final DBName dbName;
public DBFlavor(DBName dbName) {
this.dbName = dbName;
}
public static DBFlavor fromJdbcUrl(String url) {
if (url.startsWith("jdbc:phoenix")) {
return new PhoenixDBFlavor();
}
return new DefaultDBFlavor();
}
/**
* Create and return a SQL statement for inserting data.
*/
public abstract String createInsertStatement(StatementType insertType, String key);
/**
* Create and return a SQL statement for reading data.
*/
public abstract String createReadStatement(StatementType readType, String key);
/**
* Create and return a SQL statement for deleting data.
*/
public abstract String createDeleteStatement(StatementType deleteType, String key);
/**
* Create and return a SQL statement for updating data.
*/
public abstract String createUpdateStatement(StatementType updateType, String key);
/**
* Create and return a SQL statement for scanning data.
*/
public abstract String createScanStatement(StatementType scanType, String key);
}

Просмотреть файл

@ -0,0 +1,98 @@
/**
* Copyright (c) 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.flavors;
import com.yahoo.ycsb.db.JdbcDBClient;
import com.yahoo.ycsb.db.StatementType;
/**
* A default flavor for relational databases.
*/
public class DefaultDBFlavor extends DBFlavor {
public DefaultDBFlavor() {
super(DBName.DEFAULT);
}
public DefaultDBFlavor(DBName dbName) {
super(dbName);
}
@Override
public String createInsertStatement(StatementType insertType, String key) {
StringBuilder insert = new StringBuilder("INSERT INTO ");
insert.append(insertType.getTableName());
insert.append(" (" + JdbcDBClient.PRIMARY_KEY + "," + insertType.getFieldString() + ")");
insert.append(" VALUES(?");
for (int i = 0; i < insertType.getNumFields(); i++) {
insert.append(",?");
}
insert.append(")");
return insert.toString();
}
@Override
public String createReadStatement(StatementType readType, String key) {
StringBuilder read = new StringBuilder("SELECT * FROM ");
read.append(readType.getTableName());
read.append(" WHERE ");
read.append(JdbcDBClient.PRIMARY_KEY);
read.append(" = ");
read.append("?");
return read.toString();
}
@Override
public String createDeleteStatement(StatementType deleteType, String key) {
StringBuilder delete = new StringBuilder("DELETE FROM ");
delete.append(deleteType.getTableName());
delete.append(" WHERE ");
delete.append(JdbcDBClient.PRIMARY_KEY);
delete.append(" = ?");
return delete.toString();
}
@Override
public String createUpdateStatement(StatementType updateType, String key) {
String[] fieldKeys = updateType.getFieldString().split(",");
StringBuilder update = new StringBuilder("UPDATE ");
update.append(updateType.getTableName());
update.append(" SET ");
for (int i = 0; i < fieldKeys.length; i++) {
update.append(fieldKeys[i]);
update.append("=?");
if (i < fieldKeys.length - 1) {
update.append(", ");
}
}
update.append(" WHERE ");
update.append(JdbcDBClient.PRIMARY_KEY);
update.append(" = ?");
return update.toString();
}
@Override
public String createScanStatement(StatementType scanType, String key) {
StringBuilder select = new StringBuilder("SELECT * FROM ");
select.append(scanType.getTableName());
select.append(" WHERE ");
select.append(JdbcDBClient.PRIMARY_KEY);
select.append(" >= ?");
select.append(" ORDER BY ");
select.append(JdbcDBClient.PRIMARY_KEY);
select.append(" LIMIT ?");
return select.toString();
}
}

Просмотреть файл

@ -0,0 +1,65 @@
/**
* Copyright (c) 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.flavors;
import com.yahoo.ycsb.db.JdbcDBClient;
import com.yahoo.ycsb.db.StatementType;
/**
* Database flavor for Apache Phoenix. Captures syntax differences used by Phoenix.
*/
public class PhoenixDBFlavor extends DefaultDBFlavor {
public PhoenixDBFlavor() {
super(DBName.PHOENIX);
}
@Override
public String createInsertStatement(StatementType insertType, String key) {
// Phoenix uses UPSERT syntax
StringBuilder insert = new StringBuilder("UPSERT INTO ");
insert.append(insertType.getTableName());
insert.append(" (" + JdbcDBClient.PRIMARY_KEY + "," + insertType.getFieldString() + ")");
insert.append(" VALUES(?");
for (int i = 0; i < insertType.getNumFields(); i++) {
insert.append(",?");
}
insert.append(")");
return insert.toString();
}
@Override
public String createUpdateStatement(StatementType updateType, String key) {
// Phoenix doesn't have UPDATE semantics, just re-use UPSERT VALUES on the specific columns
String[] fieldKeys = updateType.getFieldString().split(",");
StringBuilder update = new StringBuilder("UPSERT INTO ");
update.append(updateType.getTableName());
update.append(" (");
// Each column to update
for (int i = 0; i < fieldKeys.length; i++) {
update.append(fieldKeys[i]).append(",");
}
// And then set the primary key column
update.append(JdbcDBClient.PRIMARY_KEY).append(") VALUES(");
// Add an unbound param for each column to update
for (int i = 0; i < fieldKeys.length; i++) {
update.append("?, ");
}
// Then the primary key column's value
update.append("?)");
return update.toString();
}
}

Просмотреть файл

@ -0,0 +1,22 @@
/**
* Copyright (c) 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* This package contains a collection of database-specific overrides. This accounts for the variance
* that can be present where JDBC does not explicitly define what a database must do or when a
* database has a non-standard SQL implementation.
*/
package com.yahoo.ycsb.db.flavors;

Просмотреть файл

@ -1,5 +1,5 @@
/**
* Copyright (c) 2015 - 2016 Yahoo! Inc. All rights reserved.
* Copyright (c) 2015 - 2016 Yahoo! Inc., 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
@ -46,10 +46,10 @@ public class JdbcDBClientTest {
@BeforeClass
public static void setup() {
setupWithBatch(1);
setupWithBatch(1, true);
}
public static void setupWithBatch(int batchSize) {
public static void setupWithBatch(int batchSize, boolean autoCommit) {
try {
jdbcConnection = DriverManager.getConnection(TEST_DB_URL);
jdbcDBClient = new JdbcDBClient();
@ -59,6 +59,8 @@ public class JdbcDBClientTest {
p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER);
p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER);
p.setProperty(JdbcDBClient.DB_BATCH_SIZE, Integer.toString(batchSize));
p.setProperty(JdbcDBClient.JDBC_BATCH_UPDATES, "true");
p.setProperty(JdbcDBClient.JDBC_AUTO_COMMIT, Boolean.toString(autoCommit));
jdbcDBClient.setProperties(p);
jdbcDBClient.init();
@ -337,19 +339,18 @@ public class JdbcDBClientTest {
public void insertBatchTest(int numRows) throws DBException {
teardown();
setupWithBatch(10);
setupWithBatch(10, false);
try {
String insertKey = "user0";
HashMap<String, ByteIterator> insertMap = insertRow(insertKey);
assertEquals(3, insertMap.size());
ResultSet resultSet = jdbcConnection.prepareStatement(
String.format("SELECT * FROM %s", TABLE_NAME)
).executeQuery();
// Check we do not have a result Row (because batch is not full yet
// Check we do not have a result Row (because batch is not full yet)
assertFalse(resultSet.next());
// insert more rows, completing 1 batch (still results are partial).
for (int i = 1; i < numRows; i++) {
insertMap = insertRow("user" + i);
@ -360,6 +361,8 @@ public class JdbcDBClientTest {
// call cleanup, which should insert the partial batch
jdbcDBClient.cleanup();
// Prevent a teardown() from printing an error
jdbcDBClient = null;
// Check that we have all rows
assertNumRows(numRows);