Mohammad Derakhshani 2017-10-19 12:47:28 -07:00
Parent 6931007cbb
Commit 860c41a444
4 changed files with 114 additions and 99 deletions

View file

@ -75,7 +75,7 @@ class BatchInserter {
*/
private final String bulkImportSprocLink;
/**
* Request options specifying the underlying partition key range id
*/
private final RequestOptions requestOptions;

View file

@ -149,7 +149,7 @@ class CongestionController {
this.throttleSemaphore = new Semaphore(this.degreeOfConcurrency);
this.aggregatedInsertMetrics = new InsertMetrics();
this.executor = executor;
this.partitionThroughput = partitionThroughput;
}
private void addFailure(Exception e) {
@ -164,7 +164,7 @@ class CongestionController {
synchronized (aggregateLock) {
InsertMetrics old = this.aggregatedInsertMetrics;
this.aggregatedInsertMetrics = metrics;
return old;
}
}
@ -176,7 +176,7 @@ class CongestionController {
while (isRunning()) {
try {
logger.debug("pki {} goes to sleep for {} seconds. availabel semaphore permits {}, current degree of parallelism {}",
logger.debug("pki {} goes to sleep for {} seconds. availabel semaphore permits {}, current degree of parallelism {}",
partitionKeyRangeId, samplePeriod.getSeconds(), throttleSemaphore.availablePermits(), degreeOfConcurrency);
Thread.sleep(samplePeriod.toMillis());
logger.debug("pki {} wakes up", partitionKeyRangeId);
@ -184,7 +184,7 @@ class CongestionController {
InsertMetrics insertMetricsSample = atomicGetAndReplace(new InsertMetrics());
if (insertMetricsSample.numberOfThrottles > 0) {
logger.debug("pki {} importing encountered {} throttling. current degree of parallelism {}, decreasing amount: {}",
logger.debug("pki {} importing encountered {} throttling. current degree of parallelism {}, decreasing amount: {}",
partitionKeyRangeId, insertMetricsSample.numberOfThrottles, degreeOfConcurrency, degreeOfConcurrency / DIVISIVE_DECREASE_FACTOR);
// We got a throttle so we need to back off on the degree of concurrency.
@ -317,6 +317,7 @@ class CongestionController {
logger.error("pki {} encountered failure {} releasing semaphore", partitionKeyRangeId, t);
// if a batch inserter encounters failure which cannot be retried then we have to stop.
setState(State.Failure);
addFailure(ExceptionUtils.toException(t));
throttleSemaphore.release();
}
};
@ -338,7 +339,6 @@ class CongestionController {
@Override
public void onFailure(Throwable t) {
logger.error("pki {} importing failed", partitionKeyRangeId, t);
setState(State.Failure);
}
};
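
The controller above is essentially an AIMD loop: a background thread sleeps for a sample period, swaps out the accumulated InsertMetrics, and on any throttles shrinks the degree of concurrency by degreeOfConcurrency / DIVISIVE_DECREASE_FACTOR, with a Semaphore gating how many batch inserts run at once. A minimal standalone sketch of that pattern follows, using only the JDK; the additive-increase branch and the constant values here are illustrative assumptions, not the SDK's exact tuning.

import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the sample-and-adjust loop; the additive increase and the
// throttle bookkeeping here are illustrative assumptions, not the SDK's exact logic.
class ConcurrencyTuner {
    private static final int DIVISIVE_DECREASE_FACTOR = 2; // back off sharply when throttled
    private static final int ADDITIVE_INCREASE = 1;        // grow slowly when healthy (assumed)

    private final Semaphore throttleSemaphore;
    private final AtomicInteger throttlesThisPeriod = new AtomicInteger();
    private int degreeOfConcurrency;

    ConcurrencyTuner(int initialDegreeOfConcurrency) {
        this.degreeOfConcurrency = initialDegreeOfConcurrency;
        this.throttleSemaphore = new Semaphore(initialDegreeOfConcurrency);
    }

    // Workers acquire a permit before issuing a mini batch and report throttles afterwards.
    void beforeBatch() throws InterruptedException { throttleSemaphore.acquire(); }
    void afterBatch(boolean wasThrottled) {
        if (wasThrottled) throttlesThisPeriod.incrementAndGet();
        throttleSemaphore.release();
    }

    // The periodic sleep/sample/adjust loop, mirroring the structure of the hunks above.
    void run(Duration samplePeriod) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            Thread.sleep(samplePeriod.toMillis());
            int throttles = throttlesThisPeriod.getAndSet(0);
            if (throttles > 0 && degreeOfConcurrency > 1) {
                // multiplicative decrease: reclaim permits so fewer batches run concurrently
                int decrease = Math.max(1, degreeOfConcurrency / DIVISIVE_DECREASE_FACTOR);
                throttleSemaphore.acquire(decrease); // blocks until that much in-flight work drains
                degreeOfConcurrency -= decrease;
            } else if (throttles == 0) {
                // additive increase: hand out one more permit
                throttleSemaphore.release(ADDITIVE_INCREASE);
                degreeOfConcurrency += ADDITIVE_INCREASE;
            }
        }
    }
}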

View file

@ -91,7 +91,7 @@ public class DocumentBulkImporter implements AutoCloseable {
/**
* Use the given size to configure the max mini batch size.
*
* If not specified, the default is used.
* @param size
* @return {@link Builder}
@ -103,7 +103,7 @@ public class DocumentBulkImporter implements AutoCloseable {
/**
* Instantiates {@link DocumentBulkImporter} given the configured {@link Builder}.
*
* @return the new {@link DocumentBulkImporter} instance
*/
public DocumentBulkImporter build() {
@ -249,7 +249,7 @@ public class DocumentBulkImporter implements AutoCloseable {
@Override
public void close() {
// disable submission of new tasks
listeningExecutorService.shutdown();
try {
// wait for existing tasks to terminate
if (!listeningExecutorService.awaitTermination(60, TimeUnit.SECONDS)) {
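
close() above follows the JDK's standard two-phase ExecutorService shutdown idiom: shutdown(), a bounded awaitTermination(), then a forced shutdownNow() for stragglers. A self-contained sketch of the full pattern with the same 60-second wait; the shutdownNow fallback is the usual completion of the idiom and an assumption about what the elided remainder of close() does.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Two-phase shutdown: stop accepting new tasks, wait, then force-cancel stragglers.
final class ExecutorShutdown {
    static void shutdownGracefully(ExecutorService executor) {
        executor.shutdown(); // disable submission of new tasks
        try {
            // wait for already-submitted tasks to terminate
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // cancel currently executing tasks
                // wait once more for tasks to respond to cancellation
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("executor did not terminate");
                }
            }
        } catch (InterruptedException e) {
            // (re-)cancel if the current thread was interrupted while waiting
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        executor.submit(() -> System.out.println("work"));
        shutdownGracefully(executor);
    }
}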
@ -310,27 +310,31 @@ public class DocumentBulkImporter implements AutoCloseable {
* RetryOptions retryOptions = new RetryOptions();
* // set to 0 to let the bulk importer handle throttling
* retryOptions.setMaxRetryAttemptsOnThrottledRequests(0);
* connectionPolicy.setRetryOptions(retryOptions);
* connectionPolicy.setMaxPoolSize(200);
*
* DocumentClient client = new DocumentClient(HOST, MASTER_KEY, connectionPolicy, null);
*
* String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "mycol");
* DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
*
* DocumentBulkImporter importer = DocumentBulkImporter.builder().from(client, collection).build();
*
* for(int i = 0; i < 10; i++) {
* List<String> documents = documentSource.getMoreDocuments();
*
* BulkImportResponse bulkImportResponse = importer.importAll(documents, false);
*
* // validate that all documents were inserted to ensure there were no failures
* // bulkImportResponse.getNumberOfDocumentsImported() == documents.size()
* if (bulkImportResponse.getNumberOfDocumentsImported() < documents.size()) {
* for(Exception e: bulkImportResponse.getFailuresIfAny()) {
* // inspect why there were failures
* // in case you decide to re-import this batch of documents (as some of them may already have been inserted)
* // you should import them with the upsert option enabled
* e.printStackTrace();
* }
* break;
* }
* }
*
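
The comments in the example above leave the actual retry to the caller. Below is a hedged sketch of one way to do it, reusing only the importAll overload and BulkImportResponse accessors shown in this class; the package imports and the wrapper class name are assumptions. Since an unknown subset of the batch may already have been written, the retry re-imports the same batch with upsert enabled.

import java.util.Collection;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.bulkimport.BulkImportResponse;
import com.microsoft.azure.documentdb.bulkimport.DocumentBulkImporter;

// Sketch: if a checkpoint reports fewer imported documents than submitted,
// log the surfaced failures and re-import the same batch with upsert enabled,
// since some documents of the batch may already have been stored.
final class ImportWithRetry {
    static BulkImportResponse importCheckpoint(DocumentBulkImporter importer,
                                               Collection<String> documents) throws DocumentClientException {
        BulkImportResponse response = importer.importAll(documents, false);
        if (response.getNumberOfDocumentsImported() < documents.size()) {
            for (Exception e : response.getFailuresIfAny()) {
                e.printStackTrace(); // inspect why the partial failure happened
            }
            // retry the whole batch with upsert so already-inserted documents are overwritten, not duplicated
            response = importer.importAll(documents, true);
        }
        return response;
    }
}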
@ -349,7 +353,7 @@ public class DocumentBulkImporter implements AutoCloseable {
@Override
public ConcurrentHashMap<String, Set<String>> apply(Collection<String> input) {
logger.debug("Bucketing documents ...");
ConcurrentHashMap<String, Set<String>> documentsToImportByPartition = new ConcurrentHashMap<String, Set<String>>();
for (String partitionKeyRangeId: partitionKeyRangeIds) {
@ -362,12 +366,12 @@ public class DocumentBulkImporter implements AutoCloseable {
String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
documentsToImportByPartition.get(partitionRangeId).add(documentAsString);
});
return documentsToImportByPartition;
}
};
return executeBulkImportInternal(documents,
bucketingFunction,
isUpsert);
}
@ -380,33 +384,37 @@ public class DocumentBulkImporter implements AutoCloseable {
* RetryOptions retryOptions = new RetryOptions();
* // set to 0 to let the bulk importer handle throttling
* retryOptions.setMaxRetryAttemptsOnThrottledRequests(0);
* connectionPolicy.setRetryOptions(retryOptions);
* connectionPolicy.setMaxPoolSize(200);
*
* DocumentClient client = new DocumentClient(HOST, MASTER_KEY, connectionPolicy, null);
*
* String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "mycol");
* DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
*
* DocumentBulkImporter importer = DocumentBulkImporter.builder().from(client, collection).build();
*
* for(int i = 0; i < 10; i++) {
* HashMap<String, Object> documentToPartitionKeyValue = documentSource.loadDocumentToPartitionKeyValueMap();
*
* BulkImportResponse bulkImportResponse = importer.importAllWithPartitionKey(documentToPartitionKeyValue, false);
*
* // validate that all documents were inserted to ensure there were no failures
* // bulkImportResponse.getNumberOfDocumentsImported() == documentToPartitionKeyValue.size()
* if (bulkImportResponse.getNumberOfDocumentsImported() < documentToPartitionKeyValue.size()) {
* // inspect why there were failures
* // in case you decide to re-import this batch of documents (as some of them may already have been inserted)
* // you should import them with the upsert option enabled
* for(Exception e: bulkImportResponse.getFailuresIfAny()) {
* e.printStackTrace();
* }
* break;
* }
* }
*
* importer.close();
* client.close();
*
* </code>
* @param documentPartitionKeyValueTuples list of {@link DocumentPKValuePair}
* @param isUpsert whether to enable upsert (overwrite the document if it already exists)
@ -420,7 +428,7 @@ public class DocumentBulkImporter implements AutoCloseable {
@Override
public ConcurrentHashMap<String, Set<String>> apply(HashMap<String, Object> input) {
logger.debug("Bucketing documents ...");
ConcurrentHashMap<String, Set<String>> documentsToImportByPartition = new ConcurrentHashMap<String, Set<String>>();
for (String partitionKeyRangeId: partitionKeyRangeIds) {
@ -433,17 +441,17 @@ public class DocumentBulkImporter implements AutoCloseable {
String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
documentsToImportByPartition.get(partitionRangeId).add(entry.getKey());
});
return documentsToImportByPartition;
}
};
return executeBulkImportInternal(documentPartitionKeyValueTuples,
bucketingFunction,
isUpsert);
}
private <T> BulkImportResponse executeBulkImportInternal(T input,
Func1<T, ConcurrentHashMap<String, Set<String>>> bucketingFunction,
boolean isUpsert) throws DocumentClientException {
Preconditions.checkNotNull(input, "document collection cannot be null");
@ -462,7 +470,7 @@ public class DocumentBulkImporter implements AutoCloseable {
} catch(Exception e) {
logger.error("Failed to import documents", e);
throw toDocumentClientException(e);
}
}
private DocumentClientException toDocumentClientException(Exception e) {
@ -482,9 +490,9 @@ public class DocumentBulkImporter implements AutoCloseable {
return documentSize;
}
private <T> ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(T input,
Func1<T, ConcurrentHashMap<String, Set<String>>> bucketingFunction,
boolean isUpsert) throws Exception {
Stopwatch watch = Stopwatch.createStarted();
BulkImportStoredProcedureOptions options = new BulkImportStoredProcedureOptions(true, true, null, false, isUpsert);
@ -492,18 +500,14 @@ public class DocumentBulkImporter implements AutoCloseable {
logger.debug("Bucketing documents ...");
ConcurrentHashMap<String, Set<String>> documentsToImportByPartition = bucketingFunction.apply(input);
logger.trace("Creating mini batches within each partition bucket");
ConcurrentHashMap<String, List<List<String>>> miniBatchesToImportByPartition = new ConcurrentHashMap<String, List<List<String>>>();
for (String partitionKeyRangeId: this.partitionKeyRangeIds) {
miniBatchesToImportByPartition.put(partitionKeyRangeId, new ArrayList<List<String>>(1000));
}
documentsToImportByPartition.entrySet().parallelStream().forEach(entry -> {
String partitionRangeId = entry.getKey();
@ -511,7 +515,7 @@ public class DocumentBulkImporter implements AutoCloseable {
Set<String> documentsToImportInPartition = entry.getValue();
Iterator<String> it = documentsToImportInPartition.iterator();
ArrayList<String> currentMiniBatch = new ArrayList<String>(500);
int currentMiniBatchSize = 0;
while (it.hasNext()) {
@ -525,8 +529,9 @@ public class DocumentBulkImporter implements AutoCloseable {
} else {
// this batch has reached its max size
miniBatchesToImportByPartition.get(partitionRangeId).add(currentMiniBatch);
currentMiniBatch = new ArrayList<String>(500);
currentMiniBatch.add(currentDocument);
currentMiniBatchSize = currentDocumentSize;
}
}
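
The loop in this hunk packs each partition's documents into mini batches bounded by a size budget, and the fix carries the document that overflowed the budget into the freshly created batch instead of silently dropping it. A standalone sketch of that greedy packing; maxMiniBatchSizeInBytes and the UTF-8 byte count used as the size estimate are illustrative assumptions.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Greedy packing of documents into size-bounded mini batches.
// maxMiniBatchSizeInBytes is an assumed configuration value.
final class MiniBatcher {
    static List<List<String>> toMiniBatches(Iterable<String> documents, int maxMiniBatchSizeInBytes) {
        List<List<String>> miniBatches = new ArrayList<>();
        List<String> currentMiniBatch = new ArrayList<>(500);
        int currentMiniBatchSize = 0;
        for (String document : documents) {
            int documentSize = document.getBytes(StandardCharsets.UTF_8).length; // rough size estimate
            if (currentMiniBatchSize + documentSize <= maxMiniBatchSizeInBytes) {
                currentMiniBatch.add(document);
                currentMiniBatchSize += documentSize;
            } else {
                // this batch has reached its max size: seal it and seed the next one
                // with the overflowing document so it is not lost
                miniBatches.add(currentMiniBatch);
                currentMiniBatch = new ArrayList<>(500);
                currentMiniBatch.add(document);
                currentMiniBatchSize = documentSize;
            }
        }
        if (!currentMiniBatch.isEmpty()) {
            miniBatches.add(currentMiniBatch); // flush the trailing partial batch
        }
        return miniBatches;
    }
}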
@ -540,7 +545,7 @@ public class DocumentBulkImporter implements AutoCloseable {
Map<String, BatchInserter> batchInserters = new HashMap<String, BatchInserter>();
Map<String, CongestionController> congestionControllers = new HashMap<String, CongestionController>();
logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
List<ListenableFuture<Void>> futures = new ArrayList<>();
for (String partitionKeyRangeId: this.partitionKeyRangeIds) {
@ -553,15 +558,15 @@ public class DocumentBulkImporter implements AutoCloseable {
options);
batchInserters.put(partitionKeyRangeId, batchInserter);
CongestionController cc = new CongestionController(listeningExecutorService,
collectionThroughput / partitionKeyRangeIds.size(),
partitionKeyRangeId,
batchInserter,
partitionKeyRangeIdToInferredDegreeOfParallelism.get(partitionKeyRangeId));
congestionControllers.put(partitionKeyRangeId,cc);
// starting
futures.add(cc.executeAllAsync());
}
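
Each controller is budgeted an even share of the collection's provisioned throughput: with, for example (hypothetical numbers), 100,000 RU/s spread over 25 partition key ranges, every CongestionController works against 100,000 / 25 = 4,000 RU/s.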

View file

@ -23,6 +23,7 @@
package com.microsoft.azure.documentdb.bulkimport;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
@ -60,10 +61,12 @@ public class Main {
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
Builder bulkImporterBuilder = DocumentBulkImporter.builder().from(client, collection);
// instantiates bulk importer
try(DocumentBulkImporter bulkImporter = bulkImporterBuilder.build()) {
Stopwatch fromStartToEnd = Stopwatch.createStarted();
Stopwatch totalWatch = Stopwatch.createUnstarted();
double totalRequestCharge = 0;
@ -75,8 +78,10 @@ public class Main {
BulkImportResponse bulkImportResponse;
if (cfg.isImportAllWithPartitionKey()) {
HashMap<String, Object> documentPartitionKeyValueTuples = DataMigrationDocumentSource.loadDocumentToPartitionKeyValueMap(cfg.getNumberOfDocumentsForEachCheckpoint(), collection.getPartitionKey());
if (documentPartitionKeyValueTuples.size() != cfg.getNumberOfDocumentsForEachCheckpoint()) {
throw new RuntimeException("not enough documents generated");
}
// NOTE: only sum the bulk import time,
// loading/generating documents is out of the scope of bulk importer and so has to be excluded
totalWatch.start();
bulkImportResponse = bulkImporter.importAllWithPartitionKey(documentPartitionKeyValueTuples, false);
@ -85,7 +90,11 @@ public class Main {
} else {
Collection<String> documents = DataMigrationDocumentSource.loadDocuments(cfg.getNumberOfDocumentsForEachCheckpoint(), collection.getPartitionKey());
if (documents.size() != cfg.getNumberOfDocumentsForEachCheckpoint()) {
throw new RuntimeException("not enough documents generated");
}
// NOTE: only sum the bulk import time,
// loading/generating documents is out of the scope of bulk importer and so has to be excluded
totalWatch.start();
bulkImportResponse = bulkImporter.importAll(documents, false);
@ -99,12 +108,6 @@ public class Main {
totalTimeInMillis += bulkImportResponse.getTotalTimeTaken().toMillis();
totalRequestCharge += bulkImportResponse.getTotalRequestUnitsConsumed();
// print stats
System.out.println("Number of documents inserted in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported());
System.out.println("Import time for this checkpoint in milli seconds " + bulkImportResponse.getTotalTimeTaken().toMillis());
@ -113,11 +116,24 @@ public class Main {
System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
System.out.println("Average #Inserts/second in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
System.out.println("##########################################################################################");
// check the number of imported documents to ensure everything is successfully imported
// bulkImportResponse.getNumberOfDocumentsImported() == documents.size()
if (bulkImportResponse.getNumberOfDocumentsImported() != cfg.getNumberOfDocumentsForEachCheckpoint()) {
System.err.println("Some documents failed to get inserted in this checkpoint. This checkpoint has to get retried with upsert enabled");
System.err.println("Number of surfaced failures: " + bulkImportResponse.getFailuresIfAny().size());
for(int j = 0; j < bulkImportResponse.getFailuresIfAny().size(); j++) {
bulkImportResponse.getFailuresIfAny().get(j).printStackTrace();
}
break;
}
}
fromStartToEnd.stop();
// print average stats
System.out.println("##########################################################################################");
System.out.println("Total import time including data generation: " + fromStartToEnd.elapsed().toMillis());
System.out.println("Total import time in milli seconds measured by stopWatch: " + totalWatch.elapsed().toMillis());
System.out.println("Total import time in milli seconds measured by api : " + totalTimeInMillis);
System.out.println("Total Number of documents inserted " + totalNumberOfDocumentsImported);
@ -131,9 +147,29 @@ public class Main {
static class DataMigrationDocumentSource {
private static String generateDocument(String partitionKeyName, String partitionKeyValue) {
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append("\"id\":\"").append(UUID.randomUUID().toString()).append("abc\"");
String data = UUID.randomUUID().toString();
data = data + data + "0123456789012";
for(int j = 0; j < 10;j++) {
sb.append(",").append("\"f").append(j).append("\":\"").append(data).append("\"");
}
// partition key
sb.append(",\"").append(partitionKeyName).append("\":\"").append(partitionKeyValue).append("\"");
sb.append("}");
return sb.toString();
}
/**
* Creates a collection of documents.
*
* @param numberOfDocuments
* @param partitionKeyDefinition
* @return collection of documents.
@ -144,39 +180,27 @@ public class Main {
partitionKeyDefinition.getPaths().size() > 0, "there is no partition key definition");
Collection<String> partitionKeyPath = partitionKeyDefinition.getPaths();
Preconditions.checkArgument(partitionKeyPath.size() == 1,
"the command line benchmark tool only supports a simple partition key path");
String partitionKeyName = partitionKeyPath.iterator().next().replaceFirst("^/", "");
// the size of each document is approximately 1KB
ArrayList<String> allDocs = new ArrayList<>(numberOfDocuments);
// return documents to be bulk imported
// if your documents are stored on disk, you can change this to read them from disk instead
return IntStream.range(0, numberOfDocuments).mapToObj(i ->
{
String partitionKeyValue = UUID.randomUUID().toString();
return generateDocument(partitionKeyName, partitionKeyValue);
}).collect(Collectors.toCollection(() -> allDocs));
}
/**
* Creates a map of documents to partition key values.
*
* @param numberOfDocuments
* @param partitionKeyDefinition
* @return map of documents to partition key values
@ -187,38 +211,24 @@ public class Main {
partitionKeyDefinition.getPaths().size() > 0, "there is no partition key definition");
Collection<String> partitionKeyPath = partitionKeyDefinition.getPaths();
Preconditions.checkArgument(partitionKeyPath.size() == 1,
"the command line benchmark tool only supports a simple partition key path");
String partitionKeyName = partitionKeyPath.iterator().next().replaceFirst("^/", "");
HashMap<String, Object> documentsToPartitionKeyValue = new HashMap<String, Object>(numberOfDocuments);
// the size of each document is approximately 1KB
// return collection of <document, partitionKeyValue> to be bulk imported
// if your documents are stored on disk, you can change this to read them from disk instead
IntStream.range(0, numberOfDocuments).mapToObj(i ->
{
String partitionKeyValue = UUID.randomUUID().toString();
String doc = generateDocument(partitionKeyName, partitionKeyValue);
return new AbstractMap.SimpleEntry<String, Object>(doc, partitionKeyValue);
}).forEach(entry -> documentsToPartitionKeyValue.put(entry.getKey(), entry.getValue()));
return documentsToPartitionKeyValue;
}
}