[core-sse] Fix NodeJS socket behavior (#30707)

### Packages impacted by this PR
@azure/core-sse

### Issues associated with this PR
https://github.com/Azure/azure-sdk-for-js/issues/30414

### Describe the problem that is addressed by this PR
In NodeJS, when the response stream is exhausted and the server closes
the connection, the underlying socket gets set to null. This is
problematic because core-sse is misled by the type of
`http.IncomingMessage.socket` that stipulates the socket is always a
valid object. This PR checks first if the socket is defined before it
closes it.

### What are the possible designs available to address the problem? If
there are more than one possible design, why was the one in this PR
chosen?
N/A

### Are there test cases added in this PR? _(If not, why?)_
Yes!

### Provide a list of related PRs _(if any)_
N/A

### Command used to generate this PR:**_(Applicable only to SDK release
request PRs)_

### Checklists
- [x] Added impacted package name to the issue description
- [ ] Does this PR needs any fixes in the SDK Generator?** _(If so,
create an Issue in the
[Autorest/typescript](https://github.com/Azure/autorest.typescript)
repository and link it here)_
- [x] Added a changelog (if necessary)
This commit is contained in:
Deyaaeldeen Almahallawi 2024-08-09 17:42:36 -07:00 коммит произвёл GitHub
Родитель d19f8fb5e5
Коммит 679254d71e
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
11 изменённых файлов: 981 добавлений и 428 удалений

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -8,6 +8,8 @@
### Bugs Fixed
- Fixes a bug when running in NodeJS where we attempt to close the underlying socket despite it being set to null when the stream has been consumed [BUG [#30414](https://github.com/Azure/azure-sdk-for-js/issues/30414)].
### Other Changes
## 2.1.2 (2024-04-09)

Просмотреть файл

@ -12,22 +12,18 @@
"./package.json": "./package.json",
".": {
"browser": {
"source": "./src/index.ts",
"types": "./dist/browser/index.d.ts",
"default": "./dist/browser/index.js"
},
"react-native": {
"source": "./src/index.ts",
"types": "./dist/react-native/index.d.ts",
"default": "./dist/react-native/index.js"
},
"import": {
"source": "./src/index.ts",
"types": "./dist/esm/index.d.ts",
"default": "./dist/esm/index.js"
},
"require": {
"source": "./src/index.ts",
"types": "./dist/commonjs/index.d.ts",
"default": "./dist/commonjs/index.js"
}
@ -73,6 +69,7 @@
"lint:fix": "eslint package.json api-extractor.json src --ext .ts --ext .cts --ext .mts --fix --fix-type [problem,suggestion]",
"lint": "eslint package.json api-extractor.json src --ext .ts --ext .cts --ext .mts",
"pack": "npm pack 2>&1",
"start-server": "tsx test/server/start.mts",
"test:browser": "npm run clean && npm run build && npm run integration-test:browser",
"test:node": "npm run clean && tshy && npm run integration-test:node",
"test": "npm run clean && tshy && npm run unit-test:node && dev-tool run build-test && npm run unit-test:browser && npm run integration-test",
@ -83,18 +80,24 @@
"devDependencies": {
"@azure/dev-tool": "^1.0.0",
"@azure/eslint-plugin-azure-sdk": "^3.0.0",
"@azure/logger": "^1.1.4",
"@azure-tools/test-utils": "^1.0.1",
"@azure-rest/core-client": "^2.2.0",
"@azure-tools/vite-plugin-browser-test-map": "^1.0.0",
"@microsoft/api-extractor": "^7.40.3",
"@types/express": "^4.17.21",
"@types/node": "^18.0.0",
"@vitest/browser": "^1.3.1",
"@vitest/coverage-istanbul": "^1.3.1",
"@vitest/browser": "^2.0.5",
"@vitest/coverage-istanbul": "^2.0.5",
"dotenv": "^16.0.0",
"eslint": "^8.56.0",
"playwright": "^1.41.2",
"rimraf": "^5.0.5",
"tshy": "^1.17.0",
"typescript": "~5.5.3",
"vitest": "^1.3.1"
"express": "^4.19.2",
"playwright": "^1.46.0",
"rimraf": "^6.0.1",
"tshy": "^3.0.2",
"tsx": "^4.17.0",
"typescript": "~5.5.4",
"vitest": "^2.0.5"
},
"dependencies": {
"tslib": "^2.6.2"

Просмотреть файл

@ -3,6 +3,10 @@
import type { IncomingMessage } from "node:http";
type Nullable<T, K extends keyof T> = {
[P in K]: T[P] | null;
} & Omit<T, K>;
export function createStream<T>(
asyncIter: AsyncIterableIterator<T>,
cancel: () => PromiseLike<void>,
@ -60,7 +64,9 @@ function iteratorToStream<T>(
});
}
export function ensureAsyncIterable(stream: IncomingMessage | ReadableStream<Uint8Array>): {
export function ensureAsyncIterable(
stream: Nullable<IncomingMessage, "socket"> | ReadableStream<Uint8Array>,
): {
cancel(): Promise<void>;
iterable: AsyncIterable<Uint8Array>;
} {
@ -73,7 +79,11 @@ export function ensureAsyncIterable(stream: IncomingMessage | ReadableStream<Uin
} else {
return {
cancel: async () => {
stream.socket.end();
/**
* socket is set to null when the stream has been consumed
* so we need to make sure to guard against nullability.
*/
stream.socket?.end();
},
iterable: stream as AsyncIterable<Uint8Array>,
};

Просмотреть файл

@ -5,11 +5,6 @@ import { describe, it, assert, beforeEach } from "vitest";
import { createStream } from "../../src/utils.js";
describe("createStream", () => {
beforeEach((ctx) => {
console.log("Context:", ctx.task.name);
console.log("Suite:", ctx.task.suite.name);
});
const createIter = async function* (): AsyncGenerator<number> {
yield 1;
yield 2;
@ -20,7 +15,7 @@ describe("createStream", () => {
const stream = createStream(createIter(), async () => {
/** nothing needs to be cleaned up */
});
const result = [];
const result: number[] = [];
for await (const item of stream) {
result.push(item);
}

Просмотреть файл

@ -0,0 +1,121 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { createSseStream } from "../../../src/index.js";
import { Client, getClient } from "@azure-rest/core-client";
import { assert, beforeAll, beforeEach, afterEach, describe, it } from "vitest";
import { port } from "../../server/config.mts";
import { IncomingMessage } from "http";
import { matrix } from "@azure-tools/test-utils";
const contentType = "text/event-stream";
function getEndpoint(): string {
return `http://localhost:${port}`;
}
async function sendRequest(
client: Client,
path: string,
abortSignal?: AbortSignal,
): Promise<IncomingMessage> {
const res = await client
.pathUnchecked(path)
.get({ accept: contentType, abortSignal })
.asNodeStream();
if (res.status !== "200") {
throw new Error(`Unexpected status code: ${res.status}`);
}
if (!res.body) {
throw new Error("Expected a readable stream body");
}
const receivedContentType = res.headers["content-type"];
if (!receivedContentType.includes(contentType)) {
throw new Error(`Expected a text/event-stream content but received\"${receivedContentType}\"`);
}
return res.body as IncomingMessage;
}
describe("[Node] Connections", () => {
let client: Client;
let ran: boolean;
beforeAll(async function () {
client = getClient(getEndpoint(), { allowInsecureConnection: true });
});
matrix(
[
[
"/events/no-fin/3",
"/events/no-fin/1",
"/events/1",
"/events/hang",
"/events/extra-newline/3",
"/events/extra-newline/hang",
"/events/extra-event/3",
"/events/extra-event/hang",
],
],
async function (path) {
describe(`${path}`, function () {
beforeEach(async function () {
ran = false;
});
afterEach(async function () {
if (path !== "/events/no-fin/1") {
assert.isTrue(ran);
}
ran = false;
});
it("loop until stream ends and then break", async function () {
let stream: IncomingMessage;
try {
stream = await sendRequest(client, path);
} catch (e) {
assert.equal(path, "/events/no-fin/1");
assert.equal(e.code, "ECONNRESET");
return;
}
const sses = createSseStream(stream);
try {
for await (const sse of sses) {
ran = true;
if (sse.data === "[DONE]") {
if (path.includes("no-fin")) {
assert.isNull(stream.socket);
}
break;
}
}
} catch (e) {
assert.equal(path, "/events/no-fin/3");
assert.equal(e.code, "ECONNRESET");
}
});
it("break early from loop", async function () {
let stream: IncomingMessage;
try {
// sometimes the server gets into a bad state and doesn't respond so we need to timeout
stream = await sendRequest(client, path, AbortSignal.timeout(25000));
} catch (e) {
assert.equal(path, "/events/no-fin/1");
if (e.code) {
assert.equal(e.code, "ECONNRESET");
} else {
assert.match(e.message, /The operation was aborted./);
}
return;
}
const sses = createSseStream(stream);
for await (const _ of sses) {
ran = true;
break;
}
});
});
},
);
});

Просмотреть файл

@ -0,0 +1,5 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
export const port = 3000;
export const waitBetweenEventsInMS = 100;

Просмотреть файл

@ -0,0 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { createClientLogger } from "@azure/logger";
export const logger = createClientLogger("sse-test-server");

Просмотреть файл

@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { Response } from "express";
import { logger } from "./logger.mts";
export function sendHeaders(res: Response) {
res.writeHead(200, {
"Cache-Control": "no-cache",
"Content-Type": "text/event-stream",
Connection: "keep-alive",
});
}
export async function sendEvents(res: Response, count: number, waitTimeInMs: number) {
for (let i = 0; i < count; i++) {
await new Promise((resolve) => setTimeout(resolve, waitTimeInMs));
logger.verbose("Emit", ++i);
res.write(`data: ${i}\n\n`);
}
logger.verbose("Emit [DONE]");
res.write(`data: [DONE]\n\n`);
}

Просмотреть файл

@ -0,0 +1,75 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import express from "express";
import { port, waitBetweenEventsInMS } from "./config.mts";
import { logger } from "./logger.mts";
import { sendEvents, sendHeaders } from "./responses.mts";
import { Server } from "http";
async function run(): Promise<Server> {
const app = express();
app.get("/events/:eventCount", async function (req, res) {
logger.info(`Got /events/${req.params.eventCount}`);
sendHeaders(res);
const eventCount = parseInt(req.params.eventCount);
await sendEvents(res, eventCount, waitBetweenEventsInMS);
res.end();
});
app.get("/events/no-fin/:eventCount", async function (req, res) {
logger.info(`Got /events/no-fin/${req.params.eventCount}`);
sendHeaders(res);
const eventCount = parseInt(req.params.eventCount);
await sendEvents(res, eventCount, waitBetweenEventsInMS);
res.socket?.destroy();
//res.end();
});
app.get("/events/hang", async function (_, res) {
logger.info("Got /events/hang");
sendHeaders(res);
await sendEvents(res, 0, waitBetweenEventsInMS);
});
app.get("/events/extra-newline/:eventCount", async function (req, res) {
logger.info(`Got /events/extra-newline/${req.params.eventCount}`);
sendHeaders(res);
const eventCount = parseInt(req.params.eventCount);
await sendEvents(res, eventCount, waitBetweenEventsInMS);
res.end("\n");
});
app.get("/events/extra-newline/hang", async function (req, res) {
logger.info(`Got /events/extra-newline/hang`);
sendHeaders(res);
await sendEvents(res, 0, waitBetweenEventsInMS);
res.write("\n");
});
app.get("/events/extra-event/:eventCount", async function (req, res) {
logger.info(`Got /events/extra-event/${req.params.eventCount}`);
sendHeaders(res);
const eventCount = parseInt(req.params.eventCount);
await sendEvents(res, eventCount, waitBetweenEventsInMS);
res.end(`data: truly done this time :)\n\n`);
});
app.get("/events/extra-event/hang", async function (req, res) {
logger.info(`Got /events/hang/extra-event`);
sendHeaders(res);
await sendEvents(res, 0, waitBetweenEventsInMS);
res.write(`data: truly done this time :)\n\n`);
});
return app.listen(port);
}
export default async function () {
const server = await run();
logger.info(`Listening on port ${port}`);
return function () {
server.close();
};
}

Просмотреть файл

@ -5,13 +5,16 @@ import { defineConfig } from "vitest/config";
export default defineConfig({
test: {
reporters: ["basic", "junit"],
reporters: ["verbose", "junit"],
fileParallelism: false,
testTimeout: 30000,
typecheck: {
enabled: true,
},
globalSetup: "test/server/start.mts",
outputFile: {
junit: "test-results.xml",
},
fakeTimers: {
toFake: ["setTimeout", "Date"],
},
watch: false,
include: ["test/**/*.spec.ts"],
exclude: ["test/**/browser/*.spec.ts"],