fix: allow ClientRequest responses to be throttled (#25531)

* fix: allow net streams to be throttled [WIP]

* fix: handle resume throttling within IncomingMessage [WIP]

* fix: fix urlLoader typing, add throttle test

* fix: fix lint and increase test timeout for Linux

* fix: increase test chunk limit to 20 and timeout to 2000

Co-authored-by: Jeremy Rose <nornagon@nornagon.net>
This commit is contained in:
Keeley Hammond 2020-10-05 17:47:41 -07:00 коммит произвёл GitHub
Родитель 53ee708fe8
Коммит 6356cd4018
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 40 добавлений и 8 удалений

Просмотреть файл

@ -38,12 +38,14 @@ class IncomingMessage extends Readable {
_shouldPush: boolean; _shouldPush: boolean;
_data: (Buffer | null)[]; _data: (Buffer | null)[];
_responseHead: NodeJS.ResponseHead; _responseHead: NodeJS.ResponseHead;
_resume: (() => void) | null;
constructor (responseHead: NodeJS.ResponseHead) { constructor (responseHead: NodeJS.ResponseHead) {
super(); super();
this._shouldPush = false; this._shouldPush = false;
this._data = []; this._data = [];
this._responseHead = responseHead; this._responseHead = responseHead;
this._resume = null;
} }
get statusCode () { get statusCode () {
@ -103,7 +105,9 @@ class IncomingMessage extends Readable {
throw new Error('HTTP trailers are not supported'); throw new Error('HTTP trailers are not supported');
} }
_storeInternalData (chunk: Buffer | null) { _storeInternalData (chunk: Buffer | null, resume: (() => void) | null) {
// save the network callback for use in _pushInternalData
this._resume = resume;
this._data.push(chunk); this._data.push(chunk);
this._pushInternalData(); this._pushInternalData();
} }
@ -113,6 +117,12 @@ class IncomingMessage extends Readable {
const chunk = this._data.shift(); const chunk = this._data.shift();
this._shouldPush = this.push(chunk); this._shouldPush = this.push(chunk);
} }
if (this._shouldPush && this._resume) {
this._resume();
// Reset the callback, so that a new one is used for each
// batch of throttled data
this._resume = null;
}
} }
_read () { _read () {
@ -404,11 +414,11 @@ export class ClientRequest extends Writable implements Electron.ClientRequest {
const response = this._response = new IncomingMessage(responseHead); const response = this._response = new IncomingMessage(responseHead);
this.emit('response', response); this.emit('response', response);
}); });
this._urlLoader.on('data', (event, data) => { this._urlLoader.on('data', (event, data, resume) => {
this._response!._storeInternalData(Buffer.from(data)); this._response!._storeInternalData(Buffer.from(data), resume);
}); });
this._urlLoader.on('complete', () => { this._urlLoader.on('complete', () => {
if (this._response) { this._response._storeInternalData(null); } if (this._response) { this._response._storeInternalData(null, null); }
}); });
this._urlLoader.on('error', (event, netErrorString) => { this._urlLoader.on('error', (event, netErrorString) => {
const error = new Error(netErrorString); const error = new Error(netErrorString);

Просмотреть файл

@ -458,8 +458,8 @@ void SimpleURLLoaderWrapper::OnDataReceived(base::StringPiece string_piece,
auto array_buffer = v8::ArrayBuffer::New(isolate, string_piece.size()); auto array_buffer = v8::ArrayBuffer::New(isolate, string_piece.size());
auto backing_store = array_buffer->GetBackingStore(); auto backing_store = array_buffer->GetBackingStore();
memcpy(backing_store->Data(), string_piece.data(), string_piece.size()); memcpy(backing_store->Data(), string_piece.data(), string_piece.size());
Emit("data", array_buffer); Emit("data", array_buffer,
std::move(resume).Run(); base::AdaptCallbackForRepeating(std::move(resume)));
} }
void SimpleURLLoaderWrapper::OnComplete(bool success) { void SimpleURLLoaderWrapper::OnComplete(bool success) {

Просмотреть файл

@ -4,7 +4,7 @@ import * as http from 'http';
import * as url from 'url'; import * as url from 'url';
import { AddressInfo, Socket } from 'net'; import { AddressInfo, Socket } from 'net';
import { emittedOnce } from './events-helpers'; import { emittedOnce } from './events-helpers';
import { defer } from './spec-helpers'; import { defer, delay } from './spec-helpers';
const kOneKiloByte = 1024; const kOneKiloByte = 1024;
const kOneMegaByte = kOneKiloByte * kOneKiloByte; const kOneMegaByte = kOneKiloByte * kOneKiloByte;
@ -1476,6 +1476,28 @@ describe('net module', () => {
await collectStreamBody(nodeResponse); await collectStreamBody(nodeResponse);
expect(nodeRequestProcessed).to.equal(true); expect(nodeRequestProcessed).to.equal(true);
}); });
it('should correctly throttle an incoming stream', async () => {
let numChunksSent = 0;
const serverUrl = await respondOnce.toSingleURL((request, response) => {
const data = randomString(kOneMegaByte);
const write = () => {
let ok = true;
do {
numChunksSent++;
if (numChunksSent > 30) return;
ok = response.write(data);
} while (ok);
response.once('drain', write);
};
write();
});
const urlRequest = net.request(serverUrl);
urlRequest.on('response', () => {});
urlRequest.end();
await delay(2000);
expect(numChunksSent).to.be.at.most(20);
});
}); });
describe('Stability and performance', () => { describe('Stability and performance', () => {

2
typings/internal-ambient.d.ts поставляемый
Просмотреть файл

@ -120,7 +120,7 @@ declare namespace NodeJS {
interface URLLoader extends EventEmitter { interface URLLoader extends EventEmitter {
cancel(): void; cancel(): void;
on(eventName: 'data', listener: (event: any, data: ArrayBuffer) => void): this; on(eventName: 'data', listener: (event: any, data: ArrayBuffer, resume: () => void) => void): this;
on(eventName: 'response-started', listener: (event: any, finalUrl: string, responseHead: ResponseHead) => void): this; on(eventName: 'response-started', listener: (event: any, finalUrl: string, responseHead: ResponseHead) => void): this;
on(eventName: 'complete', listener: (event: any) => void): this; on(eventName: 'complete', listener: (event: any) => void): this;
on(eventName: 'error', listener: (event: any, netErrorString: string) => void): this; on(eventName: 'error', listener: (event: any, netErrorString: string) => void): this;