memory improvements (cherry pick of #230 to master) (#248)

* version bump to 2.4.6-SNAPSHOT

* byte buffer pool optimized

* unnessery uuid conversion

* URI changed to Uri

* fixes

* initlize StringBuilder with proper size for auth token

* minor fix in eralier commits

* fixed test

* addressed review comments

* fix in reading bytes set limit, fixed test

* fixed a broken test

* some unrelated test failrue: fixed

* fixed tests

* 2.4.6 release

* added getters for Uri and made fields final
This commit is contained in:
Mohammad Derakhshani 2019-08-12 10:36:12 -07:00 коммит произвёл GitHub
Родитель b707094634
Коммит 0bc6bdc636
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
31 изменённых файлов: 556 добавлений и 332 удалений

Просмотреть файл

@ -26,6 +26,7 @@ package com.microsoft.azure.cosmosdb;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.Uri;
import com.microsoft.azure.cosmosdb.internal.query.metrics.ClientSideMetrics;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceResponse;
import com.microsoft.azure.cosmosdb.rx.internal.Strings;
@ -216,7 +217,7 @@ public class BridgeInternal {
policy.setUsingMultipleWriteLocations(value);
}
public static <E extends DocumentClientException> URI getRequestUri(DocumentClientException documentClientException) {
public static <E extends DocumentClientException> Uri getRequestUri(DocumentClientException documentClientException) {
return documentClientException.requestUri;
}

Просмотреть файл

@ -27,6 +27,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.Uri;
import org.apache.commons.lang3.StringUtils;
import com.microsoft.azure.cosmosdb.internal.Constants;
@ -55,11 +56,10 @@ public class DocumentClientException extends Exception {
private ClientSideRequestStatistics clientSideRequestStatistics;
private Error error;
long lsn;
String partitionKeyRangeId;
Map<String, String> requestHeaders;
URI requestUri;
Uri requestUri;
String resourceAddress;
private DocumentClientException(int statusCode, String message, Map<String, String> responseHeaders, Throwable cause) {

Просмотреть файл

@ -0,0 +1,135 @@
/*
* The MIT License (MIT)
* Copyright (c) 2018 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.cosmosdb.internal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedDeque;
/**
* Pool of Byte Buffers, this helps in re-using memory
*/
public class ByteBufferPool {
public class ByteBufferWrapper {
private final PoolSegment poolSegment;
private final ByteBuffer byteBuffer;
private ByteBufferWrapper(ByteBuffer byteBuffer, PoolSegment poolSegment) {
this.byteBuffer = byteBuffer;
this.poolSegment = poolSegment;
}
public ByteBuffer getByteBuffer() {
return this.byteBuffer;
}
}
private class PoolSegment {
public PoolSegment(int byteBufferSize, ConcurrentLinkedDeque<ByteBufferWrapper> byteBuffersPool) {
this.byteBufferSize = byteBufferSize;
this.byteBuffersPool = byteBuffersPool;
}
private final int byteBufferSize;
private final ConcurrentLinkedDeque<ByteBufferWrapper> byteBuffersPool;
}
private final ArrayList<PoolSegment> poolSegmentList = new ArrayList<>();
private final static ByteBufferPool instant = new ByteBufferPool();
public static ByteBufferPool getInstant() {
return instant;
}
private final Logger logger = LoggerFactory.getLogger(ByteBufferPool.class);
private ByteBufferPool() {
logger.debug("Initializing ByteBuffer Pool");
long totalSize = 0;
for(int byteBufferSize = 1024, segmentSize = 1024; segmentSize > 0; byteBufferSize *= 2, segmentSize /= 2) {
logger.debug("Creating pool segment: ByteBuffer size {}, pool segment size {}", byteBufferSize, segmentSize);
poolSegmentList.add(createByteBufferPoolSegment(byteBufferSize, segmentSize));
totalSize += (byteBufferSize * segmentSize);
}
logger.debug("Total ByteBuffer Pool Size {}", totalSize);
}
private PoolSegment createByteBufferPoolSegment(int byteBufferSize, int count) {
ConcurrentLinkedDeque<ByteBufferWrapper> deq = new ConcurrentLinkedDeque<>();
PoolSegment poolSegment = new PoolSegment(count, deq);
for(int i = 0; i < count; i++) {
deq.add(new ByteBufferWrapper(ByteBuffer.allocate(byteBufferSize), poolSegment));
}
return new PoolSegment(byteBufferSize, deq);
}
private int findLowestIndex(int size) {
for (int i = 0; i < poolSegmentList.size(); i++) {
if (poolSegmentList.get(i).byteBufferSize >= size) {
return i;
}
}
return -1;
}
public ByteBufferWrapper lease(int size) {
int poolIndex = findLowestIndex(size);
if (poolIndex == -1) {
logger.info("Requested byte buffer size {} is greater than the max the pool supports, creating a garbage collectable instance.", size);
return new ByteBufferWrapper(ByteBuffer.allocate(size), null);
}
for (int i = poolIndex; i < poolSegmentList.size(); i++) {
ByteBufferWrapper byteBuffer = poolSegmentList.get(i).byteBuffersPool.poll();
if (byteBuffer != null) {
return byteBuffer;
}
}
logger.warn("Configured Byte Buffer Pool is not sufficient, creating new garbage collectable instance");
return new ByteBufferWrapper(ByteBuffer.allocate(size), null);
}
public void release(ByteBufferWrapper byteBufferWrapper) {
PoolSegment parentPoolSegment = byteBufferWrapper.poolSegment;
if (parentPoolSegment != null) {
parentPoolSegment.byteBuffersPool.add(byteBufferWrapper);
return;
}
// else let it get garbage collected
}
}

Просмотреть файл

@ -23,6 +23,8 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity;
import org.apache.commons.lang3.StringUtils;
import java.util.Objects;
/**
@ -32,14 +34,33 @@ public class AddressInformation {
private Protocol protocol;
private boolean isPublic;
private boolean isPrimary;
private String physicalUri;
private Uri physicalUri;
public AddressInformation(boolean isPublic, boolean isPrimary, String physicalUri, Protocol protocol) {
Objects.requireNonNull(protocol);
this.protocol = protocol;
this.isPublic = isPublic;
this.isPrimary = isPrimary;
this.physicalUri = physicalUri;
this.physicalUri = new Uri(normalizePhysicalUri(physicalUri));
}
private static String normalizePhysicalUri(String physicalUri) {
if (StringUtils.isEmpty(physicalUri)) {
return physicalUri;
}
// backend returns non normalized uri with "//" tail
// e.g, https://cdb-ms-prod-westus2-fd2.documents.azure.com:15248/apps/4f5c042d-76fb-4ce6-bda3-517e6ef3984f/
// services/cf4b9ab2-019c-45ca-ac88-25a92b66dddf/partitions/2078862a-d698-475b-a308-02598370d1d9/replicas/132077748219659199s//
// we should trim the tail double "//"
int i = physicalUri.length() -1;
while(i >= 0 && physicalUri.charAt(i) == '/') {
i--;
}
return physicalUri.substring(0, i + 1) + '/';
}
public AddressInformation(boolean isPublic, boolean isPrimary, String physicalUri, String protocolScheme) {
@ -54,7 +75,7 @@ public class AddressInformation {
return isPrimary;
}
public String getPhysicalUri() {
public Uri getPhysicalUri() {
return physicalUri;
}

Просмотреть файл

@ -65,15 +65,6 @@ public class HttpUtils {
}
}
public static URI toURI(String uri) {
try {
return new URI(uri);
} catch (Exception e) {
log.error("failed to parse {}", uri, e);
throw new IllegalArgumentException("failed to parse uri " + uri, e);
}
}
public static Map<String, String> asMap(HttpResponseHeaders headers) {
if (headers == null) {
return new HashMap<>();

Просмотреть файл

@ -57,7 +57,7 @@ public class StoreResult {
final public boolean isGoneException;
final public boolean isNotFoundException;
final public boolean isInvalidPartitionException;
final public URI storePhysicalAddress;
final public Uri storePhysicalAddress;
public StoreResult(
StoreResponse storeResponse,
@ -69,7 +69,7 @@ public class StoreResult {
int currentReplicaSetSize,
int currentWriteQuorum,
boolean isValid,
URI storePhysicalAddress,
Uri storePhysicalAddress,
long globalCommittedLSN,
int numberOfReadRegions,
long itemLSN,

Просмотреть файл

@ -0,0 +1,75 @@
/*
* The MIT License (MIT)
* Copyright (c) 2018 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.cosmosdb.internal.directconnectivity;
import java.net.URI;
import java.util.Objects;
public class Uri {
private final String uriAsString;
private final URI uri;
public static Uri create(String uriAsString) {
return new Uri(uriAsString);
}
public Uri(String uri) {
this.uriAsString = uri;
URI uriValue = null;
try {
uriValue = URI.create(uri);
} catch (IllegalArgumentException e) {
uriValue = null;
}
this.uri = uriValue;
}
public URI getURI() {
return this.uri;
}
public String getURIAsString() {
return this.uriAsString;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Uri uri1 = (Uri) o;
return uriAsString.equals(uri1.uriAsString) &&
uri.equals(uri1.uri);
}
@Override
public int hashCode() {
return Objects.hash(uriAsString, uri);
}
@Override
public String toString() {
return this.uriAsString;
}
}

Просмотреть файл

@ -41,16 +41,16 @@ public class AddressSelector {
this.protocol = protocol;
}
public Single<List<URI>> resolveAllUriAsync(
public Single<List<Uri>> resolveAllUriAsync(
RxDocumentServiceRequest request,
boolean includePrimary,
boolean forceRefresh) {
Single<List<AddressInformation>> allReplicaAddressesObs = this.resolveAddressesAsync(request, forceRefresh);
return allReplicaAddressesObs.map(allReplicaAddresses -> allReplicaAddresses.stream().filter(a -> includePrimary || !a.isPrimary())
.map(a -> HttpUtils.toURI(a.getPhysicalUri())).collect(Collectors.toList()));
.map(a -> a.getPhysicalUri()).collect(Collectors.toList()));
}
public Single<URI> resolvePrimaryUriAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh) {
public Single<Uri> resolvePrimaryUriAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh) {
Single<List<AddressInformation>> replicaAddressesObs = this.resolveAddressesAsync(request, forceAddressRefresh);
return replicaAddressesObs.flatMap(replicaAddresses -> {
try {
@ -61,7 +61,7 @@ public class AddressSelector {
});
}
public static URI getPrimaryUri(RxDocumentServiceRequest request, List<AddressInformation> replicaAddresses) throws GoneException {
public static Uri getPrimaryUri(RxDocumentServiceRequest request, List<AddressInformation> replicaAddresses) throws GoneException {
AddressInformation primaryAddress = null;
if (request.getDefaultReplicaIndex() != null) {
@ -70,17 +70,17 @@ public class AddressSelector {
primaryAddress = replicaAddresses.get(defaultReplicaIndex);
}
} else {
primaryAddress = replicaAddresses.stream().filter(address -> address.isPrimary() && !address.getPhysicalUri().contains("["))
primaryAddress = replicaAddresses.stream().filter(address -> address.isPrimary() && !address.getPhysicalUri().getURIAsString().contains("["))
.findAny().orElse(null);
}
if (primaryAddress == null) {
// Primary endpoint (of the desired protocol) was not found.
throw new GoneException(String.format("The requested resource is no longer available at the server. Returned addresses are {%s}",
String.join(",", replicaAddresses.stream().map(address -> address.getPhysicalUri()).collect(Collectors.toList()))), null);
String.join(",", replicaAddresses.stream().map(address -> address.getPhysicalUri().getURIAsString()).collect(Collectors.toList()))), null);
}
return HttpUtils.toURI(primaryAddress.getPhysicalUri());
return primaryAddress.getPhysicalUri();
}
public Single<List<AddressInformation>> resolveAddressesAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh) {
@ -88,7 +88,7 @@ public class AddressSelector {
(this.addressResolver.resolveAsync(request, forceAddressRefresh))
.map(addresses -> Arrays.stream(addresses)
.filter(address -> {
return !Strings.isNullOrEmpty(address.getPhysicalUri()) && Strings.areEqualIgnoreCase(address.getProtocolScheme(), this.protocol.scheme());
return !Strings.isNullOrEmpty(address.getPhysicalUri().getURIAsString()) && Strings.areEqualIgnoreCase(address.getProtocolScheme(), this.protocol.scheme());
})
.collect(Collectors.toList()));

Просмотреть файл

@ -148,12 +148,12 @@ public class ConsistencyWriter {
if (request.requestContext.globalStrongWriteResponse == null) {
Single<List<AddressInformation>> replicaAddressesObs = this.addressSelector.resolveAddressesAsync(request, forceRefresh);
AtomicReference<URI> primaryURI = new AtomicReference<>();
AtomicReference<Uri> primaryURI = new AtomicReference<>();
return replicaAddressesObs.flatMap(replicaAddresses -> {
try {
List<URI> contactedReplicas = new ArrayList<>();
replicaAddresses.forEach(replicaAddress -> contactedReplicas.add(HttpUtils.toURI(replicaAddress.getPhysicalUri())));
replicaAddresses.forEach(replicaAddress -> contactedReplicas.add(replicaAddress.getPhysicalUri().getURI()));
request.requestContext.clientSideRequestStatistics.setContactedReplicas(contactedReplicas);
return Single.just(AddressSelector.getPrimaryUri(request, replicaAddresses));
} catch (GoneException e) {

Просмотреть файл

@ -23,6 +23,7 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import org.apache.commons.lang3.StringUtils;
@ -42,11 +43,11 @@ public class ErrorUtils {
return Single.just(StringUtils.EMPTY);
}
return getErrorFromStream(responseMessage.getContent());
return getErrorFromStream(responseMessage.getContent(), responseMessage.getHeaders().getIntHeader(HttpConstants.HttpHeaders.CONTENT_LENGTH, -1));
}
protected static Single<String> getErrorFromStream(Observable<ByteBuf> stream) {
return ResponseUtils.toString(stream).toSingle();
protected static Single<String> getErrorFromStream(Observable<ByteBuf> stream, int contentLength) {
return ResponseUtils.toString(stream, contentLength).toSingle();
}
protected static void logGoneException(URI physicalAddress, String activityId) {

Просмотреть файл

@ -49,7 +49,7 @@ public class HttpClientUtils {
}
private static Single<DocumentClientException> createDocumentClientException(HttpClientResponse<ByteBuf> responseMessage) {
Single<String> readStream = ResponseUtils.toString(responseMessage.getContent()).toSingle();
Single<String> readStream = ResponseUtils.toString(responseMessage.getContent(), responseMessage.getHeaders().getIntHeader(HttpConstants.HttpHeaders.CONTENT_LENGTH, -1)).toSingle();
return readStream.map(body -> {
Error error = new Error(body);

Просмотреть файл

@ -111,14 +111,16 @@ public class HttpTransportClient extends TransportClient {
}
public Single<StoreResponse> invokeStoreAsync(
URI physicalAddress,
Uri physicalAddressUri,
ResourceOperation resourceOperation,
RxDocumentServiceRequest request) {
try {
URI physicalAddress = physicalAddressUri.getURI();
// uuid correlation manager
UUID activityId = UUID.fromString(request.getActivityId());
String activityId = request.getActivityId();
if (resourceOperation.operationType == OperationType.Recreate) {
Map<String, String> errorResponseHeaders = new HashMap<>();
@ -128,7 +130,7 @@ public class HttpTransportClient extends TransportClient {
throw new InternalServerErrorException(RMResources.InternalServerError, null, errorResponseHeaders, null);
}
HttpClientRequest<ByteBuf> httpRequest = prepareHttpMessage(activityId, physicalAddress, resourceOperation, request);
HttpClientRequest<ByteBuf> httpRequest = prepareHttpMessage(activityId, physicalAddressUri.getURIAsString(), resourceOperation, request);
RxClient.ServerInfo serverInfo = new RxClient.ServerInfo(physicalAddress.getHost(), physicalAddress.getPort());
MutableVolatile<Instant> sendTimeUtc = new MutableVolatile<>();
@ -231,12 +233,12 @@ public class HttpTransportClient extends TransportClient {
}
}
private void beforeRequest(UUID activityId, String uri, ResourceType resourceType, HttpRequestHeaders requestHeaders) {
private void beforeRequest(String activityId, String uri, ResourceType resourceType, HttpRequestHeaders requestHeaders) {
// TODO: perf counters
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/258624
}
private void afterRequest(UUID activityId,
private void afterRequest(String activityId,
int statusCode,
double durationInMilliSeconds,
HttpResponseHeaders responseHeaders) {
@ -276,13 +278,13 @@ public class HttpTransportClient extends TransportClient {
}
private HttpClientRequest<ByteBuf> prepareHttpMessage(
UUID activityId,
URI physicalAddress,
String activityId,
String physicalAddress,
ResourceOperation resourceOperation,
RxDocumentServiceRequest request) throws Exception {
HttpClientRequest<ByteBuf> httpRequestMessage = null;
URI requestUri;
String requestUri;
HttpMethod method;
// The StreamContent created below will own and dispose its underlying stream, but we may need to reuse the stream on the
@ -488,7 +490,7 @@ public class HttpTransportClient extends TransportClient {
return httpRequestMessage;
}
static URI getResourceFeedUri(ResourceType resourceType, URI physicalAddress, RxDocumentServiceRequest request) throws Exception {
static String getResourceFeedUri(ResourceType resourceType, String physicalAddress, RxDocumentServiceRequest request) throws Exception {
switch (resourceType) {
case Attachment:
return getAttachmentFeedUri(physicalAddress, request);
@ -524,7 +526,7 @@ public class HttpTransportClient extends TransportClient {
}
}
static URI getResourceEntryUri(ResourceType resourceType, URI physicalAddress, RxDocumentServiceRequest request) throws Exception {
static String getResourceEntryUri(ResourceType resourceType, String physicalAddress, RxDocumentServiceRequest request) throws Exception {
switch (resourceType) {
case Attachment:
return getAttachmentEntryUri(physicalAddress, request);
@ -559,108 +561,113 @@ public class HttpTransportClient extends TransportClient {
}
}
static URI createURI(URI baseAddress, String resourcePath) {
return baseAddress.resolve(HttpUtils.urlEncode(trimBeginningAndEndingSlashes(resourcePath)));
static String createURI(String baseAddress, String resourcePath) {
if (baseAddress.charAt(baseAddress.length()-1) == '/') {
return baseAddress + HttpUtils.urlEncode(trimBeginningAndEndingSlashes(resourcePath));
} else {
return baseAddress + '/' + HttpUtils.urlEncode(trimBeginningAndEndingSlashes(resourcePath));
}
}
static URI getRootFeedUri(URI baseAddress) {
static String getRootFeedUri(String baseAddress) {
return baseAddress;
}
static URI getDatabaseFeedUri(URI baseAddress) throws Exception {
static String getDatabaseFeedUri(String baseAddress) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Database, StringUtils.EMPTY, true));
}
static URI getDatabaseEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getDatabaseEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Database, request, false));
}
static URI getCollectionFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getCollectionFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.DocumentCollection, request, true));
}
static URI getStoredProcedureFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getStoredProcedureFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.StoredProcedure, request, true));
}
static URI getTriggerFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getTriggerFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Trigger, request, true));
}
static URI getUserDefinedFunctionFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getUserDefinedFunctionFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.UserDefinedFunction, request, true));
}
static URI getCollectionEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getCollectionEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.DocumentCollection, request, false));
}
static URI getStoredProcedureEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getStoredProcedureEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.StoredProcedure, request, false));
}
static URI getTriggerEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getTriggerEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Trigger, request, false));
}
static URI getUserDefinedFunctionEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getUserDefinedFunctionEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.UserDefinedFunction, request, false));
}
static URI getDocumentFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getDocumentFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Document, request, true));
}
static URI getDocumentEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getDocumentEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Document, request, false));
}
static URI getConflictFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getConflictFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Conflict, request, true));
}
static URI getConflictEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getConflictEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Conflict, request, false));
}
static URI getAttachmentFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getAttachmentFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Attachment, request, true));
}
static URI getAttachmentEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getAttachmentEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Attachment, request, false));
}
static URI getUserFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getUserFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.User, request, true));
}
static URI getUserEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getUserEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.User, request, false));
}
static URI getPermissionFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getPermissionFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Permission, request, true));
}
static URI getPermissionEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getPermissionEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Permission, request, false));
}
static URI getOfferFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getOfferFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Offer, request, true));
}
static URI getSchemaFeedUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getSchemaFeedUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Schema, request, true));
}
static URI getSchemaEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getSchemaEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Schema, request, false));
}
static URI getOfferEntryUri(URI baseAddress, RxDocumentServiceRequest request) throws Exception {
static String getOfferEntryUri(String baseAddress, RxDocumentServiceRequest request) throws Exception {
return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Offer, request, false));
}

Просмотреть файл

@ -23,43 +23,49 @@
package com.microsoft.azure.cosmosdb.internal.directconnectivity;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.Error;
import com.microsoft.azure.cosmosdb.internal.ByteBufferPool;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.client.HttpResponseHeaders;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Single;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
class ResponseUtils {
private final static int INITIAL_RESPONSE_BUFFER_SIZE = 1024;
private final static Logger logger = LoggerFactory.getLogger(ResponseUtils.class);
public static Observable<String> toString(Observable<ByteBuf> contentObservable, final int contentLength) {
if (contentLength <= 0) {
return Observable.just("");
}
ByteBufferPool.ByteBufferWrapper byteBufferWrapper = ByteBufferPool.getInstant().lease(contentLength);
public static Observable<String> toString(Observable<ByteBuf> contentObservable) {
return contentObservable
.reduce(
new ByteArrayOutputStream(INITIAL_RESPONSE_BUFFER_SIZE),
byteBufferWrapper,
(out, bb) -> {
try {
bb.readBytes(out, bb.readableBytes());
int limit = bb.readableBytes();
out.getByteBuffer().limit(limit);
bb.readBytes(out.getByteBuffer());
assert contentLength == limit;
return out;
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Throwable t) {
ByteBufferPool.getInstant().release(byteBufferWrapper);
throw new RuntimeException(t);
}
})
.map(out -> {
return new String(out.toByteArray(), StandardCharsets.UTF_8);
try {
out.getByteBuffer().position(0);
return new String(out.getByteBuffer().array(), 0, contentLength, StandardCharsets.UTF_8);
} finally {
ByteBufferPool.getInstant().release(byteBufferWrapper);
}
});
}
@ -75,7 +81,7 @@ class ResponseUtils {
contentObservable = Observable.just(null);
} else {
// transforms the observable<ByteBuf> to Observable<InputStream>
contentObservable = toString(clientResponse.getContent());
contentObservable = toString(clientResponse.getContent(), clientResponse.getHeaders().getIntHeader(HttpConstants.HttpHeaders.CONTENT_LENGTH, -1));
}
Observable<StoreResponse> storeResponseObservable = contentObservable
@ -91,34 +97,4 @@ class ResponseUtils {
return storeResponseObservable.toSingle();
}
private static void validateOrThrow(RxDocumentServiceRequest request, HttpResponseStatus status, HttpResponseHeaders headers, String body,
InputStream inputStream) throws DocumentClientException {
int statusCode = status.code();
if (statusCode >= HttpConstants.StatusCodes.MINIMUM_STATUSCODE_AS_ERROR_GATEWAY) {
if (body == null && inputStream != null) {
try {
body = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
} catch (IOException e) {
logger.error("Failed to get content from the http response", e);
throw new IllegalStateException("Failed to get content from the http response", e);
} finally {
IOUtils.closeQuietly(inputStream);
}
}
String statusCodeString = status.reasonPhrase() != null
? status.reasonPhrase().replace(" ", "")
: "";
Error error = null;
error = (body != null) ? new Error(body) : new Error();
error = new Error(statusCodeString,
String.format("%s, StatusCode: %s", error.getMessage(), statusCodeString),
error.getPartitionedQueryExecutionInfo());
throw new DocumentClientException(statusCode, error, HttpUtils.asMap(headers));
}
}
}

Просмотреть файл

@ -134,12 +134,13 @@ public final class RntbdTransportClient extends TransportClient implements AutoC
@Override
public Single<StoreResponse> invokeStoreAsync(
final URI physicalAddress, final ResourceOperation unused, final RxDocumentServiceRequest request
final Uri physicalAddressUri, final ResourceOperation unused, final RxDocumentServiceRequest request
) {
checkNotNull(physicalAddress, "physicalAddress");
checkNotNull(physicalAddressUri, "physicalAddress");
checkNotNull(request, "request");
this.throwIfClosed();
URI physicalAddress = physicalAddressUri.getURI();
final RntbdRequestArgs requestArgs = new RntbdRequestArgs(request, physicalAddress);
requestArgs.traceOperation(logger, null, "invokeStoreAsync");

Просмотреть файл

@ -50,7 +50,6 @@ import rx.Single;
import rx.exceptions.CompositeException;
import rx.schedulers.Schedulers;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -142,7 +141,7 @@ public class StoreReader {
}).toSingle();
}
private Observable<ReadReplicaResult> earlyResultIfNotEnoughReplicas(List<URI> replicaAddresses,
private Observable<ReadReplicaResult> earlyResultIfNotEnoughReplicas(List<Uri> replicaAddresses,
RxDocumentServiceRequest request,
int replicaCountToRead) {
if (replicaAddresses.size() < replicaCountToRead) {
@ -159,7 +158,7 @@ public class StoreReader {
}
private Observable<StoreResult> toStoreResult(RxDocumentServiceRequest request,
Pair<Observable<StoreResponse>, URI> storeRespAndURI,
Pair<Observable<StoreResponse>, Uri> storeRespAndURI,
ReadMode readMode,
boolean requiresValidLsn) {
@ -172,7 +171,7 @@ public class StoreReader {
readMode != ReadMode.Strong,
storeRespAndURI.getRight());
request.requestContext.clientSideRequestStatistics.getContactedReplicas().add(storeRespAndURI.getRight());
request.requestContext.clientSideRequestStatistics.getContactedReplicas().add(storeRespAndURI.getRight().getURI());
return Observable.just(storeResult);
} catch (Exception e) {
// RxJava1 doesn't allow throwing checked exception from Observable operators
@ -195,7 +194,7 @@ public class StoreReader {
readMode != ReadMode.Strong,
null);
if (storeException instanceof TransportException) {
request.requestContext.clientSideRequestStatistics.getFailedReplicas().add(storeRespAndURI.getRight());
request.requestContext.clientSideRequestStatistics.getFailedReplicas().add(storeRespAndURI.getRight().getURI());
}
return Observable.just(storeResult);
} catch (Exception e) {
@ -206,7 +205,7 @@ public class StoreReader {
}
private Observable<List<StoreResult>> readFromReplicas(List<StoreResult> resultCollector,
List<URI> resolveApiResults,
List<Uri> resolveApiResults,
final AtomicInteger replicasToRead,
RxDocumentServiceRequest entity,
boolean includePrimary,
@ -223,13 +222,13 @@ public class StoreReader {
if (entity.requestContext.timeoutHelper.isElapsed()) {
return Observable.error(new GoneException());
}
List<Pair<Observable<StoreResponse>, URI>> readStoreTasks = new ArrayList<>();
List<Pair<Observable<StoreResponse>, Uri>> readStoreTasks = new ArrayList<>();
int uriIndex = StoreReader.generateNextRandom(resolveApiResults.size());
while (resolveApiResults.size() > 0) {
uriIndex = uriIndex % resolveApiResults.size();
URI uri = resolveApiResults.get(uriIndex);
Pair<Single<StoreResponse>, URI> res;
Uri uri = resolveApiResults.get(uriIndex);
Pair<Single<StoreResponse>, Uri> res;
try {
res = this.readFromStoreAsync(resolveApiResults.get(uriIndex),
entity);
@ -368,7 +367,7 @@ public class StoreReader {
requestedCollectionId = entity.requestContext.resolvedCollectionRid;
}
Single<List<URI>> resolveApiResultsObs = this.addressSelector.resolveAllUriAsync(
Single<List<Uri>> resolveApiResultsObs = this.addressSelector.resolveAllUriAsync(
entity,
includePrimary,
entity.requestContext.forceRefreshAddressCache);
@ -519,7 +518,7 @@ public class StoreReader {
return Single.error(new GoneException());
}
Single<URI> primaryUriObs = this.addressSelector.resolvePrimaryUriAsync(
Single<Uri> primaryUriObs = this.addressSelector.resolvePrimaryUriAsync(
entity,
entity.requestContext.forceRefreshAddressCache);
@ -536,7 +535,7 @@ public class StoreReader {
}
Pair<Single<StoreResponse>, URI> storeResponseObsAndUri = this.readFromStoreAsync(primaryUri, entity);
Pair<Single<StoreResponse>, Uri> storeResponseObsAndUri = this.readFromStoreAsync(primaryUri, entity);
return storeResponseObsAndUri.getLeft().flatMap(
storeResponse -> {
@ -598,8 +597,8 @@ public class StoreReader {
});
}
private Pair<Single<StoreResponse>, URI> readFromStoreAsync(
URI physicalAddress,
private Pair<Single<StoreResponse>, Uri> readFromStoreAsync(
Uri physicalAddress,
RxDocumentServiceRequest request) throws DocumentClientException {
if (request.requestContext.timeoutHelper.isElapsed()) {
@ -679,7 +678,7 @@ public class StoreReader {
Exception responseException,
boolean requiresValidLsn,
boolean useLocalLSNBasedHeaders,
URI storePhysicalAddress) throws DocumentClientException {
Uri storePhysicalAddress) throws DocumentClientException {
if (responseException == null) {
String headerValue = null;

Просмотреть файл

@ -31,12 +31,12 @@ import java.net.URI;
public abstract class TransportClient implements AutoCloseable {
// Uses requests's ResourceOperation to determine the operation
public Single<StoreResponse> invokeResourceOperationAsync(URI physicalAddress, RxDocumentServiceRequest request) {
public Single<StoreResponse> invokeResourceOperationAsync(Uri physicalAddress, RxDocumentServiceRequest request) {
return this.invokeStoreAsync(physicalAddress, new ResourceOperation(request.getOperationType(), request.getResourceType()), request);
}
protected abstract Single<StoreResponse> invokeStoreAsync(
URI physicalAddress,
ResourceOperation resourceOperation,
RxDocumentServiceRequest request);
Uri physicalAddress,
ResourceOperation resourceOperation,
RxDocumentServiceRequest request);
}

Просмотреть файл

@ -49,7 +49,7 @@ public class AddressSelectorTest {
}
@Test(groups = "unit", expectedExceptions = GoneException.class, expectedExceptionsMessageRegExp =
"The requested resource is no longer available at the server. Returned addresses are \\{https://cosmos1,https://cosmos2\\}")
"The requested resource is no longer available at the server. Returned addresses are \\{https://cosmos1/,https://cosmos2/\\}")
public void getPrimaryUri_NoPrimaryAddress() throws Exception {
RxDocumentServiceRequest request = Mockito.mock(RxDocumentServiceRequest.class);
Mockito.doReturn(null).when(request).getDefaultReplicaIndex();
@ -73,9 +73,9 @@ public class AddressSelectorTest {
replicaAddresses.add(new AddressInformation(true, true, "https://cosmos2", Protocol.Https));
replicaAddresses.add(new AddressInformation(true, false, "https://cosmos3", Protocol.Https));
URI res = AddressSelector.getPrimaryUri(request, replicaAddresses);
Uri res = AddressSelector.getPrimaryUri(request, replicaAddresses);
assertThat(res).isEqualTo(URI.create("https://cosmos2"));
assertThat(res).isEqualTo(Uri.create("https://cosmos2/"));
}
@Test(groups = "unit")
@ -89,9 +89,9 @@ public class AddressSelectorTest {
replicaAddresses.add(new AddressInformation(true, false, "https://cosmos2", Protocol.Https));
replicaAddresses.add(new AddressInformation(true, false, "https://cosmos3", Protocol.Https));
URI res = AddressSelector.getPrimaryUri(request, replicaAddresses);
Uri res = AddressSelector.getPrimaryUri(request, replicaAddresses);
assertThat(res).isEqualTo(URI.create("https://cosmos2"));
assertThat(res).isEqualTo(Uri.create("https://cosmos2/"));
}
@Test(groups = "unit")
@ -112,9 +112,9 @@ public class AddressSelectorTest {
Mockito.doReturn(Single.just(replicaAddresses.toArray(new AddressInformation[0]))).when(addressResolver).resolveAsync(Mockito.any(RxDocumentServiceRequest.class), Matchers.eq(false));
URI res = selector.resolvePrimaryUriAsync(request, false).toBlocking().value();
Uri res = selector.resolvePrimaryUriAsync(request, false).toBlocking().value();
assertThat(res).isEqualTo(URI.create("https://cosmos2"));
assertThat(res).isEqualTo(Uri.create("https://cosmos2/"));
}
@Test(groups = "unit")
@ -135,9 +135,9 @@ public class AddressSelectorTest {
Mockito.doReturn(Single.just(replicaAddresses.toArray(new AddressInformation[0]))).when(addressResolver).resolveAsync(Mockito.any(RxDocumentServiceRequest.class), Matchers.eq(false));
List<URI> res = selector.resolveAllUriAsync(request, true, false).toBlocking().value();
List<Uri> res = selector.resolveAllUriAsync(request, true, false).toBlocking().value();
assertThat(res).isEqualTo(ImmutableList.of(URI.create("https://cosmos1"), URI.create("https://cosmos2"), URI.create("https://cosmos3")));
assertThat(res).isEqualTo(ImmutableList.of(Uri.create("https://cosmos1/"), Uri.create("https://cosmos2/"), Uri.create("https://cosmos3/")));
}
@Test(groups = "unit")
@ -181,9 +181,9 @@ public class AddressSelectorTest {
Mockito.doReturn(Single.just(replicaAddresses.toArray(new AddressInformation[0]))).when(addressResolver).resolveAsync(Mockito.any(RxDocumentServiceRequest.class), Matchers.eq(false));
List<URI> res = selector.resolveAllUriAsync(request, true, false).toBlocking().value();
List<Uri> res = selector.resolveAllUriAsync(request, true, false).toBlocking().value();
assertThat(res).isEqualTo(ImmutableList.of(URI.create("rntbd://cosmos1"), URI.create("rntbd://cosmos2")));
assertThat(res).isEqualTo(ImmutableList.of(Uri.create("rntbd://cosmos1/"), Uri.create("rntbd://cosmos2/")));
}
}

Просмотреть файл

@ -34,7 +34,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import rx.Single;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -312,7 +311,7 @@ public class AddressSelectorWrapper {
addressSelector = Mockito.mock(AddressSelector.class);
}
public PrimaryReplicaMoveBuilder withPrimaryReplicaMove(URI primaryURIBeforeForceRefresh, URI primaryURIAfterForceRefresh) {
public PrimaryReplicaMoveBuilder withPrimaryReplicaMove(Uri primaryURIBeforeForceRefresh, Uri primaryURIAfterForceRefresh) {
AtomicBoolean refreshed = new AtomicBoolean(false);
Mockito.doAnswer((invocation) -> {
capture(invocation);
@ -347,8 +346,8 @@ public class AddressSelectorWrapper {
public static class ReplicaMoveBuilder extends Builder {
List<Pair<URI, URI>> secondary = new ArrayList<>();
Pair<URI, URI> primary;
List<Pair<Uri, Uri>> secondary = new ArrayList<>();
Pair<Uri, Uri> primary;
private Function<RxDocumentServiceRequest, PartitionKeyRange> partitionKeyRangeFunction;
static ReplicaMoveBuilder create(Protocol protocol) {
@ -360,12 +359,12 @@ public class AddressSelectorWrapper {
addressSelector = Mockito.mock(AddressSelector.class);
}
public ReplicaMoveBuilder withPrimaryMove(URI uriBeforeForceRefresh, URI uriAfterForceRefresh) {
public ReplicaMoveBuilder withPrimaryMove(Uri uriBeforeForceRefresh, Uri uriAfterForceRefresh) {
withReplicaMove(uriBeforeForceRefresh, uriAfterForceRefresh, true);
return this;
}
public ReplicaMoveBuilder withSecondaryMove(URI uriBeforeForceRefresh, URI uriAfterForceRefresh) {
public ReplicaMoveBuilder withSecondaryMove(Uri uriBeforeForceRefresh, Uri uriAfterForceRefresh) {
withReplicaMove(uriBeforeForceRefresh, uriAfterForceRefresh, false);
return this;
}
@ -375,7 +374,7 @@ public class AddressSelectorWrapper {
return this;
}
public ReplicaMoveBuilder withReplicaMove(URI uriBeforeForceRefresh, URI uriAfterForceRefresh, boolean isPrimary) {
public ReplicaMoveBuilder withReplicaMove(Uri uriBeforeForceRefresh, Uri uriAfterForceRefresh, boolean isPrimary) {
if (isPrimary) {
primary = ImmutablePair.of(uriBeforeForceRefresh, uriAfterForceRefresh);
} else {
@ -409,7 +408,7 @@ public class AddressSelectorWrapper {
boolean includePrimary = invocation.getArgumentAt(1, Boolean.class);
boolean forceRefresh = invocation.getArgumentAt(2, Boolean.class);
ImmutableList.Builder<URI> b = ImmutableList.builder();
ImmutableList.Builder<Uri> b = ImmutableList.builder();
if (forceRefresh || refreshed.get()) {
if (partitionKeyRangeFunction != null) {
@ -437,7 +436,7 @@ public class AddressSelectorWrapper {
RxDocumentServiceRequest request = invocation.getArgumentAt(0, RxDocumentServiceRequest.class);
boolean forceRefresh = invocation.getArgumentAt(1, Boolean.class);
ImmutableList.Builder<URI> b = ImmutableList.builder();
ImmutableList.Builder<Uri> b = ImmutableList.builder();
if (forceRefresh || refreshed.get()) {
if (partitionKeyRangeFunction != null) {
@ -461,8 +460,8 @@ public class AddressSelectorWrapper {
}
public static class Simple extends Builder {
private URI primaryAddress;
private List<URI> secondaryAddresses;
private Uri primaryAddress;
private List<Uri> secondaryAddresses;
static Simple create() {
return new Simple(Protocol.Https);
}
@ -472,16 +471,17 @@ public class AddressSelectorWrapper {
addressSelector = Mockito.mock(AddressSelector.class);
}
public Simple withPrimary(URI primaryAddress) {
public Simple withPrimary(Uri primaryAddress) {
this.primaryAddress = primaryAddress;
return this;
}
public Simple withSecondary(List<URI> secondaryAddresses) {
public Simple withSecondary(List<Uri> secondaryAddresses) {
this.secondaryAddresses = secondaryAddresses;
return this;
}
public AddressSelectorWrapper build() {
Mockito.doAnswer((invocation) -> {
capture(invocation);
@ -515,8 +515,8 @@ public class AddressSelectorWrapper {
return new AddressSelectorWrapper(this.addressSelector, this.invocationOnMockList);
}
private AddressInformation toAddressInformation(URI uri, boolean isPrimary, Protocol protocol) {
return new AddressInformation(true, isPrimary, uri.toString(), protocol);
private AddressInformation toAddressInformation(Uri uri, boolean isPrimary, Protocol protocol) {
return new AddressInformation(true, isPrimary, uri.getURIAsString(), protocol);
}
}

Просмотреть файл

@ -49,7 +49,6 @@ import rx.observers.TestSubscriber;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -155,8 +154,8 @@ public class ConsistencyReaderTest {
@Test(groups = "unit")
public void readAny() {
List<URI> secondaries = ImmutableList.of(URI.create("secondary1"), URI.create("secondary2"), URI.create("secondary3"));
URI primaryAddress = URI.create("primary");
List<Uri> secondaries = ImmutableList.of(Uri.create("secondary1"), Uri.create("secondary2"), Uri.create("secondary3"));
Uri primaryAddress = Uri.create("primary");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryAddress)
.withSecondary(secondaries)
@ -244,8 +243,8 @@ public class ConsistencyReaderTest {
String partitionKeyRangeId = "1";
long fasterReplicaLSN = 651177;
List<URI> secondaries = ImmutableList.of(URI.create("secondary1"), URI.create("secondary2"), URI.create("secondary3"));
URI primaryAddress = URI.create("primary");
List<Uri> secondaries = ImmutableList.of(Uri.create("secondary1"), Uri.create("secondary2"), Uri.create("secondary3"));
Uri primaryAddress = Uri.create("primary");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryAddress)
.withSecondary(secondaries)
@ -395,10 +394,10 @@ public class ConsistencyReaderTest {
.then(storeResponse) // 4th replica read returns storeResponse satisfying requested session token
.build();
URI primaryUri = URI.create("primary");
URI secondaryUri1 = URI.create("secondary1");
URI secondaryUri2 = URI.create("secondary2");
URI secondaryUri3 = URI.create("secondary3");
Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
@ -464,10 +463,10 @@ public class ConsistencyReaderTest {
.then(foundException) // 4th replica read returns not found lsn(response) >= lsn(request)
.build();
URI primaryUri = URI.create("primary");
URI secondaryUri1 = URI.create("secondary1");
URI secondaryUri2 = URI.create("secondary2");
URI secondaryUri3 = URI.create("secondary3");
Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
@ -530,10 +529,10 @@ public class ConsistencyReaderTest {
.then(foundException) // 4th replica read lsn lags behind the request lsn
.build();
URI primaryUri = URI.create("primary");
URI secondaryUri1 = URI.create("secondary1");
URI secondaryUri2 = URI.create("secondary2");
URI secondaryUri3 = URI.create("secondary3");
Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
@ -594,10 +593,10 @@ public class ConsistencyReaderTest {
.then(requestTooLargeException) // 4th replica read result in throttling
.build();
URI primaryUri = URI.create("primary");
URI secondaryUri1 = URI.create("secondary1");
URI secondaryUri2 = URI.create("secondary2");
URI secondaryUri3 = URI.create("secondary3");
Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
@ -651,8 +650,8 @@ public class ConsistencyReaderTest {
@Test(groups = "unit", dataProvider = "simpleReadStrongArgProvider")
public void basicReadStrong_AllReplicasSameLSN(int replicaCountToRead, ReadMode readMode) {
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
URI primaryReplicaURI = URI.create("primary");
ImmutableList<URI> secondaryReplicaURIs = ImmutableList.of(URI.create("secondary1"), URI.create("secondary2"), URI.create("secondary3"));
Uri primaryReplicaURI = Uri.create("primary");
ImmutableList<Uri> secondaryReplicaURIs = ImmutableList.of(Uri.create("secondary1"), Uri.create("secondary2"), Uri.create("secondary3"));
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryReplicaURI)
.withSecondary(secondaryReplicaURIs)

Просмотреть файл

@ -42,7 +42,6 @@ import rx.Single;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;
import java.net.URI;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
@ -83,10 +82,10 @@ public class ConsistencyWriterTest {
.then(ex)
.build();
URI primaryUri = URI.create("primary");
URI secondaryUri1 = URI.create("secondary1");
URI secondaryUri2 = URI.create("secondary2");
URI secondaryUri3 = URI.create("secondary3");
Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
@ -129,15 +128,15 @@ public class ConsistencyWriterTest {
initializeConsistencyWriter(false);
CyclicBarrier b = new CyclicBarrier(2);
PublishSubject<URI> subject = PublishSubject.create();
PublishSubject<Uri> subject = PublishSubject.create();
CountDownLatch c = new CountDownLatch(1);
URI uri = URI.create("https://localhost:5050");
Uri uri = Uri.create("https://localhost:5050");
List<InvocationOnMock> invocationOnMocks = Collections.synchronizedList(new ArrayList<>());
Mockito.doAnswer(new Answer() {
@Override
public Single<URI> answer(InvocationOnMock invocationOnMock) {
public Single<Uri> answer(InvocationOnMock invocationOnMock) {
invocationOnMocks.add(invocationOnMock);
return subject.toSingle().doOnSuccess(x -> c.countDown()).doAfterTerminate(() -> {
new Thread() {

Просмотреть файл

@ -28,7 +28,6 @@ import com.microsoft.azure.cosmosdb.internal.OperationType;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
import org.apache.commons.collections4.map.HashedMap;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -84,17 +83,17 @@ abstract public class EndpointMock {
class ReplicasWithSameSpeed extends Builder {
URI primary;
List<URI> secondaries = new ArrayList<>();
Uri primary;
List<Uri> secondaries = new ArrayList<>();
StoreResponse headStoreResponse;
StoreResponse readStoreResponse;
ReplicasWithSameSpeed addPrimary(URI replicaAddress) {
ReplicasWithSameSpeed addPrimary(Uri replicaAddress) {
primary = replicaAddress;
return this;
}
ReplicasWithSameSpeed addSecondary(URI replicaAddress) {
ReplicasWithSameSpeed addSecondary(Uri replicaAddress) {
secondaries.add(replicaAddress);
return this;
}
@ -112,9 +111,9 @@ abstract public class EndpointMock {
public EndpointMock build() {
TransportClientWrapper.Builder.ReplicaResponseBuilder transportClientWrapperBuilder = TransportClientWrapper.Builder.replicaResponseBuilder();
ImmutableList<URI> replicas = ImmutableList.<URI>builder().add(primary).addAll(secondaries).build();
ImmutableList<Uri> replicas = ImmutableList.<Uri>builder().add(primary).addAll(secondaries).build();
for(URI replica: replicas) {
for(Uri replica: replicas) {
transportClientWrapperBuilder.addReplica(replica, (i, request) -> {
if (request.getOperationType() == OperationType.Head || request.getOperationType() == OperationType.HeadFeed) {
return headStoreResponse;
@ -132,20 +131,20 @@ abstract public class EndpointMock {
}
class QuorumNotMetSecondaryReplicasDisappear {
URI primary;
Map<URI, Function2WithCheckedException<Integer, RxDocumentServiceRequest, Boolean>> disappearDictionary = new HashedMap<>();
public QuorumNotMetSecondaryReplicasDisappear primaryReplica(URI primaryReplica) {
Uri primary;
Map<Uri, Function2WithCheckedException<Integer, RxDocumentServiceRequest, Boolean>> disappearDictionary = new HashedMap();
public QuorumNotMetSecondaryReplicasDisappear primaryReplica(Uri primaryReplica) {
this.primary = primaryReplica;
return this;
}
public QuorumNotMetSecondaryReplicasDisappear secondaryReplicasDisappearWhen(URI secondary,
public QuorumNotMetSecondaryReplicasDisappear secondaryReplicasDisappearWhen(Uri secondary,
Function2WithCheckedException<Integer, RxDocumentServiceRequest, Boolean> disappearPredicate) {
disappearDictionary.put(secondary, disappearPredicate);
return this;
}
public QuorumNotMetSecondaryReplicasDisappear secondaryReplicasDisappearAfter(URI secondary, int attempt) {
public QuorumNotMetSecondaryReplicasDisappear secondaryReplicasDisappearAfter(Uri secondary, int attempt) {
disappearDictionary.put(secondary, (i, r) -> i >= attempt);
return this;
}
@ -154,8 +153,8 @@ abstract public class EndpointMock {
static public class NoSecondaryReplica extends Builder {
private long LOCAL_LSN = 19;
private long LSN = 52;
private URI defaultPrimaryURI = URI.create("primary");
private URI primary = defaultPrimaryURI;
private Uri defaultPrimaryURI = Uri.create("primary");
private Uri primary = defaultPrimaryURI;
private StoreResponse defaultResponse = StoreResponseBuilder.create()
.withLSN(LSN)
.withLocalLSN(LOCAL_LSN)
@ -169,7 +168,7 @@ abstract public class EndpointMock {
private StoreResponse readStoreResponse = defaultResponse;
private Function1WithCheckedException<RxDocumentServiceRequest, StoreResponse> storeResponseFunc;
public NoSecondaryReplica primaryReplica(URI primaryReplica) {
public NoSecondaryReplica primaryReplica(Uri primaryReplica) {
this.primary = primaryReplica;
return this;
}
@ -189,9 +188,9 @@ abstract public class EndpointMock {
TransportClientWrapper.Builder.ReplicaResponseBuilder transportClientWrapperBuilder = TransportClientWrapper.Builder.replicaResponseBuilder();
ImmutableList<URI> replicas = ImmutableList.<URI>builder().add(primary).build();
ImmutableList<Uri> replicas = ImmutableList.<Uri>builder().add(primary).build();
for(URI replica: replicas) {
for(Uri replica: replicas) {
transportClientWrapperBuilder.addReplica(replica, (i, request) -> {
if (storeResponseFunc != null) {
@ -216,8 +215,8 @@ abstract public class EndpointMock {
static public class NoSecondaryReplica_TwoSecondaryReplicasGoLiveAfterFirstHitOnPrimary extends Builder {
private long LOCAL_LSN = 19;
private long LSN = 52;
private URI primary = URI.create("primary");
private ImmutableList<URI> secondaryReplicas = ImmutableList.of(URI.create("secondary1"), URI.create("secondary2"));
private Uri primary = Uri.create("primary");
private ImmutableList<Uri> secondaryReplicas = ImmutableList.of(Uri.create("secondary1"), Uri.create("secondary2"));
private StoreResponse primaryDefaultResponse = StoreResponseBuilder.create()
.withLSN(LSN)
.withLocalLSN(LOCAL_LSN)
@ -234,17 +233,17 @@ abstract public class EndpointMock {
.withHeader(WFConstants.BackendHeaders.QUORUM_ACKED_LOCAL_LSN, Long.toString(LOCAL_LSN))
.withRequestCharge(0)
.build();
Map<URI, Function1WithCheckedException<RxDocumentServiceRequest, StoreResponse>> secondaryResponseFunc =
Map<Uri, Function1WithCheckedException<RxDocumentServiceRequest, StoreResponse>> secondaryResponseFunc =
new HashMap<>();
public NoSecondaryReplica_TwoSecondaryReplicasGoLiveAfterFirstHitOnPrimary primaryReplica(URI primaryReplica) {
public NoSecondaryReplica_TwoSecondaryReplicasGoLiveAfterFirstHitOnPrimary primaryReplica(Uri primaryReplica) {
this.primary = primaryReplica;
return this;
}
public NoSecondaryReplica_TwoSecondaryReplicasGoLiveAfterFirstHitOnPrimary responseFromSecondary(
URI replica,
Uri replica,
Function1WithCheckedException<RxDocumentServiceRequest, StoreResponse> func) {
secondaryResponseFunc.put(replica, func);
return this;

Просмотреть файл

@ -64,10 +64,10 @@ public class HttpTransportClientTest {
private final static Configs configs = new Configs();
private final static int TIMEOUT = 1000;
private final URI physicalAddress = URI.create(
private final String physicalAddress =
"https://by4prdddc03-docdb-1.documents.azure.com:9056" +
"/apps/b76af614-5421-4318-4c9e-33056ff5a2bf/services/e7c8d429-c379-40c9-9486-65b89b70be2f" +
"/partitions/5f5b8766-3bdf-4713-b85a-a55ac2ccd62c/replicas/131828696163674404p/");
"/partitions/5f5b8766-3bdf-4713-b85a-a55ac2ccd62c/replicas/131828696163674404p/";
private final long lsn = 5;
private final String partitionKeyRangeId = "3";
@ -76,7 +76,7 @@ public class HttpTransportClientTest {
public void getResourceFeedUri_Document() throws Exception {
RxDocumentServiceRequest req = RxDocumentServiceRequest.createFromName(
OperationType.Create, "dbs/db/colls/col", ResourceType.Document);
URI res = HttpTransportClient.getResourceFeedUri(req.getResourceType(), physicalAddress, req);
String res = HttpTransportClient.getResourceFeedUri(req.getResourceType(), physicalAddress, req);
assertThat(res.toString()).isEqualTo(physicalAddress.toString() + HttpUtils.urlEncode("dbs/db/colls/col/docs"));
}
@ -84,7 +84,7 @@ public class HttpTransportClientTest {
public void getResourceFeedUri_Attachment() throws Exception {
RxDocumentServiceRequest req = RxDocumentServiceRequest.createFromName(
OperationType.Create, "dbs/db/colls/col", ResourceType.Attachment);
URI res = HttpTransportClient.getResourceFeedUri(req.getResourceType(), physicalAddress, req);
String res = HttpTransportClient.getResourceFeedUri(req.getResourceType(), physicalAddress, req);
assertThat(res.toString()).isEqualTo(physicalAddress.toString() + HttpUtils.urlEncode("dbs/db/colls/col/attachments"));
}
@ -92,7 +92,7 @@ public class HttpTransportClientTest {
public void getResourceFeedUri_Collection() throws Exception {
RxDocumentServiceRequest req = RxDocumentServiceRequest.createFromName(
OperationType.Create, "dbs/db", ResourceType.DocumentCollection);
URI res = HttpTransportClient.getResourceFeedUri(req.getResourceType(), physicalAddress, req);
String res = HttpTransportClient.getResourceFeedUri(req.getResourceType(), physicalAddress, req);
assertThat(res.toString()).isEqualTo(physicalAddress.toString() + HttpUtils.urlEncode("dbs/db/colls"));
}
@ -100,7 +100,7 @@ public class HttpTransportClientTest {
public void getResourceFeedUri_Conflict() throws Exception {
RxDocumentServiceRequest req = RxDocumentServiceRequest.createFromName(
OperationType.Create, "/dbs/db/colls/col", ResourceType.Conflict);
URI res = HttpTransportClient.getResourceFeedUri(req.getResourceType(), physicalAddress, req);
String res = HttpTransportClient.getResourceFeedUri(req.getResourceType(), physicalAddress, req);
assertThat(res.toString()).isEqualTo(physicalAddress.toString() + HttpUtils.urlEncode("dbs/db/colls/col/conflicts"));
}
@ -108,7 +108,7 @@ public class HttpTransportClientTest {
public void getResourceFeedUri_Database() throws Exception {
RxDocumentServiceRequest req = RxDocumentServiceRequest.createFromName(
OperationType.Create, "/", ResourceType.Database);
URI res = HttpTransportClient.getResourceFeedUri(req.getResourceType(), physicalAddress, req);
String res = HttpTransportClient.getResourceFeedUri(req.getResourceType(), physicalAddress, req);
assertThat(res.toString()).isEqualTo(physicalAddress.toString() + "dbs");
}
@ -148,7 +148,7 @@ public class HttpTransportClientTest {
OperationType.Create, "dbs/db/colls/col", ResourceType.Document);
request.setContentBytes(new byte[0]);
transportClient.invokeStoreAsync(physicalAddress,
transportClient.invokeStoreAsync(Uri.create(physicalAddress),
new ResourceOperation(OperationType.Create, ResourceType.Document),
request).toBlocking().value();
@ -468,7 +468,7 @@ public class HttpTransportClientTest {
request.setContentBytes(new byte[0]);
Single<StoreResponse> storeResp = transportClient.invokeStoreAsync(
physicalAddress,
Uri.create(physicalAddress),
new ResourceOperation(OperationType.Create, ResourceType.Document),
request);
@ -577,7 +577,7 @@ public class HttpTransportClientTest {
httpClientMockWrapper.getClient());
Single<StoreResponse> storeResp = transportClient.invokeStoreAsync(
physicalAddress,
Uri.create(physicalAddress),
new ResourceOperation(OperationType.Create, ResourceType.Document),
request);

Просмотреть файл

@ -44,7 +44,6 @@ import rx.observers.TestSubscriber;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -95,8 +94,8 @@ public class QuorumReaderTest {
@Test(groups = "unit", dataProvider = "simpleReadStrongArgProvider")
public void basicReadStrong_AllReplicasSameLSN(int replicaCountToRead, ReadMode readMode, Long lsn, Long localLSN) {
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
URI primaryReplicaURI = URI.create("primary");
ImmutableList<URI> secondaryReplicaURIs = ImmutableList.of(URI.create("secondary1"), URI.create("secondary2"), URI.create("secondary3"));
Uri primaryReplicaURI = Uri.create("primary");
ImmutableList<Uri> secondaryReplicaURIs = ImmutableList.of(Uri.create("secondary1"), Uri.create("secondary2"), Uri.create("secondary3"));
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryReplicaURI)
.withSecondary(secondaryReplicaURIs)
@ -169,8 +168,8 @@ public class QuorumReaderTest {
int replicaCountToRead = 2;
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
URI primaryReplicaURI = URI.create("primary");
ImmutableList<URI> secondaryReplicaURIs = ImmutableList.of(URI.create("secondary1"), URI.create("secondary2"));
Uri primaryReplicaURI = Uri.create("primary");
ImmutableList<Uri> secondaryReplicaURIs = ImmutableList.of(Uri.create("secondary1"), Uri.create("secondary2"));
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryReplicaURI)
.withSecondary(secondaryReplicaURIs)
@ -315,8 +314,8 @@ public class QuorumReaderTest {
int replicaCountToRead = 2;
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
URI primaryReplicaURI = URI.create("primary");
ImmutableList<URI> secondaryReplicaURIs = ImmutableList.of(URI.create("secondary1"), URI.create("secondary2"));
Uri primaryReplicaURI = Uri.create("primary");
ImmutableList<Uri> secondaryReplicaURIs = ImmutableList.of(Uri.create("secondary1"), Uri.create("secondary2"));
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryReplicaURI)
.withSecondary(secondaryReplicaURIs)
@ -474,8 +473,8 @@ public class QuorumReaderTest {
int replicaCountToRead = 2;
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
URI primaryReplicaURI = URI.create("primary");
ImmutableList<URI> secondaryReplicaURIs = ImmutableList.of(URI.create("secondary1"));
Uri primaryReplicaURI = Uri.create("primary");
ImmutableList<Uri> secondaryReplicaURIs = ImmutableList.of(Uri.create("secondary1"));
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryReplicaURI)
.withSecondary(secondaryReplicaURIs)

Просмотреть файл

@ -44,7 +44,6 @@ import rx.Single;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -63,11 +62,11 @@ public class ReplicatedResourceClientPartitionSplitTest {
@Test(groups = { "unit" }, dataProvider = "partitionIsSplittingArgProvider", timeOut = TIMEOUT)
public void partitionSplit_RefreshCache_Read(ConsistencyLevel consistencyLevel, int partitionIsSplitting) {
URI secondary1AddressBeforeMove = URI.create("secondary");
URI secondary1AddressAfterMove = URI.create("secondaryNew");
Uri secondary1AddressBeforeMove = Uri.create("secondary");
Uri secondary1AddressAfterMove = Uri.create("secondaryNew");
URI primaryAddressBeforeMove = URI.create("primary");
URI primaryAddressAfterMove = URI.create("primaryNew");
Uri primaryAddressBeforeMove = Uri.create("primary");
Uri primaryAddressAfterMove = Uri.create("primaryNew");
String partitionKeyRangeIdBeforeSplit = "1";
String partitionKeyRangeIdAfterSplit = "2";

Просмотреть файл

@ -94,12 +94,12 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public final class RntbdTransportClientTest {
private static final int lsn = 5;
private static final ByteBuf noContent = Unpooled.wrappedBuffer(new byte[0]);
private static final String partitionKeyRangeId = "3";
private static final URI physicalAddress = URI.create("rntbd://host:10251/replica-path/");
private static final Duration requestTimeout = Duration.ofSeconds(1000);
final private static Logger logger = LoggerFactory.getLogger(RntbdTransportClientTest.class);
final private static int lsn = 5;
final private static ByteBuf noContent = Unpooled.wrappedBuffer(new byte[0]);
final private static String partitionKeyRangeId = "3";
final private static Uri physicalAddress = new Uri("rntbd://host:10251/replica-path/");
final private static Duration requestTimeout = Duration.ofSeconds(1000);
@DataProvider(name = "fromMockedNetworkFailureToExpectedDocumentClientException")
public Object[][] fromMockedNetworkFailureToExpectedDocumentClientException() {
@ -629,7 +629,7 @@ public final class RntbdTransportClientTest {
RntbdTestConfiguration.AccountKey
);
final URI physicalAddress = new URI("rntbd://"
final Uri physicalAddress = new Uri("rntbd://"
+ RntbdTestConfiguration.RntbdAuthority
+ "/apps/DocDbApp/services/DocDbMaster0/partitions/780e44f4-38c8-11e6-8106-8cdcd42c33be/replicas/1p/"
);

Просмотреть файл

@ -122,13 +122,13 @@ public class StoreReaderDotNetTest {
Mockito.doReturn(Single.just(srb.build()))
.when(mockTransportClient)
.invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[0].getPhysicalUri())),
Mockito.eq(addressInformation[0].getPhysicalUri()),
Mockito.any(RxDocumentServiceRequest.class));
// get response from mock object
StoreResponse response = mockTransportClient.invokeResourceOperationAsync(URI.create(addressInformation[0].getPhysicalUri()), entity).toBlocking().value();
StoreResponse response = mockTransportClient.invokeResourceOperationAsync(addressInformation[0].getPhysicalUri(), entity).toBlocking().value();
// validate that the LSN matches
// validate that the ActivityId Matches
@ -156,7 +156,7 @@ public class StoreReaderDotNetTest {
// setup mock transport client for the first replica
Mockito.doReturn(Single.just(mockStoreResponseFast))
.when(mockTransportClient)
.invokeResourceOperationAsync(Mockito.eq(URI.create(addressInformation[0].getPhysicalUri())), Mockito.any(RxDocumentServiceRequest.class));
.invokeResourceOperationAsync(Mockito.eq(addressInformation[0].getPhysicalUri()), Mockito.any(RxDocumentServiceRequest.class));
// setup mock transport client with a sequence of outputs
@ -169,7 +169,7 @@ public class StoreReaderDotNetTest {
.doReturn(Single.just(mockStoreResponseFast))
.doReturn(Single.just(mockStoreResponseFast))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[1].getPhysicalUri())),
Mockito.eq(addressInformation[1].getPhysicalUri()),
Mockito.any(RxDocumentServiceRequest.class));
// After this, the product code should reset target identity, and lsn response
@ -189,7 +189,7 @@ public class StoreReaderDotNetTest {
Mockito.doAnswer((params) -> Single.just(queueOfResponses.poll()))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[2].getPhysicalUri())),
Mockito.eq(addressInformation[2].getPhysicalUri()),
Mockito.any(RxDocumentServiceRequest.class));
return mockTransportClient;
@ -255,7 +255,7 @@ public class StoreReaderDotNetTest {
// setup mock transport client for the first replica
Mockito.doReturn(Single.just(mockStoreResponse5))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[0].getPhysicalUri())), Mockito.any(RxDocumentServiceRequest.class));
Mockito.eq(addressInformation[0].getPhysicalUri()), Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Single.just(mockStoreResponse1))
.doReturn(Single.just(mockStoreResponse1))
@ -264,7 +264,7 @@ public class StoreReaderDotNetTest {
.doReturn(Single.just(mockStoreResponse1))
.doReturn(Single.just(mockStoreResponse5))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[1].getPhysicalUri())),
Mockito.eq(addressInformation[1].getPhysicalUri()),
Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Single.just(mockStoreResponse2))
@ -274,7 +274,7 @@ public class StoreReaderDotNetTest {
.doReturn(Single.just(mockStoreResponse4))
.doReturn(Single.just(mockStoreResponse5))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[2].getPhysicalUri())),
Mockito.eq(addressInformation[2].getPhysicalUri()),
Mockito.any(RxDocumentServiceRequest.class));
}
@ -282,32 +282,32 @@ public class StoreReaderDotNetTest {
// setup mock transport client for the first replica
Mockito.doReturn(Single.just(mockStoreResponse2))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[0].getPhysicalUri())), Mockito.any(RxDocumentServiceRequest.class));
Mockito.eq(addressInformation[0].getPhysicalUri()), Mockito.any(RxDocumentServiceRequest.class));
// setup mock transport client with a sequence of outputs
Mockito.doReturn(Single.just(mockStoreResponse1))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[1].getPhysicalUri())), Mockito.any(RxDocumentServiceRequest.class));
Mockito.eq(addressInformation[1].getPhysicalUri()), Mockito.any(RxDocumentServiceRequest.class));
// setup mock transport client with a sequence of outputs
Mockito.doReturn(Single.just(mockStoreResponse2))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[2].getPhysicalUri())), Mockito.any(RxDocumentServiceRequest.class));
Mockito.eq(addressInformation[2].getPhysicalUri()), Mockito.any(RxDocumentServiceRequest.class));
} else if (result == ReadQuorumResultKind.QuorumNotSelected) {
// setup mock transport client for the first replica
Mockito.doReturn(Single.just(mockStoreResponse5))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[0].getPhysicalUri())), Mockito.any(RxDocumentServiceRequest.class));
Mockito.eq(addressInformation[0].getPhysicalUri()), Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Single.just(mockStoreResponse5))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[1].getPhysicalUri())), Mockito.any(RxDocumentServiceRequest.class));
Mockito.eq(addressInformation[1].getPhysicalUri()), Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Single.error(new GoneException("test")))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[2].getPhysicalUri())), Mockito.any(RxDocumentServiceRequest.class));
Mockito.eq(addressInformation[2].getPhysicalUri()), Mockito.any(RxDocumentServiceRequest.class));
}
return mockTransportClient;
@ -387,7 +387,7 @@ public class StoreReaderDotNetTest {
.doReturn(Single.just(mockStoreResponse1))
.doReturn(Single.just(finalResponse))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[i].getPhysicalUri())), Mockito.any(RxDocumentServiceRequest.class));
Mockito.eq(addressInformation[i].getPhysicalUri()), Mockito.any(RxDocumentServiceRequest.class));
} else {
Mockito.doReturn(Single.just(mockStoreResponse1))
@ -397,7 +397,7 @@ public class StoreReaderDotNetTest {
.doReturn(Single.just(mockStoreResponse1))
.doReturn(Single.just(mockStoreResponse1))
.when(mockTransportClient).invokeResourceOperationAsync(
Mockito.eq(URI.create(addressInformation[i].getPhysicalUri())), Mockito.any(RxDocumentServiceRequest.class));
Mockito.eq(addressInformation[i].getPhysicalUri()), Mockito.any(RxDocumentServiceRequest.class));
}
}
@ -478,16 +478,16 @@ public class StoreReaderDotNetTest {
assertThat(addressInfo[0]).isEqualTo(addressInformation[0]);
AddressSelector addressSelector = new AddressSelector(mockAddressCache, Protocol.Tcp);
URI primaryAddress = addressSelector.resolvePrimaryUriAsync(entity, false /*forceAddressRefresh*/).toBlocking().value();
Uri primaryAddress = addressSelector.resolvePrimaryUriAsync(entity, false /*forceAddressRefresh*/).toBlocking().value();
// check if the address return from Address Selector matches the original address info
assertThat(primaryAddress.toString()).isEqualTo(addressInformation[0].getPhysicalUri());
assertThat(primaryAddress).isEqualTo(addressInformation[0].getPhysicalUri());
// get mock transport client that returns a sequence of responses to simulate upgrade
TransportClient mockTransportClient = getMockTransportClientDuringUpgrade(addressInformation);
// get response from mock object
StoreResponse response = mockTransportClient.invokeResourceOperationAsync(URI.create(addressInformation[0].getPhysicalUri()), entity).toBlocking().value();
StoreResponse response = mockTransportClient.invokeResourceOperationAsync(addressInformation[0].getPhysicalUri(), entity).toBlocking().value();
// validate that the LSN matches
assertThat(response.getLSN()).isEqualTo(50);
@ -606,7 +606,7 @@ public class StoreReaderDotNetTest {
assertThat(addressInfo[0]).isEqualTo(addressInformations[0]);
AddressSelector addressSelector = new AddressSelector(mockAddressCache, Protocol.Tcp);
URI primaryAddress = addressSelector.resolvePrimaryUriAsync(entity, false).toBlocking().value();
Uri primaryAddress = addressSelector.resolvePrimaryUriAsync(entity, false).toBlocking().value();
// check if the address return from Address Selector matches the original address info
assertThat(primaryAddress.toString()).isEqualTo(addressInformations[0].getPhysicalUri());
@ -615,7 +615,7 @@ public class StoreReaderDotNetTest {
TransportClient mockTransportClient = getMockTransportClientDuringUpgrade(addressInformations);
// get response from mock object
StoreResponse response = mockTransportClient.invokeResourceOperationAsync(new URI(addressInformations[0].getPhysicalUri()), entity).toBlocking().value();
StoreResponse response = mockTransportClient.invokeResourceOperationAsync(addressInformations[0].getPhysicalUri(), entity).toBlocking().value();
// validate that the LSN matches
assertThat(response.getLSN()).isEqualTo(50);
@ -694,10 +694,10 @@ public class StoreReaderDotNetTest {
assertThat(addressInformations[0]).isEqualTo(addressInfo[0]);
AddressSelector addressSelector = new AddressSelector(mockAddressCache, Protocol.Tcp);
URI primaryAddress = addressSelector.resolvePrimaryUriAsync(entity, false).toBlocking().value();
Uri primaryAddress = addressSelector.resolvePrimaryUriAsync(entity, false).toBlocking().value();
// check if the address return from Address Selector matches the original address info
assertThat(primaryAddress.toString()).isEqualTo(addressInformations[0].getPhysicalUri());
assertThat(primaryAddress).isEqualTo(addressInformations[0].getPhysicalUri());
// create a real session container - we don't need session for this test anyway
SessionContainer sessionContainer = new SessionContainer(StringUtils.EMPTY);
@ -780,10 +780,10 @@ public class StoreReaderDotNetTest {
assertThat(addressInfo[0]).isEqualTo(addressInformations[0]);
AddressSelector addressSelector = new AddressSelector(mockAddressCache, Protocol.Tcp);
URI primaryAddress = addressSelector.resolvePrimaryUriAsync(entity, false).toBlocking().value();
Uri primaryAddress = addressSelector.resolvePrimaryUriAsync(entity, false).toBlocking().value();
// check if the address return from Address Selector matches the original address info
assertThat(primaryAddress.toString()).isEqualTo(addressInformations[0].getPhysicalUri());
assertThat(primaryAddress).isEqualTo(addressInformations[0].getPhysicalUri());
// Quorum Met scenario Start
{

Просмотреть файл

@ -51,7 +51,6 @@ import rx.Single;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@ -79,15 +78,15 @@ public class StoreReaderTest {
StoreReader storeReader = new StoreReader(transportClient, addressSelector, sessionContainer);
CyclicBarrier b = new CyclicBarrier(2);
PublishSubject<List<URI>> subject = PublishSubject.create();
PublishSubject<List<Uri>> subject = PublishSubject.create();
CountDownLatch c = new CountDownLatch(1);
List<URI> uris = ImmutableList.of(URI.create("https://localhost:5050"), URI.create("https://localhost:5051"),
URI.create("https://localhost:50502"), URI.create("https://localhost:5053"));
List<Uri> uris = ImmutableList.of(Uri.create("https://localhost:5050"), Uri.create("https://localhost:5051"),
Uri.create("https://localhost:50502"), Uri.create("https://localhost:5053"));
Mockito.doAnswer(new Answer() {
@Override
public Single<List<URI>> answer(InvocationOnMock invocationOnMock) throws Throwable {
public Single<List<Uri>> answer(InvocationOnMock invocationOnMock) throws Throwable {
return subject.toSingle().doOnSuccess(x -> c.countDown()).doAfterTerminate(() -> {
new Thread() {
@ -162,10 +161,10 @@ public class StoreReaderTest {
.then(ex)
.build();
URI primaryUri = URI.create("primary");
URI secondaryUri1 = URI.create("secondary1");
URI secondaryUri2 = URI.create("secondary2");
URI secondaryUri3 = URI.create("secondary3");
Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
@ -232,10 +231,10 @@ public class StoreReaderTest {
.then(storeResponse) // 4th replica read returns storeResponse satisfying requested session token
.build();
URI primaryUri = URI.create("primary");
URI secondaryUri1 = URI.create("secondary1");
URI secondaryUri2 = URI.create("secondary2");
URI secondaryUri3 = URI.create("secondary3");
Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
@ -309,10 +308,10 @@ public class StoreReaderTest {
.then(foundException) // 4th replica read returns not found
.build();
URI primaryUri = URI.create("primary");
URI secondaryUri1 = URI.create("secondary1");
URI secondaryUri2 = URI.create("secondary2");
URI secondaryUri3 = URI.create("secondary3");
Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
@ -378,10 +377,10 @@ public class StoreReaderTest {
.then(foundException) // 4th replica read returns not found
.build();
URI primaryUri = URI.create("primary");
URI secondaryUri1 = URI.create("secondary1");
URI secondaryUri2 = URI.create("secondary2");
URI secondaryUri3 = URI.create("secondary3");
Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
@ -440,10 +439,10 @@ public class StoreReaderTest {
.then(requestRateTooLargeException) // 4th replica read returns 429
.build();
URI primaryUri = URI.create("primary");
URI secondaryUri1 = URI.create("secondary1");
URI secondaryUri2 = URI.create("secondary2");
URI secondaryUri3 = URI.create("secondary3");
Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
@ -489,7 +488,7 @@ public class StoreReaderTest {
AddressSelector addressSelector = Mockito.mock(AddressSelector.class);
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
URI primaryURI = URI.create("primaryLoc");
Uri primaryURI = Uri.create("primaryLoc");
RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(
OperationType.Read, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
@ -520,7 +519,7 @@ public class StoreReaderTest {
AddressSelector addressSelector = Mockito.mock(AddressSelector.class);
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
URI primaryURI = URI.create("primaryLoc");
Uri primaryURI = Uri.create("primaryLoc");
RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(
OperationType.Read, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
@ -547,7 +546,7 @@ public class StoreReaderTest {
AddressSelector addressSelector = Mockito.mock(AddressSelector.class);
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
URI primaryURI = URI.create("primaryLoc");
Uri primaryURI = Uri.create("primaryLoc");
RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(
OperationType.Read, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
@ -628,8 +627,8 @@ public class StoreReaderTest {
.then(response)
.build();
URI primaryURIPriorToRefresh = URI.create("stale");
URI primaryURIAfterRefresh = URI.create("new");
Uri primaryURIPriorToRefresh = Uri.create("stale");
Uri primaryURIAfterRefresh = Uri.create("new");
RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(
OperationType.Read, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
@ -690,8 +689,8 @@ public class StoreReaderTest {
// This adds basic tests for StoreReader.readMultipleReplicasAsync(.) without failure
// TODO: add some tests for readMultipleReplicasAsync which mock behaviour of failure of reading from a replica
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
URI primaryReplicaURI = URI.create("primary");
ImmutableList<URI> secondaryReplicaURIs = ImmutableList.of(URI.create("secondary1"), URI.create("secondary2"), URI.create("secondary3"));
Uri primaryReplicaURI = Uri.create("primary");
ImmutableList<Uri> secondaryReplicaURIs = ImmutableList.of(Uri.create("secondary1"), Uri.create("secondary2"), Uri.create("secondary3"));
AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryReplicaURI)
.withSecondary(secondaryReplicaURIs)

Просмотреть файл

@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Single;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -52,9 +51,9 @@ public class TransportClientWrapper {
public final TransportClient transportClient;
private final AtomicBoolean valid;
private final AtomicInteger cnt;
private final List<Pair<URI, RxDocumentServiceRequest>> requests;
private final List<Pair<Uri, RxDocumentServiceRequest>> requests;
TransportClientWrapper(TransportClient transportClient, AtomicInteger cnt, AtomicBoolean valid, List<Pair<URI, RxDocumentServiceRequest>> requests) {
TransportClientWrapper(TransportClient transportClient, AtomicInteger cnt, AtomicBoolean valid, List<Pair<Uri, RxDocumentServiceRequest>> requests) {
this.transportClient = transportClient;
this.valid = valid;
this.cnt = cnt;
@ -88,7 +87,7 @@ public class TransportClientWrapper {
return this;
}
public List<Pair<URI, RxDocumentServiceRequest>> getCapturedArgs() {
public List<Pair<Uri, RxDocumentServiceRequest>> getCapturedArgs() {
return requests;
}
@ -103,10 +102,10 @@ public class TransportClientWrapper {
public interface Builder {
static void capture(List<Pair<URI, RxDocumentServiceRequest>> capturedRequests, InvocationOnMock invocation) {
URI physicalUri = invocation.getArgumentAt(0, URI.class);
static void capture(List<Pair<Uri, RxDocumentServiceRequest>> capturedRequests, InvocationOnMock invocation) {
Uri physicalUri = invocation.getArgumentAt(0, Uri.class);
RxDocumentServiceRequest request = invocation.getArgumentAt(1, RxDocumentServiceRequest.class);
logger.debug("URI: {}, request {}", physicalUri, request);
logger.debug("Uri: {}, request {}", physicalUri, request);
capturedRequests.add(Pair.of(physicalUri, request));
}
@ -117,9 +116,9 @@ public class TransportClientWrapper {
}
class ReplicaResponseBuilder implements Builder {
Map<URI, Function2WithCheckedException> responseFunctionDictionary = new HashMap<>();
Map<Uri, Function2WithCheckedException> responseFunctionDictionary = new HashMap<>();
public ReplicaResponseBuilder addReplica(URI replicaURI,
public ReplicaResponseBuilder addReplica(Uri replicaURI,
Function2WithCheckedException<Integer, RxDocumentServiceRequest, StoreResponse> invocationNumberToStoreResponse) {
responseFunctionDictionary.put(replicaURI, invocationNumberToStoreResponse);
@ -128,16 +127,16 @@ public class TransportClientWrapper {
public TransportClientWrapper build() {
Map<URI, AtomicInteger> replicaResponseCounterDict = new HashMap<>();
Map<Uri, AtomicInteger> replicaResponseCounterDict = new HashMap<>();
AtomicInteger i = new AtomicInteger(0);
AtomicBoolean valid = new AtomicBoolean(true);
List<Pair<URI, RxDocumentServiceRequest>> capturedArgs = Collections.synchronizedList(new ArrayList<>());
List<Pair<Uri, RxDocumentServiceRequest>> capturedArgs = Collections.synchronizedList(new ArrayList<>());
TransportClient transportClient = Mockito.mock(TransportClient.class);
Mockito.doAnswer(invocation -> {
i.incrementAndGet();
URI physicalUri = invocation.getArgumentAt(0, URI.class);
Uri physicalUri = invocation.getArgumentAt(0, Uri.class);
RxDocumentServiceRequest request = invocation.getArgumentAt(1, RxDocumentServiceRequest.class);
Function2WithCheckedException function = responseFunctionDictionary.get(physicalUri);
if (function == null) {
@ -163,7 +162,7 @@ public class TransportClientWrapper {
return Single.error(e);
}
}).when(transportClient).invokeResourceOperationAsync(Mockito.any(URI.class), Mockito.any(RxDocumentServiceRequest.class));
}).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class));
return new TransportClientWrapper(transportClient, i, valid, capturedArgs);
}
@ -190,7 +189,7 @@ public class TransportClientWrapper {
public TransportClientWrapper build() {
AtomicInteger i = new AtomicInteger(0);
AtomicBoolean valid = new AtomicBoolean(true);
List<Pair<URI, RxDocumentServiceRequest>> capturedArgs = Collections.synchronizedList(new ArrayList<>());
List<Pair<Uri, RxDocumentServiceRequest>> capturedArgs = Collections.synchronizedList(new ArrayList<>());
TransportClient transportClient = Mockito.mock(TransportClient.class);
Mockito.doAnswer(invocation -> {
@ -209,7 +208,7 @@ public class TransportClientWrapper {
return Single.error((Exception) obj);
}
}).when(transportClient).invokeResourceOperationAsync(Mockito.any(URI.class), Mockito.any(RxDocumentServiceRequest.class));
}).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class));
return new TransportClientWrapper(transportClient, i, valid, capturedArgs);
}
@ -233,11 +232,11 @@ public class TransportClientWrapper {
}
private static class Tuple {
URI replicaURI;
Uri replicaURI;
OperationType operationType;
ResourceType resourceType;
public Tuple(URI replicaURI, OperationType operationType, ResourceType resourceType) {
public Tuple(Uri replicaURI, OperationType operationType, ResourceType resourceType) {
this.replicaURI = replicaURI;
this.operationType = operationType;
this.resourceType = resourceType;
@ -270,7 +269,7 @@ public class TransportClientWrapper {
private Map<Tuple, List<Result>> uriToResult = new HashMap<>();
private UriToResultBuilder resultOn(URI replicaURI, OperationType operationType, ResourceType resourceType, StoreResponse rsp, Exception ex, boolean stickyResult) {
private UriToResultBuilder resultOn(Uri replicaURI, OperationType operationType, ResourceType resourceType, StoreResponse rsp, Exception ex, boolean stickyResult) {
Tuple key = new Tuple(replicaURI, operationType, resourceType);
List<Result> list = uriToResult.get(key);
if (list == null) {
@ -281,12 +280,12 @@ public class TransportClientWrapper {
return this;
}
public UriToResultBuilder storeResponseOn(URI replicaURI, OperationType operationType, ResourceType resourceType, StoreResponse response, boolean stickyResult) {
public UriToResultBuilder storeResponseOn(Uri replicaURI, OperationType operationType, ResourceType resourceType, StoreResponse response, boolean stickyResult) {
resultOn(replicaURI, operationType, resourceType, response, null, stickyResult);
return this;
}
public UriToResultBuilder exceptionOn(URI replicaURI, OperationType operationType, ResourceType resourceType, Exception exception, boolean stickyResult) {
public UriToResultBuilder exceptionOn(Uri replicaURI, OperationType operationType, ResourceType resourceType, Exception exception, boolean stickyResult) {
resultOn(replicaURI, operationType, resourceType, null, exception, stickyResult);
return this;
}
@ -294,11 +293,11 @@ public class TransportClientWrapper {
public TransportClientWrapper build() {
AtomicBoolean valid = new AtomicBoolean(true);
AtomicInteger cnt = new AtomicInteger(0);
List<Pair<URI, RxDocumentServiceRequest>> capturedArgs = Collections.synchronizedList(new ArrayList<>());
List<Pair<Uri, RxDocumentServiceRequest>> capturedArgs = Collections.synchronizedList(new ArrayList<>());
TransportClient transportClient = Mockito.mock(TransportClient.class);
Mockito.doAnswer(invocation -> {
cnt.getAndIncrement();
URI physicalUri = invocation.getArgumentAt(0, URI.class);
Uri physicalUri = invocation.getArgumentAt(0, Uri.class);
RxDocumentServiceRequest request = invocation.getArgumentAt(1, RxDocumentServiceRequest.class);
capture(capturedArgs, invocation);
@ -321,7 +320,7 @@ public class TransportClientWrapper {
return Single.error(result.exception);
}
}).when(transportClient).invokeResourceOperationAsync(Mockito.any(URI.class), Mockito.any(RxDocumentServiceRequest.class));
}).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class));
return new TransportClientWrapper(transportClient, cnt, valid, capturedArgs);
}

Просмотреть файл

@ -152,7 +152,10 @@ public class BaseAuthorizationTokenProvider implements AuthorizationTokenProvide
}
// Skipping lower casing of resourceId since it may now contain "ID" of the resource as part of the FullName
StringBuilder body = new StringBuilder();
int len = verb.length() + resourceSegment.length() + resourceIdOrFullName.length() +
"EEE, dd MMM yyyy HH:mm:ss zzz".length() + 5;
StringBuilder body = new StringBuilder(len);
body.append(verb.toLowerCase())
.append('\n')
.append(resourceSegment)
@ -297,7 +300,11 @@ public class BaseAuthorizationTokenProvider implements AuthorizationTokenProvide
resourceId = resourceId.toLowerCase();
}
StringBuilder payload = new StringBuilder();
String xDateOrDateOrEmpty = StringUtils.isEmpty(xDate) ? date.toLowerCase() : "";
int len = verb.length() + resourceType.length() + resourceId.length() + xDate.length() +
xDateOrDateOrEmpty.length() + 5;
StringBuilder payload = new StringBuilder(len);
payload.append(verb.toLowerCase())
.append('\n')
.append(resourceType.toLowerCase())
@ -306,7 +313,7 @@ public class BaseAuthorizationTokenProvider implements AuthorizationTokenProvide
.append('\n')
.append(xDate.toLowerCase())
.append('\n')
.append(StringUtils.isEmpty(xDate) ? date.toLowerCase() : "")
.append(xDateOrDateOrEmpty)
.append('\n');
return payload.toString();

Просмотреть файл

@ -44,6 +44,7 @@ import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentClientImpl;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.CompositeHttpClient;
import org.apache.commons.lang3.StringUtils;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@ -779,7 +780,7 @@ public class GatewayAddressCacheTest extends TestSuiteBase {
}
private static void assertEqual(AddressInformation actual, Address expected) {
assertThat(actual.getPhysicalUri()).isEqualTo(expected.getPhyicalUri());
assertThat(actual.getPhysicalUri().getURIAsString()).isEqualTo(fixPhysicalURI(expected.getPhyicalUri()));
assertThat(actual.getProtocolScheme()).isEqualTo(expected.getProtocolScheme().toLowerCase());
assertThat(actual.isPrimary()).isEqualTo(expected.IsPrimary());
}
@ -881,4 +882,15 @@ public class GatewayAddressCacheTest extends TestSuiteBase {
, uuid, uuid));
return doc;
}
private static String fixPhysicalURI(String physicalURI) {
// BE returns a physical URI ending with "//"
// this ensures there is only one "/" at the end
int i = physicalURI.length() - 1;
while(i >= 0 && physicalURI.charAt(i) == '/') {
i--;
}
return physicalURI.substring(0, i + 1) + '/';
}
}

Просмотреть файл

@ -30,6 +30,7 @@ import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import org.apache.commons.io.IOUtils;
import org.mockito.Matchers;
import org.mockito.Mockito;
@ -182,10 +183,14 @@ public class GatewayServiceConfigurationReaderTest extends TestSuiteBase {
private HttpClientResponse<ByteBuf> getMockResponse(String databaseAccountJson) {
HttpClientResponse<ByteBuf> resp = Mockito.mock(HttpClientResponse.class);
Mockito.doReturn(HttpResponseStatus.valueOf(200)).when(resp).getStatus();
Mockito.doReturn(Observable.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, databaseAccountJson)))
ByteBuf byteBuffer = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, databaseAccountJson);
Mockito.doReturn(Observable.just(byteBuffer))
.when(resp).getContent();
HttpHeaders httpHeaders = new DefaultHttpHeaders();
httpHeaders = httpHeaders.add(HttpConstants.HttpHeaders.CONTENT_LENGTH, byteBuffer.writerIndex());
DefaultHttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.valueOf(200), httpHeaders);