From 5abc72c71aef0c5c57bb64f2df0256dd98ea446a Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Tue, 8 Oct 2024 09:54:54 -0700 Subject: [PATCH] [core] fix isStreamComplete issue (#31270) Extract @MaryGao's fix from PR https://github.com/Azure/azure-sdk-for-js/pull/31027 Fix an issue in `isStreamComplete` where the method never resolves when the stream is not readable. --------- Co-authored-by: Mary Gao --- sdk/core/core-rest-pipeline/CHANGELOG.md | 2 + .../core-rest-pipeline/src/nodeHttpClient.ts | 5 +- .../test/node/nodeHttpClient.spec.ts | 47 ++++++++++++++++++ .../ts-http-runtime/src/nodeHttpClient.ts | 4 ++ .../test/node/nodeHttpClient.spec.ts | 48 ++++++++++++++++++- 5 files changed, 104 insertions(+), 2 deletions(-) diff --git a/sdk/core/core-rest-pipeline/CHANGELOG.md b/sdk/core/core-rest-pipeline/CHANGELOG.md index 122264d9ef7..a25e821ada4 100644 --- a/sdk/core/core-rest-pipeline/CHANGELOG.md +++ b/sdk/core/core-rest-pipeline/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fix an issue in `isStreamComplete` where the method never resolves if the stream is not readable. + ### Other Changes ## 1.17.0 (2024-09-12) diff --git a/sdk/core/core-rest-pipeline/src/nodeHttpClient.ts b/sdk/core/core-rest-pipeline/src/nodeHttpClient.ts index f4a81d44f93..1ac007aef9d 100644 --- a/sdk/core/core-rest-pipeline/src/nodeHttpClient.ts +++ b/sdk/core/core-rest-pipeline/src/nodeHttpClient.ts @@ -27,6 +27,10 @@ function isReadableStream(body: any): body is NodeJS.ReadableStream { } function isStreamComplete(stream: NodeJS.ReadableStream): Promise { + if (stream.readable === false) { + return Promise.resolve(); + } + return new Promise((resolve) => { const handler = (): void => { resolve(); @@ -184,7 +188,6 @@ class NodeHttpClient implements HttpClient { if (isReadableStream(responseStream)) { downloadStreamDone = isStreamComplete(responseStream); } - Promise.all([uploadStreamDone, downloadStreamDone]) .then(() => { // eslint-disable-next-line promise/always-return diff --git a/sdk/core/core-rest-pipeline/test/node/nodeHttpClient.spec.ts b/sdk/core/core-rest-pipeline/test/node/nodeHttpClient.spec.ts index f3321527193..f98954bfe9a 100644 --- a/sdk/core/core-rest-pipeline/test/node/nodeHttpClient.spec.ts +++ b/sdk/core/core-rest-pipeline/test/node/nodeHttpClient.spec.ts @@ -4,6 +4,8 @@ import { assert, describe, it, vi, beforeEach, afterEach } from "vitest"; import { PassThrough, Writable } from "stream"; import type { ClientRequest, IncomingHttpHeaders, IncomingMessage } from "http"; +import { AbortSignalLike } from "@azure/abort-controller"; +import { delay } from "@azure/core-util"; import { createDefaultHttpClient, createPipelineRequest } from "../../src/index.js"; vi.mock("https", async () => { @@ -453,4 +455,49 @@ describe("NodeHttpClient", function () { assert.strictEqual(e.name, "AbortError"); } }); + + it("should release abort listener when stream body ends already", async function () { + vi.useRealTimers(); + const client = createDefaultHttpClient(); + const writable = new Writable({ + write: (_chunk, _, next) => { + next(); + }, + }) as unknown as ClientRequest; + vi.mocked(https.request).mockReturnValueOnce(writable); + + const controller = new AbortController(); + let listenerRemoved = false; + const abortSignal: AbortSignalLike = { + aborted: false, + addEventListener: function ( + _type: "abort", + listener: (this: AbortSignalLike, ev: any) => any, + options?: any, + ): void { + controller.signal.addEventListener("abort", listener, options); + }, + removeEventListener: function ( + _type: "abort", + listener: (this: AbortSignalLike, ev: any) => any, + options?: any, + ): void { + listenerRemoved = true; + controller.signal.removeEventListener("abort", listener, options); + }, + }; + + const stream = new PassThrough(); + stream.end(); + const body = stream; + const request = createPipelineRequest({ + url: "https://example.com", + body, + abortSignal, + }); + const promise = client.sendRequest(request); + yieldHttpsResponse(createResponse(200)); + await Promise.all([promise, delay(10)]); + assert.equal(listenerRemoved, true); + }); }); diff --git a/sdk/core/ts-http-runtime/src/nodeHttpClient.ts b/sdk/core/ts-http-runtime/src/nodeHttpClient.ts index a1a6419460a..501b03ebf9c 100644 --- a/sdk/core/ts-http-runtime/src/nodeHttpClient.ts +++ b/sdk/core/ts-http-runtime/src/nodeHttpClient.ts @@ -27,6 +27,10 @@ function isReadableStream(body: any): body is NodeJS.ReadableStream { } function isStreamComplete(stream: NodeJS.ReadableStream): Promise { + if (stream.readable === false) { + return Promise.resolve(); + } + return new Promise((resolve) => { stream.on("close", resolve); stream.on("end", resolve); diff --git a/sdk/core/ts-http-runtime/test/node/nodeHttpClient.spec.ts b/sdk/core/ts-http-runtime/test/node/nodeHttpClient.spec.ts index c2b9d45a859..6ab1ae6c4f1 100644 --- a/sdk/core/ts-http-runtime/test/node/nodeHttpClient.spec.ts +++ b/sdk/core/ts-http-runtime/test/node/nodeHttpClient.spec.ts @@ -4,7 +4,8 @@ import { describe, it, assert, vi, beforeEach, afterEach } from "vitest"; import { PassThrough, Writable } from "node:stream"; import { ClientRequest, IncomingHttpHeaders, IncomingMessage } from "http"; -import { createDefaultHttpClient, createPipelineRequest } from "../../src/index.js"; +import { AbortSignalLike } from "../../src/abort-controller/AbortSignalLike.js"; +import { createDefaultHttpClient, createPipelineRequest, delay } from "../../src/index.js"; vi.mock("https", async () => { const actual = await vi.importActual("https"); @@ -453,4 +454,49 @@ describe("NodeHttpClient", function () { assert.strictEqual(e.name, "AbortError"); } }); + + it("should release abort listener when stream body ends already", async function () { + vi.useRealTimers(); + const client = createDefaultHttpClient(); + const writable = new Writable({ + write: (_chunk, _, next) => { + next(); + }, + }) as unknown as ClientRequest; + vi.mocked(https.request).mockReturnValueOnce(writable); + + const controller = new AbortController(); + let listenerRemoved = false; + const abortSignal: AbortSignalLike = { + aborted: false, + addEventListener: function ( + _type: "abort", + listener: (this: AbortSignalLike, ev: any) => any, + options?: any, + ): void { + controller.signal.addEventListener("abort", listener, options); + }, + removeEventListener: function ( + _type: "abort", + listener: (this: AbortSignalLike, ev: any) => any, + options?: any, + ): void { + listenerRemoved = true; + controller.signal.removeEventListener("abort", listener, options); + }, + }; + + const stream = new PassThrough(); + stream.end(); + const body = stream; + const request = createPipelineRequest({ + url: "https://example.com", + body, + abortSignal, + }); + const promise = client.sendRequest(request); + yieldHttpsResponse(createResponse(200)); + await Promise.all([promise, delay(10)]); + assert.equal(listenerRemoved, true); + }); });