improved multi-threading and throughtput, cleanup
This commit is contained in:
Родитель
cfd1938a42
Коммит
27583c5265
|
@ -114,6 +114,7 @@ class BatchInserter {
|
|||
@Override
|
||||
public InsertMetrics call() throws Exception {
|
||||
|
||||
logger.debug("pki {} importing mini batch started", partitionKeyRangeId);
|
||||
Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
double requestUnitsCounsumed = 0;
|
||||
int numberOfThrottles = 0;
|
||||
|
@ -123,6 +124,8 @@ class BatchInserter {
|
|||
int currentDocumentIndex = 0;
|
||||
|
||||
while (currentDocumentIndex < miniBatch.size()) {
|
||||
logger.debug("pki {} inside for loop, currentDocumentIndex", partitionKeyRangeId, currentDocumentIndex);
|
||||
|
||||
String[] docBatch = miniBatch.subList(currentDocumentIndex, miniBatch.size()).toArray(new String[0]);
|
||||
|
||||
// Before c# 6, await not allowed in the catch block, hence the following two local variables.
|
||||
|
@ -134,7 +137,7 @@ class BatchInserter {
|
|||
// set partition key range id
|
||||
requestOptions.setPartitionKeyRengeId(partitionKeyRangeId);
|
||||
|
||||
logger.debug("Partition Key Range Id {}, Trying to import minibatch of {} documenents", partitionKeyRangeId, docBatch.length);
|
||||
logger.debug("pki {}, Trying to import minibatch of {} documenents", partitionKeyRangeId, docBatch.length);
|
||||
|
||||
if (!timedOut) {
|
||||
|
||||
|
@ -158,7 +161,7 @@ class BatchInserter {
|
|||
|
||||
if (bulkImportResponse != null) {
|
||||
if (bulkImportResponse.errorCode != 0) {
|
||||
logger.warn("Partition range id {} Received response error code {}", partitionKeyRangeId, bulkImportResponse.errorCode);
|
||||
logger.warn("pki {} Received response error code {}", partitionKeyRangeId, bulkImportResponse.errorCode);
|
||||
}
|
||||
|
||||
double requestCharge = response.getRequestCharge();
|
||||
|
@ -168,36 +171,36 @@ class BatchInserter {
|
|||
totalRequestUnitsConsumed.addAndGet(requestCharge);
|
||||
}
|
||||
else {
|
||||
logger.warn("Partition range id {} Failed to receive response", partitionKeyRangeId);
|
||||
logger.warn("pki {} Failed to receive response", partitionKeyRangeId);
|
||||
}
|
||||
|
||||
} catch (DocumentClientException e) {
|
||||
|
||||
logger.trace("Importing minibatch for partition key range id {} failed", partitionKeyRangeId, e);
|
||||
logger.debug("pki {} Importing minibatch failed", partitionKeyRangeId, e);
|
||||
|
||||
if (isThrottled(e)) {
|
||||
logger.debug("Throttled on partition range id {}", partitionKeyRangeId);
|
||||
logger.debug("pki {} Throttled on partition range id", partitionKeyRangeId);
|
||||
numberOfThrottles++;
|
||||
isThrottled = true;
|
||||
retryAfter = Duration.ofMillis(e.getRetryAfterInMilliseconds());
|
||||
|
||||
} else if (isTimedOut(e)) {
|
||||
logger.debug("Request timed out on partition range id {}", partitionKeyRangeId);
|
||||
logger.debug("pki {} Request timed out", partitionKeyRangeId);
|
||||
timedOut = true;
|
||||
|
||||
} else if (isGone(e)) {
|
||||
if (isSplit(e)) {
|
||||
String errorMessage = String.format("Partition range id %s is undergoing split, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
|
||||
String errorMessage = String.format("pki %s is undergoing split, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
|
||||
logger.error(errorMessage);
|
||||
throw new RuntimeException(errorMessage);
|
||||
} else {
|
||||
String errorMessage = String.format("Partition range id %s is gone, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
|
||||
String errorMessage = String.format("pki %s is gone, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
|
||||
logger.error(errorMessage);
|
||||
throw new RuntimeException(errorMessage);
|
||||
}
|
||||
|
||||
} else {
|
||||
String errorMessage = String.format("Partition range id %s failed to import mini-batch. Exception was %s. Status code was %s",
|
||||
String errorMessage = String.format("pki %s failed to import mini-batch. Exception was %s. Status code was %s",
|
||||
partitionKeyRangeId,
|
||||
e.getMessage(),
|
||||
e.getStatusCode());
|
||||
|
@ -206,7 +209,7 @@ class BatchInserter {
|
|||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
String errorMessage = String.format("Partition range id %s Failed to import mini-batch. Exception was %s", partitionKeyRangeId,
|
||||
String errorMessage = String.format("pki %s Failed to import mini-batch. Exception was %s", partitionKeyRangeId,
|
||||
e.getMessage());
|
||||
logger.error(errorMessage, e);
|
||||
throw new RuntimeException(errorMessage, e);
|
||||
|
@ -214,13 +217,16 @@ class BatchInserter {
|
|||
|
||||
if (isThrottled) {
|
||||
try {
|
||||
Thread.sleep(retryAfter.getSeconds() * 1000);
|
||||
logger.debug("pki {} throttled going to sleep for {} millis ", partitionKeyRangeId, retryAfter.toMillis());
|
||||
Thread.sleep(retryAfter.toMillis());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("pki {} completed", partitionKeyRangeId);
|
||||
|
||||
stopwatch.stop();
|
||||
InsertMetrics insertMetrics = new InsertMetrics(currentDocumentIndex, stopwatch.elapsed(), requestUnitsCounsumed, numberOfThrottles);
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.util.concurrent.Callable;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
@ -66,7 +67,7 @@ import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
|
|||
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache;
|
||||
import com.microsoft.azure.documentdb.internal.routing.Range;
|
||||
|
||||
public class BulkImporter {
|
||||
public class BulkImporter implements AutoCloseable {
|
||||
|
||||
/**
|
||||
* The name of the system stored procedure for bulk import.
|
||||
|
@ -157,7 +158,7 @@ public class BulkImporter {
|
|||
|
||||
this.partitionKeyDefinition = collection.getPartitionKey();
|
||||
|
||||
this.listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newWorkStealingPool(client.getConnectionPolicy().getMaxPoolSize()));
|
||||
this.listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
|
||||
|
||||
this.initializationFuture = this.listeningExecutorService.submit(new Callable<Void>() {
|
||||
|
||||
|
@ -169,6 +170,30 @@ public class BulkImporter {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases any internal resources.
|
||||
* It is responsibility of the caller to close {@link DocumentClient}.
|
||||
*/
|
||||
@Override
|
||||
public void close() {
|
||||
// disable submission of new tasks
|
||||
listeningExecutorService.shutdown();
|
||||
try {
|
||||
// wait for existing tasks to terminate
|
||||
if (!listeningExecutorService.awaitTermination(60, TimeUnit.SECONDS)) {
|
||||
// cancel any currently running executing tasks
|
||||
listeningExecutorService.shutdownNow();
|
||||
// wait for cancelled tasks to terminate
|
||||
if (!listeningExecutorService.awaitTermination(60, TimeUnit.SECONDS)) {
|
||||
logger.error("some tasks did not terminate");
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
listeningExecutorService.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes {@link BulkImporter}. This happens only once
|
||||
* @throws DocumentClientException
|
||||
|
@ -310,7 +335,6 @@ public class BulkImporter {
|
|||
}
|
||||
});
|
||||
|
||||
logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
|
||||
|
||||
logger.debug("Beginning bulk import within each partition bucket");
|
||||
Map<String, BatchInserter> batchInserters = new HashMap<String, BatchInserter>();
|
||||
|
@ -331,17 +355,26 @@ public class BulkImporter {
|
|||
new CongestionController(listeningExecutorService, collectionThroughput / partitionKeyRangeIds.size(), partitionKeyRangeId, batchInserter, this.partitionDegreeOfConcurrency.get(partitionKeyRangeId)));
|
||||
}
|
||||
|
||||
logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
|
||||
System.out.println("total preprocessing took: " + watch.elapsed().toMillis() + " millis");
|
||||
|
||||
List<ListenableFuture<Void>> futures = congestionControllers.values().parallelStream().map(c -> c.ExecuteAll()).collect(Collectors.toList());
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
|
||||
for(CongestionController cc: congestionControllers.values()) {
|
||||
ListenableFuture<Void> f = cc.executeAllAsync();
|
||||
futures.add(f);
|
||||
}
|
||||
|
||||
// TODO go with the following
|
||||
// List<ListenableFuture<Void>> futures = congestionControllers.values().parallelStream().map(c -> c.ExecuteAll()).collect(Collectors.toList());
|
||||
|
||||
|
||||
FutureCombiner<Void> futureContainer = Futures.whenAllComplete(futures);
|
||||
AsyncCallable<BulkImportResponse> completeAsyncCallback = new AsyncCallable<BulkImportResponse>() {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<BulkImportResponse> call() throws Exception {
|
||||
|
||||
// TODO: this can change so aggregation of the result happen at each
|
||||
|
||||
for(String partitionKeyRangeId: partitionKeyRangeIds) {
|
||||
partitionDegreeOfConcurrency.put(partitionKeyRangeId, congestionControllers.get(partitionKeyRangeId).getDegreeOfConcurrency());
|
||||
}
|
||||
|
@ -357,7 +390,7 @@ public class BulkImporter {
|
|||
}
|
||||
};
|
||||
|
||||
return futureContainer.callAsync(completeAsyncCallback, MoreExecutors.directExecutor());
|
||||
return futureContainer.callAsync(completeAsyncCallback, listeningExecutorService);
|
||||
}
|
||||
|
||||
private CollectionRoutingMap getCollectionRoutingMap(DocumentClient client) {
|
||||
|
|
|
@ -21,23 +21,16 @@ public class Configuration {
|
|||
|
||||
@Parameter(names = "-collectionId", description = "Collection ID", required = true)
|
||||
private String collectionId;
|
||||
|
||||
// @Parameter(names = "-documentDataFieldSize", description = "Length of a document data field in characters (16-bit)")
|
||||
// private int documentDataFieldSize = 1;
|
||||
|
||||
@Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size")
|
||||
|
||||
@Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size, it is a good idea to")
|
||||
private Integer maxConnectionPoolSize = 200;
|
||||
|
||||
@Parameter(names = "-consistencyLevel", description = "Consistency Level")
|
||||
private ConsistencyLevel consistencyLevel = ConsistencyLevel.Session;
|
||||
private ConsistencyLevel consistencyLevel = ConsistencyLevel.Eventual;
|
||||
|
||||
@Parameter(names = "-connectionMode", description = "Connection Mode")
|
||||
private ConnectionMode connectionMode = ConnectionMode.Gateway;
|
||||
|
||||
// @Parameter(names = "-concurrency", description = "Degree of Concurrency in Inserting Documents (only applies to blocking client)."
|
||||
// + " If this value is not specified, the max connection pool size will be used as the concurrency level.")
|
||||
// private Integer concurrency;
|
||||
|
||||
@Parameter(names = "-numberOfDocumentsForEachCheckpoint", description = "Number of documents in each checkpoint.")
|
||||
private int numberOfDocumentsForEachCheckpoint = 100000;
|
||||
|
||||
|
@ -66,10 +59,6 @@ public class Configuration {
|
|||
public boolean isHelp() {
|
||||
return help;
|
||||
}
|
||||
//
|
||||
// public int getDocumentDataFieldSize() {
|
||||
// return documentDataFieldSize;
|
||||
// }
|
||||
|
||||
public ConnectionPolicy getConnectionPolicy() {
|
||||
ConnectionPolicy policy = new ConnectionPolicy();
|
||||
|
@ -90,14 +79,6 @@ public class Configuration {
|
|||
return collectionId;
|
||||
}
|
||||
|
||||
// public int getConcurrency() {
|
||||
// if (this.concurrency != null) {
|
||||
// return concurrency;
|
||||
// } else {
|
||||
// return this.maxConnectionPoolSize;
|
||||
// }
|
||||
// }
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.util.concurrent.Semaphore;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.util.concurrent.AsyncFunction;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
@ -45,7 +46,7 @@ class CongestionController {
|
|||
/**
|
||||
* The degree of concurrency to start at.
|
||||
*/
|
||||
private static final int STARTING_DEGREE_OF_CONCURRENCY = 2;
|
||||
private static final int STARTING_DEGREE_OF_CONCURRENCY = 3;
|
||||
|
||||
/**
|
||||
* The maximum degree of concurrency to go upto for a single physical partition.
|
||||
|
@ -138,7 +139,7 @@ class CongestionController {
|
|||
this.throttleSemaphore = new Semaphore(this.degreeOfConcurrency);
|
||||
this.aggregatedInsertMetrics = new InsertMetrics();
|
||||
this.executor = executor;
|
||||
this.partitionThroughput = partitionThroughput;
|
||||
this.partitionThroughput = partitionThroughput;
|
||||
}
|
||||
|
||||
private synchronized InsertMetrics atomicGetAndReplace(InsertMetrics metrics) {
|
||||
|
@ -157,15 +158,16 @@ class CongestionController {
|
|||
// TODO: FIXME I think semaphore may reach 0 here and if so that will create a deadlock
|
||||
// verify and fix
|
||||
|
||||
logger.debug("partition key range id {} goes to sleep for {} seconds", partitionKeyRangeId, samplePeriod.getSeconds());
|
||||
logger.debug("pki {} goes to sleep for {} seconds. availabel semaphore permits {}, current degree of parallelism {}",
|
||||
partitionKeyRangeId, samplePeriod.getSeconds(), throttleSemaphore.availablePermits(), degreeOfConcurrency);
|
||||
Thread.sleep(samplePeriod.toMillis());
|
||||
logger.debug("{} wakes up", partitionKeyRangeId);
|
||||
logger.debug("pki {} wakes up", partitionKeyRangeId);
|
||||
|
||||
InsertMetrics insertMetricsSample = atomicGetAndReplace(new InsertMetrics());
|
||||
|
||||
if (insertMetricsSample.numberOfThrottles > 0) {
|
||||
logger.debug("Importing for partition key range id {} encountered {} throttling, reducing parallelism",
|
||||
partitionKeyRangeId, insertMetricsSample.numberOfThrottles);
|
||||
logger.debug("pki {} importing encountered {} throttling. current degree of parallelism {}, decreasing amount: {}",
|
||||
partitionKeyRangeId, insertMetricsSample.numberOfThrottles, degreeOfConcurrency, degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR);
|
||||
|
||||
// We got a throttle so we need to back off on the degree of concurrency.
|
||||
// Get the current degree of concurrency and decrease that (AIMD).
|
||||
|
@ -173,8 +175,11 @@ class CongestionController {
|
|||
for (int i = 0; i < degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR; i++) {
|
||||
throttleSemaphore.acquire();
|
||||
}
|
||||
|
||||
|
||||
degreeOfConcurrency -= (degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR);
|
||||
|
||||
logger.debug("pki {} degree of parallelism reduced to {}, sem available permits", partitionKeyRangeId, degreeOfConcurrency, throttleSemaphore.availablePermits());
|
||||
}
|
||||
|
||||
if (insertMetricsSample.numberOfDocumentsInserted == 0) {
|
||||
|
@ -182,21 +187,25 @@ class CongestionController {
|
|||
continue;
|
||||
}
|
||||
|
||||
logger.debug("{} aggregating inserts metrics", partitionKeyRangeId);
|
||||
logger.debug("pki {} aggregating inserts metrics", partitionKeyRangeId);
|
||||
|
||||
if (insertMetricsSample.numberOfThrottles == 0) {
|
||||
if ((insertMetricsSample.requestUnitsConsumed < THROUGHPUT_THRESHOLD * partitionThroughput) &&
|
||||
degreeOfConcurrency + ADDITIVE_INCREASE_FACTOR <= MAX_DEGREE_OF_CONCURRENCY) {
|
||||
// We aren't getting throttles, so we should bump of the degree of concurrency (AIAD).
|
||||
logger.debug("pki {} increasing degree of prallelism and releasing semaphore", partitionKeyRangeId);
|
||||
|
||||
throttleSemaphore.release(ADDITIVE_INCREASE_FACTOR);
|
||||
degreeOfConcurrency += ADDITIVE_INCREASE_FACTOR;
|
||||
|
||||
logger.debug("pki {} degree of parallelism increased to {}. available semaphore permits {}", partitionKeyRangeId, degreeOfConcurrency, throttleSemaphore.availablePermits());
|
||||
}
|
||||
}
|
||||
|
||||
double ruPerSecond = insertMetricsSample.requestUnitsConsumed / samplePeriod.getSeconds();
|
||||
documentsInsertedSoFar += insertMetricsSample.numberOfDocumentsInserted;
|
||||
|
||||
logger.debug("Partition key range id {} : Inserted {} docs in {} milli seconds at {} RU/s with {} tasks."
|
||||
logger.debug("pki {} : Inserted {} docs in {} milli seconds at {} RU/s with {} tasks."
|
||||
+ " Faced {} throttles. Total documents inserterd so far {}.",
|
||||
partitionKeyRangeId,
|
||||
insertMetricsSample.numberOfDocumentsInserted,
|
||||
|
@ -205,10 +214,13 @@ class CongestionController {
|
|||
degreeOfConcurrency,
|
||||
insertMetricsSample.numberOfThrottles,
|
||||
documentsInsertedSoFar);
|
||||
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
logger.warn("Interrupted", e);
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
logger.error("pki {} unexpected failure", partitionKeyRangeId, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -217,9 +229,33 @@ class CongestionController {
|
|||
};
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> ExecuteAll() {
|
||||
public ListenableFuture<Void> executeAllAsync() {
|
||||
|
||||
Callable<ListenableFuture<Void>> c = new Callable<ListenableFuture<Void>>() {
|
||||
|
||||
logger.debug("Executing batching in partition {}", partitionKeyRangeId);
|
||||
@Override
|
||||
public ListenableFuture<Void> call() throws Exception {
|
||||
return executeAll();
|
||||
}
|
||||
};
|
||||
|
||||
ListenableFuture<ListenableFuture<Void>> f = executor.submit(c);
|
||||
AsyncFunction<ListenableFuture<Void>, Void> function = new AsyncFunction<ListenableFuture<Void>, Void>() {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> apply(ListenableFuture<Void> input) throws Exception {
|
||||
return input;
|
||||
}
|
||||
};
|
||||
return Futures.transformAsync(f, function, executor);
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> executeAll() {
|
||||
|
||||
logger.debug("pki{} Executing batching", partitionKeyRangeId);
|
||||
|
||||
ListenableFuture<Void> completionFuture = executor.submit(congestionControlTask());
|
||||
|
||||
Iterator<Callable<InsertMetrics>> batchExecutionIterator = batchInserter.miniBatchInsertExecutionCallableIterator();
|
||||
|
||||
List<ListenableFuture<InsertMetrics>> futureList = new ArrayList<>();
|
||||
|
@ -228,11 +264,12 @@ class CongestionController {
|
|||
|
||||
// Main thread waits on the throttleSem so no more than MaxDegreeOfParallelism Tasks are run at a time.
|
||||
try {
|
||||
logger.trace("trying to accequire semaphore");
|
||||
logger.debug("pki {} trying to accequire semaphore to execute a task. available permits {}", partitionKeyRangeId, this.throttleSemaphore.availablePermits());
|
||||
this.throttleSemaphore.acquire();
|
||||
logger.trace("semaphore accequired");
|
||||
logger.debug("pki {} accquiring semaphore for executing a task succeeded. available permits {}", partitionKeyRangeId, this.throttleSemaphore.availablePermits());
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Interrupted", e);
|
||||
logger.error("pki {} Interrupted, releasing semaphore", partitionKeyRangeId, e);
|
||||
this.throttleSemaphore.release();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
|
@ -242,16 +279,18 @@ class CongestionController {
|
|||
|
||||
@Override
|
||||
public void onSuccess(InsertMetrics result) {
|
||||
logger.debug("pki {} accquiring a synchronized lock to update metrics", partitionKeyRangeId);
|
||||
|
||||
synchronized (CongestionController.this) {
|
||||
aggregatedInsertMetrics = InsertMetrics.sum(aggregatedInsertMetrics, result);
|
||||
}
|
||||
logger.trace("releasing semaphore");
|
||||
logger.debug("pki {} releasing semaphore on completion of task", partitionKeyRangeId);
|
||||
throttleSemaphore.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.trace("encountered failure {} releasing semaphore", t);
|
||||
logger.trace("pki {} encountered failure {} releasing semaphore", partitionKeyRangeId, t);
|
||||
throttleSemaphore.release();
|
||||
}
|
||||
};
|
||||
|
@ -266,28 +305,31 @@ class CongestionController {
|
|||
|
||||
@Override
|
||||
public void onSuccess(List<InsertMetrics> result) {
|
||||
logger.debug("importing for partition key range {} completed", partitionKeyRangeId);
|
||||
logger.debug("pki {} importing completed", partitionKeyRangeId);
|
||||
getAndSet(State.Completed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.error("importing for partition key range {} failed", partitionKeyRangeId, t);
|
||||
logger.error("pki {} importing failed", partitionKeyRangeId, t);
|
||||
getAndSet(State.Failure);
|
||||
}
|
||||
};
|
||||
|
||||
Futures.addCallback(allFutureResults, completionCallback, MoreExecutors.directExecutor());
|
||||
return executor.submit(congestionControlTask());
|
||||
return completionFuture;
|
||||
}
|
||||
|
||||
public synchronized State getAndSet(State state) {
|
||||
logger.debug("pki {} state set to {}", partitionKeyRangeId, state);
|
||||
|
||||
State res = this.state;
|
||||
this.state = state;
|
||||
return res;
|
||||
}
|
||||
|
||||
public synchronized boolean isRunning() {
|
||||
logger.debug("pki {} in isRunning", partitionKeyRangeId);
|
||||
return state == State.Running;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
*/
|
||||
package com.microsoft.azure.documentdb.bulkimport;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -31,10 +32,13 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.beust.jcommander.JCommander;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.microsoft.azure.documentdb.ConnectionPolicy;
|
||||
import com.microsoft.azure.documentdb.DocumentClient;
|
||||
import com.microsoft.azure.documentdb.DocumentClientException;
|
||||
import com.microsoft.azure.documentdb.DocumentCollection;
|
||||
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
|
||||
import com.microsoft.azure.documentdb.RetryOptions;
|
||||
|
||||
public class Main {
|
||||
|
@ -49,18 +53,23 @@ public class Main {
|
|||
|
||||
String collectionLink = String.format("/dbs/%s/colls/%s", cfg.getDatabaseId(), cfg.getCollectionId());
|
||||
// this assumes database and collection already exists
|
||||
// also it is a good idea to set your connection pool size to be equal to the number of partitions serving your collection.
|
||||
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
|
||||
|
||||
// instantiates bulk importer
|
||||
BulkImporter bulkImporter = new BulkImporter(client, collection);
|
||||
|
||||
Stopwatch totalWatch = Stopwatch.createStarted();
|
||||
|
||||
double totalRequestCharge = 0;
|
||||
long totalTimeInMillis = 0;
|
||||
long totalNumberOfDocumentsImported = 0;
|
||||
|
||||
for(int i = 0 ; i < cfg.getNumberOfCheckpoints(); i++) {
|
||||
|
||||
Iterator<String> inputDocumentIterator = generatedDocuments(cfg);
|
||||
Iterator<String> inputDocumentIterator = generatedDocuments(cfg, collection);
|
||||
System.out.println("##########################################################################################");
|
||||
|
||||
BulkImportResponse bulkImportResponse = bulkImporter.bulkImport(inputDocumentIterator, false);
|
||||
|
||||
totalNumberOfDocumentsImported += bulkImportResponse.getNumberOfDocumentsImported();
|
||||
|
@ -68,9 +77,8 @@ public class Main {
|
|||
totalRequestCharge += bulkImportResponse.getTotalRequestUnitsConsumed();
|
||||
|
||||
// print stats
|
||||
System.out.println("##########################################################################################");
|
||||
System.out.println("Number of documents inserted in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported());
|
||||
System.out.println("Import time for this checkpoint: " + bulkImportResponse.getTotalTimeTaken());
|
||||
System.out.println("Import time for this checkpoint in milli seconds " + bulkImportResponse.getTotalTimeTaken().toMillis());
|
||||
System.out.println("Total request unit consumed in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed());
|
||||
|
||||
System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
|
||||
|
@ -78,42 +86,58 @@ public class Main {
|
|||
System.out.println("##########################################################################################");
|
||||
}
|
||||
|
||||
System.out.println("##########################################################################################");
|
||||
totalWatch.stop();
|
||||
|
||||
System.out.println("##########################################################################################");
|
||||
|
||||
System.out.println("Total summed Import time in milli seconds: " + totalTimeInMillis);
|
||||
System.out.println("Total Number of documents inserted " + totalNumberOfDocumentsImported);
|
||||
System.out.println("Total Import time in seconds: " + totalTimeInMillis / 1000);
|
||||
System.out.println("Total Import time measured by stop watch in milli seconds: " + totalWatch.elapsed().toMillis());
|
||||
System.out.println("Total request unit consumed: " + totalRequestCharge);
|
||||
System.out.println("Average RUs/second:" + totalRequestCharge / (totalWatch.elapsed().toMillis() * 0.001));
|
||||
System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalWatch.elapsed().toMillis() * 0.001));
|
||||
|
||||
System.out.println("Average RUs/second:" + totalRequestCharge / (totalTimeInMillis * 0.001));
|
||||
System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalTimeInMillis * 0.001));
|
||||
|
||||
// close bulk importer to release any existing resources
|
||||
bulkImporter.close();
|
||||
|
||||
// close document client
|
||||
client.close();
|
||||
}
|
||||
|
||||
private static Iterator<String> generatedDocuments(Configuration cfg) {
|
||||
private static Iterator<String> generatedDocuments(Configuration cfg, DocumentCollection collection) {
|
||||
|
||||
PartitionKeyDefinition partitionKeyDefinition = collection.getPartitionKey();
|
||||
Preconditions.checkArgument(partitionKeyDefinition != null &&
|
||||
partitionKeyDefinition.getPaths().size() > 0, "there is no partition key definition");
|
||||
|
||||
Collection<String> partitionKeyPath = partitionKeyDefinition.getPaths();
|
||||
Preconditions.checkArgument(partitionKeyPath.size() == 1,
|
||||
"the command line benchmark tool only support simple partition key path");
|
||||
|
||||
String partitionKeyName = partitionKeyPath.iterator().next().replaceFirst("^/", "");
|
||||
|
||||
// the size of each document is approximately 1KB
|
||||
|
||||
// return documents to be bulk imported
|
||||
// if you are reading documents from disk you can change this to read documents from disk
|
||||
return IntStream.range(0, cfg.getNumberOfDocumentsForEachCheckpoint()).mapToObj(i ->
|
||||
{
|
||||
String id = UUID.randomUUID().toString();
|
||||
String mypk = UUID.randomUUID().toString();
|
||||
String v = UUID.randomUUID().toString();
|
||||
String fieldValue = v + "123456789";
|
||||
for(int j = 0; j < 24 ; j++) {
|
||||
fieldValue += v;
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("{");
|
||||
sb.append("\"id\":\"").append(UUID.randomUUID().toString()).append("\"");
|
||||
sb.append(",\"").append(partitionKeyName).append("\":\"").append(UUID.randomUUID().toString()).append("abc\"");
|
||||
|
||||
String data = UUID.randomUUID().toString();
|
||||
data = data + data + "0123456789012";
|
||||
|
||||
for(int j = 0; j < 10;j++) {
|
||||
sb.append(",").append("\"f").append(j).append("\":\"").append(data).append("\"");
|
||||
}
|
||||
String doc = String.format("{" +
|
||||
" \"dataField\": \"%s\"," +
|
||||
" \"mypk\": \"%s\"," +
|
||||
" \"id\": \"%s\"" +
|
||||
"}", fieldValue, mypk, id);
|
||||
|
||||
//System.out.println("number of bytes in document: " + doc.getBytes(Charset.forName("UTF-8")).length);
|
||||
|
||||
return doc;
|
||||
sb.append("}");
|
||||
|
||||
return sb.toString();
|
||||
}).iterator();
|
||||
}
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ public class CongestionControllerTests {
|
|||
Mockito.any(Object[].class))).thenReturn(bulkImportResponse);
|
||||
|
||||
CongestionController cc = new CongestionController(listeningExecutorService, 10000, paritionKeyRangeId, bi);
|
||||
ListenableFuture<Void> listenableFuture = cc.ExecuteAll();
|
||||
ListenableFuture<Void> listenableFuture = cc.executeAllAsync();
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
|
@ -168,7 +168,7 @@ public class CongestionControllerTests {
|
|||
Mockito.any(Object[].class))).thenReturn(bulkImportResponse1).thenReturn(bulkImportResponse2);
|
||||
|
||||
CongestionController cc = new CongestionController(listeningExecutorService, 10000, paritionKeyRangeId, bi);
|
||||
ListenableFuture<Void> listenableFuture = cc.ExecuteAll();
|
||||
ListenableFuture<Void> listenableFuture = cc.executeAllAsync();
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче