diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index e3c55a4bd8..d202463b80 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -104,11 +104,35 @@ Readable stream. A Readable stream will not start emitting data until you indicate that you are ready to receive it. -Readable streams have two "modes": a **flowing mode** and a **non-flowing +Readable streams have two "modes": a **flowing mode** and a **paused mode**. When in flowing mode, data is read from the underlying system -and provided to your program as fast as possible. In non-flowing -mode, you must explicitly call `stream.read()` to get chunks of data -out. +and provided to your program as fast as possible. In paused mode, you +must explicitly call `stream.read()` to get chunks of data out. +Streams start out in paused mode. + +**Note**: If no data event handlers are attached, and there are no +[`pipe()`][] destinations, and the stream is switched into flowing +mode, then data will be lost. + +You can switch to flowing mode by doing any of the following: + +* Adding a [`'data'` event][] handler to listen for data. +* Calling the [`resume()`][] method to explicitly open the flow. +* Calling the [`pipe()`][] method to send the data to a [Writable][]. + +You can switch back to paused mode by doing either of the following: + +* If there are no pipe destinations, by calling the [`pause()`][] + method. +* If there are pipe destinations, by removing any [`'data'` event][] + handlers, and removing all pipe destinations by calling the + [`unpipe()`][] method. + +Note that, for backwards compatibility reasons, removing `'data'` +event handlers will **not** automatically pause the stream. Also, if +there are piped destinations, then calling `pause()` will not +guarantee that the stream will *remain* paused once those +destinations drain and ask for more data. Examples of readable streams include: @@ -144,9 +168,9 @@ again when more data is available. * `chunk` {Buffer | String} The chunk of data. -If you attach a `data` event listener, then it will switch the stream -into flowing mode, and data will be passed to your handler as soon as -it is available. +Attaching a `data` event listener to a stream that has not been +explicitly paused will switch the stream into flowing mode. Data will +then be passed as soon as it is available. If you just want to get all the data out of the stream as fast as possible, this is the best way to do so. @@ -200,9 +224,9 @@ bytes. If `size` bytes are not available, then it will return `null`. If you do not specify a `size` argument, then it will return all the data in the internal buffer. -This method should only be called in non-flowing mode. In -flowing-mode, this method is called automatically until the internal -buffer is drained. +This method should only be called in paused mode. In flowing mode, +this method is called automatically until the internal buffer is +drained. ```javascript var readable = getReadableStreamSomehow(); @@ -214,6 +238,9 @@ readable.on('readable', function() { }); ``` +If this method returns a data chunk, then it will also trigger the +emission of a [`'data'` event][]. + #### readable.setEncoding(encoding) * `encoding` {String} The encoding to use. @@ -244,9 +271,9 @@ readable.on('data', function(chunk) { This method will cause the readable stream to resume emitting `data` events. -This method will switch the stream into flowing-mode. If you do *not* +This method will switch the stream into flowing mode. If you do *not* want to consume the data from a stream, but you *do* want to get to -its `end` event, you can call `readable.resume()` to open the flow of +its `end` event, you can call [`readable.resume()`][] to open the flow of data. ```javascript @@ -259,13 +286,9 @@ readable.on('end', function(chunk) { #### readable.pause() -This method will cause a stream in flowing-mode to stop emitting -`data` events. Any data that becomes available will remain in the -internal buffer. - -This method is only relevant in flowing mode. When called on a -non-flowing stream, it will switch into flowing mode, but remain -paused. +This method will cause a stream in flowing mode to stop emitting +`data` events, switching out of flowing mode. Any data that becomes +available will remain in the internal buffer. ```javascript var readable = getReadableStreamSomehow(); @@ -414,7 +437,7 @@ entire Streams API as it is today. (See "Compatibility" below for more information.) If you are using an older Node library that emits `'data'` events and -has a `pause()` method that is advisory only, then you can use the +has a [`pause()`][] method that is advisory only, then you can use the `wrap()` method to create a [Readable][] stream that uses the old stream as its data source. @@ -1298,23 +1321,23 @@ simpler, but also less powerful and less useful. events would start emitting immediately. If you needed to do some I/O to decide how to handle data, then you had to store the chunks in some kind of buffer so that they would not be lost. -* The `pause()` method was advisory, rather than guaranteed. This +* The [`pause()`][] method was advisory, rather than guaranteed. This meant that you still had to be prepared to receive `'data'` events even when the stream was in a paused state. In Node v0.10, the Readable class described below was added. For backwards compatibility with older Node programs, Readable streams switch into "flowing mode" when a `'data'` event handler is added, or -when the `pause()` or `resume()` methods are called. The effect is -that, even if you are not using the new `read()` method and -`'readable'` event, you no longer have to worry about losing `'data'` -chunks. +when the [`resume()`][] method is called. The effect is that, even if +you are not using the new `read()` method and `'readable'` event, you +no longer have to worry about losing `'data'` chunks. Most programs will continue to function normally. However, this introduces an edge case in the following conditions: -* No `'data'` event handler is added. -* The `pause()` and `resume()` methods are never called. +* No [`'data'` event][] handler is added. +* The [`resume()`][] method is never called. +* The stream is not piped to any writable destination. For example, consider the following code: @@ -1336,7 +1359,7 @@ simply discarded. However, in Node v0.10 and beyond, the socket will remain paused forever. The workaround in this situation is to call the `resume()` method to -trigger "old mode" behavior: +start the flow of data: ```javascript // Workaround @@ -1352,9 +1375,9 @@ net.createServer(function(socket) { }).listen(1337); ``` -In addition to new Readable streams switching into flowing-mode, pre-v0.10 -style streams can be wrapped in a Readable class using the `wrap()` -method. +In addition to new Readable streams switching into flowing mode, +pre-v0.10 style streams can be wrapped in a Readable class using the +`wrap()` method. ### Object Mode @@ -1494,3 +1517,9 @@ modify them. [_write]: #stream_writable_write_chunk_encoding_callback_1 [`util.inherits`]: util.html#util_util_inherits_constructor_superconstructor [`end()`]: #stream_writable_end_chunk_encoding_callback +[`'data'` event]: #stream_event_data +[`resume()`]: #stream_readable_resume +[`readable.resume()`]: #stream_readable_resume +[`pause()`]: #stream_readable_pause +[`unpipe()`]: #stream_readable_unpipe_destination +[`pipe()`]: #stream_readable_pipe_destination_options diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 2259d2e778..c6f31bc1f9 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -26,6 +26,7 @@ var EE = require('events').EventEmitter; var Stream = require('stream'); var util = require('util'); var StringDecoder; +var debug = util.debuglog('stream'); util.inherits(Readable, Stream); @@ -44,7 +45,7 @@ function ReadableState(options, stream) { this.length = 0; this.pipes = null; this.pipesCount = 0; - this.flowing = false; + this.flowing = null; this.ended = false; this.endEmitted = false; this.reading = false; @@ -250,6 +251,7 @@ function howMuchToRead(n, state) { // you can override either this method, or the async _read(n) below. Readable.prototype.read = function(n) { + debug('read', n); var state = this._readableState; state.calledRead = true; var nOrig = n; @@ -263,7 +265,11 @@ Readable.prototype.read = function(n) { if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) { - emitReadable(this); + debug('read: emitReadable'); + if (state.length === 0 && state.ended) + endReadable(this); + else + emitReadable(this); return null; } @@ -300,17 +306,23 @@ Readable.prototype.read = function(n) { // if we need a readable event, then we need to do some reading. var doRead = state.needReadable; + debug('need readable', doRead); // if we currently have less than the highWaterMark, then also read some - if (state.length - n <= state.highWaterMark) + if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; + debug('length less than watermark', doRead); + } // however, if we've ended, then there's no point, and if we're already // reading, then it's unnecessary. - if (state.ended || state.reading) + if (state.ended || state.reading) { doRead = false; + debug('reading or ended', doRead); + } if (doRead) { + debug('do read'); state.reading = true; state.sync = true; // if the length is currently zero, then we *need* a readable event. @@ -351,6 +363,8 @@ Readable.prototype.read = function(n) { if (state.ended && !state.endEmitted && state.length === 0) endReadable(this); + if (ret !== null) + this.emit('data', ret); return ret; }; @@ -392,20 +406,22 @@ function onEofChunk(stream, state) { function emitReadable(stream) { var state = stream._readableState; state.needReadable = false; - if (state.emittedReadable) - return; - - state.emittedReadable = true; - if (state.sync) - process.nextTick(function() { + if (!state.emittedReadable) { + debug('emitReadable', state.flowing); + state.emittedReadable = true; + if (state.sync) + process.nextTick(function() { + emitReadable_(stream); + }); + else emitReadable_(stream); - }); - else - emitReadable_(stream); + } } function emitReadable_(stream) { + debug('emit readable'); stream.emit('readable'); + flow(stream); } @@ -428,6 +444,7 @@ function maybeReadMore_(stream, state) { var len = state.length; while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) { + debug('maybeReadMore read 0'); stream.read(0); if (len === state.length) // didn't get any data, stop spinning. @@ -462,6 +479,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { break; } state.pipesCount += 1; + debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts); var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && @@ -475,11 +493,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.on('unpipe', onunpipe); function onunpipe(readable) { - if (readable !== src) return; - cleanup(); + debug('onunpipe'); + if (readable === src) { + cleanup(); + } } function onend() { + debug('onend'); dest.end(); } @@ -491,6 +512,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.on('drain', ondrain); function cleanup() { + debug('cleanup'); // cleanup event handlers once the pipe is broken dest.removeListener('close', onclose); dest.removeListener('finish', onfinish); @@ -499,19 +521,34 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.removeListener('unpipe', onunpipe); src.removeListener('end', onend); src.removeListener('end', cleanup); + src.removeListener('data', ondata); // if the reader is waiting for a drain event from this // specific writer, then it would cause it to never start // flowing again. // So, if this is awaiting a drain, then we just call it now. // If we don't know, then assume that we are waiting for one. - if (!dest._writableState || dest._writableState.needDrain) + if (state.awaitDrain && + (!dest._writableState || dest._writableState.needDrain)) ondrain(); } + src.on('data', ondata); + function ondata(chunk) { + debug('ondata'); + var ret = dest.write(chunk); + if (false === ret) { + debug('false write response, pause', + src._readableState.awaitDrain); + src._readableState.awaitDrain++; + src.pause(); + } + } + // if the dest has an error, then stop piping into it. // however, don't suppress the throwing behavior for this. function onerror(er) { + debug('onerror', er); unpipe(); if (EE.listenerCount(dest, 'error') === 0) dest.emit('error', er); @@ -525,12 +562,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) { } dest.once('close', onclose); function onfinish() { + debug('onfinish'); dest.removeListener('close', onclose); unpipe(); } dest.once('finish', onfinish); function unpipe() { + debug('unpipe'); src.unpipe(dest); } @@ -539,16 +578,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // start the flow if it hasn't been started already. if (!state.flowing) { - // the handler that waits for readable events after all - // the data gets sucked out in flow. - // This would be easier to follow with a .once() handler - // in flow(), but that is too slow. - this.on('readable', pipeOnReadable); - - state.flowing = true; - process.nextTick(function() { - flow(src); - }); + debug('pipe resume'); + src.resume(); } return dest; @@ -558,61 +589,14 @@ function pipeOnDrain(src) { return function() { var dest = this; var state = src._readableState; - state.awaitDrain--; - if (state.awaitDrain === 0) + debug('pipeOnDrain', state.awaitDrain); + if (state.awaitDrain) + state.awaitDrain--; + if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { + state.flowing = true; flow(src); - }; -} - -function flow(src) { - var state = src._readableState; - var chunk; - state.awaitDrain = 0; - - function write(dest, i, list) { - var written = dest.write(chunk); - if (false === written) { - state.awaitDrain++; } - } - - while (state.pipesCount && null !== (chunk = src.read())) { - - if (state.pipesCount === 1) - write(state.pipes, 0, null); - else - state.pipes.forEach(write); - - src.emit('data', chunk); - - // if anyone needs a drain, then we have to wait for that. - if (state.awaitDrain > 0) - return; - } - - // if every destination was unpiped, either before entering this - // function, or in the while loop, then stop flowing. - // - // NB: This is a pretty rare edge case. - if (state.pipesCount === 0) { - state.flowing = false; - - // if there were data event listeners added, then switch to old mode. - if (EE.listenerCount(src, 'data') > 0) - emitDataEvents(src); - return; - } - - // at this point, no one needed a drain, so we just ran out of data - // on the next readable event, start it over again. - state.ranOut = true; -} - -function pipeOnReadable() { - if (this._readableState.ranOut) { - this._readableState.ranOut = false; - flow(this); - } + }; } @@ -635,7 +619,6 @@ Readable.prototype.unpipe = function(dest) { // got a match. state.pipes = null; state.pipesCount = 0; - this.removeListener('readable', pipeOnReadable); state.flowing = false; if (dest) dest.emit('unpipe', this); @@ -650,7 +633,6 @@ Readable.prototype.unpipe = function(dest) { var len = state.pipesCount; state.pipes = null; state.pipesCount = 0; - this.removeListener('readable', pipeOnReadable); state.flowing = false; for (var i = 0; i < len; i++) @@ -678,8 +660,11 @@ Readable.prototype.unpipe = function(dest) { Readable.prototype.on = function(ev, fn) { var res = Stream.prototype.on.call(this, ev, fn); - if (ev === 'data' && !this._readableState.flowing) - emitDataEvents(this); + // If listening to data, and it has not explicitly been paused, + // then call resume to start the flow of data on the next tick. + if (ev === 'data' && false !== this._readableState.flowing) { + this.resume(); + } if (ev === 'readable' && this.readable) { var state = this._readableState; @@ -688,7 +673,11 @@ Readable.prototype.on = function(ev, fn) { state.emittedReadable = false; state.needReadable = true; if (!state.reading) { - this.read(0); + var self = this; + process.nextTick(function() { + debug('readable nexttick read 0'); + self.read(0); + }); } else if (state.length) { emitReadable(this, state); } @@ -702,63 +691,52 @@ Readable.prototype.addListener = Readable.prototype.on; // pause() and resume() are remnants of the legacy readable stream API // If the user uses them, then switch into old mode. Readable.prototype.resume = function() { - emitDataEvents(this); - this.read(0); - this.emit('resume'); + var state = this._readableState; + if (!state.flowing) { + debug('resume'); + state.flowing = true; + if (!state.reading) { + debug('resume read 0'); + this.read(0); + } + resume(this, state); + } }; +function resume(stream, state) { + if (!state.resumeScheduled) { + state.resumeScheduled = true; + process.nextTick(function() { + resume_(stream, state); + }); + } +} + +function resume_(stream, state) { + state.resumeScheduled = false; + stream.emit('resume'); + flow(stream); + if (state.flowing && !state.reading) + stream.read(0); +} + Readable.prototype.pause = function() { - emitDataEvents(this, true); - this.emit('pause'); + debug('call pause flowing=%j', this._readableState.flowing); + if (false !== this._readableState.flowing) { + debug('pause'); + this._readableState.flowing = false; + this.emit('pause'); + } }; -function emitDataEvents(stream, startPaused) { +function flow(stream) { var state = stream._readableState; - + debug('flow', state.flowing); if (state.flowing) { - // https://github.com/isaacs/readable-stream/issues/16 - throw new Error('Cannot switch to old mode now.'); + do { + var chunk = stream.read(); + } while (null !== chunk && state.flowing); } - - var paused = startPaused || false; - var readable = false; - - // convert to an old-style stream. - stream.readable = true; - stream.pipe = Stream.prototype.pipe; - stream.on = stream.addListener = Stream.prototype.on; - - stream.on('readable', function() { - readable = true; - - var c; - while (!paused && (null !== (c = stream.read()))) - stream.emit('data', c); - - if (c === null) { - readable = false; - stream._readableState.needReadable = true; - } - }); - - stream.pause = function() { - paused = true; - this.emit('pause'); - }; - - stream.resume = function() { - paused = false; - if (readable) - process.nextTick(function() { - stream.emit('readable'); - }); - else - this.read(0); - this.emit('resume'); - }; - - // now make it start, just in case it hadn't already. - stream.emit('readable'); } // wrap an old-style stream as the async data source. @@ -770,6 +748,7 @@ Readable.prototype.wrap = function(stream) { var self = this; stream.on('end', function() { + debug('wrapped end'); if (state.decoder && !state.ended) { var chunk = state.decoder.end(); if (chunk && chunk.length) @@ -780,6 +759,7 @@ Readable.prototype.wrap = function(stream) { }); stream.on('data', function(chunk) { + debug('wrapped data'); if (state.decoder) chunk = state.decoder.write(chunk); if (!chunk || !state.objectMode && !chunk.length) @@ -812,6 +792,7 @@ Readable.prototype.wrap = function(stream) { // when we try to consume some more bytes, simply unpause the // underlying stream. self._read = function(n) { + debug('wrapped _read', n); if (paused) { paused = false; stream.resume(); diff --git a/test/simple/test-stdin-script-child.js b/test/simple/test-stdin-script-child.js new file mode 100644 index 0000000000..e940c3c3f3 --- /dev/null +++ b/test/simple/test-stdin-script-child.js @@ -0,0 +1,52 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); + +var spawn = require('child_process').spawn; +var child = spawn(process.execPath, [], { + env: { + NODE_DEBUG: process.argv[2] + } +}); +var wanted = child.pid + '\n'; +var found = ''; + +child.stdout.setEncoding('utf8'); +child.stdout.on('data', function(c) { + found += c; +}); + +child.stderr.setEncoding('utf8'); +child.stderr.on('data', function(c) { + console.error('> ' + c.trim().split(/\n/).join('\n> ')); +}); + +child.on('close', function(c) { + assert(!c); + assert.equal(found, wanted); + console.log('ok'); +}); + +setTimeout(function() { + child.stdin.end('console.log(process.pid)'); +}); diff --git a/test/simple/test-stream2-basic.js b/test/simple/test-stream2-basic.js index 512231b438..3814bf07b4 100644 --- a/test/simple/test-stream2-basic.js +++ b/test/simple/test-stream2-basic.js @@ -37,10 +37,8 @@ function TestReader(n) { util.inherits(TestReader, R); -TestReader.prototype.read = function(n) { - if (n === 0) return null; +TestReader.prototype._read = function(n) { var max = this._buffer.length - this._pos; - n = n || max; n = Math.max(n, 0); var toRead = Math.min(n, max); if (toRead === 0) { @@ -51,20 +49,21 @@ TestReader.prototype.read = function(n) { this._bufs -= 1; if (this._bufs <= 0) { // read them all! - if (!this.ended) { - this.emit('end'); - this.ended = true; - } + if (!this.ended) + this.push(null); } else { - this.emit('readable'); + // now we have more. + // kinda cheating by calling _read, but whatever, + // it's just fake anyway. + this._read(n); } }.bind(this), 10); - return null; + return; } var ret = this._buffer.slice(this._pos, this._pos + toRead); this._pos += toRead; - return ret; + this.push(ret); }; ///// @@ -135,21 +134,17 @@ test('a most basic test', function(t) { 'xxx', 'xxxx', 'xxxxx', - 'xxxxx', - 'xxxxxxxx', 'xxxxxxxxx', - 'xxx', + 'xxxxxxxxxx', 'xxxxxxxxxxxx', - 'xxxxxxxx', + 'xxxxxxxxxxxxx', 'xxxxxxxxxxxxxxx', - 'xxxxx', - 'xxxxxxxxxxxxxxxxxx', - 'xx', - 'xxxxxxxxxxxxxxxxxxxx', - 'xxxxxxxxxxxxxxxxxxxx', - 'xxxxxxxxxxxxxxxxxxxx', - 'xxxxxxxxxxxxxxxxxxxx', - 'xxxxxxxxxxxxxxxxxxxx' ]; + 'xxxxxxxxxxxxxxxxx', + 'xxxxxxxxxxxxxxxxxxx', + 'xxxxxxxxxxxxxxxxxxxxx', + 'xxxxxxxxxxxxxxxxxxxxxxx', + 'xxxxxxxxxxxxxxxxxxxxxxxxx', + 'xxxxxxxxxxxxxxxxxxxxx' ]; r.on('end', function() { t.same(reads, expect); @@ -342,6 +337,7 @@ test('back pressure respected', function (t) { var w1 = new R(); w1.write = function (chunk) { + console.error('w1.emit("close")'); assert.equal(chunk[0], "one"); w1.emit("close"); process.nextTick(function () { @@ -357,6 +353,7 @@ test('back pressure respected', function (t) { var w2 = new R(); w2.write = function (chunk) { + console.error('w2 write', chunk, counter); assert.equal(chunk[0], expected.shift()); assert.equal(counter, 0); @@ -368,6 +365,7 @@ test('back pressure respected', function (t) { setTimeout(function () { counter--; + console.error("w2 drain"); w2.emit("drain"); }, 10); @@ -377,6 +375,7 @@ test('back pressure respected', function (t) { var w3 = new R(); w3.write = function (chunk) { + console.error('w3 write', chunk, counter); assert.equal(chunk[0], expected.shift()); assert.equal(counter, 1); @@ -388,6 +387,7 @@ test('back pressure respected', function (t) { setTimeout(function () { counter--; + console.error("w3 drain"); w3.emit("drain"); }, 50); diff --git a/test/simple/test-stream2-compatibility.js b/test/simple/test-stream2-compatibility.js index 2b98c1fa8f..6cdd4e9948 100644 --- a/test/simple/test-stream2-compatibility.js +++ b/test/simple/test-stream2-compatibility.js @@ -47,4 +47,7 @@ TestReader.prototype._read = function(n) { }; var reader = new TestReader(); -assert.equal(ondataCalled, 1); +setImmediate(function() { + assert.equal(ondataCalled, 1); + console.log('ok'); +}); diff --git a/test/simple/test-stream2-objects.js b/test/simple/test-stream2-objects.js index ba626cb1e7..3e6931dce0 100644 --- a/test/simple/test-stream2-objects.js +++ b/test/simple/test-stream2-objects.js @@ -237,9 +237,12 @@ test('high watermark _read', function(t) { assert.equal(v, '1'); var v2 = r.read(); + assert.equal(v2, '2'); + + var v3 = r.read(); + assert.equal(v3, '3'); assert.equal(calls, 1); - assert.equal(v2, '2'); t.end(); }); diff --git a/test/simple/test-stream2-readable-wrap-empty.js b/test/simple/test-stream2-readable-wrap-empty.js index c7042f1b12..2e5cf25c44 100644 --- a/test/simple/test-stream2-readable-wrap-empty.js +++ b/test/simple/test-stream2-readable-wrap-empty.js @@ -40,4 +40,4 @@ oldStream.emit('end'); process.on('exit', function(){ assert.ok(ended); -}); \ No newline at end of file +}); diff --git a/test/simple/test-stream2-readable-wrap.js b/test/simple/test-stream2-readable-wrap.js index 6b272be46b..90eea016c7 100644 --- a/test/simple/test-stream2-readable-wrap.js +++ b/test/simple/test-stream2-readable-wrap.js @@ -39,17 +39,14 @@ function runTest(highWaterMark, objectMode, produce) { ended = true; }); - var pauses = 0; - var resumes = 0; - old.pause = function() { - pauses++; + console.error('old.pause()'); old.emit('pause'); flowing = false; }; old.resume = function() { - resumes++; + console.error('old.resume()'); old.emit('resume'); flow(); }; @@ -63,8 +60,9 @@ function runTest(highWaterMark, objectMode, produce) { while (flowing && chunks-- > 0) { var item = produce(); expected.push(item); - console.log('emit', chunks); + console.log('old.emit', chunks, flowing); old.emit('data', item); + console.log('after emit', chunks, flowing); } if (chunks <= 0) { oldEnded = true; @@ -76,7 +74,7 @@ function runTest(highWaterMark, objectMode, produce) { var w = new Writable({ highWaterMark: highWaterMark * 2, objectMode: objectMode }); var written = []; w._write = function(chunk, encoding, cb) { - console.log(chunk); + console.log('_write', chunk); written.push(chunk); setTimeout(cb); }; @@ -94,11 +92,10 @@ function runTest(highWaterMark, objectMode, produce) { assert(ended); assert(oldEnded); assert.deepEqual(written, expected); - assert.equal(pauses, 10); - assert.equal(resumes, 9); } } +runTest(100, false, function(){ return new Buffer(100); }); runTest(10, false, function(){ return new Buffer('xxxxxxxxxx'); }); runTest(1, true, function(){ return { foo: 'bar' }; }); diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index 9a7778b098..470df10e3e 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -310,8 +310,7 @@ test('complex transform', function(t) { pt.end(); t.end(); }); - t.equal(pt.read().toString(), 'abc'); - t.equal(pt.read().toString(), 'def'); + t.equal(pt.read().toString(), 'abcdef'); t.equal(pt.read(), null); }); }); diff --git a/test/simple/test-stream2-unpipe-leak.js b/test/simple/test-stream2-unpipe-leak.js index b9e8a960d8..99f8746c49 100644 --- a/test/simple/test-stream2-unpipe-leak.js +++ b/test/simple/test-stream2-unpipe-leak.js @@ -68,7 +68,8 @@ assert.equal(dest.listeners('finish').length, 0); console.error(src._readableState); process.on('exit', function() { - assert(src._readableState.length >= src._readableState.highWaterMark); src._readableState.buffer.length = 0; console.error(src._readableState); + assert(src._readableState.length >= src._readableState.highWaterMark); + console.log('ok'); }); diff --git a/test/simple/test-stream3-pause-then-read.js b/test/simple/test-stream3-pause-then-read.js new file mode 100644 index 0000000000..b91bde3f0a --- /dev/null +++ b/test/simple/test-stream3-pause-then-read.js @@ -0,0 +1,167 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); + +var stream = require('stream'); +var Readable = stream.Readable; +var Writable = stream.Writable; + +var totalChunks = 100; +var chunkSize = 99; +var expectTotalData = totalChunks * chunkSize; +var expectEndingData = expectTotalData; + +var r = new Readable({ highWaterMark: 1000 }); +var chunks = totalChunks; +r._read = function(n) { + if (!(chunks % 2)) + setImmediate(push); + else if (!(chunks % 3)) + process.nextTick(push); + else + push(); +}; + +var totalPushed = 0; +function push() { + var chunk = chunks-- > 0 ? new Buffer(chunkSize) : null; + if (chunk) { + totalPushed += chunk.length; + chunk.fill('x'); + } + r.push(chunk); +} + +read100(); + +// first we read 100 bytes +function read100() { + readn(100, onData); +} + +function readn(n, then) { + console.error('read %d', n); + expectEndingData -= n; + ;(function read() { + var c = r.read(n); + if (!c) + r.once('readable', read); + else { + assert.equal(c.length, n); + assert(!r._readableState.flowing); + then(); + } + })(); +} + +// then we listen to some data events +function onData() { + expectEndingData -= 100; + console.error('onData'); + var seen = 0; + r.on('data', function od(c) { + seen += c.length; + if (seen >= 100) { + // seen enough + r.removeListener('data', od); + r.pause(); + if (seen > 100) { + // oh no, seen too much! + // put the extra back. + var diff = seen - 100; + r.unshift(c.slice(c.length - diff)); + console.error('seen too much', seen, diff); + } + + // Nothing should be lost in between + setImmediate(pipeLittle); + } + }); +} + +// Just pipe 200 bytes, then unshift the extra and unpipe +function pipeLittle() { + expectEndingData -= 200; + console.error('pipe a little'); + var w = new Writable(); + var written = 0; + w.on('finish', function() { + assert.equal(written, 200); + setImmediate(read1234); + }); + w._write = function(chunk, encoding, cb) { + written += chunk.length; + if (written >= 200) { + r.unpipe(w); + w.end(); + cb(); + if (written > 200) { + var diff = written - 200; + written -= diff; + r.unshift(chunk.slice(chunk.length - diff)); + } + } else { + setImmediate(cb); + } + }; + r.pipe(w); +} + +// now read 1234 more bytes +function read1234() { + readn(1234, resumePause); +} + +function resumePause() { + console.error('resumePause'); + // don't read anything, just resume and re-pause a whole bunch + r.resume(); + r.pause(); + r.resume(); + r.pause(); + r.resume(); + r.pause(); + r.resume(); + r.pause(); + r.resume(); + r.pause(); + setImmediate(pipe); +} + + +function pipe() { + console.error('pipe the rest'); + var w = new Writable(); + var written = 0; + w._write = function(chunk, encoding, cb) { + written += chunk.length; + cb(); + }; + w.on('finish', function() { + console.error('written', written, totalPushed); + assert.equal(written, expectEndingData); + assert.equal(totalPushed, expectTotalData); + console.log('ok'); + }); + r.pipe(w); +}