Replace request internals with fetch (#245)

* Replace internals with cross-fetch

* Remove comments

* Fix container tests

* Remove only

* Remove old files

* Accept agent

* Add timeout

* IREtryPolicy -> RetryPolicy

* Refactor interfaces

* More cleanup

* More WIP

* Fix error response

* Add timeout

* 304 -> don't parse response body

* Add href

* OperationType and ResourceType enums

* Refactor out constants

* Switch to enums

* Refactor retryUtility

* Add code to TimeoutError and use in defaultRetryPolicy

* Add back newUrl
This commit is contained in:
Steve Faulkner 2019-03-12 19:38:09 -07:00 коммит произвёл GitHub
Родитель 10fdf15356
Коммит ebe85d8071
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
34 изменённых файлов: 385 добавлений и 452 удалений

32
package-lock.json сгенерированный
Просмотреть файл

@ -270,6 +270,14 @@
"integrity": "sha1-LrHQCl5Ow/pYx2r94S4YK2bcXBw=",
"dev": true
},
"abort-controller": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/abort-controller/-/abort-controller-2.0.3.tgz",
"integrity": "sha512-EPSq5wr2aFyAZ1PejJB32IX9Qd4Nwus+adnp7STYFM5/23nLPBazqZ1oor6ZqbH+4otaaGXTlC8RN5hq3C8w9Q==",
"requires": {
"event-target-shim": "^5.0.0"
}
},
"accepts": {
"version": "1.3.5",
"resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.5.tgz",
@ -921,6 +929,15 @@
"is-windows": "^1.0.0"
}
},
"cross-fetch": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/cross-fetch/-/cross-fetch-3.0.1.tgz",
"integrity": "sha512-qWtpgBAF8ioqBOddRD+pHhrdzm/UWOArkrlIU7c08DlNbOxo5GfUbSY2vr90ZypWf0raW+HNN1F38pi5CEOjiQ==",
"requires": {
"node-fetch": "2.3.0",
"whatwg-fetch": "3.0.0"
}
},
"cross-spawn": {
"version": "6.0.5",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-6.0.5.tgz",
@ -1294,6 +1311,11 @@
"integrity": "sha1-Cr9PHKpbyx96nYrMbepPqqBLrJs=",
"dev": true
},
"event-target-shim": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz",
"integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ=="
},
"eventemitter3": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-3.1.0.tgz",
@ -3573,6 +3595,11 @@
"text-encoding": "^0.6.4"
}
},
"node-fetch": {
"version": "2.3.0",
"resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.3.0.tgz",
"integrity": "sha512-MOd8pV3fxENbryESLgVIeaGKrdl+uaYhCSSVkjeOb/31/njTpcis5aWfdqgNlHIrKOLRbMnfPINPOML2CIFeXA=="
},
"normalize-path": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/normalize-path/-/normalize-path-2.1.1.tgz",
@ -5206,6 +5233,11 @@
"uuid": "^3.1.0"
}
},
"whatwg-fetch": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/whatwg-fetch/-/whatwg-fetch-3.0.0.tgz",
"integrity": "sha512-9GSJUgz1D4MfyKU7KRqwOjXCXTqWdFNvEr7eUBYchQiVc744mqK/MzXPNR2WsPkmkOa4ywfg8C2n8h+13Bey1Q=="
},
"which": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/which/-/which-1.3.0.tgz",

Просмотреть файл

@ -74,7 +74,9 @@
},
"dependencies": {
"@azure/cosmos-sign": "1.0.2",
"abort-controller": "2.0.3",
"binary-search-bounds": "2.0.3",
"cross-fetch": "3.0.1",
"priorityqueuejs": "1.0.0",
"semaphore": "1.0.5",
"tslib": "^1.9.3"

Просмотреть файл

@ -4,7 +4,7 @@ import { Resource } from "./client/Resource";
import { ConnectionPolicy, ConsistencyLevel, DatabaseAccount, QueryCompatibilityMode } from "./documents";
import { GlobalEndpointManager } from "./globalEndpointManager";
import { Constants, HTTPMethod, ResourceType } from "./common/constants";
import { Constants, HTTPMethod, OperationType, ResourceType } from "./common/constants";
import { getIdFromLink, getPathFromLink, parseConnectionPolicy, parseLink, setIsUpsertHeader } from "./common/helper";
import { StatusCodes, SubStatusCodes } from "./common/statusCodes";
import { CosmosClientOptions } from "./CosmosClientOptions";
@ -12,7 +12,8 @@ import { FetchFunctionCallback, SqlQuerySpec } from "./queryExecutionContext";
import { CosmosHeaders } from "./queryExecutionContext/CosmosHeaders";
import { QueryIterator } from "./queryIterator";
import { FeedOptions, RequestHandler, RequestOptions, Response } from "./request";
import { ErrorResponse, getHeaders } from "./request/request";
import { ErrorResponse } from "./request";
import { getHeaders } from "./request/request";
import { RequestContext } from "./request/RequestContext";
import { SessionContainer } from "./session/sessionContainer";
import { SessionContext } from "./session/SessionContext";
@ -65,17 +66,17 @@ export class ClientContext {
const request: any = {
// TODO: any
path,
operationType: Constants.OperationTypes.Read,
operationType: OperationType.Read,
client: this,
endpointOverride: null
};
// read will use ReadEndpoint since it uses GET operation
const endpoint = await this.globalEndpointManager.resolveServiceEndpoint(request);
const response = await this.requestHandler.get(endpoint, request, requestHeaders);
this.captureSessionToken(undefined, path, Constants.OperationTypes.Read, response.headers);
this.captureSessionToken(undefined, path, OperationType.Read, response.headers);
return response;
} catch (err) {
this.captureSessionToken(err, path, Constants.OperationTypes.Upsert, (err as ErrorResponse).headers);
this.captureSessionToken(err, path, OperationType.Upsert, (err as ErrorResponse).headers);
throw err;
}
}
@ -95,7 +96,7 @@ export class ClientContext {
const request: any = {
// TODO: any request
path,
operationType: Constants.OperationTypes.Query,
operationType: OperationType.Query,
client: this,
endpointOverride: null
};
@ -118,7 +119,7 @@ export class ClientContext {
this.applySessionToken(path, reqHeaders);
const { result, headers: resHeaders } = await this.requestHandler.get(endpoint, request, reqHeaders);
this.captureSessionToken(undefined, path, Constants.OperationTypes.Query, resHeaders);
this.captureSessionToken(undefined, path, OperationType.Query, resHeaders);
return this.processQueryFeedResponse({ result, headers: resHeaders }, !!query, resultFn);
} else {
initialHeaders[Constants.HttpHeaders.IsQuery] = "true";
@ -151,7 +152,7 @@ export class ClientContext {
const response = await this.requestHandler.post(endpoint, request, query, reqHeaders);
const { result, headers: resHeaders } = response;
this.captureSessionToken(undefined, path, Constants.OperationTypes.Query, resHeaders);
this.captureSessionToken(undefined, path, OperationType.Query, resHeaders);
return this.processQueryFeedResponse({ result, headers: resHeaders }, !!query, resultFn);
}
}
@ -187,7 +188,7 @@ export class ClientContext {
const request: RequestContext = {
client: this,
operationType: Constants.OperationTypes.Delete,
operationType: OperationType.Delete,
path,
resourceType: type
};
@ -197,13 +198,13 @@ export class ClientContext {
const endpoint = await this.globalEndpointManager.resolveServiceEndpoint(request);
const response = await this.requestHandler.delete(endpoint, request, reqHeaders);
if (parseLink(path).type !== "colls") {
this.captureSessionToken(undefined, path, Constants.OperationTypes.Delete, response.headers);
this.captureSessionToken(undefined, path, OperationType.Delete, response.headers);
} else {
this.clearSessionToken(path);
}
return response;
} catch (err) {
this.captureSessionToken(err, path, Constants.OperationTypes.Upsert, (err as ErrorResponse).headers);
this.captureSessionToken(err, path, OperationType.Upsert, (err as ErrorResponse).headers);
throw err;
}
}
@ -250,7 +251,7 @@ export class ClientContext {
const request: RequestContext = {
client: this,
operationType: Constants.OperationTypes.Create,
operationType: OperationType.Create,
path,
resourceType: type
};
@ -260,10 +261,10 @@ export class ClientContext {
const endpoint = await this.globalEndpointManager.resolveServiceEndpoint(request);
const response = await this.requestHandler.post(endpoint, request, body, requestHeaders);
this.captureSessionToken(undefined, path, Constants.OperationTypes.Create, response.headers);
this.captureSessionToken(undefined, path, OperationType.Create, response.headers);
return response;
} catch (err) {
this.captureSessionToken(err, path, Constants.OperationTypes.Upsert, (err as ErrorResponse).headers);
this.captureSessionToken(err, path, OperationType.Upsert, (err as ErrorResponse).headers);
throw err;
}
}
@ -328,7 +329,7 @@ export class ClientContext {
const request: RequestContext = {
client: this,
operationType: Constants.OperationTypes.Replace,
operationType: OperationType.Replace,
path,
resourceType: type
};
@ -338,10 +339,10 @@ export class ClientContext {
// replace will use WriteEndpoint since it uses PUT operation
const endpoint = await this.globalEndpointManager.resolveServiceEndpoint(reqHeaders);
const response = await this.requestHandler.put(endpoint, request, resource, reqHeaders);
this.captureSessionToken(undefined, path, Constants.OperationTypes.Replace, response.headers);
this.captureSessionToken(undefined, path, OperationType.Replace, response.headers);
return response;
} catch (err) {
this.captureSessionToken(err, path, Constants.OperationTypes.Upsert, (err as ErrorResponse).headers);
this.captureSessionToken(err, path, OperationType.Upsert, (err as ErrorResponse).headers);
throw err;
}
}
@ -385,7 +386,7 @@ export class ClientContext {
const request: RequestContext = {
client: this,
operationType: Constants.OperationTypes.Upsert,
operationType: OperationType.Upsert,
path,
resourceType: type
};
@ -396,10 +397,10 @@ export class ClientContext {
// upsert will use WriteEndpoint since it uses POST operation
const endpoint = await this.globalEndpointManager.resolveServiceEndpoint(request);
const response = await this.requestHandler.post(endpoint, request, body, requestHeaders);
this.captureSessionToken(undefined, path, Constants.OperationTypes.Upsert, response.headers);
this.captureSessionToken(undefined, path, OperationType.Upsert, response.headers);
return response;
} catch (err) {
this.captureSessionToken(err, path, Constants.OperationTypes.Upsert, (err as ErrorResponse).headers);
this.captureSessionToken(err, path, OperationType.Upsert, (err as ErrorResponse).headers);
throw err;
}
}
@ -433,7 +434,7 @@ export class ClientContext {
const request: RequestContext = {
client: this,
operationType: Constants.OperationTypes.Execute,
operationType: OperationType.Execute,
path,
resourceType: ResourceType.sproc
};
@ -465,9 +466,9 @@ export class ClientContext {
const request: RequestContext = {
client: this,
operationType: Constants.OperationTypes.Read,
operationType: OperationType.Read,
path: "",
resourceType: "DatabaseAccount"
resourceType: ResourceType.none
};
const { result, headers } = await this.requestHandler.get(urlConnection, request, requestHeaders);
@ -485,9 +486,14 @@ export class ClientContext {
return this.globalEndpointManager.getReadEndpoint();
}
private captureSessionToken(err: ErrorResponse, path: string, opType: string, resHeaders: CosmosHeaders) {
private captureSessionToken(
err: ErrorResponse,
path: string,
operationType: OperationType,
resHeaders: CosmosHeaders
) {
const request = this.getSessionParams(path); // TODO: any request
request.operationType = opType;
request.operationType = operationType;
if (
!err ||
(!this.isMasterResource(request.resourceType) &&

Просмотреть файл

@ -1,4 +1,4 @@
import { Constants, isReadRequest, ResourceType } from "./common";
import { Constants, isReadRequest, OperationType, ResourceType } from "./common";
import { CosmosClientOptions } from "./CosmosClientOptions";
import { DatabaseAccount, Location } from "./documents";
import { LocationInfo } from "./LocationInfo";
@ -115,7 +115,8 @@ export class LocationCache {
// then default to the first two write locations, alternating (or the default endpoint)
if (
request.locationRouting.ignorePreferredLocation ||
(!isReadRequest(request) && !this.canUseMultipleWriteLocations(request))
(!isReadRequest(request.operationType) &&
!this.canUseMultipleWriteLocations(request.resourceType, request.operationType))
) {
const currentInfo = this.locationInfo;
if (currentInfo.orderedWriteLocations.length > 0) {
@ -127,7 +128,9 @@ export class LocationCache {
}
} else {
// If we're using preferred regions, then choose the correct endpoint based on the location index
const endpoints = isReadRequest(request) ? this.locationInfo.readEndpoints : this.locationInfo.writeEndpoints;
const endpoints = isReadRequest(request.operationType)
? this.locationInfo.readEndpoints
: this.locationInfo.writeEndpoints;
return endpoints[locationIndex % endpoints.length];
}
}
@ -180,14 +183,14 @@ export class LocationCache {
return { shouldRefresh: false, canRefreshInBackground };
}
public canUseMultipleWriteLocations(request?: RequestContext): boolean {
public canUseMultipleWriteLocations(resourceType?: ResourceType, operationType?: OperationType): boolean {
let canUse = this.options.connectionPolicy.UseMultipleWriteLocations && this.enableMultipleWritableLocations;
if (request) {
if (resourceType) {
canUse =
canUse &&
(request.resourceType === ResourceType.item ||
(request.resourceType === ResourceType.sproc && request.operationType === Constants.OperationTypes.Execute));
(resourceType === ResourceType.item ||
(resourceType === ResourceType.sproc && operationType === OperationType.Execute));
}
return canUse;

Просмотреть файл

@ -1,11 +1,11 @@
import { ItemDefinition } from "../Item";
import { OperationType, ResourceType } from "../../common";
export interface ConflictDefinition {
/** The id of the conflict */
id?: string;
/** Source resource id */
resourceId?: string;
resourceType?: string;
operationType?: string; // TODO: enum
resourceType?: ResourceType;
operationType?: OperationType;
content?: string;
}

Просмотреть файл

@ -210,16 +210,6 @@ export const Constants = {
DatabaseAccountPathSegment: "databaseaccount"
},
OperationTypes: {
Create: "create",
Replace: "replace",
Upsert: "upsert",
Delete: "delete",
Read: "read",
Query: "query",
Execute: "execute"
},
PartitionKeyRange: {
// Partition Key Range Constants
MinInclusive: "minInclusive",
@ -261,3 +251,13 @@ export enum HTTPMethod {
put = "PUT",
delete = "DELETE"
}
export enum OperationType {
Create = "create",
Replace = "replace",
Upsert = "upsert",
Delete = "delete",
Read = "read",
Query = "query",
Execute = "execute"
}

Просмотреть файл

@ -1,7 +1,7 @@
import { ConnectionPolicy } from "../documents/ConnectionPolicy";
import { CosmosHeaders } from "../queryExecutionContext/CosmosHeaders";
import { RequestContext } from "../request/RequestContext";
import { Constants } from "./constants";
import { Constants, OperationType, ResourceType } from "./constants";
/** @hidden */
const Regexes = Constants.RegularExpressions;
@ -45,15 +45,15 @@ export function parseLink(resourcePath: string) {
*/
const pathParts = resourcePath.split("/");
let id;
let type;
let type: ResourceType;
if (pathParts.length % 2 === 0) {
// request in form /[resourceType]/[resourceId]/ .... /[resourceType]/[resourceId].
id = pathParts[pathParts.length - 2];
type = pathParts[pathParts.length - 3];
type = pathParts[pathParts.length - 3] as ResourceType;
} else {
// request in form /[resourceType]/[resourceId]/ .... /[resourceType]/.
id = pathParts[pathParts.length - 3];
type = pathParts[pathParts.length - 2];
type = pathParts[pathParts.length - 2] as ResourceType;
}
const result = {
@ -67,10 +67,8 @@ export function parseLink(resourcePath: string) {
return result;
}
export function isReadRequest(request: RequestContext): boolean {
return (
request.operationType === Constants.OperationTypes.Read || request.operationType === Constants.OperationTypes.Query
);
export function isReadRequest(operationType: OperationType): boolean {
return operationType === OperationType.Read || operationType === OperationType.Query;
}
export function sleep(time: number): Promise<void> {

Просмотреть файл

@ -1,5 +1,5 @@
import * as url from "url";
import { Constants, sleep } from "./common";
import { Constants, OperationType, ResourceType, sleep } from "./common";
import { CosmosClientOptions } from "./CosmosClientOptions";
import { DatabaseAccount } from "./documents";
import { RequestOptions } from "./index";
@ -84,8 +84,8 @@ export class GlobalEndpointManager {
this.locationCache.markCurrentLocationUnavailableForWrite(endpoint);
}
public canUseMultipleWriteLocations(request: RequestContext) {
return this.locationCache.canUseMultipleWriteLocations(request);
public canUseMultipleWriteLocations(resourceType?: ResourceType, operationType?: OperationType) {
return this.locationCache.canUseMultipleWriteLocations(resourceType, operationType);
}
public async resolveServiceEndpoint(request: RequestContext) {

Просмотреть файл

@ -1,4 +1,4 @@
import { Response } from "../../request/request";
import { Response } from "../../request";
import { AverageAggregator, CountAggregator, MaxAggregator, MinAggregator, SumAggregator } from "../Aggregators";
import { IExecutionContext } from "../IExecutionContext";
import { CosmosHeaders } from "../index";

Просмотреть файл

@ -1,4 +1,4 @@
import { Response } from "../../request/request";
import { Response } from "../../request";
/** @hidden */
export interface IEndpointComponent {

Просмотреть файл

@ -1,4 +1,4 @@
import { Response } from "../../request/request";
import { Response } from "../../request";
import { IExecutionContext } from "../IExecutionContext";
import { IEndpointComponent } from "./IEndpointComponent";

Просмотреть файл

@ -1,4 +1,4 @@
import { Response } from "../../request/request";
import { Response } from "../../request";
import { IExecutionContext } from "../IExecutionContext";
import { IEndpointComponent } from "./IEndpointComponent";

Просмотреть файл

@ -1,4 +1,4 @@
import { Response } from "../request/request";
import { Response } from "../request";
/** @hidden */
export interface IExecutionContext {

Просмотреть файл

@ -1,7 +1,7 @@
import { ClientContext } from "../ClientContext";
import { Constants, getIdFromLink, getPathFromLink, ResourceType, StatusCodes, SubStatusCodes } from "../common";
import { FeedOptions } from "../request";
import { Response } from "../request/request";
import { Response } from "../request";
import { DefaultQueryExecutionContext } from "./defaultQueryExecutionContext";
import { FetchResult, FetchResultType } from "./FetchResult";
import { CosmosHeaders, getInitialHeader, mergeHeaders } from "./headerUtils";

Просмотреть файл

@ -3,7 +3,7 @@ import PriorityQueue from "priorityqueuejs";
import semaphore from "semaphore";
import { ClientContext } from "../ClientContext";
import { StatusCodes, SubStatusCodes } from "../common/statusCodes";
import { Response } from "../request/request";
import { Response } from "../request";
import { QueryRange } from "../routing/QueryRange";
import { PARITIONKEYRANGE, SmartRoutingMapProvider } from "../routing/smartRoutingMapProvider";
import { CosmosHeaders } from "./CosmosHeaders";

Просмотреть файл

@ -1,5 +1,5 @@
import { ClientContext } from "../ClientContext";
import { Response } from "../request/request";
import { Response } from "../request";
import { CosmosHeaders } from "./CosmosHeaders";
import {
AggregateEndpointComponent,

Просмотреть файл

@ -1,6 +1,6 @@
import { ClientContext } from "../ClientContext";
import { StatusCodes, SubStatusCodes } from "../common/statusCodes";
import { Response } from "../request/request";
import { Response } from "../request";
import { DefaultQueryExecutionContext, FetchFunctionCallback } from "./defaultQueryExecutionContext";
import { IExecutionContext } from "./IExecutionContext";
import { PartitionedQueryExecutionContextInfo } from "./partitionedQueryExecutionContextInfoParser";

Просмотреть файл

@ -1,11 +1,12 @@
import { ClientContext } from "../ClientContext";
import { OperationType, ResourceType } from "../common";
import { LocationRouting } from "./LocationRouting";
export interface RequestContext {
path?: string;
operationType?: string;
operationType?: OperationType;
client?: ClientContext;
retryCount?: number;
resourceType?: string;
resourceType?: ResourceType;
locationRouting?: LocationRouting;
}

Просмотреть файл

@ -1,13 +1,18 @@
import { AbortController } from "abort-controller";
import fetch from "cross-fetch";
import { Agent, OutgoingHttpHeaders } from "http";
import { RequestOptions } from "https"; // TYPES ONLY
import * as querystring from "querystring";
import { Constants } from "../common/constants";
import { parse } from "url";
import { Constants, HTTPMethod } from "../common/constants";
import { ConnectionPolicy } from "../documents";
import { GlobalEndpointManager } from "../globalEndpointManager";
import { CosmosHeaders } from "../queryExecutionContext/CosmosHeaders";
import * as RetryUtility from "../retry/retryUtility";
import { bodyFromData, createRequestObject, parse, Response } from "./request";
import { ErrorResponse } from "./ErrorResponse";
import { bodyFromData } from "./request";
import { RequestContext } from "./RequestContext";
import { Response } from "./Response";
import { TimeoutError } from "./TimeoutError";
/** @hidden */
export class RequestHandler {
@ -19,9 +24,73 @@ export class RequestHandler {
public static async createRequestObjectStub(
connectionPolicy: ConnectionPolicy,
requestOptions: RequestOptions,
body: Buffer
body?: any
) {
return createRequestObject(connectionPolicy, requestOptions, body);
let didTimeout: boolean;
const controller = new AbortController();
const signal = controller.signal;
const timeout = setTimeout(() => {
didTimeout = true;
controller.abort();
}, connectionPolicy.RequestTimeout);
let response: any;
try {
// TODO Remove any
response = await fetch((requestOptions as any).href + requestOptions.path, {
method: requestOptions.method,
headers: requestOptions.headers as any,
agent: requestOptions.agent,
signal,
...(body && { body })
} as any); // TODO Remove any. Upstream issue https://github.com/lquixada/cross-fetch/issues/42
} catch (error) {
if (error.name === "AbortError") {
if (didTimeout === true) {
throw new TimeoutError();
}
// TODO handle user requested cancellation here
}
throw error;
}
clearTimeout(timeout);
const result = response.status === 204 || response.status === 304 ? null : await response.json();
const headers = {} as any;
response.headers.forEach((value: string, key: string) => {
headers[key] = value;
});
if (response.status >= 400) {
const errorResponse: ErrorResponse = {
code: response.status,
// TODO Upstream code expects this as a string.
// So after parsing to JSON we convert it back to string if there is an error
body: JSON.stringify(result),
headers
};
if (Constants.HttpHeaders.ActivityId in headers) {
errorResponse.activityId = headers[Constants.HttpHeaders.ActivityId];
}
if (Constants.HttpHeaders.SubStatus in headers) {
errorResponse.substatus = parseInt(headers[Constants.HttpHeaders.SubStatus], 10);
}
if (Constants.HttpHeaders.RetryAfterInMilliseconds in headers) {
errorResponse.retryAfterInMilliseconds = parseInt(headers[Constants.HttpHeaders.RetryAfterInMilliseconds], 10);
}
return Promise.reject(errorResponse);
}
return Promise.resolve({
headers,
result,
statusCode: response.status
});
}
/**
@ -41,11 +110,10 @@ export class RequestHandler {
globalEndpointManager: GlobalEndpointManager,
connectionPolicy: ConnectionPolicy,
requestAgent: Agent,
method: string,
method: HTTPMethod,
hostname: string,
request: RequestContext,
data: string | Buffer,
queryParams: any, // TODO: any query params types
headers: CosmosHeaders
): Promise<Response<any>> {
// TODO: any
@ -64,22 +132,6 @@ export class RequestHandler {
}
}
let buffer;
if (body) {
if (Buffer.isBuffer(body)) {
buffer = body;
} else if (typeof body === "string") {
buffer = Buffer.from(body, "utf8");
} else {
return {
result: {
message: "body must be string or Buffer"
},
headers: undefined
};
}
}
const requestOptions: RequestOptions = parse(hostname);
requestOptions.method = method;
requestOptions.path += path;
@ -91,30 +143,14 @@ export class RequestHandler {
requestOptions.rejectUnauthorized = false;
}
if (queryParams) {
requestOptions.path += "?" + querystring.stringify(queryParams);
}
if (buffer) {
requestOptions.headers[Constants.HttpHeaders.ContentLength] = buffer.length;
return RetryUtility.execute(
globalEndpointManager,
buffer,
this.createRequestObjectStub,
connectionPolicy,
requestOptions,
request
);
} else {
return RetryUtility.execute(
globalEndpointManager,
null,
this.createRequestObjectStub,
connectionPolicy,
requestOptions,
request
);
}
return RetryUtility.execute({
globalEndpointManager,
body,
createRequestObjectFunc: this.createRequestObjectStub,
connectionPolicy,
requestOptions,
request
});
}
/** @ignore */
@ -124,11 +160,10 @@ export class RequestHandler {
this.globalEndpointManager,
this.connectionPolicy,
this.requestAgent,
"GET",
HTTPMethod.get,
urlString,
request,
undefined,
"",
headers
);
}
@ -140,11 +175,10 @@ export class RequestHandler {
this.globalEndpointManager,
this.connectionPolicy,
this.requestAgent,
"POST",
HTTPMethod.post,
urlString,
request,
body,
"",
headers
);
}
@ -156,27 +190,10 @@ export class RequestHandler {
this.globalEndpointManager,
this.connectionPolicy,
this.requestAgent,
"PUT",
HTTPMethod.put,
urlString,
request,
body,
"",
headers
);
}
/** @ignore */
public head(urlString: string, request: any, headers: CosmosHeaders) {
// TODO: any
return RequestHandler.request(
this.globalEndpointManager,
this.connectionPolicy,
this.requestAgent,
"HEAD",
urlString,
request,
undefined,
"",
headers
);
}
@ -187,11 +204,10 @@ export class RequestHandler {
this.globalEndpointManager,
this.connectionPolicy,
this.requestAgent,
"DELETE",
HTTPMethod.delete,
urlString,
request,
undefined,
"",
headers
);
}

Просмотреть файл

@ -0,0 +1,9 @@
export const TimeoutErrorCode = "TimeoutError";
export class TimeoutError extends Error {
public readonly code: string = TimeoutErrorCode;
constructor(message?: string) {
super(message);
this.name = TimeoutErrorCode;
}
}

Просмотреть файл

@ -1,20 +1,7 @@
import http from "http";
import https from "https";
import { Socket } from "net";
import { Stream } from "stream";
import * as url from "url";
import { Constants, HTTPMethod, jsonStringifyAndEscapeNonASCII, ResourceType } from "../common";
import { ConnectionPolicy, MediaReadMode } from "../documents";
import { CosmosHeaders } from "../queryExecutionContext";
import { ErrorResponse } from "./ErrorResponse";
export { ErrorResponse }; // Should refactor this out
import { AuthOptions, setAuthorizationHeader } from "../auth";
import { Constants, HTTPMethod, jsonStringifyAndEscapeNonASCII, ResourceType } from "../common";
import { CosmosHeaders } from "../queryExecutionContext";
import { FeedOptions, MediaOptions, RequestOptions } from "./index";
import { Response } from "./Response";
export { Response }; // Should refactor this out
// ----------------------------------------------------------------------------
// Utility methods
@ -30,110 +17,11 @@ function javaScriptFriendlyJSONStringify(s: object) {
}
/** @hidden */
export function bodyFromData(data: Stream | Buffer | string | object) {
if ((data as Stream).pipe) {
return data;
}
if (Buffer.isBuffer(data)) {
return data;
}
if (typeof data === "string") {
return data;
}
export function bodyFromData(data: Buffer | string | object) {
if (typeof data === "object") {
return javaScriptFriendlyJSONStringify(data);
}
return undefined;
}
/** @hidden */
export function parse(urlString: string) {
return url.parse(urlString);
}
/** @hidden */
export function createRequestObject(
connectionPolicy: ConnectionPolicy,
requestOptions: https.RequestOptions,
body: Buffer
): Promise<Response<any>> {
return new Promise<Response<any>>((resolve, reject) => {
function onTimeout() {
httpsRequest.abort();
}
const httpsRequest: http.ClientRequest = https.request(requestOptions, (response: http.IncomingMessage) => {
let data = "";
response.setEncoding("utf8");
response.on("data", (chunk: any) => {
data += chunk;
});
response.on("end", () => {
if (response.statusCode >= 400) {
return reject(getErrorBody(response, data, response.headers as CosmosHeaders));
}
let result;
try {
result = data.length > 0 ? JSON.parse(data) : undefined;
} catch (exception) {
return reject(exception);
}
resolve({ result, headers: response.headers as CosmosHeaders, statusCode: response.statusCode });
});
});
httpsRequest.once("socket", (socket: Socket) => {
socket.setTimeout(connectionPolicy.RequestTimeout);
socket.once("timeout", onTimeout);
httpsRequest.once("response", () => {
socket.removeListener("timeout", onTimeout);
});
});
httpsRequest.once("error", reject);
if (body) {
httpsRequest.write(body);
httpsRequest.end();
} else {
httpsRequest.end();
}
});
}
/**
* Constructs the error body from the response and the data returned from the request.
* @param {object} response - response object returned from the executon of a request.
* @param {object} data - the data body returned from the executon of a request.
* @hidden
*/
function getErrorBody(response: http.IncomingMessage, data: string, headers: CosmosHeaders): ErrorResponse {
const errorBody: ErrorResponse = {
code: response.statusCode,
body: data,
headers
};
if (Constants.HttpHeaders.ActivityId in response.headers) {
errorBody.activityId = response.headers[Constants.HttpHeaders.ActivityId] as string;
}
if (Constants.HttpHeaders.SubStatus in response.headers) {
errorBody.substatus = parseInt(response.headers[Constants.HttpHeaders.SubStatus] as string, 10);
}
if (Constants.HttpHeaders.RetryAfterInMilliseconds in response.headers) {
errorBody.retryAfterInMilliseconds = parseInt(
response.headers[Constants.HttpHeaders.RetryAfterInMilliseconds] as string,
10
);
}
return errorBody;
return data;
}
export async function getHeaders(

Просмотреть файл

@ -1,7 +1,7 @@
import { ErrorResponse } from "../request";
import { RetryContext } from "./RetryContext";
export interface IRetryPolicy {
export interface RetryPolicy {
retryAfterInMilliseconds: number;
shouldRetry: (
errorResponse: ErrorResponse,

Просмотреть файл

@ -1,83 +1,81 @@
import { OperationType } from "../common";
import { ErrorResponse } from "../request";
import { TimeoutErrorCode } from "../request/TimeoutError";
import { RetryPolicy } from "./RetryPolicy";
// Windows Socket Error Codes
const WindowsInterruptedFunctionCall = 10004;
const WindowsFileHandleNotValid = 10009;
const WindowsPermissionDenied = 10013;
const WindowsBadAddress = 10014;
const WindowsInvalidArgumnet = 10022;
const WindowsResourceTemporarilyUnavailable = 10035;
const WindowsOperationNowInProgress = 10036;
const WindowsAddressAlreadyInUse = 10048;
const WindowsConnectionResetByPeer = 10054;
const WindowsCannotSendAfterSocketShutdown = 10058;
const WindowsConnectionTimedOut = 10060;
const WindowsConnectionRefused = 10061;
const WindowsNameTooLong = 10063;
const WindowsHostIsDown = 10064;
const WindowsNoRouteTohost = 10065;
// Linux Error Codes
const LinuxConnectionReset = "ECONNRESET";
const CONNECTION_ERROR_CODES = [
WindowsInterruptedFunctionCall,
WindowsFileHandleNotValid,
WindowsPermissionDenied,
WindowsBadAddress,
WindowsInvalidArgumnet,
WindowsResourceTemporarilyUnavailable,
WindowsOperationNowInProgress,
WindowsAddressAlreadyInUse,
WindowsConnectionResetByPeer,
WindowsCannotSendAfterSocketShutdown,
WindowsConnectionTimedOut,
WindowsConnectionRefused,
WindowsNameTooLong,
WindowsHostIsDown,
WindowsNoRouteTohost,
LinuxConnectionReset,
TimeoutErrorCode
];
function needsRetry(operationType: OperationType, code: number | string) {
if (
(operationType === OperationType.Read || operationType === OperationType.Query) &&
CONNECTION_ERROR_CODES.indexOf(code) !== -1
) {
return true;
} else {
return false;
}
}
/**
* This class implements the default connection retry policy for requests.
* @property {int} currentRetryAttemptCount - Current retry attempt count.
* @hidden
*/
export class DefaultRetryPolicy {
export class DefaultRetryPolicy implements RetryPolicy {
private maxRetryAttemptCount: number = 10;
private currentRetryAttemptCount: number = 0;
public retryAfterInMilliseconds: number = 1000;
// Windows Socket Error Codes
private WindowsInterruptedFunctionCall: number = 10004;
private WindowsFileHandleNotValid: number = 10009;
private WindowsPermissionDenied: number = 10013;
private WindowsBadAddress: number = 10014;
private WindowsInvalidArgumnet: number = 10022;
private WindowsResourceTemporarilyUnavailable: number = 10035;
private WindowsOperationNowInProgress: number = 10036;
private WindowsAddressAlreadyInUse: number = 10048;
private WindowsConnectionResetByPeer: number = 10054;
private WindowsCannotSendAfterSocketShutdown: number = 10058;
private WindowsConnectionTimedOut: number = 10060;
private WindowsConnectionRefused: number = 10061;
private WindowsNameTooLong: number = 10063;
private WindowsHostIsDown: number = 10064;
private WindowsNoRouteTohost: number = 10065;
// Linux Error Codes
private LinuxConnectionReset = "ECONNRESET";
private CONNECTION_ERROR_CODES: any[] = [
this.WindowsInterruptedFunctionCall,
this.WindowsFileHandleNotValid,
this.WindowsPermissionDenied,
this.WindowsBadAddress,
this.WindowsInvalidArgumnet,
this.WindowsResourceTemporarilyUnavailable,
this.WindowsOperationNowInProgress,
this.WindowsAddressAlreadyInUse,
this.WindowsConnectionResetByPeer,
this.WindowsCannotSendAfterSocketShutdown,
this.WindowsConnectionTimedOut,
this.WindowsConnectionRefused,
this.WindowsNameTooLong,
this.WindowsHostIsDown,
this.WindowsNoRouteTohost,
this.LinuxConnectionReset
];
/**
* @constructor ResourceThrottleRetryPolicy
* @param {string} operationType - The type of operation being performed.
*/
constructor(private operationType: string) {}
constructor(private operationType: OperationType) {}
/**
* Determines whether the request should be retried or not.
* @param {object} err - Error returned by the request.
* @param {function} callback - The callback function which takes bool argument which
* specifies whether the request will be retried or not.
*/
public async shouldRetry(err: ErrorResponse): Promise<boolean> {
if (err) {
if (this.currentRetryAttemptCount < this.maxRetryAttemptCount && this.needs_retry(err.code)) {
if (this.currentRetryAttemptCount < this.maxRetryAttemptCount && needsRetry(this.operationType, err.code)) {
this.currentRetryAttemptCount++;
return true;
}
}
return false;
}
private needs_retry(code: number | string) {
if (
(this.operationType === "read" || this.operationType === "query") &&
this.CONNECTION_ERROR_CODES.indexOf(code) !== -1
) {
return true;
} else {
return false;
}
}
}

Просмотреть файл

@ -1,15 +1,15 @@
import { isReadRequest } from "../common/helper";
import { OperationType } from "../common";
import { isReadRequest } from "../common/helper";
import { GlobalEndpointManager } from "../globalEndpointManager";
import { ErrorResponse } from "../request/request";
import { RequestContext } from "../request/RequestContext";
import { IRetryPolicy } from "./IRetryPolicy";
import { ErrorResponse } from "../request";
import { RetryContext } from "./RetryContext";
import { RetryPolicy } from "./RetryPolicy";
/**
* This class implements the retry policy for endpoint discovery.
* @hidden
*/
export class EndpointDiscoveryRetryPolicy implements IRetryPolicy {
export class EndpointDiscoveryRetryPolicy implements RetryPolicy {
/** Current retry attempt count. */
public currentRetryAttemptCount: number;
/** Retry interval in milliseconds. */
@ -24,7 +24,7 @@ export class EndpointDiscoveryRetryPolicy implements IRetryPolicy {
* @constructor EndpointDiscoveryRetryPolicy
* @param {object} globalEndpointManager The GlobalEndpointManager instance.
*/
constructor(private globalEndpointManager: GlobalEndpointManager, private request: RequestContext) {
constructor(private globalEndpointManager: GlobalEndpointManager, private operationType: OperationType) {
this.maxRetryAttemptCount = EndpointDiscoveryRetryPolicy.maxRetryAttemptCount;
this.currentRetryAttemptCount = 0;
this.retryAfterInMilliseconds = EndpointDiscoveryRetryPolicy.retryAfterInMilliseconds;
@ -57,7 +57,7 @@ export class EndpointDiscoveryRetryPolicy implements IRetryPolicy {
this.currentRetryAttemptCount++;
if (isReadRequest(this.request)) {
if (isReadRequest(this.operationType)) {
this.globalEndpointManager.markCurrentLocationUnavailableForRead(locationEndpoint);
} else {
this.globalEndpointManager.markCurrentLocationUnavailableForWrite(locationEndpoint);

Просмотреть файл

@ -1,5 +1,4 @@
import { StatusCodes } from "../common";
import { ErrorResponse } from "../request/request";
import { ErrorResponse } from "../request";
/**
* This class implements the resource throttle retry policy for requests.

Просмотреть файл

@ -10,9 +10,9 @@ import { LocationRouting } from "../request/LocationRouting";
import { RequestContext } from "../request/RequestContext";
import { DefaultRetryPolicy } from "./defaultRetryPolicy";
import { EndpointDiscoveryRetryPolicy } from "./endpointDiscoveryRetryPolicy";
import { IRetryPolicy } from "./IRetryPolicy";
import { ResourceThrottleRetryPolicy } from "./resourceThrottleRetryPolicy";
import { RetryContext } from "./RetryContext";
import { RetryPolicy } from "./RetryPolicy";
import { SessionRetryPolicy } from "./sessionRetryPolicy";
/** @hidden */
@ -22,75 +22,56 @@ export type CreateRequestObjectStubFunction = (
body: Buffer
) => Promise<Response<any>>; // TODO: any response
/**
* Executes the retry policy for the created request object.
* @param {object} globalEndpointManager - an instance of GlobalEndpointManager class.
* @param {object} body - request body. A buffer or a string.
* @param {function} createRequestObjectStub - stub function that creates the request object.
* @param {object} connectionPolicy - an instance of ConnectionPolicy that has the connection configs.
* @param {RequestOptions} requestOptions - The request options.
* @param {function} callback - the callback that will be called when the request is finished executing.
*/
export async function execute(
globalEndpointManager: GlobalEndpointManager,
body: Buffer,
createRequestObjectFunc: CreateRequestObjectStubFunction,
connectionPolicy: ConnectionPolicy,
requestOptions: RequestOptions,
request: RequestContext
): Promise<Response<any>> {
// TODO: any request
const r: RequestContext = typeof request !== "string" ? request : { path: "", operationType: "nonReadOps" };
const endpointDiscoveryRetryPolicy = new EndpointDiscoveryRetryPolicy(globalEndpointManager, r);
const resourceThrottleRetryPolicy = new ResourceThrottleRetryPolicy(
connectionPolicy.RetryOptions.MaxRetryAttemptCount,
connectionPolicy.RetryOptions.FixedRetryIntervalInMilliseconds,
connectionPolicy.RetryOptions.MaxWaitTimeInSeconds
);
const sessionReadRetryPolicy = new SessionRetryPolicy(globalEndpointManager, r, connectionPolicy);
const defaultRetryPolicy = new DefaultRetryPolicy(request.operationType);
return apply(
body,
createRequestObjectFunc,
connectionPolicy,
requestOptions,
endpointDiscoveryRetryPolicy,
resourceThrottleRetryPolicy,
sessionReadRetryPolicy,
defaultRetryPolicy,
globalEndpointManager,
request,
{}
);
interface ExecuteArgs {
globalEndpointManager: GlobalEndpointManager;
body: Buffer;
createRequestObjectFunc: CreateRequestObjectStubFunction;
connectionPolicy: ConnectionPolicy;
requestOptions: RequestOptions;
request: RequestContext;
retryContext?: RetryContext;
retryPolicies?: RetryPolicies;
}
/**
* Applies the retry policy for the created request object.
* @param {object} body - request body. A buffer or a string.
* @param {function} createRequestObjectFunc - function that creates the request object.
* @param {object} connectionPolicy - an instance of ConnectionPolicy that has the connection configs.
* @param {RequestOptions} requestOptions - The request options.
* @param {EndpointDiscoveryRetryPolicy} endpointDiscoveryRetryPolicy - The endpoint discovery retry policy \
* instance.
* @param {ResourceThrottleRetryPolicy} resourceThrottleRetryPolicy - The resource throttle retry policy instance.
* @param {function} callback - the callback that will be called when the response is retrieved and processed.
*/
export async function apply(
body: Buffer,
createRequestObjectFunc: CreateRequestObjectStubFunction,
connectionPolicy: ConnectionPolicy,
requestOptions: RequestOptions,
endpointDiscoveryRetryPolicy: EndpointDiscoveryRetryPolicy,
resourceThrottleRetryPolicy: ResourceThrottleRetryPolicy,
sessionReadRetryPolicy: SessionRetryPolicy,
defaultRetryPolicy: DefaultRetryPolicy,
globalEndpointManager: GlobalEndpointManager,
request: RequestContext,
retryContext: RetryContext
): Promise<Response<any>> {
interface RetryPolicies {
endpointDiscoveryRetryPolicy: EndpointDiscoveryRetryPolicy;
resourceThrottleRetryPolicy: ResourceThrottleRetryPolicy;
sessionReadRetryPolicy: SessionRetryPolicy;
defaultRetryPolicy: DefaultRetryPolicy;
}
export async function execute({
body,
createRequestObjectFunc,
connectionPolicy,
requestOptions,
globalEndpointManager,
request,
retryContext,
retryPolicies
}: ExecuteArgs): Promise<Response<any>> {
// TODO: any response
if (!retryContext) {
retryContext = {};
}
if (!retryPolicies) {
retryPolicies = {
endpointDiscoveryRetryPolicy: new EndpointDiscoveryRetryPolicy(globalEndpointManager, request.operationType),
resourceThrottleRetryPolicy: new ResourceThrottleRetryPolicy(
connectionPolicy.RetryOptions.MaxRetryAttemptCount,
connectionPolicy.RetryOptions.FixedRetryIntervalInMilliseconds,
connectionPolicy.RetryOptions.MaxWaitTimeInSeconds
),
sessionReadRetryPolicy: new SessionRetryPolicy(
globalEndpointManager,
request.resourceType,
request.operationType,
connectionPolicy
),
defaultRetryPolicy: new DefaultRetryPolicy(request.operationType)
};
}
const httpsRequest = createRequestObjectFunc(connectionPolicy, requestOptions, body);
if (!request.locationRouting) {
request.locationRouting = new LocationRouting();
@ -110,45 +91,44 @@ export async function apply(
request.locationRouting.routeToLocation(locationEndpoint);
try {
const { result, headers } = await (httpsRequest as Promise<Response<any>>);
headers[Constants.ThrottleRetryCount] = resourceThrottleRetryPolicy.currentRetryAttemptCount;
headers[Constants.ThrottleRetryWaitTimeInMs] = resourceThrottleRetryPolicy.cummulativeWaitTimeinMilliseconds;
headers[Constants.ThrottleRetryCount] = retryPolicies.resourceThrottleRetryPolicy.currentRetryAttemptCount;
headers[Constants.ThrottleRetryWaitTimeInMs] =
retryPolicies.resourceThrottleRetryPolicy.cummulativeWaitTimeinMilliseconds;
return { result, headers };
} catch (err) {
// TODO: any error
let retryPolicy: IRetryPolicy = null;
let retryPolicy: RetryPolicy = null;
const headers = err.headers || {};
if (err.code === StatusCodes.Forbidden && err.substatus === SubStatusCodes.WriteForbidden) {
retryPolicy = endpointDiscoveryRetryPolicy;
retryPolicy = retryPolicies.endpointDiscoveryRetryPolicy;
} else if (err.code === StatusCodes.TooManyRequests) {
retryPolicy = resourceThrottleRetryPolicy;
retryPolicy = retryPolicies.resourceThrottleRetryPolicy;
} else if (err.code === StatusCodes.NotFound && err.substatus === SubStatusCodes.ReadSessionNotAvailable) {
retryPolicy = sessionReadRetryPolicy;
retryPolicy = retryPolicies.sessionReadRetryPolicy;
} else {
retryPolicy = defaultRetryPolicy;
retryPolicy = retryPolicies.defaultRetryPolicy;
}
const results = await retryPolicy.shouldRetry(err, retryContext);
if (!results) {
headers[Constants.ThrottleRetryCount] = resourceThrottleRetryPolicy.currentRetryAttemptCount;
headers[Constants.ThrottleRetryWaitTimeInMs] = resourceThrottleRetryPolicy.cummulativeWaitTimeinMilliseconds;
headers[Constants.ThrottleRetryCount] = retryPolicies.resourceThrottleRetryPolicy.currentRetryAttemptCount;
headers[Constants.ThrottleRetryWaitTimeInMs] =
retryPolicies.resourceThrottleRetryPolicy.cummulativeWaitTimeinMilliseconds;
err.headers = { ...err.headers, ...headers };
throw err;
} else {
request.retryCount++;
const newUrl = (results as any)[1]; // TODO: any hack
await sleep(retryPolicy.retryAfterInMilliseconds);
return apply(
return execute({
body,
createRequestObjectFunc,
connectionPolicy,
requestOptions,
endpointDiscoveryRetryPolicy,
resourceThrottleRetryPolicy,
sessionReadRetryPolicy,
defaultRetryPolicy,
globalEndpointManager,
request,
retryContext
);
retryContext,
retryPolicies
});
}
}
}

Просмотреть файл

@ -1,16 +1,15 @@
import { isReadRequest } from "../common";
import { isReadRequest, OperationType, ResourceType } from "../common";
import { ConnectionPolicy } from "../documents";
import { GlobalEndpointManager } from "../globalEndpointManager";
import { ErrorResponse } from "../request/request";
import { RequestContext } from "../request/RequestContext";
import { IRetryPolicy } from "./IRetryPolicy";
import { ErrorResponse } from "../request";
import { RetryContext } from "./RetryContext";
import { RetryPolicy } from "./RetryPolicy";
/**
* This class implements the retry policy for session consistent reads.
* @hidden
*/
export class SessionRetryPolicy implements IRetryPolicy {
export class SessionRetryPolicy implements RetryPolicy {
/** Current retry attempt count. */
public currentRetryAttemptCount = 0;
/** Retry interval in milliseconds. */
@ -23,7 +22,8 @@ export class SessionRetryPolicy implements IRetryPolicy {
*/
constructor(
private globalEndpointManager: GlobalEndpointManager,
private request: RequestContext,
private resourceType: ResourceType,
private operationType: OperationType,
private connectionPolicy: ConnectionPolicy
) {}
@ -46,9 +46,9 @@ export class SessionRetryPolicy implements IRetryPolicy {
return false;
}
if (this.globalEndpointManager.canUseMultipleWriteLocations(this.request)) {
if (this.globalEndpointManager.canUseMultipleWriteLocations(this.resourceType, this.operationType)) {
// If we can write to multiple locations, we should against every write endpoint until we succeed
const endpoints = isReadRequest(this.request)
const endpoints = isReadRequest(this.operationType)
? await this.globalEndpointManager.getReadEndpoints()
: await this.globalEndpointManager.getWriteEndpoints();
if (this.currentRetryAttemptCount > endpoints.length) {

Просмотреть файл

@ -1,7 +1,9 @@
import { OperationType, ResourceType } from "../common";
export interface SessionContext {
resourceId?: string;
resourceAddress?: string;
resourceType?: string; // TODO: enum
resourceType?: ResourceType;
isNameBased?: boolean;
operationType?: string; // TODO: enum
operationType?: OperationType;
}

Просмотреть файл

@ -1,4 +1,4 @@
import { Constants, getContainerLink, trimSlashes } from "../common";
import { Constants, getContainerLink, OperationType, ResourceType, trimSlashes } from "../common";
import { CosmosHeaders } from "../queryExecutionContext";
import { SessionContext } from "./SessionContext";
import { VectorSessionToken } from "./VectorSessionToken";
@ -129,7 +129,7 @@ export class SessionContainer {
}
// TODO: have a assert if the type doesn't mastch known types
private static isReadingFromMaster(resourceType: string, operationType: string): boolean {
private static isReadingFromMaster(resourceType: ResourceType, operationType: OperationType): boolean {
if (
resourceType === Constants.Path.OffersPathSegment ||
resourceType === Constants.Path.DatabasesPathSegment ||
@ -138,7 +138,7 @@ export class SessionContainer {
resourceType === Constants.Path.TopologyPathSegment ||
resourceType === Constants.Path.DatabaseAccountPathSegment ||
resourceType === Constants.Path.PartitionKeyRangesPathSegment ||
(resourceType === Constants.Path.CollectionsPathSegment && operationType === Constants.OperationTypes.Query)
(resourceType === Constants.Path.CollectionsPathSegment && operationType === OperationType.Query)
) {
return true;
}

Просмотреть файл

@ -19,7 +19,7 @@ describe("NodeJS CRUD Tests", function() {
await getTestDatabase("request timeout", client);
assert.fail("Must throw when trying to connect to database");
} catch (err) {
assert.equal(err.code, "ECONNRESET", "client should throw exception");
assert.equal(err.name, "TimeoutError", "client should throw exception");
}
});
});

Просмотреть файл

@ -2,7 +2,6 @@ import assert from "assert";
import * as util from "util";
import { Container, ContainerDefinition } from "../../client";
import { DataType, IndexKind, PartitionKind } from "../../documents";
import { Constants } from "../../index";
import { SqlQuerySpec } from "../../queryExecutionContext";
import { QueryIterator } from "../../queryIterator";
import { bulkInsertItems, getTestContainer, removeAllDatabases } from "../common/TestHelpers";

Просмотреть файл

@ -1,7 +1,7 @@
import assert from "assert";
import * as sinon from "sinon";
import { ClientContext } from "../../ClientContext";
import { trimSlashes } from "../../common";
import { OperationType, ResourceType, trimSlashes } from "../../common";
import { ConsistencyLevel, PartitionKind } from "../../documents";
import { Constants, CosmosClient, CosmosHeaders } from "../../index";
import { RequestHandler } from "../../request";
@ -76,9 +76,9 @@ describe("Session Token", function() {
const token = sessionContainer.get({
isNameBased: true,
operationType: "create",
operationType: OperationType.Create,
resourceAddress: container.url,
resourceType: "docs",
resourceType: ResourceType.item,
resourceId: "2"
});
const { resource: document2 } = await container.items.create({ id: "2" });
@ -101,9 +101,9 @@ describe("Session Token", function() {
const readToken = sessionContainer.get({
isNameBased: true,
operationType: "read",
operationType: OperationType.Read,
resourceAddress: container.url,
resourceType: "docs",
resourceType: ResourceType.item,
resourceId: "1"
});
await container.item(document1.id, "1").read();
@ -126,9 +126,9 @@ describe("Session Token", function() {
const upsertToken = sessionContainer.get({
isNameBased: true,
operationType: "upsert",
operationType: OperationType.Upsert,
resourceAddress: container.url,
resourceType: "docs",
resourceType: ResourceType.item,
resourceId: "1"
});
const { resource: document13 } = await container.items.upsert(
@ -160,9 +160,9 @@ describe("Session Token", function() {
const deleteToken = sessionContainer.get({
isNameBased: true,
operationType: "delete",
operationType: OperationType.Delete,
resourceAddress: container.url,
resourceType: "docs",
resourceType: ResourceType.item,
resourceId: "2"
});
await container.item(document2.id, "2").delete();
@ -191,9 +191,9 @@ describe("Session Token", function() {
const replaceToken = sessionContainer.get({
isNameBased: true,
operationType: "replace",
operationType: OperationType.Replace,
resourceAddress: container.url,
resourceType: "docs",
resourceType: ResourceType.item,
resourceId: "1"
});
await container.item(document13.id).replace({ id: "1", operation: "replace" }, { partitionKey: "1" });
@ -225,9 +225,9 @@ describe("Session Token", function() {
const queryToken = sessionContainer.get({
isNameBased: true,
operationType: "query",
operationType: OperationType.Query,
resourceAddress: container.url,
resourceType: "docs"
resourceType: ResourceType.item
});
await queryIterator.fetchAll();
assert.equal(postSpy.lastCall.args[3][Constants.HttpHeaders.SessionToken], queryToken);
@ -249,9 +249,9 @@ describe("Session Token", function() {
const deleteContainerToken = sessionContainer.get({
isNameBased: true,
operationType: "delete",
operationType: OperationType.Delete,
resourceAddress: container.url,
resourceType: "container",
resourceType: ResourceType.container,
resourceId: container.id
});
await container.delete();

Просмотреть файл

@ -3,7 +3,7 @@ import { ConnectionPolicy, DatabaseAccount, Location } from "../../documents";
import { LocationCache } from "../../LocationCache";
import * as assert from "assert";
import { Constants, ResourceType } from "../../common";
import { OperationType, ResourceType } from "../../common";
const scenarios: Scenario[] = [];
const regions = ["westus", "East US", "eastus2", "south Centralus", "sEasIa"];
@ -122,7 +122,7 @@ describe("Location Cache", function() {
it(`read request for resolve endpoint, retry count 0, should match read endpoint`, function() {
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Read,
operationType: OperationType.Read,
resourceType: ResourceType.item,
retryCount: 0
});
@ -133,7 +133,7 @@ describe("Location Cache", function() {
it(`write request for resolve endpoint, retry count 0, should match write endpoint`, function() {
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Replace,
operationType: OperationType.Replace,
resourceType: ResourceType.item,
retryCount: 0
});
@ -155,7 +155,7 @@ describe("Location Cache", function() {
"write endpoint should match default endpoint prior to any database account info"
);
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Replace,
operationType: OperationType.Replace,
resourceType: ResourceType.item,
retryCount: 1
});
@ -176,7 +176,7 @@ describe("Location Cache", function() {
"write endpoint should match default endpoint prior to any database account info"
);
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Replace,
operationType: OperationType.Replace,
resourceType: ResourceType.item,
retryCount: 1
});
@ -202,7 +202,7 @@ describe("Location Cache", function() {
"read endpoint should match default endpoint prior to any database account info even if unavailable"
);
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Read,
operationType: OperationType.Read,
resourceType: ResourceType.item,
retryCount: 1
});
@ -221,7 +221,7 @@ describe("Location Cache", function() {
"write endpoint should match default endpoint prior to any database account info"
);
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Replace,
operationType: OperationType.Replace,
resourceType: ResourceType.item,
retryCount: 1
});
@ -246,7 +246,7 @@ describe("Location Cache", function() {
);
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Read,
operationType: OperationType.Read,
resourceType: ResourceType.item,
retryCount: 1
});
@ -269,7 +269,7 @@ describe("Location Cache", function() {
"write endpoint should match default endpoint prior to any database account info"
);
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Replace,
operationType: OperationType.Replace,
resourceType: ResourceType.item,
retryCount: 1
});
@ -331,7 +331,7 @@ describe("Location Cache", function() {
it(`read request for resolve endpoint, retry count 0, should match read endpoint`, function() {
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Read,
operationType: OperationType.Read,
resourceType: ResourceType.item,
retryCount: 0
});
@ -342,7 +342,7 @@ describe("Location Cache", function() {
it(`write request for resolve endpoint, retry count 0, should match write endpoint`, function() {
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Replace,
operationType: OperationType.Replace,
resourceType: ResourceType.item,
retryCount: 0
});
@ -362,7 +362,7 @@ describe("Location Cache", function() {
"read endpoint should match default endpoint prior to any database account info even if unavailable"
);
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Read,
operationType: OperationType.Read,
resourceType: ResourceType.item,
retryCount: 1
});
@ -378,7 +378,7 @@ describe("Location Cache", function() {
"write endpoint should match default endpoint prior to any database account info"
);
const resolveEndpoint = locationCache.resolveServiceEndpoint({
operationType: Constants.OperationTypes.Replace,
operationType: OperationType.Replace,
resourceType: ResourceType.item,
retryCount: 1
});

Просмотреть файл

@ -1,5 +1,5 @@
import assert from "assert";
import { Constants } from "../../common";
import { Constants, OperationType, ResourceType } from "../../common";
import { CosmosHeaders } from "../../queryExecutionContext/CosmosHeaders";
import { SessionContainer } from "../../session/sessionContainer";
import { SessionContext } from "../../session/SessionContext";
@ -17,8 +17,8 @@ describe("SessionContainer", function() {
isNameBased: true,
resourceId: null,
resourceAddress: "/" + collectionLink + "/",
resourceType: "docs",
operationType: "create"
resourceType: ResourceType.item,
operationType: OperationType.Create
};
const resHeadersNameBased: CosmosHeaders = {};