Backport client 'upgrade' events

This commit is contained in:
Ryan Dahl 2011-01-20 16:25:24 -08:00
Родитель 4125822bed
Коммит d89454e5d4
2 изменённых файлов: 62 добавлений и 3 удалений

Просмотреть файл

@ -937,6 +937,12 @@ Agent.prototype.appendMessage = function(options) {
};
Agent.prototype._removeSocket = function(socket) {
var i = this.sockets.indexOf(socket);
if (i >= 0) this.sockets.splice(i, 1);
}
Agent.prototype._establishNewConnection = function() {
var self = this;
assert(this.sockets.length < this.maxSockets);
@ -944,6 +950,7 @@ Agent.prototype._establishNewConnection = function() {
// Grab a new "socket". Depending on the implementation of _getConnection
// this could either be a raw TCP socket or a TLS stream.
var socket = this._getConnection(this.host, this.port, function () {
self.emit('connect'); // mostly for the shim.
debug("Agent _getConnection callback");
self._cycle();
});
@ -985,8 +992,18 @@ Agent.prototype._establishNewConnection = function() {
// in the upgradeHead from the closing lines of the headers
var upgradeHead = d.slice(start + bytesParsed + 1, end);
// Make sure we don't try to send HTTP requests to it.
self._removeSocket(socket);
socket.on('end', function() {
self.emit('end');
});
// XXX free the parser?
if (self.listeners('upgrade').length) {
self.emit('upgrade', res, res.socket, upgradeHead);
// Emit 'upgrade' on the Agent.
self.emit('upgrade', res, socket, upgradeHead);
} else {
// Got upgrade header, but have no handler.
socket.destroy();
@ -995,14 +1012,14 @@ Agent.prototype._establishNewConnection = function() {
};
socket.onend = function() {
self.emit('end'); // mostly for the shim.
parser.finish();
socket.destroy();
};
// When the socket closes remove it from the list of available sockets.
socket.on('close', function() {
var i = self.sockets.indexOf(socket);
if (i >= 0) self.sockets.splice(i, 1);
self._removeSocket(socket);
// unref the parser for easy gc
parsers.free(parser);
});
@ -1133,12 +1150,51 @@ exports.get = function(options, cb) {
// Shims to old interface.
function Client(port, host) {
var self = this;
this.port = port;
this.host = host;
this.agent = getAgent(this.host, this.port);
// proxy connect events upwards;
this.agent.on('connect', function() {
self.emit('connect');
});
this.agent.on('end', function() {
self.emit('end');
});
// proxy upgrade events upwards;
this.agent.on('upgrade', function (res, socket, upgradeHead) {
if (self.listeners('upgrade').length) {
self.emit('upgrade', res, socket, upgradeHead);
} else {
socket.destroy();
}
});
}
util.inherits(Client, EventEmitter);
// This method is used in a few tests to force the connections closed.
// Again - just a shim so as not to break code. Not really important.
Client.prototype.end = function() {
for (var i = 0; i < this.agent.sockets.length; i++) {
var socket = this.agent.sockets[i];
if (!socket._httpMessage && socket.writable) socket.end();
}
};
Client.prototype.destroy = function(e) {
for (var i = 0; i < this.agent.sockets.length; i++) {
var socket = this.agent.sockets[i];
socket.destroy(e);
}
};
Client.prototype.request = function(method, path, headers) {
if (typeof(path) != 'string') {
// assume method was omitted, shift arguments

Просмотреть файл

@ -21,11 +21,13 @@ server.listen(common.PORT, function() {
var client = http.createClient(common.PORT);
function upgradeRequest(fn) {
console.log("req");
var header = { 'Connection': 'Upgrade', 'Upgrade': 'Test' };
var request = client.request('GET', '/', header);
var wasUpgrade = false;
function onUpgrade(res, socket, head) {
console.log("client upgraded");
wasUpgrade = true;
client.removeListener('upgrade', onUpgrade);
@ -34,6 +36,7 @@ server.listen(common.PORT, function() {
client.on('upgrade', onUpgrade);
function onEnd() {
console.log("client end");
client.removeListener('end', onEnd);
if (!wasUpgrade) {
throw new Error('hasn\'t received upgrade event');