Removed dependency from reactive cosmos template because it was initiating two cosmos clients (#412)
This commit is contained in:
Родитель
d1304164a1
Коммит
cb6a76cb23
|
@ -18,6 +18,7 @@ import com.microsoft.azure.documentdb.DocumentCollection;
|
||||||
import com.microsoft.azure.documentdb.PartitionKey;
|
import com.microsoft.azure.documentdb.PartitionKey;
|
||||||
import com.microsoft.azure.spring.data.cosmosdb.CosmosDbFactory;
|
import com.microsoft.azure.spring.data.cosmosdb.CosmosDbFactory;
|
||||||
import com.microsoft.azure.spring.data.cosmosdb.core.convert.MappingDocumentDbConverter;
|
import com.microsoft.azure.spring.data.cosmosdb.core.convert.MappingDocumentDbConverter;
|
||||||
|
import com.microsoft.azure.spring.data.cosmosdb.core.generator.CountQueryGenerator;
|
||||||
import com.microsoft.azure.spring.data.cosmosdb.core.generator.FindQuerySpecGenerator;
|
import com.microsoft.azure.spring.data.cosmosdb.core.generator.FindQuerySpecGenerator;
|
||||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.Criteria;
|
import com.microsoft.azure.spring.data.cosmosdb.core.query.Criteria;
|
||||||
import com.microsoft.azure.spring.data.cosmosdb.core.query.CriteriaType;
|
import com.microsoft.azure.spring.data.cosmosdb.core.query.CriteriaType;
|
||||||
|
@ -47,8 +48,9 @@ import java.util.stream.Collectors;
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class DocumentDbTemplate implements DocumentDbOperations, ApplicationContextAware {
|
public class DocumentDbTemplate implements DocumentDbOperations, ApplicationContextAware {
|
||||||
|
|
||||||
|
private static final String COUNT_VALUE_KEY = "_aggregate";
|
||||||
|
|
||||||
private final MappingDocumentDbConverter mappingDocumentDbConverter;
|
private final MappingDocumentDbConverter mappingDocumentDbConverter;
|
||||||
private final ReactiveCosmosTemplate reactiveCosmosTemplate;
|
|
||||||
private final String databaseName;
|
private final String databaseName;
|
||||||
|
|
||||||
private final CosmosClient cosmosClient;
|
private final CosmosClient cosmosClient;
|
||||||
|
@ -61,7 +63,6 @@ public class DocumentDbTemplate implements DocumentDbOperations, ApplicationCont
|
||||||
|
|
||||||
this.mappingDocumentDbConverter = mappingDocumentDbConverter;
|
this.mappingDocumentDbConverter = mappingDocumentDbConverter;
|
||||||
|
|
||||||
this.reactiveCosmosTemplate = new ReactiveCosmosTemplate(cosmosDbFactory, mappingDocumentDbConverter, dbName);
|
|
||||||
this.databaseName = dbName;
|
this.databaseName = dbName;
|
||||||
this.cosmosClient = cosmosDbFactory.getCosmosClient();
|
this.cosmosClient = cosmosDbFactory.getCosmosClient();
|
||||||
}
|
}
|
||||||
|
@ -201,7 +202,12 @@ public class DocumentDbTemplate implements DocumentDbOperations, ApplicationCont
|
||||||
@Override
|
@Override
|
||||||
public void deleteCollection(@NonNull String collectionName) {
|
public void deleteCollection(@NonNull String collectionName) {
|
||||||
Assert.hasText(collectionName, "collectionName should have text.");
|
Assert.hasText(collectionName, "collectionName should have text.");
|
||||||
reactiveCosmosTemplate.deleteContainer(collectionName);
|
try {
|
||||||
|
cosmosClient.getDatabase(this.databaseName).getContainer(collectionName).delete().block();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new DocumentDBAccessException("failed to delete collection: " + collectionName,
|
||||||
|
e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getCollectionName(Class<?> domainClass) {
|
public String getCollectionName(Class<?> domainClass) {
|
||||||
|
@ -212,8 +218,13 @@ public class DocumentDbTemplate implements DocumentDbOperations, ApplicationCont
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DocumentCollection createCollectionIfNotExists(@NonNull DocumentDbEntityInformation information) {
|
public DocumentCollection createCollectionIfNotExists(@NonNull DocumentDbEntityInformation information) {
|
||||||
final CosmosContainerResponse response = reactiveCosmosTemplate
|
final CosmosContainerResponse response = cosmosClient
|
||||||
.createCollectionIfNotExists(information)
|
.createDatabaseIfNotExists(this.databaseName)
|
||||||
|
.flatMap(cosmosDatabaseResponse -> cosmosDatabaseResponse
|
||||||
|
.database()
|
||||||
|
.createContainerIfNotExists(information.getCollectionName(),
|
||||||
|
"/" + information.getPartitionKeyFieldName())
|
||||||
|
.map(cosmosContainerResponse -> cosmosContainerResponse))
|
||||||
.block();
|
.block();
|
||||||
if (response == null) {
|
if (response == null) {
|
||||||
throw new DocumentDBAccessException("Failed to create collection");
|
throw new DocumentDBAccessException("Failed to create collection");
|
||||||
|
@ -232,7 +243,15 @@ public class DocumentDbTemplate implements DocumentDbOperations, ApplicationCont
|
||||||
pk = com.azure.data.cosmos.PartitionKey.None;
|
pk = com.azure.data.cosmos.PartitionKey.None;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
reactiveCosmosTemplate.deleteById(collectionName, id, pk).block();
|
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
|
||||||
|
options.partitionKey(pk);
|
||||||
|
cosmosClient.getDatabase(this.databaseName)
|
||||||
|
.getContainer(collectionName)
|
||||||
|
.getItem(id.toString(), partitionKey)
|
||||||
|
.delete(options)
|
||||||
|
.onErrorResume(Mono::error)
|
||||||
|
.then()
|
||||||
|
.block();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new DocumentDBAccessException("deleteById exception", e);
|
throw new DocumentDBAccessException("deleteById exception", e);
|
||||||
}
|
}
|
||||||
|
@ -255,7 +274,10 @@ public class DocumentDbTemplate implements DocumentDbOperations, ApplicationCont
|
||||||
Assert.hasText(collectionName, "collection should not be null, empty or only whitespaces");
|
Assert.hasText(collectionName, "collection should not be null, empty or only whitespaces");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return reactiveCosmosTemplate.find(query, domainClass, collectionName).collectList().block();
|
return findDocuments(query, domainClass, collectionName)
|
||||||
|
.stream()
|
||||||
|
.map(cosmosItemProperties -> cosmosItemProperties.toObject(domainClass))
|
||||||
|
.collect(Collectors.toList());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new DocumentDBAccessException("Failed to execute find operation from " + collectionName, e);
|
throw new DocumentDBAccessException("Failed to execute find operation from " + collectionName, e);
|
||||||
}
|
}
|
||||||
|
@ -355,7 +377,8 @@ public class DocumentDbTemplate implements DocumentDbOperations, ApplicationCont
|
||||||
public long count(String collectionName) {
|
public long count(String collectionName) {
|
||||||
Assert.hasText(collectionName, "collectionName should not be empty");
|
Assert.hasText(collectionName, "collectionName should not be empty");
|
||||||
|
|
||||||
final Long count = reactiveCosmosTemplate.count(collectionName).block();
|
final DocumentQuery query = new DocumentQuery(Criteria.getInstance(CriteriaType.ALL));
|
||||||
|
final Long count = getCountValue(query, true, collectionName);
|
||||||
if (count == null) {
|
if (count == null) {
|
||||||
throw new DocumentDBAccessException("Failed to get count for collectionName: " + collectionName);
|
throw new DocumentDBAccessException("Failed to get count for collectionName: " + collectionName);
|
||||||
}
|
}
|
||||||
|
@ -367,8 +390,9 @@ public class DocumentDbTemplate implements DocumentDbOperations, ApplicationCont
|
||||||
Assert.notNull(domainClass, "domainClass should not be null");
|
Assert.notNull(domainClass, "domainClass should not be null");
|
||||||
Assert.hasText(collectionName, "collectionName should not be empty");
|
Assert.hasText(collectionName, "collectionName should not be empty");
|
||||||
|
|
||||||
final boolean isCrossPartitionQuery = query.isCrossPartitionQuery(getPartitionKeyNames(domainClass));
|
final boolean isCrossPartitionQuery =
|
||||||
final Long count = reactiveCosmosTemplate.count(query, isCrossPartitionQuery, collectionName).block();
|
query.isCrossPartitionQuery(getPartitionKeyNames(domainClass));
|
||||||
|
final Long count = getCountValue(query, isCrossPartitionQuery, collectionName);
|
||||||
if (count == null) {
|
if (count == null) {
|
||||||
throw new DocumentDBAccessException("Failed to get count for collectionName: " + collectionName);
|
throw new DocumentDBAccessException("Failed to get count for collectionName: " + collectionName);
|
||||||
}
|
}
|
||||||
|
@ -380,6 +404,30 @@ public class DocumentDbTemplate implements DocumentDbOperations, ApplicationCont
|
||||||
return this.mappingDocumentDbConverter;
|
return this.mappingDocumentDbConverter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Long getCountValue(DocumentQuery query, boolean isCrossPartitionQuery, String containerName) {
|
||||||
|
final SqlQuerySpec querySpec = new CountQueryGenerator().generateCosmos(query);
|
||||||
|
final FeedOptions options = new FeedOptions();
|
||||||
|
|
||||||
|
options.enableCrossPartitionQuery(isCrossPartitionQuery);
|
||||||
|
|
||||||
|
return executeQuery(querySpec, containerName, options)
|
||||||
|
.onErrorResume(this::databaseAccessExceptionHandler)
|
||||||
|
.next()
|
||||||
|
.map(r -> r.results().get(0).getLong(COUNT_VALUE_KEY))
|
||||||
|
.block();
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> Mono<T> databaseAccessExceptionHandler(Throwable e) {
|
||||||
|
throw new DocumentDBAccessException("failed to access cosmosdb database", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Flux<FeedResponse<CosmosItemProperties>> executeQuery(SqlQuerySpec sqlQuerySpec, String collectionName,
|
||||||
|
FeedOptions options) {
|
||||||
|
return cosmosClient.getDatabase(this.databaseName)
|
||||||
|
.getContainer(collectionName)
|
||||||
|
.queryItems(sqlQuerySpec, options);
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private List<String> getPartitionKeyNames(Class<?> domainClass) {
|
private List<String> getPartitionKeyNames(Class<?> domainClass) {
|
||||||
final DocumentDbEntityInformation entityInfo = new DocumentDbEntityInformation(domainClass);
|
final DocumentDbEntityInformation entityInfo = new DocumentDbEntityInformation(domainClass);
|
||||||
|
@ -405,10 +453,12 @@ public class DocumentDbTemplate implements DocumentDbOperations, ApplicationCont
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<CosmosItemProperties> findDocuments(@NonNull DocumentQuery query, @NonNull Class<?> domainClass,
|
private List<CosmosItemProperties> findDocuments(@NonNull DocumentQuery query,
|
||||||
|
@NonNull Class<?> domainClass,
|
||||||
@NonNull String containerName) {
|
@NonNull String containerName) {
|
||||||
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
|
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
|
||||||
final boolean isCrossPartitionQuery = query.isCrossPartitionQuery(getPartitionKeyNames(domainClass));
|
final boolean isCrossPartitionQuery =
|
||||||
|
query.isCrossPartitionQuery(getPartitionKeyNames(domainClass));
|
||||||
final FeedOptions feedOptions = new FeedOptions();
|
final FeedOptions feedOptions = new FeedOptions();
|
||||||
feedOptions.enableCrossPartitionQuery(isCrossPartitionQuery);
|
feedOptions.enableCrossPartitionQuery(isCrossPartitionQuery);
|
||||||
return cosmosClient
|
return cosmosClient
|
||||||
|
|
Загрузка…
Ссылка в новой задаче