Refactor callbacks in net.Stream for fewer closures

This commit is contained in:
Ryan Dahl 2010-10-27 15:29:08 -07:00
Родитель 38dde9684f
Коммит 67652ddf27
1 изменённых файлов: 134 добавлений и 104 удалений

Просмотреть файл

@ -92,16 +92,6 @@ function allocEmptyBuffer () {
emptyBuffer.length = 0;
}
function _doFlush () {
var socket = this.socket;
// Stream becomes writable on connect() but don't flush if there's
// nothing actually to write
if (socket.flush()) {
if (socket._events && socket._events['drain']) socket.emit("drain");
if (socket.ondrain) socket.ondrain(); // Optimization
}
}
function setImplmentationMethods (self) {
function noData(buf, off, len) {
return !buf ||
@ -225,7 +215,7 @@ function setImplmentationMethods (self) {
if (self.secureStream.clearPending()) {
process.nextTick(function () {
if (self._readWatcher) self._readWatcher.callback();
if (self.readable) self._onReadable(true);
});
}
@ -281,74 +271,28 @@ function setImplmentationMethods (self) {
}
};
function onReadable (readable, writeable) {
assert(this.socket);
var socket = this.socket;
socket._onReadable();
}
function onWritable (readable, writeable) {
assert(this.socket);
var socket = this.socket;
if (socket._connecting) {
socket._onConnect();
} else {
socket._onWritable();
}
}
function initStream (self) {
self._readWatcher = ioWatchers.alloc();
self._readWatcher.callback = function () {
// If this is the first recv (pool doesn't exist) or we've used up
// most of the pool, allocate a new one.
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// discard the old pool. Can't add to the free list because
// users might have refernces to slices on it.
pool = null;
allocNewPool();
}
//debug('pool.used ' + pool.used);
var bytesRead;
try {
bytesRead = self._readImpl(pool,
pool.used,
pool.length - pool.used,
(arguments.length > 0));
} catch (e) {
self.destroy(e);
return;
}
// Note that some _readImpl() implementations return -1 bytes
// read as an indication not to do any processing on the result
// (but not an error).
if (bytesRead === 0) {
self.readable = false;
self._readWatcher.stop();
if (!self.writable) self.destroy();
// Note: 'close' not emitted until nextTick.
if (!self.allowHalfOpen) self.end();
if (self._events && self._events['end']) self.emit('end');
if (self.onend) self.onend();
} else if (bytesRead > 0) {
require('timers').active(self);
var start = pool.used;
var end = pool.used + bytesRead;
pool.used += bytesRead;
if (self._decoder) {
// emit String
var string = self._decoder.write(pool.slice(start, end));
if (string.length) self.emit('data', string);
} else {
// emit buffer
if (self._events && self._events['data']) {
// emit a slice
self.emit('data', pool.slice(start, end));
}
}
// Optimization: emit the original buffer with end points
if (self.ondata) self.ondata(pool, start, end);
} else if (bytesRead == -2) {
// Temporary fix - need SSL refactor.
// -2 originates from SecureStream::ReadExtract
self.destroy(new Error('openssl read error'));
return false;
}
};
self._readWatcher.socket = self;
self._readWatcher.callback = onReadable;
self.readable = false;
// Queue of buffers and string that need to be written to socket.
@ -358,7 +302,7 @@ function initStream (self) {
self._writeWatcher = ioWatchers.alloc();
self._writeWatcher.socket = self;
self._writeWatcher.callback = _doFlush;
self._writeWatcher.callback = onWritable;
self.writable = false;
}
@ -707,6 +651,7 @@ function doConnect (socket, port, host) {
// Don't start the read watcher until connection is established
socket._readWatcher.set(socket.fd, true, false);
socket._readWatcher.socket = socket;
// How to connect on POSIX: Wait for fd to become writable, then call
// socketError() if there isn't an error, we're connected. AFAIK this a
@ -715,37 +660,120 @@ function doConnect (socket, port, host) {
// Manual Page connect(2) under the error code EINPROGRESS.
socket._writeWatcher.set(socket.fd, false, true);
socket._writeWatcher.start();
socket._writeWatcher.callback = function () {
var errno = socketError(socket.fd);
if (errno == 0) {
// connection established
socket._connecting = false;
socket.resume();
socket.readable = socket.writable = true;
socket._writeWatcher.callback = _doFlush;
try {
socket.emit('connect');
} catch (e) {
socket.destroy(e);
return;
}
if (socket._writeQueue && socket._writeQueue.length) {
// Flush socket in case any writes are queued up while connecting.
// ugly
_doFlush.call(socket._writeWatcher);
}
} else if (errno != EINPROGRESS) {
socket.destroy(errnoException(errno, 'connect'));
}
};
socket._writeWatcher.socket = socket;
socket._writeWatcher.callback = onWritable;
}
function toPort (x) { return (x = Number(x)) >= 0 ? x : false; }
Stream.prototype._onConnect = function () {
var errno = socketError(this.fd);
if (errno == 0) {
// connection established
this._connecting = false;
this.resume();
this.readable = this.writable = true;
try {
this.emit('connect');
} catch (e) {
this.destroy(e);
return;
}
if (this._writeQueue && this._writeQueue.length) {
// Flush this in case any writes are queued up while connecting.
this._onWritable();
}
} else if (errno != EINPROGRESS) {
this.destroy(errnoException(errno, 'connect'));
}
};
Stream.prototype._onWritable = function () {
// Stream becomes writable on connect() but don't flush if there's
// nothing actually to write
if (this.flush()) {
if (this._events && this._events['drain']) this.emit("drain");
if (this.ondrain) this.ondrain(); // Optimization
}
};
Stream.prototype._onReadable = function () {
var self = this;
// If this is the first recv (pool doesn't exist) or we've used up
// most of the pool, allocate a new one.
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// discard the old pool. Can't add to the free list because
// users might have refernces to slices on it.
pool = null;
allocNewPool();
}
//debug('pool.used ' + pool.used);
var bytesRead;
try {
bytesRead = self._readImpl(pool,
pool.used,
pool.length - pool.used,
!arguments[0] /* stupid calledByIOWatcher shit */);
} catch (e) {
self.destroy(e);
return;
}
// Note that some _readImpl() implementations return -1 bytes
// read as an indication not to do any processing on the result
// (but not an error).
if (bytesRead === 0) {
self.readable = false;
self._readWatcher.stop();
if (!self.writable) self.destroy();
// Note: 'close' not emitted until nextTick.
if (!self.allowHalfOpen) self.end();
if (self._events && self._events['end']) self.emit('end');
if (self.onend) self.onend();
} else if (bytesRead > 0) {
require('timers').active(self);
var start = pool.used;
var end = pool.used + bytesRead;
pool.used += bytesRead;
if (self._decoder) {
// emit String
var string = self._decoder.write(pool.slice(start, end));
if (string.length) self.emit('data', string);
} else {
// emit buffer
if (self._events && self._events['data']) {
// emit a slice
self.emit('data', pool.slice(start, end));
}
}
// Optimization: emit the original buffer with end points
if (self.ondata) self.ondata(pool, start, end);
} else if (bytesRead == -2) {
// Temporary fix - need SSL refactor.
// -2 originates from SecureStream::ReadExtract
self.destroy(new Error('openssl read error'));
return false;
}
};
// var stream = new Stream();
// stream.connect(80) - TCP connect to port 80 on the localhost
// stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
@ -836,12 +864,14 @@ Stream.prototype.destroy = function (exception) {
if (this._writeWatcher) {
this._writeWatcher.stop();
this._writeWatcher.socket = null;
ioWatchers.free(this._writeWatcher);
this._writeWatcher = null;
}
if (this._readWatcher) {
this._readWatcher.stop();
this._readWatcher.socket = null;
ioWatchers.free(this._readWatcher);
this._readWatcher = null;
}