From eeeabcd3f08db39e853e9ee724ee32677d817dc4 Mon Sep 17 00:00:00 2001 From: Naveen Singh Date: Fri, 8 Nov 2019 17:56:35 -0500 Subject: [PATCH] Adding collection routing map refresh flag on partition migration (#285) * adding collection routing map refresh flag on partition migration * resolving formatting comments --- .../cosmosdb/internal/HttpConstants.java | 1 + .../GatewayAddressCache.java | 9 ++- .../GatewayAddressCacheTest.java | 57 +++++++++++-------- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java index 26e8003b..5b47aaa5 100644 --- a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java +++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java @@ -148,6 +148,7 @@ public class HttpConstants { // Address related headers. public static final String FORCE_REFRESH = "x-ms-force-refresh"; + public static final String FORCE_COLLECTION_ROUTING_MAP_REFRESH = "x-ms-collectionroutingmap-refresh"; public static final String ITEM_COUNT = "x-ms-item-count"; public static final String NEW_RESOURCE_ID = "x-ms-new-resource-id"; public static final String USE_MASTER_COLLECTION_RESOLVER = "x-ms-use-master-collection-resolver"; diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCache.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCache.java index 2e4573f3..5005cc55 100644 --- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCache.java +++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCache.java @@ -58,7 +58,6 @@ import rx.Observable; import rx.Single; import java.net.MalformedURLException; -import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.time.Duration; @@ -288,6 +287,10 @@ public class GatewayAddressCache implements IAddressCache { headers.put(HttpConstants.HttpHeaders.FORCE_REFRESH, Boolean.TRUE.toString()); } + if (request.forceCollectionRoutingMapRefresh) { + headers.put(HttpConstants.HttpHeaders.FORCE_COLLECTION_ROUTING_MAP_REFRESH, Boolean.TRUE.toString()); + } + addressQuery.put(HttpConstants.QueryStrings.FILTER, HttpUtils.urlEncode(this.protocolFilter)); addressQuery.put(HttpConstants.QueryStrings.PARTITION_KEY_RANGE_IDS, String.join(",", partitionKeyRangeIds)); @@ -483,6 +486,10 @@ public class GatewayAddressCache implements IAddressCache { headers.put(HttpConstants.HttpHeaders.USE_MASTER_COLLECTION_RESOLVER, Boolean.TRUE.toString()); } + if (request.forceCollectionRoutingMapRefresh) { + headers.put(HttpConstants.HttpHeaders.FORCE_COLLECTION_ROUTING_MAP_REFRESH, Boolean.TRUE.toString()); + } + queryParameters.put(HttpConstants.QueryStrings.FILTER, HttpUtils.urlEncode(this.protocolFilter)); headers.put(HttpConstants.HttpHeaders.X_DATE, Utils.nowAsRFC1123()); String token = this.tokenProvider.getUserAuthorizationToken( diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCacheTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCacheTest.java index 1ee77206..5bfe9aa4 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCacheTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCacheTest.java @@ -44,7 +44,6 @@ import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentClientImpl; import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest; import io.netty.buffer.ByteBuf; import io.reactivex.netty.protocol.http.client.CompositeHttpClient; -import org.apache.commons.lang3.StringUtils; import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -120,20 +119,26 @@ public class GatewayAddressCacheTest extends TestSuiteBase { null, getCompositeHttpClient(configs)); - RxDocumentServiceRequest req = - RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document, - collectionLink + "/docs/", - getDocumentDefinition(), new HashMap<>()); + for (int i = 0; i < 2; i++) { + RxDocumentServiceRequest req = + RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document, + collectionLink + "/docs/", + getDocumentDefinition(), new HashMap<>()); - Single> addresses = cache.getServerAddressesViaGatewayAsync( - req, createdCollection.getResourceId(), partitionKeyRangeIds, false); + if (i == 1) { + req.forceCollectionRoutingMapRefresh = true; //testing address api with x-ms-collectionroutingmap-refresh true + } - PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder() - .withProtocol(protocol) - .replicasOfPartitions(partitionKeyRangeIds) - .build(); + Single> addresses = cache.getServerAddressesViaGatewayAsync( + req, createdCollection.getResourceId(), partitionKeyRangeIds, false); - validateSuccess(addresses, validator, TIMEOUT); + PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder() + .withProtocol(protocol) + .replicasOfPartitions(partitionKeyRangeIds) + .build(); + + validateSuccess(addresses, validator, TIMEOUT); + } } @Test(groups = { "direct" }, dataProvider = "protocolProvider", timeOut = TIMEOUT) @@ -148,21 +153,25 @@ public class GatewayAddressCacheTest extends TestSuiteBase { authorizationTokenProvider, null, getCompositeHttpClient(configs)); + for (int i = 0; i < 2; i++) { + RxDocumentServiceRequest req = + RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Database, + "/dbs", + new Database(), new HashMap<>()); - RxDocumentServiceRequest req = - RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Database, - "/dbs", - new Database(), new HashMap<>()); + Single> addresses = cache.getMasterAddressesViaGatewayAsync(req, ResourceType.Database, + null, "/dbs/", false, false, null); + if (i == 1) { + req.forceCollectionRoutingMapRefresh = true; //testing address api with x-ms-collectionroutingmap-refresh true + } - Single> addresses = cache.getMasterAddressesViaGatewayAsync(req, ResourceType.Database, - null, "/dbs/", false, false, null); + PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder() + .withProtocol(protocol) + .replicasOfSamePartition() + .build(); - PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder() - .withProtocol(protocol) - .replicasOfSamePartition() - .build(); - - validateSuccess(addresses, validator, TIMEOUT); + validateSuccess(addresses, validator, TIMEOUT); + } } @DataProvider(name = "targetPartitionsKeyRangeAndCollectionLinkParams")