From 6356cd4018bd80f531085ed6afd1cccb0def30ae Mon Sep 17 00:00:00 2001 From: Keeley Hammond Date: Mon, 5 Oct 2020 17:47:41 -0700 Subject: [PATCH] fix: allow ClientRequest responses to be throttled (#25531) * fix: allow net streams to be throttled [WIP] * fix: handle resume throttling within IncomingMessage [WIP] * fix: fix urlLoader typing, add throttle test * fix: fix lint and increase test timeout for Linux * fix: increase test chunk limit to 20 and timeout to 2000 Co-authored-by: Jeremy Rose --- lib/browser/api/net.ts | 18 +++++++++++---- shell/browser/api/electron_api_url_loader.cc | 4 ++-- spec-main/api-net-spec.ts | 24 +++++++++++++++++++- typings/internal-ambient.d.ts | 2 +- 4 files changed, 40 insertions(+), 8 deletions(-) diff --git a/lib/browser/api/net.ts b/lib/browser/api/net.ts index 7dfa2ee736..21cf730d45 100644 --- a/lib/browser/api/net.ts +++ b/lib/browser/api/net.ts @@ -38,12 +38,14 @@ class IncomingMessage extends Readable { _shouldPush: boolean; _data: (Buffer | null)[]; _responseHead: NodeJS.ResponseHead; + _resume: (() => void) | null; constructor (responseHead: NodeJS.ResponseHead) { super(); this._shouldPush = false; this._data = []; this._responseHead = responseHead; + this._resume = null; } get statusCode () { @@ -103,7 +105,9 @@ class IncomingMessage extends Readable { throw new Error('HTTP trailers are not supported'); } - _storeInternalData (chunk: Buffer | null) { + _storeInternalData (chunk: Buffer | null, resume: (() => void) | null) { + // save the network callback for use in _pushInternalData + this._resume = resume; this._data.push(chunk); this._pushInternalData(); } @@ -113,6 +117,12 @@ class IncomingMessage extends Readable { const chunk = this._data.shift(); this._shouldPush = this.push(chunk); } + if (this._shouldPush && this._resume) { + this._resume(); + // Reset the callback, so that a new one is used for each + // batch of throttled data + this._resume = null; + } } _read () { @@ -404,11 +414,11 @@ export class ClientRequest extends Writable implements Electron.ClientRequest { const response = this._response = new IncomingMessage(responseHead); this.emit('response', response); }); - this._urlLoader.on('data', (event, data) => { - this._response!._storeInternalData(Buffer.from(data)); + this._urlLoader.on('data', (event, data, resume) => { + this._response!._storeInternalData(Buffer.from(data), resume); }); this._urlLoader.on('complete', () => { - if (this._response) { this._response._storeInternalData(null); } + if (this._response) { this._response._storeInternalData(null, null); } }); this._urlLoader.on('error', (event, netErrorString) => { const error = new Error(netErrorString); diff --git a/shell/browser/api/electron_api_url_loader.cc b/shell/browser/api/electron_api_url_loader.cc index df344e2048..7f77346849 100644 --- a/shell/browser/api/electron_api_url_loader.cc +++ b/shell/browser/api/electron_api_url_loader.cc @@ -458,8 +458,8 @@ void SimpleURLLoaderWrapper::OnDataReceived(base::StringPiece string_piece, auto array_buffer = v8::ArrayBuffer::New(isolate, string_piece.size()); auto backing_store = array_buffer->GetBackingStore(); memcpy(backing_store->Data(), string_piece.data(), string_piece.size()); - Emit("data", array_buffer); - std::move(resume).Run(); + Emit("data", array_buffer, + base::AdaptCallbackForRepeating(std::move(resume))); } void SimpleURLLoaderWrapper::OnComplete(bool success) { diff --git a/spec-main/api-net-spec.ts b/spec-main/api-net-spec.ts index e7598aa0ad..1d275ccbe3 100644 --- a/spec-main/api-net-spec.ts +++ b/spec-main/api-net-spec.ts @@ -4,7 +4,7 @@ import * as http from 'http'; import * as url from 'url'; import { AddressInfo, Socket } from 'net'; import { emittedOnce } from './events-helpers'; -import { defer } from './spec-helpers'; +import { defer, delay } from './spec-helpers'; const kOneKiloByte = 1024; const kOneMegaByte = kOneKiloByte * kOneKiloByte; @@ -1476,6 +1476,28 @@ describe('net module', () => { await collectStreamBody(nodeResponse); expect(nodeRequestProcessed).to.equal(true); }); + + it('should correctly throttle an incoming stream', async () => { + let numChunksSent = 0; + const serverUrl = await respondOnce.toSingleURL((request, response) => { + const data = randomString(kOneMegaByte); + const write = () => { + let ok = true; + do { + numChunksSent++; + if (numChunksSent > 30) return; + ok = response.write(data); + } while (ok); + response.once('drain', write); + }; + write(); + }); + const urlRequest = net.request(serverUrl); + urlRequest.on('response', () => {}); + urlRequest.end(); + await delay(2000); + expect(numChunksSent).to.be.at.most(20); + }); }); describe('Stability and performance', () => { diff --git a/typings/internal-ambient.d.ts b/typings/internal-ambient.d.ts index 8846510f00..6953ef6c83 100644 --- a/typings/internal-ambient.d.ts +++ b/typings/internal-ambient.d.ts @@ -120,7 +120,7 @@ declare namespace NodeJS { interface URLLoader extends EventEmitter { cancel(): void; - on(eventName: 'data', listener: (event: any, data: ArrayBuffer) => void): this; + on(eventName: 'data', listener: (event: any, data: ArrayBuffer, resume: () => void) => void): this; on(eventName: 'response-started', listener: (event: any, finalUrl: string, responseHead: ResponseHead) => void): this; on(eventName: 'complete', listener: (event: any) => void): this; on(eventName: 'error', listener: (event: any, netErrorString: string) => void): this;