stream: add bytesRead property for readable

Add a bytesRead property for readable is
useful in some use cases.

When user want know how many bytes read of
readable, need to caculate it in userland.
If encoding is specificed, get the value is
very slowly.

PR-URL: https://github.com/nodejs/node/pull/4372
Reviewed-By: Trevor Norris <trev.norris@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Jackson Tian 2015-12-18 11:29:08 +08:00 коммит произвёл James M Snell
Родитель ebd9addcd1
Коммит bfb2cd0bfd
4 изменённых файлов: 127 добавлений и 5 удалений

Просмотреть файл

@ -250,6 +250,11 @@ readable: null
end
```
#### readable.bytesRead
The amount of read bytes. If `objectMode` is `true`, the value is 0 always.
#### readable.isPaused()
* Return: `Boolean`

Просмотреть файл

@ -83,6 +83,8 @@ function Readable(options) {
this._readableState = new ReadableState(options, this);
this.bytesRead = 0;
// legacy
this.readable = true;
@ -135,6 +137,7 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) {
var e = new Error('stream.unshift() after end event');
stream.emit('error', e);
} else {
stream.bytesRead += state.objectMode ? 0 : chunk.length;
if (state.decoder && !addToFront && !encoding)
chunk = state.decoder.write(chunk);

Просмотреть файл

@ -91,7 +91,6 @@ exports._normalizeConnectArgs = normalizeConnectArgs;
// called when creating new Socket, or when re-using a closed Socket
function initSocketHandle(self) {
self.destroyed = false;
self.bytesRead = 0;
self._bytesDispatched = 0;
self._sockname = null;
@ -515,10 +514,6 @@ function onread(nread, buffer) {
// will prevent this from being called again until _read() gets
// called again.
// if it's not enough data, we'll just call handle.readStart()
// again right away.
self.bytesRead += nread;
// Optimization: emit the original buffer with end points
var ret = self.push(buffer);

Просмотреть файл

@ -0,0 +1,119 @@
'use strict';
require('../common');
const assert = require('assert');
const Readable = require('stream').Readable;
const Duplex = require('stream').Duplex;
const Transform = require('stream').Transform;
(function() {
const readable = new Readable({
read: function(n) {
var i = this._index++;
if (i > this._max)
this.push(null);
else
this.push(new Buffer('a'));
}
});
readable._max = 1000;
readable._index = 1;
var total = 0;
readable.on('data', function(chunk) {
total += chunk.length;
});
readable.on('end', function() {
assert.equal(total, readable.bytesRead);
});
})();
(function() {
const readable = new Readable({
read: function(n) {
var i = this._index++;
if (i > this._max)
this.push(null);
else
this.push(new Buffer('a'));
}
});
readable._max = 1000;
readable._index = 1;
var total = 0;
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
total += Buffer.byteLength(chunk);
});
readable.on('end', function() {
assert.equal(total, readable.bytesRead);
});
})();
(function() {
const duplex = new Duplex({
read: function(n) {
var i = this._index++;
if (i > this._max)
this.push(null);
else
this.push(new Buffer('a'));
},
write: function(chunk, encoding, next) {
next();
}
});
duplex._max = 1000;
duplex._index = 1;
var total = 0;
duplex.setEncoding('utf8');
duplex.on('data', function(chunk) {
total += Buffer.byteLength(chunk);
});
duplex.on('end', function() {
assert.equal(total, duplex.bytesRead);
});
})();
(function() {
const readable = new Readable({
read: function(n) {
var i = this._index++;
if (i > this._max)
this.push(null);
else
this.push(new Buffer('{"key":"value"}'));
}
});
readable._max = 1000;
readable._index = 1;
const transform = new Transform({
readableObjectMode : true,
transform: function(chunk, encoding, next) {
next(null, JSON.parse(chunk));
},
flush: function(done) {
done();
}
});
var total = 0;
readable.on('data', function(chunk) {
total += chunk.length;
});
transform.on('end', function() {
assert.equal(0, transform.bytesRead);
assert.equal(total, readable.bytesRead);
});
readable.pipe(transform);
})();