Add support for passing the max mini-batch size via the builder pattern

Mohammad Derakhshani 2017-10-18 12:08:45 -07:00
Parent 151bf2cfb2
Commit a9683362a9
4 changed files with 84 additions and 14 deletions

View file

@@ -71,6 +71,56 @@ import com.microsoft.azure.documentdb.internal.routing.Range;
public class DocumentBulkImporter implements AutoCloseable {
public static class Builder {
private DocumentClient client;
private DocumentCollection collection;
private int miniBatchSize = (int)Math.floor(MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE * FRACTION_OF_MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE_ALLOWED);
/**
* Uses the given {@link DocumentClient} to bulk import into the given {@link DocumentCollection}.
* @param client the {@link DocumentClient} to use for the bulk import
* @param collection the target {@link DocumentCollection}
* @return {@link Builder}
*/
public Builder from(DocumentClient client, DocumentCollection collection) {
this.client = client;
this.collection = collection;
return this;
}
/**
* Uses the given size as the maximum mini-batch size.
*
* If not specified, the default is used.
* @param size the maximum mini-batch size in bytes
* @return {@link Builder}
*/
public Builder withMaxMiniBatchSize(int size) {
this.miniBatchSize = size;
return this;
}
/**
* Instantiates {@link DocumentBulkImporter} given the configured {@link Builder}.
*
* @return the new {@link DocumentBulkImporter} instance
*/
public DocumentBulkImporter build() {
return new DocumentBulkImporter(client, collection, miniBatchSize);
}
private Builder() {}
}
/**
* Creates a new {@link DocumentBulkImporter.Builder} instance.
* @return a new {@link DocumentBulkImporter.Builder} instance
*/
public static DocumentBulkImporter.Builder builder() {
return new DocumentBulkImporter.Builder();
}
/**
* The name of the system stored procedure for bulk import.
*/
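Taken together, the builder is now the public construction path. A minimal usage sketch, assuming an already-initialized client; HOST, MASTER_KEY, and the 512 KB figure are illustrative placeholders, not values from this commit:

DocumentClient client = new DocumentClient(HOST, MASTER_KEY,
        new ConnectionPolicy(), ConsistencyLevel.Session); // HOST/MASTER_KEY are placeholders
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();

// Omit withMaxMiniBatchSize(...) to fall back to the default computed in the Builder.
try (DocumentBulkImporter importer = DocumentBulkImporter.builder()
        .from(client, collection)
        .withMaxMiniBatchSize(512 * 1024) // assumed example value, in bytes
        .build()) {
    // issue bulk import calls here, e.g. importAllWithPartitionKey(...)
}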
@@ -141,20 +191,27 @@ public class DocumentBulkImporter implements AutoCloseable {
*/
private int collectionThroughput;
/**
* Maximum mini-batch size in bytes.
*/
private int maxMiniBatchSize;
/**
* Initializes a new instance of {@link DocumentBulkImporter}
*
* @param client {@link DocumentClient} instance to use
* @param collection inserts documents to {@link DocumentCollection}
* @param maxMiniBatchSize the maximum mini-batch size in bytes
* @throws DocumentClientException if any failure occurs
*/
-public DocumentBulkImporter(DocumentClient client, DocumentCollection collection) {
+private DocumentBulkImporter(DocumentClient client, DocumentCollection collection, int maxMiniBatchSize) {
Preconditions.checkNotNull(client, "client cannot be null");
Preconditions.checkNotNull(collection, "collection cannot be null");
Preconditions.checkArgument(maxMiniBatchSize > 0, "maxMiniBatchSize must be positive");
this.client = client;
this.collection = collection;
this.maxMiniBatchSize = maxMiniBatchSize;
this.partitionKeyDefinition = collection.getPartitionKey();
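With the constructor now private, the builder is the only entry point, and the Guava guard fails fast on a bad size. A minimal sketch of the failure mode; the IllegalArgumentException is Guava's documented behavior for Preconditions.checkArgument:

try {
    DocumentBulkImporter.builder()
            .from(client, collection)
            .withMaxMiniBatchSize(0) // rejected: the size must be strictly positive
            .build();
} catch (IllegalArgumentException e) {
    // thrown by Preconditions.checkArgument with the message
    // "maxMiniBatchSize must be positive"
}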
@@ -261,8 +318,8 @@ public class DocumentBulkImporter implements AutoCloseable {
* String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "mycol");
* DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
*
-* BulkImporter importer = new BulkImporter(client, collection);
*
+* DocumentBulkImporter importer = DocumentBulkImporter.builder().from(client, collection).build();
*
* for(int i = 0; i < 10; i++) {
* List<String> documents = documentSource.getMoreDocuments();
*
@@ -303,7 +360,7 @@ public class DocumentBulkImporter implements AutoCloseable {
* String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "mycol");
* DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
*
-* BulkImporter importer = new BulkImporter(client, collection);
+* DocumentBulkImporter importer = DocumentBulkImporter.builder().from(client, collection).build();
*
* for(int i = 0; i < 10; i++) {
* List<DocumentPKValuePair> tuples = documentSource.getMoreDocumentsPartitionKeyValueTuples();
@@ -324,7 +381,6 @@ public class DocumentBulkImporter implements AutoCloseable {
* @throws DocumentClientException if any failure happens
*/
public BulkImportResponse importAllWithPartitionKey(Collection<DocumentPKValuePair> documentPartitionKeyValueTuples, boolean isUpsert) throws DocumentClientException {
return executeBulkImportInternal(documentPartitionKeyValueTuples,
tuple -> tuple.document,
tuple -> DocumentAnalyzer.fromPartitionKeyvalue(tuple.partitionKeyValue),
@@ -390,7 +446,6 @@ public class DocumentBulkImporter implements AutoCloseable {
});
logger.trace("Creating mini batches within each partition bucket");
-int maxMiniBatchSize = (int)Math.floor(MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE * FRACTION_OF_MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE_ALLOWED);
documentsToImportByPartition.entrySet().parallelStream().forEach(entry -> {
@@ -409,7 +464,7 @@ public class DocumentBulkImporter implements AutoCloseable {
int currentDocumentSize = getSizeInBytes(currentDocument);
if (currentDocumentSize > maxMiniBatchSize) {
logger.error("Document size {} larger than script payload limit. {}", currentDocumentSize, maxMiniBatchSize);
-throw new UnsupportedOperationException("Cannot try to import a document whose size is larger than script payload limit.");
+throw new UnsupportedOperationException("Cannot import a document whose size is larger than the script payload limit.");
}
currentMiniBatch.add(currentDocument);
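The locally computed maxMiniBatchSize is gone; the loop now reads the configured field and still rejects any single document larger than the limit. A standalone sketch of the same size-bounded batching rule, with a UTF-8 byte count standing in for the importer's getSizeInBytes helper:

List<List<String>> miniBatches = new ArrayList<>();
List<String> current = new ArrayList<>();
int currentSize = 0;
for (String doc : documents) {
    int docSize = doc.getBytes(StandardCharsets.UTF_8).length; // stand-in for getSizeInBytes
    if (docSize > maxMiniBatchSize) {
        // mirrors the rejection above: a single document may never exceed the limit
        throw new UnsupportedOperationException("Cannot import a document whose size is larger than the script payload limit.");
    }
    if (currentSize + docSize > maxMiniBatchSize && !current.isEmpty()) {
        miniBatches.add(current); // close the batch before it would overflow
        current = new ArrayList<>();
        currentSize = 0;
    }
    current.add(doc);
    currentSize += docSize;
}
if (!current.isEmpty()) {
    miniBatches.add(current); // flush the trailing partial batch
}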

View file

@@ -40,6 +40,7 @@ import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.RetryOptions;
import com.microsoft.azure.documentdb.bulkimport.DocumentBulkImporter.Builder;
public class Main {
@@ -56,8 +57,10 @@ public class Main {
// also it is a good idea to set your connection pool size to be equal to the number of partitions serving your collection.
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
Builder bulkImporterBuilder = DocumentBulkImporter.builder().from(client, collection);
// instantiates bulk importer
-try(DocumentBulkImporter bulkImporter = new DocumentBulkImporter(client, collection)) {
+try(DocumentBulkImporter bulkImporter = bulkImporterBuilder.build()) {
Stopwatch totalWatch = Stopwatch.createUnstarted();

View file

@@ -40,6 +40,7 @@ import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.Undefined;
import com.microsoft.azure.documentdb.bulkimport.DocumentBulkImporter.Builder;
@RunWith(Parameterized.class)
public class EndToEndBulkImportTests extends EndToEndTestBase {
@@ -64,7 +65,9 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
@Test
public void bulkImport() throws Exception {
-try (DocumentBulkImporter importer = new DocumentBulkImporter(client, this.pCollection)) {
+Builder bulkImporterBuilder = DocumentBulkImporter.builder().from(client, this.pCollection);
+try (DocumentBulkImporter importer = bulkImporterBuilder.build()) {
List<String> documents = new ArrayList<>();
@@ -81,7 +84,9 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
@Test
public void bulkImportAlreadyExists() throws Exception {
-try (DocumentBulkImporter importer = new DocumentBulkImporter(client, this.pCollection)) {
+Builder bulkImporterBuilder = DocumentBulkImporter.builder().from(client, this.pCollection);
+try (DocumentBulkImporter importer = bulkImporterBuilder.build()) {
List<String> documents = new ArrayList<>();
@@ -104,7 +109,9 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
@Test
public void bulkImportWithPreknownPartitionKeyValues() throws Exception {
-try (DocumentBulkImporter importer = new DocumentBulkImporter(client, this.pCollection)) {
+Builder bulkImporterBuilder = DocumentBulkImporter.builder().from(client, this.pCollection);
+try (DocumentBulkImporter importer = bulkImporterBuilder.build()) {
List<DocumentPKValuePair> tuples = new ArrayList<>();
@@ -122,7 +129,9 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
}
public void bulkImportWithMissingParitionKeyField() throws Exception {
-try (DocumentBulkImporter importer = new DocumentBulkImporter(client, this.pCollection)) {
+Builder bulkImporterBuilder = DocumentBulkImporter.builder().from(client, this.pCollection);
+try (DocumentBulkImporter importer = bulkImporterBuilder.build()) {
List<DocumentPKValuePair> tuples = new ArrayList<>();

View file

@@ -31,6 +31,7 @@ import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.RetryOptions;
import com.microsoft.azure.documentdb.bulkimport.DocumentBulkImporter.Builder;
import com.microsoft.azure.documentdb.bulkimport.Main.DataMigrationDocumentSource;
public class Sample {
@@ -53,7 +54,9 @@ public class Sample {
// this assumes the database and collection already exist
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
-try(DocumentBulkImporter importer = new DocumentBulkImporter(client, collection)) {
+Builder bulkImporterBuilder = DocumentBulkImporter.builder().from(client, collection);
+try(DocumentBulkImporter importer = bulkImporterBuilder.build()) {
//NOTE: for getting higher throughput please