[Cosmos] Fix retry handling for Session Not Found errors (#17034)

This commit is contained in:
Steve Faulkner 2021-08-23 12:04:15 -07:00 коммит произвёл GitHub
Родитель 2bd2e7703a
Коммит d630c5f4f0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 74 добавлений и 362 удалений

Просмотреть файл

@ -8,6 +8,8 @@
### Bugs Fixed
- Fixed bugs in the session token clearing logic. A Session Not Found error (404, substatus 1002) was not handled correctly by the session retry policy, so the request was mistakenly retried with the same session token.
### Other Changes
## 3.13.0 (2021-08-10)
@ -41,8 +43,10 @@
## 3.12.0 (2021-07-06)
### Features Added
- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details.
- Added background refresher for endpoints, and new `ConnectionPolicy` options. Refreshing defaults to true, and the default refresh rate is every 5 minutes.
```js
const client = new CosmosClient({
endpoint,
@ -52,13 +56,14 @@ const client = new CosmosClient({
endpointRefreshRateInMs: 700,
enableBackgroundEndpointRefreshing: true
}
})
});
```
- Added `client.dispose()` for explicitly closing the endpoint refresher. This is necessary when destroying the CosmosClient inside an existing process such as an Express web server, or when you want to destroy the client and create a new one in the same process.
```js
const client = new CosmosClient()
client.dispose() // cancels background endpoint refreshing
const client = new CosmosClient();
client.dispose(); // cancels background endpoint refreshing
```
## 3.11.5 (2021-06-10)

Просмотреть файл

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
/**
 * State object handed to `execute` (as `retryContext`) and mutated by retry policies such as
 * `SessionRetryPolicy` to describe how the next attempt should be made.
 */
export interface RetryContext {
// NOTE(review): `retryCount` is declared twice below (optional, then required). This looks like the
// before/after pair of a diff hunk rather than intended source; confirm only the required form is kept.
retryCount?: number;
// Retry counter; `execute` supplies a default of 0 and `SessionRetryPolicy` increments it on each retry.
retryCount: number;
// Set by `SessionRetryPolicy`; per its inline comment, `false` forces operations to the primary write endpoint.
retryRequestOnPreferredLocations?: boolean;
// When truthy, `execute` clears the client's session token for the request path and deletes the
// "x-ms-session-token" header before the service endpoint is resolved again.
clearSessionTokenNotAvailable?: boolean;
}

Просмотреть файл

@ -36,7 +36,7 @@ interface RetryPolicies {
* @hidden
*/
export async function execute({
retryContext = {},
retryContext = { retryCount: 0 },
retryPolicies,
requestContext,
executeRequest
@ -64,6 +64,7 @@ export async function execute({
}
if (retryContext && retryContext.clearSessionTokenNotAvailable) {
requestContext.client.clearSessionToken(requestContext.path);
delete requestContext.headers["x-ms-session-token"];
}
requestContext.endpoint = await requestContext.globalEndpointManager.resolveServiceEndpoint(
requestContext.resourceType,

Просмотреть файл

@ -56,7 +56,8 @@ export class SessionRetryPolicy implements RetryPolicy {
if (this.currentRetryAttemptCount > endpoints.length) {
return false;
} else {
retryContext.retryCount = ++this.currentRetryAttemptCount - 1;
this.currentRetryAttemptCount++;
retryContext.retryCount++;
retryContext.retryRequestOnPreferredLocations = this.currentRetryAttemptCount > 1;
retryContext.clearSessionTokenNotAvailable =
this.currentRetryAttemptCount === endpoints.length;
@ -66,7 +67,8 @@ export class SessionRetryPolicy implements RetryPolicy {
if (this.currentRetryAttemptCount > 1) {
return false;
} else {
retryContext.retryCount = ++this.currentRetryAttemptCount - 1;
this.currentRetryAttemptCount++;
retryContext.retryCount++;
retryContext.retryRequestOnPreferredLocations = false; // Forces all operations to primary write endpoint
retryContext.clearSessionTokenNotAvailable = true;
return true;

Просмотреть файл

@ -1,37 +1,17 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import assert from "assert";
import { Context } from "mocha";
import { Suite } from "mocha";
import * as sinon from "sinon";
import { ClientContext, PluginConfig, PluginOn } from "../../src";
import { OperationType, ResourceType, trimSlashes } from "../../src/common";
import { ClientContext, Container, PluginConfig, PluginOn } from "../../src";
import { OperationType, ResourceType } from "../../src/common";
import { ConsistencyLevel } from "../../src";
import { Constants, CosmosClient } from "../../src";
import { CosmosClient } from "../../src";
import { SessionContainer } from "../../src/session/sessionContainer";
import { VectorSessionToken } from "../../src/session/VectorSessionToken";
import { endpoint, masterKey } from "../public/common/_testConfig";
import { getTestDatabase, removeAllDatabases } from "../public/common/TestHelpers";
import * as RequestHandler from "../../src/request/RequestHandler";
import { addEntropy, getTestDatabase, removeAllDatabases } from "../public/common/TestHelpers";
import { RequestContext } from "../../src";
import { Response } from "../../src/request/Response";
// TODO: there are a lot of "any" types for tokens here
// TODO: there is a lot of leaky document client stuff here that will make removing document client hard
const client = new CosmosClient({
endpoint,
key: masterKey,
consistencyLevel: ConsistencyLevel.Session,
connectionPolicy: { enableBackgroundEndpointRefreshing: false }
});
/**
 * Test helper that pulls the collection-resource-id → (partition → token) map out of a
 * session container so assertions can inspect it directly.
 *
 * @param sessionContainer - the session container whose token map should be read
 * @returns whatever is stored in the container's `collectionResourceIdToSessionTokens` field
 */
function getCollection2TokenMap(
  sessionContainer: SessionContainer
): Map<string, Map<string, VectorSessionToken>> {
  // The field is reached through an `any` cast; this peeks at an implementation detail.
  const { collectionResourceIdToSessionTokens: tokensByCollection } = sessionContainer as any;
  return tokensByCollection;
}
describe("New session token", function() {
it("preserves tokens", async function() {
let response: Response<any>;
@ -88,348 +68,72 @@ describe("New session token", function() {
});
});
// For some reason this test does not pass against the emulator. Skipping it for now
describe.skip("Session Token", function(this: Suite) {
this.timeout(process.env.MOCHA_TIMEOUT || 20000);
const containerId = "sessionTestColl";
const containerDefinition = {
id: containerId,
partitionKey: { paths: ["/id"] }
};
const containerOptions = { offerThroughput: 25100 };
const clientContext: ClientContext = (client as any).clientContext;
const sessionContainer: SessionContainer = (clientContext as any).sessionContainer;
const spy = sinon.spy(RequestHandler, "request");
beforeEach(async function() {
await removeAllDatabases();
});
it("validate session tokens for sequence of operations", async function() {
const database = await getTestDatabase("session test", client);
const { resource: createdContainerDef } = await database.containers.create(
containerDefinition,
containerOptions
);
const container = database.container(createdContainerDef.id);
assert.equal(spy.lastCall.args[0].headers[Constants.HttpHeaders.SessionToken], undefined);
// TODO: testing implementation detail by looking at containerResourceIdToSesssionTokens
let collRid2SessionToken: Map<
string,
Map<string, VectorSessionToken>
> = (sessionContainer as any).collectionResourceIdToSessionTokens;
assert.equal(collRid2SessionToken.size, 0, "Should have no tokens in container");
const { resource: document1 } = await container.items.create({ id: "1" });
assert.equal(
spy.lastCall.args[0].headers[Constants.HttpHeaders.SessionToken],
undefined,
"Initial create token should be qual"
);
collRid2SessionToken = getCollection2TokenMap(sessionContainer);
assert.equal(
collRid2SessionToken.size,
1,
"Should only have one container in the sessioncontainer"
);
const containerRid = collRid2SessionToken.keys().next().value;
let containerTokens = collRid2SessionToken.get(containerRid);
assert.equal(containerTokens.size, 1, "Should only have one partition in container");
const firstPartition = containerTokens.keys().next().value;
let firstPartitionToken = containerTokens.get(firstPartition);
assert.notEqual(firstPartitionToken, "Should have a token for first partition");
const token = sessionContainer.get({
isNameBased: true,
operationType: OperationType.Create,
resourceAddress: container.url,
resourceType: ResourceType.item,
resourceId: "2"
});
const { resource: document2 } = await container.items.create({ id: "2" });
assert.equal(
spy.lastCall.args[0].headers[Constants.HttpHeaders.SessionToken],
token,
"create token should be equal"
);
collRid2SessionToken = getCollection2TokenMap(sessionContainer);
assert.equal(
collRid2SessionToken.size,
1,
"Should only have one container in the sessioncontainer"
);
containerTokens = collRid2SessionToken.get(containerRid);
assert.equal(containerTokens.size, 2, "Should have two partitions in container");
const keysIterator = containerTokens.keys();
keysIterator.next(); // partition 1
const secondPartition = keysIterator.next().value;
assert.equal(
containerTokens.get(firstPartition).toString(),
firstPartitionToken.toString(),
"First partition token should still match after create"
);
let secondPartitionToken = containerTokens.get(secondPartition);
assert(secondPartitionToken, "Should have a LSN for second partition");
const readToken = sessionContainer.get({
isNameBased: true,
operationType: OperationType.Read,
resourceAddress: container.url,
resourceType: ResourceType.item,
resourceId: "1"
});
await container.item(document1.id, "1").read();
assert.equal(
spy.lastCall.args[0].headers[Constants.HttpHeaders.SessionToken],
readToken,
"read token should be equal"
);
collRid2SessionToken = getCollection2TokenMap(sessionContainer);
assert.equal(
collRid2SessionToken.size,
1,
"Should only have one container in the sessioncontainer"
);
containerTokens = collRid2SessionToken.get(containerRid);
assert.equal(containerTokens.size, 2, "Should have two partitions in container");
assert.equal(
containerTokens.get(firstPartition).toString(),
firstPartitionToken.toString(),
"First partition token should still match after read"
);
assert.equal(
containerTokens.get(secondPartition).toString(),
secondPartitionToken.toString(),
"Second partition token should still match after read"
);
const upsertToken = sessionContainer.get({
isNameBased: true,
operationType: OperationType.Upsert,
resourceAddress: container.url,
resourceType: ResourceType.item,
resourceId: "1"
});
const { resource: document13 } = await container.items.upsert({ id: "1", operation: "upsert" });
assert.equal(
spy.lastCall.args[0].headers[Constants.HttpHeaders.SessionToken],
upsertToken,
"upsert token should be equal"
);
collRid2SessionToken = getCollection2TokenMap(sessionContainer);
assert.equal(
collRid2SessionToken.size,
1,
"Should only have one container in the sessioncontainer"
);
containerTokens = collRid2SessionToken.get(containerRid);
assert.equal(containerTokens.size, 2, "Should have two partitions in container");
// TODO: should validate the LSN only increased by 1...
assert.notEqual(
containerTokens.get(firstPartition).toString(),
firstPartitionToken.toString(),
"First partition token should no longer match after upsert"
);
assert.equal(
containerTokens.get(secondPartition).toString(),
secondPartitionToken.toString(),
"Second partition token should still match after upsert"
);
firstPartitionToken = containerTokens.get(firstPartition);
const deleteToken = sessionContainer.get({
isNameBased: true,
operationType: OperationType.Delete,
resourceAddress: container.url,
resourceType: ResourceType.item,
resourceId: "2"
});
await container.item(document2.id, "2").delete();
assert.equal(
spy.lastCall.args[0].headers[Constants.HttpHeaders.SessionToken],
deleteToken,
"delete token should be equal"
);
collRid2SessionToken = getCollection2TokenMap(sessionContainer);
assert.equal(
collRid2SessionToken.size,
1,
"Should only have one container in the sessioncontainer"
);
containerTokens = collRid2SessionToken.get(containerRid);
assert.equal(containerTokens.size, 2, "Should have two partitions in container");
assert.equal(
containerTokens.get(firstPartition).toString(),
firstPartitionToken.toString(),
"First partition token should still match delete"
);
// TODO: should validate the LSN only increased by 1...
assert.notEqual(
containerTokens.get(secondPartition).toString(),
secondPartitionToken.toString(),
"Second partition token should not match after delete"
);
secondPartitionToken = containerTokens.get(secondPartition);
const replaceToken = sessionContainer.get({
isNameBased: true,
operationType: OperationType.Replace,
resourceAddress: container.url,
resourceType: ResourceType.item,
resourceId: "1"
});
await container.item(document13.id, "1").replace({ id: "1", operation: "replace" });
assert.equal(
spy.lastCall.args[0].headers[Constants.HttpHeaders.SessionToken],
replaceToken,
"replace token should be equal"
);
collRid2SessionToken = getCollection2TokenMap(sessionContainer);
assert.equal(
collRid2SessionToken.size,
1,
"Should only have one container in the sessioncontainer"
);
containerTokens = collRid2SessionToken.get(containerRid);
assert.equal(containerTokens.size, 2, "Should have two partitions in container");
// TODO: should validate the LSN only increased by 1...
assert.notEqual(
containerTokens.get(firstPartition).toString(),
firstPartitionToken.toString(),
"First partition token should no longer match after replace"
);
assert.equal(
containerTokens.get(secondPartition).toString(),
secondPartitionToken.toString(),
"Second partition token should still match after replace"
);
firstPartitionToken = containerTokens.get(firstPartition);
const query = `SELECT * from c WHERE c.id = "1"`;
const queryIterator = container.items.query(query);
const queryToken = sessionContainer.get({
isNameBased: true,
operationType: OperationType.Query,
resourceAddress: container.url,
resourceType: ResourceType.item
});
await queryIterator.fetchAll();
assert.equal(spy.lastCall.args[0].headers[Constants.HttpHeaders.SessionToken], queryToken);
collRid2SessionToken = getCollection2TokenMap(sessionContainer);
assert.equal(
collRid2SessionToken.size,
1,
"Should only have one container in the sessioncontainer"
);
containerTokens = collRid2SessionToken.get(containerRid);
assert.equal(containerTokens.size, 2, "Should have two partitions in container");
assert.equal(
containerTokens.get(firstPartition).toString(),
firstPartitionToken.toString(),
"First partition token should still match after query"
);
assert.equal(
containerTokens.get(secondPartition).toString(),
secondPartitionToken.toString(),
"Second partition token should still match after query"
);
const deleteContainerToken = sessionContainer.get({
isNameBased: true,
operationType: OperationType.Delete,
resourceAddress: container.url,
resourceType: ResourceType.container,
resourceId: container.id
});
await container.delete();
assert.equal(
spy.lastCall.args[0].headers[Constants.HttpHeaders.SessionToken],
deleteContainerToken,
"delete container token should match"
);
collRid2SessionToken = getCollection2TokenMap(sessionContainer);
assert.equal(collRid2SessionToken.size, 0, "collRid map should be empty on container delete");
spy.restore();
});
it("validate 'lsn not caught up' error for higher lsn and clearing session token", async function(this: Context) {
this.retries(2);
const database = await getTestDatabase("session test", client);
const containerLink = "dbs/" + database.id + "/colls/" + containerId;
const increaseLSN = function(oldTokens: Map<string, Map<string, VectorSessionToken>>): string {
for (const [, tokens] of oldTokens.entries()) {
for (const [pk, token] of tokens.entries()) {
(token as any).globalLsn = (token as any).globalLsn + 200;
const newToken = token.merge(token);
return `${pk}:${newToken.toString()}`;
}
}
throw new Error("No valid token found to increase");
};
await database.containers.create(containerDefinition, containerOptions);
const container = database.container(containerDefinition.id);
await container.items.create({ id: "1" });
const callbackSpy = sinon.spy(function(requestContext: RequestContext) {
const oldTokens = getCollection2TokenMap(sessionContainer);
requestContext.headers[Constants.HttpHeaders.SessionToken] = increaseLSN(oldTokens);
});
const applySessionTokenStub = sinon
.stub(clientContext as any, "applySessionToken")
.callsFake(callbackSpy as any);
const resp = await container.item("1", "1").read();
assert.equal(resp.resource, undefined);
assert.equal(resp.substatus, 1002, "Substatus should indicate the LSN didn't catchup.");
assert.equal(callbackSpy.callCount, 1);
assert.equal(trimSlashes(callbackSpy.lastCall.args[0].path), containerLink + "/docs/1");
applySessionTokenStub.restore();
await container.item("1", "1").read();
});
it("validate session container update on 'Not found' with 'undefined' status code for non master resource", async function() {
const client2 = new CosmosClient({
it("retries session not found successfully", async function() {
const clientA = new CosmosClient({
endpoint,
key: masterKey,
consistencyLevel: ConsistencyLevel.Session,
connectionPolicy: { enableBackgroundEndpointRefreshing: false }
});
const db = await getTestDatabase("session test", client);
const { resource: createdContainerDef } = await db.containers.create(
containerDefinition,
containerOptions
);
const createdContainer = db.container(createdContainerDef.id);
const { resource: createdDocument } = await createdContainer.items.create({
id: "1"
// Create a second client with a plugin that simulates "Session Not Found" error
const clientB = new CosmosClient({
endpoint,
key: masterKey,
consistencyLevel: ConsistencyLevel.Session,
connectionPolicy: { enableBackgroundEndpointRefreshing: false },
plugins: [
{
on: "request",
plugin: async (context, next) => {
// Simulate a "Session Not Found" error by manually making the client session token *way* ahead of any available session on the server
// This is just a way to simulate the error. Getting this to happen in practice is difficult and only usually occurs cross region where there is significant replication lag
if (context.headers["x-ms-session-token"]) {
context.headers["x-ms-session-token"] = "0:0#900000#3=8600000#10=-1";
}
console.log(context.method, context.path, context.headers["x-ms-session-token"]);
const repsonse = await next(context);
console.log(repsonse.code, repsonse.substatus);
return repsonse;
}
}
]
});
await client2
.database(db.id)
.container(createdContainerDef.id)
.item(createdDocument.id, "1")
.delete();
const setSessionTokenSpy = sinon.spy(sessionContainer, "set");
const resp = await createdContainer.item(createdDocument.id, "1").read();
assert.equal(resp.resource, undefined);
assert.equal(resp.statusCode, 404, "expecting 404 (Not found)");
assert.equal(resp.substatus, undefined, "expecting substatus code to be undefined");
assert.equal(setSessionTokenSpy.callCount, 1, "unexpected number of calls to sesSessionToken");
setSessionTokenSpy.restore();
const dbId = addEntropy("sessionTestDB");
const containerId = addEntropy("sessionTestContainer");
// Create Database and Container
const { database } = await clientA.databases.createIfNotExists({
id: dbId
});
const { container } = await database.containers.createIfNotExists({
id: containerId
});
// Create items using both clients so they each establish a session with the backend
const container2 = clientB.database(dbId).container(containerId);
await Promise.all([createItem(container), createItem(container2)]);
// Create an item using client
const id = await createItem(container);
const { resource, statusCode } = await container2.item(id).read();
console.log(statusCode, resource);
assert.ok(resource);
assert.strictEqual(statusCode, 200);
});
});
/**
 * Creates a single item with a short pseudo-random id in the given container.
 *
 * @param container - container in which the item is created
 * @returns the `id` found on the `resource` of the create response
 */
async function createItem(container: Container) {
  // Base-36 rendering of a random number in [1, 2), keeping only the tail as the id.
  const randomId = (Math.random() + 1).toString(36).substring(7);
  const response = await container.items.create({ id: randomId });
  return response.resource.id;
}