зеркало из https://github.com/Azure/YCSB.git
Merge pull request #692 from saintstack/691
[hbase10] Still too many threads #691.
This commit is contained in:
Коммит
604c50dbda
|
@ -51,6 +51,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* HBase 1.0 client for YCSB framework.
|
||||
|
@ -63,14 +64,22 @@ import java.util.Vector;
|
|||
*/
|
||||
public class HBaseClient10 extends com.yahoo.ycsb.DB {
|
||||
private Configuration config = HBaseConfiguration.create();
|
||||
|
||||
// Must be an object for synchronization and tracking running thread counts.
|
||||
private static Integer threadCount = 0;
|
||||
|
||||
private static AtomicInteger threadCount = new AtomicInteger(0);
|
||||
|
||||
private boolean debug = false;
|
||||
|
||||
private String tableName = "";
|
||||
|
||||
/**
|
||||
* A Cluster Connection instance that is shared by all running ycsb threads.
|
||||
* Needs to be initialized late so we pick up command-line configs if any.
|
||||
* To ensure one instance only in a multi-threaded context, guard access
|
||||
* with a 'lock' object.
|
||||
* @See #CONNECTION_LOCK.
|
||||
*/
|
||||
private static Connection connection = null;
|
||||
private static final Object CONNECTION_LOCK = new Object();
|
||||
|
||||
// Depending on the value of clientSideBuffering, either bufferedMutator
|
||||
// (clientSideBuffering) or currentTable (!clientSideBuffering) will be used.
|
||||
|
@ -121,10 +130,10 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
|
|||
UserGroupInformation.setConfiguration(config);
|
||||
}
|
||||
|
||||
if ((getProperties().getProperty("principal")!=null)
|
||||
if ((getProperties().getProperty("principal")!=null)
|
||||
&& (getProperties().getProperty("keytab")!=null)) {
|
||||
try {
|
||||
UserGroupInformation.loginUserFromKeytab(getProperties().getProperty("principal"),
|
||||
UserGroupInformation.loginUserFromKeytab(getProperties().getProperty("principal"),
|
||||
getProperties().getProperty("keytab"));
|
||||
} catch (IOException e) {
|
||||
System.err.println("Keytab file is not readable or not found");
|
||||
|
@ -133,9 +142,10 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
|
|||
}
|
||||
|
||||
try {
|
||||
synchronized(threadCount) {
|
||||
++threadCount;
|
||||
threadCount.getAndIncrement();
|
||||
synchronized (CONNECTION_LOCK) {
|
||||
if (connection == null) {
|
||||
// Initialize if not set up already.
|
||||
connection = ConnectionFactory.createConnection(config);
|
||||
}
|
||||
}
|
||||
|
@ -166,7 +176,9 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
|
|||
String table = com.yahoo.ycsb.workloads.CoreWorkload.table;
|
||||
try {
|
||||
final TableName tName = TableName.valueOf(table);
|
||||
connection.getTable(tName).getTableDescriptor();
|
||||
synchronized (CONNECTION_LOCK) {
|
||||
connection.getTable(tName).getTableDescriptor();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new DBException(e);
|
||||
}
|
||||
|
@ -193,11 +205,14 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
|
|||
long en = System.nanoTime();
|
||||
final String type = clientSideBuffering ? "UPDATE" : "CLEANUP";
|
||||
measurements.measure(type, (int) ((en - st) / 1000));
|
||||
synchronized(threadCount) {
|
||||
--threadCount;
|
||||
if (threadCount <= 0 && connection != null) {
|
||||
connection.close();
|
||||
connection = null;
|
||||
threadCount.decrementAndGet();
|
||||
if (threadCount.get() <= 0) {
|
||||
// Means we are done so ok to shut down the Connection.
|
||||
synchronized (CONNECTION_LOCK) {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
connection = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -207,14 +222,13 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
|
|||
|
||||
public void getHTable(String table) throws IOException {
|
||||
final TableName tName = TableName.valueOf(table);
|
||||
this.currentTable = this.connection.getTable(tName);
|
||||
// suggestions from
|
||||
// http://ryantwopointoh.blogspot.com/2009/01/
|
||||
// performance-of-hbase-importing.html
|
||||
if (clientSideBuffering) {
|
||||
final BufferedMutatorParams p = new BufferedMutatorParams(tName);
|
||||
p.writeBufferSize(writeBufferSize);
|
||||
this.bufferedMutator = this.connection.getBufferedMutator(p);
|
||||
synchronized (CONNECTION_LOCK) {
|
||||
this.currentTable = connection.getTable(tName);
|
||||
if (clientSideBuffering) {
|
||||
final BufferedMutatorParams p = new BufferedMutatorParams(tName);
|
||||
p.writeBufferSize(writeBufferSize);
|
||||
this.bufferedMutator = connection.getBufferedMutator(p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче