diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java
index 8f0eaed..99dab79 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java
@@ -108,6 +108,7 @@ class BatchInserter {
 
     public Iterator<Callable<InsertMetrics>> miniBatchInsertExecutionCallableIterator() {
+        // TODO: FIXME handle scenario where due to a non-retriable error we should break out of the stream loop
         Stream<Callable<InsertMetrics>> stream = batchesToInsert.stream().map(miniBatch -> {
             return new Callable<InsertMetrics>() {
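The TODO above asks for a way to stop handing out work once a non-retriable error has been seen. A minimal sketch of one option, shared for discussion only: a flag that later callables consult before doing any work. The helper names, the Integer return type, and the retriability check are placeholders, not the types BatchInserter actually uses.

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class ShortCircuitSketch {
    // Placeholder: decide which failures should stop the whole import.
    static boolean isNonRetriable(Exception e) {
        return !(e instanceof java.io.IOException);
    }

    static Stream<Callable<Integer>> toCallables(List<List<String>> batchesToInsert) {
        AtomicBoolean abort = new AtomicBoolean(false);
        return batchesToInsert.stream().map(miniBatch -> (Callable<Integer>) () -> {
            if (abort.get()) {
                return 0; // an earlier batch failed with a non-retriable error; skip the rest
            }
            try {
                return miniBatch.size(); // stand-in for the real mini-batch insert call
            } catch (Exception e) {
                if (isNonRetriable(e)) {
                    abort.set(true); // make the remaining callables no-ops
                }
                throw e;
            }
        });
    }
}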
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BulkImporter.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BulkImporter.java
index fc081ab..ef624da 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BulkImporter.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BulkImporter.java
@@ -32,22 +32,20 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Spliterator;
-import java.util.Spliterators;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.AsyncCallable;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Futures.FutureCombiner;
@@ -62,6 +60,7 @@ import com.microsoft.azure.documentdb.FeedResponse;
 import com.microsoft.azure.documentdb.Offer;
 import com.microsoft.azure.documentdb.PartitionKeyDefinition;
 import com.microsoft.azure.documentdb.PartitionKeyRange;
+import com.microsoft.azure.documentdb.internal.query.funcs.Func2;
 import com.microsoft.azure.documentdb.internal.routing.CollectionRoutingMap;
 import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
 import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache;
@@ -193,7 +192,7 @@ public class BulkImporter implements AutoCloseable {
                 Thread.currentThread().interrupt();
             }
         }
-    
+
     /**
      * Initializes {@link BulkImporter}. This happens only once
      * @throws DocumentClientException
@@ -255,28 +254,86 @@ public class BulkImporter implements AutoCloseable {
      *      docs.add(doc);
      *  }
      *
-     *  BulkImportResponse bulkImportResponse = importer.bulkImport(docs.iterator(), false);
+     *  BulkImportResponse bulkImportResponse = importer.bulkImport(docs, false);
      *
      *  client.close();
      *
-     * @param documents to insert
+     * @param documents the documents to insert
      * @param enableUpsert whether enable upsert (overwrite if it exists)
      * @return an instance of {@link BulkImportResponse}
      * @throws DocumentClientException if any failure happens
      */
-    public BulkImportResponse bulkImport(Iterator<String> documents, boolean enableUpsert) throws DocumentClientException {
+    public BulkImportResponse bulkImport(Collection<String> documents, boolean enableUpsert) throws DocumentClientException {
+        Func2<Collection<String>, ConcurrentHashMap<String, Set<String>>, Void> bucketingFunction = new Func2<Collection<String>, ConcurrentHashMap<String, Set<String>>, Void>() {
 
-        Preconditions.checkNotNull(documents, "documents cannot be null");
+            @Override
+            public Void apply(Collection<String> documents, ConcurrentHashMap<String, Set<String>> partitionKeyToBucket) throws Exception {
+
+                documents.parallelStream().forEach(document -> {
+                    PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.extractPartitionKeyValue(document, partitionKeyDefinition);
+                    String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
+                    String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
+                    partitionKeyToBucket.get(partitionRangeId).add(document);
+                });
+                return null;
+            }
+        };
+        return executeBulkImportInternal(documents, bucketingFunction, enableUpsert);
+    }
+
+    public BulkImportResponse bulkImportWithPreprocessedPartitionKey(Collection<Tuple> input, boolean enableUpsert) throws DocumentClientException {
+
+        Func2<Collection<Tuple>, ConcurrentHashMap<String, Set<String>>, Void> bucketingFunction =
+                new Func2<Collection<Tuple>, ConcurrentHashMap<String, Set<String>>, Void>() {
+
+            @Override
+            public Void apply(Collection<Tuple> input, ConcurrentHashMap<String, Set<String>> partitionKeyToBucket) throws Exception {
+
+                input.parallelStream().forEach(tuple -> {
+                    PartitionKeyInternal partitionKeyValue = PartitionKeyInternal.fromObjectArray(ImmutableList.of(tuple.partitionKeyValue), true);
+                    String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
+                    String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
+                    partitionKeyToBucket.get(partitionRangeId).add(tuple.document);
+                });
+                return null;
+            }
+        };
+        return executeBulkImportInternal(input, bucketingFunction, enableUpsert);
+    }
+
+    private <T> BulkImportResponse executeBulkImportInternal(Collection<T> input,
+            Func2<Collection<T>, ConcurrentHashMap<String, Set<String>>, Void> bucketByPartitionFunc,
+            boolean enableUpsert) throws DocumentClientException {
+        Preconditions.checkNotNull(input, "document collection cannot be null");
 
         try {
             initializationFuture.get();
-            return executeBulkImportAsyncImpl(documents, enableUpsert).get();
+            return executeBulkImportAsyncImpl(input, bucketByPartitionFunc, enableUpsert).get();
+
+        } catch (ExecutionException e) {
+            logger.debug("Failed to import documents", e);
+            Throwable cause = e.getCause();
+            if (cause instanceof Exception) {
+                throw toDocumentClientException((Exception) cause);
+            } else {
+                throw toDocumentClientException(e);
+            }
         } catch(Exception e) {
             logger.error("Failed to import documents", e);
-            throw new DocumentClientException(500, e);
+            throw toDocumentClientException(e);
+        }
+    }
+
+    private DocumentClientException toDocumentClientException(Exception e) {
+        if (e instanceof DocumentClientException) {
+            return (DocumentClientException) e;
+        } else {
+            return new DocumentClientException(500, e);
         }
     }
 
-    private ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Iterator<String> documents, boolean enableUpsert) {
+    private <T> ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Collection<T> input,
+            Func2<Collection<T>, ConcurrentHashMap<String, Set<String>>, Void> bucketByPartitionFunc,
+            boolean enableUpsert) throws Exception {
 
         Stopwatch watch = Stopwatch.createStarted();
         BulkImportStoredProcedureOptions options = new BulkImportStoredProcedureOptions(true, true, null, false, enableUpsert);
@@ -284,24 +341,17 @@
         ConcurrentHashMap<String, Set<String>> documentsToImportByPartition = new ConcurrentHashMap<String, Set<String>>();
         ConcurrentHashMap<String, List<List<String>>> miniBatchesToImportByPartition = new ConcurrentHashMap<String, List<List<String>>>();
 
+        int estimateMiniBatchesToImportByPartitionSize = input.size() / this.partitionKeyRangeIds.size();
+
         for (String partitionKeyRangeId: this.partitionKeyRangeIds) {
             documentsToImportByPartition.put(partitionKeyRangeId, ConcurrentHashMap.newKeySet());
-            miniBatchesToImportByPartition.put(partitionKeyRangeId, new ArrayList<List<String>>());
+            miniBatchesToImportByPartition.put(partitionKeyRangeId, new ArrayList<List<String>>(estimateMiniBatchesToImportByPartitionSize));
         }
 
         // Sort documents into partition buckets.
         logger.debug("Sorting documents into partition buckets");
-        Stream<String> stream = StreamSupport.stream(
-                Spliterators.spliteratorUnknownSize(documents, Spliterator.ORDERED),
-                false).parallel();
-
-        stream.forEach(document -> {
-            PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.extractPartitionKeyValue(document, partitionKeyDefinition);
-            String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
-            String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
-            documentsToImportByPartition.get(partitionRangeId).add(document);
-        });
+        bucketByPartitionFunc.apply(input, documentsToImportByPartition);
 
         logger.trace("Creating mini batches within each partition bucket");
         int maxMiniBatchSize = (int)Math.floor(MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE * FRACTION_OF_MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE_ALLOWED);
@@ -357,18 +407,18 @@
         logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
         System.out.println("total preprocessing took: " + watch.elapsed().toMillis() + " millis");
-        
+
         List<ListenableFuture<Void>> futures = new ArrayList<>();
-        
+
         for(CongestionController cc: congestionControllers.values()) {
             ListenableFuture<Void> f = cc.executeAllAsync();
             futures.add(f);
         }
 
         // TODO go with the following
-        // List<ListenableFuture<Void>> futures = congestionControllers.values().parallelStream().map(c -> c.ExecuteAll()).collect(Collectors.toList()); 
-        
-        
+        // List<ListenableFuture<Void>> futures = congestionControllers.values().parallelStream().map(c -> c.ExecuteAll()).collect(Collectors.toList());
+
+
         FutureCombiner<Void> futureContainer = Futures.whenAllComplete(futures);
 
         AsyncCallable<BulkImportResponse> completeAsyncCallback = new AsyncCallable<BulkImportResponse>() {
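For reference while reviewing the BulkImporter changes above, a minimal calling sketch of the two public entry points, bulkImport(Collection, boolean) and bulkImportWithPreprocessedPartitionKey(Collection, boolean). The endpoint, master key, collection link, and the /city partition key are placeholders, and error handling is reduced to a single catch.

import java.util.ArrayList;
import java.util.Collection;

import com.microsoft.azure.documentdb.ConnectionPolicy;
import com.microsoft.azure.documentdb.ConsistencyLevel;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;

public class BulkImportUsageSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint, key and collection link.
        DocumentClient client = new DocumentClient("https://<account>.documents.azure.com:443/",
                "<master-key>", new ConnectionPolicy(), ConsistencyLevel.Session);
        DocumentCollection collection = client.readCollection("dbs/<db>/colls/<coll>", null).getResource();

        try (BulkImporter importer = new BulkImporter(client, collection)) {
            // Path 1: the importer extracts the partition key from each JSON document.
            Collection<String> docs = new ArrayList<>();
            docs.add("{\"id\":\"doc1\",\"city\":\"Seattle\"}"); // assumes the collection is partitioned on /city
            BulkImportResponse r1 = importer.bulkImport(docs, false);

            // Path 2: partition key values are already known, so extraction is skipped.
            Collection<Tuple> tuples = new ArrayList<>();
            tuples.add(new Tuple("{\"id\":\"doc2\",\"city\":\"Redmond\"}", "Redmond"));
            BulkImportResponse r2 = importer.bulkImportWithPreprocessedPartitionKey(tuples, false);

            System.out.println(r1.getNumberOfDocumentsImported() + r2.getNumberOfDocumentsImported());
        } catch (DocumentClientException e) {
            // both entry points surface failures as DocumentClientException
            e.printStackTrace();
        } finally {
            client.close();
        }
    }
}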
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java
index a1e995d..bf41b7e 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java
@@ -21,18 +21,21 @@ public class Configuration {
 
     @Parameter(names = "-collectionId", description = "Collection ID", required = true)
     private String collectionId;
-    
+
     @Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size, it is a good idea to")
     private Integer maxConnectionPoolSize = 200;
 
     @Parameter(names = "-consistencyLevel", description = "Consistency Level")
-    private ConsistencyLevel consistencyLevel = ConsistencyLevel.Eventual;
+    private ConsistencyLevel consistencyLevel = ConsistencyLevel.Session;
 
     @Parameter(names = "-connectionMode", description = "Connection Mode")
     private ConnectionMode connectionMode = ConnectionMode.Gateway;
 
+    @Parameter(names = "-withPreprocessedPartitionKeyValue", description = "Feed With Preprocessed Partition Key Value")
+    private boolean withPreprocessedPartitionKeyValue = false;
+
     @Parameter(names = "-numberOfDocumentsForEachCheckpoint", description = "Number of documents in each checkpoint.")
-    private int numberOfDocumentsForEachCheckpoint = 100000;
+    private int numberOfDocumentsForEachCheckpoint = 500000;
 
     @Parameter(names = "-numberOfCheckpoints", description = "Number of checkpoints.")
     private int numberOfCheckpoints = 10;
@@ -40,14 +43,22 @@ public class Configuration {
 
     @Parameter(names = {"-h", "-help", "--help"}, description = "Help", help = true)
     private boolean help = false;
+
+    /**
+     * @return the withPreprocessedPartitionKeyValue
+     */
+    public boolean isWithPreprocessedPartitionKeyValue() {
+        return withPreprocessedPartitionKeyValue;
+    }
+
     public int getNumberOfCheckpoints() {
         return numberOfCheckpoints;
     }
-    
+
     public int getNumberOfDocumentsForEachCheckpoint() {
         return numberOfDocumentsForEachCheckpoint;
    }
-    
+
     public String getServiceEndpoint() {
         return serviceEndpoint;
     }
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/DocumentAnalyzer.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/DocumentAnalyzer.java
index b342964..c2879ea 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/DocumentAnalyzer.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/DocumentAnalyzer.java
@@ -56,6 +56,8 @@ class DocumentAnalyzer {
         }catch (Exception e) {
             throw new RuntimeException(e);
         }
+
+        // TODO FIXME: this works only for string partition key value type.
         String partitionKeyValueAsString = root.at(partitionKeyPath).asText();
         return PartitionKeyInternal.fromObjectArray(ImmutableList.of(partitionKeyValueAsString), true);
     }
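The DocumentAnalyzer TODO above notes that extraction currently assumes a string partition key value. A sketch of one way it could be widened, under the assumption that PartitionKeyInternal.fromObjectArray accepts numeric, boolean, and null values the same way it accepts strings; this is not what the current code does.

import java.util.Collections;

import com.fasterxml.jackson.databind.JsonNode;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;

class PartitionKeyValueSketch {
    // Illustrative only: map the JSON node type to a matching Java value
    // before building the PartitionKeyInternal.
    static PartitionKeyInternal fromJsonNode(JsonNode node) {
        Object value;
        if (node.isNumber()) {
            value = node.numberValue();
        } else if (node.isBoolean()) {
            value = node.booleanValue();
        } else if (node.isNull() || node.isMissingNode()) {
            value = null;
        } else {
            value = node.asText();
        }
        return PartitionKeyInternal.fromObjectArray(Collections.singletonList(value), true);
    }
}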
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java
index 86e2f10..0d5bd7d 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.slf4j.Logger;
@@ -59,7 +60,7 @@ public class Main {
         // instantiates bulk importer
         BulkImporter bulkImporter = new BulkImporter(client, collection);
 
-        Stopwatch totalWatch = Stopwatch.createStarted();
+        Stopwatch totalWatch = Stopwatch.createUnstarted();
 
         double totalRequestCharge = 0;
         long totalTimeInMillis = 0;
@@ -67,11 +68,28 @@
         for(int i = 0 ; i < cfg.getNumberOfCheckpoints(); i++) {
 
-            Iterator<String> inputDocumentIterator = generatedDocuments(cfg, collection);
+            BulkImportResponse bulkImportResponse;
+            if (cfg.isWithPreprocessedPartitionKeyValue()) {
+
+                Collection<Tuple> documentPartitionKeyValueTuples = DataSource.loadDocumentPartitionKeyValueTuples(cfg, collection.getPartitionKey());
+                // NOTE: only the bulk import time is summed; loading/generating the documents
+                // is out of the scope of the bulk importer and is excluded from the measurement
+                totalWatch.start();
+                bulkImportResponse = bulkImporter.bulkImportWithPreprocessedPartitionKey(documentPartitionKeyValueTuples, false);
+                totalWatch.stop();
+
+            } else {
+                Collection<String> documents = DataSource.loadDocuments(cfg, collection.getPartitionKey());
+                // NOTE: only the bulk import time is summed; loading/generating the documents
+                // is out of the scope of the bulk importer and is excluded from the measurement
+                totalWatch.start();
+                bulkImportResponse = bulkImporter.bulkImport(documents, false);
+                totalWatch.stop();
+
+            }
 
             System.out.println("##########################################################################################");
-            BulkImportResponse bulkImportResponse = bulkImporter.bulkImport(inputDocumentIterator, false);
-            
+
             totalNumberOfDocumentsImported += bulkImportResponse.getNumberOfDocumentsImported();
             totalTimeInMillis += bulkImportResponse.getTotalTimeTaken().toMillis();
             totalRequestCharge += bulkImportResponse.getTotalRequestUnitsConsumed();
@@ -85,8 +103,6 @@
             System.out.println("Average #Inserts/second in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
             System.out.println("##########################################################################################");
         }
-
-        totalWatch.stop();
 
         System.out.println("##########################################################################################");
@@ -104,43 +120,84 @@
         client.close();
     }
 
-    private static Iterator<String> generatedDocuments(Configuration cfg, DocumentCollection collection) {
-        PartitionKeyDefinition partitionKeyDefinition = collection.getPartitionKey();
-        Preconditions.checkArgument(partitionKeyDefinition != null &&
-                partitionKeyDefinition.getPaths().size() > 0, "there is no partition key definition");
-
-        Collection<String> partitionKeyPath = partitionKeyDefinition.getPaths();
-        Preconditions.checkArgument(partitionKeyPath.size() == 1,
-                "the command line benchmark tool only support simple partition key path");
-
-        String partitionKeyName = partitionKeyPath.iterator().next().replaceFirst("^/", "");
-
-        // the size of each document is approximately 1KB
-
-        // return documents to be bulk imported
-        // if you are reading documents from disk you can change this to read documents from disk
-        return IntStream.range(0, cfg.getNumberOfDocumentsForEachCheckpoint()).mapToObj(i ->
-        {
-
-            StringBuilder sb = new StringBuilder();
-            sb.append("{");
-            sb.append("\"id\":\"").append(UUID.randomUUID().toString()).append("\"");
-            sb.append(",\"").append(partitionKeyName).append("\":\"").append(UUID.randomUUID().toString()).append("abc\"");
-            String data = UUID.randomUUID().toString();
-            data = data + data + "0123456789012";
-
-            for(int j = 0; j < 10;j++) {
-                sb.append(",").append("\"f").append(j).append("\":\"").append(data).append("\"");
-            }
-
-            sb.append("}");
+    private static class DataSource {
+
+        private static Collection<String> loadDocuments(Configuration cfg, PartitionKeyDefinition partitionKeyDefinition) {
 
-            return sb.toString();
-        }).iterator();
+            Preconditions.checkArgument(partitionKeyDefinition != null &&
+                    partitionKeyDefinition.getPaths().size() > 0, "there is no partition key definition");
+
+            Collection<String> partitionKeyPath = partitionKeyDefinition.getPaths();
+            Preconditions.checkArgument(partitionKeyPath.size() == 1,
+                    "the command line benchmark tool only supports a simple partition key path");
+
+            String partitionKeyName = partitionKeyPath.iterator().next().replaceFirst("^/", "");
+
+            // the size of each document is approximately 1KB
+
+            // return documents to be bulk imported
+            // if you are reading documents from disk you can change this to read documents from disk
+            return IntStream.range(0, cfg.getNumberOfDocumentsForEachCheckpoint()).mapToObj(i ->
+            {
+
+                StringBuilder sb = new StringBuilder();
+                sb.append("{");
+                sb.append("\"id\":\"").append(UUID.randomUUID().toString()).append("abc\"");
+                sb.append(",\"").append(partitionKeyName).append("\":\"").append(UUID.randomUUID().toString()).append("\"");
+
+                String data = UUID.randomUUID().toString();
+                data = data + data + "0123456789012";
+
+                for(int j = 0; j < 10;j++) {
+                    sb.append(",").append("\"f").append(j).append("\":\"").append(data).append("\"");
+                }
+
+                sb.append("}");
+
+                return sb.toString();
+            }).collect(Collectors.toList());
+        }
+
+        private static Collection<Tuple> loadDocumentPartitionKeyValueTuples(Configuration cfg, PartitionKeyDefinition partitionKeyDefinition) {
+
+            Preconditions.checkArgument(partitionKeyDefinition != null &&
+                    partitionKeyDefinition.getPaths().size() > 0, "there is no partition key definition");
+
+            Collection<String> partitionKeyPath = partitionKeyDefinition.getPaths();
+            Preconditions.checkArgument(partitionKeyPath.size() == 1,
+                    "the command line benchmark tool only supports a simple partition key path");
+
+            String partitionKeyName = partitionKeyPath.iterator().next().replaceFirst("^/", "");
+
+            // the size of each document is approximately 1KB
+
+            // return collection of tuples to be bulk imported
+            // if you are reading documents from disk you can change this to read documents from disk
+            return IntStream.range(0, cfg.getNumberOfDocumentsForEachCheckpoint()).mapToObj(i ->
+            {
+                StringBuilder sb = new StringBuilder();
+                String partitionKeyValue = UUID.randomUUID().toString();
+                sb.append("{");
+                sb.append("\"id\":\"").append(UUID.randomUUID().toString()).append("abc\"");
+                sb.append(",\"").append(partitionKeyName).append("\":\"").append(partitionKeyValue).append("\"");
+
+                String data = UUID.randomUUID().toString();
+                data = data + data + "0123456789012";
+
+                for(int j = 0; j < 10;j++) {
+                    sb.append(",").append("\"f").append(j).append("\":\"").append(data).append("\"");
+                }
+
+                sb.append("}");
+
+                return new Tuple(sb.toString(), partitionKeyValue);
+
+            }).collect(Collectors.toList());
+        }
     }
-    
+
     public static DocumentClient documentClientFrom(Configuration cfg) throws DocumentClientException {
 
         ConnectionPolicy policy = cfg.getConnectionPolicy();
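The comments in DataSource above note that document generation can be swapped for reading documents from disk. A minimal sketch of that swap, assuming one JSON document per line in a UTF-8 file; the file path is a placeholder.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FileDataSourceSketch {
    // One JSON document per line; the whole file is materialized into a collection
    // so it can be handed to BulkImporter.bulkImport(Collection, boolean).
    static Collection<String> loadDocumentsFromDisk(Path file) throws IOException {
        try (Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8)) {
            return lines.filter(line -> !line.trim().isEmpty())
                        .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Collection<String> docs = loadDocumentsFromDisk(Paths.get("/tmp/documents.json")); // placeholder path
        System.out.println("loaded " + docs.size() + " documents");
    }
}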
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Tuple.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Tuple.java
new file mode 100644
index 0000000..e9c5bda
--- /dev/null
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Tuple.java
@@ -0,0 +1,34 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2017 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.documentdb.bulkimport;
+
+public class Tuple {
+    public final String document;
+    public final Object partitionKeyValue;
+
+    public Tuple(String documentAsString, Object partitionKeyValue) {
+        //Preconditions.checkNotNull(documentAsString);
+        this.document = documentAsString;
+        this.partitionKeyValue = partitionKeyValue;
+    }
+}
diff --git a/bulkimport/src/test/java/com/microsoft/azure/documentdb/bulkimport/Sample.java b/bulkimport/src/test/java/com/microsoft/azure/documentdb/bulkimport/Sample.java
index c996ca7..d88912f 100644
--- a/bulkimport/src/test/java/com/microsoft/azure/documentdb/bulkimport/Sample.java
+++ b/bulkimport/src/test/java/com/microsoft/azure/documentdb/bulkimport/Sample.java
@@ -59,13 +59,15 @@ public class Sample {
             docs.add(doc);
         }
 
-        BulkImportResponse bulkImportResponse = importer.bulkImport(docs.iterator(), false);
+        BulkImportResponse bulkImportResponse = importer.bulkImport(docs, false);
 
         // returned stats
         System.out.println("Number of documents inserted: " + bulkImportResponse.getNumberOfDocumentsImported());
         System.out.println("Import total time: " + bulkImportResponse.getTotalTimeTaken());
         System.out.println("Total request unit consumed: " + bulkImportResponse.getTotalRequestUnitsConsumed());
 
+        importer.close();
+
         client.close();
     }