This commit is contained in:
Mohammad Derakhshani 2017-10-10 23:25:09 -07:00
Parent 023276baa9
Commit 2ba537704a
5 changed files with 43 additions and 37 deletions

View file

@@ -173,7 +173,7 @@ class BatchInserter {
     } catch (DocumentClientException e) {
-        logger.debug("Importing minibatch for partition key range id {} failed", partitionKeyRangeId, e);
+        logger.trace("Importing minibatch for partition key range id {} failed", partitionKeyRangeId, e);
         if (isThrottled(e)) {
             logger.debug("Throttled on partition range id {}", partitionKeyRangeId);
@@ -191,7 +191,7 @@ class BatchInserter {
             logger.error(errorMessage);
             throw new RuntimeException(errorMessage);
         } else {
-            String errorMessage = String.format("Partition range id {} is undergoing split, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
+            String errorMessage = String.format("Partition range id {} is gone, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
             logger.error(errorMessage);
             throw new RuntimeException(errorMessage);
         }
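isThrottled is not shown in this hunk; a plausible implementation (an assumption, not necessarily the project's code) simply checks the exception's status code for HTTP 429:

// Hypothetical sketch: DocumentDB reports throttling as HTTP 429
// ("Too Many Requests"), which DocumentClientException exposes via getStatusCode().
private boolean isThrottled(DocumentClientException e) {
    return e.getStatusCode() == 429;
}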

View file

@@ -7,6 +7,7 @@ import com.beust.jcommander.Parameter;
 import com.microsoft.azure.documentdb.ConnectionMode;
 import com.microsoft.azure.documentdb.ConnectionPolicy;
 import com.microsoft.azure.documentdb.ConsistencyLevel;
+import com.microsoft.azure.documentdb.RetryOptions;

 public class Configuration {
@@ -22,8 +23,8 @@ public class Configuration {
     @Parameter(names = "-collectionId", description = "Collection ID", required = true)
     private String collectionId;

-    @Parameter(names = "-documentDataFieldSize", description = "Length of a document data field in characters (16-bit)")
-    private int documentDataFieldSize = 1;
+    // @Parameter(names = "-documentDataFieldSize", description = "Length of a document data field in characters (16-bit)")
+    // private int documentDataFieldSize = 1;

     @Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size")
     private Integer maxConnectionPoolSize = 200;
@@ -39,10 +40,10 @@ public class Configuration {
     // private Integer concurrency;

     @Parameter(names = "-numberOfDocumentsForEachCheckpoint", description = "Number of documents in each checkpoint.")
-    private int numberOfDocumentsForEachCheckpoint = 1000000;
+    private int numberOfDocumentsForEachCheckpoint = 100000;

     @Parameter(names = "-numberOfCheckpoints", description = "Number of checkpoints.")
-    private int numberOfCheckpoints = 100;
+    private int numberOfCheckpoints = 10;

     @Parameter(names = {"-h", "-help", "--help"}, description = "Help", help = true)
     private boolean help = false;
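Taken together, the new defaults shrink a default run from 100 checkpoints × 1,000,000 documents = 100,000,000 documents down to 10 × 100,000 = 1,000,000 documents.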
@@ -66,10 +67,10 @@ public class Configuration {
     public boolean isHelp() {
         return help;
     }

-    public int getDocumentDataFieldSize() {
-        return documentDataFieldSize;
-    }
+    //
+    // public int getDocumentDataFieldSize() {
+    // return documentDataFieldSize;
+    // }

     public ConnectionPolicy getConnectionPolicy() {
         ConnectionPolicy policy = new ConnectionPolicy();
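These @Parameter annotations are consumed by JCommander (com.beust.jcommander, imported above). A minimal, self-contained sketch of the parsing mechanics, reusing one of the flags above in an illustrative class (not the project's actual entry point):

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

// Minimal JCommander sketch: populate an annotated config object from argv.
public class ArgsDemo {
    @Parameter(names = "-numberOfCheckpoints", description = "Number of checkpoints.")
    private int numberOfCheckpoints = 10; // default used when the flag is absent

    public static void main(String[] args) {
        ArgsDemo cfg = new ArgsDemo();
        // parse() fills the annotated fields; unknown flags raise a ParameterException
        new JCommander(cfg).parse("-numberOfCheckpoints", "25");
        System.out.println(cfg.numberOfCheckpoints); // prints 25
    }
}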

View file

@@ -80,7 +80,7 @@ class CongestionController {
     /**
     * This determines how often the code will sample the InsertMetrics and check to see if the degree of concurrency needs to be changed.
     */
-    private final Duration samplePeriod = Duration.ofSeconds(3);
+    private final Duration samplePeriod = Duration.ofSeconds(1);

     /**
     * The {@link BatchInserter} that exposes a stream of {@link Callable} that insert document batches and returns an {@link InsertMetrics}
@@ -157,25 +157,23 @@ class CongestionController {
     // TODO: FIXME I think semaphore may reach 0 here and if so that will create a deadlock
     // verify and fix
-    logger.trace("partition key range id {} goes to sleep for {} seconds", partitionKeyRangeId, samplePeriod.getSeconds());
-    Thread.sleep(samplePeriod.getSeconds() * 1000);
-    logger.trace("{} wakes up", partitionKeyRangeId);
+    logger.debug("partition key range id {} goes to sleep for {} seconds", partitionKeyRangeId, samplePeriod.getSeconds());
+    Thread.sleep(samplePeriod.toMillis());
+    logger.debug("{} wakes up", partitionKeyRangeId);

     InsertMetrics insertMetricsSample = atomicGetAndReplace(new InsertMetrics());

     if (insertMetricsSample.numberOfThrottles > 0) {
-        logger.trace("{} encountered {} throttling, reducing parallelism", partitionKeyRangeId, insertMetricsSample.numberOfThrottles);
+        logger.debug("Importing for partition key range id {} encountered {} throttling, reducing parallelism",
+            partitionKeyRangeId, insertMetricsSample.numberOfThrottles);

         // We got a throttle so we need to back off on the degree of concurrency.
         // Get the current degree of concurrency and decrease that (AIMD).
-        logger.trace("{} encountered {} throttling", partitionKeyRangeId, insertMetricsSample.numberOfThrottles);
         for (int i = 0; i < degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR; i++) {
             throttleSemaphore.acquire();
         }
-        logger.trace("{} encountered {} throttling", partitionKeyRangeId, insertMetricsSample.numberOfThrottles);
         degreeOfConcurrency -= (degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR);
     }
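The loop above is the multiplicative-decrease half of AIMD congestion control: each permit acquired from throttleSemaphore and never released retires one concurrent insert task. A self-contained sketch of the idea (class name, factor values, and the additive-increase step are illustrative assumptions, not the project's exact code):

import java.util.concurrent.Semaphore;

// AIMD sketch: a semaphore caps concurrent insert tasks. On throttling the cap
// shrinks multiplicatively; on a healthy sample it grows additively.
class AimdThrottle {
    static final int MULTIPLICATIVE_DECREASE_FACTOR = 2; // assumed: halve on throttle
    static final int ADDITIVE_INCREASE = 1;               // assumed: grow by one otherwise

    private final Semaphore permits;
    private int degreeOfConcurrency;

    AimdThrottle(int initialConcurrency) {
        this.permits = new Semaphore(initialConcurrency);
        this.degreeOfConcurrency = initialConcurrency;
    }

    // Called once per sample period when throttles were observed.
    // Note the source's TODO: if degreeOfConcurrency ever reaches 0,
    // acquire() here could block forever.
    void onThrottled() throws InterruptedException {
        int decrease = degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR;
        for (int i = 0; i < decrease; i++) {
            permits.acquire(); // withhold a permit -> one less concurrent task
        }
        degreeOfConcurrency -= decrease;
    }

    // Called once per healthy (throttle-free) sample period.
    void onHealthy() {
        permits.release(ADDITIVE_INCREASE); // hand back a permit -> one more task
        degreeOfConcurrency += ADDITIVE_INCREASE;
    }
}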
@@ -184,7 +182,7 @@ class CongestionController {
         continue;
     }

-    logger.trace("{} aggregating inserts metrics", partitionKeyRangeId);
+    logger.debug("{} aggregating inserts metrics", partitionKeyRangeId);

     if (insertMetricsSample.numberOfThrottles == 0) {
         if ((insertMetricsSample.requestUnitsConsumed < THROUGHPUT_THRESHOLD * partitionThroughput) &&
@@ -198,15 +196,15 @@ class CongestionController {
     double ruPerSecond = insertMetricsSample.requestUnitsConsumed / samplePeriod.getSeconds();
     documentsInsertedSoFar += insertMetricsSample.numberOfDocumentsInserted;

-    logger.info("Partition index {} : {} Inserted {} docs in {} seconds at {} RU/s with {} tasks. Faced {} throttles",
+    logger.debug("Partition key range id {} : Inserted {} docs in {} milli seconds at {} RU/s with {} tasks."
+        + " Faced {} throttles. Total documents inserterd so far {}.",
         partitionKeyRangeId,
-        insertMetricsSample.numberOfThrottles,
-        degreeOfConcurrency,
-        ruPerSecond,
-        samplePeriod.getSeconds(),
         insertMetricsSample.numberOfDocumentsInserted,
-        documentsInsertedSoFar
-        );
+        samplePeriod.toMillis(),
+        ruPerSecond,
+        degreeOfConcurrency,
+        insertMetricsSample.numberOfThrottles,
+        documentsInsertedSoFar);
 } catch (InterruptedException e) {
     logger.warn("Interrupted", e);
@@ -268,13 +266,13 @@ class CongestionController {
     @Override
     public void onSuccess(List<InsertMetrics> result) {
-        logger.info("Completed");
+        logger.debug("importing for partition key range {} completed", partitionKeyRangeId);
         getAndSet(State.Completed);
     }

     @Override
     public void onFailure(Throwable t) {
-        logger.error("Encountered failure", t);
+        logger.error("importing for partition key range {} failed", partitionKeyRangeId, t);
         getAndSet(State.Failure);
     }
 };
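The onSuccess/onFailure pair has the shape of a Guava FutureCallback attached to the aggregated per-batch future. A minimal sketch of the pattern, with an illustrative executor and future (not the project's code):

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executors;

public class CallbackDemo {
    public static void main(String[] args) {
        ListeningExecutorService executor =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        ListenableFuture<String> future = executor.submit(() -> "done");

        // Exactly one of onSuccess/onFailure fires, on the supplied executor.
        Futures.addCallback(future, new FutureCallback<String>() {
            @Override public void onSuccess(String result) { System.out.println(result); }
            @Override public void onFailure(Throwable t) { t.printStackTrace(); }
        }, executor);

        executor.shutdown();
    }
}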

View file

@@ -22,7 +22,6 @@
  */
 package com.microsoft.azure.documentdb.bulkimport;

-import java.nio.charset.Charset;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -32,15 +31,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import com.beust.jcommander.JCommander;
 import com.microsoft.azure.documentdb.ConnectionPolicy;
 import com.microsoft.azure.documentdb.DocumentClient;
 import com.microsoft.azure.documentdb.DocumentClientException;
 import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.RetryOptions;

 public class Main {

     public static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

     public static void main(String[] args) throws DocumentClientException, InterruptedException, ExecutionException {
         Configuration cfg = parseCommandLineArgs(args);
@@ -73,8 +73,8 @@ public class Main {
         System.out.println("Import time for this checkpoint: " + bulkImportResponse.getTotalTimeTaken());
         System.out.println("Total request unit consumed in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed());
-        System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / bulkImportResponse.getTotalTimeTaken().getSeconds());
-        System.out.println("Average #Inserts/second in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported() / bulkImportResponse.getTotalTimeTaken().getSeconds());
+        System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
+        System.out.println("Average #Inserts/second in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
         System.out.println("##########################################################################################");
     }
@@ -85,8 +85,8 @@ public class Main {
         System.out.println("Total Import time in seconds: " + totalTimeInMillis / 1000);
         System.out.println("Total request unit consumed: " + totalRequestCharge);
-        System.out.println("Average RUs/second:" + totalRequestCharge / (totalTimeInMillis / 1000));
-        System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalTimeInMillis / 1000));
+        System.out.println("Average RUs/second:" + totalRequestCharge / (totalTimeInMillis * 0.001));
+        System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalTimeInMillis * 0.001));
     client.close();
 }

@@ -118,8 +118,14 @@ public class Main {
     }

     public static DocumentClient documentClientFrom(Configuration cfg) throws DocumentClientException {
+        ConnectionPolicy policy = cfg.getConnectionPolicy();
+        RetryOptions retryOptions = new RetryOptions();
+        retryOptions.setMaxRetryAttemptsOnThrottledRequests(0);
+        policy.setRetryOptions(retryOptions);
+
         return new DocumentClient(cfg.getServiceEndpoint(), cfg.getMasterKey(),
-            cfg.getConnectionPolicy(), cfg.getConsistencyLevel());
+            policy, cfg.getConsistencyLevel());
     }
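Presumably setMaxRetryAttemptsOnThrottledRequests(0) disables the SDK's built-in retry-on-429 so that throttle responses surface immediately to the CongestionController, which owns backoff through its AIMD loop rather than having the client hide throttles behind silent retries.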
 private static Configuration parseCommandLineArgs(String[] args) {
@@ -141,6 +147,7 @@ public class Main {
     if (cfg.isHelp()) {
         // prints out the usage help
         jcommander.usage();
+        System.exit(0);
         return null;
     }
     return cfg;

View file

@@ -1,7 +1,7 @@
 # this is the log4j configuration for tests

 # Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=DEBUG, A1
+log4j.rootLogger=INFO, A1

 # Set HTTP components' logger to INFO
 log4j.category.org.apache.http=INFO
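For reference, a complete log4j.properties of this shape also has to define the A1 appender, which this hunk does not show; a typical console setup would look like the following sketch (the appender class and pattern are illustrative assumptions, not the project's file):

# Root logger at INFO, writing to the single appender A1
log4j.rootLogger=INFO, A1
# A1: console appender with a timestamped pattern (illustrative)
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
# Keep chatty HTTP client logging at INFO
log4j.category.org.apache.http=INFO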