added support for tuple<document, partitionkeyvalue>

This commit is contained in:
Mohammad Derakhshani 2017-10-13 13:45:28 -07:00
Parent 27583c5265
Commit 6e1bbb7231
7 changed files with 230 additions and 73 deletions
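In short: alongside the existing bulkImport(Collection<String>, boolean) entry point, callers can now hand the importer a collection of Tuple objects pairing each raw document with its already-extracted partition key value, which lets the importer skip per-document key extraction. A minimal sketch of the new call path, assuming a DocumentClient client and a DocumentCollection collection are already initialized (the document body and key value below are made up):

    // Sketch: bulk import with pre-extracted partition key values.
    List<Tuple> tuples = new ArrayList<>();
    tuples.add(new Tuple("{\"id\":\"1\",\"city\":\"Seattle\"}", "Seattle"));
    BulkImporter importer = new BulkImporter(client, collection);
    // second argument false = plain insert, no upsert
    BulkImportResponse response = importer.bulkImportWithPreprocessedPartitionKey(tuples, false);
    System.out.println("Imported: " + response.getNumberOfDocumentsImported());
    importer.close();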

View file

@@ -108,6 +108,7 @@ class BatchInserter {
public Iterator<Callable<InsertMetrics>> miniBatchInsertExecutionCallableIterator() {
// TODO: FIXME handle the scenario where a non-retriable error should break us out of the stream loop
Stream<Callable<InsertMetrics>> stream = batchesToInsert.stream().map(miniBatch -> {
return new Callable<InsertMetrics>() {

View file

@@ -32,22 +32,20 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Futures.FutureCombiner;
@@ -62,6 +60,7 @@ import com.microsoft.azure.documentdb.FeedResponse;
import com.microsoft.azure.documentdb.Offer;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.PartitionKeyRange;
import com.microsoft.azure.documentdb.internal.query.funcs.Func2;
import com.microsoft.azure.documentdb.internal.routing.CollectionRoutingMap;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache;
@@ -255,28 +254,86 @@ public class BulkImporter implements AutoCloseable {
* docs.add(doc);
* }
*
* BulkImportResponse bulkImportResponse = importer.bulkImport(docs.iterator(), false);
* BulkImportResponse bulkImportResponse = importer.bulkImport(docs, false);
*
* client.close();
* </code>
* @param documents to insert
* @param documentIterator to insert
* @param enableUpsert whether to enable upsert (overwrite the document if it already exists)
* @return an instance of {@link BulkImportResponse}
* @throws DocumentClientException if any failure happens
*/
public BulkImportResponse bulkImport(Iterator<String> documents, boolean enableUpsert) throws DocumentClientException {
public BulkImportResponse bulkImport(Collection<String> documents, boolean enableUpsert) throws DocumentClientException {
Preconditions.checkNotNull(documents, "documents cannot be null");
Func2<Collection<String>, ConcurrentHashMap<String, Set<String>>, Void> bucketingFunction = new Func2<Collection<String>, ConcurrentHashMap<String, Set<String>>, Void>() {
@Override
public Void apply(Collection<String> documents, ConcurrentHashMap<String, Set<String>> partitionKeyToBucket) throws Exception {
documents.parallelStream().forEach(document -> {
PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.extractPartitionKeyValue(document, partitionKeyDefinition);
String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
partitionKeyToBucket.get(partitionRangeId).add(document);
});
return null;
}
};
return executeBulkImportInternal(documents, bucketingFunction, enableUpsert);
}
public BulkImportResponse bulkImportWithPreprocessedPartitionKey(Collection<Tuple> input, boolean enableUpsert) throws DocumentClientException {
Func2<Collection<Tuple>, ConcurrentHashMap<String, Set<String>>, Void> bucketingFunction =
new Func2<Collection<Tuple>, ConcurrentHashMap<String,Set<String>>, Void>() {
@Override
public Void apply(Collection<Tuple> input, ConcurrentHashMap<String, Set<String>> partitionKeyToBucket) throws Exception {
input.parallelStream().forEach(tuple -> {
PartitionKeyInternal partitionKeyValue = PartitionKeyInternal.fromObjectArray(ImmutableList.of(tuple.partitionKeyValue), true);
String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
partitionKeyToBucket.get(partitionRangeId).add(tuple.document);
});
return null;
}
};
return executeBulkImportInternal(input, bucketingFunction, enableUpsert);
}
private <T> BulkImportResponse executeBulkImportInternal(Collection<T> input,
Func2<Collection<T>, ConcurrentHashMap<String, Set<String>>, Void> bucketByPartitionFunc,
boolean enableUpsert) throws DocumentClientException {
Preconditions.checkNotNull(input, "document collection cannot be null");
try {
initializationFuture.get();
return executeBulkImportAsyncImpl(documents, enableUpsert).get();
return executeBulkImportAsyncImpl(input, bucketByPartitionFunc, enableUpsert).get();
} catch (ExecutionException e) {
logger.debug("Failed to import documents", e);
Throwable cause = e.getCause();
if (cause instanceof Exception) {
throw toDocumentClientException((Exception) cause);
} else {
throw toDocumentClientException(e);
}
} catch(Exception e) {
logger.error("Failed to import documents", e);
throw new DocumentClientException(500, e);
throw toDocumentClientException(e);
}
}
private ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Iterator<String> documents, boolean enableUpsert) {
private DocumentClientException toDocumentClientException(Exception e) {
if (e instanceof DocumentClientException) {
return (DocumentClientException) e;
} else {
return new DocumentClientException(500, e);
}
}
private <T> ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Collection<T> input,
Func2<Collection<T>, ConcurrentHashMap<String, Set<String>>, Void> bucketByPartitionFunc,
boolean enableUpsert) throws Exception {
Stopwatch watch = Stopwatch.createStarted();
BulkImportStoredProcedureOptions options = new BulkImportStoredProcedureOptions(true, true, null, false, enableUpsert);
@@ -284,24 +341,17 @@ public class BulkImporter implements AutoCloseable {
ConcurrentHashMap<String, Set<String>> documentsToImportByPartition = new ConcurrentHashMap<String, Set<String>>();
ConcurrentHashMap<String, List<List<String>>> miniBatchesToImportByPartition = new ConcurrentHashMap<String, List<List<String>>>();
int estimateMiniBatchesToImportByPartitionSize = input.size() / this.partitionKeyRangeIds.size();
for (String partitionKeyRangeId: this.partitionKeyRangeIds) {
documentsToImportByPartition.put(partitionKeyRangeId, ConcurrentHashMap.newKeySet());
miniBatchesToImportByPartition.put(partitionKeyRangeId, new ArrayList<List<String>>());
miniBatchesToImportByPartition.put(partitionKeyRangeId, new ArrayList<List<String>>(estimateMiniBatchesToImportByPartitionSize));
}
// Sort documents into partition buckets.
logger.debug("Sorting documents into partition buckets");
Stream<String> stream = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(documents, Spliterator.ORDERED),
false).parallel();
stream.forEach(document -> {
PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.extractPartitionKeyValue(document, partitionKeyDefinition);
String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
documentsToImportByPartition.get(partitionRangeId).add(document);
});
bucketByPartitionFunc.apply(input, documentsToImportByPartition);
logger.trace("Creating mini batches within each partition bucket");
int maxMiniBatchSize = (int)Math.floor(MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE * FRACTION_OF_MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE_ALLOWED);
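The hunk ends where each partition bucket is split into mini batches whose serialized size stays under the bulk import stored procedure's input cap. A rough sketch of that size-bounded grouping, assuming java.util imports and with maxBatchBytes standing in for the importer's real constants:

    // Sketch only: split one partition bucket into mini batches bounded by a byte budget.
    static List<List<String>> toMiniBatches(Iterable<String> bucket, int maxBatchBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String doc : bucket) {
            int docBytes = doc.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
            // close the running batch before it would overflow the budget
            if (!current.isEmpty() && currentBytes + docBytes > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(doc);
            currentBytes += docBytes;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }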

View file

@@ -26,13 +26,16 @@ public class Configuration {
private Integer maxConnectionPoolSize = 200;
@Parameter(names = "-consistencyLevel", description = "Consistency Level")
private ConsistencyLevel consistencyLevel = ConsistencyLevel.Eventual;
private ConsistencyLevel consistencyLevel = ConsistencyLevel.Session;
@Parameter(names = "-connectionMode", description = "Connection Mode")
private ConnectionMode connectionMode = ConnectionMode.Gateway;
@Parameter(names = "-withPreprocessedPartitionKeyValue", description = "Feed With Preprocessed Partition Key Value")
private boolean withPreprocessedPartitionKeyValue = false;
@Parameter(names = "-numberOfDocumentsForEachCheckpoint", description = "Number of documents in each checkpoint.")
private int numberOfDocumentsForEachCheckpoint = 100000;
private int numberOfDocumentsForEachCheckpoint = 500000;
@Parameter(names = "-numberOfCheckpoints", description = "Number of checkpoints.")
private int numberOfCheckpoints = 10;
@@ -40,6 +43,14 @@ public class Configuration {
@Parameter(names = {"-h", "-help", "--help"}, description = "Help", help = true)
private boolean help = false;
/**
* @return the withPreprocessedPartitionKeyValue
*/
public boolean isWithPreprocessedPartitionKeyValue() {
return withPreprocessedPartitionKeyValue;
}
public int getNumberOfCheckpoints() {
return numberOfCheckpoints;
}
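The @Parameter annotations above are JCommander bindings, so the new -withPreprocessedPartitionKeyValue flag is wired like the existing ones. An illustrative parse only; the surrounding wiring is not part of this commit:

    // Illustrative: bind the benchmark flags shown above onto a Configuration.
    Configuration cfg = new Configuration();
    new com.beust.jcommander.JCommander(cfg, args); // e.g. args = {"-withPreprocessedPartitionKeyValue", "-numberOfCheckpoints", "10"}
    System.out.println("preprocessed PK mode: " + cfg.isWithPreprocessedPartitionKeyValue());
    System.out.println("checkpoints: " + cfg.getNumberOfCheckpoints());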

View file

@@ -56,6 +56,8 @@ class DocumentAnalyzer {
} catch (Exception e) {
throw new RuntimeException(e);
}
// TODO FIXME: this works only for string partition key values.
String partitionKeyValueAsString = root.at(partitionKeyPath).asText();
return PartitionKeyInternal.fromObjectArray(ImmutableList.of(partitionKeyValueAsString), true);
}
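The extraction above relies on Jackson's JSON Pointer support: a partition key path such as "/city" is already a valid pointer, and the trailing asText() is what restricts it to string values. A standalone sketch (made-up document and path; assume an enclosing method that declares throws IOException for readTree):

    // Sketch: resolve a partition key path with Jackson's JSON Pointer API.
    ObjectMapper mapper = new ObjectMapper();
    JsonNode root = mapper.readTree("{\"id\":\"1\",\"city\":\"Seattle\",\"population\":744955}");
    String city = root.at("/city").asText();       // "Seattle"
    String pop = root.at("/population").asText();  // "744955" -- the numeric type is flattened to text,
                                                   // which is why the TODO limits this to string keys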

View file

@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
@@ -59,7 +60,7 @@ public class Main {
// instantiates bulk importer
BulkImporter bulkImporter = new BulkImporter(client, collection);
Stopwatch totalWatch = Stopwatch.createStarted();
Stopwatch totalWatch = Stopwatch.createUnstarted();
double totalRequestCharge = 0;
long totalTimeInMillis = 0;
@@ -67,10 +68,27 @@
for(int i = 0 ; i < cfg.getNumberOfCheckpoints(); i++) {
Iterator<String> inputDocumentIterator = generatedDocuments(cfg, collection);
BulkImportResponse bulkImportResponse;
if (cfg.isWithPreprocessedPartitionKeyValue()) {
Collection<Tuple> documentPartitionKeyValueTuples = DataSource.loadDocumentPartitionKeyValueTuples(cfg, collection.getPartitionKey());
// NOTE: only the bulk import time is summed;
// loading/generating documents is outside the scope of the bulk importer and is excluded
totalWatch.start();
bulkImportResponse = bulkImporter.bulkImportWithPreprocessedPartitionKey(documentPartitionKeyValueTuples, false);
totalWatch.stop();
} else {
Collection<String> documents = DataSource.loadDocuments(cfg, collection.getPartitionKey());
// NOTE: only the bulk import time is summed;
// loading/generating documents is outside the scope of the bulk importer and is excluded
totalWatch.start();
bulkImportResponse = bulkImporter.bulkImport(documents, false);
totalWatch.stop();
}
System.out.println("##########################################################################################");
BulkImportResponse bulkImportResponse = bulkImporter.bulkImport(inputDocumentIterator, false);
totalNumberOfDocumentsImported += bulkImportResponse.getNumberOfDocumentsImported();
totalTimeInMillis += bulkImportResponse.getTotalTimeTaken().toMillis();
@@ -86,8 +104,6 @@ public class Main {
System.out.println("##########################################################################################");
}
totalWatch.stop();
System.out.println("##########################################################################################");
System.out.println("Total summed Import time in milli seconds: " + totalTimeInMillis);
@@ -104,9 +120,12 @@ public class Main {
client.close();
}
private static Iterator<String> generatedDocuments(Configuration cfg, DocumentCollection collection) {
PartitionKeyDefinition partitionKeyDefinition = collection.getPartitionKey();
private static class DataSource {
private static Collection<String> loadDocuments(Configuration cfg, PartitionKeyDefinition partitionKeyDefinition) {
Preconditions.checkArgument(partitionKeyDefinition != null &&
partitionKeyDefinition.getPaths().size() > 0, "there is no partition key definition");
@@ -125,8 +144,8 @@
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append("\"id\":\"").append(UUID.randomUUID().toString()).append("\"");
sb.append(",\"").append(partitionKeyName).append("\":\"").append(UUID.randomUUID().toString()).append("abc\"");
sb.append("\"id\":\"").append(UUID.randomUUID().toString()).append("abc\"");
sb.append(",\"").append(partitionKeyName).append("\":\"").append(UUID.randomUUID().toString()).append("\"");
String data = UUID.randomUUID().toString();
data = data + data + "0123456789012";
@@ -138,7 +157,45 @@
sb.append("}");
return sb.toString();
}).iterator();
}).collect(Collectors.toList());
}
private static Collection<Tuple> loadDocumentPartitionKeyValueTuples(Configuration cfg, PartitionKeyDefinition partitionKeyDefinition) {
Preconditions.checkArgument(partitionKeyDefinition != null &&
partitionKeyDefinition.getPaths().size() > 0, "there is no partition key definition");
Collection<String> partitionKeyPath = partitionKeyDefinition.getPaths();
Preconditions.checkArgument(partitionKeyPath.size() == 1,
"the command line benchmark tool only support simple partition key path");
String partitionKeyName = partitionKeyPath.iterator().next().replaceFirst("^/", "");
// the size of each document is approximately 1KB
// returns a collection of <document, partitionKeyValue> tuples to be bulk imported
// if your documents live on disk, change this to read them from disk instead of generating them
return IntStream.range(0, cfg.getNumberOfDocumentsForEachCheckpoint()).mapToObj(i ->
{
StringBuilder sb = new StringBuilder();
String partitionKeyValue = UUID.randomUUID().toString();
sb.append("{");
sb.append("\"id\":\"").append(UUID.randomUUID().toString()).append("abc\"");
sb.append(",\"").append(partitionKeyName).append("\":\"").append(partitionKeyValue).append("\"");
String data = UUID.randomUUID().toString();
data = data + data + "0123456789012";
for(int j = 0; j < 10;j++) {
sb.append(",").append("\"f").append(j).append("\":\"").append(data).append("\"");
}
sb.append("}");
return new Tuple(sb.toString(), partitionKeyValue);
}).collect(Collectors.toList());
}
}
public static DocumentClient documentClientFrom(Configuration cfg) throws DocumentClientException {

View file

@@ -0,0 +1,34 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
public class Tuple {
public final String document;
public final Object partitionKeyValue;
public Tuple(String documentAsString, Object partitionKeyValue) {
//Preconditions.checkNotNull(documentAsString);
this.document = documentAsString;
this.partitionKeyValue = partitionKeyValue;
}
}
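Worth noting: partitionKeyValue is typed as Object, so the tuple path can carry non-string key values that the string-only DocumentAnalyzer extraction cannot, assuming PartitionKeyInternal.fromObjectArray accepts the underlying type. A hedged illustration:

    // Illustration: tuples may pair documents with non-string partition key values.
    Tuple stringKey = new Tuple("{\"id\":\"a\",\"city\":\"Seattle\"}", "Seattle");
    Tuple numericKey = new Tuple("{\"id\":\"b\",\"userId\":42}", 42); // not expressible via bulkImport(Collection<String>, ...)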

View file

@@ -59,13 +59,15 @@ public class Sample {
docs.add(doc);
}
BulkImportResponse bulkImportResponse = importer.bulkImport(docs.iterator(), false);
BulkImportResponse bulkImportResponse = importer.bulkImport(docs, false);
// returned stats
System.out.println("Number of documents inserted: " + bulkImportResponse.getNumberOfDocumentsImported());
System.out.println("Import total time: " + bulkImportResponse.getTotalTimeTaken());
System.out.println("Total request unit consumed: " + bulkImportResponse.getTotalRequestUnitsConsumed());
importer.close();
client.close();
}