[core] fix isStreamComplete issue (#31270)

Extract @MaryGao's fix from PR
https://github.com/Azure/azure-sdk-for-js/pull/31027

Fix an issue in `isStreamComplete` where the method never resolves when
the stream is not readable.

---------

Co-authored-by: Mary Gao <yanmeigao1210@gmail.com>
This commit is contained in:
Jeremy Meng 2024-10-08 09:54:54 -07:00 коммит произвёл GitHub
Родитель c7378eef41
Коммит 5abc72c71a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
5 изменённых файлов: 104 добавлений и 2 удалений

Просмотреть файл

@ -8,6 +8,8 @@
### Bugs Fixed
- Fix an issue in `isStreamComplete` where the method never resolves if the stream is not readable.
### Other Changes
## 1.17.0 (2024-09-12)

Просмотреть файл

@ -27,6 +27,10 @@ function isReadableStream(body: any): body is NodeJS.ReadableStream {
}
function isStreamComplete(stream: NodeJS.ReadableStream): Promise<void> {
if (stream.readable === false) {
return Promise.resolve();
}
return new Promise((resolve) => {
const handler = (): void => {
resolve();
@ -184,7 +188,6 @@ class NodeHttpClient implements HttpClient {
if (isReadableStream(responseStream)) {
downloadStreamDone = isStreamComplete(responseStream);
}
Promise.all([uploadStreamDone, downloadStreamDone])
.then(() => {
// eslint-disable-next-line promise/always-return

Просмотреть файл

@ -4,6 +4,8 @@
import { assert, describe, it, vi, beforeEach, afterEach } from "vitest";
import { PassThrough, Writable } from "stream";
import type { ClientRequest, IncomingHttpHeaders, IncomingMessage } from "http";
import { AbortSignalLike } from "@azure/abort-controller";
import { delay } from "@azure/core-util";
import { createDefaultHttpClient, createPipelineRequest } from "../../src/index.js";
vi.mock("https", async () => {
@ -453,4 +455,49 @@ describe("NodeHttpClient", function () {
assert.strictEqual(e.name, "AbortError");
}
});
it("should release abort listener when stream body ends already", async function () {
vi.useRealTimers();
const client = createDefaultHttpClient();
const writable = new Writable({
write: (_chunk, _, next) => {
next();
},
}) as unknown as ClientRequest;
vi.mocked(https.request).mockReturnValueOnce(writable);
const controller = new AbortController();
let listenerRemoved = false;
const abortSignal: AbortSignalLike = {
aborted: false,
addEventListener: function (
_type: "abort",
listener: (this: AbortSignalLike, ev: any) => any,
options?: any,
): void {
controller.signal.addEventListener("abort", listener, options);
},
removeEventListener: function (
_type: "abort",
listener: (this: AbortSignalLike, ev: any) => any,
options?: any,
): void {
listenerRemoved = true;
controller.signal.removeEventListener("abort", listener, options);
},
};
const stream = new PassThrough();
stream.end();
const body = stream;
const request = createPipelineRequest({
url: "https://example.com",
body,
abortSignal,
});
const promise = client.sendRequest(request);
yieldHttpsResponse(createResponse(200));
await Promise.all([promise, delay(10)]);
assert.equal(listenerRemoved, true);
});
});

Просмотреть файл

@ -27,6 +27,10 @@ function isReadableStream(body: any): body is NodeJS.ReadableStream {
}
function isStreamComplete(stream: NodeJS.ReadableStream): Promise<void> {
if (stream.readable === false) {
return Promise.resolve();
}
return new Promise((resolve) => {
stream.on("close", resolve);
stream.on("end", resolve);

Просмотреть файл

@ -4,7 +4,8 @@
import { describe, it, assert, vi, beforeEach, afterEach } from "vitest";
import { PassThrough, Writable } from "node:stream";
import { ClientRequest, IncomingHttpHeaders, IncomingMessage } from "http";
import { createDefaultHttpClient, createPipelineRequest } from "../../src/index.js";
import { AbortSignalLike } from "../../src/abort-controller/AbortSignalLike.js";
import { createDefaultHttpClient, createPipelineRequest, delay } from "../../src/index.js";
vi.mock("https", async () => {
const actual = await vi.importActual("https");
@ -453,4 +454,49 @@ describe("NodeHttpClient", function () {
assert.strictEqual(e.name, "AbortError");
}
});
it("should release abort listener when stream body ends already", async function () {
vi.useRealTimers();
const client = createDefaultHttpClient();
const writable = new Writable({
write: (_chunk, _, next) => {
next();
},
}) as unknown as ClientRequest;
vi.mocked(https.request).mockReturnValueOnce(writable);
const controller = new AbortController();
let listenerRemoved = false;
const abortSignal: AbortSignalLike = {
aborted: false,
addEventListener: function (
_type: "abort",
listener: (this: AbortSignalLike, ev: any) => any,
options?: any,
): void {
controller.signal.addEventListener("abort", listener, options);
},
removeEventListener: function (
_type: "abort",
listener: (this: AbortSignalLike, ev: any) => any,
options?: any,
): void {
listenerRemoved = true;
controller.signal.removeEventListener("abort", listener, options);
},
};
const stream = new PassThrough();
stream.end();
const body = stream;
const request = createPipelineRequest({
url: "https://example.com",
body,
abortSignal,
});
const promise = client.sendRequest(request);
yieldHttpsResponse(createResponse(200));
await Promise.all([promise, delay(10)]);
assert.equal(listenerRemoved, true);
});
});