зеркало из https://github.com/Azure/YCSB.git
gh-95 Update the MongoDB driver to use a singleton Mongo instance for
each test client. Add a control (via properties) for the number of connections the Mongo instance will create/use.
This commit is contained in:
Родитель
0d2bf50e9e
Коммит
92d86e74a8
|
@ -40,3 +40,5 @@ See the next section for the list of configuration parameters for MongoDB.
|
|||
### `mongodb.database` (default: `ycsb`)
|
||||
|
||||
### `mongodb.writeConcern` (default `safe`)
|
||||
|
||||
### `mongodb.maxconnections` (default `10`)
|
||||
|
|
|
@ -11,10 +11,11 @@ package com.yahoo.ycsb.db;
|
|||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.mongodb.BasicDBObject;
|
||||
import com.mongodb.DBAddress;
|
||||
|
@ -22,102 +23,133 @@ import com.mongodb.DBCollection;
|
|||
import com.mongodb.DBCursor;
|
||||
import com.mongodb.DBObject;
|
||||
import com.mongodb.Mongo;
|
||||
import com.mongodb.MongoOptions;
|
||||
import com.mongodb.WriteConcern;
|
||||
import com.mongodb.WriteResult;
|
||||
|
||||
import com.yahoo.ycsb.ByteArrayByteIterator;
|
||||
import com.yahoo.ycsb.ByteIterator;
|
||||
import com.yahoo.ycsb.DB;
|
||||
import com.yahoo.ycsb.DBException;
|
||||
import com.yahoo.ycsb.ByteIterator;
|
||||
import com.yahoo.ycsb.StringByteIterator;
|
||||
|
||||
/**
|
||||
* MongoDB client for YCSB framework.
|
||||
*
|
||||
*
|
||||
* Properties to set:
|
||||
*
|
||||
* mongodb.url=mongodb://localhost:27017
|
||||
* mongodb.database=ycsb
|
||||
*
|
||||
* mongodb.url=mongodb://localhost:27017 mongodb.database=ycsb
|
||||
* mongodb.writeConcern=normal
|
||||
*
|
||||
*
|
||||
* @author ypai
|
||||
*
|
||||
*/
|
||||
public class MongoDbClient extends DB {
|
||||
|
||||
private Mongo mongo;
|
||||
private WriteConcern writeConcern;
|
||||
private String database;
|
||||
/** Used to include a field in a response. */
|
||||
protected static final Integer INCLUDE = Integer.valueOf(1);
|
||||
|
||||
/** A singleton Mongo instance. */
|
||||
private static Mongo mongo;
|
||||
|
||||
/** The default write concern for the test. */
|
||||
private static WriteConcern writeConcern;
|
||||
|
||||
/** The database to access. */
|
||||
private static String database;
|
||||
|
||||
/** Count the number of times initialized to teardown on the last {@link #cleanup()}. */
|
||||
private static final AtomicInteger initCount = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Initialize any state for this DB.
|
||||
* Called once per DB instance; there is one DB instance per client thread.
|
||||
*/
|
||||
@Override
|
||||
public void init() throws DBException {
|
||||
// initialize MongoDb driver
|
||||
Properties props = getProperties();
|
||||
String url = props.getProperty("mongodb.url", "mongodb://localhost:27017");
|
||||
database = props.getProperty("mongodb.database", "ycsb");
|
||||
String writeConcernType = props.getProperty("mongodb.writeConcern", "safe").toLowerCase();
|
||||
|
||||
if ("none".equals(writeConcernType)) {
|
||||
writeConcern = WriteConcern.NONE;
|
||||
} else if ("safe".equals(writeConcernType)) {
|
||||
writeConcern = WriteConcern.SAFE;
|
||||
} else if ("normal".equals(writeConcernType)) {
|
||||
writeConcern = WriteConcern.NORMAL;
|
||||
} else if ("fsync_safe".equals(writeConcernType)) {
|
||||
writeConcern = WriteConcern.FSYNC_SAFE;
|
||||
} else if ("replicas_safe".equals(writeConcernType)) {
|
||||
writeConcern = WriteConcern.REPLICAS_SAFE;
|
||||
} else {
|
||||
System.err.println("ERROR: Invalid writeConcern: '" + writeConcernType + "'. " +
|
||||
"Must be [ none | safe | normal | fsync_safe | replicas_safe ]");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
try {
|
||||
// strip out prefix since Java driver doesn't currently support
|
||||
// standard connection format URL yet
|
||||
// http://www.mongodb.org/display/DOCS/Connections
|
||||
if (url.startsWith("mongodb://")) {
|
||||
url = url.substring(10);
|
||||
initCount.incrementAndGet();
|
||||
synchronized (INCLUDE) {
|
||||
if (mongo != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// need to append db to url.
|
||||
url += "/"+database;
|
||||
System.out.println("new database url = "+url);
|
||||
mongo = new Mongo(new DBAddress(url));
|
||||
System.out.println("mongo connection created with "+url);
|
||||
} catch (Exception e1) {
|
||||
System.err.println(
|
||||
"Could not initialize MongoDB connection pool for Loader: "
|
||||
+ e1.toString());
|
||||
e1.printStackTrace();
|
||||
return;
|
||||
}
|
||||
// initialize MongoDb driver
|
||||
Properties props = getProperties();
|
||||
String url = props.getProperty("mongodb.url",
|
||||
"mongodb://localhost:27017");
|
||||
database = props.getProperty("mongodb.database", "ycsb");
|
||||
String writeConcernType = props.getProperty("mongodb.writeConcern",
|
||||
"safe").toLowerCase();
|
||||
final String maxConnections = props.getProperty(
|
||||
"mongodb.maxconnections", "10");
|
||||
|
||||
if ("none".equals(writeConcernType)) {
|
||||
writeConcern = WriteConcern.NONE;
|
||||
}
|
||||
else if ("safe".equals(writeConcernType)) {
|
||||
writeConcern = WriteConcern.SAFE;
|
||||
}
|
||||
else if ("normal".equals(writeConcernType)) {
|
||||
writeConcern = WriteConcern.NORMAL;
|
||||
}
|
||||
else if ("fsync_safe".equals(writeConcernType)) {
|
||||
writeConcern = WriteConcern.FSYNC_SAFE;
|
||||
}
|
||||
else if ("replicas_safe".equals(writeConcernType)) {
|
||||
writeConcern = WriteConcern.REPLICAS_SAFE;
|
||||
}
|
||||
else {
|
||||
System.err
|
||||
.println("ERROR: Invalid writeConcern: '"
|
||||
+ writeConcernType
|
||||
+ "'. "
|
||||
+ "Must be [ none | safe | normal | fsync_safe | replicas_safe ]");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
try {
|
||||
// strip out prefix since Java driver doesn't currently support
|
||||
// standard connection format URL yet
|
||||
// http://www.mongodb.org/display/DOCS/Connections
|
||||
if (url.startsWith("mongodb://")) {
|
||||
url = url.substring(10);
|
||||
}
|
||||
|
||||
// need to append db to url.
|
||||
url += "/" + database;
|
||||
System.out.println("new database url = " + url);
|
||||
MongoOptions options = new MongoOptions();
|
||||
options.connectionsPerHost = Integer.parseInt(maxConnections);
|
||||
mongo = new Mongo(new DBAddress(url), options);
|
||||
|
||||
System.out.println("mongo connection created with " + url);
|
||||
}
|
||||
catch (Exception e1) {
|
||||
System.err
|
||||
.println("Could not initialize MongoDB connection pool for Loader: "
|
||||
+ e1.toString());
|
||||
e1.printStackTrace();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Cleanup any state for this DB.
|
||||
* Called once per DB instance; there is one DB instance per client thread.
|
||||
*/
|
||||
public void cleanup() throws DBException
|
||||
{
|
||||
try {
|
||||
mongo.close();
|
||||
} catch (Exception e1) {
|
||||
System.err.println(
|
||||
"Could not close MongoDB connection pool: "
|
||||
+ e1.toString());
|
||||
e1.printStackTrace();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup any state for this DB.
|
||||
* Called once per DB instance; there is one DB instance per client thread.
|
||||
*/
|
||||
@Override
|
||||
public void cleanup() throws DBException {
|
||||
if (initCount.decrementAndGet() <= 0) {
|
||||
try {
|
||||
mongo.close();
|
||||
}
|
||||
catch (Exception e1) {
|
||||
System.err.println("Could not close MongoDB connection pool: "
|
||||
+ e1.toString());
|
||||
e1.printStackTrace();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a record from the database.
|
||||
*
|
||||
|
@ -125,8 +157,9 @@ public class MongoDbClient extends DB {
|
|||
* @param key The record key of the record to delete.
|
||||
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
|
||||
*/
|
||||
@Override
|
||||
public int delete(String table, String key) {
|
||||
com.mongodb.DB db=null;
|
||||
com.mongodb.DB db = null;
|
||||
try {
|
||||
db = mongo.getDB(database);
|
||||
db.requestStart();
|
||||
|
@ -134,20 +167,18 @@ public class MongoDbClient extends DB {
|
|||
DBObject q = new BasicDBObject().append("_id", key);
|
||||
WriteResult res = collection.remove(q, writeConcern);
|
||||
return res.getN() == 1 ? 0 : 1;
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
System.err.println(e.toString());
|
||||
return 1;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (db!=null)
|
||||
{
|
||||
finally {
|
||||
if (db != null) {
|
||||
db.requestDone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified
|
||||
* record key.
|
||||
|
@ -157,7 +188,9 @@ public class MongoDbClient extends DB {
|
|||
* @param values A HashMap of field/value pairs to insert in the record
|
||||
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
|
||||
*/
|
||||
public int insert(String table, String key, HashMap<String, ByteIterator> values) {
|
||||
@Override
|
||||
public int insert(String table, String key,
|
||||
HashMap<String, ByteIterator> values) {
|
||||
com.mongodb.DB db = null;
|
||||
try {
|
||||
db = mongo.getDB(database);
|
||||
|
@ -166,24 +199,23 @@ public class MongoDbClient extends DB {
|
|||
|
||||
DBCollection collection = db.getCollection(table);
|
||||
DBObject r = new BasicDBObject().append("_id", key);
|
||||
for(String k: values.keySet()) {
|
||||
r.put(k, values.get(k).toArray());
|
||||
}
|
||||
WriteResult res = collection.insert(r,writeConcern);
|
||||
for (String k : values.keySet()) {
|
||||
r.put(k, values.get(k).toArray());
|
||||
}
|
||||
WriteResult res = collection.insert(r, writeConcern);
|
||||
return res.getError() == null ? 0 : 1;
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
return 1;
|
||||
} finally {
|
||||
if (db!=null)
|
||||
{
|
||||
}
|
||||
finally {
|
||||
if (db != null) {
|
||||
db.requestDone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
/**
|
||||
* Read a record from the database. Each field/value pair from the result will be stored in a HashMap.
|
||||
*
|
||||
|
@ -193,6 +225,8 @@ public class MongoDbClient extends DB {
|
|||
* @param result A HashMap of field/value pairs for the result
|
||||
* @return Zero on success, a non-zero error code on error or "not found".
|
||||
*/
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public int read(String table, String key, Set<String> fields,
|
||||
HashMap<String, ByteIterator> result) {
|
||||
com.mongodb.DB db = null;
|
||||
|
@ -204,16 +238,16 @@ public class MongoDbClient extends DB {
|
|||
DBCollection collection = db.getCollection(table);
|
||||
DBObject q = new BasicDBObject().append("_id", key);
|
||||
DBObject fieldsToReturn = new BasicDBObject();
|
||||
boolean returnAllFields = fields == null;
|
||||
|
||||
DBObject queryResult = null;
|
||||
if (!returnAllFields) {
|
||||
if (fields != null) {
|
||||
Iterator<String> iter = fields.iterator();
|
||||
while (iter.hasNext()) {
|
||||
fieldsToReturn.put(iter.next(), 1);
|
||||
fieldsToReturn.put(iter.next(), INCLUDE);
|
||||
}
|
||||
queryResult = collection.findOne(q, fieldsToReturn);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
queryResult = collection.findOne(q);
|
||||
}
|
||||
|
||||
|
@ -221,19 +255,18 @@ public class MongoDbClient extends DB {
|
|||
result.putAll(queryResult.toMap());
|
||||
}
|
||||
return queryResult != null ? 0 : 1;
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
System.err.println(e.toString());
|
||||
return 1;
|
||||
} finally {
|
||||
if (db!=null)
|
||||
{
|
||||
}
|
||||
finally {
|
||||
if (db != null) {
|
||||
db.requestDone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified
|
||||
* record key, overwriting any existing values with the same field name.
|
||||
|
@ -243,7 +276,9 @@ public class MongoDbClient extends DB {
|
|||
* @param values A HashMap of field/value pairs to update in the record
|
||||
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
|
||||
*/
|
||||
public int update(String table, String key, HashMap<String, ByteIterator> values) {
|
||||
@Override
|
||||
public int update(String table, String key,
|
||||
HashMap<String, ByteIterator> values) {
|
||||
com.mongodb.DB db = null;
|
||||
try {
|
||||
db = mongo.getDB(database);
|
||||
|
@ -264,19 +299,18 @@ public class MongoDbClient extends DB {
|
|||
WriteResult res = collection.update(q, u, false, false,
|
||||
writeConcern);
|
||||
return res.getN() == 1 ? 0 : 1;
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
System.err.println(e.toString());
|
||||
return 1;
|
||||
} finally {
|
||||
if (db!=null)
|
||||
{
|
||||
}
|
||||
finally {
|
||||
if (db != null) {
|
||||
db.requestDone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
/**
|
||||
* Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in a HashMap.
|
||||
*
|
||||
|
@ -287,9 +321,10 @@ public class MongoDbClient extends DB {
|
|||
* @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
|
||||
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
|
||||
*/
|
||||
@Override
|
||||
public int scan(String table, String startkey, int recordcount,
|
||||
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
|
||||
com.mongodb.DB db=null;
|
||||
com.mongodb.DB db = null;
|
||||
try {
|
||||
db = mongo.getDB(database);
|
||||
db.requestStart();
|
||||
|
@ -299,23 +334,44 @@ public class MongoDbClient extends DB {
|
|||
DBObject q = new BasicDBObject().append("_id", scanRange);
|
||||
DBCursor cursor = collection.find(q).limit(recordcount);
|
||||
while (cursor.hasNext()) {
|
||||
//toMap() returns a Map, but result.add() expects a Map<String,String>. Hence, the suppress warnings.
|
||||
result.add(StringByteIterator.getByteIteratorMap((Map<String,String>)cursor.next().toMap()));
|
||||
// toMap() returns a Map, but result.add() expects a
|
||||
// Map<String,String>. Hence, the suppress warnings.
|
||||
HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>();
|
||||
|
||||
DBObject obj = cursor.next();
|
||||
fillMap(resultMap, obj);
|
||||
|
||||
result.add(resultMap);
|
||||
}
|
||||
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
System.err.println(e.toString());
|
||||
return 1;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (db!=null)
|
||||
{
|
||||
finally {
|
||||
if (db != null) {
|
||||
db.requestDone();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO - Finish
|
||||
*
|
||||
* @param resultMap
|
||||
* @param obj
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void fillMap(HashMap<String, ByteIterator> resultMap, DBObject obj) {
|
||||
Map<String, Object> objMap = obj.toMap();
|
||||
for (Map.Entry<String, Object> entry : objMap.entrySet()) {
|
||||
if (entry.getValue() instanceof byte[]) {
|
||||
resultMap.put(entry.getKey(), new ByteArrayByteIterator(
|
||||
(byte[]) entry.getValue()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче