node/lib/net.js

450 строки
12 KiB
JavaScript
Исходник Обычный вид История

// Debug output is disabled unless NODE_DEBUG is present in the environment.
var debugLevel = ('NODE_DEBUG' in process.ENV) ? 1 : 0;

// Writes `x` followed by a newline to the error stream, but only when
// debugging is enabled.
function debug (x) {
  if (debugLevel <= 0) return;
  process.stdio.writeError(x + '\n');
}
2009-12-17 11:31:10 +03:00
// Local aliases for the primitives exposed on `process`.

// Event-loop watcher and assertion helper.
var IOWatcher = process.IOWatcher;
var assert = process.assert;

// Descriptor lifecycle.
var socket = process.socket;
var bind = process.bind;
var connect = process.connect;
var listen = process.listen;
var accept = process.accept;
var close = process.close;
var shutdown = process.shutdown;

// Data transfer and socket state.
var read = process.read;
var write = process.write;
var toRead = process.toRead;
var socketError = process.socketError;

// Addressing and name resolution.
var getsockname = process.getsockname;
var getaddrinfo = process.getaddrinfo;
var needsLookup = process.needsLookup;

var EINPROGRESS = process.EINPROGRESS;

// Marker pushed onto a socket's sendQueue by close(); flush() shuts down the
// write side when it reaches it.
var END_OF_FILE = 42;
2009-12-17 11:31:10 +03:00
// A non-blocking stream socket driven by two IOWatchers (one for reading,
// one for writing).
//
// Events emitted by the code in this constructor:
//   'data'  - with a slice of the receive buffer holding the bytes just read
//   'eof'   - when read() returns 0
//   'drain' - when a watcher-driven flush empties the send queue
//
// peerInfo - optional object carrying `fd`, `remoteAddress` and
//            `remotePort`. When given, the socket wraps that descriptor and
//            starts out readable and writable; otherwise it stays unopened
//            until connect() is called.
function Socket (peerInfo) {
  // Run the EventEmitter constructor on *this* instance. Calling it without
  // a receiver does not initialize the object being constructed.
  process.EventEmitter.call(this);

  var self = this;

  // Allocated on demand.
  self.recvBuffer = null;

  self.readWatcher = new IOWatcher();
  self.readWatcher.callback = function () {
    // If this is the first recv (recvBuffer doesn't exist) or we've used up
    // most of the recvBuffer, allocate a new one.
    if (!self.recvBuffer ||
        self.recvBuffer.length - self.recvBuffer.used < 128) {
      self._allocateNewRecvBuf();
    }

    debug('recvBuffer.used ' + self.recvBuffer.used);

    // Read into the unused tail of the receive buffer.
    var bytesRead = read(self.fd,
                         self.recvBuffer,
                         self.recvBuffer.used,
                         self.recvBuffer.length - self.recvBuffer.used);

    debug('bytesRead ' + bytesRead + '\n');

    if (bytesRead == 0) {
      // The peer finished sending. Stop reading; if our write side is
      // already closed too, nothing is left to do with the descriptor.
      self.readable = false;
      self.readWatcher.stop();
      self.emit('eof');
      if (!self.writable) self.forceClose();
    } else {
      // Hand out only the freshly read region, then mark it as consumed.
      var slice = self.recvBuffer.slice(self.recvBuffer.used,
                                        self.recvBuffer.used + bytesRead);
      self.recvBuffer.used += bytesRead;
      self.emit('data', slice);
    }
  };
  self.readable = false;

  self.sendQueue = [];    // queue of buffers that need to be written to socket
                          // XXX use link list?
  self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length!

  // Write-watcher callback: retry the flush and announce when the queue has
  // been fully written out.
  self._doFlush = function () {
    assert(self.sendQueueSize > 0);
    if (self.flush()) {
      assert(self.sendQueueSize == 0);
      self.emit("drain");
    }
  };
  self.writeWatcher = new IOWatcher();
  self.writeWatcher.callback = self._doFlush;
  self.writable = false;

  if (peerInfo) {
    self.fd = peerInfo.fd;
    self.remoteAddress = peerInfo.remoteAddress;
    self.remotePort = peerInfo.remotePort;

    self.readWatcher.set(self.fd, true, false);
    self.readWatcher.start();
    self.readable = true;

    // The write watcher is only armed here; flush() starts it when a write
    // cannot complete.
    self.writeWatcher.set(self.fd, false, true);
    self.writable = true;
  }
}
process.inherits(Socket, process.EventEmitter);
exports.Socket = Socket;
2009-12-17 11:31:10 +03:00
2009-12-18 16:52:02 +03:00
exports.createConnection = function (port, host) {
var s = new Socket();
2009-12-18 16:52:02 +03:00
s.connect(port, host);
return s;
};
// Replaces this.recvBuffer with a new, empty buffer. The size defaults to
// 1024 bytes and is adjusted when toRead() is available: 4096 bytes if more
// than 1024 bytes are pending, 128 bytes if nothing is pending. In the
// nothing-pending case the current buffer is kept when it still has room.
Socket.prototype._allocateNewRecvBuf = function () {
  var size = 1024; // TODO make this adjustable from user API

  if (toRead) {
    // Note: only Linux supports toRead().
    // Is the extra system call even worth it?
    var pending = toRead(this.fd);

    if (pending > 1024) {
      size = 4 * 1024;
    } else if (pending == 0) {
      // Probably getting an EOF - so let's not allocate so much.
      var current = this.recvBuffer;
      if (current && current.used < current.length) {
        return; // just recv the eof on the old buf.
      }
      size = 128;
    }
  }

  var fresh = new process.Buffer(size);
  fresh.used = 0;
  this.recvBuffer = fresh;
};
2009-12-17 11:31:10 +03:00
// Appends a new 1024-byte buffer to the send queue and returns it. The
// buffer starts with `used` (bytes filled) and `sent` (bytes written to the
// socket) both at zero.
Socket.prototype._allocateSendBuffer = function () {
  var fresh = new process.Buffer(1024);
  fresh.sent = 0;
  fresh.used = 0;
  this.sendQueue.push(fresh);
  return fresh;
};
2009-12-17 11:31:10 +03:00
// Encodes `data` into the send queue, spilling into additional buffers as
// needed. Nothing is written to the socket here; the caller flushes.
//
// data     - string to queue.
// encoding - 'utf8' (case-insensitive) or anything else for ascii;
//            defaults to 'ascii'.
//
// Throws if the socket is not writable, or if a character cannot be encoded
// even into a freshly allocated buffer.
Socket.prototype._sendString = function (data, encoding) {
  var self = this;

  encoding = encoding || 'ascii'; // default to ascii since it's faster
  var isUtf8 = encoding.toLowerCase() == 'utf8';

  var remaining = data;
  var needFreshBuffer = false;

  do {
    if (!self.writable) throw new Error('Socket is not writable');

    var buffer = null;
    var allocatedFresh = needFreshBuffer;

    if (!needFreshBuffer) {
      // walk through the sendQueue, find the buffer with free space
      for (var i = 0; i < self.sendQueue.length; i++) {
        if (self.sendQueue[i].used == 0) {
          buffer = self.sendQueue[i];
          break;
        }
      }

      // if we didn't find one, take the last - unless it is used up
      if (!buffer) {
        buffer = self._sendQueueLast();
        if (buffer && buffer.length == buffer.used) buffer = null;
      }
    }

    if (!buffer) buffer = self._allocateSendBuffer();

    var charsWritten;
    var bytesWritten;
    var space = buffer.length - buffer.used;

    if (isUtf8) {
      charsWritten = buffer.utf8Write(remaining, buffer.used, space);
      // Characters and bytes differ for utf8, so measure what was consumed.
      bytesWritten = process.Buffer.utf8Length(remaining.slice(0, charsWritten));
    } else {
      // ascii
      charsWritten = buffer.asciiWrite(remaining, buffer.used, space);
      bytesWritten = charsWritten;
    }

    buffer.used += bytesWritten;
    self.sendQueueSize += bytesWritten;

    debug('charsWritten ' + charsWritten);
    debug('buffer.used ' + buffer.used);

    if (charsWritten == 0 && remaining.length > 0) {
      // No progress: the buffer has some free space, but not enough for the
      // next (multi-byte) character. Retrying with the same buffer would
      // never terminate, so move on to a fresh one.
      if (allocatedFresh) {
        throw new Error('Could not encode string into an empty send buffer');
      }
      needFreshBuffer = true;
    } else {
      needFreshBuffer = false;
    }

    remaining = remaining.slice(charsWritten);
    if (remaining.length > 0) debug('recursive send');
  } while (remaining.length > 0);
};
// Returns the most recently queued entry of the send queue, or null when
// the queue is empty.
Socket.prototype._sendQueueLast = function () {
  var queue = this.sendQueue;
  if (queue.length > 0) {
    return queue[queue.length - 1];
  }
  return null;
};
2009-12-18 16:52:02 +03:00
// Queues `data` for sending and attempts to flush it immediately.
//
// data     - a string or a process.Buffer. A buffer is queued as-is and gets
//            its `sent` and `used` bookkeeping fields (re)set.
// encoding - used for string data only; see _sendString.
//
// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket.
//
// Throws if the socket is not writable or close() has already been called.
Socket.prototype.send = function (data, encoding) {
  var self = this;

  if (!self.writable) throw new Error('Socket is not writable');

  // The end-of-file marker is always the last queue entry once close() has
  // run. _sendQueueLast must be *called* here; comparing the function
  // itself to the marker is never true.
  if (self._sendQueueLast() === END_OF_FILE) {
    throw new Error('socket.close() called already; cannot write.');
  }

  if (typeof(data) == 'string') {
    self._sendString(data, encoding);
  } else {
    // data is a process.Buffer
    // walk through the sendQueue, find the first empty buffer
    var inserted = false;
    data.sent = 0;
    data.used = data.length;
    for (var i = 0; i < self.sendQueue.length; i++) {
      if (self.sendQueue[i].used == 0) {
        // if found, insert the data there
        self.sendQueue.splice(i, 0, data);
        inserted = true;
        break;
      }
    }

    if (!inserted) self.sendQueue.push(data);

    self.sendQueueSize += data.used;
  }

  return self.flush();
};
2009-12-17 11:31:10 +03:00
2009-12-18 16:52:02 +03:00
// Flushes the write buffer out: writes queued buffers to the socket, front
// to back, until the queue is empty, the end-of-file marker is reached, or
// write() cannot take more.
//
// Returns true when the loop finished (queue emptied, or the marker was
// reached and the write side was shut down); the write watcher is stopped.
// Returns false when write() returned null; the write watcher is started so
// the flush is retried from its callback.
//
// This method does not emit "drain" itself.
//
// Throws if entries remain queued while the socket is not writable.
Socket.prototype.flush = function () {
  var self = this;

  var bytesWritten;
  while (self.sendQueue.length > 0) {
    if (!self.writable) throw new Error('Socket is not writable');

    var b = self.sendQueue[0];

    // Identity check against the marker. A loose comparison would coerce a
    // queued buffer object to a primitive and could mistake it for the
    // marker.
    if (b === END_OF_FILE) {
      self._shutdown();
      break;
    }

    if (b.sent == b.used) {
      // Fully written: drop it and look at the next entry.
      // this can be improved - save the buffer for later?
      self.sendQueue.shift();
      continue;
    }

    bytesWritten = write(self.fd,
                         b,
                         b.sent,
                         b.used - b.sent);

    if (bytesWritten === null) {
      // could not flush everything
      self.writeWatcher.start();
      assert(self.sendQueueSize > 0);
      return false;
    }

    b.sent += bytesWritten;
    self.sendQueueSize -= bytesWritten;
    debug('bytes sent: ' + b.sent);
  }

  self.writeWatcher.stop();
  return true;
};
// var stream = new Socket();
// stream.connect(80) - TCP connect to port 80 on the localhost
// stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
// stream.connect('/tmp/socket') - UNIX connect to socket specified by path
//
// A single string argument selects a UNIX socket; anything else is TCP.
// Emits 'connect' once the connection is established. If the connection
// attempt fails later, forceClose() is called with an Error whose `errno`
// holds the socket error.
//
// Throws if the socket already has a descriptor, or if connect() fails
// synchronously; in the latter case the descriptor is closed and released
// before the error is rethrown.
Socket.prototype.connect = function () {
  var self = this;
  if (self.fd) throw new Error('Socket already opened');

  if (typeof(arguments[0]) == 'string' && arguments.length == 1) {
    self.fd = socket('unix');
    self.type = 'unix';
    // TODO check if sockfile exists?
  } else {
    self.fd = socket('tcp');
    self.type = 'tcp';
    // TODO dns resolution on arguments[1]
  }

  try {
    connect(self.fd, arguments[0], arguments[1]);
  } catch (e) {
    close(self.fd);
    // Forget the closed descriptor. Otherwise a retry would be rejected as
    // 'Socket already opened' and forceClose() would close it a second time.
    self.fd = null;
    throw e;
  }

  // Don't start the read watcher until connection is established
  self.readWatcher.set(self.fd, true, false);

  // How to connect on POSIX: Wait for fd to become writable, then call
  // socketError() if there isn't an error, we're connected. AFAIK this a
  // platform independent way determining when a non-blocking connection
  // is established, but I have only seen it documented in the Linux
  // Manual Page connect(2) under the error code EINPROGRESS.
  self.writeWatcher.set(self.fd, false, true);
  self.writeWatcher.start();
  self.writeWatcher.callback = function () {
    var errno = socketError(self.fd);
    if (errno == 0) {
      // connection established
      self.readWatcher.start();
      self.readable = true;
      self.writable = true;
      // From now on the write watcher drives flushing of the send queue.
      self.writeWatcher.callback = self._doFlush;
      self.emit('connect');
    } else if (errno != EINPROGRESS) {
      var e = new Error('connection error');
      e.errno = errno;
      self.forceClose(e);
    }
  };
};
// Tears the socket down immediately: marks it neither readable nor
// writable, stops both watchers, closes the descriptor and emits 'close'
// with `exception` (which may be undefined). Does nothing when the socket
// has no descriptor.
Socket.prototype.forceClose = function (exception) {
  var sock = this;
  if (!sock.fd) return;

  sock.readable = false;
  sock.writable = false;

  sock.writeWatcher.stop();
  sock.readWatcher.stop();

  close(sock.fd);
  debug('close socket ' + sock.fd);
  sock.fd = null;

  sock.emit('close', exception);
};
2009-12-17 11:31:10 +03:00
// Shuts down the write half of the socket and marks it not writable.
// Does nothing if the socket is already not writable.
Socket.prototype._shutdown = function () {
  if (!this.writable) return;

  this.writable = false;
  shutdown(this.fd, 'write');
};
// Requests an orderly close of the write side: queues the end-of-file
// marker behind any pending data and flushes. flush() shuts the write side
// down once it reaches the marker. Calling close() again, or on a socket
// that is not writable, does nothing.
Socket.prototype.close = function () {
  if (!this.writable) return;

  // Identity check against the marker. A loose comparison would coerce the
  // last queued buffer object to a primitive and could mistake it for the
  // marker, silently skipping the close.
  if (this._sendQueueLast() !== END_OF_FILE) {
    this.sendQueue.push(END_OF_FILE);
    this.flush();
  }
};
// A listening socket that accepts connections and emits 'connection' with
// a new Socket for each accepted peer.
//
// listener - optional function registered for the 'connection' event.
function Server (listener) {
  // Run the EventEmitter constructor on this instance, as Socket does.
  process.EventEmitter.call(this);

  var self = this;

  if (listener) {
    self.addListener('connection', listener);
  }

  self.watcher = new IOWatcher();
  self.watcher.callback = function (readable, writeable) {
    // Accept until accept() yields nothing. The loop condition re-checks
    // self.fd because a 'connection' listener may close the server.
    while (self.fd) {
      var peerInfo = accept(self.fd);
      debug('accept: ' + JSON.stringify(peerInfo));
      if (!peerInfo) return;
      var peer = new Socket(peerInfo);
      self.emit('connection', peer);
    }
  };
}
process.inherits(Server, process.EventEmitter);
exports.Server = Server;
2009-12-18 16:52:02 +03:00
exports.createServer = function (listener) {
return new Server(listener);
};
2009-12-28 19:01:49 +03:00
// Listen on a UNIX socket
// server.listen("/tmp/socket");
//
// Listen on port 8000, accept connections from INADDR_ANY.
// server.listen(8000);
//
// Listen on port 8000, accept connections to "192.168.1.2"
// server.listen(8000, "192.168.1.2");
//
// With no arguments the socket is not bound explicitly; the OS assigns a
// port.
//
// Emits "listening" once the accept watcher is running.
//
// Throws if the server already has a descriptor, or if bind()/listen()
// fails; in the latter case the descriptor is closed and released before
// the error is rethrown.
Server.prototype.listen = function () {
  var self = this;
  if (self.fd) throw new Error('Server already opened');

  var isUnixPath = typeof(arguments[0]) == 'string' && arguments.length == 1;

  if (isUnixPath) {
    self.fd = socket('unix');
    self.type = 'unix';
  } else {
    self.fd = socket('tcp');
    self.type = 'tcp';
  }

  try {
    if (isUnixPath) {
      // the first argument specifies a path
      // TODO unlink sockfile if exists?
      // if (lstat(SOCKFILE, &tstat) == 0) {
      //   assert(S_ISSOCK(tstat.st_mode));
      //   unlink(SOCKFILE);
      // }
      bind(self.fd, arguments[0]);
    } else if (arguments.length > 0) {
      // the first argument is the port, the second an IP
      if (needsLookup(arguments[1])) {
        // NOTE(review): the lookup result is discarded; bind() below still
        // receives the unresolved arguments[1].
        getaddrinfo(arguments[1], function (ip) {
        });
      }
      // TODO dns resolution on arguments[1]
      bind(self.fd, arguments[0], arguments[1]);
    }
    // else: don't bind(). OS will assign a port with INADDR_ANY.

    listen(self.fd, 128);
  } catch (e) {
    // Release the descriptor so it is not leaked and the server can retry.
    close(self.fd);
    self.fd = null;
    throw e;
  }

  // Start accepting before announcing: a "listening" handler may call
  // close(), and the watcher must not be (re)started after that.
  self.watcher.set(self.fd, true, false);
  self.watcher.start();
  self.emit("listening");
};
2009-12-17 11:31:10 +03:00
2009-12-18 16:52:02 +03:00
// Returns whatever getsockname() reports for the server's descriptor.
Server.prototype.sockName = function () {
  // `self` is not defined in this method; the descriptor lives on `this`.
  return getsockname(this.fd);
};
// Stops accepting connections: stops the accept watcher, closes the
// listening descriptor and emits "close". Throws if the server has no
// descriptor.
Server.prototype.close = function () {
  var server = this;
  if (!server.fd) throw new Error('Not running');

  server.watcher.stop();

  close(server.fd);
  server.fd = null;

  server.emit("close");
};