[core,mongodb,jdbc] Merge pull request #755 from enis/jdbc-batching

Add batching in insertion in JDBC client bindings
This commit is contained in:
Sean Busbey 2016-06-21 16:29:40 -05:00 коммит произвёл GitHub
Родитель 81ba93cc66 050fd4414f
Коммит 2b3203f301
7 изменённых файлов: 154 добавлений и 55 удалений

Просмотреть файл

@ -87,6 +87,6 @@ public class Status {
public static final Status BAD_REQUEST = new Status("BAD_REQUEST", "The request was not valid.");
public static final Status FORBIDDEN = new Status("FORBIDDEN", "The operation is forbidden.");
public static final Status SERVICE_UNAVAILABLE = new Status("SERVICE_UNAVAILABLE", "Dependant service for the current binding is not available.");
public static final Status BATCHED_OK = new Status("BATCHED_OK", "The operation has been batched by the binding to be executed later.");
}

Просмотреть файл

@ -98,6 +98,7 @@ db.driver=com.mysql.jdbc.Driver # The JDBC driver class to use.
db.url=jdbc:mysql://127.0.0.1:3306/ycsb # The Database connection URL.
db.user=admin # User name for the connection.
db.passwd=admin # Password for the connection.
db.batchsize=1000 # The batch size for doing batched inserts. Defaults to 0. Set to >0 to use batching.
jdbc.fetchsize=10 # The JDBC fetch size hinted to the driver.
jdbc.autocommit=true # The JDBC connection auto-commit property for the driver.
```

Просмотреть файл

@ -1,18 +1,18 @@
/**
* Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
@ -31,11 +31,11 @@ import java.util.concurrent.ConcurrentMap;
* A class that wraps a JDBC compliant database to allow it to be interfaced
* with YCSB. This class extends {@link DB} and implements the database
* interface used by YCSB client.
*
*
* <br>
* Each client will have its own instance of this class. This client is not
* thread safe.
*
*
* <br>
* This interface expects a schema <key> <field1> <field2> <field3> ... All
* attributes are of type VARCHAR. All accesses are through the primary key.
@ -55,6 +55,9 @@ public class JdbcDBClient extends DB {
/** The password to use for establishing the connection. */
public static final String CONNECTION_PASSWD = "db.passwd";
/** The batch size for batched inserts. Set to >0 to use batching */
public static final String DB_BATCH_SIZE = "db.batchsize";
/** The JDBC fetch size hinted to the driver. */
public static final String JDBC_FETCH_SIZE = "jdbc.fetchsize";
@ -79,9 +82,11 @@ public class JdbcDBClient extends DB {
private ArrayList<Connection> conns;
private boolean initialized = false;
private Properties props;
private Integer jdbcFetchSize;
private int jdbcFetchSize;
private int batchSize;
private static final String DEFAULT_PROP = "";
private ConcurrentMap<StatementType, PreparedStatement> cachedStatements;
private long numRowsInBatch = 0;
/**
* Ordered field information for insert and update statements.
@ -213,6 +218,20 @@ public class JdbcDBClient extends DB {
}
}
/** Returns parsed int value from the properties if set, otherwise returns -1. */
private static int getIntProperty(Properties props, String key) throws DBException {
String valueStr = props.getProperty(key);
if (valueStr != null) {
try {
return Integer.parseInt(valueStr);
} catch (NumberFormatException nfe) {
System.err.println("Invalid " + key + " specified: " + valueStr);
throw new DBException(nfe);
}
}
return -1;
}
@Override
public void init() throws DBException {
if (initialized) {
@ -225,15 +244,8 @@ public class JdbcDBClient extends DB {
String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
String driver = props.getProperty(DRIVER_CLASS);
String jdbcFetchSizeStr = props.getProperty(JDBC_FETCH_SIZE);
if (jdbcFetchSizeStr != null) {
try {
this.jdbcFetchSize = Integer.parseInt(jdbcFetchSizeStr);
} catch (NumberFormatException nfe) {
System.err.println("Invalid JDBC fetch size specified: " + jdbcFetchSizeStr);
throw new DBException(nfe);
}
}
this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE);
this.batchSize = getIntProperty(props, DB_BATCH_SIZE);
String autoCommitStr = props.getProperty(JDBC_AUTO_COMMIT, Boolean.TRUE.toString());
Boolean autoCommit = Boolean.parseBoolean(autoCommitStr);
@ -258,7 +270,7 @@ public class JdbcDBClient extends DB {
conns.add(conn);
}
System.out.println("Using " + shardCount + " shards");
System.out.println("Using shards: " + shardCount + ", batchSize:" + batchSize + ", fetchSize: " + jdbcFetchSize);
cachedStatements = new ConcurrentHashMap<StatementType, PreparedStatement>();
} catch (ClassNotFoundException e) {
@ -276,6 +288,20 @@ public class JdbcDBClient extends DB {
@Override
public void cleanup() throws DBException {
if (batchSize > 0) {
try {
// commit un-finished batches
for (PreparedStatement st : cachedStatements.values()) {
if (!st.getConnection().isClosed() && !st.isClosed() && (numRowsInBatch % batchSize != 0)) {
st.executeBatch();
}
}
} catch (SQLException e) {
System.err.println("Error in cleanup execution. " + e);
throw new DBException(e);
}
}
try {
cleanupAllConnections();
} catch (SQLException e) {
@ -363,7 +389,7 @@ public class JdbcDBClient extends DB {
select.append(PRIMARY_KEY);
select.append(" LIMIT ?");
PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString());
if (this.jdbcFetchSize != null) {
if (this.jdbcFetchSize > 0) {
scanStatement.setFetchSize(this.jdbcFetchSize);
}
PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement);
@ -474,7 +500,22 @@ public class JdbcDBClient extends DB {
for (String value: fieldInfo.getFieldValues()) {
insertStatement.setString(index++, value);
}
int result = insertStatement.executeUpdate();
int result;
if (batchSize > 0) {
insertStatement.addBatch();
if (++numRowsInBatch % batchSize == 0) {
int[] results = insertStatement.executeBatch();
for (int r : results) {
if (r != 1) {
return Status.ERROR;
}
}
return Status.OK;
}
return Status.BATCHED_OK;
} else {
result = insertStatement.executeUpdate();
}
if (result == 1) {
return Status.OK;
}
@ -520,4 +561,4 @@ public class JdbcDBClient extends DB {
return new OrderedFieldInfo(fieldKeys, fieldValues);
}
}
}

Просмотреть файл

@ -46,24 +46,29 @@ public class JdbcDBClientTest {
@BeforeClass
public static void setup() {
try {
jdbcConnection = DriverManager.getConnection(TEST_DB_URL);
jdbcDBClient = new JdbcDBClient();
setupWithBatch(1);
}
Properties p = new Properties();
p.setProperty(JdbcDBClient.CONNECTION_URL, TEST_DB_URL);
p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER);
p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER);
public static void setupWithBatch(int batchSize) {
try {
jdbcConnection = DriverManager.getConnection(TEST_DB_URL);
jdbcDBClient = new JdbcDBClient();
jdbcDBClient.setProperties(p);
jdbcDBClient.init();
} catch (SQLException e) {
e.printStackTrace();
fail("Could not create local Database");
} catch (DBException e) {
e.printStackTrace();
fail("Could not create JdbcDBClient instance");
}
Properties p = new Properties();
p.setProperty(JdbcDBClient.CONNECTION_URL, TEST_DB_URL);
p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER);
p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER);
p.setProperty(JdbcDBClient.DB_BATCH_SIZE, Integer.toString(batchSize));
jdbcDBClient.setProperties(p);
jdbcDBClient.init();
} catch (SQLException e) {
e.printStackTrace();
fail("Could not create local Database");
} catch (DBException e) {
e.printStackTrace();
fail("Could not create JdbcDBClient instance");
}
}
@AfterClass
@ -75,7 +80,7 @@ public class JdbcDBClientTest {
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (jdbcDBClient != null) {
jdbcDBClient.cleanup();
@ -319,4 +324,65 @@ public class JdbcDBClientTest {
testIndex++;
}
}
@Test
public void insertBatchTest() throws DBException {
insertBatchTest(20);
}
@Test
public void insertPartialBatchTest() throws DBException {
insertBatchTest(19);
}
public void insertBatchTest(int numRows) throws DBException {
teardown();
setupWithBatch(10);
try {
String insertKey = "user0";
HashMap<String, ByteIterator> insertMap = insertRow(insertKey);
ResultSet resultSet = jdbcConnection.prepareStatement(
String.format("SELECT * FROM %s", TABLE_NAME)
).executeQuery();
// Check we do not have a result Row (because batch is not full yet
assertFalse(resultSet.next());
// insert more rows, completing 1 batch (still results are partial).
for (int i = 1; i < numRows; i++) {
insertMap = insertRow("user" + i);
}
//
assertNumRows(10 * (numRows / 10));
// call cleanup, which should insert the partial batch
jdbcDBClient.cleanup();
// Check that we have all rows
assertNumRows(numRows);
} catch (SQLException e) {
e.printStackTrace();
fail("Failed insertBatchTest");
} finally {
teardown(); // for next tests
setup();
}
}
private void assertNumRows(long numRows) throws SQLException {
ResultSet resultSet = jdbcConnection.prepareStatement(
String.format("SELECT * FROM %s", TABLE_NAME)
).executeQuery();
for (int i = 0; i < numRows; i++) {
assertTrue("expecting " + numRows + " results, received only " + i, resultSet.next());
}
assertFalse("expecting " + numRows + " results, received more", resultSet.next());
resultSet.close();
}
}

Просмотреть файл

@ -286,7 +286,7 @@ public class AsyncMongoDbClient extends DB {
batchedWriteCount += 1;
if (batchedWriteCount < batchSize) {
return OptionsSupport.BATCHED_OK;
return Status.BATCHED_OK;
}
long count = collection.write(batchedWrite);

Просмотреть файл

@ -286,7 +286,7 @@ public class MongoDbClient extends DB {
}
bulkInserts.clear();
} else {
return OptionsSupport.BATCHED_OK;
return Status.BATCHED_OK;
}
}
return Status.OK;

Просмотреть файл

@ -18,8 +18,6 @@ package com.yahoo.ycsb.db;
import java.util.Properties;
import com.yahoo.ycsb.Status;
/**
* OptionsSupport provides methods for handling legacy options.
*
@ -27,13 +25,6 @@ import com.yahoo.ycsb.Status;
*/
public final class OptionsSupport {
/**
* Status used for operations that have not been send to the server and have
* only been batched by the client.
*/
public static final Status BATCHED_OK = new Status("BATCHED_OK",
"The operation has been batched by the binding.");
/** Value for an unavailable property. */
private static final String UNAVAILABLE = "n/a";