[hbase10] Still too many threads #691.

Fix broken synchronization meant to guard against over-creation
of cluster connection instances on startup when multiple threads.
This commit is contained in:
stack 2016-04-08 16:13:43 -07:00
Родитель f675c05616
Коммит 57e1ab5a0c
1 изменённых файлов: 35 добавлений и 21 удалений

Просмотреть файл

@ -51,6 +51,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* HBase 1.0 client for YCSB framework. * HBase 1.0 client for YCSB framework.
@ -64,13 +65,21 @@ import java.util.Vector;
public class HBaseClient10 extends com.yahoo.ycsb.DB { public class HBaseClient10 extends com.yahoo.ycsb.DB {
private Configuration config = HBaseConfiguration.create(); private Configuration config = HBaseConfiguration.create();
// Must be an object for synchronization and tracking running thread counts. private static AtomicInteger threadCount = new AtomicInteger(0);
private static Integer threadCount = 0;
private boolean debug = false; private boolean debug = false;
private String tableName = ""; private String tableName = "";
/**
* A Cluster Connection instance that is shared by all running ycsb threads.
* Needs to be initialized late so we pick up command-line configs if any.
* To ensure one instance only in a multi-threaded context, guard access
* with a 'lock' object.
* @See #CONNECTION_LOCK.
*/
private static Connection connection = null; private static Connection connection = null;
private static final Object CONNECTION_LOCK = new Object();
// Depending on the value of clientSideBuffering, either bufferedMutator // Depending on the value of clientSideBuffering, either bufferedMutator
// (clientSideBuffering) or currentTable (!clientSideBuffering) will be used. // (clientSideBuffering) or currentTable (!clientSideBuffering) will be used.
@ -133,9 +142,10 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
} }
try { try {
synchronized(threadCount) { threadCount.getAndIncrement();
++threadCount; synchronized (CONNECTION_LOCK) {
if (connection == null) { if (connection == null) {
// Initialize if not set up already.
connection = ConnectionFactory.createConnection(config); connection = ConnectionFactory.createConnection(config);
} }
} }
@ -166,7 +176,9 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
String table = com.yahoo.ycsb.workloads.CoreWorkload.table; String table = com.yahoo.ycsb.workloads.CoreWorkload.table;
try { try {
final TableName tName = TableName.valueOf(table); final TableName tName = TableName.valueOf(table);
connection.getTable(tName).getTableDescriptor(); synchronized (CONNECTION_LOCK) {
connection.getTable(tName).getTableDescriptor();
}
} catch (IOException e) { } catch (IOException e) {
throw new DBException(e); throw new DBException(e);
} }
@ -193,11 +205,14 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
long en = System.nanoTime(); long en = System.nanoTime();
final String type = clientSideBuffering ? "UPDATE" : "CLEANUP"; final String type = clientSideBuffering ? "UPDATE" : "CLEANUP";
measurements.measure(type, (int) ((en - st) / 1000)); measurements.measure(type, (int) ((en - st) / 1000));
synchronized(threadCount) { threadCount.decrementAndGet();
--threadCount; if (threadCount.get() <= 0) {
if (threadCount <= 0 && connection != null) { // Means we are done so ok to shut down the Connection.
connection.close(); synchronized (CONNECTION_LOCK) {
connection = null; if (connection != null) {
connection.close();
connection = null;
}
} }
} }
} catch (IOException e) { } catch (IOException e) {
@ -207,14 +222,13 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
public void getHTable(String table) throws IOException { public void getHTable(String table) throws IOException {
final TableName tName = TableName.valueOf(table); final TableName tName = TableName.valueOf(table);
this.currentTable = this.connection.getTable(tName); synchronized (CONNECTION_LOCK) {
// suggestions from this.currentTable = connection.getTable(tName);
// http://ryantwopointoh.blogspot.com/2009/01/ if (clientSideBuffering) {
// performance-of-hbase-importing.html final BufferedMutatorParams p = new BufferedMutatorParams(tName);
if (clientSideBuffering) { p.writeBufferSize(writeBufferSize);
final BufferedMutatorParams p = new BufferedMutatorParams(tName); this.bufferedMutator = connection.getBufferedMutator(p);
p.writeBufferSize(writeBufferSize); }
this.bufferedMutator = this.connection.getBufferedMutator(p);
} }
} }