Fixed tests, minor refactoring, and added retries for throttling in initialization

This commit is contained in:
Mohammad Derakhshani 2017-10-15 22:07:19 -07:00
Parent 16e95e9527
Commit 80843dc8a3
7 changed files with 185 additions and 88 deletions
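The headline change is the retry around BulkImporter.initialize() (see the BulkImporter.java hunk below): dig the deepest DocumentClientException out of the cause chain, and if it is a 429 (TOO_MANY_REQUESTS), sleep for the server-suggested retry interval scaled by the attempt count, for at most three attempts. A minimal standalone sketch of that pattern; the class and method names here are illustrative, while the constants and SDK calls match the diff:

// Sketch only: retries a throttled action the way this commit retries initialize().
package com.microsoft.azure.documentdb.bulkimport;

import java.util.concurrent.Callable;

import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.internal.HttpConstants;

class ThrottleRetrySketch {
    static void runWithRetryOnThrottle(Callable<Void> action) throws Exception {
        int count = 0;
        while (true) {
            try {
                action.call();
                return;
            } catch (Exception e) {
                count++;
                // Walk the cause chain for the underlying service exception.
                DocumentClientException dce = ExceptionUtils.extractDeepestDocumentClientException(e);
                if (count < 3 && dce != null
                        && dce.getStatusCode() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS) {
                    // Linear backoff on the server-suggested retry interval.
                    Thread.sleep(count * dce.getRetryAfterInMilliseconds());
                    continue;
                }
                throw e; // not a throttle, or out of attempts
            }
        }
    }
}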

View file

@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>documentdb-bulkimport</artifactId>
<version>0.0.1-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
<description>Bulk Importer for Microsoft Azure DocumentDB</description>
<url>http://azure.microsoft.com/en-us/services/documentdb/</url>

View file

@@ -22,6 +22,11 @@
*/
package com.microsoft.azure.documentdb.bulkimport;
import static com.microsoft.azure.documentdb.bulkimport.ExceptionUtils.isGone;
import static com.microsoft.azure.documentdb.bulkimport.ExceptionUtils.isSplit;
import static com.microsoft.azure.documentdb.bulkimport.ExceptionUtils.isThrottled;
import static com.microsoft.azure.documentdb.bulkimport.ExceptionUtils.isTimedOut;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
@@ -43,7 +48,6 @@ import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.azure.documentdb.StoredProcedureResponse;
import com.microsoft.azure.documentdb.internal.HttpConstants;
class BatchInserter {
@@ -179,6 +183,10 @@ class BatchInserter {
if (bulkImportResponse != null) {
if (bulkImportResponse.errorCode != 0) {
logger.warn("pki {} Received response error code {}", partitionKeyRangeId, bulkImportResponse.errorCode);
if (bulkImportResponse.count == 0) {
throw new RuntimeException(
String.format("Stored proc returned failure %s", bulkImportResponse.errorCode));
}
}
double requestCharge = response.getRequestCharge();
@@ -259,22 +267,6 @@ class BatchInserter {
return stream.iterator();
}
private boolean isThrottled(DocumentClientException e) {
return e.getStatusCode() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS;
}
private boolean isTimedOut(DocumentClientException e) {
return e.getStatusCode() == HttpConstants.StatusCodes.TIMEOUT;
}
private boolean isGone(DocumentClientException e) {
return e.getStatusCode() == HttpConstants.StatusCodes.GONE;
}
private boolean isSplit(DocumentClientException e) {
return e.getStatusCode() == HttpConstants.StatusCodes.GONE
&& HttpConstants.SubStatusCodes.SPLITTING == e.getSubStatusCode();
}
private BulkImportStoredProcedureResponse parseFrom(StoredProcedureResponse storedProcResponse) throws JsonParseException, JsonMappingException, IOException {
String res = storedProcResponse.getResponseAsString();

View file

@@ -22,6 +22,8 @@
*/
package com.microsoft.azure.documentdb.bulkimport;
import static com.microsoft.azure.documentdb.bulkimport.ExceptionUtils.extractDeepestDocumentClientException;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -59,6 +61,7 @@ import com.microsoft.azure.documentdb.FeedResponse;
import com.microsoft.azure.documentdb.Offer;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.PartitionKeyRange;
import com.microsoft.azure.documentdb.internal.HttpConstants;
import com.microsoft.azure.documentdb.internal.query.funcs.Func2;
import com.microsoft.azure.documentdb.internal.routing.CollectionRoutingMap;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
@@ -162,7 +165,21 @@ public class BulkImporter implements AutoCloseable {
@Override
public Void call() throws Exception {
initialize();
int count = 0;
while(true) {
try {
initialize();
} catch (Exception e) {
count++;
DocumentClientException dce = extractDeepestDocumentClientException(e);
if (count < 3 && dce != null && dce.getStatusCode() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS ) {
Thread.sleep(count * dce.getRetryAfterInMilliseconds());
continue;
}
throw e;
}
break;
}
return null;
}
});
@@ -234,30 +251,23 @@ public class BulkImporter implements AutoCloseable {
* <code>
* DocumentClient client = new DocumentClient(HOST, MASTER_KEY, null, null);
*
* String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "perf-col");
* String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "mycol");
* DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
*
* BulkImporter importer = new BulkImporter(client, collection);
*
* List<String> docs = new ArrayList<String>();
* for(int i = 0; i < 200000; i++) {
* String id = UUID.randomUUID().toString();
* String mypk = "Seattle";
* String v = UUID.randomUUID().toString();
* String doc = String.format("{" +
* " \"dataField\": \"%s\"," +
* " \"mypk\": \"%s\"," +
* " \"id\": \"%s\"" +
* "}", v, mypk, id);
* for(int i = 0; i < 10; i++) {
* List<String> documents = documentSource.getMoreDocuments();
*
* docs.add(doc);
* BulkImportResponse bulkImportResponse = importer.bulkImport(documents, false);
*
* // validate that all documents were inserted to ensure no failures.
* // bulkImportResponse.getNumberOfDocumentsImported() == documents.size()
* }
*
* BulkImportResponse bulkImportResponse = importer.bulkImport(docs, false);
*
* client.close();
* </code>
* @param documentIterator to insert
* @param documents to insert
* @param enableUpsert whether to enable upsert (overwrite the document if it already exists)
* @return an instance of {@link BulkImportResponse}
* @throws DocumentClientException if any failure happens
@@ -280,7 +290,34 @@ public class BulkImporter implements AutoCloseable {
return executeBulkImportInternal(documents, bucketingFunction, enableUpsert);
}
public BulkImportResponse bulkImportWithPreprocessedPartitionKey(Collection<Tuple> input, boolean enableUpsert) throws DocumentClientException {
/**
* Executes a bulk import in the Azure Cosmos DB database service.
*
* <code>
* DocumentClient client = new DocumentClient(HOST, MASTER_KEY, null, null);
*
* String collectionLink = String.format("/dbs/%s/colls/%s", "mydb", "mycol");
* DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
*
* BulkImporter importer = new BulkImporter(client, collection);
*
* for(int i = 0; i < 10; i++) {
* List<Tuple> tuples = documentSource.getMoreDocumentsPartitionKeyValueTuples();
*
* BulkImportResponse bulkImportResponse = importer.bulkImportWithPreprocessedPartitionKey(tuples, false);
*
* // validate that all documents were inserted to ensure no failures.
* // bulkImportResponse.getNumberOfDocumentsImported() == tuples.size()
* }
*
* client.close();
* </code>
* @param documentPartitionKeyValueTuples list of {@link Tuple}s pairing each document with its pre-extracted partition key value
* @param enableUpsert whether to enable upsert (overwrite the document if it already exists)
* @return an instance of {@link BulkImportResponse}
* @throws DocumentClientException if any failure happens
*/
public BulkImportResponse bulkImportWithPreprocessedPartitionKey(Collection<Tuple> documentPartitionKeyValueTuples, boolean enableUpsert) throws DocumentClientException {
Func2<Collection<Tuple>, ConcurrentHashMap<String, Set<String>>, Void> bucketingFunction =
new Func2<Collection<Tuple>, ConcurrentHashMap<String,Set<String>>, Void>() {
@@ -297,7 +334,7 @@ public class BulkImporter implements AutoCloseable {
return null;
};
};
return executeBulkImportInternal(input, bucketingFunction, enableUpsert);
return executeBulkImportInternal(documentPartitionKeyValueTuples, bucketingFunction, enableUpsert);
}
private <T> BulkImportResponse executeBulkImportInternal(Collection<T> input,
@@ -347,8 +384,7 @@ public class BulkImporter implements AutoCloseable {
miniBatchesToImportByPartition.put(partitionKeyRangeId, new ArrayList<List<String>>(estimateMiniBatchesToImportByPartitionSize));
}
// Sort documents into partition buckets.
logger.debug("Sorting documents into partition buckets");
logger.debug("Bucketing documents ...");
bucketByPartitionFunc.apply(input, documentsToImportByPartition);

View file

@@ -0,0 +1,61 @@
/**
* The MIT License (MIT)
* Copyright (c) 2017 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.documentdb.bulkimport;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.internal.HttpConstants;
class ExceptionUtils {
public static boolean isThrottled(DocumentClientException e) {
return e.getStatusCode() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS;
}
public static boolean isTimedOut(DocumentClientException e) {
return e.getStatusCode() == HttpConstants.StatusCodes.TIMEOUT;
}
public static boolean isGone(DocumentClientException e) {
return e.getStatusCode() == HttpConstants.StatusCodes.GONE;
}
public static boolean isSplit(DocumentClientException e) {
return e.getStatusCode() == HttpConstants.StatusCodes.GONE
&& HttpConstants.SubStatusCodes.SPLITTING == e.getSubStatusCode();
}
public static DocumentClientException extractDeepestDocumentClientException(Exception e) {
DocumentClientException dce = null;
while(e != null) {
if (e instanceof DocumentClientException) {
dce = (DocumentClientException) e;
}
if (e.getCause() instanceof Exception) {
e = (Exception) e.getCause();
} else {
break;
}
}
return dce;
}
}
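A usage note for these predicates: isSplit() only matches when isGone() also does, since both require status GONE, so callers have to test the split sub-status first. A hypothetical dispatcher (not part of the commit) showing that precedence:

// Hypothetical dispatcher, not part of the commit: shows the intended
// precedence of the ExceptionUtils predicates.
package com.microsoft.azure.documentdb.bulkimport;

import com.microsoft.azure.documentdb.DocumentClientException;

class ExceptionDispatchSketch {
    static void classify(DocumentClientException e) throws InterruptedException {
        if (ExceptionUtils.isSplit(e)) {
            // partition key range was split: the routing map must be refreshed
        } else if (ExceptionUtils.isGone(e)) {
            // replica moved: safe to retry the request
        } else if (ExceptionUtils.isThrottled(e)) {
            Thread.sleep(e.getRetryAfterInMilliseconds()); // honor server backoff
        } else if (ExceptionUtils.isTimedOut(e)) {
            // transient timeout: retry with backoff
        }
    }
}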

View file

@@ -28,6 +28,7 @@ import java.util.UUID;
import com.google.common.base.Preconditions;
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.Undefined;
public class DocumentDataSource {
@@ -46,7 +47,10 @@ public class DocumentDataSource {
Document d = new Document();
d.setId(UUID.randomUUID().toString());
d.set(partitionKeyName, partitionKeyValue);
if (!Undefined.Value().equals(partitionKeyValue)) {
d.set(partitionKeyName, partitionKeyValue);
}
for(int i = 0; i < 3; i++) {
d.set(UUID.randomUUID().toString(), UUID.randomUUID().toString());

View file

@@ -23,13 +23,11 @@
package com.microsoft.azure.documentdb.bulkimport;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.junit.After;
@@ -41,9 +39,6 @@ import org.junit.runners.Parameterized;
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.PartitionKey;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.azure.documentdb.Undefined;
@RunWith(Parameterized.class)
@@ -83,6 +78,29 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
validateSuccess(deserialize(documents), response);
}
}
@Test
public void bulkImportAlreadyExists() throws Exception {
try (BulkImporter importer = new BulkImporter(client, this.pCollection)) {
List<String> documents = new ArrayList<>();
Object [] partitionKeyValues = new Object[] { "abc", null, "", Undefined.Value(), 123, 0, -10, 9223372036854775000L, 0.5, true, false };
for(Object partitionKeyValue: partitionKeyValues) {
documents.add(DocumentDataSource.randomDocument(partitionKeyValue, pCollection.getPartitionKey()));
}
BulkImportResponse response = importer.bulkImport(documents, false);
validateSuccess(deserialize(documents), response);
response = importer.bulkImport(documents, false);
assertThat(response.getNumberOfDocumentsImported(), equalTo(0));
response = importer.bulkImport(documents, true);
assertThat(response.getNumberOfDocumentsImported(), equalTo(documents.size()));
}
}
@Test
public void bulkImportWithPreknownPartitionKeyValues() throws Exception {
@@ -90,7 +108,25 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
List<Tuple> tuples = new ArrayList<>();
Object [] partitionKeyValues = new Object[] { "abc", "", null, Undefined.Value(), 123, 0, -10, 9223372036854775000L, 0.5, true, false };
Object [] partitionKeyValues = new Object[] { "abc", null, "", Undefined.Value(), 123, 0, -10, 9223372036854775000L, 0.5, true, false };
for(Object partitionKeyValue: partitionKeyValues) {
String d = DocumentDataSource.randomDocument(partitionKeyValue, pCollection.getPartitionKey());
Tuple t = new Tuple(d, partitionKeyValue);
tuples.add(t);
}
BulkImportResponse response = importer.bulkImportWithPreprocessedPartitionKey(tuples, false);
validateSuccess(deserialize(tuples.stream().map(t -> t.document).collect(Collectors.toList())), response);
}
}
public void bulkImportWithMissingPartitionKeyField() throws Exception {
try (BulkImporter importer = new BulkImporter(client, this.pCollection)) {
List<Tuple> tuples = new ArrayList<>();
Object [] partitionKeyValues = new Object[] { "abc", "", null, 123, 0, -10, 9223372036854775000L, 0.5, true, false };
for(Object partitionKeyValue: partitionKeyValues) {
String d = DocumentDataSource.randomDocument(partitionKeyValue, pCollection.getPartitionKey());
@@ -107,42 +143,8 @@ public class EndToEndBulkImportTests extends EndToEndTestBase {
return documents.stream().map(d -> new Document(d)).collect(Collectors.toList());
}
private Object getPartitionKeyValue(Document d) {
PartitionKeyDefinition partitionKeyDef = pCollection.getPartitionKey();
String partitionKeyName = partitionKeyDef.getPaths().iterator().next().replaceFirst("^/", "");
return d.get(partitionKeyName);
}
private boolean contains(Document d1, Document d2) {
return !d1.getHashMap().keySet().stream()
.filter(fieldName -> !SYSTEM_FIELD_NAMES.contains(fieldName))
.filter(fieldName -> !Objects.equals(d1.get(fieldName), d2.get(fieldName)) )
.findAny()
.isPresent();
}
private void areEqual(Document d1, Document d2) {
assertThat(contains(d1, d2), is(true));
assertThat(contains(d2, d1), is(true));
}
private void validateSuccess(Collection<Document> documents, BulkImportResponse response) throws DocumentClientException, InterruptedException {
assertThat(response.getNumberOfDocumentsImported(), equalTo(documents.size()));
Thread.sleep(5000);
for(Document d : documents) {
String documentLink = COLLECTION_LINK + "/docs/" + d.getId();
RequestOptions options = new RequestOptions();
options.setPartitionKey(new PartitionKey(getPartitionKeyValue(d)));
try {
Document actualDocument = client.readDocument(documentLink, options).getResource();
System.out.println("I read : " + actualDocument.toJson());
areEqual(actualDocument, d);
} catch (Exception e) {
logger.error("failure", e);
}
}
}
}

View file

@@ -51,13 +51,11 @@ public class EndToEndTestBase {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
private static String DATABASE_ID = "bulkImportTestDatabase";
private static String COLLECTION_ID_PARTITIONED = UUID.randomUUID().toString();
protected static String COLLECTION_LINK = "dbs/" + DATABASE_ID + "/colls/" + COLLECTION_ID_PARTITIONED;
protected static int TOTAL_NUMBER_OF_DOCUMENTS = 500;
private String databaseId = "bulkImportTestDatabase";
private String collectionId;
protected String collectionLink;
protected DocumentCollection pCollection;
protected static Set<String> SYSTEM_FIELD_NAMES;
static {
@@ -67,6 +65,9 @@ public class EndToEndTestBase {
public EndToEndTestBase() {
HttpClientFactory.DISABLE_HOST_NAME_VERIFICATION = true;
collectionId = UUID.randomUUID().toString();
collectionLink = "dbs/" + databaseId + "/colls/" + collectionId;
}
@Parameters
@@ -93,18 +94,19 @@
public void cleanUpDatabase(DocumentClient client) throws DocumentClientException {
try {
if (client != null) {
client.deleteDatabase("/dbs/" + DATABASE_ID, null);
client.deleteDatabase("/dbs/" + databaseId, null);
}
} catch (Exception e) {
}
}
public void setup(DocumentClient client) throws DocumentClientException {
public void setup(DocumentClient client) throws Exception {
logger.debug("setting up ...");
cleanUpDatabase(client);
Thread.sleep(1000);
Database d = new Database();
d.setId(DATABASE_ID);
d.setId(databaseId);
Database database = client.createDatabase(d, null).getResource();
@@ -114,7 +116,7 @@ public class EndToEndTestBase {
partitionKeyDef.setPaths(paths);
pCollection = new DocumentCollection();
pCollection.setId(COLLECTION_ID_PARTITIONED);
pCollection.setId(collectionId);
pCollection.setPartitionKey(partitionKeyDef);
RequestOptions options = new RequestOptions();