This commit is contained in:
Mohammad Derakhshani 2017-10-10 12:25:41 -07:00
Родитель 3770d7d0a6
Коммит f59981b6d6
18 изменённых файлов: 2167 добавлений и 0 удалений

70
bulkimport/pom.xml Normal file
Просмотреть файл

@ -0,0 +1,70 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>documentdb-bulkimport</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description>Bulk Importer for Microsoft Azure DocumentDB</description>
<url>http://azure.microsoft.com/en-us/services/documentdb/</url>
<licenses>
<license>
<name>MIT License</name>
<url>http://www.opensource.org/licenses/mit-license.php</url>
</license>
</licenses>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<slf4j.version>1.7.6</slf4j.version>
</properties>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-documentdb</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<!-- TODO: remove this dependency; it is only here for now to allow testing
with the log4j logging backend. -->
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

Просмотреть файл

@ -0,0 +1,261 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.AtomicDouble;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.azure.documentdb.StoredProcedureResponse;
import com.microsoft.azure.documentdb.internal.HttpConstants;
/**
 * Imports the mini-batches destined for a single physical partition by invoking the
 * system bulk import stored procedure, and keeps running totals of the number of
 * documents imported and the request units consumed.
 *
 * Each mini-batch is exposed as an independent {@link Callable} (see
 * {@link #miniBatchInsertExecutionCallableIterator()}); the caller decides how many of
 * them run concurrently.
 */
class BatchInserter {

    private final Logger logger = LoggerFactory.getLogger(BatchInserter.class);

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * The index of the physical partition this batch inserter is responsible for.
     */
    private final String partitionKeyRangeId;

    /**
     * The list of mini-batches this batch inserter is responsible to import.
     */
    private final List<List<String>> batchesToInsert;

    /**
     * The document client to use.
     */
    private final DocumentClient client;

    /**
     * The link to the system bulk import stored procedure.
     */
    private final String bulkImportSprocLink;

    /**
     * The options passed to the system bulk import stored procedure.
     */
    private final BulkImportStoredProcedureOptions storedProcOptions;

    /**
     * The count of documents bulk imported by this batch inserter.
     */
    public final AtomicInteger numberOfDocumentsImported;

    /**
     * The total request units consumed by this batch inserter.
     */
    public final AtomicDouble totalRequestUnitsConsumed;

    /**
     * Creates a batch inserter for one partition key range.
     *
     * @param partitionKeyRangeId id of the partition key range the stored procedure is routed to
     * @param batchesToInsert     mini-batches of serialized documents to import
     * @param client              document client used to execute the stored procedure
     * @param bulkImportSprocLink link to the bulk import stored procedure
     * @param options             options forwarded to the stored procedure
     */
    public BatchInserter(String partitionKeyRangeId, List<List<String>> batchesToInsert, DocumentClient client, String bulkImportSprocLink,
            BulkImportStoredProcedureOptions options) {
        this.partitionKeyRangeId = partitionKeyRangeId;
        this.batchesToInsert = batchesToInsert;
        this.client = client;
        this.bulkImportSprocLink = bulkImportSprocLink;
        this.storedProcOptions = options;
        this.numberOfDocumentsImported = new AtomicInteger();
        this.totalRequestUnitsConsumed = new AtomicDouble();
    }

    /**
     * @return the number of documents imported so far by this batch inserter
     */
    public int getNumberOfDocumentsImported() {
        return numberOfDocumentsImported.get();
    }

    /**
     * @return the total request units consumed so far by this batch inserter
     */
    public double getTotalRequestUnitsConsumed() {
        return totalRequestUnitsConsumed.get();
    }

    /**
     * Builds one {@link Callable} per mini-batch. Each callable repeatedly invokes the
     * stored procedure with the not-yet-imported tail of its mini-batch until every
     * document has been accepted, sleeping for the server-provided retry delay when
     * throttled, and returns the {@link InsertMetrics} of that mini-batch.
     *
     * @return a lazy iterator over the mini-batch insertion tasks
     */
    public Iterator<Callable<InsertMetrics>> miniBatchInsertExecutionCallableIterator() {

        Stream<Callable<InsertMetrics>> stream = batchesToInsert.stream().map(miniBatch -> {
            return new Callable<InsertMetrics>() {

                @Override
                public InsertMetrics call() throws Exception {
                    Stopwatch stopwatch = Stopwatch.createStarted();
                    double requestUnitsConsumed = 0;
                    int numberOfThrottles = 0;
                    boolean timedOut = false;
                    int currentDocumentIndex = 0;

                    while (currentDocumentIndex < miniBatch.size()) {
                        // subList takes (fromInclusive, toExclusive): send everything not yet imported.
                        String[] docBatch = miniBatch.subList(currentDocumentIndex, miniBatch.size()).toArray(new String[0]);

                        boolean throttled = false;
                        Duration retryAfter = Duration.ZERO;

                        try {
                            RequestOptions requestOptions = new RequestOptions();
                            // Route the request to the partition key range this inserter owns.
                            requestOptions.setPartitionKeyRengeId(partitionKeyRangeId);

                            logger.debug("Partition Key Range Id {}, Trying to import minibatch of {} documenents", partitionKeyRangeId, docBatch.length);

                            StoredProcedureResponse response;
                            if (!timedOut) {
                                response = client.executeStoredProcedure(bulkImportSprocLink, requestOptions, new Object[] { docBatch, storedProcOptions, null });
                            } else {
                                // After a timeout the previous attempt's outcome is unknown, so the
                                // retry runs with upsert enabled.
                                BulkImportStoredProcedureOptions modifiedStoredProcOptions = new BulkImportStoredProcedureOptions(
                                        storedProcOptions.disableAutomaticIdGeneration,
                                        storedProcOptions.softStopOnConflict,
                                        storedProcOptions.systemCollectionId,
                                        storedProcOptions.enableBsonSchema,
                                        true);

                                response = client.executeStoredProcedure(
                                        bulkImportSprocLink, requestOptions,
                                        new Object[] { docBatch, modifiedStoredProcOptions, null });
                            }

                            BulkImportStoredProcedureResponse bulkImportResponse = parseFrom(response);

                            if (bulkImportResponse != null) {
                                if (bulkImportResponse.errorCode != 0) {
                                    logger.warn("Partition range id {} Received response error code {}", partitionKeyRangeId, bulkImportResponse.errorCode);
                                }

                                double requestCharge = response.getRequestCharge();
                                currentDocumentIndex += bulkImportResponse.count;
                                numberOfDocumentsImported.addAndGet(bulkImportResponse.count);
                                requestUnitsConsumed += requestCharge;
                                totalRequestUnitsConsumed.addAndGet(requestCharge);
                            } else {
                                // NOTE(review): no progress is made in this case (nor when count is 0),
                                // so the same batch is retried immediately with no retry limit.
                                logger.warn("Partition range id {} Failed to receive response", partitionKeyRangeId);
                            }

                        } catch (DocumentClientException e) {

                            logger.debug("Importing minibatch for partition key range id {} failed", partitionKeyRangeId, e);

                            if (isThrottled(e)) {
                                logger.debug("Throttled on partition range id {}", partitionKeyRangeId);
                                numberOfThrottles++;
                                throttled = true;
                                retryAfter = Duration.ofMillis(e.getRetryAfterInMilliseconds());
                            } else if (isTimedOut(e)) {
                                logger.debug("Request timed out on partition range id {}", partitionKeyRangeId);
                                timedOut = true;
                            } else if (isGone(e)) {
                                String errorMessage;
                                if (isSplit(e)) {
                                    errorMessage = String.format(
                                            "Partition range id %s is undergoing split, please retry shortly after re-initializing BulkImporter object",
                                            partitionKeyRangeId);
                                } else {
                                    errorMessage = String.format(
                                            "Partition range id %s is gone, please retry shortly after re-initializing BulkImporter object",
                                            partitionKeyRangeId);
                                }
                                logger.error(errorMessage);
                                throw new RuntimeException(errorMessage);
                            } else {
                                String errorMessage = String.format("Partition range id %s failed to import mini-batch. Exception was %s. Status code was %s",
                                        partitionKeyRangeId,
                                        e.getMessage(),
                                        e.getStatusCode());
                                logger.error(errorMessage, e);
                                throw new RuntimeException(errorMessage, e);
                            }

                        } catch (Exception e) {
                            String errorMessage = String.format("Partition range id %s Failed to import mini-batch. Exception was %s", partitionKeyRangeId,
                                    e.getMessage());
                            logger.error(errorMessage, e);
                            throw new RuntimeException(errorMessage, e);
                        }

                        if (throttled) {
                            try {
                                // Honor the full retry delay, including any sub-second part.
                                Thread.sleep(retryAfter.toMillis());
                            } catch (InterruptedException e) {
                                // Restore the interrupt flag so the owning executor can observe it.
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        }
                    }

                    stopwatch.stop();
                    return new InsertMetrics(currentDocumentIndex, stopwatch.elapsed(), requestUnitsConsumed, numberOfThrottles);
                }
            };
        });
        return stream.iterator();
    }

    /** @return true when the status code of {@code e} is TOO_MANY_REQUESTS */
    private boolean isThrottled(DocumentClientException e) {
        return e.getStatusCode() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS;
    }

    /** @return true when the status code of {@code e} is TIMEOUT */
    private boolean isTimedOut(DocumentClientException e) {
        return e.getStatusCode() == HttpConstants.StatusCodes.TIMEOUT;
    }

    /** @return true when the status code of {@code e} is GONE */
    private boolean isGone(DocumentClientException e) {
        return e.getStatusCode() == HttpConstants.StatusCodes.GONE;
    }

    /** @return true when {@code e} is GONE with the SPLITTING sub-status */
    private boolean isSplit(DocumentClientException e) {
        return e.getStatusCode() == HttpConstants.StatusCodes.GONE
                && HttpConstants.SubStatusCodes.SPLITTING == e.getSubStatusCode();
    }

    /**
     * Deserializes the stored procedure response body.
     *
     * @param storedProcResponse the raw stored procedure response
     * @return the parsed response, or {@code null} when the response body is empty
     * @throws IOException if the body cannot be parsed or mapped
     */
    private BulkImportStoredProcedureResponse parseFrom(StoredProcedureResponse storedProcResponse) throws JsonParseException, JsonMappingException, IOException {
        String res = storedProcResponse.getResponseAsString();
        logger.debug("MiniBatch Insertion for Partition Key Range Id {}: Stored Proc Response as String {}", partitionKeyRangeId, res);

        if (StringUtils.isEmpty(res)) {
            return null;
        }

        return objectMapper.readValue(res, BulkImportStoredProcedureResponse.class);
    }
}

Просмотреть файл

@ -0,0 +1,69 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import java.time.Duration;
/**
 * Immutable summary of a bulk import execution: how many documents were imported,
 * how many request units were consumed and how long the import took.
 */
public class BulkImportResponse {

    /**
     * Total number of documents imported.
     */
    private final int numberOfDocumentsImported;

    /**
     * Total request units consumed.
     */
    private final double totalRequestUnitsConsumed;

    /**
     * Total bulk import time.
     */
    private final Duration totalTimeTaken;

    /**
     * Creates a bulk import summary.
     *
     * @param numberOfDocumentsImported total number of documents imported
     * @param totalRequestUnitsConsumed total request units consumed
     * @param totalTimeTaken            total bulk import time
     */
    public BulkImportResponse(int numberOfDocumentsImported, double totalRequestUnitsConsumed, Duration totalTimeTaken) {
        this.numberOfDocumentsImported = numberOfDocumentsImported;
        this.totalRequestUnitsConsumed = totalRequestUnitsConsumed;
        this.totalTimeTaken = totalTimeTaken;
    }

    /**
     * @return the numberOfDocumentsImported
     */
    public int getNumberOfDocumentsImported() {
        return numberOfDocumentsImported;
    }

    /**
     * @return the totalRequestUnitsConsumed
     */
    public double getTotalRequestUnitsConsumed() {
        return totalRequestUnitsConsumed;
    }

    /**
     * @return the totalTimeTaken
     */
    public Duration getTotalTimeTaken() {
        return totalTimeTaken;
    }
}

Просмотреть файл

@ -0,0 +1,64 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
 * Option bag passed as an argument to the bulk import stored procedure.
 * Each field is serialized under the JSON property name given by its annotation.
 */
class BulkImportStoredProcedureOptions {

    /** Serialized as {@code disableAutomaticIdGeneration}. */
    @JsonProperty("disableAutomaticIdGeneration")
    public boolean disableAutomaticIdGeneration;

    /** Serialized as {@code softStopOnConflict}. */
    @JsonProperty("softStopOnConflict")
    public boolean softStopOnConflict;

    /** Serialized as {@code systemCollectionId}. */
    @JsonProperty("systemCollectionId")
    public String systemCollectionId;

    /** Serialized as {@code enableBsonSchema}. */
    @JsonProperty("enableBsonSchema")
    public boolean enableBsonSchema;

    /** Serialized as {@code enableUpsert}. */
    @JsonProperty("enableUpsert")
    public boolean enableUpsert;

    /**
     * Creates an option bag with every option given explicitly.
     *
     * @param disableAutomaticIdGeneration value of the {@code disableAutomaticIdGeneration} option
     * @param softStopOnConflict           value of the {@code softStopOnConflict} option
     * @param systemCollectionId           value of the {@code systemCollectionId} option
     * @param enableBsonSchema             value of the {@code enableBsonSchema} option
     * @param enableUpsert                 value of the {@code enableUpsert} option
     */
    public BulkImportStoredProcedureOptions(
            boolean disableAutomaticIdGeneration,
            boolean softStopOnConflict,
            String systemCollectionId,
            boolean enableBsonSchema,
            boolean enableUpsert) {
        this.enableUpsert = enableUpsert;
        this.enableBsonSchema = enableBsonSchema;
        this.systemCollectionId = systemCollectionId;
        this.softStopOnConflict = softStopOnConflict;
        this.disableAutomaticIdGeneration = disableAutomaticIdGeneration;
    }

    /**
     * Creates an option bag with {@code enableUpsert} set to {@code false}.
     *
     * @param disableAutomaticIdGeneration value of the {@code disableAutomaticIdGeneration} option
     * @param softStopOnConflict           value of the {@code softStopOnConflict} option
     * @param systemCollectionId           value of the {@code systemCollectionId} option
     * @param enableBsonSchema             value of the {@code enableBsonSchema} option
     */
    public BulkImportStoredProcedureOptions(
            boolean disableAutomaticIdGeneration,
            boolean softStopOnConflict,
            String systemCollectionId,
            boolean enableBsonSchema) {
        this(disableAutomaticIdGeneration, softStopOnConflict, systemCollectionId, enableBsonSchema, false);
    }
}

Просмотреть файл

@ -0,0 +1,40 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
 * JSON-mapped body returned by the bulk import stored procedure.
 * Instances are produced by deserializing the stored procedure's response string.
 */
class BulkImportStoredProcedureResponse {
/**
* Represents the bulk import progress index. The batch inserter advances its
* position within the current mini-batch by this amount after each call.
*/
@JsonProperty("count")
public int count;
/**
* Represents the specific error which occurred during bulk import execution.
* The batch inserter logs a warning whenever this value is non-zero.
*/
@JsonProperty("errorCode")
public int errorCode;
}

Просмотреть файл

@ -0,0 +1,352 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Futures.FutureCombiner;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.microsoft.azure.documentdb.Database;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.PartitionKeyRange;
import com.microsoft.azure.documentdb.internal.routing.CollectionRoutingMap;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache;
import com.microsoft.azure.documentdb.internal.routing.Range;
/**
 * Bulk imports serialized JSON documents into a single collection.
 *
 * Documents are grouped by the partition key range they belong to, split into
 * mini-batches bounded by the stored procedure payload limit, and then imported per
 * partition through a {@link BatchInserter} driven by a {@link CongestionController}.
 */
public class BulkImporter {

    /**
     * The name of the system stored procedure for bulk import.
     */
    private final static String BULK_IMPORT_STORED_PROCEDURE_NAME = "__.sys.commonBulkInsert";

    /**
     * The maximal sproc payload size sent (as a fraction of 2MB).
     */
    private final static int MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE = (2202010 * 5) / 10;

    /**
     * The fraction of maximum sproc payload size upto which documents allowed to be fit in a mini-batch.
     */
    private final static double FRACTION_OF_MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE_ALLOWED = 0.5;

    /**
     * Charset used to measure the serialized size of a document.
     */
    private final static Charset UTF_8 = Charset.forName("UTF-8");

    /**
     * Logger
     */
    private final Logger logger = LoggerFactory.getLogger(BulkImporter.class);

    /**
     * Degree of parallelism for each partition, keyed by partition key range id.
     * Updated at the end of every bulk import and fed into the next one.
     */
    private final Map<String, Integer> partitionDegreeOfConcurrency = Collections.synchronizedMap(new HashMap<>());

    /**
     * Executor Service
     */
    private final ListeningExecutorService listeningExecutorService;

    /**
     * The DocumentDB client instance.
     */
    private final DocumentClient client;

    /**
     * The document collection to which documents are to be bulk imported.
     */
    private final DocumentCollection collection;

    /**
     * The partition key definition of the target collection.
     */
    private final PartitionKeyDefinition partitionKeyDefinition;

    /**
     * Partition Key Range Ids
     */
    private List<String> partitionKeyRangeIds;

    /**
     * Collection routing map used to retrieve partition key range Ids of a given collection
     */
    private CollectionRoutingMap collectionRoutingMap;

    /**
     * Bulk Import Stored Procedure Link relevant to the given collection
     */
    private String bulkImportStoredProcLink;

    /**
     * Initialization Future
     */
    private final Future<Void> initializationFuture;

    /**
     * Initializes a new instance of {@link BulkImporter}.
     *
     * Initialization (resolving the stored procedure link and the partition map) is
     * submitted to the executor here and awaited by
     * {@link #bulkImport(Collection, boolean)}, so initialization failures surface
     * from that method rather than from this constructor.
     *
     * @param client {@link DocumentClient} instance to use
     * @param collection inserts documents to {@link DocumentCollection}
     * @throws NullPointerException if {@code client} or {@code collection} is null
     */
    public BulkImporter(DocumentClient client, DocumentCollection collection) {

        Preconditions.checkNotNull(client, "DocumentClient cannot be null");
        Preconditions.checkNotNull(collection, "collection cannot be null");

        this.client = client;
        this.collection = collection;

        this.partitionKeyDefinition = collection.getPartitionKey();

        this.listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newWorkStealingPool(client.getConnectionPolicy().getMaxPoolSize()));

        this.initializationFuture = this.listeningExecutorService.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                initialize();
                return null;
            }
        });
    }

    /**
     * Initializes {@link BulkImporter}. This happens only once
     * @throws DocumentClientException
     */
    private void initialize() throws DocumentClientException {
        logger.debug("Initializing ...");

        // The second path segment of the collection self link identifies the database.
        String databaseId = collection.getSelfLink().split("/")[1];
        Database d = client.readDatabase(String.format("/dbs/%s", databaseId), null).getResource();

        bulkImportStoredProcLink = String.format("/dbs/%s/colls/%s/sprocs/%s", d.getId(), collection.getId(), BULK_IMPORT_STORED_PROCEDURE_NAME);

        logger.trace("Fetching partition map of collection");
        Range<String> fullRange = new Range<String>(
                PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
                PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey,
                true,
                false);

        this.collectionRoutingMap = getCollectionRoutingMap(client);
        Collection<PartitionKeyRange> partitionKeyRanges = this.collectionRoutingMap.getOverlappingRanges(fullRange);

        this.partitionKeyRangeIds = partitionKeyRanges.stream().map(partitionKeyRange -> partitionKeyRange.getId()).collect(Collectors.toList());

        logger.debug("Initialization completed");
    }

    /**
     * Executes a bulk import in the Azure Cosmos DB database service.
     *
     * <pre>{@code
     * DocumentClient client = new DocumentClient(HOST, MASTER_KEY, null, null);
     *
     * String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "perf-col");
     * DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
     *
     * BulkImporter importer = new BulkImporter(client, collection);
     *
     * List<String> docs = new ArrayList<String>();
     * for(int i = 0; i < 200000; i++) {
     *      String id = UUID.randomUUID().toString();
     *      String mypk = "Seattle";
     *      String v = UUID.randomUUID().toString();
     *      String doc = String.format("{" +
     *              "  \"dataField\": \"%s\"," +
     *              "  \"mypk\": \"%s\"," +
     *              "  \"id\": \"%s\"" +
     *              "}", v, mypk, id);
     *
     *      docs.add(doc);
     * }
     *
     * BulkImportResponse bulkImportResponse = importer.bulkImport(docs, false);
     *
     * client.close();
     * }</pre>
     * @param documents to insert
     * @param enableUpsert whether enable upsert (overwrite if it exists)
     * @return an instance of {@link BulkImportResponse}
     * @throws DocumentClientException if any failure happens
     */
    public BulkImportResponse bulkImport(Collection<String> documents, boolean enableUpsert) throws DocumentClientException {

        Preconditions.checkNotNull(documents, "documents cannot be null");
        try {
            initializationFuture.get();
            return executeBulkImportAsyncImpl(documents, enableUpsert).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.error("Failed to import documents", e);
            throw new DocumentClientException(500, e);
        } catch (Exception e) {
            logger.error("Failed to import documents", e);
            throw new DocumentClientException(500, e);
        }
    }

    /**
     * Buckets the documents per partition key range, cuts each bucket into mini-batches
     * and starts one congestion-controlled import per partition.
     *
     * @param documents    serialized documents to import
     * @param enableUpsert whether the stored procedure should upsert
     * @return a future completing with the aggregated {@link BulkImportResponse} once
     *         every partition's import has completed
     */
    private ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Collection<String> documents, boolean enableUpsert) {

        Stopwatch watch = Stopwatch.createStarted();

        BulkImportStoredProcedureOptions options = new BulkImportStoredProcedureOptions(true, true, null, false, enableUpsert);

        ConcurrentHashMap<String, Set<String>> documentsToImportByPartition = new ConcurrentHashMap<String, Set<String>>();
        ConcurrentHashMap<String, List<List<String>>> miniBatchesToImportByPartition = new ConcurrentHashMap<String, List<List<String>>>();

        for (String partitionKeyRangeId : this.partitionKeyRangeIds) {
            documentsToImportByPartition.put(partitionKeyRangeId, ConcurrentHashMap.newKeySet());
            miniBatchesToImportByPartition.put(partitionKeyRangeId, new ArrayList<List<String>>());
        }

        // Sort documents into partition buckets.
        logger.debug("Sorting documents into partition buckets");

        documents.parallelStream().forEach(document -> {
            PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.extractPartitionKeyValue(document, partitionKeyDefinition);
            String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
            String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
            documentsToImportByPartition.get(partitionRangeId).add(document);
        });

        logger.trace("Creating mini batches within each partition bucket");
        int maxMiniBatchSize = (int) Math.floor(MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE * FRACTION_OF_MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE_ALLOWED);

        documentsToImportByPartition.entrySet().parallelStream().forEach(entry -> {

            String partitionRangeId = entry.getKey();
            Set<String> documentsToImportInPartition = entry.getValue();

            Iterator<String> it = documentsToImportInPartition.iterator();
            while (it.hasNext()) {
                List<String> currentMiniBatch = new ArrayList<String>();
                int currentMiniBatchSize = 0;

                // The size check runs after a document is added, so a mini-batch may exceed
                // maxMiniBatchSize by at most the size of its last document.
                do {
                    String currentDocument = it.next();
                    int currentDocumentSize = getSizeInBytes(currentDocument);
                    if (currentDocumentSize > maxMiniBatchSize) {
                        logger.error("Document size {} larger than script payload limit. {}", currentDocumentSize, maxMiniBatchSize);
                        throw new UnsupportedOperationException ("Cannot try to import a document whose size is larger than script payload limit.");
                    }

                    currentMiniBatch.add(currentDocument);
                    currentMiniBatchSize += currentDocumentSize;
                } while ((currentMiniBatchSize < maxMiniBatchSize) && (it.hasNext()));

                miniBatchesToImportByPartition.get(partitionRangeId).add(currentMiniBatch);
            }
        });

        logger.debug("Beginning bulk import within each partition bucket");
        Map<String, BatchInserter> batchInserters = new HashMap<String, BatchInserter>();
        Map<String, CongestionController> congestionControllers = new HashMap<String, CongestionController>();

        for (String partitionKeyRangeId : this.partitionKeyRangeIds) {

            BatchInserter batchInserter = new BatchInserter(
                    partitionKeyRangeId,
                    miniBatchesToImportByPartition.get(partitionKeyRangeId),
                    this.client,
                    bulkImportStoredProcLink,
                    options);
            batchInserters.put(partitionKeyRangeId, batchInserter);

            // The map has no entry for a partition until a previous import has recorded one.
            congestionControllers.put(partitionKeyRangeId,
                    new CongestionController(listeningExecutorService, partitionKeyRangeId, batchInserter, this.partitionDegreeOfConcurrency.get(partitionKeyRangeId)));
        }

        List<ListenableFuture<Void>> futures = congestionControllers.values().parallelStream().map(c -> c.ExecuteAll()).collect(Collectors.toList());
        FutureCombiner<Void> futureContainer = Futures.whenAllComplete(futures);
        AsyncCallable<BulkImportResponse> completeAsyncCallback = new AsyncCallable<BulkImportResponse>() {

            @Override
            public ListenableFuture<BulkImportResponse> call() throws Exception {
                // TODO: this can change so aggregation of the result happen at each
                watch.stop();

                // Remember the degree of concurrency each partition ended at for the next import.
                for (String partitionKeyRangeId : partitionKeyRangeIds) {
                    partitionDegreeOfConcurrency.put(partitionKeyRangeId, congestionControllers.get(partitionKeyRangeId).getDegreeOfConcurrency());
                }

                int numberOfDocumentsImported = batchInserters.values().stream().mapToInt(b -> b.getNumberOfDocumentsImported()).sum();
                double totalRequestUnitsConsumed = batchInserters.values().stream().mapToDouble(b -> b.getTotalRequestUnitsConsumed()).sum();

                BulkImportResponse bulkImportResponse = new BulkImportResponse(numberOfDocumentsImported, totalRequestUnitsConsumed, watch.elapsed());

                return Futures.immediateFuture(bulkImportResponse);
            }
        };

        return futureContainer.callAsync(completeAsyncCallback, MoreExecutors.directExecutor());
    }

    /**
     * Reads the collection routing map out of the client's private
     * {@code partitionKeyRangeCache} field via reflection.
     *
     * @param client the document client holding the cache
     * @return the routing map looked up for this importer's collection
     * @throws IllegalStateException if the field cannot be read or the lookup fails
     */
    private CollectionRoutingMap getCollectionRoutingMap(DocumentClient client) {
        try {
            // NOTE: Java doesn't have internal access modifier
            // This is only invoked once per Bulk Import Initialization. So this is not costly.
            // TODO: explore other options here (if any)
            Field f = client.getClass().getDeclaredField("partitionKeyRangeCache"); //NoSuchFieldException
            f.setAccessible(true);
            PartitionKeyRangeCache cache = (PartitionKeyRangeCache) f.get(client); //IllegalAccessException

            return cache.tryLookUp(collection.getSelfLink(), null);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * @param document serialized document
     * @return the length in bytes of the UTF-8 encoding of {@code document}
     */
    private int getSizeInBytes(String document) {
        return document.getBytes(UTF_8).length;
    }
}

Просмотреть файл

@ -0,0 +1,297 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
/**
 * Executes the mini-batch insert tasks exposed by a {@link BatchInserter} for a single
 * physical partition while adapting the number of tasks allowed to run concurrently.
 *
 * <p>Concurrency is bounded by a {@link Semaphore}. A background task samples the aggregated
 * {@link InsertMetrics} once per sample period and applies AIMD (additive increase /
 * multiplicative decrease): permits are withdrawn when throttles were observed and added
 * when progress was made without throttles.
 */
class CongestionController {

    private static final Logger logger = LoggerFactory.getLogger(CongestionController.class);

    /**
     * The degree of concurrency to start at.
     */
    private static final int STARTING_DEGREE_OF_CONCURRENCY = 2;

    /**
     * The maximum degree of concurrency to go up to for a single physical partition.
     */
    private static final int MAX_DEGREE_OF_CONCURRENCY = 20;

    /**
     * The additive increase factor of AIMD.
     * If no throttle is received in the sample period, the degree of concurrency is increased by this amount.
     * For example if this is set to 3 then "degreeOfConcurrency += 3".
     */
    private static final int ADDITIVE_INCREASE_FACTOR = 1;

    /**
     * The multiplicative decrease factor of AIMD.
     * If a throttle is received in the sample period, the degree of concurrency is divided by this amount.
     * For example if this is set to 2 then "degreeOfConcurrency /= 2" (never going below 1).
     */
    private static final int MULTIPLICATIVE_DECREASE_FACTOR = 2;

    /**
     * The id of the physical partition that this congestion controller is responsible for.
     */
    private final String partitionKeyRangeId;

    /**
     * How often the InsertMetrics are sampled to decide whether the degree of concurrency needs to change.
     */
    private final Duration samplePeriod = Duration.ofSeconds(3);

    /**
     * The {@link BatchInserter} that exposes a stream of {@link Callable} that insert document batches
     * and return an {@link InsertMetrics}.
     */
    private final BatchInserter batchInserter;

    /**
     * The semaphore that throttles the BatchInserter tasks by only allowing at most
     * 'degreeOfConcurrency' of them to run at a time.
     */
    private final Semaphore throttleSemaphore;

    /**
     * Metrics accumulated since the last sample. Insert tasks add to it and the congestion
     * control task swaps it for a fresh instance; both happen under this object's monitor.
     */
    private InsertMetrics aggregatedInsertMetrics;

    /**
     * Running total of documents inserted, as observed by the congestion control task.
     */
    private long documentsInsertedSoFar;

    static enum State {
        Running, Completed, Failure
    }

    /**
     * Lifecycle state; guarded by this object's monitor.
     */
    private State state = State.Running;

    /**
     * The degree of concurrency (maximum number of tasks allowed to execute concurrently).
     * Written only by the congestion control task; volatile so that
     * {@link #getDegreeOfConcurrency()} observes the latest value from any thread.
     */
    private volatile int degreeOfConcurrency;

    /**
     * Executor service for running tasks.
     */
    private final ListeningExecutorService executor;

    /**
     * Creates a controller that starts at the default degree of concurrency.
     *
     * @param executor executor used for the insert tasks and the congestion control task
     * @param partitionKeyRangeId id of the physical partition being written to
     * @param batchInserter source of the insert tasks
     */
    public CongestionController(ListeningExecutorService executor, String partitionKeyRangeId, BatchInserter batchInserter) {
        this(executor, partitionKeyRangeId, batchInserter, null);
    }

    /**
     * Creates a controller with an explicit starting degree of concurrency.
     *
     * @param executor executor used for the insert tasks and the congestion control task
     * @param partitionKeyRangeId id of the physical partition being written to
     * @param batchInserter source of the insert tasks
     * @param startingDegreeOfConcurrency initial number of permits, or {@code null} for the default
     */
    public CongestionController(ListeningExecutorService executor, String partitionKeyRangeId, BatchInserter batchInserter, Integer startingDegreeOfConcurrency) {
        this.partitionKeyRangeId = partitionKeyRangeId;
        this.batchInserter = batchInserter;

        // The semaphore starts with the starting degree of concurrency; permits are added when
        // no throttles are received and withdrawn when we get throttled.
        this.degreeOfConcurrency = startingDegreeOfConcurrency != null ? startingDegreeOfConcurrency : STARTING_DEGREE_OF_CONCURRENCY;
        this.throttleSemaphore = new Semaphore(this.degreeOfConcurrency);
        this.aggregatedInsertMetrics = new InsertMetrics();
        this.executor = executor;
    }

    /**
     * Atomically swaps the aggregated metrics for the given instance.
     *
     * @param metrics the instance to accumulate into from now on
     * @return the metrics accumulated up to this call
     */
    private synchronized InsertMetrics atomicGetAndReplace(InsertMetrics metrics) {
        InsertMetrics old = this.aggregatedInsertMetrics;
        this.aggregatedInsertMetrics = metrics;
        return old;
    }

    /**
     * Withdraws permits so that the semaphore and {@link #degreeOfConcurrency} both end up at
     * {@code max(1, degreeOfConcurrency / MULTIPLICATIVE_DECREASE_FACTOR)}.
     *
     * @throws InterruptedException if interrupted while waiting for a running task to hand back a permit
     */
    private void decreaseDegreeOfConcurrency() throws InterruptedException {
        int current = degreeOfConcurrency;
        // Never go below one permit, otherwise no insert task could ever be started again.
        int target = Math.max(1, current / MULTIPLICATIVE_DECREASE_FACTOR);
        int permitsToWithdraw = current - target;
        if (permitsToWithdraw <= 0) {
            return;
        }
        // Permits are taken one at a time: a bulk acquire on a non-fair semaphore could be
        // starved by the submitting thread, which grabs single permits as they are released.
        for (int i = 0; i < permitsToWithdraw; i++) {
            throttleSemaphore.acquire();
        }
        degreeOfConcurrency = target;
    }

    /**
     * Builds the task that periodically samples the insert metrics and adjusts the degree of
     * concurrency until the controller leaves the {@link State#Running} state or the task is interrupted.
     */
    private Callable<Void> congestionControlTask() {
        return new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                while (isRunning()) {
                    try {
                        logger.trace("partition key range id {} goes to sleep for {} seconds", partitionKeyRangeId, samplePeriod.getSeconds());
                        Thread.sleep(samplePeriod.toMillis());
                        logger.trace("{} wakes up", partitionKeyRangeId);

                        InsertMetrics insertMetricsSample = atomicGetAndReplace(new InsertMetrics());

                        if (insertMetricsSample.numberOfThrottles > 0) {
                            // We got a throttle so we need to back off on the degree of concurrency (AIMD).
                            logger.trace("{} encountered {} throttling, reducing parallelism", partitionKeyRangeId, insertMetricsSample.numberOfThrottles);
                            decreaseDegreeOfConcurrency();
                        }

                        if (insertMetricsSample.numberOfDocumentsInserted == 0) {
                            // We haven't made any progress since the last sampling.
                            continue;
                        }

                        logger.trace("{} aggregating inserts metrics", partitionKeyRangeId);

                        if (insertMetricsSample.numberOfThrottles == 0) {
                            if (degreeOfConcurrency + ADDITIVE_INCREASE_FACTOR <= MAX_DEGREE_OF_CONCURRENCY) {
                                // We aren't getting throttles, so we should bump up the degree of concurrency.
                                throttleSemaphore.release(ADDITIVE_INCREASE_FACTOR);
                                degreeOfConcurrency += ADDITIVE_INCREASE_FACTOR;
                            }
                        }

                        double ruPerSecond = insertMetricsSample.requestUnitsConsumed / samplePeriod.getSeconds();
                        documentsInsertedSoFar += insertMetricsSample.numberOfDocumentsInserted;

                        // Arguments are listed in the same order as the placeholders.
                        logger.info("Partition index {} : Inserted {} docs in {} seconds at {} RU/s with {} tasks. Faced {} throttles. {} docs inserted so far",
                                partitionKeyRangeId,
                                insertMetricsSample.numberOfDocumentsInserted,
                                samplePeriod.getSeconds(),
                                ruPerSecond,
                                degreeOfConcurrency,
                                insertMetricsSample.numberOfThrottles,
                                documentsInsertedSoFar);

                    } catch (InterruptedException e) {
                        // Restore the flag so the executor thread still sees the interruption.
                        Thread.currentThread().interrupt();
                        logger.warn("Interrupted", e);
                        break;
                    }
                }
                return null;
            }
        };
    }

    /**
     * Submits every insert task exposed by the {@link BatchInserter}, blocking the calling
     * thread whenever the current degree of concurrency is exhausted.
     *
     * <p>The congestion control task is started before the first insert task is submitted so
     * that the degree of concurrency is adapted while tasks are still being handed out.
     *
     * @return the future of the congestion control task; it completes once the controller has
     *         left the {@link State#Running} state and the task has observed it
     * @throws RuntimeException if the calling thread is interrupted while waiting for a permit
     */
    public ListenableFuture<Void> ExecuteAll() {

        logger.debug("Executing batching in partition {}", partitionKeyRangeId);

        ListenableFuture<Void> congestionControlFuture = executor.submit(congestionControlTask());

        Iterator<Callable<InsertMetrics>> batchExecutionIterator = batchInserter.miniBatchInsertExecutionCallableIterator();

        List<ListenableFuture<InsertMetrics>> futureList = new ArrayList<>();
        while (batchExecutionIterator.hasNext()) {
            Callable<InsertMetrics> task = batchExecutionIterator.next();

            // The calling thread waits on the semaphore so that no more than
            // 'degreeOfConcurrency' tasks are run at a time.
            try {
                logger.trace("trying to accequire semaphore");
                this.throttleSemaphore.acquire();
                logger.trace("semaphore accequired");
            } catch (InterruptedException e) {
                logger.error("Interrupted", e);
                // Leave the Running state so the congestion control task terminates.
                getAndSet(State.Failure);
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }

            ListenableFuture<InsertMetrics> insertMetricsFuture;
            try {
                insertMetricsFuture = executor.submit(task);
            } catch (RuntimeException e) {
                // The task never started: hand the permit back and stop the control task.
                throttleSemaphore.release();
                getAndSet(State.Failure);
                throw e;
            }

            FutureCallback<InsertMetrics> aggregateMetricsReleaseSemaphoreCallback = new FutureCallback<InsertMetrics>() {

                @Override
                public void onSuccess(InsertMetrics result) {
                    synchronized (CongestionController.this) {
                        aggregatedInsertMetrics = InsertMetrics.sum(aggregatedInsertMetrics, result);
                    }
                    logger.trace("releasing semaphore");
                    throttleSemaphore.release();
                }

                @Override
                public void onFailure(Throwable t) {
                    logger.trace("encountered failure {} releasing semaphore", t);
                    throttleSemaphore.release();
                }
            };

            Futures.addCallback(insertMetricsFuture, aggregateMetricsReleaseSemaphoreCallback, MoreExecutors.directExecutor());
            futureList.add(insertMetricsFuture);
        }

        ListenableFuture<List<InsertMetrics>> allFutureResults = Futures.allAsList(futureList);

        FutureCallback<List<InsertMetrics>> completionCallback = new FutureCallback<List<InsertMetrics>>() {

            @Override
            public void onSuccess(List<InsertMetrics> result) {
                logger.info("Completed");
                getAndSet(State.Completed);
            }

            @Override
            public void onFailure(Throwable t) {
                logger.error("Encountered failure", t);
                getAndSet(State.Failure);
            }
        };

        Futures.addCallback(allFutureResults, completionCallback, MoreExecutors.directExecutor());
        return congestionControlFuture;
    }

    /**
     * Replaces the lifecycle state.
     *
     * @param state the new state
     * @return the state that was in effect before this call
     */
    public synchronized State getAndSet(State state) {
        State res = this.state;
        this.state = state;
        return res;
    }

    /**
     * @return {@code true} while the state is {@link State#Running}
     */
    public synchronized boolean isRunning() {
        return state == State.Running;
    }

    /**
     * @return {@code true} if the state is {@link State#Completed}
     */
    public synchronized boolean hasCompletedAsSuccess() {
        return state == State.Completed;
    }

    /**
     * @return {@code true} if the state is {@link State#Failure}
     */
    public synchronized boolean hasCompletedAsFailure() {
        return state == State.Failure;
    }

    /**
     * @return the current maximum number of insert tasks allowed to run concurrently
     */
    public int getDegreeOfConcurrency() {
        return this.degreeOfConcurrency;
    }
}

Просмотреть файл

@ -0,0 +1,62 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
/**
 * Helper for reading the partition key value out of a serialized JSON document.
 */
class DocumentAnalyzer {

    /**
     * Shared JSON parser. It is created once instead of once per document because this
     * class is invoked for every document of a bulk import; only {@code readTree} is used on it.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Extracts effective {@link PartitionKeyInternal} from serialized document.
     * @param documentAsString Serialized document to extract partition key value from.
     * @param partitionKeyDefinition Information about partition key.
     * @return the empty partition key when {@code partitionKeyDefinition} is null or has no paths,
     *         otherwise the partition key built from the value found in the document
     * @throws RuntimeException if the document cannot be parsed as JSON
     */
    public static PartitionKeyInternal extractPartitionKeyValue(String documentAsString,
            PartitionKeyDefinition partitionKeyDefinition) {
        if (partitionKeyDefinition == null || partitionKeyDefinition.getPaths().isEmpty()) {
            return PartitionKeyInternal.getEmpty();
        }

        return DocumentAnalyzer.extractPartitionKeyValueInternal(documentAsString, partitionKeyDefinition);
    }

    /**
     * Parses the document and looks up the partition key value using the definition's paths,
     * joined with "/", as a JSON pointer.
     */
    private static PartitionKeyInternal extractPartitionKeyValueInternal(String documentAsString, PartitionKeyDefinition partitionKeyDefinition) {
        String partitionKeyPath = String.join("/", partitionKeyDefinition.getPaths());

        JsonNode root;
        try {
            root = OBJECT_MAPPER.readTree(documentAsString);
        } catch (Exception e) {
            // The document content is deliberately left out of the message.
            throw new RuntimeException("Failed to parse document as JSON while extracting the partition key", e);
        }

        // NOTE(review): asText() turns every value into a String (and a missing node into ""),
        // so numeric or boolean partition key values are treated as strings - confirm this
        // matches how the service computes the effective partition key.
        String partitionKeyValueAsString = root.at(partitionKeyPath).asText();
        return PartitionKeyInternal.fromObjectArray(ImmutableList.of(partitionKeyValueAsString), true);
    }
}

Просмотреть файл

@ -0,0 +1,77 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import java.time.Duration;
import com.google.common.base.Preconditions;
public class InsertMetrics {
final long numberOfDocumentsInserted;
final Duration timeTaken;
final double requestUnitsConsumed;
final long numberOfThrottles;
/**
* Initializes a new instance of the InsertMetrics class (default constructor).
*/
public InsertMetrics()
{
this(0, Duration.ZERO, 0, 0);
}
/**
* Initializes a new instance of the InsertMetrics class (instance constructor).
* @param numberOfDocumentsInserted Number of documents inserted.
* @param timeTaken Amount of time taken to insert the documents.
* @param requestUnitsConsumed The request units consumed to insert the documents.
* @param numberOfThrottles The number of throttles encountered to insert the documents.
*/
public InsertMetrics(long numberOfDocumentsInserted, Duration timeTaken, double requestUnitsConsumed, long numberOfThrottles)
{
Preconditions.checkArgument(numberOfDocumentsInserted >= 0, "numberOfDocumentsInserted must be non negative");
Preconditions.checkArgument(requestUnitsConsumed >= 0, "requestUnitsConsumed must be non negative");
Preconditions.checkArgument(numberOfThrottles >= 0, "numberOfThrottles must be non negative");
this.numberOfDocumentsInserted = numberOfDocumentsInserted;
this.timeTaken = timeTaken;
this.requestUnitsConsumed = requestUnitsConsumed;
this.numberOfThrottles = numberOfThrottles;
}
/**
* Sums two {@link InsertMetrics} instances
* @param m1
* @param m2
* @return the sum aggregate result
*/
public static InsertMetrics sum(InsertMetrics m1, InsertMetrics m2) {
long totalDocsInserted = m1.numberOfDocumentsInserted + m2.numberOfDocumentsInserted;
Duration totalTimeTaken = m1.timeTaken.plus(m2.timeTaken);
double totalRequestUnitsConsumed = m1.requestUnitsConsumed + m2.requestUnitsConsumed;
long totalNumberOfThrottles = m1.numberOfThrottles + m2.numberOfThrottles;
return new InsertMetrics(totalDocsInserted, totalTimeTaken, totalRequestUnitsConsumed, totalNumberOfThrottles);
}
}

Просмотреть файл

@ -0,0 +1,71 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
/**
 * Sample entry point: generates random documents that all share one partition key value
 * and bulk imports them into an existing collection.
 */
public class Main {

    public static final String MASTER_KEY = "[YOUR-MASTERKEY]";
    public static final String HOST = "[YOUR-ENDPOINT]";

    /** Number of documents generated and imported by the sample. */
    private static final int NUMBER_OF_DOCUMENTS = 200000;

    /**
     * Runs the sample import and prints the import statistics to standard output.
     *
     * @param args ignored
     * @throws DocumentClientException if reading the collection or importing fails
     * @throws InterruptedException declared for the bulk import call
     * @throws ExecutionException declared for the bulk import call
     */
    public static void main(String[] args) throws DocumentClientException, InterruptedException, ExecutionException {

        DocumentClient client = new DocumentClient(HOST, MASTER_KEY, null, null);
        try {
            String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "mycol");
            // this assumes database and collection already exists
            DocumentCollection collection = client.readCollection(collectionLink, null).getResource();

            BulkImporter importer = new BulkImporter(client, collection);

            List<String> docs = new ArrayList<String>(NUMBER_OF_DOCUMENTS);
            for (int i = 0; i < NUMBER_OF_DOCUMENTS; i++) {
                String id = UUID.randomUUID().toString();
                String mypk = "Seattle";
                String v = UUID.randomUUID().toString();
                String doc = String.format("{" +
                        "  \"dataField\": \"%s\"," +
                        "  \"mypk\": \"%s\"," +
                        "  \"id\": \"%s\"" +
                        "}", v, mypk, id);

                docs.add(doc);
            }

            BulkImportResponse bulkImportResponse = importer.bulkImport(docs, false);

            System.out.println("Number of documents inserted: " + bulkImportResponse.getNumberOfDocumentsImported());
            System.out.println("Import total time: " + bulkImportResponse.getTotalTimeTaken());
            System.out.println("Total request unit consumed: " + bulkImportResponse.getTotalRequestUnitsConsumed());
        } finally {
            // Release the client even when reading the collection or the import throws.
            client.close();
        }
    }
}

Просмотреть файл

@ -0,0 +1,16 @@
# this is the log4j configuration for tests
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=DEBUG, A1
# Set HTTP components' logger to INFO
log4j.category.org.apache.http=INFO
log4j.category.org.apache.http.wire=INFO
log4j.category.org.apache.http.headers=INFO
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d %5X{pid} [%t] %-5p %c - %m%n

Просмотреть файл

@ -0,0 +1,236 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import static com.microsoft.azure.documentdb.bulkimport.TestUtils.getBulkImportStoredProcedureResponse;
import static com.microsoft.azure.documentdb.bulkimport.TestUtils.getStoredProcedureResponse;
import static com.microsoft.azure.documentdb.bulkimport.TestUtils.getThrottleException;
import static com.microsoft.azure.documentdb.bulkimport.TestUtils.withRequestCharge;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.Iterators;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.azure.documentdb.StoredProcedureResponse;
/**
 * Unit tests for {@link BatchInserter} that run its insert callables against a mocked
 * {@link DocumentClient}.
 */
public class BatchInserterTests {

    private static final String PARTITION_INDEX = "0";
    private static final int NUMBER_OF_DOCUMENTS = 10;

    /** One callable must be exposed per batch handed to the inserter. */
    @Test
    public void callbackCount() {
        BulkImportStoredProcedureOptions options = null;
        String bulkImportSproc = null;
        DocumentClient client = Mockito.mock(DocumentClient.class);
        List<List<String>> batchesToInsert = new ArrayList<>();
        batchesToInsert.add(new ArrayList<>());
        batchesToInsert.add(new ArrayList<>());
        batchesToInsert.add(new ArrayList<>());

        BatchInserter bi = new BatchInserter(PARTITION_INDEX, batchesToInsert, client, bulkImportSproc, options);

        assertThat(callablesOf(bi).size(), equalTo(3));
    }

    /** A single stored procedure call that imports the whole batch. */
    @Test
    public void simple() throws Exception {
        BulkImportStoredProcedureOptions options = null;
        String bulkImportSproc = null;
        DocumentClient client = Mockito.mock(DocumentClient.class);

        BatchInserter bi = new BatchInserter(PARTITION_INDEX, singleBatch(NUMBER_OF_DOCUMENTS), client, bulkImportSproc, options);
        Callable<InsertMetrics> callable = onlyCallableOf(bi);

        Map<String, String> headers = withRequestCharge(null, 5.5);
        StoredProcedureResponse bulkImportResponse = getStoredProcedureResponse(getBulkImportStoredProcedureResponse(NUMBER_OF_DOCUMENTS, 0), headers);

        when(client.executeStoredProcedure(Mockito.any(String.class), Mockito.any(RequestOptions.class),
                Mockito.any(Object[].class))).thenReturn(bulkImportResponse);

        InsertMetrics metrics = callable.call();

        verify(client, Mockito.times(1)).executeStoredProcedure(Mockito.anyString(), Mockito.any(RequestOptions.class), Mockito.any(Object[].class));

        // all documents inserted, no throttle, request charge of the single call
        assertMetrics(bi, metrics, 10L, 0L, 5.5);
    }

    /** The batch is imported over several stored procedure calls, one of which makes no progress. */
    @Test
    public void partialProgress() throws Exception {
        BulkImportStoredProcedureOptions options = null;
        String bulkImportSproc = null;
        DocumentClient client = Mockito.mock(DocumentClient.class);

        BatchInserter bi = new BatchInserter(PARTITION_INDEX, singleBatch(NUMBER_OF_DOCUMENTS), client, bulkImportSproc, options);
        Callable<InsertMetrics> callable = onlyCallableOf(bi);

        // 3 documents progress
        StoredProcedureResponse bulkImportResponse1 = getStoredProcedureResponse(getBulkImportStoredProcedureResponse(3, 1), withRequestCharge(null, 1.5));

        // 1 document progress
        StoredProcedureResponse bulkImportResponse2 = getStoredProcedureResponse(getBulkImportStoredProcedureResponse(1, 1), withRequestCharge(null, 2.5));

        // 0 document progress
        StoredProcedureResponse bulkImportResponse3 = getStoredProcedureResponse(getBulkImportStoredProcedureResponse(0, 1), withRequestCharge(null, 3.5));

        // 6 document progress
        StoredProcedureResponse bulkImportResponse4 = getStoredProcedureResponse(getBulkImportStoredProcedureResponse(6, 1), withRequestCharge(null, 4.5));

        when(client.executeStoredProcedure(Mockito.any(String.class), Mockito.any(RequestOptions.class),
                Mockito.any(Object[].class)))
                .thenReturn(bulkImportResponse1)
                .thenReturn(bulkImportResponse2)
                .thenReturn(bulkImportResponse3)
                .thenReturn(bulkImportResponse4);

        InsertMetrics metrics = callable.call();

        verify(client, Mockito.times(4)).executeStoredProcedure(Mockito.anyString(), Mockito.any(RequestOptions.class), Mockito.any(Object[].class));

        // all documents inserted, no throttle, request charge summed over the four calls
        assertMetrics(bi, metrics, 10L, 0L, 1.5 + 2.5 + 3.5 + 4.5);
    }

    /** The first stored procedure call is throttled and the second one succeeds. */
    @Test
    public void throttle() throws Exception {
        BulkImportStoredProcedureOptions options = new BulkImportStoredProcedureOptions(true, true, "fakeCollectionId", true);
        String bulkImportSproc = null;
        DocumentClient client = Mockito.mock(DocumentClient.class);

        BatchInserter bi = new BatchInserter(PARTITION_INDEX, singleBatch(NUMBER_OF_DOCUMENTS), client, bulkImportSproc, options);
        Callable<InsertMetrics> callable = onlyCallableOf(bi);

        Map<String, String> headers = withRequestCharge(null, 5.5);
        StoredProcedureResponse bulkImportResponse = getStoredProcedureResponse(getBulkImportStoredProcedureResponse(NUMBER_OF_DOCUMENTS, 0), headers);

        DocumentClientException throttleException = getThrottleException();

        when(client.executeStoredProcedure(Mockito.any(String.class), Mockito.any(RequestOptions.class),
                Mockito.any(Object[].class))).thenThrow(throttleException).thenReturn(bulkImportResponse);

        InsertMetrics metrics = callable.call();

        // verify that execute stored proc is invoked twice (once with throttle and second time without throttle)
        verify(client, Mockito.times(2)).executeStoredProcedure(Mockito.anyString(), Mockito.any(RequestOptions.class), Mockito.any(Object[].class));

        // all documents inserted, one throttle, request charge of the successful call
        assertMetrics(bi, metrics, (long) NUMBER_OF_DOCUMENTS, 1L, 5.5);
    }

    /** Builds a list holding a single batch of {@code numberOfDocuments} empty JSON documents. */
    private static List<List<String>> singleBatch(int numberOfDocuments) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < numberOfDocuments; i++) {
            batch.add("{}");
        }
        List<List<String>> batchesToInsert = new ArrayList<>();
        batchesToInsert.add(batch);
        return batchesToInsert;
    }

    /** Drains the inserter's callable iterator into a list. */
    private static List<Callable<InsertMetrics>> callablesOf(BatchInserter bi) {
        Iterator<Callable<InsertMetrics>> callbackIterator = bi.miniBatchInsertExecutionCallableIterator();
        List<Callable<InsertMetrics>> list = new ArrayList<>();
        Iterators.addAll(list, callbackIterator);
        return list;
    }

    /** Asserts that the inserter exposes exactly one callable and returns it. */
    private static Callable<InsertMetrics> onlyCallableOf(BatchInserter bi) {
        List<Callable<InsertMetrics>> list = callablesOf(bi);
        assertThat(list.size(), equalTo(1));
        return list.get(0);
    }

    /** Checks the returned metrics and that the inserter's own totals agree with them. */
    private static void assertMetrics(BatchInserter bi, InsertMetrics metrics, long expectedDocuments,
            long expectedThrottles, double expectedRequestUnits) {
        // verify number of documents inserted
        assertThat(metrics.numberOfDocumentsInserted, equalTo(expectedDocuments));

        // verify number of throttles
        assertThat(metrics.numberOfThrottles, equalTo(expectedThrottles));

        // verify request charge
        assertThat(metrics.requestUnitsConsumed, equalTo(expectedRequestUnits));

        // verify non zero total time; toNanos() is the whole duration, whereas getNano()
        // is only the nanosecond-of-second part and is 0 for an exact number of seconds
        assertThat(metrics.timeTaken.toNanos() > 0, equalTo(true));

        assertThat(bi.getNumberOfDocumentsImported(), equalTo((int) metrics.numberOfDocumentsInserted));
        assertThat(bi.getTotalRequestUnitsConsumed(), equalTo(metrics.requestUnitsConsumed));
    }
}

Просмотреть файл

@ -0,0 +1,51 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import java.io.IOException;
import org.junit.Test;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Checks the JSON form that Jackson produces for {@link BulkImportStoredProcedureOptions}.
 */
public class BulkImportStoredProcedureOptionsTests {

    /**
     * Serializes an options instance, parses the result back into a tree and checks the
     * four properties the stored procedure reads.
     */
    @Test
    public void serialization() throws IOException {
        ObjectMapper jsonMapper = new ObjectMapper();
        BulkImportStoredProcedureOptions underTest =
                new BulkImportStoredProcedureOptions(true, true, "myCollectionId", true, false);

        JsonNode json = jsonMapper.readTree(jsonMapper.writeValueAsString(underTest));

        boolean disableAutomaticIdGeneration = json.at("/disableAutomaticIdGeneration").asBoolean();
        assertThat(disableAutomaticIdGeneration, equalTo(true));

        boolean softStopOnConflict = json.at("/softStopOnConflict").asBoolean();
        assertThat(softStopOnConflict, equalTo(true));

        String systemCollectionId = json.at("/systemCollectionId").asText();
        assertThat(systemCollectionId, equalTo("myCollectionId"));

        boolean enableUpsert = json.at("/enableUpsert").asBoolean();
        assertThat(enableUpsert, equalTo(false));
    }
}

Просмотреть файл

@ -0,0 +1,204 @@
/**
* The MIT License (MIT)
* Copyright (c) 2016 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import static com.microsoft.azure.documentdb.bulkimport.TestUtils.getBulkImportStoredProcedureResponse;
import static com.microsoft.azure.documentdb.bulkimport.TestUtils.getStoredProcedureResponse;
import static com.microsoft.azure.documentdb.bulkimport.TestUtils.withRequestCharge;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.azure.documentdb.StoredProcedureResponse;
/**
 * Unit tests for {@link CongestionController}, run against a Mockito-mocked
 * {@link DocumentClient} so that no DocumentDB account is required.
 */
public class CongestionControllerTests {
    /** Upper bound, in milliseconds, on the duration of each test. */
    private static final int TIMEOUT = 5000;

    /** Partition key range id handed to both the inserter and the controller under test. */
    private static final String PARTITION_KEY_RANGE_ID = "0";

    private ListeningExecutorService listeningExecutorService;

    @Before
    public void setUp() {
        listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    }

    @After
    public void shutDown() {
        listeningExecutorService.shutdown();
    }

    /**
     * One batch of ten documents answered by a single stored procedure response:
     * the controller must finish successfully and report all ten documents as imported.
     */
    @Test(timeout = TIMEOUT)
    public void simple() throws Exception {
        int numberOfDocuments = 10;
        DocumentClient client = Mockito.mock(DocumentClient.class);

        List<List<String>> batchesToInsert = new ArrayList<>();
        batchesToInsert.add(emptyDocuments(numberOfDocuments));

        BatchInserter bi = newBatchInserter(batchesToInsert, client);
        assertThat(countMiniBatches(bi), equalTo(1));

        Map<String, String> headers = withRequestCharge(null, 5.5);
        StoredProcedureResponse bulkImportResponse = getStoredProcedureResponse(
                getBulkImportStoredProcedureResponse(numberOfDocuments, 0), headers);
        when(client.executeStoredProcedure(Mockito.any(String.class), Mockito.any(RequestOptions.class),
                Mockito.any(Object[].class))).thenReturn(bulkImportResponse);

        CongestionController cc = new CongestionController(listeningExecutorService, PARTITION_KEY_RANGE_ID, bi);
        ListenableFuture<Void> completion = cc.ExecuteAll();

        runOnCompletion(completion, () -> {
            assertFinishedSuccessfully(cc);
            assertThat(bi.numberOfDocumentsImported.get(), equalTo(numberOfDocuments));
        });
    }

    /**
     * Two batches of ten documents, each answered by its own stored procedure response:
     * the controller must finish successfully and the inserter must report the summed
     * document count and the summed request charge.
     */
    @Test(timeout = TIMEOUT)
    public void multipleRounds() throws Exception {
        int numberOfDocumentsInFirstBatch = 10;
        int numberOfDocumentsInSecondBatch = 10;
        DocumentClient client = Mockito.mock(DocumentClient.class);

        List<List<String>> batchesToInsert = new ArrayList<>();
        batchesToInsert.add(emptyDocuments(numberOfDocumentsInFirstBatch));
        batchesToInsert.add(emptyDocuments(numberOfDocumentsInSecondBatch));

        BatchInserter bi = newBatchInserter(batchesToInsert, client);
        assertThat(countMiniBatches(bi), equalTo(2));

        StoredProcedureResponse bulkImportResponse1 = getStoredProcedureResponse(
                getBulkImportStoredProcedureResponse(numberOfDocumentsInFirstBatch, 0),
                withRequestCharge(null, 1.5));
        StoredProcedureResponse bulkImportResponse2 = getStoredProcedureResponse(
                getBulkImportStoredProcedureResponse(numberOfDocumentsInSecondBatch, 0),
                withRequestCharge(null, 4.1));
        // Consecutive stubbing: the first call gets response 1, every later call response 2.
        when(client.executeStoredProcedure(Mockito.any(String.class), Mockito.any(RequestOptions.class),
                Mockito.any(Object[].class))).thenReturn(bulkImportResponse1).thenReturn(bulkImportResponse2);

        CongestionController cc = new CongestionController(listeningExecutorService, PARTITION_KEY_RANGE_ID, bi);
        ListenableFuture<Void> completion = cc.ExecuteAll();

        runOnCompletion(completion, () -> {
            assertFinishedSuccessfully(cc);
            assertThat(bi.numberOfDocumentsImported.get(),
                    equalTo(numberOfDocumentsInFirstBatch + numberOfDocumentsInSecondBatch));
            assertThat(bi.totalRequestUnitsConsumed.get(), equalTo(1.5 + 4.1));
        });
    }

    /** Builds a batch holding {@code count} copies of the empty JSON document {@code "{}"}. */
    private static List<String> emptyDocuments(int count) {
        List<String> documents = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            documents.add("{}");
        }
        return documents;
    }

    /** Creates the inserter under test with no stored procedure link and no options. */
    private static BatchInserter newBatchInserter(List<List<String>> batchesToInsert, DocumentClient client) {
        // Typed locals keep the null arguments unambiguous at the constructor call.
        BulkImportStoredProcedureOptions options = null;
        String bulkImportSproc = null;
        return new BatchInserter(PARTITION_KEY_RANGE_ID, batchesToInsert, client, bulkImportSproc, options);
    }

    /** Drains the inserter's callable iterator and returns how many callables it produced. */
    private static int countMiniBatches(BatchInserter inserter) {
        Iterator<Callable<InsertMetrics>> callableIterator = inserter.miniBatchInsertExecutionCallableIterator();
        List<Callable<InsertMetrics>> callables = new ArrayList<>();
        Iterators.addAll(callables, callableIterator);
        return callables.size();
    }

    /** Asserts that the controller reports success, no failure, and is no longer running. */
    private static void assertFinishedSuccessfully(CongestionController controller) {
        assertThat(controller.hasCompletedAsSuccess(), equalTo(true));
        assertThat(controller.hasCompletedAsFailure(), equalTo(false));
        assertThat(controller.isRunning(), equalTo(false));
    }

    /**
     * Registers {@code assertions} as a listener on {@code completion}, blocks until the
     * listener has run, and rethrows on the calling thread whatever the listener threw.
     *
     * @param completion future returned by the controller under test
     * @param assertions checks to execute once the future completes
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void runOnCompletion(ListenableFuture<Void> completion, Runnable assertions)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean success = new AtomicBoolean(false);
        // Written by the listener thread before countDown() and read after await(),
        // so the latch provides the required happens-before edge.
        Throwable[] failure = new Throwable[1];

        completion.addListener(() -> {
            try {
                assertions.run();
                success.set(true);
            } catch (Throwable t) {
                // An assertion failing on the executor thread would otherwise be lost;
                // keep it so the test reports the real cause.
                failure[0] = t;
            } finally {
                latch.countDown();
            }
        }, listeningExecutorService);

        latch.await();
        if (failure[0] != null) {
            throw new AssertionError("assertions in the completion listener failed", failure[0]);
        }
        assertThat(success.get(), equalTo(true));
    }
}

Просмотреть файл

@ -0,0 +1,126 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.ArrayList;
import java.util.Collection;
import org.junit.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
/**
 * Tests for {@code DocumentAnalyzer.extractPartitionKeyValue}: each case serializes a
 * small local POJO with Jackson, extracts the partition key from the resulting JSON and
 * compares its JSON form with the expected single-element array.
 */
public class DocumentAnalyzerTests {

    /** The partition key path points at a top-level string property. */
    @Test
    public void simple() throws JsonProcessingException {
        @SuppressWarnings("unused")
        class Pojo {
            public String city;
            public String state;
            public int population;
        }

        Pojo document = new Pojo();
        document.city = "Seattle";
        document.state = "WA";
        document.population = 700000;

        ObjectMapper jsonMapper = new ObjectMapper();
        String documentJson = jsonMapper.writeValueAsString(document);
        PartitionKeyInternal extracted =
                DocumentAnalyzer.extractPartitionKeyValue(documentJson, definitionWithPaths("/city"));

        String expectedJson = jsonMapper.writeValueAsString(ImmutableList.of("Seattle"));
        assertThat(extracted.toJson(), equalTo(expectedJson));
    }

    /** The definition lists two entries, "/city" and "name", and the value sits in a nested object. */
    @Test
    public void compositePath() throws JsonProcessingException {
        @SuppressWarnings("unused")
        class City {
            public String name;
            public String zip;
        }
        @SuppressWarnings("unused")
        class Pojo {
            public City city;
            public String state;
            public int population;
        }

        Pojo document = new Pojo();
        document.city = new City();
        document.city.name = "Seattle";
        document.state = "WA";
        document.population = 700000;

        ObjectMapper jsonMapper = new ObjectMapper();
        String documentJson = jsonMapper.writeValueAsString(document);
        PartitionKeyInternal extracted =
                DocumentAnalyzer.extractPartitionKeyValue(documentJson, definitionWithPaths("/city", "name"));

        String expectedJson = jsonMapper.writeValueAsString(ImmutableList.of("Seattle"));
        assertThat(extracted.toJson(), equalTo(expectedJson));
    }

    /** The document has no property at the partition key path. */
    @Test
    public void missingPartitionKeyValue() throws JsonProcessingException {
        @SuppressWarnings("unused")
        class Pojo {
            public String state;
            public int population;
        }

        Pojo document = new Pojo();
        document.state = "WA";
        document.population = 700000;

        ObjectMapper jsonMapper = new ObjectMapper();
        String documentJson = jsonMapper.writeValueAsString(document);
        PartitionKeyInternal extracted =
                DocumentAnalyzer.extractPartitionKeyValue(documentJson, definitionWithPaths("/city"));

        String expectedJson = jsonMapper.writeValueAsString(ImmutableList.of(""));
        assertThat(extracted.toJson(), equalTo(expectedJson));
    }

    /** Builds a partition key definition whose path collection holds {@code entries} in order. */
    private static PartitionKeyDefinition definitionWithPaths(String... entries) {
        Collection<String> paths = new ArrayList<>();
        for (String entry : entries) {
            paths.add(entry);
        }
        PartitionKeyDefinition definition = new PartitionKeyDefinition();
        definition.setPaths(paths);
        return definition;
    }
}

Просмотреть файл

@ -0,0 +1,49 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import java.time.Duration;
import org.junit.Test;
/**
 * Unit tests for {@link InsertMetrics}.
 */
public class InsertMetricsTests {

    /**
     * Folds a default-constructed instance and two populated instances with
     * {@code InsertMetrics.sum} and checks every aggregated field. The expected values
     * are the totals of the two populated instances alone, so the default-constructed
     * one is expected to contribute nothing.
     */
    @Test
    public void sum() {
        InsertMetrics metrics1 = new InsertMetrics();
        InsertMetrics metrics2 = new InsertMetrics(2, Duration.ofSeconds(3), 1.5, 4);
        InsertMetrics metrics3 = new InsertMetrics(20, Duration.ofSeconds(30), 15.0, 40);

        InsertMetrics sum = InsertMetrics.sum(metrics1, metrics2);
        sum = InsertMetrics.sum(sum, metrics3);

        // Upper-case L suffix: a lower-case "22l" is easily misread as 221.
        assertThat(sum.numberOfDocumentsInserted, equalTo(22L));
        assertThat(sum.numberOfThrottles, equalTo(44L));
        assertThat(sum.requestUnitsConsumed, equalTo(16.5));
        assertThat(sum.timeTaken, equalTo(Duration.ofSeconds(33)));
    }
}

Просмотреть файл

@ -0,0 +1,37 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
/**
 * Account settings for tests that talk to a DocumentDB endpoint.
 * <p>
 * Both values are read once, when this class is initialized, from the JVM system
 * properties {@code ACCOUNT_KEY} and {@code ACCOUNT_HOST} (for example
 * {@code -DACCOUNT_HOST=https://localhost:443/}); each is {@code null} when its
 * property is not set.
 */
public final class TestConfigurations {
    // Replace MASTER_KEY and HOST with values from your DocumentDB account.
    // The default values are the credentials of the local emulator, which are not used in any production environment.
    // <!--[SuppressMessage("Microsoft.Security", "CS002:SecretInNextLine")]-->
    //public static final String MASTER_KEY =
    //"C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
    //public static final String HOST = "https://localhost:443/";

    /** Account master key, taken from the {@code ACCOUNT_KEY} system property. */
    public static final String MASTER_KEY = System.getProperty("ACCOUNT_KEY");

    /** Account endpoint, taken from the {@code ACCOUNT_HOST} system property. */
    public static final String HOST = System.getProperty("ACCOUNT_HOST");

    static {
        // Echo the endpoint the tests will target; the key is deliberately not printed.
        System.out.println("host is " + HOST);
    }

    private TestConfigurations() {
        // Constants holder; not meant to be instantiated.
    }
}

Просмотреть файл

@ -0,0 +1,85 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import static org.mockito.Mockito.when;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import org.mockito.Mockito;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.StoredProcedureResponse;
import com.microsoft.azure.documentdb.internal.DocumentServiceResponse;
import com.microsoft.azure.documentdb.internal.HttpConstants;
/**
 * Factory helpers that build the mocked or hand-assembled DocumentDB objects used by the
 * bulk import unit tests.
 */
public class TestUtils {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Creates a mocked {@link DocumentClientException} that reports status code 429 and a
     * retry-after interval of one millisecond.
     *
     * @return the mocked exception
     */
    public static DocumentClientException getThrottleException() {
        DocumentClientException e = Mockito.mock(DocumentClientException.class);
        when(e.getStatusCode()).thenReturn(429);
        // Upper-case L suffix: a lower-case "1l" is easily misread as 11.
        when(e.getRetryAfterInMilliseconds()).thenReturn(1L);
        return e;
    }

    /**
     * Stores {@code requestCharge} under the request-charge header name in the given map.
     *
     * @param headerResponse map to update in place; when {@code null} a new map is created
     * @param requestCharge  charge to record, written with {@link Double#toString(double)}
     * @return {@code headerResponse} itself, or the newly created map when it was {@code null}
     */
    public static Map<String, String> withRequestCharge(Map<String, String> headerResponse, double requestCharge) {
        if (headerResponse == null) {
            headerResponse = new HashMap<>();
        }
        headerResponse.put(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double.toString(requestCharge));
        return headerResponse;
    }

    /**
     * Creates a bulk import stored procedure result carrying the given values.
     *
     * @param count value assigned to the result's {@code count} field
     * @param error value assigned to the result's {@code errorCode} field
     * @return the populated result
     */
    public static BulkImportStoredProcedureResponse getBulkImportStoredProcedureResponse(int count, int error) {
        BulkImportStoredProcedureResponse bispr = new BulkImportStoredProcedureResponse();
        bispr.count = count;
        bispr.errorCode = error;
        return bispr;
    }

    /**
     * Wraps a bulk import result in a {@link StoredProcedureResponse} backed by a mocked
     * {@link DocumentServiceResponse}. The mock returns the result serialized as JSON for
     * its body and {@code headerResponse} for its headers.
     *
     * @param bulkImportResponse result to serialize as the response body
     * @param headerResponse     headers the mocked service response returns
     * @return a response instance created through its {@code DocumentServiceResponse} constructor
     * @throws JsonProcessingException if the result cannot be serialized
     * @throws NoSuchMethodException if the expected constructor is not declared
     * @throws InstantiationException if the constructor cannot instantiate the class
     * @throws IllegalAccessException if the constructor is still inaccessible
     * @throws InvocationTargetException if the constructor itself throws
     */
    public static StoredProcedureResponse getStoredProcedureResponse(
            BulkImportStoredProcedureResponse bulkImportResponse, Map<String, String> headerResponse)
            throws JsonProcessingException, InstantiationException, IllegalAccessException, IllegalArgumentException,
            InvocationTargetException, NoSuchMethodException, SecurityException {
        DocumentServiceResponse documentServiceResponse = Mockito.mock(DocumentServiceResponse.class);
        // "getReponseBodyAsString" is the method name as spelled by the mocked type.
        when(documentServiceResponse.getReponseBodyAsString()).thenReturn(MAPPER.writeValueAsString(bulkImportResponse));
        when(documentServiceResponse.getResponseHeaders()).thenReturn(headerResponse);

        // The constructor is looked up reflectively and made accessible first
        // (presumably because it is not public - confirm against the SDK).
        Constructor<StoredProcedureResponse> constructor = StoredProcedureResponse.class
                .getDeclaredConstructor(DocumentServiceResponse.class);
        constructor.setAccessible(true);
        return constructor.newInstance(documentServiceResponse);
    }
}