Addressed review comments; cleaned up code

Mohammad Derakhshani 2017-10-17 15:51:58 -07:00
Parent d22e4aefc6
Commit 208fcd3d9c
7 changed files with 156 additions and 139 deletions

View file

@@ -134,7 +134,6 @@ class BatchInserter {
public Iterator<Callable<InsertMetrics>> miniBatchInsertExecutionCallableIterator() {
// TODO: FIXME: handle the scenario where a non-retriable error should make us break out of the stream loop
Stream<Callable<InsertMetrics>> stream = batchesToInsert.stream().map(miniBatch -> {
return new Callable<InsertMetrics>() {
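The hunk above maps each mini batch to a Callable that performs the insert for that batch. Below is a minimal sketch of that batches-to-tasks shape, using a plain Integer result as a stand-in for the library's InsertMetrics (hypothetical class and data, not the library's implementation):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

// Sketch only: turn each mini batch into a Callable task, mirroring the
// Stream-of-Callables returned by miniBatchInsertExecutionCallableIterator.
public class MiniBatchTaskSketch {
    public static void main(String[] args) throws Exception {
        List<List<String>> batchesToInsert = Arrays.asList(
                Arrays.asList("{\"id\":\"1\"}", "{\"id\":\"2\"}"),
                Arrays.asList("{\"id\":\"3\"}"));

        // Each task reports how many documents it would insert
        // (a stand-in for the library's InsertMetrics).
        List<Callable<Integer>> tasks = batchesToInsert.stream()
                .map(miniBatch -> (Callable<Integer>) miniBatch::size)
                .collect(Collectors.toList());

        for (Callable<Integer> task : tasks) {
            System.out.println("documents in batch: " + task.call());
        }
    }
}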

View file

@@ -26,6 +26,8 @@ import java.util.Collections;
import java.util.Iterator;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -34,7 +36,8 @@ import com.microsoft.azure.documentdb.Undefined;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
class DocumentAnalyzer {
private final static ObjectMapper objectMapper = new ObjectMapper();
private final static Logger LOGGER = LoggerFactory.getLogger(DocumentAnalyzer.class);
/**
* Extracts effective {@link PartitionKeyInternal} from serialized document.
* @param documentAsString Serialized document to extract partition key value from.
@@ -52,43 +55,53 @@
}
private static PartitionKeyInternal extractPartitionKeyValueInternal(String documentAsString, PartitionKeyDefinition partitionKeyDefinition) {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode root;
try {
root = objectMapper.readTree(documentAsString);
}catch (Exception e) {
throw new RuntimeException(e);
Iterator<String> path = partitionKeyDefinition.getPaths().iterator();
JsonNode node = root.path(path.next().substring(1));
while(path.hasNext()) {
node = node.path(path.next());
}
Object partitionKeyValue = null;
switch (node.getNodeType()) {
case BOOLEAN:
partitionKeyValue = node.booleanValue();
break;
case MISSING:
partitionKeyValue = Undefined.Value();
break;
case NULL:
partitionKeyValue = JSONObject.NULL;
break;
case NUMBER:
partitionKeyValue = node.numberValue();
break;
case STRING:
partitionKeyValue = node.textValue();
break;
default:
throw new RuntimeException(String.format("undefined json type %s", node.getNodeType()));
}
return fromPartitionKeyvalue(partitionKeyValue);
} catch (Exception e) {
LOGGER.error("Failed to extract partition key value from document {}", documentAsString, e);
throw ExceptionUtils.toRuntimeException(e);
}
}
Iterator<String> path = partitionKeyDefinition.getPaths().iterator();
JsonNode node = root.path(path.next().substring(1));
while(path.hasNext()) {
node = node.path(path.next());
public static PartitionKeyInternal fromPartitionKeyvalue(Object partitionKeyValue) {
try {
return PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), true);
} catch (Exception e) {
LOGGER.error("Failed to instantiate ParitionKeyInternal from {}", partitionKeyValue, e);
throw ExceptionUtils.toRuntimeException(e);
}
Object partitionKeyValue = null;
switch (node.getNodeType()) {
case BOOLEAN:
partitionKeyValue = node.booleanValue();
break;
case MISSING:
partitionKeyValue = Undefined.Value();
break;
case NULL:
partitionKeyValue = JSONObject.NULL;
break;
case NUMBER:
partitionKeyValue = node.numberValue();
break;
case STRING:
partitionKeyValue = node.textValue();
break;
default:
throw new RuntimeException(String.format("undefined json type %s", node.getNodeType()));
}
return PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), true);
}
}
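The refactor above pulls the path traversal and node-type switch inside one try block, logs the failing document, and rethrows through ExceptionUtils.toRuntimeException instead of blindly wrapping in RuntimeException. Below is a minimal standalone sketch of the same Jackson traversal, assuming a partition key path of the form "/a/b" and returning a plain Object rather than the SDK-internal PartitionKeyInternal:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: resolve a partition key path such as "/nested/pk" against a
// serialized document, mirroring the traversal in extractPartitionKeyValueInternal.
public class PartitionKeyPathSketch {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static Object resolve(String documentAsString, String partitionKeyPath) {
        try {
            JsonNode node = OBJECT_MAPPER.readTree(documentAsString);
            // Drop the leading '/' and walk each path segment.
            for (String segment : partitionKeyPath.substring(1).split("/")) {
                node = node.path(segment);
            }
            switch (node.getNodeType()) {
                case BOOLEAN: return node.booleanValue();
                case NUMBER:  return node.numberValue();
                case STRING:  return node.textValue();
                case NULL:    return null; // the library maps this to JSONObject.NULL
                case MISSING: return null; // the library maps this to Undefined.Value()
                default:
                    throw new IllegalArgumentException("unsupported JSON node type " + node.getNodeType());
            }
        } catch (Exception e) {
            // Same idea as ExceptionUtils.toRuntimeException: rethrow unchecked, wrap checked.
            throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("{\"nested\":{\"pk\":\"value-1\"}}", "/nested/pk")); // prints value-1
    }
}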

View file

@@ -45,6 +45,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.AsyncCallable;
@@ -62,7 +63,6 @@ import com.microsoft.azure.documentdb.Offer;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.PartitionKeyRange;
import com.microsoft.azure.documentdb.internal.HttpConstants;
import com.microsoft.azure.documentdb.internal.query.funcs.Func2;
import com.microsoft.azure.documentdb.internal.routing.CollectionRoutingMap;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache;
@@ -248,7 +248,14 @@ public class DocumentBulkImporter implements AutoCloseable {
* Executes a bulk import in the Azure Cosmos DB database service.
*
* <code>
* DocumentClient client = new DocumentClient(HOST, MASTER_KEY, null, null);
* ConnectionPolicy connectionPolicy = new ConnectionPolicy();
* RetryOptions retryOptions = new RetryOptions();
* // set to 0 to let the bulk importer handle throttling
* retryOptions.setMaxRetryAttemptsOnThrottledRequests(0);
* connectionPolicy.setRetryOptions(retryOptions);
* connectionPolicy.setMaxPoolSize(200);
*
* DocumentClient client = new DocumentClient(HOST, MASTER_KEY, connectionPolicy, null);
*
* String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "mycol");
* DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
@@ -264,6 +271,7 @@ public class DocumentBulkImporter implements AutoCloseable {
* // bulkImportResponse.getNumberOfDocumentsImported() == documents.size()
* }
*
* importer.close();
* client.close();
* </code>
* @param documents to insert
@@ -272,28 +280,24 @@ public class DocumentBulkImporter implements AutoCloseable {
* @throws DocumentClientException if any failure happens
*/
public BulkImportResponse importAll(Collection<String> documents, boolean isUpsert) throws DocumentClientException {
Func2<Collection<String>, ConcurrentHashMap<String, Set<String>>, Void> bucketingFunction = new Func2<Collection<String>, ConcurrentHashMap<String,Set<String>>, Void>() {
@Override
public Void apply(Collection<String> documents, ConcurrentHashMap<String, Set<String>> partitionKeyToBucket) throws Exception {
documents.parallelStream().forEach(document -> {
PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.extractPartitionKeyValue(document, partitionKeyDefinition);
String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
partitionKeyToBucket.get(partitionRangeId).add(document);
});
return null;
}
};
return executeBulkImportInternal(documents, bucketingFunction, isUpsert);
return executeBulkImportInternal(documents,
document -> document,
document -> DocumentAnalyzer.extractPartitionKeyValue(document, partitionKeyDefinition),
isUpsert);
}
/**
* Executes a bulk import in the Azure Cosmos DB database service.
*
* <code>
* DocumentClient client = new DocumentClient(HOST, MASTER_KEY, null, null);
* ConnectionPolicy connectionPolicy = new ConnectionPolicy();
* RetryOptions retryOptions = new RetryOptions();
* // set to 0 to let the bulk importer handle throttling
* retryOptions.setMaxRetryAttemptsOnThrottledRequests(0);
* connectionPolicy.setRetryOptions(retryOptions);
* connectionPolicy.setMaxPoolSize(200);
*
* DocumentClient client = new DocumentClient(HOST, MASTER_KEY, connectionPolicy, null);
*
* String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "mycol");
* DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
@@ -301,7 +305,7 @@ public class DocumentBulkImporter implements AutoCloseable {
* DocumentBulkImporter importer = new DocumentBulkImporter(client, collection);
*
* for(int i = 0; i < 10; i++) {
* List<Tuple> tuples = documentSource.getMoreDocumentsPartitionKeyValueTuples();
* List<DocumentPKValuePair> tuples = documentSource.getMoreDocumentsPartitionKeyValueTuples();
*
* BulkImportResponse bulkImportResponse = importer.importAllWithPartitionKey(tuples, false);
*
@@ -309,7 +313,9 @@ public class DocumentBulkImporter implements AutoCloseable {
* // bulkImportResponse.getNumberOfDocumentsImported() == documents.size()
* }
*
* importer.close();
* client.close();
*
* </code>
* @param documentPartitionKeyValueTuples list of {@link DocumentPKValuePair}
* @param isUpsert whether to enable upsert (overwrite the document if it already exists)
@@ -318,31 +324,20 @@
*/
public BulkImportResponse importAllWithPartitionKey(Collection<DocumentPKValuePair> documentPartitionKeyValueTuples, boolean isUpsert) throws DocumentClientException {
Func2<Collection<DocumentPKValuePair>, ConcurrentHashMap<String, Set<String>>, Void> bucketingFunction =
new Func2<Collection<DocumentPKValuePair>, ConcurrentHashMap<String,Set<String>>, Void>() {
@Override
public Void apply(Collection<DocumentPKValuePair> input, ConcurrentHashMap<String, Set<String>> partitionKeyToBucket) throws Exception {
input.parallelStream().forEach(tuple -> {
PartitionKeyInternal partitionKeyValue = PartitionKeyInternal.fromObjectArray(Collections.singletonList(tuple.partitionKeyValue), true);
String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
partitionKeyToBucket.get(partitionRangeId).add(tuple.document);
});
return null;
};
};
return executeBulkImportInternal(documentPartitionKeyValueTuples, bucketingFunction, isUpsert);
return executeBulkImportInternal(documentPartitionKeyValueTuples,
tuple -> tuple.document,
tuple -> DocumentAnalyzer.fromPartitionKeyvalue(tuple.partitionKeyValue),
isUpsert);
}
private <T> BulkImportResponse executeBulkImportInternal(Collection<T> input,
Func2<Collection<T>, ConcurrentHashMap<String, Set<String>>, Void> bucketByPartitionFunc,
Function<T, String> getDocument,
Function<T, PartitionKeyInternal> getPartitionKey,
boolean isUpsert) throws DocumentClientException {
Preconditions.checkNotNull(input, "document collection cannot be null");
try {
initializationFuture.get();
return executeBulkImportAsyncImpl(input, bucketByPartitionFunc, isUpsert).get();
return executeBulkImportAsyncImpl(input, getDocument, getPartitionKey, isUpsert).get();
} catch (ExecutionException e) {
logger.debug("Failed to import documents", e);
@@ -367,7 +362,8 @@ public class DocumentBulkImporter implements AutoCloseable {
}
private <T> ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Collection<T> input,
Func2<Collection<T>, ConcurrentHashMap<String, Set<String>>, Void> bucketByPartitionFunc,
Function<T, String> getDocument,
Function<T, PartitionKeyInternal> getPartitionKey,
boolean isUpsert) throws Exception {
Stopwatch watch = Stopwatch.createStarted();
@@ -385,7 +381,12 @@
logger.debug("Bucketing documents ...");
bucketByPartitionFunc.apply(input, documentsToImportByPartition);
input.parallelStream().forEach(item -> {
PartitionKeyInternal partitionKeyValue = getPartitionKey.apply(item);
String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
documentsToImportByPartition.get(partitionRangeId).add(getDocument.apply(item));
});
logger.trace("Creating mini batches within each partition bucket");
int maxMiniBatchSize = (int)Math.floor(MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE * FRACTION_OF_MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE_ALLOWED);
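The Func2-based bucketing callbacks are gone: importAll and importAllWithPartitionKey now pass two extractor functions (getDocument, getPartitionKey) and executeBulkImportAsyncImpl does the grouping itself. Below is a minimal sketch of that shared grouping step, with a hypothetical range lookup standing in for the PartitionKeyInternal/routing-map resolution the real code performs:

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch only: group items into per-partition buckets using two extractors,
// the shape executeBulkImportAsyncImpl now uses. Unlike the real code, which
// pre-populates one bucket per known partition key range, this sketch creates
// buckets lazily with computeIfAbsent.
public class PartitionBucketingSketch {

    static <T> Map<String, Set<String>> bucketByPartition(Collection<T> input,
                                                          Function<T, String> getDocument,
                                                          Function<T, String> getPartitionRangeId) {
        ConcurrentHashMap<String, Set<String>> buckets = new ConcurrentHashMap<>();
        input.parallelStream().forEach(item -> {
            String rangeId = getPartitionRangeId.apply(item);
            buckets.computeIfAbsent(rangeId, id -> ConcurrentHashMap.newKeySet())
                   .add(getDocument.apply(item));
        });
        return buckets;
    }

    public static void main(String[] args) {
        Collection<String> documents = Arrays.asList(
                "{\"id\":\"1\",\"pk\":\"a\"}", "{\"id\":\"2\",\"pk\":\"b\"}");

        // Hypothetical range lookup: hash each document into one of two ranges.
        Map<String, Set<String>> buckets = bucketByPartition(documents,
                doc -> doc,
                doc -> "range-" + Math.abs(doc.hashCode() % 2));

        System.out.println(buckets);
    }
}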

View file

@@ -27,7 +27,6 @@ public class DocumentPKValuePair {
public final Object partitionKeyValue;
public DocumentPKValuePair(String documentAsString, Object partitionKeyValue) {
//Preconditions.checkNotNull(documentAsString);
this.document = documentAsString;
this.partitionKeyValue = partitionKeyValue;
}
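DocumentPKValuePair pairs a serialized document with the partition key value the caller has already computed, so importAllWithPartitionKey can skip parsing each document. A short usage sketch with hypothetical documents and a hypothetical "city" partition key (imports omitted; the package of the library types is not shown in this hunk):

// Usage sketch (hypothetical data): pair each serialized document with the
// partition key value that was already computed upstream.
List<DocumentPKValuePair> tuples = new ArrayList<>();
tuples.add(new DocumentPKValuePair("{\"id\":\"1\",\"city\":\"Seattle\"}", "Seattle"));
tuples.add(new DocumentPKValuePair("{\"id\":\"2\",\"city\":\"Redmond\"}", "Redmond"));

// Hands the precomputed values to the importer; no document parsing needed.
// BulkImportResponse response = bulkImporter.importAllWithPartitionKey(tuples, false);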

View file

@@ -58,4 +58,12 @@
}
return dce;
}
public static RuntimeException toRuntimeException(Exception e) {
if (e instanceof RuntimeException) {
return (RuntimeException) e;
} else {
return new RuntimeException(e);
}
}
}
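The new toRuntimeException helper rethrows unchecked exceptions untouched and wraps checked ones exactly once; the DocumentAnalyzer catch blocks above rely on it. Below is a minimal standalone sketch of the same pattern and how a caller would use it:

import java.io.IOException;

// Standalone sketch of the toRuntimeException pattern added in this hunk:
// unchecked exceptions pass through as-is, checked ones are wrapped once.
public class ToRuntimeExceptionSketch {

    static RuntimeException toRuntimeException(Exception e) {
        return (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
    }

    public static void main(String[] args) {
        try {
            throw new IOException("simulated checked failure");
        } catch (Exception e) {
            RuntimeException wrapped = toRuntimeException(e);
            // Prints the original IOException as the cause of the wrapper.
            System.out.println("cause: " + wrapped.getCause());
        }
    }
}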

View file

@@ -49,83 +49,80 @@ public class Main {
CmdLineConfiguration cfg = parseCommandLineArgs(args);
DocumentClient client = documentClientFrom(cfg);
try(DocumentClient client = documentClientFrom(cfg)) {
String collectionLink = String.format("/dbs/%s/colls/%s", cfg.getDatabaseId(), cfg.getCollectionId());
// this assumes the database and collection already exist
// also it is a good idea to set your connection pool size to be equal to the number of partitions serving your collection.
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
String collectionLink = String.format("/dbs/%s/colls/%s", cfg.getDatabaseId(), cfg.getCollectionId());
// this assumes the database and collection already exist
// also it is a good idea to set your connection pool size to be equal to the number of partitions serving your collection.
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
// instantiates bulk importer
DocumentBulkImporter bulkImporter = new DocumentBulkImporter(client, collection);
// instantiates bulk importer
try(DocumentBulkImporter bulkImporter = new DocumentBulkImporter(client, collection)) {
Stopwatch totalWatch = Stopwatch.createUnstarted();
Stopwatch totalWatch = Stopwatch.createUnstarted();
double totalRequestCharge = 0;
long totalTimeInMillis = 0;
long totalNumberOfDocumentsImported = 0;
double totalRequestCharge = 0;
long totalTimeInMillis = 0;
long totalNumberOfDocumentsImported = 0;
for(int i = 0 ; i < cfg.getNumberOfCheckpoints(); i++) {
for(int i = 0 ; i < cfg.getNumberOfCheckpoints(); i++) {
BulkImportResponse bulkImportResponse;
if (cfg.isWithPreprocessedPartitionKeyValue()) {
Collection<DocumentPKValuePair> documentPartitionKeyValueTuples = DataMigrationDocumentSource.loadDocumentPartitionKeyValueTuples(cfg.getNumberOfDocumentsForEachCheckpoint(), collection.getPartitionKey());
BulkImportResponse bulkImportResponse;
if (cfg.isWithPreprocessedPartitionKeyValue()) {
Collection<DocumentPKValuePair> documentPartitionKeyValueTuples = DataMigrationDocumentSource.loadDocumentPartitionKeyValueTuples(cfg.getNumberOfDocumentsForEachCheckpoint(), collection.getPartitionKey());
// NOTE: only sum the bulk import time,
// loading/generating documents is out of the scope of bulk importer and so has to be excluded
totalWatch.start();
bulkImportResponse = bulkImporter.importAllWithPartitionKey(documentPartitionKeyValueTuples, false);
totalWatch.stop();
// NOTE: only sum the bulk import time,
// loading/generating documents is out of the scope of bulk importer and so has to be excluded
totalWatch.start();
bulkImportResponse = bulkImporter.importAllWithPartitionKey(documentPartitionKeyValueTuples, false);
totalWatch.stop();
} else {
Collection<String> documents = DataMigrationDocumentSource.loadDocuments(cfg.getNumberOfDocumentsForEachCheckpoint(), collection.getPartitionKey());
} else {
Collection<String> documents = DataMigrationDocumentSource.loadDocuments(cfg.getNumberOfDocumentsForEachCheckpoint(), collection.getPartitionKey());
// NOTE: only sum the bulk import time,
// loading/generating documents is out of the scope of bulk importer and so has to be excluded
totalWatch.start();
bulkImportResponse = bulkImporter.importAll(documents, false);
totalWatch.stop();
// NOTE: only sum the bulk import time,
// loading/generating documents is out of the scope of bulk importer and so has to be excluded
totalWatch.start();
bulkImportResponse = bulkImporter.importAll(documents, false);
totalWatch.stop();
}
}
System.out.println("##########################################################################################");
System.out.println("##########################################################################################");
totalNumberOfDocumentsImported += bulkImportResponse.getNumberOfDocumentsImported();
totalTimeInMillis += bulkImportResponse.getTotalTimeTaken().toMillis();
totalRequestCharge += bulkImportResponse.getTotalRequestUnitsConsumed();
totalNumberOfDocumentsImported += bulkImportResponse.getNumberOfDocumentsImported();
totalTimeInMillis += bulkImportResponse.getTotalTimeTaken().toMillis();
totalRequestCharge += bulkImportResponse.getTotalRequestUnitsConsumed();
// check the number of imported documents to ensure everything is successfully imported
// bulkImportResponse.getNumberOfDocumentsImported() == documents.size()
if (bulkImportResponse.getNumberOfDocumentsImported() != cfg.getNumberOfDocumentsForEachCheckpoint()) {
System.err.println("Some documents failed to get inserted in this checkpoint");
}
// check the number of imported documents to ensure everything is successfully imported
// bulkImportResponse.getNumberOfDocumentsImported() == documents.size()
if (bulkImportResponse.getNumberOfDocumentsImported() != cfg.getNumberOfDocumentsForEachCheckpoint()) {
System.err.println("Some documents failed to get inserted in this checkpoint");
}
// print stats
System.out.println("Number of documents inserted in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported());
System.out.println("Import time for this checkpoint in milli seconds " + bulkImportResponse.getTotalTimeTaken().toMillis());
System.out.println("Total request unit consumed in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed());
// print stats
System.out.println("Number of documents inserted in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported());
System.out.println("Import time for this checkpoint in milli seconds " + bulkImportResponse.getTotalTimeTaken().toMillis());
System.out.println("Total request unit consumed in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed());
System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
System.out.println("Average #Inserts/second in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
System.out.println("##########################################################################################");
}
System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
System.out.println("Average #Inserts/second in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
System.out.println("##########################################################################################");
}
// print average stats
System.out.println("##########################################################################################");
// print average stats
System.out.println("##########################################################################################");
System.out.println("Total import time in milli seconds measured by stopWatch: " + totalWatch.elapsed().toMillis());
System.out.println("Total import time in milli seconds measured by api : " + totalTimeInMillis);
System.out.println("Total Number of documents inserted " + totalNumberOfDocumentsImported);
System.out.println("Total request unit consumed: " + totalRequestCharge);
System.out.println("Average RUs/second:" + totalRequestCharge / (totalWatch.elapsed().toMillis() * 0.001));
System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalWatch.elapsed().toMillis() * 0.001));
System.out.println("Total import time in milli seconds measured by stopWatch: " + totalWatch.elapsed().toMillis());
System.out.println("Total import time in milli seconds measured by api : " + totalTimeInMillis);
System.out.println("Total Number of documents inserted " + totalNumberOfDocumentsImported);
System.out.println("Total request unit consumed: " + totalRequestCharge);
System.out.println("Average RUs/second:" + totalRequestCharge / (totalWatch.elapsed().toMillis() * 0.001));
System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalWatch.elapsed().toMillis() * 0.001));
// close bulk importer to release any existing resources
bulkImporter.close();
// close document client
client.close();
}
} // close bulk importer
} // closes client
}
static class DataMigrationDocumentSource {
@@ -225,7 +222,7 @@ public class Main {
policy.setRetryOptions(retryOptions);
policy.setConnectionMode(cfg.getConnectionMode());
policy.setMaxPoolSize(cfg.getMaxConnectionPoolSize());
return new DocumentClient(cfg.getServiceEndpoint(), cfg.getMasterKey(),
policy, cfg.getConsistencyLevel());
}

View file

@@ -40,14 +40,14 @@ public class Sample {
public static void main(String[] args) throws DocumentClientException, InterruptedException, ExecutionException {
ConnectionPolicy policy = new ConnectionPolicy();
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
RetryOptions retryOptions = new RetryOptions();
// set to 0 to let the bulk importer handle throttling
retryOptions.setMaxRetryAttemptsOnThrottledRequests(0);
policy.setRetryOptions(retryOptions);
policy.setMaxPoolSize(200);
connectionPolicy.setRetryOptions(retryOptions);
connectionPolicy.setMaxPoolSize(200);
try(DocumentClient client = new DocumentClient(HOST, MASTER_KEY, policy, ConsistencyLevel.Session)) {
try(DocumentClient client = new DocumentClient(HOST, MASTER_KEY, connectionPolicy, ConsistencyLevel.Session)) {
String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "mycol");
// this assumes the database and collection already exist