diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java
index 4cf3f88..e532c5f 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java
@@ -173,7 +173,7 @@ class BatchInserter {
 
                     } catch (DocumentClientException e) {
 
-                        logger.debug("Importing minibatch for partition key range id {} failed", partitionKeyRangeId, e);
+                        logger.trace("Importing minibatch for partition key range id {} failed", partitionKeyRangeId, e);
 
                         if (isThrottled(e)) {
                             logger.debug("Throttled on partition range id {}", partitionKeyRangeId);
@@ -191,7 +191,7 @@ class BatchInserter {
                             logger.error(errorMessage);
                             throw new RuntimeException(errorMessage);
                         } else {
-                            String errorMessage = String.format("Partition range id {} is undergoing split, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
+                            String errorMessage = String.format("Partition range id %s is gone, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
                             logger.error(errorMessage);
                             throw new RuntimeException(errorMessage);
                         }
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java
index 4a58b4b..749ce8a 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Configuration.java
@@ -7,6 +7,7 @@ import com.beust.jcommander.Parameter;
 import com.microsoft.azure.documentdb.ConnectionMode;
 import com.microsoft.azure.documentdb.ConnectionPolicy;
 import com.microsoft.azure.documentdb.ConsistencyLevel;
+import com.microsoft.azure.documentdb.RetryOptions;
 
 public class Configuration {
 
@@ -22,8 +23,8 @@ public class Configuration {
     @Parameter(names = "-collectionId", description = "Collection ID", required = true)
     private String collectionId;
 
-    @Parameter(names = "-documentDataFieldSize", description = "Length of a document data field in characters (16-bit)")
-    private int documentDataFieldSize = 1;
+//    @Parameter(names = "-documentDataFieldSize", description = "Length of a document data field in characters (16-bit)")
+//    private int documentDataFieldSize = 1;
 
     @Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size")
     private Integer maxConnectionPoolSize = 200;
@@ -39,10 +40,10 @@ public class Configuration {
 //    private Integer concurrency;
 
     @Parameter(names = "-numberOfDocumentsForEachCheckpoint", description = "Number of documents in each checkpoint.")
-    private int numberOfDocumentsForEachCheckpoint = 1000000;
+    private int numberOfDocumentsForEachCheckpoint = 100000;
 
     @Parameter(names = "-numberOfCheckpoints", description = "Number of checkpoints.")
-    private int numberOfCheckpoints = 100;
+    private int numberOfCheckpoints = 10;
 
     @Parameter(names = {"-h", "-help", "--help"}, description = "Help", help = true)
     private boolean help = false;
@@ -66,10 +67,10 @@ public class Configuration {
     public boolean isHelp() {
         return help;
     }
-
-    public int getDocumentDataFieldSize() {
-        return documentDataFieldSize;
-    }
+//
+//    public int getDocumentDataFieldSize() {
+//        return documentDataFieldSize;
+//    }
 
     public ConnectionPolicy getConnectionPolicy() {
         ConnectionPolicy policy = new ConnectionPolicy();
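Note on the Configuration.java hunks above: the defaults drop the total default workload from 100 checkpoints x 1,000,000 documents = 100,000,000 documents to 10 x 100,000 = 1,000,000. Because these fields are JCommander @Parameter targets, the smaller defaults only apply when the flags are not passed on the command line. A minimal, self-contained sketch of that interaction (the class name is hypothetical, not part of this patch):

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

public class CheckpointDefaultsSketch {

    @Parameter(names = "-numberOfDocumentsForEachCheckpoint", description = "Number of documents in each checkpoint.")
    private int numberOfDocumentsForEachCheckpoint = 100000;

    @Parameter(names = "-numberOfCheckpoints", description = "Number of checkpoints.")
    private int numberOfCheckpoints = 10;

    public static void main(String[] args) {
        CheckpointDefaultsSketch cfg = new CheckpointDefaultsSketch();
        // Command-line flags override the field initializers; absent flags keep the defaults.
        new JCommander(cfg).parse(args);
        // New default workload: 10 * 100,000 = 1,000,000 documents
        // (previously 100 * 1,000,000 = 100,000,000).
        long totalDocuments = (long) cfg.numberOfCheckpoints * cfg.numberOfDocumentsForEachCheckpoint;
        System.out.println("Documents to import: " + totalDocuments);
    }
}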
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java
index acac670..45b3295 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java
@@ -80,7 +80,7 @@ class CongestionController {
     /**
      * This determines how often the code will sample the InsertMetrics and check to see if the degree of concurrency needs to be changed.
      */
-    private final Duration samplePeriod = Duration.ofSeconds(3);
+    private final Duration samplePeriod = Duration.ofSeconds(1);
 
     /**
      * The {@link BatchInserter} that exposes a stream of {@link Callable} that insert document batches and returns an {@link InsertMetrics}
@@ -157,25 +157,23 @@
                     // TODO: FIXME I think semaphore may reach 0 here and if so that will create a deadlock
                     // verify and fix
 
-                    logger.trace("partition key range id {} goes to sleep for {} seconds", partitionKeyRangeId, samplePeriod.getSeconds());
-                    Thread.sleep(samplePeriod.getSeconds() * 1000);
-                    logger.trace("{} wakes up", partitionKeyRangeId);
+                    logger.debug("partition key range id {} goes to sleep for {} seconds", partitionKeyRangeId, samplePeriod.getSeconds());
+                    Thread.sleep(samplePeriod.toMillis());
+                    logger.debug("{} wakes up", partitionKeyRangeId);
 
                     InsertMetrics insertMetricsSample = atomicGetAndReplace(new InsertMetrics());
 
                     if (insertMetricsSample.numberOfThrottles > 0) {
-                        logger.trace("{} encountered {} throttling, reducing parallelism", partitionKeyRangeId, insertMetricsSample.numberOfThrottles);
-
+                        logger.debug("Importing for partition key range id {} encountered {} throttling, reducing parallelism",
+                                partitionKeyRangeId, insertMetricsSample.numberOfThrottles);
+
                         // We got a throttle so we need to back off on the degree of concurrency.
                         // Get the current degree of concurrency and decrease that (AIMD).
 
-                        logger.trace("{} encountered {} throttling", partitionKeyRangeId, insertMetricsSample.numberOfThrottles);
 
                         for (int i = 0; i < degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR; i++) {
                             throttleSemaphore.acquire();
                         }
 
-                        logger.trace("{} encountered {} throttling", partitionKeyRangeId, insertMetricsSample.numberOfThrottles);
-
                         degreeOfConcurrency -= (degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR);
                     }
 
@@ -184,7 +182,7 @@
                         continue;
                     }
 
-                    logger.trace("{} aggregating inserts metrics", partitionKeyRangeId);
+                    logger.debug("{} aggregating insert metrics", partitionKeyRangeId);
 
                     if (insertMetricsSample.numberOfThrottles == 0) {
                         if ((insertMetricsSample.requestUnitsConsumed < THROUGHPUT_THRESHOLD * partitionThroughput) &&
@@ -198,15 +196,15 @@
                     double ruPerSecond = insertMetricsSample.requestUnitsConsumed / samplePeriod.getSeconds();
                     documentsInsertedSoFar += insertMetricsSample.numberOfDocumentsInserted;
 
-                    logger.info("Partition index {} : {} Inserted {} docs in {} seconds at {} RU/s with {} tasks. Faced {} throttles",
+                    logger.debug("Partition key range id {} : Inserted {} docs in {} milliseconds at {} RU/s with {} tasks."
+                            + " Faced {} throttles. Total documents inserted so far {}.",
                             partitionKeyRangeId,
-                            insertMetricsSample.numberOfThrottles,
-                            degreeOfConcurrency,
-                            ruPerSecond,
-                            samplePeriod.getSeconds(),
                             insertMetricsSample.numberOfDocumentsInserted,
-                            documentsInsertedSoFar
-                            );
+                            samplePeriod.toMillis(),
+                            ruPerSecond,
+                            degreeOfConcurrency,
+                            insertMetricsSample.numberOfThrottles,
+                            documentsInsertedSoFar);
 
                 } catch (InterruptedException e) {
                     logger.warn("Interrupted", e);
@@ -268,13 +266,13 @@
 
             @Override
             public void onSuccess(List result) {
-                logger.info("Completed");
+                logger.debug("importing for partition key range {} completed", partitionKeyRangeId);
                 getAndSet(State.Completed);
             }
 
             @Override
             public void onFailure(Throwable t) {
-                logger.error("Encountered failure", t);
+                logger.error("importing for partition key range {} failed", partitionKeyRangeId, t);
                 getAndSet(State.Failure);
            }
        };
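The throttle branch above is the multiplicative-decrease half of an AIMD (additive-increase/multiplicative-decrease) controller: permits are taken from throttleSemaphore and never returned, shrinking the number of insert tasks that may run concurrently. A simplified, self-contained sketch of the pattern, with illustrative names rather than the project's exact code:

import java.util.concurrent.Semaphore;

class AimdConcurrencySketch {

    private static final int MULTIPLICATIVE_DECREASE_FACTOR = 2;

    // Every insert task acquires one permit before running and releases it
    // when done, so the number of available permits bounds the parallelism.
    private final Semaphore throttleSemaphore;
    private int degreeOfConcurrency;

    AimdConcurrencySketch(int initialDegreeOfConcurrency) {
        this.degreeOfConcurrency = initialDegreeOfConcurrency;
        this.throttleSemaphore = new Semaphore(initialDegreeOfConcurrency);
    }

    // Multiplicative decrease: on a sample period with throttles, permanently
    // take back a fraction of the permits. acquire() blocks until in-flight
    // tasks release, which is why the TODO above worries about the permit
    // count reaching zero and deadlocking.
    void onThrottledSample() throws InterruptedException {
        int decrease = degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR;
        for (int i = 0; i < decrease; i++) {
            throttleSemaphore.acquire();
        }
        degreeOfConcurrency -= decrease;
    }

    // Additive increase: on a throttle-free sample with throughput headroom,
    // hand out a fixed number of extra permits.
    void onHealthySample(int addition) {
        throttleSemaphore.release(addition);
        degreeOfConcurrency += addition;
    }
}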
diff --git a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java
index 1ecabac..354856a 100644
--- a/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java
+++ b/bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java
@@ -22,7 +22,6 @@
  */
 package com.microsoft.azure.documentdb.bulkimport;
 
-import java.nio.charset.Charset;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -32,15 +31,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.JCommander;
+import com.microsoft.azure.documentdb.ConnectionPolicy;
 import com.microsoft.azure.documentdb.DocumentClient;
 import com.microsoft.azure.documentdb.DocumentClientException;
 import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.RetryOptions;
 
 public class Main {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
 
-
     public static void main(String[] args) throws DocumentClientException, InterruptedException, ExecutionException {
 
         Configuration cfg = parseCommandLineArgs(args);
@@ -73,8 +73,8 @@
             System.out.println("Import time for this checkpoint: " + bulkImportResponse.getTotalTimeTaken());
             System.out.println("Total request unit consumed in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed());
 
-            System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / bulkImportResponse.getTotalTimeTaken().getSeconds());
-            System.out.println("Average #Inserts/second in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported() / bulkImportResponse.getTotalTimeTaken().getSeconds());
+            System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
+            System.out.println("Average #Inserts/second in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
 
             System.out.println("##########################################################################################");
         }
@@ -85,8 +85,8 @@
 
         System.out.println("Total Import time in seconds: " + totalTimeInMillis / 1000);
         System.out.println("Total request unit consumed: " + totalRequestCharge);
-        System.out.println("Average RUs/second:" + totalRequestCharge / (totalTimeInMillis / 1000));
-        System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalTimeInMillis / 1000));
+        System.out.println("Average RUs/second: " + totalRequestCharge / (totalTimeInMillis * 0.001));
+        System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalTimeInMillis * 0.001));
 
         client.close();
     }
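The printout changes above replace Duration.getSeconds() with toMillis(): getSeconds() truncates toward zero, so a checkpoint that finishes in under a second divides by zero, and a 1.9-second checkpoint is treated as 1 second, overstating the rate. A small standalone illustration (the values are made up):

import java.time.Duration;

public class RateMathSketch {
    public static void main(String[] args) {
        Duration elapsed = Duration.ofMillis(800); // a sub-second checkpoint
        double requestUnits = 4000;

        // Old style: getSeconds() truncates, yielding 0 for anything under 1s.
        long wholeSeconds = elapsed.getSeconds();                         // 0
        System.out.println("RU/s (seconds): " + requestUnits / wholeSeconds); // Infinity

        // New style: milliseconds keep the fraction, so the rate is meaningful.
        double ruPerSecond = requestUnits / (0.001 * elapsed.toMillis()); // 5000.0
        System.out.println("RU/s (millis): " + ruPerSecond);
    }
}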
@@ -118,8 +118,14 @@
     }
 
     public static DocumentClient documentClientFrom(Configuration cfg) throws DocumentClientException {
+
+        ConnectionPolicy policy = cfg.getConnectionPolicy();
+        RetryOptions retryOptions = new RetryOptions();
+        retryOptions.setMaxRetryAttemptsOnThrottledRequests(0);
+        policy.setRetryOptions(retryOptions);
+
         return new DocumentClient(cfg.getServiceEndpoint(), cfg.getMasterKey(),
-                cfg.getConnectionPolicy(), cfg.getConsistencyLevel());
+                policy, cfg.getConsistencyLevel());
     }
 
     private static Configuration parseCommandLineArgs(String[] args) {
@@ -141,6 +147,7 @@
         if (cfg.isHelp()) {
             // prints out the usage help
             jcommander.usage();
+            System.exit(0);
             return null;
         }
         return cfg;
diff --git a/bulkimport/src/main/resources/log4j.properties b/bulkimport/src/main/resources/log4j.properties
index 84b0a5b..c6d0c47 100644
--- a/bulkimport/src/main/resources/log4j.properties
+++ b/bulkimport/src/main/resources/log4j.properties
@@ -1,7 +1,7 @@
 # this is the log4j configuration for tests
 
 # Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=DEBUG, A1
+log4j.rootLogger=INFO, A1
 
 # Set HTTP components' logger to INFO
 log4j.category.org.apache.http=INFO
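The documentClientFrom() change is worth calling out: with maxRetryAttemptsOnThrottledRequests set to 0, the SDK no longer retries throttled (HTTP 429) requests internally, so every throttle surfaces as a DocumentClientException to BatchInserter and is counted by the CongestionController's AIMD backoff, rather than being absorbed by the SDK's own retry-with-delay. A condensed sketch of that setup, assuming placeholder endpoint/key values and a Session consistency level; isThrottled mirrors the check referenced in BatchInserter:

import com.microsoft.azure.documentdb.ConnectionPolicy;
import com.microsoft.azure.documentdb.ConsistencyLevel;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.RetryOptions;

public class NoSdkRetrySketch {

    // Disabling the SDK's own throttle retries means every 429 propagates to
    // the caller, letting the bulk importer's congestion control observe it.
    public static DocumentClient buildClient(String serviceEndpoint, String masterKey) {
        ConnectionPolicy policy = new ConnectionPolicy();
        RetryOptions retryOptions = new RetryOptions();
        retryOptions.setMaxRetryAttemptsOnThrottledRequests(0);
        policy.setRetryOptions(retryOptions);
        return new DocumentClient(serviceEndpoint, masterKey, policy, ConsistencyLevel.Session);
    }

    // Throttled requests carry HTTP status code 429 ("Too Many Requests").
    static boolean isThrottled(DocumentClientException e) {
        return e.getStatusCode() == 429;
    }
}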