* Added consistent and better exception handling,
Added reactive repository integration test.
Fixed few API bugs

* Updated innerException to CosmosClientException
This commit is contained in:
Kushagra Thapar 2019-12-30 19:08:37 -08:00 коммит произвёл GitHub
Родитель 6ea8892b5c
Коммит 8c240abc91
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 567 добавлений и 284 удалений

Просмотреть файл

@ -9,6 +9,7 @@ import com.azure.data.cosmos.CosmosResponse;
import com.azure.data.cosmos.CosmosResponseDiagnostics;
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.FeedResponseDiagnostics;
import com.azure.data.cosmos.Resource;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.spring.data.cosmosdb.core.ResponseDiagnostics;
import com.microsoft.azure.spring.data.cosmosdb.core.ResponseDiagnosticsProcessor;
@ -34,8 +35,9 @@ public class CosmosdbUtils {
}
}
public static void fillAndProcessResponseDiagnostics(ResponseDiagnosticsProcessor responseDiagnosticsProcessor,
CosmosResponse cosmosResponse, FeedResponse feedResponse) {
public static <T extends Resource> void fillAndProcessResponseDiagnostics(
ResponseDiagnosticsProcessor responseDiagnosticsProcessor,
CosmosResponse<T> cosmosResponse, FeedResponse<T> feedResponse) {
if (responseDiagnosticsProcessor == null) {
return;
}

Просмотреть файл

@ -9,7 +9,6 @@ package com.microsoft.azure.spring.data.cosmosdb.core;
import com.azure.data.cosmos.AccessCondition;
import com.azure.data.cosmos.AccessConditionType;
import com.azure.data.cosmos.CosmosClient;
import com.azure.data.cosmos.CosmosClientException;
import com.azure.data.cosmos.CosmosContainerProperties;
import com.azure.data.cosmos.CosmosContainerResponse;
import com.azure.data.cosmos.CosmosItemProperties;
@ -19,7 +18,6 @@ import com.azure.data.cosmos.FeedOptions;
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.PartitionKey;
import com.azure.data.cosmos.SqlQuerySpec;
import com.azure.data.cosmos.internal.HttpConstants;
import com.microsoft.azure.spring.data.cosmosdb.CosmosDbFactory;
import com.microsoft.azure.spring.data.cosmosdb.common.Memoizer;
import com.microsoft.azure.spring.data.cosmosdb.core.convert.MappingCosmosConverter;
@ -30,7 +28,6 @@ import com.microsoft.azure.spring.data.cosmosdb.core.query.CosmosPageRequest;
import com.microsoft.azure.spring.data.cosmosdb.core.query.Criteria;
import com.microsoft.azure.spring.data.cosmosdb.core.query.CriteriaType;
import com.microsoft.azure.spring.data.cosmosdb.core.query.DocumentQuery;
import com.microsoft.azure.spring.data.cosmosdb.exception.CosmosDBAccessException;
import com.microsoft.azure.spring.data.cosmosdb.repository.support.CosmosEntityInformation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
@ -52,6 +49,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static com.microsoft.azure.spring.data.cosmosdb.common.CosmosdbUtils.fillAndProcessResponseDiagnostics;
import static com.microsoft.azure.spring.data.cosmosdb.exception.CosmosDBExceptionUtils.exceptionHandler;
import static com.microsoft.azure.spring.data.cosmosdb.exception.CosmosDBExceptionUtils.findAPIExceptionHandler;
@Slf4j
public class CosmosTemplate implements CosmosOperations, ApplicationContextAware {
@ -96,32 +95,26 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
final CosmosItemProperties originalItem = mappingCosmosConverter.writeCosmosItemProperties(objectToSave);
log.debug("execute createDocument in database {} collection {}", this.databaseName, collectionName);
log.debug("execute createItem in database {} collection {}", this.databaseName, collectionName);
try {
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
options.partitionKey(partitionKey);
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
options.partitionKey(partitionKey);
@SuppressWarnings("unchecked")
final Class<T> domainClass = (Class<T>) objectToSave.getClass();
@SuppressWarnings("unchecked")
final Class<T> domainClass = (Class<T>) objectToSave.getClass();
final CosmosItemResponse response = cosmosClient.getDatabase(this.databaseName)
.getContainer(collectionName)
.createItem(originalItem, options)
.doOnNext(cosmosItemResponse -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null))
.onErrorResume(Mono::error)
.block();
final CosmosItemResponse response = cosmosClient
.getDatabase(this.databaseName)
.getContainer(collectionName)
.createItem(originalItem, options)
.doOnNext(cosmosItemResponse -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null))
.onErrorResume(throwable ->
exceptionHandler("Failed to insert item", throwable))
.block();
if (response == null) {
throw new CosmosDBAccessException("Failed to insert item");
}
return mappingCosmosConverter.read(domainClass, response.properties());
} catch (Exception e) {
throw new CosmosDBAccessException("insert exception", e);
}
assert response != null;
return mappingCosmosConverter.read(domainClass, response.properties());
}
public <T> T findById(Object id, Class<T> entityClass) {
@ -136,33 +129,21 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
Assert.notNull(partitionKey, "partitionKey should not be null");
assertValidId(id);
try {
final String collectionName = getCollectionName(entityClass);
return cosmosClient
.getDatabase(databaseName)
.getContainer(collectionName)
.getItem(id.toString(), partitionKey)
.read()
.flatMap(cosmosItemResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null);
return Mono.justOrEmpty(toDomainObject(entityClass,
cosmosItemResponse.properties()));
})
.onErrorResume(e -> {
if (e instanceof CosmosClientException) {
final CosmosClientException cosmosClientException = (CosmosClientException) e;
if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return Mono.empty();
}
}
return Mono.error(e);
})
.block();
} catch (Exception e) {
throw new CosmosDBAccessException("findById exception", e);
}
final String collectionName = getCollectionName(entityClass);
return cosmosClient
.getDatabase(databaseName)
.getContainer(collectionName)
.getItem(id.toString(), partitionKey)
.read()
.flatMap(cosmosItemResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null);
return Mono.justOrEmpty(toDomainObject(entityClass,
cosmosItemResponse.properties()));
})
.onErrorResume(throwable ->
findAPIExceptionHandler("Failed to find item", throwable))
.block();
}
public <T> T findById(String collectionName, Object id, Class<T> domainClass) {
@ -170,39 +151,26 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
Assert.notNull(domainClass, "entityClass should not be null");
assertValidId(id);
try {
final String query = String.format("select * from root where root.id = '%s'", id.toString());
final FeedOptions options = new FeedOptions();
options.enableCrossPartitionQuery(true);
options.populateQueryMetrics(isPopulateQueryMetrics);
return cosmosClient
.getDatabase(databaseName)
.getContainer(collectionName)
.queryItems(query, options)
.flatMap(cosmosItemFeedResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
null, cosmosItemFeedResponse);
return Mono.justOrEmpty(cosmosItemFeedResponse
.results()
.stream()
.map(cosmosItem -> mappingCosmosConverter.read(domainClass, cosmosItem))
.findFirst());
})
.onErrorResume(e -> {
if (e instanceof CosmosClientException) {
final CosmosClientException cosmosClientException = (CosmosClientException) e;
if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return Mono.empty();
}
}
return Mono.error(e);
})
.blockFirst();
} catch (Exception e) {
throw new CosmosDBAccessException("findById exception", e);
}
final String query = String.format("select * from root where root.id = '%s'", id.toString());
final FeedOptions options = new FeedOptions();
options.enableCrossPartitionQuery(true);
options.populateQueryMetrics(isPopulateQueryMetrics);
return cosmosClient
.getDatabase(databaseName)
.getContainer(collectionName)
.queryItems(query, options)
.flatMap(cosmosItemFeedResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
null, cosmosItemFeedResponse);
return Mono.justOrEmpty(cosmosItemFeedResponse
.results()
.stream()
.map(cosmosItem -> mappingCosmosConverter.read(domainClass, cosmosItem))
.findFirst());
})
.onErrorResume(throwable ->
findAPIExceptionHandler("Failed to find item", throwable))
.blockFirst();
}
public <T> void upsert(T object, PartitionKey partitionKey) {
@ -215,30 +183,24 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
Assert.hasText(collectionName, "collectionName should not be null, empty or only whitespaces");
Assert.notNull(object, "Upsert object should not be null");
try {
final CosmosItemProperties originalItem = mappingCosmosConverter.writeCosmosItemProperties(object);
final CosmosItemProperties originalItem = mappingCosmosConverter.writeCosmosItemProperties(object);
log.debug("execute upsert document in database {} collection {}", this.databaseName, collectionName);
log.debug("execute upsert item in database {} collection {}", this.databaseName, collectionName);
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
options.partitionKey(partitionKey);
applyVersioning(object.getClass(), originalItem, options);
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
options.partitionKey(partitionKey);
applyVersioning(object.getClass(), originalItem, options);
final CosmosItemResponse cosmosItemResponse = cosmosClient.getDatabase(this.databaseName)
.getContainer(collectionName)
.upsertItem(originalItem, options)
.doOnNext(response -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
response, null))
.onErrorResume(Mono::error)
.block();
final CosmosItemResponse cosmosItemResponse = cosmosClient
.getDatabase(this.databaseName)
.getContainer(collectionName)
.upsertItem(originalItem, options)
.doOnNext(response -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
response, null))
.onErrorResume(throwable -> exceptionHandler("Failed to upsert item", throwable))
.block();
if (cosmosItemResponse == null) {
throw new CosmosDBAccessException("Failed to upsert item");
}
} catch (Exception ex) {
throw new CosmosDBAccessException("Failed to upsert document to database.", ex);
}
assert cosmosItemResponse != null;
}
public <T> List<T> findAll(Class<T> entityClass) {
@ -253,8 +215,8 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
final DocumentQuery query = new DocumentQuery(Criteria.getInstance(CriteriaType.ALL));
final List<CosmosItemProperties> documents = findDocuments(query, domainClass, collectionName);
return documents.stream()
final List<CosmosItemProperties> items = findItems(query, domainClass, collectionName);
return items.stream()
.map(d -> getConverter().read(domainClass, d))
.collect(Collectors.toList());
}
@ -270,18 +232,14 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
@Override
public void deleteCollection(@NonNull String collectionName) {
Assert.hasText(collectionName, "collectionName should have text.");
try {
cosmosClient
.getDatabase(this.databaseName)
.getContainer(collectionName)
.delete()
.doOnNext(response -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
response, null))
.block();
} catch (Exception e) {
throw new CosmosDBAccessException("failed to delete collection: " + collectionName,
e);
}
cosmosClient.getDatabase(this.databaseName)
.getContainer(collectionName)
.delete()
.doOnNext(response -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
response, null))
.onErrorResume(throwable ->
exceptionHandler("Failed to delete collection", throwable))
.block();
}
public String getCollectionName(Class<?> domainClass) {
@ -294,6 +252,8 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
public CosmosContainerProperties createCollectionIfNotExists(@NonNull CosmosEntityInformation<?, ?> information) {
final CosmosContainerResponse response = cosmosClient
.createDatabaseIfNotExists(this.databaseName)
.onErrorResume(throwable ->
exceptionHandler("Failed to create database", throwable))
.flatMap(cosmosDatabaseResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosDatabaseResponse, null);
@ -301,14 +261,14 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
.database()
.createContainerIfNotExists(information.getCollectionName(),
"/" + information.getPartitionKeyFieldName(), information.getRequestUnit())
.onErrorResume(throwable ->
exceptionHandler("Failed to create container", throwable))
.doOnNext(cosmosContainerResponse ->
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosContainerResponse, null));
})
.block();
if (response == null) {
throw new CosmosDBAccessException("Failed to create collection");
}
assert response != null;
return response.properties();
}
@ -316,26 +276,22 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
Assert.hasText(collectionName, "collectionName should not be null, empty or only whitespaces");
assertValidId(id);
log.debug("execute deleteById in database {} collection {}", this.databaseName, collectionName);
log.debug("execute deleteById in database {} container {}", this.databaseName, collectionName);
if (partitionKey == null) {
partitionKey = PartitionKey.None;
}
try {
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
options.partitionKey(partitionKey);
cosmosClient.getDatabase(this.databaseName)
.getContainer(collectionName)
.getItem(id.toString(), partitionKey)
.delete(options)
.doOnNext(response -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
response, null))
.onErrorResume(Mono::error)
.block();
} catch (Exception e) {
throw new CosmosDBAccessException("deleteById exception", e);
}
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
options.partitionKey(partitionKey);
cosmosClient.getDatabase(this.databaseName)
.getContainer(collectionName)
.getItem(id.toString(), partitionKey)
.delete(options)
.doOnNext(response -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
response, null))
.onErrorResume(throwable ->
exceptionHandler("Failed to delete item", throwable))
.block();
}
@Override
@ -352,16 +308,12 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
public <T> List<T> find(@NonNull DocumentQuery query, @NonNull Class<T> domainClass, String collectionName) {
Assert.notNull(query, "DocumentQuery should not be null.");
Assert.notNull(domainClass, "domainClass should not be null.");
Assert.hasText(collectionName, "collection should not be null, empty or only whitespaces");
Assert.hasText(collectionName, "container should not be null, empty or only whitespaces");
try {
return findDocuments(query, domainClass, collectionName)
.stream()
.map(cosmosItemProperties -> toDomainObject(domainClass, cosmosItemProperties))
.collect(Collectors.toList());
} catch (Exception e) {
throw new CosmosDBAccessException("Failed to execute find operation from " + collectionName, e);
}
return findItems(query, domainClass, collectionName)
.stream()
.map(cosmosItemProperties -> toDomainObject(domainClass, cosmosItemProperties))
.collect(Collectors.toList());
}
public <T> Boolean exists(@NonNull DocumentQuery query, @NonNull Class<T> domainClass, String collectionName) {
@ -369,31 +321,31 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
}
/**
* Delete the DocumentQuery, need to query the domains at first, then delete the document
* Delete the DocumentQuery, need to query the domains at first, then delete the item
* from the result.
* The cosmosdb Sql API do _NOT_ support DELETE query, we cannot add one DeleteQueryGenerator.
*
* @param query The representation for query method.
* @param domainClass Class of domain
* @param collectionName Collection Name of database
* @param collectionName Container name of database
* @param <T>
* @return All the deleted documents as List.
* @return All the deleted items as List.
*/
@Override
public <T> List<T> delete(@NonNull DocumentQuery query, @NonNull Class<T> domainClass,
@NonNull String collectionName) {
Assert.notNull(query, "DocumentQuery should not be null.");
Assert.notNull(domainClass, "domainClass should not be null.");
Assert.hasText(collectionName, "collection should not be null, empty or only whitespaces");
Assert.hasText(collectionName, "container should not be null, empty or only whitespaces");
final List<CosmosItemProperties> results = findDocuments(query, domainClass, collectionName);
final List<CosmosItemProperties> results = findItems(query, domainClass, collectionName);
final List<String> partitionKeyName = getPartitionKeyNames(domainClass);
results.forEach(d -> deleteDocument(d, partitionKeyName, collectionName, domainClass));
return results.stream()
.map(d -> getConverter().read(domainClass, d))
.collect(Collectors.toList());
return results.stream().map(cosmosItemProperties -> {
final CosmosItemResponse cosmosItemResponse = deleteItem(cosmosItemProperties,
partitionKeyName, collectionName, domainClass);
return getConverter().read(domainClass, cosmosItemResponse.properties());
}).collect(Collectors.toList());
}
@Override
@ -409,7 +361,7 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
@Override
public <T> Page<T> paginationQuery(DocumentQuery query, Class<T> domainClass, String collectionName) {
Assert.isTrue(query.getPageable().getPageSize() > 0, "pageable should have page size larger than 0");
Assert.hasText(collectionName, "collection should not be null, empty or only whitespaces");
Assert.hasText(collectionName, "container should not be null, empty or only whitespaces");
final Pageable pageable = query.getPageable();
final FeedOptions feedOptions = new FeedOptions();
@ -422,19 +374,18 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
feedOptions.populateQueryMetrics(isPopulateQueryMetrics);
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
final FeedResponse<CosmosItemProperties> feedResponse =
cosmosClient.getDatabase(this.databaseName)
.getContainer(collectionName)
.queryItems(sqlQuerySpec, feedOptions)
.doOnNext(propertiesFeedResponse -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
null, propertiesFeedResponse))
.next()
.block();
if (feedResponse == null) {
throw new CosmosDBAccessException("Failed to query documents");
}
final FeedResponse<CosmosItemProperties> feedResponse = cosmosClient
.getDatabase(this.databaseName)
.getContainer(collectionName)
.queryItems(sqlQuerySpec, feedOptions)
.doOnNext(propertiesFeedResponse -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
null, propertiesFeedResponse))
.onErrorResume(throwable ->
exceptionHandler("Failed to query items", throwable))
.next()
.block();
assert feedResponse != null;
final Iterator<CosmosItemProperties> it = feedResponse.results().iterator();
final List<T> result = new ArrayList<>();
@ -456,7 +407,7 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
if (contentSize < pageable.getPageSize() && contentSize > 0) {
// If the content size is less than page size,
// this means, cosmosDB is returning less documents than page size,
// this means, cosmosDB is returning less items than page size,
// because of either RU limit, or payload limit
// Set the page size to content size.
@ -476,27 +427,23 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
@Override
public long count(String collectionName) {
Assert.hasText(collectionName, "collectionName should not be empty");
Assert.hasText(collectionName, "container name should not be empty");
final DocumentQuery query = new DocumentQuery(Criteria.getInstance(CriteriaType.ALL));
final Long count = getCountValue(query, true, collectionName);
if (count == null) {
throw new CosmosDBAccessException("Failed to get count for collectionName: " + collectionName);
}
assert count != null;
return count;
}
@Override
public <T> long count(DocumentQuery query, Class<T> domainClass, String collectionName) {
Assert.notNull(domainClass, "domainClass should not be null");
Assert.hasText(collectionName, "collectionName should not be empty");
Assert.hasText(collectionName, "container name should not be empty");
final boolean isCrossPartitionQuery =
query.isCrossPartitionQuery(getPartitionKeyNames(domainClass));
final Long count = getCountValue(query, isCrossPartitionQuery, collectionName);
if (count == null) {
throw new CosmosDBAccessException("Failed to get count for collectionName: " + collectionName);
}
assert count != null;
return count;
}
@ -513,7 +460,8 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
options.populateQueryMetrics(isPopulateQueryMetrics);
return executeQuery(querySpec, containerName, options)
.onErrorResume(this::databaseAccessExceptionHandler)
.onErrorResume(throwable ->
exceptionHandler("Failed to get count value", throwable))
.doOnNext(response -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
null, response))
.next()
@ -521,15 +469,13 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
.block();
}
private <T> Mono<T> databaseAccessExceptionHandler(Throwable e) {
throw new CosmosDBAccessException("failed to access cosmosdb database", e);
}
private Flux<FeedResponse<CosmosItemProperties>> executeQuery(SqlQuerySpec sqlQuerySpec, String collectionName,
FeedOptions options) {
return cosmosClient.getDatabase(this.databaseName)
.getContainer(collectionName)
.queryItems(sqlQuerySpec, options);
.getContainer(collectionName)
.queryItems(sqlQuerySpec, options)
.onErrorResume(throwable ->
exceptionHandler("Failed to execute query", throwable));
}
private List<String> getPartitionKeyNames(Class<?> domainClass) {
@ -549,9 +495,9 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
}
}
private List<CosmosItemProperties> findDocuments(@NonNull DocumentQuery query,
@NonNull Class<?> domainClass,
@NonNull String containerName) {
private List<CosmosItemProperties> findItems(@NonNull DocumentQuery query,
@NonNull Class<?> domainClass,
@NonNull String containerName) {
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
final boolean isCrossPartitionQuery =
query.isCrossPartitionQuery(getPartitionKeyNames(domainClass));
@ -568,14 +514,16 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
null, cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.results());
})
.onErrorResume(throwable ->
exceptionHandler("Failed to find items", throwable))
.collectList()
.block();
}
private CosmosItemResponse deleteDocument(@NonNull CosmosItemProperties cosmosItemProperties,
@NonNull List<String> partitionKeyNames,
String containerName,
@NonNull Class<?> domainClass) {
private CosmosItemResponse deleteItem(@NonNull CosmosItemProperties cosmosItemProperties,
@NonNull List<String> partitionKeyNames,
String containerName,
@NonNull Class<?> domainClass) {
Assert.isTrue(partitionKeyNames.size() <= 1, "Only one Partition is supported.");
PartitionKey partitionKey = null;
@ -598,6 +546,8 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
.delete(options)
.doOnNext(response -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
response, null))
.onErrorResume(throwable ->
exceptionHandler("Failed to delete item", throwable))
.block();
}

Просмотреть файл

@ -21,7 +21,6 @@ import com.microsoft.azure.spring.data.cosmosdb.core.generator.FindQuerySpecGene
import com.microsoft.azure.spring.data.cosmosdb.core.query.Criteria;
import com.microsoft.azure.spring.data.cosmosdb.core.query.CriteriaType;
import com.microsoft.azure.spring.data.cosmosdb.core.query.DocumentQuery;
import com.microsoft.azure.spring.data.cosmosdb.exception.CosmosDBAccessException;
import com.microsoft.azure.spring.data.cosmosdb.repository.support.CosmosEntityInformation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
@ -38,6 +37,8 @@ import java.util.Collections;
import java.util.List;
import static com.microsoft.azure.spring.data.cosmosdb.common.CosmosdbUtils.fillAndProcessResponseDiagnostics;
import static com.microsoft.azure.spring.data.cosmosdb.exception.CosmosDBExceptionUtils.exceptionHandler;
import static com.microsoft.azure.spring.data.cosmosdb.exception.CosmosDBExceptionUtils.findAPIExceptionHandler;
@Slf4j
public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, ApplicationContextAware {
@ -93,6 +94,8 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
return cosmosClient
.createDatabaseIfNotExists(this.databaseName)
.onErrorResume(throwable ->
exceptionHandler("Failed to create database", throwable))
.flatMap(cosmosDatabaseResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosDatabaseResponse, null);
@ -105,7 +108,9 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
cosmosContainerResponse, null);
this.collectionCache.add(information.getCollectionName());
return cosmosContainerResponse;
});
})
.onErrorResume(throwable ->
exceptionHandler("Failed to create collection", throwable));
});
}
@ -179,7 +184,8 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
.map(cosmosItem -> toDomainObject(entityClass, cosmosItem))
.findFirst());
})
.onErrorResume(this::databaseAccessExceptionHandler)
.onErrorResume(throwable ->
findAPIExceptionHandler("Failed to find item", throwable))
.next();
}
@ -207,7 +213,8 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
return Mono.justOrEmpty(toDomainObject(entityClass,
cosmosItemResponse.properties()));
})
.onErrorResume(this::databaseAccessExceptionHandler);
.onErrorResume(throwable ->
findAPIExceptionHandler("Failed to find item", throwable));
}
/**
@ -233,15 +240,17 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
Assert.notNull(objectToSave, "objectToSave should not be null");
final Class<T> domainClass = (Class<T>) objectToSave.getClass();
final CosmosItemProperties originalItem = mappingCosmosConverter.writeCosmosItemProperties(objectToSave);
return cosmosClient.getDatabase(this.databaseName)
.getContainer(getContainerName(objectToSave.getClass()))
.createItem(objectToSave, new CosmosItemRequestOptions())
.onErrorResume(this::databaseAccessExceptionHandler)
.flatMap(cosmosItemResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null);
return Mono.just(toDomainObject(domainClass, cosmosItemResponse.properties()));
});
.getContainer(getContainerName(objectToSave.getClass()))
.createItem(originalItem, new CosmosItemRequestOptions())
.onErrorResume(throwable ->
exceptionHandler("Failed to insert item", throwable))
.flatMap(cosmosItemResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null);
return Mono.just(toDomainObject(domainClass, cosmosItemResponse.properties()));
});
}
/**
@ -256,21 +265,22 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
Assert.notNull(objectToSave, "objectToSave should not be null");
final Class<T> domainClass = (Class<T>) objectToSave.getClass();
final CosmosItemProperties originalItem = mappingCosmosConverter.writeCosmosItemProperties(objectToSave);
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
if (partitionKey != null) {
options.partitionKey(partitionKey);
}
final Class<T> domainClass = (Class<T>) objectToSave.getClass();
return cosmosClient.getDatabase(this.databaseName)
.getContainer(containerName)
.createItem(objectToSave, options)
.onErrorResume(this::databaseAccessExceptionHandler)
.flatMap(cosmosItemResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null);
return Mono.just(toDomainObject(domainClass, cosmosItemResponse.properties()));
});
.getContainer(containerName)
.createItem(originalItem, options)
.onErrorResume(throwable ->
exceptionHandler("Failed to insert item", throwable))
.flatMap(cosmosItemResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null);
return Mono.just(toDomainObject(domainClass, cosmosItemResponse.properties()));
});
}
/**
@ -296,20 +306,22 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
@Override
public <T> Mono<T> upsert(String containerName, T object, PartitionKey partitionKey) {
final Class<T> domainClass = (Class<T>) object.getClass();
final CosmosItemProperties originalItem = mappingCosmosConverter.writeCosmosItemProperties(object);
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
if (partitionKey != null) {
options.partitionKey(partitionKey);
}
return cosmosClient.getDatabase(this.databaseName)
.getContainer(containerName)
.upsertItem(object, options)
.flatMap(cosmosItemResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null);
return Mono.just(toDomainObject(domainClass, cosmosItemResponse.properties()));
})
.onErrorResume(this::databaseAccessExceptionHandler);
.getContainer(containerName)
.upsertItem(originalItem, options)
.flatMap(cosmosItemResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null);
return Mono.just(toDomainObject(domainClass, cosmosItemResponse.properties()));
})
.onErrorResume(throwable ->
exceptionHandler("Failed to upsert item", throwable));
}
/**
@ -324,7 +336,10 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
public Mono<Void> deleteById(String containerName, Object id, PartitionKey partitionKey) {
Assert.hasText(containerName, "container name should not be null, empty or only whitespaces");
assertValidId(id);
Assert.notNull(partitionKey, "partitionKey should not be null");
if (partitionKey == null) {
partitionKey = PartitionKey.None;
}
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
options.partitionKey(partitionKey);
@ -335,7 +350,8 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
.doOnNext(cosmosItemResponse ->
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null))
.onErrorResume(this::databaseAccessExceptionHandler)
.onErrorResume(throwable ->
exceptionHandler("Failed to delete item", throwable))
.then();
}
@ -359,22 +375,26 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
options.enableCrossPartitionQuery(isCrossPartitionQuery);
options.populateQueryMetrics(isPopulateQueryMetrics);
return cosmosClient.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems(sqlQuerySpec, options)
.flatMap(cosmosItemFeedResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
null, cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.results());
})
.flatMap(cosmosItemProperties -> cosmosClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.getItem(cosmosItemProperties.id(), cosmosItemProperties.get(partitionKeyName))
.delete()
.doOnNext(cosmosItemResponse -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null)))
.onErrorResume(this::databaseAccessExceptionHandler)
.then();
.getContainer(containerName)
.queryItems(sqlQuerySpec, options)
.onErrorResume(throwable ->
exceptionHandler("Failed to query items", throwable))
.flatMap(cosmosItemFeedResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
null, cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.results());
})
.flatMap(cosmosItemProperties -> cosmosClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.getItem(cosmosItemProperties.id(), cosmosItemProperties.get(partitionKeyName))
.delete()
.doOnNext(cosmosItemResponse ->
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null))
.onErrorResume(throwable ->
exceptionHandler("Failed to delete items", throwable)))
.then();
}
/**
@ -391,10 +411,10 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
Assert.notNull(entityClass, "domainClass should not be null.");
Assert.hasText(containerName, "container name should not be null, empty or only whitespaces");
final Flux<CosmosItemProperties> results = findDocuments(query, entityClass, containerName);
final Flux<CosmosItemProperties> results = findItems(query, entityClass, containerName);
final List<String> partitionKeyName = getPartitionKeyNames(entityClass);
return results.flatMap(d -> deleteDocument(d, partitionKeyName, containerName))
return results.flatMap(d -> deleteItem(d, partitionKeyName, containerName))
.flatMap(cosmosItemProperties -> Mono.just(toDomainObject(entityClass, cosmosItemProperties)));
}
@ -408,7 +428,7 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
*/
@Override
public <T> Flux<T> find(DocumentQuery query, Class<T> entityClass, String containerName) {
return findDocuments(query, entityClass, containerName)
return findItems(query, entityClass, containerName)
.map(cosmosItemProperties -> toDomainObject(entityClass, cosmosItemProperties));
}
@ -480,7 +500,8 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
return executeQuery(querySpec, containerName, options)
.doOnNext(feedResponse -> fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
null, feedResponse))
.onErrorResume(this::databaseAccessExceptionHandler)
.onErrorResume(throwable ->
exceptionHandler("Failed to get count value", throwable))
.next()
.map(r -> r.results().get(0).getLong(COUNT_VALUE_KEY));
}
@ -489,12 +510,10 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
FeedOptions options) {
return cosmosClient.getDatabase(this.databaseName)
.getContainer(collectionName)
.queryItems(sqlQuerySpec, options);
}
private <T> Mono<T> databaseAccessExceptionHandler(Throwable e) {
throw new CosmosDBAccessException("failed to access cosmosdb database", e);
.getContainer(collectionName)
.queryItems(sqlQuerySpec, options)
.onErrorResume(throwable ->
exceptionHandler("Failed to execute query", throwable));
}
/**
@ -505,18 +524,16 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
@Override
public void deleteContainer(@NonNull String containerName) {
Assert.hasText(containerName, "containerName should have text.");
try {
cosmosClient.getDatabase(this.databaseName)
.getContainer(containerName)
.delete()
.doOnNext(cosmosContainerResponse ->
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosClient.getDatabase(this.databaseName)
.getContainer(containerName)
.delete()
.doOnNext(cosmosContainerResponse ->
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosContainerResponse, null))
.block();
this.collectionCache.remove(containerName);
} catch (Exception e) {
throw new CosmosDBAccessException("failed to delete collection: " + containerName, e);
}
.onErrorResume(throwable ->
exceptionHandler("Failed to delete container", throwable))
.block();
this.collectionCache.remove(containerName);
}
/**
@ -529,8 +546,8 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
return new CosmosEntityInformation<>(domainClass).getCollectionName();
}
private Flux<CosmosItemProperties> findDocuments(@NonNull DocumentQuery query, @NonNull Class<?> domainClass,
@NonNull String containerName) {
private Flux<CosmosItemProperties> findItems(@NonNull DocumentQuery query, @NonNull Class<?> domainClass,
@NonNull String containerName) {
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
final boolean isCrossPartitionQuery = query.isCrossPartitionQuery(getPartitionKeyNames(domainClass));
final FeedOptions feedOptions = new FeedOptions();
@ -545,7 +562,9 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
null, cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.results());
});
})
.onErrorResume(throwable ->
exceptionHandler("Failed to query items", throwable));
}
private void assertValidId(Object id) {
@ -565,9 +584,9 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
return Collections.singletonList(entityInfo.getPartitionKeyFieldName());
}
private Mono<CosmosItemProperties> deleteDocument(@NonNull CosmosItemProperties cosmosItemProperties,
@NonNull List<String> partitionKeyNames,
String containerName) {
private Mono<CosmosItemProperties> deleteItem(@NonNull CosmosItemProperties cosmosItemProperties,
@NonNull List<String> partitionKeyNames,
String containerName) {
Assert.isTrue(partitionKeyNames.size() <= 1, "Only one Partition is supported.");
PartitionKey partitionKey = null;
@ -578,16 +597,17 @@ public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, Applica
final CosmosItemRequestOptions options = new CosmosItemRequestOptions(partitionKey);
return cosmosClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.getItem(cosmosItemProperties.id(), partitionKey)
.delete(options)
.map(cosmosItemResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null);
return cosmosItemProperties;
});
return cosmosClient.getDatabase(this.databaseName)
.getContainer(containerName)
.getItem(cosmosItemProperties.id(), partitionKey)
.delete(options)
.map(cosmosItemResponse -> {
fillAndProcessResponseDiagnostics(responseDiagnosticsProcessor,
cosmosItemResponse, null);
return cosmosItemProperties;
})
.onErrorResume(throwable ->
exceptionHandler("Failed to delete item", throwable));
}
private <T> T toDomainObject(@NonNull Class<T> domainClass, CosmosItemProperties cosmosItemProperties) {

Просмотреть файл

@ -5,15 +5,45 @@
*/
package com.microsoft.azure.spring.data.cosmosdb.exception;
import com.azure.data.cosmos.CosmosClientException;
import org.springframework.dao.DataAccessException;
import org.springframework.lang.Nullable;
/**
* Public class extending DataAccessException, exposes innerException.
* Every API in {@link com.microsoft.azure.spring.data.cosmosdb.repository.CosmosRepository}
* and {@link com.microsoft.azure.spring.data.cosmosdb.repository.ReactiveCosmosRepository}
* should throw {@link CosmosDBAccessException}.
* innerException refers to the exception thrown by CosmosDB SDK. Callers of repository APIs can
* rely on innerException for any retriable logic, or for more details on the failure of
* the operation.
*/
public class CosmosDBAccessException extends DataAccessException {
protected final CosmosClientException cosmosClientException;
public CosmosDBAccessException(String msg) {
super(msg);
this.cosmosClientException = null;
}
public CosmosDBAccessException(@Nullable String msg, @Nullable Throwable cause) {
super(msg, cause);
if (cause instanceof CosmosClientException) {
this.cosmosClientException = (CosmosClientException) cause;
} else {
this.cosmosClientException = null;
}
}
public CosmosDBAccessException(@Nullable String msg, @Nullable Exception cause) {
super(msg, cause);
this.cosmosClientException = cause instanceof CosmosClientException
? (CosmosClientException) cause
: null;
}
public CosmosClientException getCosmosClientException() {
return cosmosClientException;
}
}

Просмотреть файл

@ -0,0 +1,36 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.data.cosmosdb.exception;
import com.azure.data.cosmos.CosmosClientException;
import com.azure.data.cosmos.internal.HttpConstants;
import org.springframework.util.StringUtils;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
public class CosmosDBExceptionUtils {
public static <T> Mono<T> exceptionHandler(String message, Throwable throwable) {
if (StringUtils.isEmpty(message)) {
message = "Failed to access cosmosdb database";
}
// Unwrap the exception in case if it is a reactive exception
final Throwable unwrappedThrowable = Exceptions.unwrap(throwable);
throw new CosmosDBAccessException(message, unwrappedThrowable);
}
public static <T> Mono<T> findAPIExceptionHandler(String message, Throwable throwable) {
// Unwrap the exception in case if it is a reactive exception
final Throwable unwrappedThrowable = Exceptions.unwrap(throwable);
if (unwrappedThrowable instanceof CosmosClientException) {
final CosmosClientException cosmosClientException = (CosmosClientException) unwrappedThrowable;
if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return Mono.empty();
}
}
return exceptionHandler(message, unwrappedThrowable);
}
}

Просмотреть файл

@ -223,8 +223,8 @@ public class CosmosTemplateIT {
try {
cosmosTemplate.upsert(Person.class.getSimpleName(), updated, null);
} catch (CosmosDBAccessException e) {
assertThat(e.getCause()).isNotNull();
final Throwable cosmosClientException = e.getCause().getCause();
assertThat(e.getCosmosClientException()).isNotNull();
final Throwable cosmosClientException = e.getCosmosClientException();
assertThat(cosmosClientException).isInstanceOf(CosmosClientException.class);
assertThat(cosmosClientException.getMessage()).contains(PRECONDITION_IS_NOT_MET);

Просмотреть файл

@ -0,0 +1,30 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.data.cosmosdb.domain;
import com.microsoft.azure.spring.data.cosmosdb.core.mapping.Document;
import com.microsoft.azure.spring.data.cosmosdb.core.mapping.PartitionKey;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.data.annotation.Id;
@Data
@Document
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public class Course {
@Id
private String courseId;
private String name;
@PartitionKey
private String department;
}

Просмотреть файл

@ -11,6 +11,7 @@ import com.microsoft.azure.spring.data.cosmosdb.common.TestConstants;
import com.microsoft.azure.spring.data.cosmosdb.config.AbstractCosmosConfiguration;
import com.microsoft.azure.spring.data.cosmosdb.config.CosmosDBConfig;
import com.microsoft.azure.spring.data.cosmosdb.repository.config.EnableCosmosRepositories;
import com.microsoft.azure.spring.data.cosmosdb.repository.config.EnableReactiveCosmosRepositories;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -20,6 +21,7 @@ import org.springframework.util.StringUtils;
@Configuration
@PropertySource(value = {"classpath:application.properties"})
@EnableCosmosRepositories
@EnableReactiveCosmosRepositories
public class TestRepositoryConfig extends AbstractCosmosConfiguration {
@Value("${cosmosdb.uri:}")
private String cosmosDbUri;

Просмотреть файл

@ -10,7 +10,6 @@ import com.microsoft.azure.spring.data.cosmosdb.common.TestConstants;
import com.microsoft.azure.spring.data.cosmosdb.common.TestUtils;
import com.microsoft.azure.spring.data.cosmosdb.core.CosmosTemplate;
import com.microsoft.azure.spring.data.cosmosdb.domain.Address;
import com.microsoft.azure.spring.data.cosmosdb.exception.CosmosDBAccessException;
import com.microsoft.azure.spring.data.cosmosdb.repository.TestRepositoryConfig;
import com.microsoft.azure.spring.data.cosmosdb.repository.repository.AddressRepository;
import com.microsoft.azure.spring.data.cosmosdb.repository.support.CosmosEntityInformation;

Просмотреть файл

@ -136,7 +136,7 @@ public class ProjectRepositorySortIT {
this.repository.findAll(sort);
}
@Test(expected = CosmosDBAccessException.class)
@Test(expected = IllegalArgumentException.class)
public void testFindAllSortIgnoreCaseException() {
final Sort.Order order = Sort.Order.by("name").ignoreCase();
final Sort sort = Sort.by(order);

Просмотреть файл

@ -0,0 +1,197 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.data.cosmosdb.repository.integration;
import com.azure.data.cosmos.PartitionKey;
import com.microsoft.azure.spring.data.cosmosdb.core.ReactiveCosmosTemplate;
import com.microsoft.azure.spring.data.cosmosdb.domain.Course;
import com.microsoft.azure.spring.data.cosmosdb.exception.CosmosDBAccessException;
import com.microsoft.azure.spring.data.cosmosdb.repository.TestRepositoryConfig;
import com.microsoft.azure.spring.data.cosmosdb.repository.repository.ReactiveCourseRepository;
import com.microsoft.azure.spring.data.cosmosdb.repository.support.CosmosEntityInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.Collections;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TestRepositoryConfig.class)
public class ReactiveCourseRepositoryIT {
private static final String COURSE_ID_1 = "1";
private static final String COURSE_ID_2 = "2";
private static final String COURSE_ID_3 = "3";
private static final String COURSE_ID_4 = "4";
private static final String COURSE_ID_5 = "5";
private static final String COURSE_NAME_1 = "Course1";
private static final String COURSE_NAME_2 = "Course2";
private static final String COURSE_NAME_3 = "Course3";
private static final String COURSE_NAME_4 = "Course4";
private static final String COURSE_NAME_5 = "Course5";
private static final String DEPARTMENT_NAME_1 = "Department1";
private static final String DEPARTMENT_NAME_2 = "Department2";
private static final String DEPARTMENT_NAME_3 = "Department3";
private static final Course COURSE_1 = new Course(COURSE_ID_1, COURSE_NAME_1, DEPARTMENT_NAME_3);
private static final Course COURSE_2 = new Course(COURSE_ID_2, COURSE_NAME_2, DEPARTMENT_NAME_2);
private static final Course COURSE_3 = new Course(COURSE_ID_3, COURSE_NAME_3, DEPARTMENT_NAME_2);
private static final Course COURSE_4 = new Course(COURSE_ID_4, COURSE_NAME_4, DEPARTMENT_NAME_1);
private static final Course COURSE_5 = new Course(COURSE_ID_5, COURSE_NAME_5, DEPARTMENT_NAME_1);
private final CosmosEntityInformation<Course, String> entityInformation =
new CosmosEntityInformation<>(Course.class);
@Autowired
private ReactiveCosmosTemplate template;
@Autowired
private ReactiveCourseRepository repository;
@PreDestroy
public void cleanUpCollection() {
template.deleteContainer(entityInformation.getCollectionName());
}
@Before
public void setup() {
final Flux<Course> savedFlux = repository.saveAll(Arrays.asList(COURSE_1, COURSE_2,
COURSE_3, COURSE_4));
StepVerifier.create(savedFlux).thenConsumeWhile(course -> true).expectComplete().verify();
}
@After
public void cleanup() {
final Mono<Void> deletedMono = repository.deleteAll();
StepVerifier.create(deletedMono).thenAwait().verifyComplete();
}
@Test
public void testFindById() {
final Mono<Course> idMono = repository.findById(COURSE_ID_4);
StepVerifier.create(idMono).expectNext(COURSE_4).expectComplete().verify();
}
@Test
public void testFindByIdAndPartitionKey() {
final Mono<Course> idMono = repository.findById(COURSE_ID_4,
new PartitionKey(entityInformation.getPartitionKeyFieldValue(COURSE_4)));
StepVerifier.create(idMono).expectNext(COURSE_4).expectComplete().verify();
}
@Test
public void testFindByIdAsPublisher() {
final Mono<Course> byId = repository.findById(Mono.just(COURSE_ID_1));
StepVerifier.create(byId).expectNext(COURSE_1).verifyComplete();
}
@Test
public void testFindAllWithSort() {
final Flux<Course> sortAll = repository.findAll(Sort.by(Sort.Order.desc("name")));
StepVerifier.create(sortAll).expectNext(COURSE_4, COURSE_3, COURSE_2, COURSE_1).verifyComplete();
}
@Test
public void testFindByIdNotFound() {
final Mono<Course> idMono = repository.findById("10");
// Expect an empty mono as return value
StepVerifier.create(idMono).expectComplete().verify();
}
@Test
public void testFindByIdAndPartitionKeyNotFound() {
final Mono<Course> idMono = repository.findById("10",
new PartitionKey(entityInformation.getPartitionKeyFieldValue(COURSE_1)));
// Expect an empty mono as return value
StepVerifier.create(idMono).expectComplete().verify();
}
@Test
public void testFindAll() {
final Flux<Course> allFlux = repository.findAll();
StepVerifier.create(allFlux).expectNextCount(4).verifyComplete();
}
@Test
public void testInsert() {
final Mono<Course> save = repository.save(COURSE_5);
StepVerifier.create(save).expectNext(COURSE_5).verifyComplete();
}
@Test
public void testUpsert() {
Mono<Course> save = repository.save(COURSE_1);
StepVerifier.create(save).expectNext(COURSE_1).expectComplete().verify();
save = repository.save(COURSE_1);
StepVerifier.create(save).expectNext(COURSE_1).expectComplete().verify();
}
@Test
public void testDeleteByIdWithoutPartitionKey() {
final Mono<Void> deleteMono = repository.deleteById(COURSE_1.getCourseId());
StepVerifier.create(deleteMono).expectError(CosmosDBAccessException.class).verify();
}
@Test
public void testDeleteByIdAndPartitionKey() {
final Mono<Void> deleteMono = repository.deleteById(COURSE_1.getCourseId(),
new PartitionKey(entityInformation.getPartitionKeyFieldValue(COURSE_1)));
StepVerifier.create(deleteMono).verifyComplete();
final Mono<Course> byId = repository.findById(COURSE_ID_1,
new PartitionKey(entityInformation.getPartitionKeyFieldValue(COURSE_1)));
// Expect an empty mono as return value
StepVerifier.create(byId).verifyComplete();
}
@Test
public void testDeleteByEntity() {
final Mono<Void> deleteMono = repository.delete(COURSE_4);
StepVerifier.create(deleteMono).verifyComplete();
final Mono<Course> byId = repository.findById(COURSE_ID_4);
// Expect an empty mono as return value
StepVerifier.create(byId).expectComplete().verify();
}
@Test
public void testDeleteByIdNotFound() {
final Mono<Void> deleteMono = repository.deleteById(COURSE_ID_5);
StepVerifier.create(deleteMono).expectError(CosmosDBAccessException.class).verify();
}
@Test
public void testDeleteByEntityNotFound() {
final Mono<Void> deleteMono = repository.delete(COURSE_5);
StepVerifier.create(deleteMono).expectError(CosmosDBAccessException.class).verify();
}
@Test
public void testCountAll() {
final Mono<Long> countMono = repository.count();
StepVerifier.create(countMono).expectNext(4L).verifyComplete();
}
@Test
public void testFindByDepartmentIn() {
final Flux<Course> byDepartmentIn =
repository.findByDepartmentIn(Collections.singletonList(DEPARTMENT_NAME_2));
StepVerifier.create(byDepartmentIn).expectNextCount(2).verifyComplete();
}
}

Просмотреть файл

@ -0,0 +1,17 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.data.cosmosdb.repository.repository;
import com.microsoft.azure.spring.data.cosmosdb.domain.Course;
import com.microsoft.azure.spring.data.cosmosdb.repository.ReactiveCosmosRepository;
import reactor.core.publisher.Flux;
import java.util.Collection;
public interface ReactiveCourseRepository extends ReactiveCosmosRepository<Course, String> {
Flux<Course> findByDepartmentIn(Collection<String> departments);
}