diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java
index feabed7..8f0eaed 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java
@@ -114,6 +114,7 @@ class BatchInserter {
 
             @Override
             public InsertMetrics call() throws Exception {
+                logger.debug("pki {} importing mini batch started", partitionKeyRangeId);
                 Stopwatch stopwatch = Stopwatch.createStarted();
                 double requestUnitsCounsumed = 0;
                 int numberOfThrottles = 0;
@@ -123,6 +124,8 @@ class BatchInserter {
                 int currentDocumentIndex = 0;
 
                 while (currentDocumentIndex < miniBatch.size()) {
+                    logger.debug("pki {} inside for loop, currentDocumentIndex {}", partitionKeyRangeId, currentDocumentIndex);
+
                     String[] docBatch = miniBatch.subList(currentDocumentIndex, miniBatch.size()).toArray(new String[0]);
 
                     // Before c# 6, await not allowed in the catch block, hence the following two local variables.
@@ -134,7 +137,7 @@ class BatchInserter {
                     // set partition key range id
                     requestOptions.setPartitionKeyRengeId(partitionKeyRangeId);
 
-                    logger.debug("Partition Key Range Id {}, Trying to import minibatch of {} documenents", partitionKeyRangeId, docBatch.length);
+                    logger.debug("pki {}, Trying to import minibatch of {} documents", partitionKeyRangeId, docBatch.length);
 
                     if (!timedOut) {
 
@@ -158,7 +161,7 @@ class BatchInserter {
 
                             if (bulkImportResponse != null) {
                                 if (bulkImportResponse.errorCode != 0) {
-                                    logger.warn("Partition range id {} Received response error code {}", partitionKeyRangeId, bulkImportResponse.errorCode);
+                                    logger.warn("pki {} Received response error code {}", partitionKeyRangeId, bulkImportResponse.errorCode);
                                 }
 
                                 double requestCharge = response.getRequestCharge();
@@ -168,36 +171,36 @@ class BatchInserter {
                                 totalRequestUnitsConsumed.addAndGet(requestCharge);
                             }
                             else {
-                                logger.warn("Partition range id {} Failed to receive response", partitionKeyRangeId);
+                                logger.warn("pki {} Failed to receive response", partitionKeyRangeId);
                             }
 
                         } catch (DocumentClientException e) {
 
-                            logger.trace("Importing minibatch for partition key range id {} failed", partitionKeyRangeId, e);
+                            logger.debug("pki {} Importing minibatch failed", partitionKeyRangeId, e);
 
                             if (isThrottled(e)) {
-                                logger.debug("Throttled on partition range id {}", partitionKeyRangeId);
+                                logger.debug("pki {} Throttled", partitionKeyRangeId);
                                 numberOfThrottles++;
                                 isThrottled = true;
                                 retryAfter = Duration.ofMillis(e.getRetryAfterInMilliseconds());
 
                             } else if (isTimedOut(e)) {
-                                logger.debug("Request timed out on partition range id {}", partitionKeyRangeId);
+                                logger.debug("pki {} Request timed out", partitionKeyRangeId);
                                 timedOut = true;
 
                             } else if (isGone(e)) {
                                 if (isSplit(e)) {
-                                    String errorMessage = String.format("Partition range id %s is undergoing split, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
+                                    String errorMessage = String.format("pki %s is undergoing split, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
                                     logger.error(errorMessage);
                                     throw new RuntimeException(errorMessage);
                                 } else {
-                                    String errorMessage = String.format("Partition range id %s is gone, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
+                                    String errorMessage = String.format("pki %s is gone, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
                                     logger.error(errorMessage);
                                     throw new RuntimeException(errorMessage);
                                 }
                             } else {
-                                String errorMessage = String.format("Partition range id %s failed to import mini-batch. Exception was %s. Status code was %s",
+                                String errorMessage = String.format("pki %s failed to import mini-batch. Exception was %s. Status code was %s",
                                         partitionKeyRangeId,
                                         e.getMessage(),
                                         e.getStatusCode());
@@ -206,7 +209,7 @@ class BatchInserter {
                             }
 
                         } catch (Exception e) {
-                            String errorMessage = String.format("Partition range id %s Failed to import mini-batch. Exception was %s", partitionKeyRangeId,
+                            String errorMessage = String.format("pki %s Failed to import mini-batch. Exception was %s", partitionKeyRangeId,
                                     e.getMessage());
                             logger.error(errorMessage, e);
                             throw new RuntimeException(errorMessage, e);
@@ -214,13 +217,16 @@ class BatchInserter {
 
                     if (isThrottled) {
                         try {
-                            Thread.sleep(retryAfter.getSeconds() * 1000);
+                            logger.debug("pki {} throttled going to sleep for {} millis", partitionKeyRangeId, retryAfter.toMillis());
+                            Thread.sleep(retryAfter.toMillis());
                         } catch (InterruptedException e) {
                             throw new RuntimeException(e);
                         }
                     }
                 }
 
+                logger.debug("pki {} completed", partitionKeyRangeId);
+
                 stopwatch.stop();
                 InsertMetrics insertMetrics = new InsertMetrics(currentDocumentIndex, stopwatch.elapsed(), requestUnitsCounsumed, numberOfThrottles);
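One detail worth calling out in the retry path above: Duration.getSeconds() truncates, so with the old getSeconds() * 1000 arithmetic a sub-second retry-after hint produced a zero-length sleep and the batch retried straight back into the same throttle, while toMillis() preserves the hint. A minimal illustration using plain java.time.Duration (not code from this repository):

    Duration retryAfter = Duration.ofMillis(500);   // a typical sub-second retry-after hint
    long oldSleep = retryAfter.getSeconds() * 1000; // 0 ms -- truncated, no backoff at all
    long newSleep = retryAfter.toMillis();          // 500 ms -- honors the hint
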
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BulkImporter.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BulkImporter.java
index b7232ab..fc081ab 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BulkImporter.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BulkImporter.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -66,7 +67,7 @@ import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
 import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache;
 import com.microsoft.azure.documentdb.internal.routing.Range;
 
-public class BulkImporter {
+public class BulkImporter implements AutoCloseable {
 
     /**
      * The name of the system stored procedure for bulk import.
@@ -157,7 +158,7 @@ public class BulkImporter {
 
         this.partitionKeyDefinition = collection.getPartitionKey();
 
-        this.listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newWorkStealingPool(client.getConnectionPolicy().getMaxPoolSize()));
+        this.listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
 
         this.initializationFuture = this.listeningExecutorService.submit(new Callable() {
 
@@ -169,6 +170,30 @@ public class BulkImporter {
         });
     }
 
+    /**
+     * Releases any internal resources.
+     * It is the responsibility of the caller to close the {@link DocumentClient}.
+     */
+    @Override
+    public void close() {
+        // disable submission of new tasks
+        listeningExecutorService.shutdown();
+        try {
+            // wait for existing tasks to terminate
+            if (!listeningExecutorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                // cancel any currently executing tasks
+                listeningExecutorService.shutdownNow();
+                // wait for cancelled tasks to terminate
+                if (!listeningExecutorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                    logger.error("some tasks did not terminate");
+                }
+            }
+        } catch (InterruptedException e) {
+            listeningExecutorService.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+
     /**
      * Initializes {@link BulkImporter}. This happens only once
      * @throws DocumentClientException
@@ -310,7 +335,6 @@ public class BulkImporter {
             }
         });
 
-        logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
         logger.debug("Beginning bulk import within each partition bucket");
 
         Map batchInserters = new HashMap();
@@ -331,17 +355,26 @@ public class BulkImporter {
                     new CongestionController(listeningExecutorService, collectionThroughput / partitionKeyRangeIds.size(), partitionKeyRangeId, batchInserter, this.partitionDegreeOfConcurrency.get(partitionKeyRangeId)));
         }
 
+        logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
         System.out.println("total preprocessing took: " + watch.elapsed().toMillis() + " millis");
 
-        List<ListenableFuture<Void>> futures = congestionControllers.values().parallelStream().map(c -> c.ExecuteAll()).collect(Collectors.toList());
+        List<ListenableFuture<Void>> futures = new ArrayList<>();
+
+        for(CongestionController cc: congestionControllers.values()) {
+            ListenableFuture<Void> f = cc.executeAllAsync();
+            futures.add(f);
+        }
+
+        // TODO go with the following
+        // List<ListenableFuture<Void>> futures = congestionControllers.values().parallelStream().map(c -> c.executeAllAsync()).collect(Collectors.toList());
+
+
         FutureCombiner futureContainer = Futures.whenAllComplete(futures);
         AsyncCallable completeAsyncCallback = new AsyncCallable() {
 
             @Override
             public ListenableFuture call() throws Exception {
 
-                // TODO: this can change so aggregation of the result happen at each
-
                 for(String partitionKeyRangeId: partitionKeyRangeIds) {
                     partitionDegreeOfConcurrency.put(partitionKeyRangeId, congestionControllers.get(partitionKeyRangeId).getDegreeOfConcurrency());
                 }
@@ -357,7 +390,7 @@ public class BulkImporter {
             }
         };
 
-        return futureContainer.callAsync(completeAsyncCallback, MoreExecutors.directExecutor());
+        return futureContainer.callAsync(completeAsyncCallback, listeningExecutorService);
     }
 
     private CollectionRoutingMap getCollectionRoutingMap(DocumentClient client) {
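Because BulkImporter now implements AutoCloseable, callers can scope it with try-with-resources instead of remembering to call close() themselves; the DocumentClient still has to be closed separately, as the javadoc above notes. A usage sketch, assuming a client, a collection, and a List of document strings are already set up the way Main does it:

    // Sketch only: client, collection and documents are assumed to exist.
    try (BulkImporter importer = new BulkImporter(client, collection)) {
        BulkImportResponse response = importer.bulkImport(documents.iterator(), false);
        System.out.println("Imported " + response.getNumberOfDocumentsImported() + " documents");
    }
    client.close(); // closing the DocumentClient remains the caller's job
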
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java
index 9c683b4..a1e995d 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java
@@ -21,23 +21,16 @@ public class Configuration {
 
     @Parameter(names = "-collectionId", description = "Collection ID", required = true)
     private String collectionId;
-
-//    @Parameter(names = "-documentDataFieldSize", description = "Length of a document data field in characters (16-bit)")
-//    private int documentDataFieldSize = 1;
-
-    @Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size")
+
+    @Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size, it is a good idea to set this to the number of partitions serving your collection")
     private Integer maxConnectionPoolSize = 200;
 
     @Parameter(names = "-consistencyLevel", description = "Consistency Level")
-    private ConsistencyLevel consistencyLevel = ConsistencyLevel.Session;
+    private ConsistencyLevel consistencyLevel = ConsistencyLevel.Eventual;
 
     @Parameter(names = "-connectionMode", description = "Connection Mode")
     private ConnectionMode connectionMode = ConnectionMode.Gateway;
 
-//    @Parameter(names = "-concurrency", description = "Degree of Concurrency in Inserting Documents (only applies to blocking client)."
-//            + " If this value is not specified, the max connection pool size will be used as the concurrency level.")
-//    private Integer concurrency;
-
     @Parameter(names = "-numberOfDocumentsForEachCheckpoint", description = "Number of documents in each checkpoint.")
     private int numberOfDocumentsForEachCheckpoint = 100000;
 
@@ -66,10 +59,6 @@ public class Configuration {
     public boolean isHelp() {
         return help;
     }
-//
-//    public int getDocumentDataFieldSize() {
-//        return documentDataFieldSize;
-//    }
 
     public ConnectionPolicy getConnectionPolicy() {
         ConnectionPolicy policy = new ConnectionPolicy();
@@ -90,14 +79,6 @@ public class Configuration {
         return collectionId;
     }
 
-//    public int getConcurrency() {
-//        if (this.concurrency != null) {
-//            return concurrency;
-//        } else {
-//            return this.maxConnectionPoolSize;
-//        }
-//    }
-
     @Override
     public String toString() {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
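For orientation, the surviving flags feed client construction in the usual DocumentDB SDK pattern. The snippet below is a rough sketch of that wiring with the defaults shown above; endpoint and masterKey are placeholders, and it is not the literal body of getConnectionPolicy():

    // Rough sketch of how the parsed options are typically applied.
    ConnectionPolicy policy = new ConnectionPolicy();
    policy.setMaxPoolSize(200);                        // -maxConnectionPoolSize
    policy.setConnectionMode(ConnectionMode.Gateway);  // -connectionMode
    DocumentClient client = new DocumentClient(endpoint, masterKey, policy, ConsistencyLevel.Eventual); // -consistencyLevel
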
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java
index 45b3295..f926f89 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Semaphore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -45,7 +46,7 @@ class CongestionController {
     /**
      * The degree of concurrency to start at.
      */
-    private static final int STARTING_DEGREE_OF_CONCURRENCY = 2;
+    private static final int STARTING_DEGREE_OF_CONCURRENCY = 3;
 
     /**
      * The maximum degree of concurrency to go upto for a single physical partition.
@@ -138,7 +139,7 @@ class CongestionController {
         this.throttleSemaphore = new Semaphore(this.degreeOfConcurrency);
         this.aggregatedInsertMetrics = new InsertMetrics();
         this.executor = executor;
-        this.partitionThroughput = partitionThroughput; 
+        this.partitionThroughput = partitionThroughput;
     }
 
     private synchronized InsertMetrics atomicGetAndReplace(InsertMetrics metrics) {
@@ -157,15 +158,16 @@ class CongestionController {
 
                     // TODO: FIXME I think semaphore may reach 0 here and if so that will create a deadlock
                     // verify and fix
-                    logger.debug("partition key range id {} goes to sleep for {} seconds", partitionKeyRangeId, samplePeriod.getSeconds());
+                    logger.debug("pki {} goes to sleep for {} seconds. available semaphore permits {}, current degree of parallelism {}",
+                            partitionKeyRangeId, samplePeriod.getSeconds(), throttleSemaphore.availablePermits(), degreeOfConcurrency);
                     Thread.sleep(samplePeriod.toMillis());
-                    logger.debug("{} wakes up", partitionKeyRangeId);
+                    logger.debug("pki {} wakes up", partitionKeyRangeId);
 
                     InsertMetrics insertMetricsSample = atomicGetAndReplace(new InsertMetrics());
 
                     if (insertMetricsSample.numberOfThrottles > 0) {
-                        logger.debug("Importing for partition key range id {} encountered {} throttling, reducing parallelism",
-                                partitionKeyRangeId, insertMetricsSample.numberOfThrottles);
+                        logger.debug("pki {} importing encountered {} throttling. current degree of parallelism {}, decreasing amount: {}",
+                                partitionKeyRangeId, insertMetricsSample.numberOfThrottles, degreeOfConcurrency, degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR);
 
                         // We got a throttle so we need to back off on the degree of concurrency.
                         // Get the current degree of concurrency and decrease that (AIMD).
@@ -173,8 +175,11 @@ class CongestionController {
                         for (int i = 0; i < degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR; i++) {
                             throttleSemaphore.acquire();
                         }
+
                         degreeOfConcurrency -= (degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR);
+
+                        logger.debug("pki {} degree of parallelism reduced to {}, sem available permits {}", partitionKeyRangeId, degreeOfConcurrency, throttleSemaphore.availablePermits());
                     }
 
                     if (insertMetricsSample.numberOfDocumentsInserted == 0) {
@@ -182,21 +187,25 @@ class CongestionController {
                         continue;
                     }
 
-                    logger.debug("{} aggregating inserts metrics", partitionKeyRangeId);
+                    logger.debug("pki {} aggregating inserts metrics", partitionKeyRangeId);
 
                     if (insertMetricsSample.numberOfThrottles == 0) {
                         if ((insertMetricsSample.requestUnitsConsumed < THROUGHPUT_THRESHOLD * partitionThroughput) &&
                                 degreeOfConcurrency + ADDITIVE_INCREASE_FACTOR <= MAX_DEGREE_OF_CONCURRENCY) {
                             // We aren't getting throttles, so we should bump of the degree of concurrency (AIAD).
+                            logger.debug("pki {} increasing degree of parallelism and releasing semaphore", partitionKeyRangeId);
+
                             throttleSemaphore.release(ADDITIVE_INCREASE_FACTOR);
                             degreeOfConcurrency += ADDITIVE_INCREASE_FACTOR;
+
+                            logger.debug("pki {} degree of parallelism increased to {}. available semaphore permits {}", partitionKeyRangeId, degreeOfConcurrency, throttleSemaphore.availablePermits());
                         }
                     }
 
                     double ruPerSecond = insertMetricsSample.requestUnitsConsumed / samplePeriod.getSeconds();
                     documentsInsertedSoFar += insertMetricsSample.numberOfDocumentsInserted;
 
-                    logger.debug("Partition key range id {} : Inserted {} docs in {} milli seconds at {} RU/s with {} tasks."
+                    logger.debug("pki {} : Inserted {} docs in {} milli seconds at {} RU/s with {} tasks."
                             + " Faced {} throttles. Total documents inserterd so far {}.",
                             partitionKeyRangeId,
                             insertMetricsSample.numberOfDocumentsInserted,
@@ -205,10 +214,13 @@ class CongestionController {
                             degreeOfConcurrency,
                             insertMetricsSample.numberOfThrottles,
                             documentsInsertedSoFar);
-
+
                 } catch (InterruptedException e) {
                     logger.warn("Interrupted", e);
                     break;
+                } catch (Exception e) {
+                    logger.error("pki {} unexpected failure", partitionKeyRangeId, e);
+                    throw e;
                 }
             }
 
@@ -217,9 +229,33 @@ class CongestionController {
 
-    public ListenableFuture<Void> ExecuteAll() {
+    public ListenableFuture<Void> executeAllAsync() {
+
+        Callable<ListenableFuture<Void>> c = new Callable<ListenableFuture<Void>>() {
 
-        logger.debug("Executing batching in partition {}", partitionKeyRangeId);
+            @Override
+            public ListenableFuture<Void> call() throws Exception {
+                return executeAll();
+            }
+        };
+
+        ListenableFuture<ListenableFuture<Void>> f = executor.submit(c);
+        AsyncFunction<ListenableFuture<Void>, Void> function = new AsyncFunction<ListenableFuture<Void>, Void>() {
+
+            @Override
+            public ListenableFuture<Void> apply(ListenableFuture<Void> input) throws Exception {
+                return input;
+            }
+        };
+        return Futures.transformAsync(f, function, executor);
+    }
+
+    public ListenableFuture<Void> executeAll() {
+
+        logger.debug("pki {} Executing batching", partitionKeyRangeId);
+
+        ListenableFuture<Void> completionFuture = executor.submit(congestionControlTask());
+
         Iterator<Callable<InsertMetrics>> batchExecutionIterator = batchInserter.miniBatchInsertExecutionCallableIterator();
 
         List<ListenableFuture<InsertMetrics>> futureList = new ArrayList<>();
@@ -228,11 +264,12 @@ class CongestionController {
 
             // Main thread waits on the throttleSem so no more than MaxDegreeOfParallelism Tasks are run at a time.
             try {
-                logger.trace("trying to accequire semaphore");
+                logger.debug("pki {} trying to acquire semaphore to execute a task. available permits {}", partitionKeyRangeId, this.throttleSemaphore.availablePermits());
                 this.throttleSemaphore.acquire();
-                logger.trace("semaphore accequired");
+                logger.debug("pki {} acquiring semaphore for executing a task succeeded. available permits {}", partitionKeyRangeId, this.throttleSemaphore.availablePermits());
             } catch (InterruptedException e) {
-                logger.error("Interrupted", e);
+                logger.error("pki {} Interrupted, releasing semaphore", partitionKeyRangeId, e);
+                this.throttleSemaphore.release();
                 throw new RuntimeException(e);
             }
@@ -242,16 +279,18 @@ class CongestionController {
 
                 @Override
                 public void onSuccess(InsertMetrics result) {
+                    logger.debug("pki {} acquiring a synchronized lock to update metrics", partitionKeyRangeId);
+
                     synchronized (CongestionController.this) {
                         aggregatedInsertMetrics = InsertMetrics.sum(aggregatedInsertMetrics, result);
                     }
-                    logger.trace("releasing semaphore");
+                    logger.debug("pki {} releasing semaphore on completion of task", partitionKeyRangeId);
                     throttleSemaphore.release();
                 }
 
                 @Override
                 public void onFailure(Throwable t) {
-                    logger.trace("encountered failure {} releasing semaphore", t);
+                    logger.trace("pki {} encountered failure {} releasing semaphore", partitionKeyRangeId, t);
                    throttleSemaphore.release();
                 }
             };
@@ -266,28 +305,31 @@ class CongestionController {
 
             @Override
             public void onSuccess(List<InsertMetrics> result) {
-                logger.debug("importing for partition key range {} completed", partitionKeyRangeId);
+                logger.debug("pki {} importing completed", partitionKeyRangeId);
                 getAndSet(State.Completed);
             }
 
             @Override
             public void onFailure(Throwable t) {
-                logger.error("importing for partition key range {} failed", partitionKeyRangeId, t);
+                logger.error("pki {} importing failed", partitionKeyRangeId, t);
                 getAndSet(State.Failure);
             }
         };
 
         Futures.addCallback(allFutureResults, completionCallback, MoreExecutors.directExecutor());
 
-        return executor.submit(congestionControlTask());
+        return completionFuture;
     }
 
     public synchronized State getAndSet(State state) {
+        logger.debug("pki {} state set to {}", partitionKeyRangeId, state);
+
         State res = this.state;
         this.state = state;
         return res;
     }
 
     public synchronized boolean isRunning() {
+        logger.debug("pki {} in isRunning", partitionKeyRangeId);
         return state == State.Running;
     }
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java
index 354856a..86e2f10 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java
@@ -22,6 +22,7 @@
  */
 package com.microsoft.azure.documentdb.bulkimport;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -31,10 +32,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.JCommander;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.microsoft.azure.documentdb.ConnectionPolicy;
 import com.microsoft.azure.documentdb.DocumentClient;
 import com.microsoft.azure.documentdb.DocumentClientException;
 import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.PartitionKeyDefinition;
 import com.microsoft.azure.documentdb.RetryOptions;
 
 public class Main {
@@ -49,18 +53,23 @@ public class Main {
 
         String collectionLink = String.format("/dbs/%s/colls/%s", cfg.getDatabaseId(), cfg.getCollectionId());
         // this assumes database and collection already exists
+        // also it is a good idea to set your connection pool size to be equal to the number of partitions serving your collection.
         DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
 
         // instantiates bulk importer
         BulkImporter bulkImporter = new BulkImporter(client, collection);
 
+        Stopwatch totalWatch = Stopwatch.createStarted();
+
         double totalRequestCharge = 0;
         long totalTimeInMillis = 0;
         long totalNumberOfDocumentsImported = 0;
 
         for(int i = 0 ; i < cfg.getNumberOfCheckpoints(); i++) {
 
-            Iterator<String> inputDocumentIterator = generatedDocuments(cfg);
+            Iterator<String> inputDocumentIterator = generatedDocuments(cfg, collection);
+
+            System.out.println("##########################################################################################");
+
             BulkImportResponse bulkImportResponse = bulkImporter.bulkImport(inputDocumentIterator, false);
 
             totalNumberOfDocumentsImported += bulkImportResponse.getNumberOfDocumentsImported();
@@ -68,9 +77,8 @@ public class Main {
             totalRequestCharge += bulkImportResponse.getTotalRequestUnitsConsumed();
 
             // print stats
-            System.out.println("##########################################################################################");
             System.out.println("Number of documents inserted in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported());
-            System.out.println("Import time for this checkpoint: " + bulkImportResponse.getTotalTimeTaken());
+            System.out.println("Import time for this checkpoint in milliseconds: " + bulkImportResponse.getTotalTimeTaken().toMillis());
             System.out.println("Total request unit consumed in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed());
 
             System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
@@ -78,42 +86,58 @@ public class Main {
             System.out.println("##########################################################################################");
         }
 
-        System.out.println("##########################################################################################");
+        totalWatch.stop();
+
+        System.out.println("##########################################################################################");
+        System.out.println("Total summed Import time in milliseconds: " + totalTimeInMillis);
         System.out.println("Total Number of documents inserted " + totalNumberOfDocumentsImported);
-        System.out.println("Total Import time in seconds: " + totalTimeInMillis / 1000);
+        System.out.println("Total Import time measured by stopwatch in milliseconds: " + totalWatch.elapsed().toMillis());
         System.out.println("Total request unit consumed: " + totalRequestCharge);
+        System.out.println("Average RUs/second: " + totalRequestCharge / (totalWatch.elapsed().toMillis() * 0.001));
+        System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalWatch.elapsed().toMillis() * 0.001));
 
-        System.out.println("Average RUs/second:" + totalRequestCharge / (totalTimeInMillis * 0.001));
-        System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalTimeInMillis * 0.001));
-
+        // close bulk importer to release any existing resources
+        bulkImporter.close();
+
+        // close document client
         client.close();
     }
 
-    private static Iterator<String> generatedDocuments(Configuration cfg) {
+    private static Iterator<String> generatedDocuments(Configuration cfg, DocumentCollection collection) {
+
+        PartitionKeyDefinition partitionKeyDefinition = collection.getPartitionKey();
+        Preconditions.checkArgument(partitionKeyDefinition != null &&
+                partitionKeyDefinition.getPaths().size() > 0, "there is no partition key definition");
+
+        Collection<String> partitionKeyPath = partitionKeyDefinition.getPaths();
+        Preconditions.checkArgument(partitionKeyPath.size() == 1,
+                "the command line benchmark tool only supports a simple partition key path");
+
+        String partitionKeyName = partitionKeyPath.iterator().next().replaceFirst("^/", "");
+
         // the size of each document is approximately 1KB
 
         // return documents to be bulk imported
         // if you are reading documents from disk you can change this to read documents from disk
 
         return IntStream.range(0, cfg.getNumberOfDocumentsForEachCheckpoint()).mapToObj(i -> {
-            String id = UUID.randomUUID().toString();
-            String mypk = UUID.randomUUID().toString();
-            String v = UUID.randomUUID().toString();
-            String fieldValue = v + "123456789";
-            for(int j = 0; j < 24 ; j++) {
-                fieldValue += v;
+
+            StringBuilder sb = new StringBuilder();
+            sb.append("{");
+            sb.append("\"id\":\"").append(UUID.randomUUID().toString()).append("\"");
+            sb.append(",\"").append(partitionKeyName).append("\":\"").append(UUID.randomUUID().toString()).append("abc\"");
+
+            String data = UUID.randomUUID().toString();
+            data = data + data + "0123456789012";
+
+            for(int j = 0; j < 10;j++) {
+                sb.append(",").append("\"f").append(j).append("\":\"").append(data).append("\"");
             }
-            String doc = String.format("{" + " \"dataField\": \"%s\"," + " \"mypk\": \"%s\"," + " \"id\": \"%s\"" + "}", fieldValue, mypk, id);
-            //System.out.println("number of bytes in document: " + doc.getBytes(Charset.forName("UTF-8")).length);
-
-            return doc;
+            sb.append("}");
+
+            return sb.toString();
         }).iterator();
     }
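With this generator each document carries the collection's actual partition key property (derived by stripping the leading "/" from the partition key path) instead of a hard-coded mypk field. For a collection partitioned on /mypk, one generated document looks roughly like the line below, where each <uuid> is a fresh random UUID and the ten f0..f9 filler fields keep the payload near the 1KB mark:

    {"id":"<uuid>","mypk":"<uuid>abc","f0":"<uuid><uuid>0123456789012", ... ,"f9":"<uuid><uuid>0123456789012"}
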
definition"); + + Collection partitionKeyPath = partitionKeyDefinition.getPaths(); + Preconditions.checkArgument(partitionKeyPath.size() == 1, + "the command line benchmark tool only support simple partition key path"); + + String partitionKeyName = partitionKeyPath.iterator().next().replaceFirst("^/", ""); + // the size of each document is approximately 1KB // return documents to be bulk imported // if you are reading documents from disk you can change this to read documents from disk return IntStream.range(0, cfg.getNumberOfDocumentsForEachCheckpoint()).mapToObj(i -> { - String id = UUID.randomUUID().toString(); - String mypk = UUID.randomUUID().toString(); - String v = UUID.randomUUID().toString(); - String fieldValue = v + "123456789"; - for(int j = 0; j < 24 ; j++) { - fieldValue += v; + + StringBuilder sb = new StringBuilder(); + sb.append("{"); + sb.append("\"id\":\"").append(UUID.randomUUID().toString()).append("\""); + sb.append(",\"").append(partitionKeyName).append("\":\"").append(UUID.randomUUID().toString()).append("abc\""); + + String data = UUID.randomUUID().toString(); + data = data + data + "0123456789012"; + + for(int j = 0; j < 10;j++) { + sb.append(",").append("\"f").append(j).append("\":\"").append(data).append("\""); } - String doc = String.format("{" + - " \"dataField\": \"%s\"," + - " \"mypk\": \"%s\"," + - " \"id\": \"%s\"" + - "}", fieldValue, mypk, id); - //System.out.println("number of bytes in document: " + doc.getBytes(Charset.forName("UTF-8")).length); - - return doc; + sb.append("}"); + + return sb.toString(); }).iterator(); } diff --git a/bulkimport/src/test/java/com/microsoft/azure/documentdb/bulkimport/CongestionControllerTests.java b/bulkimport/src/test/java/com/microsoft/azure/documentdb/bulkimport/CongestionControllerTests.java index 519cfab..dcf1c45 100644 --- a/bulkimport/src/test/java/com/microsoft/azure/documentdb/bulkimport/CongestionControllerTests.java +++ b/bulkimport/src/test/java/com/microsoft/azure/documentdb/bulkimport/CongestionControllerTests.java @@ -99,7 +99,7 @@ public class CongestionControllerTests { Mockito.any(Object[].class))).thenReturn(bulkImportResponse); CongestionController cc = new CongestionController(listeningExecutorService, 10000, paritionKeyRangeId, bi); - ListenableFuture listenableFuture = cc.ExecuteAll(); + ListenableFuture listenableFuture = cc.executeAllAsync(); CountDownLatch latch = new CountDownLatch(1); @@ -168,7 +168,7 @@ public class CongestionControllerTests { Mockito.any(Object[].class))).thenReturn(bulkImportResponse1).thenReturn(bulkImportResponse2); CongestionController cc = new CongestionController(listeningExecutorService, 10000, paritionKeyRangeId, bi); - ListenableFuture listenableFuture = cc.ExecuteAll(); + ListenableFuture listenableFuture = cc.executeAllAsync(); CountDownLatch latch = new CountDownLatch(1);