Add optional filters to stream.pipe()

This commit is contained in:
Ryan Dahl 2010-12-17 13:56:47 -08:00
Родитель a8f666ebb8
Коммит 24aded078f
2 изменённых файлов: 84 добавлений и 3 удалений

Просмотреть файл

@ -65,7 +65,7 @@ Resumes the incoming `'data'` events after a `pause()`.
Closes the underlying file descriptor. Stream will not emit any more events.
### stream.pipe(destination, [options])
### stream.pipe(destination, [options], [filter])
This is a `Stream.prototype` method available on all `Stream`s.
@ -92,6 +92,48 @@ NOTE: If the source stream does not support `pause()` and `resume()`, this funct
adds simple definitions which simply emit `'pause'` and `'resume'` events on
the source stream.
The `filter` argument is an optional callback which can be used to filter all
data passing through the pipe. This makes it easy to do arbitrary transforms
(like gzip) while still maintaining the proper throttling. `filter` gets
three arguments: a buffer, a write function, and a done function. Here is an
example of a chat which uses a `filter` to append each message with the
address of the sender.
var net = require('net');
var people = [];
function address(socket) {
return '<' + socket.remoteAddress + ':' + socket.remotePort + '> ';
}
net.Server(function (socket) {
socket.write("hello!\r\n");
people.forEach(function (p) {
socket.pipe(p, { end: false }, function (d, write, done) {
write(address(socket));
write(d);
done();
});
p.pipe(socket, { end: false }, function (d, write, done) {
write(address(p));
write(d);
done();
});
});
people.push(socket);
socket.on('end', function () {
people.splice(people.indexOf(socket), 1);
});
}).listen(8000);
## Writable Stream
A `Writable Stream` has the following methods, members, and events.

Просмотреть файл

@ -7,16 +7,55 @@ function Stream() {
util.inherits(Stream, events.EventEmitter);
exports.Stream = Stream;
Stream.prototype.pipe = function(dest, options) {
Stream.prototype.pipe = function(dest /* options, filter */) {
var source = this;
// parse arguments
var options, filter;
if (typeof arguments[1] == 'object') {
options = arguments[1];
filter = arguments[2];
} else {
filter = arguments[1];
}
function ondata(chunk) {
// FIXME shouldn't need to test writable - this is working around bug.
// .writable should not change before a 'end' event is fired.
if (dest.writable) {
if (false === dest.write(chunk)) source.pause();
}
}
if (!filter) {
source.on('data', ondata);
} else {
//
// TODO: needs tests
//
var wait = false;
var waitQueue = [];
function done () {
wait = false;
// Drain the waitQueue
if (dest.writable && waitQueue.length) {
wait = true;
filter(waitQueue.shift(), ondata, done);
}
}
source.on('data', function (d) {
if (wait) {
waitQueue.push(d);
source.pause();
} else {
wait = true;
filter(d, ondata, done);
}
});
}
function ondrain() {
if (source.readable) source.resume();