net_uv: Implement end(), destroySoon()
This commit is contained in:
Родитель
e697cfb6fc
Коммит
dc0556c8cd
|
@ -5,6 +5,11 @@ var util = require('util');
|
|||
var assert = require('assert');
|
||||
var TCP = process.binding('tcp_wrap').TCP;
|
||||
|
||||
/* Bit flags for socket._flags */
|
||||
var FLAG_GOT_EOF = 1 << 0;
|
||||
var FLAG_SHUTDOWN = 1 << 1;
|
||||
var FLAG_DESTROY_SOON = 1 << 2;
|
||||
|
||||
|
||||
var debug;
|
||||
if (process.env.NODE_DEBUG && /net/.test(process.env.NODE_DEBUG)) {
|
||||
|
@ -40,7 +45,11 @@ function Socket(options) {
|
|||
this._handle.socket = this;
|
||||
this._handle.onread = onread;
|
||||
|
||||
this.allowHalfOpen = options ? (options.allowHalfOpen || false) : false;
|
||||
|
||||
this._writeRequests = [];
|
||||
|
||||
this._flags = 0;
|
||||
}
|
||||
util.inherits(Socket, stream.Stream);
|
||||
|
||||
|
@ -68,16 +77,12 @@ Object.defineProperty(Socket.prototype, 'readyState', {
|
|||
if (this._connecting) {
|
||||
return 'opening';
|
||||
} else if (this.readable && this.writable) {
|
||||
assert(typeof this.fd === 'number');
|
||||
return 'open';
|
||||
} else if (this.readable && !this.writable) {
|
||||
assert(typeof this.fd === 'number');
|
||||
return 'readOnly';
|
||||
} else if (!this.readable && this.writable) {
|
||||
assert(typeof this.fd === 'number');
|
||||
return 'writeOnly';
|
||||
} else {
|
||||
assert(typeof this.fd !== 'number');
|
||||
return 'closed';
|
||||
}
|
||||
}
|
||||
|
@ -101,13 +106,42 @@ Socket.prototype.resume = function() {
|
|||
};
|
||||
|
||||
|
||||
Socket.prototype.end = function() {
|
||||
throw new Error("implement me");
|
||||
Socket.prototype.end = function(data, encoding) {
|
||||
if (!this.writable) return;
|
||||
this.writable = false;
|
||||
|
||||
if (data) this.write(data, encoding);
|
||||
DTRACE_NET_STREAM_END(this);
|
||||
|
||||
if (this._flags & FLAG_GOT_EOF) {
|
||||
this.destroySoon();
|
||||
} else {
|
||||
this._flags |= FLAG_SHUTDOWN;
|
||||
var shutdownReq = this._handle.shutdown();
|
||||
shutdownReq.oncomplete = afterShutdown;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
function afterShutdown(status, handle, req) {
|
||||
var self = handle.socket;
|
||||
|
||||
assert.ok(self._flags & FLAG_SHUTDOWN);
|
||||
|
||||
if (self._flags & FLAG_GOT_EOF) {
|
||||
self.destroy();
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Socket.prototype.destroySoon = function() {
|
||||
throw new Error("implement me");
|
||||
this.writable = false;
|
||||
this._flags |= FLAG_DESTROY_SOON;
|
||||
|
||||
if (this._writeRequests.length == 0) {
|
||||
this.destroy();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
@ -167,8 +201,12 @@ function onread(buffer, offset, length) {
|
|||
// EOF
|
||||
self.readable = false;
|
||||
|
||||
assert.ok(!(self._flags & FLAG_GOT_EOF));
|
||||
self._flags |= FLAG_GOT_EOF;
|
||||
|
||||
// We call destroy() before end(). 'close' not emitted until nextTick so
|
||||
// the 'end' event will come first as required.
|
||||
if (!self.writable) self.destroy();
|
||||
// Note: 'close' not emitted until nextTick.
|
||||
|
||||
if (!self.allowHalfOpen) self.end();
|
||||
if (self._events && self._events['end']) self.emit('end');
|
||||
|
@ -230,7 +268,15 @@ function afterWrite(status, handle, req, buffer) {
|
|||
var req_ = self._writeRequests.shift();
|
||||
assert.equal(req, req_);
|
||||
|
||||
if (self._writeRequests.length == 0) {
|
||||
self.emit('drain');
|
||||
}
|
||||
|
||||
if (req.cb) req.cb();
|
||||
|
||||
if (self._writeRequests.length == 0 && self._flags & FLAG_DESTROY_SOON) {
|
||||
self.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -257,12 +303,13 @@ Socket.prototype.connect = function(port, host) {
|
|||
// TODO retrun promise from Socket.prototype.connect which
|
||||
// wraps _connectReq.
|
||||
|
||||
assert.ok(!self._connectReq);
|
||||
assert.ok(!self._connecting);
|
||||
|
||||
self._connectReq = self._handle.connect(ip, port);
|
||||
var connectReq = self._handle.connect(ip, port);
|
||||
|
||||
if (self._connectReq) {
|
||||
self._connectReq.oncomplete = afterConnect;
|
||||
if (connectReq) {
|
||||
self._connecting = true;
|
||||
connectReq.oncomplete = afterConnect;
|
||||
} else {
|
||||
self.destroy(errnoException(errno, 'connect'));
|
||||
}
|
||||
|
@ -275,6 +322,9 @@ function afterConnect(status, handle, req) {
|
|||
var self = handle.socket;
|
||||
assert.equal(handle, self._handle);
|
||||
|
||||
assert.ok(self._connecting);
|
||||
self._connecting = false;
|
||||
|
||||
if (status == 0) {
|
||||
self.readable = self.writable = true;
|
||||
timers.active(self);
|
||||
|
@ -320,7 +370,7 @@ function Server(/* [ options, ] listener */) {
|
|||
|
||||
this.on('connection', listenerCallback);
|
||||
this.connections = 0;
|
||||
self.allowHalfOpen = options.allowHalfOpen || false;
|
||||
this.allowHalfOpen = options.allowHalfOpen || false;
|
||||
|
||||
|
||||
this._handle = new TCP();
|
||||
|
@ -376,7 +426,10 @@ function onconnection(clientHandle) {
|
|||
var handle = this;
|
||||
var self = handle.socket;
|
||||
|
||||
var socket = new Socket({ handle: clientHandle });
|
||||
var socket = new Socket({
|
||||
handle: clientHandle,
|
||||
allowHalfOpen: self.allowHalfOpen
|
||||
});
|
||||
socket.readable = socket.writable = true;
|
||||
socket.resume();
|
||||
|
||||
|
@ -387,3 +440,7 @@ function onconnection(clientHandle) {
|
|||
self.emit('connection', socket);
|
||||
}
|
||||
|
||||
|
||||
Server.prototype.close = function() {
|
||||
this._handle.close();
|
||||
};
|
||||
|
|
Загрузка…
Ссылка в новой задаче