Initial reworking of network code to utilise new TLS API in node 0.4+.

I'm not entirely sure I have the error checking right/sufficient.

The method of queueing pending messages whilst connecting has been changed to an array which the data is pushed/shifted from as it is no longer possible to add a listener to trigger the write because the socket will change as it is recreated.
This commit is contained in:
Andrew Naylor 2011-03-06 00:45:27 +00:00
Родитель 6779c0fda3
Коммит db064b17c3
1 изменённых файлов: 23 добавлений и 31 удалений

Просмотреть файл

@ -1,5 +1,4 @@
var net = require('net'); var tls = require('tls');
var crypto = require('crypto');
var fs = require('fs'); var fs = require('fs');
var Buffer = require('buffer').Buffer; var Buffer = require('buffer').Buffer;
@ -17,13 +16,13 @@ var Errors = {
} }
var Connection = function (optionArgs) { var Connection = function (optionArgs) {
this.socket = new net.Stream();
this.credentials = crypto.createCredentials();
this.currentId = 0; this.currentId = 0;
this.cachedNotes = []; this.cachedNotes = [];
var self = this; var self = this;
var hasKey = hasCert = false; var hasKey = hasCert = false;
var socketOptions = {};
var offlineCache = [];
var options = { cert: 'cert.pem' /* Certificate file */ var options = { cert: 'cert.pem' /* Certificate file */
, key: 'key.pem' /* Key file */ , key: 'key.pem' /* Key file */
@ -47,12 +46,12 @@ var Connection = function (optionArgs) {
} }
var startSocket = function () { var startSocket = function () {
self.socket.connect(options['port'], options['gateway']); self.socket = tls.connect(options['port'], options['gateway'], socketOptions);
} self.socket.pair.on('secure', function () { if(!self.socket.authorized) { throw self.socket.authorizationError } while(offlineCache.length) { self.socket.write(offlineCache.shift()); } });
self.socket.on('connect', function() { self.socket.setSecure(self.credentials); });
self.socket.on('data', function(data) { handleTransmissionError(data); }); self.socket.on('data', function(data) { handleTransmissionError(data); });
self.socket.on('end', function () { self.socket.end(); }); self.socket.once('end', function () { });
self.socket.once('close', function () { self.socket.removeAllListeners(); self.socket = undefined; });
}
var connect = invoke_after(function() { startSocket(); }); var connect = invoke_after(function() { startSocket(); });
@ -60,7 +59,7 @@ var Connection = function (optionArgs) {
if(err) { if(err) {
throw err; throw err;
} }
self.credentials.context.setCert(data.toString()); socketOptions['cert'] = data.toString();
hasCert = true; hasCert = true;
})); }));
@ -68,7 +67,7 @@ var Connection = function (optionArgs) {
if(err) { if(err) {
throw err; throw err;
} }
self.credentials.context.setKey(data.toString()); socketOptions['key'] = data.toString();
hasKey = true; hasKey = true;
})); }));
@ -114,16 +113,11 @@ var Connection = function (optionArgs) {
pos += data.write(message, pos); pos += data.write(message, pos);
// If error occurs then slice array and resend all stored notes. // If error occurs then slice array and resend all stored notes.
if(self.socket === undefined || self.socket.readyState != 'open') {
if(self.socket.readyState != 'open') { if((self.socket === undefined || self.socket.readyState == 'closed') && readyToConnect()) {
if(self.socket.readyState == 'closed' && readyToConnect()) {
startSocket(); startSocket();
} }
self.socket.on('connect', offlineCache.push(data);
function() {
self.socket.write(data);
self.socket.removeListener('connect', arguments.callee);
});
} }
else { else {
self.socket.write(data); self.socket.write(data);
@ -240,11 +234,9 @@ Device.prototype.hexToken = function () {
} }
var Feedback = function (optionArgs) { var Feedback = function (optionArgs) {
this.socket = new net.Stream();
this.credentials = crypto.createCredentials();
var self = this; var self = this;
var hasKey = hasCert = false; var hasKey = hasCert = false;
var socketOptions = {}
var responsePacketLength = 38; var responsePacketLength = 38;
var readBuffer = new Buffer(responsePacketLength); var readBuffer = new Buffer(responsePacketLength);
@ -274,13 +266,13 @@ var Feedback = function (optionArgs) {
return (hasKey && hasCert); return (hasKey && hasCert);
} }
this.startSocket = function () { self.startSocket = function () {
self.socket.connect(options['port'], options['address']); self.socket = tls.connect(options['port'], options['address'], socketOptions);
} self.socket.pair.on('secure', function () { if(!self.socket.authorized) { throw self.socket.authorizationError } });
self.socket.on('connect', function() { self.socket.setSecure(self.credentials); });
self.socket.on('data', function(data) { processData(data); }); self.socket.on('data', function(data) { processData(data); });
self.socket.on('end', function () { self.socket.end(); }); self.socket.once('end', function () { });
self.socket.once('close', function () { self.socket.removeAllListeners(); self.socket = undefined; });
}
var connect = invoke_after(function() { self.startSocket(); }); var connect = invoke_after(function() { self.startSocket(); });
@ -288,7 +280,7 @@ var Feedback = function (optionArgs) {
if(err) { if(err) {
throw err; throw err;
} }
self.credentials.context.setCert(data.toString()); socketOptions['cert'] = data.toString();
hasCert = true; hasCert = true;
})); }));
@ -296,7 +288,7 @@ var Feedback = function (optionArgs) {
if(err) { if(err) {
throw err; throw err;
} }
self.credentials.context.setKey(data.toString()); socketOptions['key'] = data.toString();
hasKey = true; hasKey = true;
})); }));
@ -345,7 +337,7 @@ var Feedback = function (optionArgs) {
} }
Feedback.prototype.request = function () { Feedback.prototype.request = function () {
if(this.socket.readyState == 'closed' && this.readyToConnect()) { if((this.socket === undefined || this.socket.readyState == 'closed') && this.readyToConnect()) {
this.startSocket(); this.startSocket();
} }
} }