* null return fix and logging

* 2.6.2
This commit is contained in:
Mohammad Derakhshani 2019-10-04 22:53:39 -07:00 коммит произвёл Kushagra Thapar
Родитель 486f2a9010
Коммит ed8a0144fb
18 изменённых файлов: 125 добавлений и 31 удалений

Просмотреть файл

@ -37,7 +37,7 @@ For example, using maven, you can add the following dependency to your maven pom
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</dependency>
```

Просмотреть файл

@ -29,7 +29,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</parent>
<artifactId>azure-cosmosdb-benchmark</artifactId>

Просмотреть файл

@ -1,5 +1,10 @@
# Changelog
## 2.6.2
- Fixed query failure when setting MaxItemCount to -1 ([#261](https://github.com/Azure/azure-cosmosdb-java/issues/261)).
- Fixed a NPE bug on Partitoin split ([#267](https://github.com/Azure/azure-cosmosdb-java/pull/267).
- Improved logging in Direct mode.
## 2.6.1
- Multimaster regional failover fixes for query logic ([#245](https://github.com/Azure/azure-cosmosdb-java/commit/104b4a3b30ffd7c8add01096ce6a9b6e7a3f6318))

Просмотреть файл

@ -28,7 +28,7 @@ SOFTWARE.
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</parent>
<artifactId>azure-cosmosdb-commons-test-utils</artifactId>
<name>Common Test Components for Testing Async SDK for SQL API of Azure Cosmos DB Service</name>

Просмотреть файл

@ -27,7 +27,7 @@ SOFTWARE.
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</parent>
<artifactId>azure-cosmosdb-commons</artifactId>
<name>Common Components for Async SDK for SQL API of Azure Cosmos DB Service</name>

Просмотреть файл

@ -275,7 +275,7 @@ public class HttpConstants {
// TODO: FIXME we can use maven plugin for generating a version file
// @see
// https://stackoverflow.com/questions/2469922/generate-a-version-java-file-in-maven
public static final String SDK_VERSION = "2.6.1";
public static final String SDK_VERSION = "2.6.2";
public static final String SDK_NAME = "cosmosdb-java-sdk";
public static final String QUERY_VERSION = "1.0";
}

Просмотреть файл

@ -23,24 +23,36 @@
package com.microsoft.azure.cosmosdb.internal;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class JavaStreamUtils {
private static <T> String safeToString(T t) {
return t != null ? t.toString() : "null";
}
public static <T> String toString(Collection<T> collection, String delimiter) {
return collection.stream()
.map( t -> safeToString(t) )
.collect(Collectors.joining(delimiter));
public static <T> String info(Collection<T> collection) {
return collection == null ? "null collection" :
"collection size: " + collection.size();
}
public static <T> String info(T[] collection) {
return collection == null ? "null collection" :
"collection size: " + collection.length;
}
public static <T> String toString(Collection<T> collection, String delimiter) {
return collection == null ? "null collection" :
collection.isEmpty() ? "empty collection" :
collection.stream()
.map(t -> safeToString(t))
.collect(Collectors.joining(delimiter));
}
public static <T> String toString(T[] array, String delimiter) {
return array == null ? "null array" :
toString(Arrays.asList(array), delimiter);
}
}

Просмотреть файл

@ -82,6 +82,14 @@ public final class PartitionKeyRangeIdentity {
return String.format("%s", this.partitionKeyRangeId);
}
@Override
public String toString() {
return "PartitionKeyRangeIdentity{" +
"collectionRid='" + collectionRid + '\'' +
", partitionKeyRangeId='" + partitionKeyRangeId + '\'' +
'}';
}
@Override
public boolean equals(Object other) {
if (null == other) {

Просмотреть файл

@ -28,7 +28,7 @@ SOFTWARE.
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-direct</artifactId>
<name>Azure Cosmos DB Async SDK Direct Internal Implementation</name>
<version>2.6.1</version>
<version>2.6.2</version>
<description>Azure Cosmos DB Async SDK Direct Internal Implementation</description>
<url>https://docs.microsoft.com/en-us/azure/cosmos-db</url>
<packaging>jar</packaging>

Просмотреть файл

@ -249,7 +249,9 @@ public class AddressResolver implements IAddressResolver {
if (range == null) {
// Collection cache or routing map cache is potentially outdated. Return null -
// upper logic will refresh cache and retry.
return null;
logger.debug("Collection cache or routing map cache is potentially outdated." +
" Returning null. Upper logic will refresh cache and retry.");
return Single.just(null);
}
Single<AddressInformation[]> addressesObs = this.addressCache.tryGetAddresses(
@ -295,6 +297,7 @@ public class AddressResolver implements IAddressResolver {
return (PartitionKeyRange) routingMap.getOrderedPartitionKeyRanges().get(0);
}
logger.debug("tryResolveSinglePartitionCollection: collectionCacheIsUptoDate = {}", collectionCacheIsUptoDate);
if (collectionCacheIsUptoDate) {
throw BridgeInternal.setResourceAddress(new BadRequestException(RMResources.MissingPartitionKeyValue), request.getResourceAddress());
} else {
@ -573,6 +576,7 @@ public class AddressResolver implements IAddressResolver {
throw BridgeInternal.setResourceAddress(new PartitionKeyRangeGoneException(errorMessage), request.getResourceAddress());
}
logger.debug("handleRangeAddressResolutionFailure returns null");
return null;
}

Просмотреть файл

@ -29,6 +29,7 @@ import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.PartitionKeyRange;
import com.microsoft.azure.cosmosdb.internal.Constants;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import com.microsoft.azure.cosmosdb.internal.JavaStreamUtils;
import com.microsoft.azure.cosmosdb.internal.OperationType;
import com.microsoft.azure.cosmosdb.internal.Paths;
import com.microsoft.azure.cosmosdb.internal.PathsHelper;
@ -161,6 +162,10 @@ public class GatewayAddressCache implements IAddressCache {
com.microsoft.azure.cosmosdb.rx.internal.Utils.checkNotNullOrThrow(request, "request", "");
com.microsoft.azure.cosmosdb.rx.internal.Utils.checkNotNullOrThrow(partitionKeyRangeIdentity, "partitionKeyRangeIdentity", "");
logger.debug("PartitionKeyRangeIdentity {}, forceRefreshPartitionAddresses {}",
partitionKeyRangeIdentity,
forceRefreshPartitionAddresses);
if (StringUtils.equals(partitionKeyRangeIdentity.getPartitionKeyRangeId(),
PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) {
@ -171,6 +176,8 @@ public class GatewayAddressCache implements IAddressCache {
Instant suboptimalServerPartitionTimestamp = this.suboptimalServerPartitionTimestamps.get(partitionKeyRangeIdentity);
if (suboptimalServerPartitionTimestamp != null) {
logger.debug("suboptimalServerPartitionTimestamp is {}", suboptimalServerPartitionTimestamp);
boolean forceRefreshDueToSuboptimalPartitionReplicaSet = Duration.between(suboptimalServerPartitionTimestamp, Instant.now()).getSeconds()
> this.suboptimalPartitionForceRefreshIntervalInSeconds;
@ -179,6 +186,8 @@ public class GatewayAddressCache implements IAddressCache {
// and if they are equal, updates the key with a third value.
Instant newValue = this.suboptimalServerPartitionTimestamps.computeIfPresent(partitionKeyRangeIdentity,
(key, oldVal) -> {
logger.debug("key = {}, oldValue = {}", key, oldVal);
if (suboptimalServerPartitionTimestamp.equals(oldVal)) {
return Instant.MAX;
} else {
@ -186,7 +195,11 @@ public class GatewayAddressCache implements IAddressCache {
}
});
logger.debug("newValue is {}", newValue);
if (!newValue.equals(suboptimalServerPartitionTimestamp)) {
logger.debug("setting forceRefreshPartitionAddresses to true");
// the value was replaced;
forceRefreshPartitionAddresses = true;
}
@ -196,6 +209,8 @@ public class GatewayAddressCache implements IAddressCache {
final boolean forceRefreshPartitionAddressesModified = forceRefreshPartitionAddresses;
if (forceRefreshPartitionAddressesModified) {
logger.debug("refresh serverPartitionAddressCache for {}", partitionKeyRangeIdentity);
this.serverPartitionAddressCache.refresh(
partitionKeyRangeIdentity,
() -> this.getAddressesForRangeId(
@ -219,6 +234,10 @@ public class GatewayAddressCache implements IAddressCache {
return addressesObs.map(
addresses -> {
if (notAllReplicasAvailable(addresses)) {
if (logger.isDebugEnabled()) {
logger.debug("not all replicas available {}", JavaStreamUtils.info(addresses));
}
this.suboptimalServerPartitionTimestamps.putIfAbsent(partitionKeyRangeIdentity, Instant.now());
}
@ -226,22 +245,27 @@ public class GatewayAddressCache implements IAddressCache {
}).onErrorResumeNext(ex -> {
DocumentClientException dce = com.microsoft.azure.cosmosdb.rx.internal.Utils.as(ex, DocumentClientException.class);
if (dce == null) {
logger.error("unexpected failure", ex);
if (forceRefreshPartitionAddressesModified) {
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
}
return Single.error(ex);
} else {
assert dce != null;
logger.debug("tryGetAddresses dce", dce);
if (Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND) ||
Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.GONE) ||
Exceptions.isSubStatusCode(dce, HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE)) {
//remove from suboptimal cache in case the collection+pKeyRangeId combo is gone.
// remove from suboptimal cache in case the collection+pKeyRangeId combo is gone.
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
return null;
logger.debug("tryGetAddresses: inner onErrorResumeNext return null", dce);
return Single.just(null);
}
return Single.error(ex);
}
});
}
@ -250,6 +274,10 @@ public class GatewayAddressCache implements IAddressCache {
String collectionRid,
List<String> partitionKeyRangeIds,
boolean forceRefresh) {
if (logger.isDebugEnabled()) {
logger.debug("getServerAddressesViaGatewayAsync collectionRid {}, partitionKeyRangeIds {}", collectionRid,
JavaStreamUtils.toString(partitionKeyRangeIds, ","));
}
String entryUrl = PathsHelper.generatePath(ResourceType.Document, collectionRid, true);
HashMap<String, String> addressQuery = new HashMap<>();
@ -303,6 +331,9 @@ public class GatewayAddressCache implements IAddressCache {
HttpClientUtils.parseResponseAsync(rsp));
return dsrObs.map(
dsr -> {
if (logger.isDebugEnabled()) {
logger.debug("getServerAddressesViaGatewayAsync deserializes result");
}
logAddressResolutionEnd(request, identifier);
List<Address> addresses = dsr.getQueryResponse(Address.class);
return addresses;
@ -315,6 +346,7 @@ public class GatewayAddressCache implements IAddressCache {
}
private Single<Pair<PartitionKeyRangeIdentity, AddressInformation[]>> resolveMasterAsync(RxDocumentServiceRequest request, boolean forceRefresh, Map<String, Object> properties) {
logger.debug("resolveMasterAsync forceRefresh: {}", forceRefresh);
Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterAddressAndRangeInitial = this.masterPartitionAddressCache;
forceRefresh = forceRefresh ||
@ -366,18 +398,27 @@ public class GatewayAddressCache implements IAddressCache {
String collectionRid,
String partitionKeyRangeId,
boolean forceRefresh) {
logger.debug("getAddressesForRangeId collectionRid {}, partitionKeyRangeId {}, forceRefresh {}",
collectionRid, partitionKeyRangeId, forceRefresh);
Single<List<Address>> addressResponse = this.getServerAddressesViaGatewayAsync(request, collectionRid, Collections.singletonList(partitionKeyRangeId), forceRefresh);
Single<List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>>> addressInfos =
addressResponse.map(
addresses ->
addresses.stream().filter(addressInfo ->
addresses -> {
if (logger.isDebugEnabled()) {
logger.debug("addresses from getServerAddressesViaGatewayAsync in getAddressesForRangeId {}",
JavaStreamUtils.info(addresses));
}
return addresses.stream().filter(addressInfo ->
this.protocolScheme.equals(addressInfo.getProtocolScheme()))
.collect(Collectors.groupingBy(
address -> address.getParitionKeyRangeId()))
.values().stream()
.map(groupedAddresses -> toPartitionAddressAndRange(collectionRid, addresses))
.collect(Collectors.toList()));
.collect(Collectors.toList());
});
Single<List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>>> result = addressInfos.map(addressInfo -> addressInfo.stream()
.filter(a ->
@ -386,8 +427,11 @@ public class GatewayAddressCache implements IAddressCache {
return result.flatMap(
list -> {
if (list.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("getAddressesForRangeId flatMap got result {}", JavaStreamUtils.info(list));
}
if (list.isEmpty()) {
String errorMessage = String.format(
RMResources.PartitionKeyRangeNotFound,
partitionKeyRangeId,
@ -400,7 +444,9 @@ public class GatewayAddressCache implements IAddressCache {
} else {
return Single.just(list.get(0).getRight());
}
});
}).doOnError(e -> {
logger.debug("getAddressesForRangeId", e);
});
}
Single<List<Address>> getMasterAddressesViaGatewayAsync(
@ -411,6 +457,20 @@ public class GatewayAddressCache implements IAddressCache {
boolean forceRefresh,
boolean useMasterCollectionResolver,
Map<String, Object> properties) {
logger.debug("getMasterAddressesViaGatewayAsync " +
"resourceType {}, " +
"resourceAddress {}, " +
"entryUrl {}, " +
"forceRefresh {}, " +
"useMasterCollectionResolver {}",
resourceType,
resourceAddress,
entryUrl,
forceRefresh,
useMasterCollectionResolver
);
HashMap<String, String> queryParameters = new HashMap<>();
queryParameters.put(HttpConstants.QueryStrings.URL, HttpUtils.urlEncode(entryUrl));
HashMap<String, String> headers = new HashMap<>(defaultRequestHeaders);
@ -456,6 +516,7 @@ public class GatewayAddressCache implements IAddressCache {
}
private Pair<PartitionKeyRangeIdentity, AddressInformation[]> toPartitionAddressAndRange(String collectionRid, List<Address> addresses) {
logger.debug("toPartitionAddressAndRange");
Address address = addresses.get(0);
AddressInformation[] addressInfos =
@ -472,6 +533,10 @@ public class GatewayAddressCache implements IAddressCache {
public Completable openAsync(
DocumentCollection collection,
List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities) {
if (logger.isDebugEnabled()) {
logger.debug("openAsync {}", collection, JavaStreamUtils.toString(partitionKeyRangeIdentities, ","));
}
List<Observable<List<Address>>> tasks = new ArrayList<>();
int batchSize = GatewayAddressCache.DefaultBatchSize;

Просмотреть файл

@ -83,7 +83,7 @@ public class GoneAndRetryWithRetryPolicy implements IRetryPolicy {
(this.request.getPartitionKeyRangeIdentity() == null ||
this.request.getPartitionKeyRangeIdentity().getCollectionRid() == null)) &&
!(exception instanceof PartitionKeyRangeIsSplittingException)) {
logger.debug("Operation will NOT be retried. Current attempt {}, Exception: {} ", this.attemptCount,
logger.debug("Operation will NOT be retried. Current attempt {}, Exception:", this.attemptCount,
exception);
stopStopWatch(this.durationTimer);
return Single.just(ShouldRetryResult.noRetry());

Просмотреть файл

@ -29,7 +29,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</parent>
<artifactId>azure-cosmosdb-examples</artifactId>

Просмотреть файл

@ -28,7 +28,7 @@ SOFTWARE.
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</parent>
<artifactId>azure-cosmosdb-gateway</artifactId>
<name>Common Gateway Components for Async SDK for SQL API of Azure Cosmos DB Service</name>

Просмотреть файл

@ -81,7 +81,7 @@ public class ClientRetryPolicy implements IDocumentClientRetryPolicy {
@Override
public Single<ShouldRetryResult> shouldRetry(Exception e) {
logger.debug("retry count {}, isReadRequest {}, canUseMultipleWriteLocations {}, due to failure {}",
logger.debug("retry count {}, isReadRequest {}, canUseMultipleWriteLocations {}, due to failure:",
cnt.incrementAndGet(),
isReadRequest,
canUseMultipleWriteLocations,

Просмотреть файл

@ -105,7 +105,7 @@ public class AsyncCache<TKey, TValue> {
}, err -> {
logger.debug("cache[{}] resulted in error {}, computing new value", key, err);
logger.debug("cache[{}] resulted in error, computing new value", key, err);
AsyncLazy<TValue> asyncLazy = new AsyncLazy<>(singleValueInitFunc);
AsyncLazy<TValue> resultAsyncLazy = values.merge(key, asyncLazy,
(lazyValue1, lazyValu2) -> lazyValue1 == initialLazyValue ? lazyValu2 : lazyValue1);

Просмотреть файл

@ -27,7 +27,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
<packaging>pom</packaging>
<name>Azure Cosmos DB SQL API</name>
<description>Java Async SDK (with Reactive Extension RX support) for Azure Cosmos DB SQL API</description>

Просмотреть файл

@ -27,7 +27,7 @@ SOFTWARE.
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</parent>
<artifactId>azure-cosmosdb</artifactId>
<name>Async SDK for SQL API of Azure Cosmos DB Service</name>