stream: Remove bufferSize option
Now that highWaterMark increases when there are large reads, this greatly reduces the number of calls necessary to _read(size), assuming that _read actually respects the size argument.
This commit is contained in:
Родитель
d5a0940fff
Коммит
b0f6789a78
|
@ -90,8 +90,6 @@ method. (See below.)
|
|||
### new stream.Readable([options])
|
||||
|
||||
* `options` {Object}
|
||||
* `bufferSize` {Number} The size of the chunks to consume from the
|
||||
underlying resource. Default=16kb
|
||||
* `highWaterMark` {Number} The maximum number of bytes to store in
|
||||
the internal buffer before ceasing to read from the underlying
|
||||
resource. Default=16kb
|
||||
|
|
|
@ -32,16 +32,12 @@ util.inherits(Readable, Stream);
|
|||
function ReadableState(options, stream) {
|
||||
options = options || {};
|
||||
|
||||
// the argument passed to this._read(n)
|
||||
this.bufferSize = options.bufferSize || 16 * 1024;
|
||||
|
||||
// the point at which it stops calling _read() to fill the buffer
|
||||
// Note: 0 is a valid value, means "don't call _read preemptively ever"
|
||||
var hwm = options.highWaterMark;
|
||||
this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
|
||||
|
||||
// cast to ints.
|
||||
this.bufferSize = ~~this.bufferSize;
|
||||
this.highWaterMark = ~~this.highWaterMark;
|
||||
|
||||
this.buffer = [];
|
||||
|
@ -265,7 +261,7 @@ Readable.prototype.read = function(n) {
|
|||
if (state.length === 0)
|
||||
state.needReadable = true;
|
||||
// call internal read method
|
||||
this._read(state.bufferSize);
|
||||
this._read(state.highWaterMark);
|
||||
state.sync = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -102,7 +102,7 @@ function afterTransform(stream, er, data) {
|
|||
|
||||
var rs = stream._readableState;
|
||||
if (rs.needReadable || rs.length < rs.highWaterMark) {
|
||||
stream._read(rs.bufferSize);
|
||||
stream._read(rs.highWaterMark);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -165,7 +165,7 @@ Transform.prototype._write = function(chunk, encoding, cb) {
|
|||
if (ts.needTransform ||
|
||||
rs.needReadable ||
|
||||
rs.length < rs.highWaterMark)
|
||||
this._read(rs.bufferSize);
|
||||
this._read(rs.highWaterMark);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -1431,7 +1431,6 @@ function ReadStream(path, options) {
|
|||
|
||||
// a little bit bigger buffer and water marks by default
|
||||
options = util._extend({
|
||||
bufferSize: 64 * 1024,
|
||||
highWaterMark: 64 * 1024
|
||||
}, options || {});
|
||||
|
||||
|
@ -1505,10 +1504,9 @@ ReadStream.prototype._read = function(n) {
|
|||
return;
|
||||
|
||||
if (!pool || pool.length - pool.used < kMinPoolSpace) {
|
||||
// discard the old pool. Can't add to the free list because
|
||||
// users might have refernces to slices on it.
|
||||
// discard the old pool.
|
||||
pool = null;
|
||||
allocNewPool(this._readableState.bufferSize);
|
||||
allocNewPool(this._readableState.highWaterMark);
|
||||
}
|
||||
|
||||
// Grab another reference to the pool in the case that while we're
|
||||
|
|
|
@ -32,7 +32,7 @@ var Readable = require('stream').Readable;
|
|||
// throw an error if we trigger a nextTick warning.
|
||||
process.throwDeprecation = true;
|
||||
|
||||
var stream = new Readable({ highWaterMark: 2, bufferSize: 2 });
|
||||
var stream = new Readable({ highWaterMark: 2 });
|
||||
var reads = 0;
|
||||
stream._read = function(size) {
|
||||
reads++;
|
||||
|
@ -59,7 +59,7 @@ flow(stream, 5000, function() {
|
|||
});
|
||||
|
||||
process.on('exit', function(code) {
|
||||
assert.equal(reads, 5000);
|
||||
assert.equal(reads, 2);
|
||||
// we pushed up the high water mark
|
||||
assert.equal(stream._readableState.highWaterMark, 5000);
|
||||
assert.equal(stream._readableState.length, 5000);
|
||||
|
|
|
@ -32,11 +32,7 @@ var file = path.resolve(common.fixturesDir, 'x1024.txt');
|
|||
|
||||
var size = fs.statSync(file).size;
|
||||
|
||||
// expect to see chunks no more than 10 bytes each.
|
||||
var expectLengths = [];
|
||||
for (var i = size; i > 0; i -= 10) {
|
||||
expectLengths.push(Math.min(i, 10));
|
||||
}
|
||||
var expectLengths = [1024];
|
||||
|
||||
var util = require('util');
|
||||
var Stream = require('stream');
|
||||
|
@ -60,7 +56,7 @@ TestWriter.prototype.end = function(c) {
|
|||
this.emit('results', this.buffer);
|
||||
}
|
||||
|
||||
var r = new FSReadable(file, { bufferSize: 10 });
|
||||
var r = new FSReadable(file);
|
||||
var w = new TestWriter();
|
||||
|
||||
w.on('results', function(res) {
|
||||
|
|
|
@ -64,9 +64,7 @@ process.nextTick(run);
|
|||
util.inherits(TestReader, R);
|
||||
|
||||
function TestReader(n, opts) {
|
||||
R.call(this, util._extend({
|
||||
bufferSize: 5
|
||||
}, opts));
|
||||
R.call(this, opts);
|
||||
|
||||
this.pos = 0;
|
||||
this.len = n || 100;
|
||||
|
|
Загрузка…
Ссылка в новой задаче