stream: Split Writable logic into small functions
1. Get rid of unnecessary 'finishing' flag 2. Dont check both ending and ended. Extraneous. Also: Remove extraneous 'finishing' flag, and don't check both 'ending' and 'ended', since checking just 'ending' is sufficient.
This commit is contained in:
Родитель
e4383c0170
Коммит
049903e333
|
@ -53,10 +53,8 @@ function WritableState(options, stream) {
|
|||
this.ending = false;
|
||||
// when end() has been called, and returned
|
||||
this.ended = false;
|
||||
// when 'finish' has emitted
|
||||
// when 'finish' is emitted
|
||||
this.finished = false;
|
||||
// when 'finish' is being emitted
|
||||
this.finishing = false;
|
||||
|
||||
// should we decode strings into buffers before passing to _write?
|
||||
// this is here so that some node-core streams can optimize string
|
||||
|
@ -116,183 +114,196 @@ Writable.prototype.pipe = function() {
|
|||
this.emit('error', new Error('Cannot pipe. Not readable.'));
|
||||
};
|
||||
|
||||
// Override this method or _write(chunk, cb)
|
||||
Writable.prototype.write = function(chunk, encoding, cb) {
|
||||
var state = this._writableState;
|
||||
|
||||
if (typeof encoding === 'function') {
|
||||
cb = encoding;
|
||||
encoding = null;
|
||||
}
|
||||
function writeAfterEnd(stream, state, cb) {
|
||||
var er = new Error('write after end');
|
||||
// TODO: defer error events consistently everywhere, not just the cb
|
||||
stream.emit('error', er);
|
||||
process.nextTick(function() {
|
||||
cb(er);
|
||||
});
|
||||
}
|
||||
|
||||
if (state.ended) {
|
||||
var self = this;
|
||||
var er = new Error('write after end');
|
||||
// TODO: defer error events consistently everywhere, not just the cb
|
||||
self.emit('error', er);
|
||||
if (typeof cb === 'function') {
|
||||
process.nextTick(function() {
|
||||
cb(er);
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// If we get something that is not a buffer, string, null, or undefined,
|
||||
// and we're not in objectMode, then that's an error.
|
||||
// Otherwise stream chunks are all considered to be of length=1, and the
|
||||
// watermarks determine how many objects to keep in the buffer, rather than
|
||||
// how many bytes or characters.
|
||||
// If we get something that is not a buffer, string, null, or undefined,
|
||||
// and we're not in objectMode, then that's an error.
|
||||
// Otherwise stream chunks are all considered to be of length=1, and the
|
||||
// watermarks determine how many objects to keep in the buffer, rather than
|
||||
// how many bytes or characters.
|
||||
function validChunk(stream, state, chunk, cb) {
|
||||
var valid = true;
|
||||
if (!Buffer.isBuffer(chunk) &&
|
||||
'string' !== typeof chunk &&
|
||||
chunk !== null &&
|
||||
chunk !== undefined &&
|
||||
!state.objectMode) {
|
||||
var er = new TypeError('Invalid non-string/buffer chunk');
|
||||
if (typeof cb === 'function')
|
||||
stream.emit('error', er);
|
||||
process.nextTick(function() {
|
||||
cb(er);
|
||||
this.emit('error', er);
|
||||
return;
|
||||
});
|
||||
valid = false;
|
||||
}
|
||||
return valid;
|
||||
}
|
||||
|
||||
var len;
|
||||
if (state.objectMode)
|
||||
len = 1;
|
||||
else {
|
||||
len = chunk.length;
|
||||
if (false === state.decodeStrings)
|
||||
chunk = [chunk, encoding || 'utf8'];
|
||||
else if (typeof chunk === 'string') {
|
||||
chunk = new Buffer(chunk, encoding);
|
||||
len = chunk.length;
|
||||
}
|
||||
function decodeChunk(state, chunk, encoding) {
|
||||
if (!state.objectMode &&
|
||||
state.decodeStrings !== false &&
|
||||
typeof chunk === 'string') {
|
||||
chunk = new Buffer(chunk, encoding);
|
||||
}
|
||||
return chunk;
|
||||
}
|
||||
|
||||
Writable.prototype.write = function(chunk, encoding, cb) {
|
||||
var state = this._writableState;
|
||||
var ret = false;
|
||||
|
||||
if (typeof encoding === 'function') {
|
||||
cb = encoding;
|
||||
encoding = null;
|
||||
}
|
||||
if (!encoding)
|
||||
encoding = 'utf8';
|
||||
|
||||
if (typeof cb !== 'function')
|
||||
cb = function() {};
|
||||
|
||||
if (state.ended)
|
||||
writeAfterEnd(this, state, cb);
|
||||
else if (validChunk(this, state, chunk, cb))
|
||||
ret = writeOrBuffer(this, state, chunk, encoding, cb);
|
||||
|
||||
return ret;
|
||||
};
|
||||
|
||||
// if we're already writing something, then just put this
|
||||
// in the queue, and wait our turn. Otherwise, call _write
|
||||
// If we return false, then we need a drain event, so set that flag.
|
||||
function writeOrBuffer(stream, state, chunk, encoding, cb) {
|
||||
chunk = decodeChunk(state, chunk, encoding);
|
||||
var len = state.objectMode ? 1 : chunk.length;
|
||||
|
||||
// XXX Remove. _write() should take an encoding.
|
||||
if (state.decodeStrings === false)
|
||||
chunk = [chunk, encoding];
|
||||
|
||||
state.length += len;
|
||||
|
||||
var ret = state.length < state.highWaterMark;
|
||||
if (ret === false)
|
||||
state.needDrain = true;
|
||||
state.needDrain = !ret;
|
||||
|
||||
// if we're already writing something, then just put this
|
||||
// in the queue, and wait our turn.
|
||||
if (state.writing) {
|
||||
state.buffer.push([chunk, cb]);
|
||||
return ret;
|
||||
}
|
||||
|
||||
state.writing = true;
|
||||
state.sync = true;
|
||||
state.writelen = len;
|
||||
state.writecb = cb;
|
||||
this._write(chunk, state.onwrite);
|
||||
state.sync = false;
|
||||
if (state.writing)
|
||||
state.buffer.push([chunk, cb]); // XXX [chunk,encoding,cb]
|
||||
else
|
||||
doWrite(stream, state, len, chunk, encoding, cb);
|
||||
|
||||
return ret;
|
||||
};
|
||||
}
|
||||
|
||||
function doWrite(stream, state, len, chunk, encoding, cb) {
|
||||
state.writelen = len;
|
||||
state.writecb = cb;
|
||||
state.writing = true;
|
||||
state.sync = true;
|
||||
// XXX stream._write(chunk, encoding, state.onwrite)
|
||||
stream._write(chunk, state.onwrite);
|
||||
state.sync = false;
|
||||
}
|
||||
|
||||
function onwriteError(stream, state, sync, er, cb) {
|
||||
if (sync)
|
||||
process.nextTick(function() {
|
||||
cb(er);
|
||||
});
|
||||
else
|
||||
cb(er);
|
||||
|
||||
stream.emit('error', er);
|
||||
}
|
||||
|
||||
function onwriteStateUpdate(state) {
|
||||
state.writing = false;
|
||||
state.writecb = null;
|
||||
state.length -= state.writelen;
|
||||
state.writelen = 0;
|
||||
}
|
||||
|
||||
function onwrite(stream, er) {
|
||||
var state = stream._writableState;
|
||||
var sync = state.sync;
|
||||
var cb = state.writecb;
|
||||
var len = state.writelen;
|
||||
|
||||
state.writing = false;
|
||||
state.writelen = null;
|
||||
state.writecb = null;
|
||||
onwriteStateUpdate(state);
|
||||
|
||||
if (er) {
|
||||
if (cb) {
|
||||
// If _write(chunk,cb) calls cb() in this tick, we still defer
|
||||
// the *user's* write callback to the next tick.
|
||||
// Never present an external API that is *sometimes* async!
|
||||
if (sync)
|
||||
process.nextTick(function() {
|
||||
cb(er);
|
||||
});
|
||||
else
|
||||
cb(er);
|
||||
if (er)
|
||||
onwriteError(stream, state, sync, er, cb);
|
||||
else {
|
||||
if (!finishMaybe(stream, state)) {
|
||||
if (state.length === 0 && state.needDrain)
|
||||
onwriteDrain(stream, state);
|
||||
|
||||
if (!state.bufferProcessing && state.buffer.length)
|
||||
clearBuffer(stream, state);
|
||||
}
|
||||
|
||||
// backwards compatibility. still emit if there was a cb.
|
||||
stream.emit('error', er);
|
||||
return;
|
||||
}
|
||||
state.length -= len;
|
||||
|
||||
if (cb) {
|
||||
// Don't call the cb until the next tick if we're in sync mode.
|
||||
if (sync)
|
||||
process.nextTick(cb);
|
||||
else
|
||||
cb();
|
||||
}
|
||||
}
|
||||
|
||||
if (state.length === 0 && (state.ended || state.ending) &&
|
||||
!state.finished && !state.finishing) {
|
||||
// emit 'finish' at the very end.
|
||||
state.finishing = true;
|
||||
stream.emit('finish');
|
||||
state.finished = true;
|
||||
return;
|
||||
}
|
||||
|
||||
if (state.length === 0 && state.needDrain) {
|
||||
// Must force callback to be called on nextTick, so that we don't
|
||||
// emit 'drain' before the write() consumer gets the 'false' return
|
||||
// value, and has a chance to attach a 'drain' listener.
|
||||
process.nextTick(function() {
|
||||
if (!state.needDrain)
|
||||
return;
|
||||
// Must force callback to be called on nextTick, so that we don't
|
||||
// emit 'drain' before the write() consumer gets the 'false' return
|
||||
// value, and has a chance to attach a 'drain' listener.
|
||||
function onwriteDrain(stream, state) {
|
||||
process.nextTick(function() {
|
||||
if (state.needDrain) {
|
||||
state.needDrain = false;
|
||||
stream.emit('drain');
|
||||
});
|
||||
}
|
||||
|
||||
// if there's something in the buffer waiting, then process it
|
||||
// It would be nice if there were TCO in JS, and we could just
|
||||
// shift the top off the buffer and _write that, but that approach
|
||||
// causes RangeErrors when you have a very large number of very
|
||||
// small writes, and is not very efficient otherwise.
|
||||
if (!state.bufferProcessing && state.buffer.length) {
|
||||
state.bufferProcessing = true;
|
||||
|
||||
for (var c = 0; c < state.buffer.length; c++) {
|
||||
var chunkCb = state.buffer[c];
|
||||
var chunk = chunkCb[0];
|
||||
cb = chunkCb[1];
|
||||
|
||||
if (state.objectMode)
|
||||
len = 1;
|
||||
else if (false === state.decodeStrings)
|
||||
len = chunk[0].length;
|
||||
else
|
||||
len = chunk.length;
|
||||
|
||||
state.writelen = len;
|
||||
state.writecb = cb;
|
||||
state.writechunk = chunk;
|
||||
state.writing = true;
|
||||
state.sync = true;
|
||||
stream._write(chunk, state.onwrite);
|
||||
state.sync = false;
|
||||
|
||||
// if we didn't call the onwrite immediately, then
|
||||
// it means that we need to wait until it does.
|
||||
// also, that means that the chunk and cb are currently
|
||||
// being processed, so move the buffer counter past them.
|
||||
if (state.writing) {
|
||||
c++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
state.bufferProcessing = false;
|
||||
if (c < state.buffer.length)
|
||||
state.buffer = state.buffer.slice(c);
|
||||
else
|
||||
state.buffer.length = 0;
|
||||
|
||||
// if there's something in the buffer waiting, then process it
|
||||
function clearBuffer(stream, state) {
|
||||
state.bufferProcessing = true;
|
||||
|
||||
// XXX buffer entry should be [chunk, encoding, cb]
|
||||
for (var c = 0; c < state.buffer.length; c++) {
|
||||
var chunkCb = state.buffer[c];
|
||||
var chunk = chunkCb[0];
|
||||
var cb = chunkCb[1];
|
||||
var encoding = '';
|
||||
var len;
|
||||
|
||||
if (state.objectMode)
|
||||
len = 1;
|
||||
else if (false === state.decodeStrings) {
|
||||
len = chunk[0].length;
|
||||
encoding = chunk[1];
|
||||
} else
|
||||
len = chunk.length;
|
||||
|
||||
doWrite(stream, state, len, chunk, encoding, cb);
|
||||
|
||||
// if we didn't call the onwrite immediately, then
|
||||
// it means that we need to wait until it does.
|
||||
// also, that means that the chunk and cb are currently
|
||||
// being processed, so move the buffer counter past them.
|
||||
if (state.writing) {
|
||||
c++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
state.bufferProcessing = false;
|
||||
if (c < state.buffer.length)
|
||||
state.buffer = state.buffer.slice(c);
|
||||
else
|
||||
state.buffer.length = 0;
|
||||
}
|
||||
|
||||
Writable.prototype._write = function(chunk, cb) {
|
||||
|
@ -317,19 +328,23 @@ Writable.prototype.end = function(chunk, encoding, cb) {
|
|||
this.write(chunk, encoding);
|
||||
|
||||
// ignore unnecessary end() calls.
|
||||
if (!state.ending && !state.ended && !state.finished)
|
||||
if (!state.ending && !state.finished)
|
||||
endWritable(this, state, cb);
|
||||
};
|
||||
|
||||
function finishMaybe(stream, state) {
|
||||
if (state.ending && state.length === 0 && !state.finished) {
|
||||
state.finished = true;
|
||||
stream.emit('finish');
|
||||
}
|
||||
return state.finished;
|
||||
}
|
||||
|
||||
function endWritable(stream, state, cb) {
|
||||
state.ending = true;
|
||||
if (state.length === 0 && !state.finishing) {
|
||||
state.finishing = true;
|
||||
stream.emit('finish');
|
||||
state.finished = true;
|
||||
}
|
||||
finishMaybe(stream, state);
|
||||
if (cb) {
|
||||
if (state.finished || state.finishing)
|
||||
if (state.finished)
|
||||
process.nextTick(cb);
|
||||
else
|
||||
stream.once('finish', cb);
|
||||
|
|
|
@ -391,7 +391,7 @@ Socket.prototype.destroySoon = function() {
|
|||
if (this.writable)
|
||||
this.end();
|
||||
|
||||
if (this._writableState.finishing || this._writableState.finished)
|
||||
if (this._writableState.finished)
|
||||
this.destroy();
|
||||
else
|
||||
this.once('finish', this.destroy);
|
||||
|
@ -748,7 +748,6 @@ Socket.prototype.connect = function(options, cb) {
|
|||
this._writableState.ended = false;
|
||||
this._writableState.ending = false;
|
||||
this._writableState.finished = false;
|
||||
this._writableState.finishing = false;
|
||||
this.destroyed = false;
|
||||
this._handle = null;
|
||||
}
|
||||
|
|
|
@ -573,7 +573,7 @@ CryptoStream.prototype.destroySoon = function(err) {
|
|||
if (this.writable)
|
||||
this.end();
|
||||
|
||||
if (this._writableState.finishing || this._writableState.finished)
|
||||
if (this._writableState.finished)
|
||||
this.destroy();
|
||||
else
|
||||
this.once('finish', this.destroy);
|
||||
|
|
Загрузка…
Ссылка в новой задаче