From 421c7fd5a097defbf327fdb13b98bc183d34c69f Mon Sep 17 00:00:00 2001 From: Andrew Naylor Date: Thu, 22 Nov 2012 14:42:49 +0000 Subject: [PATCH] Introduced a new event model, connection timeouts and auto cache adjustment. Two new options: autoAdjustCache. If Apple returns an error for a notification which has been lost due to the cache being too small, when this is set to true the module will make a simple estimate of how many notifications have been lost and increase the cacheLength by twice that amount. connectionTimeout. Set the socket connection timeout value, if set (value >0 in milliseconds) then after a period of inactivity the socket will automatically disconnect. Note: if further notifications are sent the socket will be re-established automatically. ********* When a connection is created it is now possible to add eventListeners to monitor the modules progress. Events (arguments): - error (error): emitted when an error occurs during initialisation of the module, usually due to a problem with the keys and certificates. - transmitted (notification): emitted when a notification has been sent to Apple - not a guarantee that it has been accepted by Apple, an error relating to it make occur later on. A notification may also be sent several times if an earlier notification caused an error requiring retransmission. - timeout: emitted when the connectionTimeout option has been specified and no activity has occurred on a socket for a specified duration. The socket will be closed immediately after this event. - connected: emitted when the connection to Apple is successfully established. No action is required as the connection is managed internally. - disconnected: emitted when the connection to Apple has been closed, this could be for numerous reasons, for example an error has occurred or the connection has timed out. No action is required. - socketError (error): emitted when the connection socket experiences an error. This is useful for debugging but no action should be necessary. - transmissionError (error code, notification): emitted when a message has been received from Apple stating that a notification was invalid. If we still have the notification in cache it will be passed as the second argument, otherwise null. - cacheTooSmall (difference): emitted when Apple returns a notification as invalid but the notification has been expunged from the cache - usually due to high throughput. The parameter estimates how many notifications have been lost. --- lib/connection.js | 109 ++++++++++++++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 38 deletions(-) diff --git a/lib/connection.js b/lib/connection.js index 8d82bab..1f2c6f7 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -3,7 +3,9 @@ var Errors = require('./errors'); var fs = require('fs'); var q = require('q'); var tls = require('tls'); +var sysu = require('util'); var util = require('./util'); +var events = require('events'); var debug = function() {}; if(process.env.DEBUG) { try { @@ -32,10 +34,14 @@ if(process.env.DEBUG) { * @config {Boolean} [rejectUnauthorized=true] Reject Unauthorized property to be passed through to tls.connect() * @config {Boolean} [enhanced=true] Whether to use the enhanced notification format (recommended) * @config {Function} [errorCallback] A callback which accepts 2 parameters (err, notification). Recommended when using enhanced format. - * @config {Number} [cacheLength] Number of notifications to cache for error purposes (See Readme) + * @config {Number} [cacheLength=100] Number of notifications to cache for error purposes (See Readme) + * @config {Boolean} [autoAdjustCache=false] Whether the cache should grow in response to messages being lost after errors. (Will still emit a 'cacheTooSmall' event) + * @config {Number} [connectionTimeout=0] The duration the socket should stay alive with no activity in milliseconds. 0 = Disabled. */ function Connection (options) { - + if(false === (this instanceof Connection)) { + return new Connection(options); + } this.options = { cert: 'cert.pem', certData: null, @@ -50,7 +56,9 @@ function Connection (options) { rejectUnauthorized: true, enhanced: true, errorCallback: undefined, - cacheLength: 100 + cacheLength: 100, + autoAdjustCache: true, + connectionTimeout: 0 }; util.extend(this.options, options); @@ -65,10 +73,12 @@ function Connection (options) { this.currentId = 0; this.cachedNotifications = []; this.notificationBuffer = []; - - this.connectionTimeout = null; + + events.EventEmitter.call(this); }; +sysu.inherits(Connection, events.EventEmitter); + /** * @private */ @@ -169,27 +179,23 @@ Connection.prototype.connect = function () { this.options['gateway'], socketOptions, function () { - if (this.connectionTimeout) { - clearTimeout(this.connectionTimeout); - } - - if (this.options.connectionTimeout > 0) { - this.connectionTimeout = setTimeout(this.destroyConnection.bind(this), this.options.connectionTimeout); - } - debug("Connection established"); + this.emit('connected'); this.deferredConnection.resolve(); }.bind(this)); + this.socket.setNoDelay(false); + this.socket.setTimeout(this.options.connectionTimeout, this.socketTimeout.bind(this)); + this.socket.on("error", this.errorOccurred.bind(this)); this.socket.on('data', this.handleTransmissionError.bind(this)); this.socket.on("drain", this.socketDrained.bind(this)); this.socket.on('clientError', this.errorOccurred.bind(this)); - this.socket.on("end", this.restartConnection.bind(this)); this.socket.once('close', this.restartConnection.bind(this)); }.bind(this)).fail(function (error) { debug("Module initialisation error:", error); + this.emit('error', error); this.deferredConnection.reject(error); this.deferredConnection = null; }.bind(this)); @@ -202,6 +208,7 @@ Connection.prototype.connect = function () { */ Connection.prototype.errorOccurred = function(err) { debug("Socket error occurred", err); + this.emit('socketError', err); if(!this.deferredConnection.promise.isResolved()) { this.deferredConnection.reject(err); } @@ -216,6 +223,10 @@ Connection.prototype.errorOccurred = function(err) { */ Connection.prototype.socketDrained = function() { debug("Socket drained"); + if(this.options.enhanced) { + var notification = this.cachedNotifications[this.cachedNotifications.length - 1]; + this.emit('transmitted', notification); + } if (this.socket && (this.socket.socket.bufferSize != 0 || !this.socket.writable)) { return; } @@ -226,6 +237,15 @@ Connection.prototype.socketDrained = function() { } }; +/** + * @private + */ + Connection.prototype.socketTimeout = function() { + debug("Socket timeout"); + this.emit('timeout'); + this.socket.end(); + }; + /** * @private */ @@ -249,14 +269,12 @@ Connection.prototype.restartConnection = function() { debug("Connection error occurred before TLS Handshake"); this.deferredConnection.reject(new Error("Unable to connect")); } + + this.emit('disconnected'); this.socket = undefined; this.deferredConnection = undefined; - if (this.connectionTimeout) { - clearTimeout(this.connectionTimeout); - } - if (this.notificationBuffer.length) { debug("Notification queue has %d items, resending the first", this.notificationBuffer.length); this.sendNotification(this.notificationBuffer.shift()); @@ -311,11 +329,20 @@ Connection.prototype.handleTransmissionError = function (data) { while (temporaryCache.length) { temporaryCache.shift(); } + this.emit('transmissionError', errorCode, notification); this.raiseError(errorCode, notification); } else { this.cachedNotifications = temporaryCache; - this.raiseError(Errors["none"], null); + + var differentialSize = this.cachedNotifications[0]['_uid'] - notification['_uid'] + this.emit('cacheTooSmall', differentialSize); + if(this.options.autoAdjustCache) { + this.options.cacheLength += differentialSize * 2; + } + + this.emit('transmissionError', Errors["none"], null); + this.raiseError(errorCode, null); } var count = this.cachedNotifications.length; @@ -347,30 +374,33 @@ Connection.prototype.raiseError = function(errorCode, notification) { * @param {Notification} notification The notification object to be sent */ Connection.prototype.sendNotification = function (notification) { + var token = notification.device.token; + + var encoding = notification.encoding || 'utf8'; + var message = JSON.stringify(notification); + var messageLength = Buffer.byteLength(message, encoding); + var position = 0; + var data; + + if (token === undefined) { + process.nextTick(function () { + this.raiseError(Errors['missingDeviceToken'], notification); + }.bind(this)); + return Errors['missingDeviceToken']; + } + if (messageLength > 255) { + process.nextTick(function () { + this.raiseError(Errors['invalidPayloadSize'], notification); + }.bind(this)); + return Errors['invalidPayloadSize']; + } + this.connect().then(function() { debug("Sending notification"); - debug("Buffering notification"); if (!this.socket || this.socket.socket.bufferSize !== 0 || !this.socket.writable) { this.bufferNotification(notification); return; } - - var token = notification.device.token; - - var encoding = notification.encoding || 'utf8'; - var message = JSON.stringify(notification); - var messageLength = Buffer.byteLength(message, encoding); - var position = 0; - var data; - - if (token === undefined) { - this.raiseError(Errors['missingDeviceToken'], notification); - return; - } - if (messageLength > 255) { - this.raiseError(Errors['invalidPayloadSize'], notification); - return; - } notification._uid = this.currentId++; if (this.currentId > 0xffffffff) { @@ -409,13 +439,16 @@ Connection.prototype.sendNotification = function (notification) { position += 2; //Payload position += data.write(message, position, encoding); - + if(this.socket.write(data)) { this.socketDrained(); } }.bind(this)).fail(function (error) { + this.bufferNotification(notification); this.raiseError(error, notification); }.bind(this)); + + return 0; }; module.exports = Connection;