stream: Simplify flowing, passive data listening

Closes #5860

In streams2, there is an "old mode" for compatibility.  Once switched
into this mode, there is no going back.

With this change, there is a "flowing mode" and a "paused mode".  If you
add a data listener, then this will start the flow of data.  However,
hitting the `pause()` method will switch *back* into a non-flowing mode,
where the `read()` method will pull data out.

Every time `read()` returns a data chunk, it also emits a `data` event.
In this way, a passive data listener can be added, and the stream passed
off to some other reader, for use with progress bars and the like.

There is no API change beyond this added flexibility.
This commit is contained in:
isaacs 2013-07-17 18:24:02 -07:00
Родитель 5fcd6e4038
Коммит 0f8de5e1f9
11 изменённых файлов: 433 добавлений и 201 удалений

Просмотреть файл

@ -104,11 +104,35 @@ Readable stream.
A Readable stream will not start emitting data until you indicate that
you are ready to receive it.
Readable streams have two "modes": a **flowing mode** and a **non-flowing
Readable streams have two "modes": a **flowing mode** and a **paused
mode**. When in flowing mode, data is read from the underlying system
and provided to your program as fast as possible. In non-flowing
mode, you must explicitly call `stream.read()` to get chunks of data
out.
and provided to your program as fast as possible. In paused mode, you
must explicitly call `stream.read()` to get chunks of data out.
Streams start out in paused mode.
**Note**: If no data event handlers are attached, and there are no
[`pipe()`][] destinations, and the stream is switched into flowing
mode, then data will be lost.
You can switch to flowing mode by doing any of the following:
* Adding a [`'data'` event][] handler to listen for data.
* Calling the [`resume()`][] method to explicitly open the flow.
* Calling the [`pipe()`][] method to send the data to a [Writable][].
You can switch back to paused mode by doing either of the following:
* If there are no pipe destinations, by calling the [`pause()`][]
method.
* If there are pipe destinations, by removing any [`'data'` event][]
handlers, and removing all pipe destinations by calling the
[`unpipe()`][] method.
Note that, for backwards compatibility reasons, removing `'data'`
event handlers will **not** automatically pause the stream. Also, if
there are piped destinations, then calling `pause()` will not
guarantee that the stream will *remain* paused once those
destinations drain and ask for more data.
Examples of readable streams include:
@ -144,9 +168,9 @@ again when more data is available.
* `chunk` {Buffer | String} The chunk of data.
If you attach a `data` event listener, then it will switch the stream
into flowing mode, and data will be passed to your handler as soon as
it is available.
Attaching a `data` event listener to a stream that has not been
explicitly paused will switch the stream into flowing mode. Data will
then be passed as soon as it is available.
If you just want to get all the data out of the stream as fast as
possible, this is the best way to do so.
@ -200,9 +224,9 @@ bytes. If `size` bytes are not available, then it will return `null`.
If you do not specify a `size` argument, then it will return all the
data in the internal buffer.
This method should only be called in non-flowing mode. In
flowing-mode, this method is called automatically until the internal
buffer is drained.
This method should only be called in paused mode. In flowing mode,
this method is called automatically until the internal buffer is
drained.
```javascript
var readable = getReadableStreamSomehow();
@ -214,6 +238,9 @@ readable.on('readable', function() {
});
```
If this method returns a data chunk, then it will also trigger the
emission of a [`'data'` event][].
#### readable.setEncoding(encoding)
* `encoding` {String} The encoding to use.
@ -244,9 +271,9 @@ readable.on('data', function(chunk) {
This method will cause the readable stream to resume emitting `data`
events.
This method will switch the stream into flowing-mode. If you do *not*
This method will switch the stream into flowing mode. If you do *not*
want to consume the data from a stream, but you *do* want to get to
its `end` event, you can call `readable.resume()` to open the flow of
its `end` event, you can call [`readable.resume()`][] to open the flow of
data.
```javascript
@ -259,13 +286,9 @@ readable.on('end', function(chunk) {
#### readable.pause()
This method will cause a stream in flowing-mode to stop emitting
`data` events. Any data that becomes available will remain in the
internal buffer.
This method is only relevant in flowing mode. When called on a
non-flowing stream, it will switch into flowing mode, but remain
paused.
This method will cause a stream in flowing mode to stop emitting
`data` events, switching out of flowing mode. Any data that becomes
available will remain in the internal buffer.
```javascript
var readable = getReadableStreamSomehow();
@ -414,7 +437,7 @@ entire Streams API as it is today. (See "Compatibility" below for
more information.)
If you are using an older Node library that emits `'data'` events and
has a `pause()` method that is advisory only, then you can use the
has a [`pause()`][] method that is advisory only, then you can use the
`wrap()` method to create a [Readable][] stream that uses the old stream
as its data source.
@ -1298,23 +1321,23 @@ simpler, but also less powerful and less useful.
events would start emitting immediately. If you needed to do some
I/O to decide how to handle data, then you had to store the chunks
in some kind of buffer so that they would not be lost.
* The `pause()` method was advisory, rather than guaranteed. This
* The [`pause()`][] method was advisory, rather than guaranteed. This
meant that you still had to be prepared to receive `'data'` events
even when the stream was in a paused state.
In Node v0.10, the Readable class described below was added. For
backwards compatibility with older Node programs, Readable streams
switch into "flowing mode" when a `'data'` event handler is added, or
when the `pause()` or `resume()` methods are called. The effect is
that, even if you are not using the new `read()` method and
`'readable'` event, you no longer have to worry about losing `'data'`
chunks.
when the [`resume()`][] method is called. The effect is that, even if
you are not using the new `read()` method and `'readable'` event, you
no longer have to worry about losing `'data'` chunks.
Most programs will continue to function normally. However, this
introduces an edge case in the following conditions:
* No `'data'` event handler is added.
* The `pause()` and `resume()` methods are never called.
* No [`'data'` event][] handler is added.
* The [`resume()`][] method is never called.
* The stream is not piped to any writable destination.
For example, consider the following code:
@ -1336,7 +1359,7 @@ simply discarded. However, in Node v0.10 and beyond, the socket will
remain paused forever.
The workaround in this situation is to call the `resume()` method to
trigger "old mode" behavior:
start the flow of data:
```javascript
// Workaround
@ -1352,9 +1375,9 @@ net.createServer(function(socket) {
}).listen(1337);
```
In addition to new Readable streams switching into flowing-mode, pre-v0.10
style streams can be wrapped in a Readable class using the `wrap()`
method.
In addition to new Readable streams switching into flowing mode,
pre-v0.10 style streams can be wrapped in a Readable class using the
`wrap()` method.
### Object Mode
@ -1494,3 +1517,9 @@ modify them.
[_write]: #stream_writable_write_chunk_encoding_callback_1
[`util.inherits`]: util.html#util_util_inherits_constructor_superconstructor
[`end()`]: #stream_writable_end_chunk_encoding_callback
[`'data'` event]: #stream_event_data
[`resume()`]: #stream_readable_resume
[`readable.resume()`]: #stream_readable_resume
[`pause()`]: #stream_readable_pause
[`unpipe()`]: #stream_readable_unpipe_destination
[`pipe()`]: #stream_readable_pipe_destination_options

Просмотреть файл

@ -26,6 +26,7 @@ var EE = require('events').EventEmitter;
var Stream = require('stream');
var util = require('util');
var StringDecoder;
var debug = util.debuglog('stream');
util.inherits(Readable, Stream);
@ -44,7 +45,7 @@ function ReadableState(options, stream) {
this.length = 0;
this.pipes = null;
this.pipesCount = 0;
this.flowing = false;
this.flowing = null;
this.ended = false;
this.endEmitted = false;
this.reading = false;
@ -250,6 +251,7 @@ function howMuchToRead(n, state) {
// you can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) {
debug('read', n);
var state = this._readableState;
state.calledRead = true;
var nOrig = n;
@ -263,7 +265,11 @@ Readable.prototype.read = function(n) {
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
emitReadable(this);
debug('read: emitReadable');
if (state.length === 0 && state.ended)
endReadable(this);
else
emitReadable(this);
return null;
}
@ -300,17 +306,23 @@ Readable.prototype.read = function(n) {
// if we need a readable event, then we need to do some reading.
var doRead = state.needReadable;
debug('need readable', doRead);
// if we currently have less than the highWaterMark, then also read some
if (state.length - n <= state.highWaterMark)
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
debug('length less than watermark', doRead);
}
// however, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
if (state.ended || state.reading)
if (state.ended || state.reading) {
doRead = false;
debug('reading or ended', doRead);
}
if (doRead) {
debug('do read');
state.reading = true;
state.sync = true;
// if the length is currently zero, then we *need* a readable event.
@ -351,6 +363,8 @@ Readable.prototype.read = function(n) {
if (state.ended && !state.endEmitted && state.length === 0)
endReadable(this);
if (ret !== null)
this.emit('data', ret);
return ret;
};
@ -392,20 +406,22 @@ function onEofChunk(stream, state) {
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
if (state.emittedReadable)
return;
state.emittedReadable = true;
if (state.sync)
process.nextTick(function() {
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
if (state.sync)
process.nextTick(function() {
emitReadable_(stream);
});
else
emitReadable_(stream);
});
else
emitReadable_(stream);
}
}
function emitReadable_(stream) {
debug('emit readable');
stream.emit('readable');
flow(stream);
}
@ -428,6 +444,7 @@ function maybeReadMore_(stream, state) {
var len = state.length;
while (!state.reading && !state.flowing && !state.ended &&
state.length < state.highWaterMark) {
debug('maybeReadMore read 0');
stream.read(0);
if (len === state.length)
// didn't get any data, stop spinning.
@ -462,6 +479,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
break;
}
state.pipesCount += 1;
debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
@ -475,11 +493,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.on('unpipe', onunpipe);
function onunpipe(readable) {
if (readable !== src) return;
cleanup();
debug('onunpipe');
if (readable === src) {
cleanup();
}
}
function onend() {
debug('onend');
dest.end();
}
@ -491,6 +512,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.on('drain', ondrain);
function cleanup() {
debug('cleanup');
// cleanup event handlers once the pipe is broken
dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish);
@ -499,19 +521,34 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
src.removeListener('end', cleanup);
src.removeListener('data', ondata);
// if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
if (!dest._writableState || dest._writableState.needDrain)
if (state.awaitDrain &&
(!dest._writableState || dest._writableState.needDrain))
ondrain();
}
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
var ret = dest.write(chunk);
if (false === ret) {
debug('false write response, pause',
src._readableState.awaitDrain);
src._readableState.awaitDrain++;
src.pause();
}
}
// if the dest has an error, then stop piping into it.
// however, don't suppress the throwing behavior for this.
function onerror(er) {
debug('onerror', er);
unpipe();
if (EE.listenerCount(dest, 'error') === 0)
dest.emit('error', er);
@ -525,12 +562,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
}
dest.once('close', onclose);
function onfinish() {
debug('onfinish');
dest.removeListener('close', onclose);
unpipe();
}
dest.once('finish', onfinish);
function unpipe() {
debug('unpipe');
src.unpipe(dest);
}
@ -539,16 +578,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// start the flow if it hasn't been started already.
if (!state.flowing) {
// the handler that waits for readable events after all
// the data gets sucked out in flow.
// This would be easier to follow with a .once() handler
// in flow(), but that is too slow.
this.on('readable', pipeOnReadable);
state.flowing = true;
process.nextTick(function() {
flow(src);
});
debug('pipe resume');
src.resume();
}
return dest;
@ -558,61 +589,14 @@ function pipeOnDrain(src) {
return function() {
var dest = this;
var state = src._readableState;
state.awaitDrain--;
if (state.awaitDrain === 0)
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
state.awaitDrain--;
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
};
}
function flow(src) {
var state = src._readableState;
var chunk;
state.awaitDrain = 0;
function write(dest, i, list) {
var written = dest.write(chunk);
if (false === written) {
state.awaitDrain++;
}
}
while (state.pipesCount && null !== (chunk = src.read())) {
if (state.pipesCount === 1)
write(state.pipes, 0, null);
else
state.pipes.forEach(write);
src.emit('data', chunk);
// if anyone needs a drain, then we have to wait for that.
if (state.awaitDrain > 0)
return;
}
// if every destination was unpiped, either before entering this
// function, or in the while loop, then stop flowing.
//
// NB: This is a pretty rare edge case.
if (state.pipesCount === 0) {
state.flowing = false;
// if there were data event listeners added, then switch to old mode.
if (EE.listenerCount(src, 'data') > 0)
emitDataEvents(src);
return;
}
// at this point, no one needed a drain, so we just ran out of data
// on the next readable event, start it over again.
state.ranOut = true;
}
function pipeOnReadable() {
if (this._readableState.ranOut) {
this._readableState.ranOut = false;
flow(this);
}
};
}
@ -635,7 +619,6 @@ Readable.prototype.unpipe = function(dest) {
// got a match.
state.pipes = null;
state.pipesCount = 0;
this.removeListener('readable', pipeOnReadable);
state.flowing = false;
if (dest)
dest.emit('unpipe', this);
@ -650,7 +633,6 @@ Readable.prototype.unpipe = function(dest) {
var len = state.pipesCount;
state.pipes = null;
state.pipesCount = 0;
this.removeListener('readable', pipeOnReadable);
state.flowing = false;
for (var i = 0; i < len; i++)
@ -678,8 +660,11 @@ Readable.prototype.unpipe = function(dest) {
Readable.prototype.on = function(ev, fn) {
var res = Stream.prototype.on.call(this, ev, fn);
if (ev === 'data' && !this._readableState.flowing)
emitDataEvents(this);
// If listening to data, and it has not explicitly been paused,
// then call resume to start the flow of data on the next tick.
if (ev === 'data' && false !== this._readableState.flowing) {
this.resume();
}
if (ev === 'readable' && this.readable) {
var state = this._readableState;
@ -688,7 +673,11 @@ Readable.prototype.on = function(ev, fn) {
state.emittedReadable = false;
state.needReadable = true;
if (!state.reading) {
this.read(0);
var self = this;
process.nextTick(function() {
debug('readable nexttick read 0');
self.read(0);
});
} else if (state.length) {
emitReadable(this, state);
}
@ -702,63 +691,52 @@ Readable.prototype.addListener = Readable.prototype.on;
// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
emitDataEvents(this);
this.read(0);
this.emit('resume');
var state = this._readableState;
if (!state.flowing) {
debug('resume');
state.flowing = true;
if (!state.reading) {
debug('resume read 0');
this.read(0);
}
resume(this, state);
}
};
function resume(stream, state) {
if (!state.resumeScheduled) {
state.resumeScheduled = true;
process.nextTick(function() {
resume_(stream, state);
});
}
}
function resume_(stream, state) {
state.resumeScheduled = false;
stream.emit('resume');
flow(stream);
if (state.flowing && !state.reading)
stream.read(0);
}
Readable.prototype.pause = function() {
emitDataEvents(this, true);
this.emit('pause');
debug('call pause flowing=%j', this._readableState.flowing);
if (false !== this._readableState.flowing) {
debug('pause');
this._readableState.flowing = false;
this.emit('pause');
}
};
function emitDataEvents(stream, startPaused) {
function flow(stream) {
var state = stream._readableState;
debug('flow', state.flowing);
if (state.flowing) {
// https://github.com/isaacs/readable-stream/issues/16
throw new Error('Cannot switch to old mode now.');
do {
var chunk = stream.read();
} while (null !== chunk && state.flowing);
}
var paused = startPaused || false;
var readable = false;
// convert to an old-style stream.
stream.readable = true;
stream.pipe = Stream.prototype.pipe;
stream.on = stream.addListener = Stream.prototype.on;
stream.on('readable', function() {
readable = true;
var c;
while (!paused && (null !== (c = stream.read())))
stream.emit('data', c);
if (c === null) {
readable = false;
stream._readableState.needReadable = true;
}
});
stream.pause = function() {
paused = true;
this.emit('pause');
};
stream.resume = function() {
paused = false;
if (readable)
process.nextTick(function() {
stream.emit('readable');
});
else
this.read(0);
this.emit('resume');
};
// now make it start, just in case it hadn't already.
stream.emit('readable');
}
// wrap an old-style stream as the async data source.
@ -770,6 +748,7 @@ Readable.prototype.wrap = function(stream) {
var self = this;
stream.on('end', function() {
debug('wrapped end');
if (state.decoder && !state.ended) {
var chunk = state.decoder.end();
if (chunk && chunk.length)
@ -780,6 +759,7 @@ Readable.prototype.wrap = function(stream) {
});
stream.on('data', function(chunk) {
debug('wrapped data');
if (state.decoder)
chunk = state.decoder.write(chunk);
if (!chunk || !state.objectMode && !chunk.length)
@ -812,6 +792,7 @@ Readable.prototype.wrap = function(stream) {
// when we try to consume some more bytes, simply unpause the
// underlying stream.
self._read = function(n) {
debug('wrapped _read', n);
if (paused) {
paused = false;
stream.resume();

Просмотреть файл

@ -0,0 +1,52 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var spawn = require('child_process').spawn;
var child = spawn(process.execPath, [], {
env: {
NODE_DEBUG: process.argv[2]
}
});
var wanted = child.pid + '\n';
var found = '';
child.stdout.setEncoding('utf8');
child.stdout.on('data', function(c) {
found += c;
});
child.stderr.setEncoding('utf8');
child.stderr.on('data', function(c) {
console.error('> ' + c.trim().split(/\n/).join('\n> '));
});
child.on('close', function(c) {
assert(!c);
assert.equal(found, wanted);
console.log('ok');
});
setTimeout(function() {
child.stdin.end('console.log(process.pid)');
});

Просмотреть файл

@ -37,10 +37,8 @@ function TestReader(n) {
util.inherits(TestReader, R);
TestReader.prototype.read = function(n) {
if (n === 0) return null;
TestReader.prototype._read = function(n) {
var max = this._buffer.length - this._pos;
n = n || max;
n = Math.max(n, 0);
var toRead = Math.min(n, max);
if (toRead === 0) {
@ -51,20 +49,21 @@ TestReader.prototype.read = function(n) {
this._bufs -= 1;
if (this._bufs <= 0) {
// read them all!
if (!this.ended) {
this.emit('end');
this.ended = true;
}
if (!this.ended)
this.push(null);
} else {
this.emit('readable');
// now we have more.
// kinda cheating by calling _read, but whatever,
// it's just fake anyway.
this._read(n);
}
}.bind(this), 10);
return null;
return;
}
var ret = this._buffer.slice(this._pos, this._pos + toRead);
this._pos += toRead;
return ret;
this.push(ret);
};
/////
@ -135,21 +134,17 @@ test('a most basic test', function(t) {
'xxx',
'xxxx',
'xxxxx',
'xxxxx',
'xxxxxxxx',
'xxxxxxxxx',
'xxx',
'xxxxxxxxxx',
'xxxxxxxxxxxx',
'xxxxxxxx',
'xxxxxxxxxxxxx',
'xxxxxxxxxxxxxxx',
'xxxxx',
'xxxxxxxxxxxxxxxxxx',
'xx',
'xxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxx' ];
'xxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxxxxxxx',
'xxxxxxxxxxxxxxxxxxxxx' ];
r.on('end', function() {
t.same(reads, expect);
@ -342,6 +337,7 @@ test('back pressure respected', function (t) {
var w1 = new R();
w1.write = function (chunk) {
console.error('w1.emit("close")');
assert.equal(chunk[0], "one");
w1.emit("close");
process.nextTick(function () {
@ -357,6 +353,7 @@ test('back pressure respected', function (t) {
var w2 = new R();
w2.write = function (chunk) {
console.error('w2 write', chunk, counter);
assert.equal(chunk[0], expected.shift());
assert.equal(counter, 0);
@ -368,6 +365,7 @@ test('back pressure respected', function (t) {
setTimeout(function () {
counter--;
console.error("w2 drain");
w2.emit("drain");
}, 10);
@ -377,6 +375,7 @@ test('back pressure respected', function (t) {
var w3 = new R();
w3.write = function (chunk) {
console.error('w3 write', chunk, counter);
assert.equal(chunk[0], expected.shift());
assert.equal(counter, 1);
@ -388,6 +387,7 @@ test('back pressure respected', function (t) {
setTimeout(function () {
counter--;
console.error("w3 drain");
w3.emit("drain");
}, 50);

Просмотреть файл

@ -47,4 +47,7 @@ TestReader.prototype._read = function(n) {
};
var reader = new TestReader();
assert.equal(ondataCalled, 1);
setImmediate(function() {
assert.equal(ondataCalled, 1);
console.log('ok');
});

Просмотреть файл

@ -237,9 +237,12 @@ test('high watermark _read', function(t) {
assert.equal(v, '1');
var v2 = r.read();
assert.equal(v2, '2');
var v3 = r.read();
assert.equal(v3, '3');
assert.equal(calls, 1);
assert.equal(v2, '2');
t.end();
});

Просмотреть файл

@ -40,4 +40,4 @@ oldStream.emit('end');
process.on('exit', function(){
assert.ok(ended);
});
});

Просмотреть файл

@ -39,17 +39,14 @@ function runTest(highWaterMark, objectMode, produce) {
ended = true;
});
var pauses = 0;
var resumes = 0;
old.pause = function() {
pauses++;
console.error('old.pause()');
old.emit('pause');
flowing = false;
};
old.resume = function() {
resumes++;
console.error('old.resume()');
old.emit('resume');
flow();
};
@ -63,8 +60,9 @@ function runTest(highWaterMark, objectMode, produce) {
while (flowing && chunks-- > 0) {
var item = produce();
expected.push(item);
console.log('emit', chunks);
console.log('old.emit', chunks, flowing);
old.emit('data', item);
console.log('after emit', chunks, flowing);
}
if (chunks <= 0) {
oldEnded = true;
@ -76,7 +74,7 @@ function runTest(highWaterMark, objectMode, produce) {
var w = new Writable({ highWaterMark: highWaterMark * 2, objectMode: objectMode });
var written = [];
w._write = function(chunk, encoding, cb) {
console.log(chunk);
console.log('_write', chunk);
written.push(chunk);
setTimeout(cb);
};
@ -94,11 +92,10 @@ function runTest(highWaterMark, objectMode, produce) {
assert(ended);
assert(oldEnded);
assert.deepEqual(written, expected);
assert.equal(pauses, 10);
assert.equal(resumes, 9);
}
}
runTest(100, false, function(){ return new Buffer(100); });
runTest(10, false, function(){ return new Buffer('xxxxxxxxxx'); });
runTest(1, true, function(){ return { foo: 'bar' }; });

Просмотреть файл

@ -310,8 +310,7 @@ test('complex transform', function(t) {
pt.end();
t.end();
});
t.equal(pt.read().toString(), 'abc');
t.equal(pt.read().toString(), 'def');
t.equal(pt.read().toString(), 'abcdef');
t.equal(pt.read(), null);
});
});

Просмотреть файл

@ -68,7 +68,8 @@ assert.equal(dest.listeners('finish').length, 0);
console.error(src._readableState);
process.on('exit', function() {
assert(src._readableState.length >= src._readableState.highWaterMark);
src._readableState.buffer.length = 0;
console.error(src._readableState);
assert(src._readableState.length >= src._readableState.highWaterMark);
console.log('ok');
});

Просмотреть файл

@ -0,0 +1,167 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;
var totalChunks = 100;
var chunkSize = 99;
var expectTotalData = totalChunks * chunkSize;
var expectEndingData = expectTotalData;
var r = new Readable({ highWaterMark: 1000 });
var chunks = totalChunks;
r._read = function(n) {
if (!(chunks % 2))
setImmediate(push);
else if (!(chunks % 3))
process.nextTick(push);
else
push();
};
var totalPushed = 0;
function push() {
var chunk = chunks-- > 0 ? new Buffer(chunkSize) : null;
if (chunk) {
totalPushed += chunk.length;
chunk.fill('x');
}
r.push(chunk);
}
read100();
// first we read 100 bytes
function read100() {
readn(100, onData);
}
function readn(n, then) {
console.error('read %d', n);
expectEndingData -= n;
;(function read() {
var c = r.read(n);
if (!c)
r.once('readable', read);
else {
assert.equal(c.length, n);
assert(!r._readableState.flowing);
then();
}
})();
}
// then we listen to some data events
function onData() {
expectEndingData -= 100;
console.error('onData');
var seen = 0;
r.on('data', function od(c) {
seen += c.length;
if (seen >= 100) {
// seen enough
r.removeListener('data', od);
r.pause();
if (seen > 100) {
// oh no, seen too much!
// put the extra back.
var diff = seen - 100;
r.unshift(c.slice(c.length - diff));
console.error('seen too much', seen, diff);
}
// Nothing should be lost in between
setImmediate(pipeLittle);
}
});
}
// Just pipe 200 bytes, then unshift the extra and unpipe
function pipeLittle() {
expectEndingData -= 200;
console.error('pipe a little');
var w = new Writable();
var written = 0;
w.on('finish', function() {
assert.equal(written, 200);
setImmediate(read1234);
});
w._write = function(chunk, encoding, cb) {
written += chunk.length;
if (written >= 200) {
r.unpipe(w);
w.end();
cb();
if (written > 200) {
var diff = written - 200;
written -= diff;
r.unshift(chunk.slice(chunk.length - diff));
}
} else {
setImmediate(cb);
}
};
r.pipe(w);
}
// now read 1234 more bytes
function read1234() {
readn(1234, resumePause);
}
function resumePause() {
console.error('resumePause');
// don't read anything, just resume and re-pause a whole bunch
r.resume();
r.pause();
r.resume();
r.pause();
r.resume();
r.pause();
r.resume();
r.pause();
r.resume();
r.pause();
setImmediate(pipe);
}
function pipe() {
console.error('pipe the rest');
var w = new Writable();
var written = 0;
w._write = function(chunk, encoding, cb) {
written += chunk.length;
cb();
};
w.on('finish', function() {
console.error('written', written, totalPushed);
assert.equal(written, expectEndingData);
assert.equal(totalPushed, expectTotalData);
console.log('ok');
});
r.pipe(w);
}