From 24aded078fd6838d2f21934e57c7cc8dfd7303d1 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 17 Dec 2010 13:56:47 -0800 Subject: [PATCH] Add optional filters to stream.pipe() --- doc/api/streams.markdown | 44 +++++++++++++++++++++++++++++++++++++++- lib/stream.js | 43 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 84 insertions(+), 3 deletions(-) diff --git a/doc/api/streams.markdown b/doc/api/streams.markdown index 6392eec7bb..88ae04f5c0 100644 --- a/doc/api/streams.markdown +++ b/doc/api/streams.markdown @@ -65,7 +65,7 @@ Resumes the incoming `'data'` events after a `pause()`. Closes the underlying file descriptor. Stream will not emit any more events. -### stream.pipe(destination, [options]) +### stream.pipe(destination, [options], [filter]) This is a `Stream.prototype` method available on all `Stream`s. @@ -92,6 +92,48 @@ NOTE: If the source stream does not support `pause()` and `resume()`, this funct adds simple definitions which simply emit `'pause'` and `'resume'` events on the source stream. + +The `filter` argument is an optional callback which can be used to filter all +data passing through the pipe. This makes it easy to do arbitrary transforms +(like gzip) while still maintaining the proper throttling. `filter` gets +three arguments: a buffer, a write function, and a done function. Here is an +example of a chat which uses a `filter` to append each message with the +address of the sender. + + var net = require('net'); + var people = []; + + function address(socket) { + return '<' + socket.remoteAddress + ':' + socket.remotePort + '> '; + } + + net.Server(function (socket) { + socket.write("hello!\r\n"); + + people.forEach(function (p) { + socket.pipe(p, { end: false }, function (d, write, done) { + write(address(socket)); + write(d); + done(); + }); + + p.pipe(socket, { end: false }, function (d, write, done) { + write(address(p)); + write(d); + done(); + }); + }); + + people.push(socket); + + socket.on('end', function () { + people.splice(people.indexOf(socket), 1); + }); + }).listen(8000); + + + + ## Writable Stream A `Writable Stream` has the following methods, members, and events. diff --git a/lib/stream.js b/lib/stream.js index 98f0c37fcb..e92c1bc5db 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -7,16 +7,55 @@ function Stream() { util.inherits(Stream, events.EventEmitter); exports.Stream = Stream; -Stream.prototype.pipe = function(dest, options) { +Stream.prototype.pipe = function(dest /* options, filter */) { var source = this; + // parse arguments + var options, filter; + if (typeof arguments[1] == 'object') { + options = arguments[1]; + filter = arguments[2]; + } else { + filter = arguments[1]; + } + function ondata(chunk) { + // FIXME shouldn't need to test writable - this is working around bug. + // .writable should not change before a 'end' event is fired. if (dest.writable) { if (false === dest.write(chunk)) source.pause(); } } - source.on('data', ondata); + if (!filter) { + source.on('data', ondata); + } else { + // + // TODO: needs tests + // + var wait = false; + var waitQueue = []; + + function done () { + wait = false; + // Drain the waitQueue + if (dest.writable && waitQueue.length) { + wait = true; + filter(waitQueue.shift(), ondata, done); + } + } + + source.on('data', function (d) { + if (wait) { + waitQueue.push(d); + source.pause(); + } else { + wait = true; + filter(d, ondata, done); + } + }); + } + function ondrain() { if (source.readable) source.resume();