code review comments addressed

Mohammad Derakhshani 2017-10-17 12:15:44 -07:00
Parent 2efc7751f1
Commit 6e1982acc4
7 changed files with 35 additions and 37 deletions

View file

@ -155,7 +155,6 @@ class BatchInserter {
String[] docBatch = miniBatch.subList(currentDocumentIndex, miniBatch.size()).toArray(new String[0]);
// Before C# 6, await was not allowed in the catch block, hence the following two local variables.
boolean isThrottled = false;
Duration retryAfter = Duration.ZERO;
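For context, the pattern that comment describes looks roughly like the sketch below: the throttle status and retry delay are captured inside the catch block and acted on after it. Only the status-code check and getRetryAfterInMilliseconds() come from the surrounding code; the sproc-invocation call is a placeholder, and the enclosing method is assumed to handle InterruptedException.

boolean isThrottled = false;
Duration retryAfter = Duration.ZERO;
try {
    executeBulkImportSproc(docBatch); // placeholder for the actual stored procedure call
} catch (DocumentClientException e) {
    if (e.getStatusCode() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS) {
        // record the throttle inside the catch block ...
        isThrottled = true;
        retryAfter = Duration.ofMillis(e.getRetryAfterInMilliseconds());
    } else {
        throw e;
    }
}
// ... and react to it afterwards, outside the catch block
if (isThrottled) {
    Thread.sleep(retryAfter.toMillis());
    // re-submit the same mini-batch
}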

View file

@ -28,17 +28,17 @@ public class BulkImportResponse {
/**
* Total number of documents imported.
*/
private int numberOfDocumentsImported;
final private int numberOfDocumentsImported;
/**
* Total request units consumed.
*/
private double totalRequestUnitsConsumed;
final private double totalRequestUnitsConsumed;
/**
* Total bulk import time.
*/
private Duration totalTimeTaken;
final private Duration totalTimeTaken;
public BulkImportResponse(int numberOfDocumentsImported, double totalRequestUnitsConsumed, Duration totalTimeTaken) {
this.numberOfDocumentsImported = numberOfDocumentsImported;

View file

@ -195,7 +195,7 @@ class CongestionController {
if (insertMetricsSample.numberOfThrottles == 0) {
if ((insertMetricsSample.requestUnitsConsumed < THROUGHPUT_THRESHOLD * partitionThroughput) &&
degreeOfConcurrency + ADDITIVE_INCREASE_FACTOR <= MAX_DEGREE_OF_CONCURRENCY) {
// We aren't getting throttles, so we should bump of the degree of concurrency (AIAD).
// We aren't getting throttles, so we should bump up the degree of concurrency (AIMD).
logger.debug("pki {} increasing degree of parallelism and releasing semaphore", partitionKeyRangeId);
throttleSemaphore.release(ADDITIVE_INCREASE_FACTOR);
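The AIMD scheme the corrected comment refers to grows concurrency additively while no throttles are observed and shrinks it multiplicatively when they appear. A minimal sketch, reusing the names from the lines above but assuming a hypothetical MULTIPLICATIVE_DECREASE_FACTOR; the real CongestionController may differ:

if (insertMetricsSample.numberOfThrottles == 0) {
    if (degreeOfConcurrency + ADDITIVE_INCREASE_FACTOR <= MAX_DEGREE_OF_CONCURRENCY) {
        // additive increase: allow a few more concurrent mini-batch inserts
        degreeOfConcurrency += ADDITIVE_INCREASE_FACTOR;
        throttleSemaphore.release(ADDITIVE_INCREASE_FACTOR);
    }
} else {
    // multiplicative decrease: shrink the degree of concurrency after throttles
    int decrease = degreeOfConcurrency - degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR;
    throttleSemaphore.acquireUninterruptibly(decrease);
    degreeOfConcurrency -= decrease;
}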

View file

@ -68,7 +68,7 @@ import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache;
import com.microsoft.azure.documentdb.internal.routing.Range;
public class BulkImporter implements AutoCloseable {
public class DocumentBulkImporter implements AutoCloseable {
/**
* The name of the system stored procedure for bulk import.
@ -80,7 +80,6 @@ public class BulkImporter implements AutoCloseable {
*/
private final static int MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE = (2202010 * 5) / 10;
/**
* The fraction of the maximum sproc payload size up to which documents are allowed to fit in a mini-batch.
*/
@ -89,12 +88,12 @@ public class BulkImporter implements AutoCloseable {
/**
* Logger
*/
private final Logger logger = LoggerFactory.getLogger(BulkImporter.class);
private final Logger logger = LoggerFactory.getLogger(DocumentBulkImporter.class);
/**
* Degree of parallelism for each partition
*/
private final Map<String, Integer> partitionDegreeOfConcurrency = Collections.synchronizedMap(new HashMap<>());
private final Map<String, Integer> partitionDegreeOfConcurrency = new ConcurrentHashMap<>();
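Besides dropping the wrapper lock, ConcurrentHashMap also supports atomic per-key updates, which suits tracking a value per partition key range. An illustrative (assumed) update site:

// Atomically record the latest observed degree of concurrency for a partition key range.
partitionDegreeOfConcurrency.merge(partitionKeyRangeId, degreeOfConcurrency, (previous, latest) -> latest);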
/**
* Executor Service
@ -112,9 +111,8 @@ public class BulkImporter implements AutoCloseable {
private final DocumentCollection collection;
/**
* The list of degrees of concurrency per partition.
* Partition Key Definition of the underlying collection.
*/
private final PartitionKeyDefinition partitionKeyDefinition;
/**
@ -143,15 +141,15 @@ public class BulkImporter implements AutoCloseable {
private int collectionThroughput;
/**
* Initializes a new instance of {@link BulkImporter}
* Initializes a new instance of {@link DocumentBulkImporter}
*
* @param client {@link DocumentClient} instance to use
* @param collection the {@link DocumentCollection} to insert documents into
* @throws DocumentClientException if any failure
*/
public BulkImporter(DocumentClient client, DocumentCollection collection) {
public DocumentBulkImporter(DocumentClient client, DocumentCollection collection) {
Preconditions.checkNotNull(client, "DocumentClient cannot be null");
Preconditions.checkNotNull(client, "client cannot be null");
Preconditions.checkNotNull(collection, "collection cannot be null");
this.client = client;
@ -169,16 +167,17 @@ public class BulkImporter implements AutoCloseable {
while(true) {
try {
initialize();
break;
} catch (Exception e) {
count++;
DocumentClientException dce = extractDeepestDocumentClientException(e);
if (count < 3 && dce != null && dce.getStatusCode() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS ) {
Thread.sleep(count * dce.getRetryAfterInMilliseconds());
continue;
}
throw e;
} else {
throw e;
}
}
break;
}
return null;
}
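Read without the interleaved old lines, the new initialization retry logic is the usual retry-on-429 pattern: retry a bounded number of times, back off by the server-suggested interval scaled by the attempt count, and rethrow everything else. A reconstruction assembled from the lines above (the surrounding method is assumed to declare the checked exceptions):

int count = 0;
while (true) {
    try {
        initialize();
        break;
    } catch (Exception e) {
        count++;
        DocumentClientException dce = extractDeepestDocumentClientException(e);
        if (count < 3 && dce != null && dce.getStatusCode() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS) {
            // throttled: wait for the server-suggested interval, scaled by the attempt count, then retry
            Thread.sleep(count * dce.getRetryAfterInMilliseconds());
            continue;
        }
        // give up on non-throttle failures or once the retry budget is exhausted
        throw e;
    }
}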
@ -210,7 +209,7 @@ public class BulkImporter implements AutoCloseable {
}
/**
* Initializes {@link BulkImporter}. This happens only once
* Initializes {@link DocumentBulkImporter}. This happens only once
* @throws DocumentClientException
*/
private void initialize() throws DocumentClientException {
@ -259,7 +258,7 @@ public class BulkImporter implements AutoCloseable {
* for(int i = 0; i < 10; i++) {
* List<String> documents = documentSource.getMoreDocuments();
*
* BulkImportResponse bulkImportResponse = importer.bulkImport(documents, false);
* BulkImportResponse bulkImportResponse = importer.importAll(documents, false);
*
* // validate that all documents were inserted to ensure there were no failures.
* // bulkImportResponse.getNumberOfDocumentsImported() == documents.size()
@ -272,7 +271,7 @@ public class BulkImporter implements AutoCloseable {
* @return an instance of {@link BulkImportResponse}
* @throws DocumentClientException if any failure happens
*/
public BulkImportResponse bulkImport(Collection<String> documents, boolean enableUpsert) throws DocumentClientException {
public BulkImportResponse importAll(Collection<String> documents, boolean enableUpsert) throws DocumentClientException {
Func2<Collection<String>, ConcurrentHashMap<String, Set<String>>, Void> bucketingFunction = new Func2<Collection<String>, ConcurrentHashMap<String,Set<String>>, Void>() {
@Override
@ -304,7 +303,7 @@ public class BulkImporter implements AutoCloseable {
* for(int i = 0; i < 10; i++) {
* List<Tuple> tuples = documentSource.getMoreDocumentsPartitionKeyValueTuples();
*
* BulkImportResponse bulkImportResponse = importer.bulkImportWithPreprocessedPartitionKey(tuples, false);
* BulkImportResponse bulkImportResponse = importer.importAllWithPartitionKey(tuples, false);
*
* // validate that all documents were inserted to ensure there were no failures.
* // bulkImportResponse.getNumberOfDocumentsImported() == documents.size()
@ -317,7 +316,7 @@ public class BulkImporter implements AutoCloseable {
* @return an instance of {@link BulkImportResponse}
* @throws DocumentClientException if any failure happens
*/
public BulkImportResponse bulkImportWithPreprocessedPartitionKey(Collection<Tuple> documentPartitionKeyValueTuples, boolean enableUpsert) throws DocumentClientException {
public BulkImportResponse importAllWithPartitionKey(Collection<Tuple> documentPartitionKeyValueTuples, boolean enableUpsert) throws DocumentClientException {
Func2<Collection<Tuple>, ConcurrentHashMap<String, Set<String>>, Void> bucketingFunction =
new Func2<Collection<Tuple>, ConcurrentHashMap<String,Set<String>>, Void>() {
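A hedged usage sketch of the renamed importAllWithPartitionKey API, assuming a Tuple pairs a document's JSON with its pre-computed partition key value; only the t.document field appears in the tests below, so the constructor and the partition-key extraction helper here are illustrative:

Collection<Tuple> tuples = new ArrayList<>();
for (String json : documentSource.getMoreDocuments()) {
    // partitionKeyValueOf(...) stands in for however the caller already knows each document's partition key
    tuples.add(new Tuple(json, partitionKeyValueOf(json)));
}
BulkImportResponse response = importer.importAllWithPartitionKey(tuples, false);
System.out.println("Imported " + response.getNumberOfDocumentsImported() + " documents");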

View file

@ -57,7 +57,7 @@ public class Main {
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
// instantiates bulk importer
BulkImporter bulkImporter = new BulkImporter(client, collection);
DocumentBulkImporter bulkImporter = new DocumentBulkImporter(client, collection);
Stopwatch totalWatch = Stopwatch.createUnstarted();
@ -74,7 +74,7 @@ public class Main {
// NOTE: only sum the bulk import time,
// loading/generating documents is out of the scope of bulk importer and so has to be excluded
totalWatch.start();
bulkImportResponse = bulkImporter.bulkImportWithPreprocessedPartitionKey(documentPartitionKeyValueTuples, false);
bulkImportResponse = bulkImporter.importAllWithPartitionKey(documentPartitionKeyValueTuples, false);
totalWatch.stop();
} else {
@ -83,7 +83,7 @@ public class Main {
// NOTE: only sum the bulk import time,
// loading/generating documents is out of the scope of bulk importer and so has to be excluded
totalWatch.start();
bulkImportResponse = bulkImporter.bulkImport(documents, false);
bulkImportResponse = bulkImporter.importAll(documents, false);
totalWatch.stop();
}

View file

@ -64,7 +64,7 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
@Test
public void bulkImport() throws Exception {
try (BulkImporter importer = new BulkImporter(client, this.pCollection)) {
try (DocumentBulkImporter importer = new DocumentBulkImporter(client, this.pCollection)) {
List<String> documents = new ArrayList<>();
@ -74,14 +74,14 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
documents.add(DocumentDataSource.randomDocument(partitionKeyValue, pCollection.getPartitionKey()));
}
BulkImportResponse response = importer.bulkImport(documents, false);
BulkImportResponse response = importer.importAll(documents, false);
validateSuccess(deserialize(documents), response);
}
}
@Test
public void bulkImportAlreadyExists() throws Exception {
try (BulkImporter importer = new BulkImporter(client, this.pCollection)) {
try (DocumentBulkImporter importer = new DocumentBulkImporter(client, this.pCollection)) {
List<String> documents = new ArrayList<>();
@ -91,20 +91,20 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
documents.add(DocumentDataSource.randomDocument(partitionKeyValue, pCollection.getPartitionKey()));
}
BulkImportResponse response = importer.bulkImport(documents, false);
BulkImportResponse response = importer.importAll(documents, false);
validateSuccess(deserialize(documents), response);
response = importer.bulkImport(documents, false);
response = importer.importAll(documents, false);
assertThat(response.getNumberOfDocumentsImported(), equalTo(0));
response = importer.bulkImport(documents, true);
response = importer.importAll(documents, true);
assertThat(response.getNumberOfDocumentsImported(), equalTo(documents.size()));
}
}
@Test
public void bulkImportWithPreknownPartitionKeyValues() throws Exception {
try (BulkImporter importer = new BulkImporter(client, this.pCollection)) {
try (DocumentBulkImporter importer = new DocumentBulkImporter(client, this.pCollection)) {
List<Tuple> tuples = new ArrayList<>();
@ -116,13 +116,13 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
tuples.add(t);
}
BulkImportResponse response = importer.bulkImportWithPreprocessedPartitionKey(tuples, false);
BulkImportResponse response = importer.importAllWithPartitionKey(tuples, false);
validateSuccess(deserialize(tuples.stream().map(t -> t.document).collect(Collectors.toList())), response);
}
}
public void bulkImportWithMissingParitionKeyField() throws Exception {
try (BulkImporter importer = new BulkImporter(client, this.pCollection)) {
try (DocumentBulkImporter importer = new DocumentBulkImporter(client, this.pCollection)) {
List<Tuple> tuples = new ArrayList<>();
@ -134,7 +134,7 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
tuples.add(t);
}
BulkImportResponse response = importer.bulkImportWithPreprocessedPartitionKey(tuples, false);
BulkImportResponse response = importer.importAllWithPartitionKey(tuples, false);
validateSuccess(deserialize(tuples.stream().map(t -> t.document).collect(Collectors.toList())), response);
}
}

View file

@ -45,7 +45,7 @@ public class Sample {
// this assumes database and collection already exists
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
try(BulkImporter importer = new BulkImporter(client, collection)) {
try(DocumentBulkImporter importer = new DocumentBulkImporter(client, collection)) {
// NOTE: to get higher throughput, please
// 1) Set the JVM heap size large enough to avoid memory issues when handling a large number of documents
@ -57,7 +57,7 @@ public class Sample {
for(int i = 0; i< 10; i++) {
Collection<String> docs = DataMigrationDocumentSource.loadDocuments(1000000, collection.getPartitionKey());
BulkImportResponse bulkImportResponse = importer.bulkImport(docs, false);
BulkImportResponse bulkImportResponse = importer.importAll(docs, false);
// returned stats
System.out.println("Number of documents inserted: " + bulkImportResponse.getNumberOfDocumentsImported());
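The remaining statistics on BulkImportResponse, corresponding to the final fields shown earlier, can be reported the same way; the getter names below are assumed to follow the JavaBean convention for those fields:

// Getter names assumed from the totalRequestUnitsConsumed and totalTimeTaken fields above.
System.out.println("Total request units consumed: " + bulkImportResponse.getTotalRequestUnitsConsumed());
System.out.println("Total time taken: " + bulkImportResponse.getTotalTimeTaken());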