Spring Data Cosmos DB v3 reactor SDK implementation (#394)
* Adding v3 sdk dependency * Adding support to Reactor types using Cosmos V3 beta SDK This PR includes Reactive Operations, Templates and Repository as part of a complete plan of migrating entire operations to new Cosmos SDK * Fixing comment * Debug commit * Debug commit * Debugging test failures * Debugging test failures * Debugging test failures * Debugging test failures * Debugging test failures * Debugging test failures * Debugging test failures * Moving to latest cosmos sdk (v3) * Removing debugging code * Minor refactoring changes * Removed unused code, print statements, TODOs * Reducing the number of characters in a line * Reducing number of lines again to resolve check style * Code review comments, removed unused variables, converted private instances to local variables * More code review comments by codacy * More codacy comments * Another set of codacy comments * Added comments to empty method body * Code Review comments * Removed unused and un-implemented code
This commit is contained in:
Родитель
b0f028b3fd
Коммит
1fefc23172
31
pom.xml
31
pom.xml
|
@ -56,8 +56,10 @@
|
|||
<java.tuples.version>1.2</java.tuples.version>
|
||||
<slf4j.version>1.7.25</slf4j.version>
|
||||
<gson.version>2.8.4</gson.version>
|
||||
<project.reactor.test.version>3.2.5.RELEASE</project.reactor.test.version>
|
||||
|
||||
<azure.documentdb.version>1.16.2</azure.documentdb.version>
|
||||
<azure.cosmos.version>3.0.0</azure.cosmos.version>
|
||||
<azure.test.resourcegroup>spring-data-cosmosdb-test</azure.test.resourcegroup>
|
||||
<azure.test.dbname>testdb-${maven.build.timestamp}</azure.test.dbname>
|
||||
<skip.integration.tests>true</skip.integration.tests>
|
||||
|
@ -120,6 +122,12 @@
|
|||
<version>${azure.documentdb.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>azure-cosmos</artifactId>
|
||||
<version>${azure.cosmos.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.module</groupId>
|
||||
<artifactId>jackson-module-parameter-names</artifactId>
|
||||
|
@ -181,6 +189,13 @@
|
|||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-test</artifactId>
|
||||
<version>${project.reactor.test.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-simple</artifactId>
|
||||
|
@ -274,6 +289,22 @@
|
|||
<inherited>true</inherited>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-help-plugin</artifactId>
|
||||
<version>3.1.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>show-profiles</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>active-profiles</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>findbugs-maven-plugin</artifactId>
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
|
||||
package com.microsoft.azure.spring.data.cosmosdb;
|
||||
|
||||
import com.azure.data.cosmos.CosmosClient;
|
||||
import com.microsoft.azure.documentdb.ConnectionPolicy;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.common.MacAddress;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.common.PropertyLoader;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.common.TelemetrySender;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.config.DocumentDBConfig;
|
||||
import lombok.Getter;
|
||||
import lombok.NonNull;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
public class CosmosDbFactory {
|
||||
|
||||
@Getter
|
||||
private final DocumentDBConfig config;
|
||||
|
||||
private static final boolean IS_TELEMETRY_ALLOWED = PropertyLoader.isApplicationTelemetryAllowed();
|
||||
|
||||
private static final String USER_AGENT_SUFFIX = Constants.USER_AGENT_SUFFIX + PropertyLoader.getProjectVersion();
|
||||
|
||||
private String getUserAgentSuffix() {
|
||||
String suffix = ";" + USER_AGENT_SUFFIX;
|
||||
|
||||
if (IS_TELEMETRY_ALLOWED) {
|
||||
suffix += ";" + MacAddress.getHashMac();
|
||||
}
|
||||
|
||||
return suffix;
|
||||
}
|
||||
|
||||
public CosmosDbFactory(@NonNull DocumentDBConfig config) {
|
||||
validateConfig(config);
|
||||
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
public CosmosClient getCosmosClient() {
|
||||
final ConnectionPolicy policy = config.getConnectionPolicy();
|
||||
final String userAgent = getUserAgentSuffix() + ";" + policy.getUserAgentSuffix();
|
||||
|
||||
policy.setUserAgentSuffix(userAgent);
|
||||
return CosmosClient.builder()
|
||||
.endpoint(config.getUri())
|
||||
.key(config.getKey())
|
||||
.build();
|
||||
}
|
||||
|
||||
private void validateConfig(@NonNull DocumentDBConfig config) {
|
||||
Assert.hasText(config.getUri(), "cosmosdb host url should have text!");
|
||||
Assert.hasText(config.getKey(), "cosmosdb host key should have text!");
|
||||
Assert.hasText(config.getDatabase(), "cosmosdb database should have text!");
|
||||
Assert.notNull(config.getConnectionPolicy(), "cosmosdb connection policy should not be null!");
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
private void sendTelemetry() {
|
||||
if (IS_TELEMETRY_ALLOWED) {
|
||||
final TelemetrySender sender = new TelemetrySender();
|
||||
|
||||
sender.send(this.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -48,7 +48,6 @@ public class DocumentDbFactory {
|
|||
final String userAgent = getUserAgentSuffix() + ";" + policy.getUserAgentSuffix();
|
||||
|
||||
policy.setUserAgentSuffix(userAgent);
|
||||
|
||||
return new DocumentClient(config.getUri(), config.getKey(), policy, config.getConsistencyLevel());
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/**
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
|
||||
package com.microsoft.azure.spring.data.cosmosdb.core;
|
||||
|
||||
import com.azure.data.cosmos.CosmosContainerResponse;
|
||||
import com.azure.data.cosmos.PartitionKey;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.DocumentQuery;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.repository.support.DocumentDbEntityInformation;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public interface ReactiveCosmosOperations {
|
||||
|
||||
Mono<CosmosContainerResponse> createCollectionIfNotExists(DocumentDbEntityInformation information);
|
||||
|
||||
<T> Flux<T> findAll(String collectionName, Class<T> entityClass);
|
||||
|
||||
<T> Flux<T> findAll(Class<T> entityClass);
|
||||
|
||||
<T> Mono<T> findById(Object id, Class<T> entityClass);
|
||||
|
||||
<T> Mono<T> findById(String collectionName, Object id, Class<T> entityClass);
|
||||
|
||||
<T> Mono<T> insert(T objectToSave, PartitionKey partitionKey);
|
||||
|
||||
<T> Mono<T> insert(String collectionName, Object objectToSave, PartitionKey partitionKey);
|
||||
|
||||
<T> Mono<T> upsert(T object, PartitionKey partitionKey);
|
||||
|
||||
<T> Mono<T> upsert(String collectionName, T object, PartitionKey partitionKey);
|
||||
|
||||
Mono<Void> deleteById(String collectionName, Object id, PartitionKey partitionKey);
|
||||
|
||||
Mono<Void> deleteAll(String collectionName, String partitionKey);
|
||||
|
||||
void deleteContainer(String collectionName);
|
||||
|
||||
<T> Mono<T> delete(DocumentQuery query, Class<T> entityClass, String collectionName);
|
||||
|
||||
<T> Flux<T> find(DocumentQuery query, Class<T> entityClass, String collectionName);
|
||||
|
||||
Mono<Boolean> exists(DocumentQuery query, Class<?> entityClass, String collectionName);
|
||||
|
||||
Mono<Boolean> existsById(Object id, Class<?> entityClass, String containerName);
|
||||
|
||||
Mono<Long> count(String collectionName);
|
||||
|
||||
Mono<Long> count(DocumentQuery query, String containerName);
|
||||
}
|
|
@ -0,0 +1,506 @@
|
|||
/**
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
|
||||
package com.microsoft.azure.spring.data.cosmosdb.core;
|
||||
|
||||
import com.azure.data.cosmos.CosmosClient;
|
||||
import com.azure.data.cosmos.CosmosContainerResponse;
|
||||
import com.azure.data.cosmos.CosmosItemProperties;
|
||||
import com.azure.data.cosmos.CosmosItemRequestOptions;
|
||||
import com.azure.data.cosmos.FeedOptions;
|
||||
import com.azure.data.cosmos.FeedResponse;
|
||||
import com.azure.data.cosmos.PartitionKey;
|
||||
import com.azure.data.cosmos.SqlQuerySpec;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.CosmosDbFactory;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.convert.MappingDocumentDbConverter;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.generator.CountQueryGenerator;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.generator.FindQuerySpecGenerator;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.Criteria;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.CriteriaType;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.DocumentQuery;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.exception.DocumentDBAccessException;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.repository.support.DocumentDbEntityInformation;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.json.JSONObject;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.lang.NonNull;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, ApplicationContextAware {
|
||||
private static final String COUNT_VALUE_KEY = "_aggregate";
|
||||
public static final int DEFAULT_THROUGHPUT = 400;
|
||||
|
||||
private final String databaseName;
|
||||
|
||||
@Getter(AccessLevel.PRIVATE)
|
||||
private final CosmosClient cosmosClient;
|
||||
|
||||
private final List<String> collectionCache;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param cosmosDbFactory the cosmosdbfactory
|
||||
* @param mappingDocumentDbConverter the mappingDocumentDbConverter
|
||||
* @param dbName database name
|
||||
*/
|
||||
public ReactiveCosmosTemplate(CosmosDbFactory cosmosDbFactory,
|
||||
MappingDocumentDbConverter mappingDocumentDbConverter,
|
||||
String dbName) {
|
||||
Assert.notNull(cosmosDbFactory, "CosmosDbFactory must not be null!");
|
||||
Assert.notNull(mappingDocumentDbConverter, "MappingDocumentDbConverter must not be null!");
|
||||
|
||||
this.databaseName = dbName;
|
||||
this.collectionCache = new ArrayList<>();
|
||||
|
||||
this.cosmosClient = cosmosDbFactory.getCosmosClient();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param applicationContext the application context
|
||||
* @throws BeansException the bean exception
|
||||
*/
|
||||
public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
|
||||
// NOTE: When application context instance variable gets introduced, assign it here.
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a collection if it doesn't already exist
|
||||
*
|
||||
* @param information the DocumentDbEntityInformation
|
||||
* @return Mono containing CosmosContainerResponse
|
||||
*/
|
||||
@Override
|
||||
public Mono<CosmosContainerResponse> createCollectionIfNotExists(DocumentDbEntityInformation information) {
|
||||
|
||||
// Note: Once Cosmos DB publishes new build, we shouldn't pass DEFAULT_THROUGHPUT.
|
||||
// There is a NPE if we call the other overloaded method which automatically passes options as null.
|
||||
return cosmosClient
|
||||
.createDatabaseIfNotExists(this.databaseName)
|
||||
.flatMap(cosmosDatabaseResponse -> cosmosDatabaseResponse
|
||||
.database()
|
||||
.createContainerIfNotExists(information.getCollectionName(),
|
||||
"/" + information.getPartitionKeyFieldName(), DEFAULT_THROUGHPUT)
|
||||
.map(cosmosContainerResponse -> {
|
||||
this.collectionCache.add(information.getCollectionName());
|
||||
return cosmosContainerResponse;
|
||||
}));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Find all items in a given container
|
||||
*
|
||||
* @param containerName the containerName
|
||||
* @param entityClass the entityClass
|
||||
* @return Flux with all the found items or error
|
||||
*/
|
||||
@Override
|
||||
public <T> Flux<T> findAll(String containerName, Class<T> entityClass) {
|
||||
final DocumentQuery query = new DocumentQuery(Criteria.getInstance(CriteriaType.ALL));
|
||||
|
||||
return find(query, entityClass, containerName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find all items in a given container
|
||||
*
|
||||
* @param entityClass the entityClass
|
||||
* @return Flux with all the found items or error
|
||||
*/
|
||||
@Override
|
||||
public <T> Flux<T> findAll(Class<T> entityClass) {
|
||||
return findAll(entityClass.getSimpleName(), entityClass);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find by id
|
||||
*
|
||||
* @param id the id
|
||||
* @param entityClass the entityclass
|
||||
* @return Mono with the item or error
|
||||
*/
|
||||
@Override
|
||||
public <T> Mono<T> findById(Object id, Class<T> entityClass) {
|
||||
Assert.notNull(entityClass, "entityClass should not be null");
|
||||
return findById(getContainerName(entityClass), id, entityClass);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find by id
|
||||
*
|
||||
* @param containerName the containername
|
||||
* @param id the id
|
||||
* @param entityClass the entity class
|
||||
* @return Mono with the item or error
|
||||
*/
|
||||
@Override
|
||||
public <T> Mono<T> findById(String containerName, Object id, Class<T> entityClass) {
|
||||
Assert.hasText(containerName, "collectionName should not be null, empty or only whitespaces");
|
||||
Assert.notNull(entityClass, "entityClass should not be null");
|
||||
assertValidId(id);
|
||||
|
||||
final String query = String.format("select * from root where root.id = '%s'", id.toString());
|
||||
final FeedOptions options = new FeedOptions();
|
||||
options.enableCrossPartitionQuery(true);
|
||||
return getCosmosClient().getDatabase(databaseName)
|
||||
.getContainer(containerName)
|
||||
.queryItems(query, options)
|
||||
.flatMap(cosmosItemFeedResponse -> Mono.just(cosmosItemFeedResponse
|
||||
.results()
|
||||
.stream()
|
||||
.map(cosmosItem -> cosmosItem.toObject(entityClass))
|
||||
.collect(Collectors.toList()).get(0)))
|
||||
.next();
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert
|
||||
*
|
||||
* @param objectToSave the object to save
|
||||
* @param partitionKey the partition key
|
||||
* @return Mono with the item or error
|
||||
*/
|
||||
public <T> Mono<T> insert(T objectToSave, PartitionKey partitionKey) {
|
||||
Assert.notNull(objectToSave, "entityClass should not be null");
|
||||
|
||||
return insert(getContainerName(objectToSave.getClass()), objectToSave, partitionKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert
|
||||
*
|
||||
* @param objectToSave the object to save
|
||||
* @return Mono with the item or error
|
||||
*/
|
||||
public <T> Mono<T> insert(T objectToSave) {
|
||||
Assert.notNull(objectToSave, "objectToSave should not be null");
|
||||
|
||||
final Class<T> domainClass = (Class<T>) objectToSave.getClass();
|
||||
return getCosmosClient().getDatabase(this.databaseName)
|
||||
.getContainer(getContainerName(objectToSave.getClass()))
|
||||
.createItem(objectToSave, new CosmosItemRequestOptions())
|
||||
.doOnError(Mono::error)
|
||||
.flatMap(cosmosItemResponse -> Mono.just(cosmosItemResponse.properties().toObject(domainClass)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert
|
||||
*
|
||||
* @param containerName the container name
|
||||
* @param objectToSave the object to save
|
||||
* @param partitionKey the partition key
|
||||
* @return Mono with the item or error
|
||||
*/
|
||||
public <T> Mono<T> insert(String containerName, Object objectToSave, PartitionKey partitionKey) {
|
||||
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
|
||||
Assert.notNull(objectToSave, "objectToSave should not be null");
|
||||
|
||||
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
|
||||
if (partitionKey != null) {
|
||||
options.partitionKey(partitionKey);
|
||||
}
|
||||
|
||||
final Class<T> domainClass = (Class<T>) objectToSave.getClass();
|
||||
return getCosmosClient().getDatabase(this.databaseName)
|
||||
.getContainer(containerName)
|
||||
.createItem(objectToSave, options)
|
||||
.onErrorResume(Mono::error)
|
||||
.flatMap(cosmosItemResponse -> Mono.just(cosmosItemResponse.properties().toObject(domainClass)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Upsert
|
||||
*
|
||||
* @param object the object to upsert
|
||||
* @param partitionKey the partition key
|
||||
* @return Mono with the item or error
|
||||
*/
|
||||
@Override
|
||||
public <T> Mono<T> upsert(T object, PartitionKey partitionKey) {
|
||||
return upsert(getContainerName(object.getClass()), object, partitionKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Upsert
|
||||
*
|
||||
* @param containerName the container name
|
||||
* @param object the object to save
|
||||
* @param partitionKey the partition key
|
||||
* @return Mono with the item or error
|
||||
*/
|
||||
@Override
|
||||
public <T> Mono<T> upsert(String containerName, T object, PartitionKey partitionKey) {
|
||||
final Class<T> domainClass = (Class<T>) object.getClass();
|
||||
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
|
||||
if (partitionKey != null) {
|
||||
options.partitionKey(partitionKey);
|
||||
}
|
||||
|
||||
return getCosmosClient().getDatabase(this.databaseName)
|
||||
.getContainer(containerName)
|
||||
.upsertItem(object, options)
|
||||
.flatMap(cosmosItemResponse -> Mono.just(cosmosItemResponse.properties().toObject(domainClass)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete an item by id
|
||||
*
|
||||
* @param containerName the container name
|
||||
* @param id the id
|
||||
* @param partitionKey the partition key
|
||||
* @return void Mono
|
||||
*/
|
||||
@Override
|
||||
public Mono<Void> deleteById(String containerName, Object id, PartitionKey partitionKey) {
|
||||
Assert.hasText(containerName, "container name should not be null, empty or only whitespaces");
|
||||
assertValidId(id);
|
||||
Assert.notNull(partitionKey, "partitionKey should not be null");
|
||||
|
||||
final JSONObject jo = new JSONObject();
|
||||
jo.put("id", id.toString());
|
||||
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
|
||||
options.partitionKey(partitionKey);
|
||||
return getCosmosClient().getDatabase(this.databaseName)
|
||||
.getContainer(containerName)
|
||||
.getItem(id.toString(), partitionKey)
|
||||
.delete(options)
|
||||
.onErrorResume(throwable -> Mono.empty())
|
||||
.then();
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete all items in a container
|
||||
*
|
||||
* @param containerName the container name
|
||||
* @param partitionKeyName the partition key path
|
||||
* @return void Mono
|
||||
*/
|
||||
@Override
|
||||
public Mono<Void> deleteAll(String containerName, String partitionKeyName) {
|
||||
Assert.hasText(containerName, "container name should not be null, empty or only whitespaces");
|
||||
Assert.notNull(partitionKeyName, "partitionKeyName should not be null");
|
||||
|
||||
final Criteria criteria = Criteria.getInstance(CriteriaType.ALL);
|
||||
final DocumentQuery query = new DocumentQuery(criteria);
|
||||
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
|
||||
final FeedOptions options = new FeedOptions();
|
||||
final boolean isCrossPartitionQuery = query.isCrossPartitionQuery(Collections.singletonList(partitionKeyName));
|
||||
options.enableCrossPartitionQuery(isCrossPartitionQuery);
|
||||
return getCosmosClient().getDatabase(this.databaseName)
|
||||
.getContainer(containerName)
|
||||
.queryItems(sqlQuerySpec, options)
|
||||
.onErrorResume(this::databaseAccessExceptionHandler)
|
||||
.map(cosmosItemFeedResponse -> cosmosItemFeedResponse.results()
|
||||
.stream()
|
||||
.map(cosmosItemProperties -> getCosmosClient()
|
||||
.getDatabase(this.databaseName)
|
||||
.getContainer(containerName)
|
||||
.getItem(cosmosItemProperties.id(), partitionKeyName)
|
||||
.delete(new CosmosItemRequestOptions())))
|
||||
.then();
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete items matching query
|
||||
*
|
||||
* @param query the document query
|
||||
* @param entityClass the entity class
|
||||
* @param containerName the container name
|
||||
* @return Mono
|
||||
*/
|
||||
@Override
|
||||
public <T> Mono<T> delete(DocumentQuery query, Class<T> entityClass, String containerName) {
|
||||
Assert.notNull(query, "DocumentQuery should not be null.");
|
||||
Assert.notNull(entityClass, "domainClass should not be null.");
|
||||
Assert.hasText(containerName, "container name should not be null, empty or only whitespaces");
|
||||
|
||||
final Flux<CosmosItemProperties> results = findDocuments(query, entityClass, containerName);
|
||||
final List<String> partitionKeyName = getPartitionKeyNames(entityClass);
|
||||
|
||||
results.flatMap(d -> deleteDocument(d, partitionKeyName, containerName));
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find items
|
||||
*
|
||||
* @param query the document query
|
||||
* @param entityClass the entity class
|
||||
* @param containerName the container name
|
||||
* @return Flux with found items or error
|
||||
*/
|
||||
@Override
|
||||
public <T> Flux<T> find(DocumentQuery query, Class<T> entityClass, String containerName) {
|
||||
return findDocuments(query, entityClass, containerName)
|
||||
.map(cosmosItemProperties -> cosmosItemProperties.toObject(entityClass));
|
||||
}
|
||||
|
||||
/**
|
||||
* Exists
|
||||
*
|
||||
* @param query the document query
|
||||
* @param entityClass the entity class
|
||||
* @param containerName the container name
|
||||
* @return Mono with a boolean or error
|
||||
*/
|
||||
@Override
|
||||
public Mono<Boolean> exists(DocumentQuery query, Class<?> entityClass, String containerName) {
|
||||
return getCountValue(query, true, containerName).flatMap(count -> Mono.just(count > 0));
|
||||
}
|
||||
|
||||
/**
|
||||
* Exists
|
||||
* @param id the id
|
||||
* @param entityClass the entity class
|
||||
* @param containerName the containercontainer nam,e
|
||||
* @return Mono with a boolean or error
|
||||
*/
|
||||
public Mono<Boolean> existsById(Object id, Class<?> entityClass, String containerName) {
|
||||
return findById(containerName, id, entityClass)
|
||||
.flatMap(o -> Mono.just(o != null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Count
|
||||
*
|
||||
* @param containerName the container name
|
||||
* @return Mono with the count or error
|
||||
*/
|
||||
@Override
|
||||
public Mono<Long> count(String containerName) {
|
||||
final DocumentQuery query = new DocumentQuery(Criteria.getInstance(CriteriaType.ALL));
|
||||
return getCountValue(query, true, containerName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Count
|
||||
*
|
||||
* @param query the document query
|
||||
* @param containerName the container name
|
||||
* @return Mono with count or error
|
||||
*/
|
||||
@Override
|
||||
public Mono<Long> count(DocumentQuery query, String containerName) {
|
||||
return getCountValue(query, true, containerName);
|
||||
}
|
||||
|
||||
private Mono<Long> getCountValue(DocumentQuery query, boolean isCrossPartitionQuery, String containerName) {
|
||||
final SqlQuerySpec querySpec = new CountQueryGenerator().generateCosmos(query);
|
||||
final FeedOptions options = new FeedOptions();
|
||||
|
||||
options.enableCrossPartitionQuery(isCrossPartitionQuery);
|
||||
|
||||
return executeQuery(querySpec, containerName, options)
|
||||
.onErrorResume(this::databaseAccessExceptionHandler)
|
||||
.next()
|
||||
.map(r -> r.results().get(0).getLong(COUNT_VALUE_KEY));
|
||||
}
|
||||
|
||||
private Flux<FeedResponse<CosmosItemProperties>> executeQuery(SqlQuerySpec sqlQuerySpec, String collectionName,
|
||||
FeedOptions options) {
|
||||
|
||||
return getCosmosClient().getDatabase(this.databaseName)
|
||||
.getContainer(collectionName)
|
||||
.queryItems(sqlQuerySpec, options);
|
||||
}
|
||||
|
||||
private <T> Mono<T> databaseAccessExceptionHandler(Throwable e) {
|
||||
throw new DocumentDBAccessException("failed to access cosmosdb database", e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete container with container name
|
||||
*
|
||||
* @param containerName the container name
|
||||
*/
|
||||
@Override
|
||||
public void deleteContainer(@NonNull String containerName) {
|
||||
Assert.hasText(containerName, "containerName should have text.");
|
||||
try {
|
||||
getCosmosClient().getDatabase(this.databaseName).getContainer(containerName).delete().block();
|
||||
this.collectionCache.remove(containerName);
|
||||
} catch (Exception e) {
|
||||
throw new DocumentDBAccessException("failed to delete collection: " + containerName, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param domainClass the domain class
|
||||
* @return the container name
|
||||
*/
|
||||
public String getContainerName(Class<?> domainClass) {
|
||||
Assert.notNull(domainClass, "domainClass should not be null");
|
||||
|
||||
return new DocumentDbEntityInformation<>(domainClass).getCollectionName();
|
||||
}
|
||||
|
||||
private Flux<CosmosItemProperties> findDocuments(@NonNull DocumentQuery query, @NonNull Class<?> domainClass,
|
||||
@NonNull String containerName) {
|
||||
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
|
||||
final boolean isCrossPartitionQuery = query.isCrossPartitionQuery(getPartitionKeyNames(domainClass));
|
||||
final FeedOptions feedOptions = new FeedOptions();
|
||||
feedOptions.enableCrossPartitionQuery(isCrossPartitionQuery);
|
||||
return getCosmosClient()
|
||||
.getDatabase(this.databaseName)
|
||||
.getContainer(containerName)
|
||||
.queryItems(sqlQuerySpec, feedOptions)
|
||||
.flatMap(cosmosItemFeedResponse -> Flux.fromIterable(cosmosItemFeedResponse.results()));
|
||||
}
|
||||
|
||||
private void assertValidId(Object id) {
|
||||
Assert.notNull(id, "id should not be null");
|
||||
if (id instanceof String) {
|
||||
Assert.hasText(id.toString(), "id should not be empty or only whitespaces.");
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getPartitionKeyNames(Class<?> domainClass) {
|
||||
final DocumentDbEntityInformation entityInfo = new DocumentDbEntityInformation(domainClass);
|
||||
|
||||
if (entityInfo.getPartitionKeyFieldName() == null) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
return Collections.singletonList(entityInfo.getPartitionKeyFieldName());
|
||||
}
|
||||
|
||||
private Mono<Void> deleteDocument(@NonNull CosmosItemProperties cosmosItemProperties,
|
||||
@NonNull List<String> partitionKeyNames,
|
||||
String containerName) {
|
||||
Assert.isTrue(partitionKeyNames.size() <= 1, "Only one Partition is supported.");
|
||||
|
||||
PartitionKey partitionKey = null;
|
||||
|
||||
if (!partitionKeyNames.isEmpty() && StringUtils.hasText(partitionKeyNames.get(0))) {
|
||||
partitionKey = new PartitionKey(cosmosItemProperties.get(partitionKeyNames.get(0)));
|
||||
}
|
||||
|
||||
final CosmosItemRequestOptions options = new CosmosItemRequestOptions(partitionKey);
|
||||
|
||||
return getCosmosClient()
|
||||
.getDatabase(this.databaseName)
|
||||
.getContainer(containerName)
|
||||
.getItem(cosmosItemProperties.id(), partitionKey)
|
||||
.delete(options)
|
||||
.then();
|
||||
}
|
||||
|
||||
}
|
|
@ -223,4 +223,23 @@ public abstract class AbstractQueryGenerator {
|
|||
|
||||
return new SqlQuerySpec(queryString, sqlParameters);
|
||||
}
|
||||
|
||||
|
||||
protected com.azure.data.cosmos.SqlQuerySpec generateCosmosQuery(@NonNull DocumentQuery query,
|
||||
@NonNull String queryHead) {
|
||||
final Pair<String, List<Pair<String, Object>>> queryBody = generateQueryBody(query);
|
||||
final String queryString = String.join(" ", queryHead, queryBody.getValue0(), generateQueryTail(query));
|
||||
final List<Pair<String, Object>> parameters = queryBody.getValue1();
|
||||
final com.azure.data.cosmos.SqlParameterList sqlParameters =
|
||||
new com.azure.data.cosmos.SqlParameterList();
|
||||
|
||||
sqlParameters.addAll(
|
||||
parameters.stream()
|
||||
.map(p -> new com.azure.data.cosmos.SqlParameter("@" + p.getValue0(),
|
||||
toDocumentDBValue(p.getValue1())))
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
|
||||
return new com.azure.data.cosmos.SqlQuerySpec(queryString, sqlParameters);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,4 +14,9 @@ public class CountQueryGenerator extends AbstractQueryGenerator implements Query
|
|||
public SqlQuerySpec generate(DocumentQuery query) {
|
||||
return super.generateQuery(query, "SELECT VALUE COUNT(1) FROM r");
|
||||
}
|
||||
|
||||
@Override
|
||||
public com.azure.data.cosmos.SqlQuerySpec generateCosmos(DocumentQuery query) {
|
||||
return super.generateCosmosQuery(query, "SELECT VALUE COUNT(1) FROM r");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,4 +17,9 @@ public class FindQuerySpecGenerator extends AbstractQueryGenerator implements Qu
|
|||
public SqlQuerySpec generate(@NonNull DocumentQuery query) {
|
||||
return super.generateQuery(query, "SELECT * FROM ROOT r");
|
||||
}
|
||||
|
||||
@Override
|
||||
public com.azure.data.cosmos.SqlQuerySpec generateCosmos(DocumentQuery query) {
|
||||
return super.generateCosmosQuery(query, "SELECT * FROM ROOT r");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,4 +17,11 @@ public interface QuerySpecGenerator {
|
|||
* @return SqlQuerySpec executed by documentDb client.
|
||||
*/
|
||||
SqlQuerySpec generate(DocumentQuery query);
|
||||
|
||||
/**
|
||||
* Generate the SqlQuerySpec for cosmosDB client.
|
||||
* @param query tree structured query condition.
|
||||
* @return SqlQuerySpec executed by documentDb client.
|
||||
*/
|
||||
com.azure.data.cosmos.SqlQuerySpec generateCosmos(DocumentQuery query);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
/**
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
package com.microsoft.azure.spring.data.cosmosdb.repository;
|
||||
|
||||
import org.springframework.data.repository.NoRepositoryBean;
|
||||
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
|
||||
|
||||
@NoRepositoryBean
|
||||
public interface ReactiveCosmosRepository<T, K> extends ReactiveSortingRepository<T, K> {
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
package com.microsoft.azure.spring.data.cosmosdb.repository.support;
|
||||
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.data.repository.core.EntityInformation;
|
||||
import org.springframework.data.repository.core.RepositoryInformation;
|
||||
import org.springframework.data.repository.core.RepositoryMetadata;
|
||||
import org.springframework.data.repository.core.support.ReactiveRepositoryFactorySupport;
|
||||
import org.springframework.data.repository.query.QueryLookupStrategy;
|
||||
import org.springframework.data.repository.query.QueryMethodEvaluationContextProvider;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Optional;
|
||||
|
||||
public class ReactiveCosmosRepositoryFactory extends ReactiveRepositoryFactorySupport {
|
||||
|
||||
private final ApplicationContext applicationContext;
|
||||
|
||||
public ReactiveCosmosRepositoryFactory(ApplicationContext applicationContext) {
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, ID> EntityInformation<T, ID> getEntityInformation(Class<T> domainClass) {
|
||||
return new DocumentDbEntityInformation<>(domainClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object getTargetRepository(RepositoryInformation information) {
|
||||
final EntityInformation<?, Serializable> entityInformation = getEntityInformation(information.getDomainType());
|
||||
return getTargetRepositoryViaReflection(information, entityInformation, this.applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<?> getRepositoryBaseClass(RepositoryMetadata metadata) {
|
||||
return SimpleReactiveCosmosRepository.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Optional<QueryLookupStrategy> getQueryLookupStrategy(
|
||||
QueryLookupStrategy.Key key, QueryMethodEvaluationContextProvider evaluationContextProvider) {
|
||||
return super.getQueryLookupStrategy(key, evaluationContextProvider);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
package com.microsoft.azure.spring.data.cosmosdb.repository.support;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.data.mapping.context.MappingContext;
|
||||
import org.springframework.data.repository.Repository;
|
||||
import org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport;
|
||||
import org.springframework.data.repository.core.support.RepositoryFactorySupport;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class ReactiveCosmosRepositoryFactoryBean<T extends Repository<S, K>, S, K extends Serializable>
|
||||
extends RepositoryFactoryBeanSupport<T, S, K>
|
||||
implements ApplicationContextAware {
|
||||
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
public ReactiveCosmosRepositoryFactoryBean(Class<? extends T> repositoryInterface) {
|
||||
super(repositoryInterface);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RepositoryFactorySupport createRepositoryFactory() {
|
||||
return getFactoryInstance(applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
protected RepositoryFactorySupport getFactoryInstance(ApplicationContext applicationContext) {
|
||||
return new ReactiveCosmosRepositoryFactory(applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setMappingContext(MappingContext<?, ?> mappingContext) {
|
||||
super.setMappingContext(mappingContext);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,180 @@
|
|||
/**
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
package com.microsoft.azure.spring.data.cosmosdb.repository.support;
|
||||
|
||||
import com.azure.data.cosmos.PartitionKey;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.ReactiveCosmosOperations;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.Criteria;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.CriteriaType;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.DocumentQuery;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.repository.ReactiveCosmosRepository;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.data.domain.Sort;
|
||||
import org.springframework.lang.NonNull;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
public class SimpleReactiveCosmosRepository<T, K extends Serializable> implements ReactiveCosmosRepository<T, K> {
|
||||
|
||||
public final DocumentDbEntityInformation<T, K> entityInformation;
|
||||
public final ReactiveCosmosOperations cosmosOperations;
|
||||
|
||||
@Override
|
||||
public Flux<T> findAll(Sort sort) {
|
||||
Assert.notNull(sort, "Sort must not be null!");
|
||||
|
||||
final DocumentQuery query = new DocumentQuery(Criteria.getInstance(CriteriaType.ALL)).with(sort);
|
||||
|
||||
return cosmosOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S extends T> Mono<S> save(S entity) {
|
||||
|
||||
Assert.notNull(entity, "Entity must not be null!");
|
||||
|
||||
if (entityInformation.isNew(entity)) {
|
||||
return cosmosOperations.insert(entityInformation.getCollectionName(),
|
||||
entity,
|
||||
createKey(entityInformation.getPartitionKeyFieldValue(entity)));
|
||||
} else {
|
||||
return cosmosOperations.upsert(entityInformation.getCollectionName(),
|
||||
entity, createKey(entityInformation.getPartitionKeyFieldValue(entity)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
|
||||
|
||||
Assert.notNull(entities, "The given Iterable of entities must not be null!");
|
||||
|
||||
return Flux.fromIterable(entities).flatMap(this::save);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
|
||||
|
||||
Assert.notNull(entityStream, "The given Publisher of entities must not be null!");
|
||||
|
||||
return Flux.from(entityStream).flatMap(this::save);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<T> findById(K id) {
|
||||
Assert.notNull(id, "The given id must not be null!");
|
||||
return cosmosOperations.findById(entityInformation.getCollectionName(), id, entityInformation.getJavaType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<T> findById(Publisher<K> publisher) {
|
||||
Assert.notNull(publisher, "The given id must not be null!");
|
||||
|
||||
return Mono.from(publisher).flatMap(
|
||||
id -> cosmosOperations.findById(entityInformation.getCollectionName(),
|
||||
id, entityInformation.getJavaType()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> existsById(K id) {
|
||||
Assert.notNull(id, "The given id must not be null!");
|
||||
|
||||
return cosmosOperations.existsById(id, entityInformation.getJavaType(),
|
||||
entityInformation.getCollectionName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> existsById(Publisher<K> publisher) {
|
||||
Assert.notNull(publisher, "The given id must not be null!");
|
||||
|
||||
return Mono.from(publisher).flatMap(id -> cosmosOperations.existsById(id, entityInformation.getJavaType(),
|
||||
entityInformation.getCollectionName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<T> findAll() {
|
||||
return cosmosOperations.findAll(entityInformation.getCollectionName(), entityInformation.getJavaType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<T> findAllById(Iterable<K> ids) {
|
||||
Assert.notNull(ids, "Iterable ids should not be null");
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<T> findAllById(Publisher<K> ids) {
|
||||
Assert.notNull(ids, "The given Publisher of Id's must not be null!");
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Long> count() {
|
||||
return cosmosOperations.count(entityInformation.getCollectionName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> deleteById(K id) {
|
||||
Assert.notNull(id, "The given id must not be null!");
|
||||
|
||||
return cosmosOperations.deleteById(entityInformation.getCollectionName(), id, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> deleteById(Publisher<K> publisher) {
|
||||
Assert.notNull(publisher, "Id must not be null!");
|
||||
|
||||
return Mono.from(publisher).flatMap(id -> cosmosOperations.deleteById(entityInformation.getCollectionName(),
|
||||
id, null)).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> delete(@NonNull T entity) {
|
||||
Assert.notNull(entity, "entity to be deleted must not be null!");
|
||||
|
||||
final Object id = entityInformation.getId(entity);
|
||||
return cosmosOperations.deleteById(entityInformation.getCollectionName(),
|
||||
id,
|
||||
createKey(entityInformation.getPartitionKeyFieldValue(entity)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> deleteAll(Iterable<? extends T> entities) {
|
||||
Assert.notNull(entities, "The given Iterable of entities must not be null!");
|
||||
|
||||
return Flux.fromIterable(entities).flatMap(this::delete).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> deleteAll(Publisher<? extends T> entityStream) {
|
||||
|
||||
Assert.notNull(entityStream, "The given Publisher of entities must not be null!");
|
||||
|
||||
return Flux.from(entityStream)//
|
||||
.map(entityInformation::getRequiredId)//
|
||||
.flatMap(this::deleteById)//
|
||||
.then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> deleteAll() {
|
||||
return cosmosOperations.deleteAll(entityInformation.getCollectionName(),
|
||||
entityInformation.getPartitionKeyFieldName());
|
||||
}
|
||||
|
||||
private PartitionKey createKey(String partitionKeyValue) {
|
||||
if (StringUtils.isEmpty(partitionKeyValue)) {
|
||||
return null;
|
||||
}
|
||||
return new PartitionKey(partitionKeyValue);
|
||||
}
|
||||
|
||||
}
|
|
@ -91,6 +91,7 @@ public class TestConstants {
|
|||
public static final String ID_1 = "id-1";
|
||||
public static final String ID_2 = "id-2";
|
||||
public static final String ID_3 = "id-3";
|
||||
public static final String ID_4 = "id-4";
|
||||
public static final String NEW_FIRST_NAME = "new_first_name";
|
||||
public static final String NEW_LAST_NAME = "new_last_name";
|
||||
public static final String UPDATED_FIRST_NAME = "updated_first_name";
|
||||
|
|
|
@ -85,7 +85,7 @@ public class DocumentDbTemplateIT {
|
|||
collectionName = personInfo.getCollectionName();
|
||||
|
||||
mappingContext.setInitialEntitySet(new EntityScanner(this.applicationContext).scan(Persistent.class));
|
||||
|
||||
|
||||
dbConverter = new MappingDocumentDbConverter(mappingContext, objectMapper);
|
||||
dbTemplate = new DocumentDbTemplate(dbFactory, dbConverter, DB_NAME);
|
||||
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
/**
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
package com.microsoft.azure.spring.data.cosmosdb.core;
|
||||
|
||||
import com.azure.data.cosmos.CosmosClientException;
|
||||
import com.azure.data.cosmos.PartitionKey;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.CosmosDbFactory;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.common.TestConstants;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.config.DocumentDBConfig;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.convert.MappingDocumentDbConverter;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.mapping.DocumentDbMappingContext;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.Criteria;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.CriteriaType;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.DocumentQuery;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.domain.Person;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.repository.support.DocumentDbEntityInformation;
|
||||
import io.reactivex.subscribers.TestSubscriber;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.domain.EntityScanner;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
import org.springframework.data.annotation.Persistent;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.DB_NAME;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@PropertySource(value = {"classpath:application.properties"})
|
||||
public class ReactiveCosmosTemplateIT {
|
||||
private static final Person TEST_PERSON = new Person(TestConstants.ID_1, TestConstants.FIRST_NAME,
|
||||
TestConstants.LAST_NAME, TestConstants.HOBBIES, TestConstants.ADDRESSES);
|
||||
|
||||
private static final Person TEST_PERSON_2 = new Person(TestConstants.ID_2, TestConstants.NEW_FIRST_NAME,
|
||||
TestConstants.NEW_LAST_NAME, TestConstants.HOBBIES, TestConstants.ADDRESSES);
|
||||
|
||||
private static final Person TEST_PERSON_3 = new Person(TestConstants.ID_3, TestConstants.NEW_FIRST_NAME,
|
||||
TestConstants.NEW_LAST_NAME, TestConstants.HOBBIES, TestConstants.ADDRESSES);
|
||||
|
||||
private static final Person TEST_PERSON_4 = new Person(TestConstants.ID_4, TestConstants.NEW_FIRST_NAME,
|
||||
TestConstants.NEW_LAST_NAME, TestConstants.HOBBIES, TestConstants.ADDRESSES);
|
||||
|
||||
@Value("${cosmosdb.uri}")
|
||||
private String documentDbUri;
|
||||
@Value("${cosmosdb.key}")
|
||||
private String documentDbKey;
|
||||
|
||||
private ReactiveCosmosTemplate dbTemplate;
|
||||
private String containerName;
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Before
|
||||
public void setUp() throws ClassNotFoundException {
|
||||
final DocumentDBConfig dbConfig = DocumentDBConfig.builder(documentDbUri, documentDbKey, DB_NAME).build();
|
||||
final CosmosDbFactory dbFactory = new CosmosDbFactory(dbConfig);
|
||||
|
||||
final DocumentDbMappingContext mappingContext = new DocumentDbMappingContext();
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
final DocumentDbEntityInformation<Person, String> personInfo = new DocumentDbEntityInformation<>(Person.class);
|
||||
containerName = personInfo.getCollectionName();
|
||||
|
||||
mappingContext.setInitialEntitySet(new EntityScanner(this.applicationContext).scan(Persistent.class));
|
||||
|
||||
final MappingDocumentDbConverter dbConverter = new MappingDocumentDbConverter(mappingContext, objectMapper);
|
||||
|
||||
dbTemplate = new ReactiveCosmosTemplate(dbFactory, dbConverter, DB_NAME);
|
||||
dbTemplate.createCollectionIfNotExists(personInfo).block().container();
|
||||
dbTemplate.insert(TEST_PERSON).block();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() {
|
||||
dbTemplate.deleteContainer(Person.class.getSimpleName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertDuplicateId() {
|
||||
final Mono<Person> insertMono = dbTemplate.insert(TEST_PERSON);
|
||||
final TestSubscriber testSubscriber = new TestSubscriber();
|
||||
insertMono.subscribe(testSubscriber);
|
||||
testSubscriber.awaitTerminalEvent();
|
||||
testSubscriber.assertNotComplete();
|
||||
testSubscriber.assertTerminated();
|
||||
assertThat(testSubscriber.errors()).hasSize(1);
|
||||
assertThat(((List) testSubscriber.getEvents().get(1)).get(0)).isInstanceOf(CosmosClientException.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindByID() {
|
||||
final Mono<Person> findById = dbTemplate.findById(Person.class.getSimpleName(), TEST_PERSON.getId(),
|
||||
Person.class);
|
||||
StepVerifier.create(findById).consumeNextWith(actual -> {
|
||||
Assert.assertThat(actual.getFirstName(), is(equalTo(TEST_PERSON.getFirstName())));
|
||||
Assert.assertThat(actual.getLastName(), is(equalTo(TEST_PERSON.getLastName())));
|
||||
}).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindAll() {
|
||||
final Flux<Person> flux = dbTemplate.findAll(Person.class.getSimpleName(), Person.class);
|
||||
StepVerifier.create(flux).expectNextCount(1).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindByIdWithContainerName() {
|
||||
StepVerifier.create(dbTemplate.findById(Person.class.getSimpleName(), TEST_PERSON.getId(), Person.class))
|
||||
.consumeNextWith(actual -> {
|
||||
Assert.assertThat(actual.getFirstName(), is(equalTo(TEST_PERSON.getFirstName())));
|
||||
Assert.assertThat(actual.getLastName(), is(equalTo(TEST_PERSON.getLastName())));
|
||||
}).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsert() {
|
||||
StepVerifier.create(dbTemplate.insert(TEST_PERSON_3))
|
||||
.expectNext(TEST_PERSON_3).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertWithCollectionName() {
|
||||
StepVerifier.create(dbTemplate.insert(Person.class.getSimpleName(), TEST_PERSON_2, null))
|
||||
.expectNext(TEST_PERSON_2).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsert() {
|
||||
final Person p = TEST_PERSON_2;
|
||||
final ArrayList<String> hobbies = new ArrayList<>(p.getHobbies());
|
||||
hobbies.add("more code");
|
||||
p.setHobbies(hobbies);
|
||||
final Mono<Person> upsert = dbTemplate.upsert(p, null);
|
||||
StepVerifier.create(upsert).expectNextCount(1).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsertWithCollectionName() {
|
||||
final Person p = TEST_PERSON_2;
|
||||
final ArrayList<String> hobbies = new ArrayList<>(p.getHobbies());
|
||||
hobbies.add("more code");
|
||||
p.setHobbies(hobbies);
|
||||
final Mono<Person> upsert = dbTemplate.upsert(Person.class.getSimpleName(), p, null);
|
||||
StepVerifier.create(upsert).expectNextCount(1).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteById() {
|
||||
dbTemplate.insert(TEST_PERSON_4).block();
|
||||
final Mono<Void> voidMono = dbTemplate.deleteById(Person.class.getSimpleName(), TEST_PERSON_4.getId(),
|
||||
new PartitionKey(TEST_PERSON_4.getId()));
|
||||
StepVerifier.create(voidMono).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFind() {
|
||||
final Criteria criteria = Criteria.getInstance(CriteriaType.IS_EQUAL, "firstName",
|
||||
Arrays.asList(TEST_PERSON.getFirstName()));
|
||||
final DocumentQuery query = new DocumentQuery(criteria);
|
||||
final Flux<Person> personFlux = dbTemplate.find(query, Person.class, Person.class.getSimpleName());
|
||||
StepVerifier.create(personFlux).expectNextCount(1).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExists() {
|
||||
final Criteria criteria = Criteria.getInstance(CriteriaType.IS_EQUAL, "firstName",
|
||||
Arrays.asList(TEST_PERSON.getFirstName()));
|
||||
final DocumentQuery query = new DocumentQuery(criteria);
|
||||
final Mono<Boolean> exists = dbTemplate.exists(query, Person.class, containerName);
|
||||
StepVerifier.create(exists).expectNext(true).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCount() {
|
||||
final Mono<Long> count = dbTemplate.count(containerName);
|
||||
StepVerifier.create(count).expectNext((long) 1).verifyComplete();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,167 @@
|
|||
/**
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See LICENSE in the project root for
|
||||
* license information.
|
||||
*/
|
||||
package com.microsoft.azure.spring.data.cosmosdb.core;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.azure.data.cosmos.PartitionKey;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.CosmosDbFactory;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.config.DocumentDBConfig;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.convert.MappingDocumentDbConverter;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.mapping.DocumentDbMappingContext;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.Criteria;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.DocumentQuery;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.domain.PartitionPerson;
|
||||
import com.microsoft.azure.spring.data.cosmosdb.repository.support.DocumentDbEntityInformation;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.domain.EntityScanner;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
import org.springframework.data.annotation.Persistent;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.ADDRESSES;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.DB_NAME;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.FIRST_NAME;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.HOBBIES;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.ID_1;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.ID_2;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.LAST_NAME;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.NEW_FIRST_NAME;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.NEW_LAST_NAME;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.PROPERTY_LAST_NAME;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.common.TestConstants.UPDATED_FIRST_NAME;
|
||||
import static com.microsoft.azure.spring.data.cosmosdb.core.query.CriteriaType.IS_EQUAL;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@PropertySource(value = {"classpath:application.properties"})
|
||||
public class ReactiveCosmosTemplatePartitionIT {
|
||||
private static final PartitionPerson TEST_PERSON = new PartitionPerson(ID_1, FIRST_NAME, LAST_NAME,
|
||||
HOBBIES, ADDRESSES);
|
||||
|
||||
private static final PartitionPerson TEST_PERSON_2 = new PartitionPerson(ID_2, NEW_FIRST_NAME,
|
||||
TEST_PERSON.getLastName(), HOBBIES, ADDRESSES);
|
||||
|
||||
@Value("${cosmosdb.uri}")
|
||||
private String documentDbUri;
|
||||
@Value("${cosmosdb.key}")
|
||||
private String documentDbKey;
|
||||
|
||||
private ReactiveCosmosTemplate dbTemplate;
|
||||
private String containerName;
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Before
|
||||
public void setUp() throws ClassNotFoundException {
|
||||
final DocumentDBConfig dbConfig = DocumentDBConfig.builder(documentDbUri, documentDbKey, DB_NAME).build();
|
||||
final CosmosDbFactory dbFactory = new CosmosDbFactory(dbConfig);
|
||||
|
||||
final DocumentDbMappingContext mappingContext = new DocumentDbMappingContext();
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
final DocumentDbEntityInformation<PartitionPerson, String> personInfo =
|
||||
new DocumentDbEntityInformation<>(PartitionPerson.class);
|
||||
containerName = personInfo.getCollectionName();
|
||||
|
||||
mappingContext.setInitialEntitySet(new EntityScanner(this.applicationContext).scan(Persistent.class));
|
||||
|
||||
final MappingDocumentDbConverter dbConverter = new MappingDocumentDbConverter(mappingContext, objectMapper);
|
||||
dbTemplate = new ReactiveCosmosTemplate(dbFactory, dbConverter, DB_NAME);
|
||||
|
||||
dbTemplate.createCollectionIfNotExists(personInfo).block().container();
|
||||
dbTemplate.insert(TEST_PERSON).block();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() {
|
||||
dbTemplate.deleteContainer(PartitionPerson.class.getSimpleName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindWithPartition() {
|
||||
final Criteria criteria = Criteria.getInstance(IS_EQUAL, PROPERTY_LAST_NAME, Arrays.asList(LAST_NAME));
|
||||
final DocumentQuery query = new DocumentQuery(criteria);
|
||||
final Flux<PartitionPerson> partitionPersonFlux = dbTemplate.find(query, PartitionPerson.class,
|
||||
PartitionPerson.class.getSimpleName());
|
||||
StepVerifier.create(partitionPersonFlux).consumeNextWith(actual -> {
|
||||
Assert.assertThat(actual.getFirstName(), is(equalTo(TEST_PERSON.getFirstName())));
|
||||
Assert.assertThat(actual.getLastName(), is(equalTo(TEST_PERSON.getLastName())));
|
||||
}).verifyComplete();
|
||||
}
|
||||
|
||||
// @Test
|
||||
// public void testFindByNonExistIdWithPartition() {
|
||||
//
|
||||
// }
|
||||
|
||||
@Test
|
||||
public void testUpsertNewDocumentPartition() {
|
||||
final String firstName = NEW_FIRST_NAME + "_" + UUID.randomUUID().toString();
|
||||
final PartitionPerson newPerson = new PartitionPerson(UUID.randomUUID().toString(), firstName, NEW_LAST_NAME,
|
||||
null, null);
|
||||
final String partitionKeyValue = newPerson.getLastName();
|
||||
final Mono<PartitionPerson> upsert = dbTemplate.upsert(newPerson, new PartitionKey(partitionKeyValue));
|
||||
StepVerifier.create(upsert).expectNextCount(1).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateWithPartition() {
|
||||
final PartitionPerson updated = new PartitionPerson(TEST_PERSON.getId(), UPDATED_FIRST_NAME,
|
||||
TEST_PERSON.getLastName(), TEST_PERSON.getHobbies(), TEST_PERSON.getShippingAddresses());
|
||||
dbTemplate.upsert(updated, new PartitionKey(updated.getLastName())).block();
|
||||
|
||||
final PartitionPerson person = dbTemplate.findAll(PartitionPerson.class.getSimpleName(), PartitionPerson.class)
|
||||
.toStream()
|
||||
.filter(p -> TEST_PERSON.getId().equals(p.getId())).findFirst().get();
|
||||
assertTrue(person.equals(updated));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteByIdPartition() {
|
||||
dbTemplate.insert(TEST_PERSON_2, new PartitionKey(TEST_PERSON_2.getLastName())).block();
|
||||
System.out.println("TEST_PERSON_2 = " + TEST_PERSON_2);
|
||||
StepVerifier.create(dbTemplate.findAll(PartitionPerson.class)).expectNextCount(2).verifyComplete();
|
||||
|
||||
dbTemplate.deleteById(PartitionPerson.class.getSimpleName(),
|
||||
TEST_PERSON.getId(), new PartitionKey(TEST_PERSON.getLastName())).block();
|
||||
StepVerifier.create(dbTemplate.findAll(PartitionPerson.class))
|
||||
.expectNext(TEST_PERSON_2)
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountForPartitionedCollection() {
|
||||
StepVerifier.create(dbTemplate.count(containerName)).expectNext((long) 1).verifyComplete();
|
||||
dbTemplate.insert(TEST_PERSON_2, new PartitionKey(TEST_PERSON_2.getLastName())).block();
|
||||
StepVerifier.create(dbTemplate.count(containerName)).expectNext((long) 2).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountForPartitionedCollectionByQuery() {
|
||||
dbTemplate.insert(TEST_PERSON_2, new PartitionKey(TEST_PERSON_2.getLastName())).block();
|
||||
final Criteria criteria = Criteria.getInstance(IS_EQUAL, "firstName",
|
||||
Arrays.asList(TEST_PERSON_2.getFirstName()));
|
||||
final DocumentQuery query = new DocumentQuery(criteria);
|
||||
StepVerifier.create(dbTemplate.count(query, containerName)).expectNext((long) 1).verifyComplete();
|
||||
|
||||
}
|
||||
}
|
||||
|
Загрузка…
Ссылка в новой задаче