added support to specify readPreference

This commit is contained in:
Matias Cascallares 2013-10-10 12:13:00 +08:00 коммит произвёл Robert J. Moore
Родитель c960961266
Коммит 0b68bd48fe
3 изменённых файлов: 79 добавлений и 6 удалений

Просмотреть файл

@ -78,6 +78,14 @@ See the next section for the list of configuration parameters for MongoDB.
- `journaled`
- `replica_acknowledged`
- `mongodb.readPreference` default `primary`
- options are :
- `primary`
- `primary_preferred`
- `secondary`
- `secondary_preferred`
- `nearest`
- `mongodb.maxconnections` (default `100`)
- `mongodb.threadsAllowedToBlockForConnectionMultiplier` (default `5`)

Просмотреть файл

@ -24,6 +24,7 @@ import com.allanbank.mongodb.MongoDatabase;
import com.allanbank.mongodb.MongoDbUri;
import com.allanbank.mongodb.MongoFactory;
import com.allanbank.mongodb.MongoIterator;
import com.allanbank.mongodb.ReadPreference;
import com.allanbank.mongodb.bson.Document;
import com.allanbank.mongodb.bson.Element;
import com.allanbank.mongodb.bson.ElementType;
@ -68,6 +69,9 @@ public class AsyncMongoDbClient extends DB {
/** The write concern for the requests. */
private static Durability writeConcern;
/** Which servers to use for reads. */
private static ReadPreference readPreference;
/** The database to MongoDB. */
private MongoDatabase db;
@ -173,6 +177,32 @@ public class AsyncMongoDbClient extends DB {
System.exit(1);
}
// readPreference
String readPreferenceType = props.getProperty(
"mongodb.readPreference", "primary").toLowerCase();
if ("primary".equals(readPreferenceType)) {
readPreference = ReadPreference.primary();
}
else if ("primary_preferred".equals(readPreferenceType)) {
readPreference = ReadPreference.preferPrimary();
}
else if ("secondary".equals(readPreferenceType)) {
readPreference = ReadPreference.secondary();
}
else if ("secondary_preferred".equals(readPreferenceType)) {
readPreference = ReadPreference.preferSecondary();
}
else if ("nearest".equals(readPreferenceType)) {
readPreference = ReadPreference.closest();
}
else {
System.err
.println("ERROR: Invalid readPreference: '"
+ readPreferenceType
+ "'. Must be [ primary | primary_preferred | secondary | secondary_preferred | nearest ]");
System.exit(1);
}
try {
// need to append db to url.
url += "/" + database;
@ -261,6 +291,7 @@ public class AsyncMongoDbClient extends DB {
fb.projection(fieldsToReturn);
fb.setLimit(1);
fb.setBatchSize(1);
fb.readPreference(readPreference);
final MongoIterator<Document> ci = collection.find(fb.build());
if (ci.hasNext()) {
@ -314,6 +345,7 @@ public class AsyncMongoDbClient extends DB {
fb.setQuery(where("_id").greaterThanOrEqualTo(startkey));
fb.setLimit(recordcount);
fb.setBatchSize(recordcount);
fb.readPreference(readPreference);
if (fields != null) {
final DocumentBuilder fieldsDoc = BuilderFactory.start();
for (final String field : fields) {

Просмотреть файл

@ -25,6 +25,7 @@ import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
@ -53,6 +54,9 @@ public class MongoDbClient extends DB {
/** The default write concern for the test. */
private static WriteConcern writeConcern;
/** The default read preference for the test */
private static ReadPreference readPreference;
/** The database to access. */
private static String database;
@ -88,10 +92,7 @@ public class MongoDbClient extends DB {
}
database = props.getProperty("mongodb.database", "ycsb");
String writeConcernType = props.getProperty("mongodb.writeConcern",
"acknowledged").toLowerCase();
// Set connectionpool to size of ycsb thread pool
final String maxConnections = props.getProperty(
"mongodb.maxconnections", "100");
final String threadsAllowedToBlockForConnectionMultiplier = props
@ -99,6 +100,9 @@ public class MongoDbClient extends DB {
"mongodb.threadsAllowedToBlockForConnectionMultiplier",
"5");
// write concern
String writeConcernType = props.getProperty("mongodb.writeConcern",
"acknowledged").toLowerCase();
if ("errors_ignored".equals(writeConcernType)) {
writeConcern = WriteConcern.ERRORS_IGNORED;
}
@ -123,12 +127,39 @@ public class MongoDbClient extends DB {
System.exit(1);
}
// readPreference
String readPreferenceType = props.getProperty(
"mongodb.readPreference", "primary").toLowerCase();
if ("primary".equals(readPreferenceType)) {
readPreference = ReadPreference.primary();
}
else if ("primary_preferred".equals(readPreferenceType)) {
readPreference = ReadPreference.primaryPreferred();
}
else if ("secondary".equals(readPreferenceType)) {
readPreference = ReadPreference.secondary();
}
else if ("secondary_preferred".equals(readPreferenceType)) {
readPreference = ReadPreference.secondaryPreferred();
}
else if ("nearest".equals(readPreferenceType)) {
readPreference = ReadPreference.nearest();
}
else {
System.err
.println("ERROR: Invalid readPreference: '"
+ readPreferenceType
+ "'. Must be [ primary | primary_preferred | secondary | secondary_preferred | nearest ]");
System.exit(1);
}
try {
// strip out prefix since Java driver doesn't currently support
// standard connection format URL yet
// http://www.mongodb.org/display/DOCS/Connections
if (url.startsWith("mongodb://")) {
url = url.substring(10);
}
// need to append db to url.
@ -282,10 +313,11 @@ public class MongoDbClient extends DB {
while (iter.hasNext()) {
fieldsToReturn.put(iter.next(), INCLUDE);
}
queryResult = collection.findOne(q, fieldsToReturn);
queryResult = collection.findOne(q, fieldsToReturn,
readPreference);
}
else {
queryResult = collection.findOne(q);
queryResult = collection.findOne(q, null, readPreference);
}
if (queryResult != null) {
@ -382,7 +414,8 @@ public class MongoDbClient extends DB {
// { "_id":{"$gte":startKey, "$lte":{"appId":key+"\uFFFF"}} }
DBObject scanRange = new BasicDBObject().append("$gte", startkey);
DBObject q = new BasicDBObject().append("_id", scanRange);
cursor = collection.find(q).limit(recordcount);
cursor = collection.find(q).setReadPreference(readPreference)
.limit(recordcount);
while (cursor.hasNext()) {
// toMap() returns a Map, but result.add() expects a
// Map<String,String>. Hence, the suppress warnings.