Adding collection routing map refresh flag on partition migration (#285)
* adding collection routing map refresh flag on partition migration * resolving formatting comments
This commit is contained in:
Родитель
9085d5aa44
Коммит
eeeabcd3f0
|
@ -148,6 +148,7 @@ public class HttpConstants {
|
|||
|
||||
// Address related headers.
|
||||
public static final String FORCE_REFRESH = "x-ms-force-refresh";
|
||||
public static final String FORCE_COLLECTION_ROUTING_MAP_REFRESH = "x-ms-collectionroutingmap-refresh";
|
||||
public static final String ITEM_COUNT = "x-ms-item-count";
|
||||
public static final String NEW_RESOURCE_ID = "x-ms-new-resource-id";
|
||||
public static final String USE_MASTER_COLLECTION_RESOLVER = "x-ms-use-master-collection-resolver";
|
||||
|
|
|
@ -58,7 +58,6 @@ import rx.Observable;
|
|||
import rx.Single;
|
||||
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.time.Duration;
|
||||
|
@ -288,6 +287,10 @@ public class GatewayAddressCache implements IAddressCache {
|
|||
headers.put(HttpConstants.HttpHeaders.FORCE_REFRESH, Boolean.TRUE.toString());
|
||||
}
|
||||
|
||||
if (request.forceCollectionRoutingMapRefresh) {
|
||||
headers.put(HttpConstants.HttpHeaders.FORCE_COLLECTION_ROUTING_MAP_REFRESH, Boolean.TRUE.toString());
|
||||
}
|
||||
|
||||
addressQuery.put(HttpConstants.QueryStrings.FILTER, HttpUtils.urlEncode(this.protocolFilter));
|
||||
|
||||
addressQuery.put(HttpConstants.QueryStrings.PARTITION_KEY_RANGE_IDS, String.join(",", partitionKeyRangeIds));
|
||||
|
@ -483,6 +486,10 @@ public class GatewayAddressCache implements IAddressCache {
|
|||
headers.put(HttpConstants.HttpHeaders.USE_MASTER_COLLECTION_RESOLVER, Boolean.TRUE.toString());
|
||||
}
|
||||
|
||||
if (request.forceCollectionRoutingMapRefresh) {
|
||||
headers.put(HttpConstants.HttpHeaders.FORCE_COLLECTION_ROUTING_MAP_REFRESH, Boolean.TRUE.toString());
|
||||
}
|
||||
|
||||
queryParameters.put(HttpConstants.QueryStrings.FILTER, HttpUtils.urlEncode(this.protocolFilter));
|
||||
headers.put(HttpConstants.HttpHeaders.X_DATE, Utils.nowAsRFC1123());
|
||||
String token = this.tokenProvider.getUserAuthorizationToken(
|
||||
|
|
|
@ -44,7 +44,6 @@ import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentClientImpl;
|
|||
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.reactivex.netty.protocol.http.client.CompositeHttpClient;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
|
@ -120,20 +119,26 @@ public class GatewayAddressCacheTest extends TestSuiteBase {
|
|||
null,
|
||||
getCompositeHttpClient(configs));
|
||||
|
||||
RxDocumentServiceRequest req =
|
||||
RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document,
|
||||
collectionLink + "/docs/",
|
||||
getDocumentDefinition(), new HashMap<>());
|
||||
for (int i = 0; i < 2; i++) {
|
||||
RxDocumentServiceRequest req =
|
||||
RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document,
|
||||
collectionLink + "/docs/",
|
||||
getDocumentDefinition(), new HashMap<>());
|
||||
|
||||
Single<List<Address>> addresses = cache.getServerAddressesViaGatewayAsync(
|
||||
req, createdCollection.getResourceId(), partitionKeyRangeIds, false);
|
||||
if (i == 1) {
|
||||
req.forceCollectionRoutingMapRefresh = true; //testing address api with x-ms-collectionroutingmap-refresh true
|
||||
}
|
||||
|
||||
PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder()
|
||||
.withProtocol(protocol)
|
||||
.replicasOfPartitions(partitionKeyRangeIds)
|
||||
.build();
|
||||
Single<List<Address>> addresses = cache.getServerAddressesViaGatewayAsync(
|
||||
req, createdCollection.getResourceId(), partitionKeyRangeIds, false);
|
||||
|
||||
validateSuccess(addresses, validator, TIMEOUT);
|
||||
PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder()
|
||||
.withProtocol(protocol)
|
||||
.replicasOfPartitions(partitionKeyRangeIds)
|
||||
.build();
|
||||
|
||||
validateSuccess(addresses, validator, TIMEOUT);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(groups = { "direct" }, dataProvider = "protocolProvider", timeOut = TIMEOUT)
|
||||
|
@ -148,21 +153,25 @@ public class GatewayAddressCacheTest extends TestSuiteBase {
|
|||
authorizationTokenProvider,
|
||||
null,
|
||||
getCompositeHttpClient(configs));
|
||||
for (int i = 0; i < 2; i++) {
|
||||
RxDocumentServiceRequest req =
|
||||
RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Database,
|
||||
"/dbs",
|
||||
new Database(), new HashMap<>());
|
||||
|
||||
RxDocumentServiceRequest req =
|
||||
RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Database,
|
||||
"/dbs",
|
||||
new Database(), new HashMap<>());
|
||||
Single<List<Address>> addresses = cache.getMasterAddressesViaGatewayAsync(req, ResourceType.Database,
|
||||
null, "/dbs/", false, false, null);
|
||||
if (i == 1) {
|
||||
req.forceCollectionRoutingMapRefresh = true; //testing address api with x-ms-collectionroutingmap-refresh true
|
||||
}
|
||||
|
||||
Single<List<Address>> addresses = cache.getMasterAddressesViaGatewayAsync(req, ResourceType.Database,
|
||||
null, "/dbs/", false, false, null);
|
||||
PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder()
|
||||
.withProtocol(protocol)
|
||||
.replicasOfSamePartition()
|
||||
.build();
|
||||
|
||||
PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder()
|
||||
.withProtocol(protocol)
|
||||
.replicasOfSamePartition()
|
||||
.build();
|
||||
|
||||
validateSuccess(addresses, validator, TIMEOUT);
|
||||
validateSuccess(addresses, validator, TIMEOUT);
|
||||
}
|
||||
}
|
||||
|
||||
@DataProvider(name = "targetPartitionsKeyRangeAndCollectionLinkParams")
|
||||
|
|
Загрузка…
Ссылка в новой задаче