diff --git a/browser/app/profile/firefox.js b/browser/app/profile/firefox.js index f6a04c4b3092..14f6ac7ce3e8 100644 --- a/browser/app/profile/firefox.js +++ b/browser/app/profile/firefox.js @@ -1660,6 +1660,8 @@ pref("loop.do_not_disturb", false); pref("loop.ringtone", "chrome://browser/content/loop/shared/sounds/ringtone.ogg"); pref("loop.retry_delay.start", 60000); pref("loop.retry_delay.limit", 300000); +pref("loop.ping.interval", 1800000); +pref("loop.ping.timeout", 10000); pref("loop.feedback.baseUrl", "https://input.mozilla.org/api/v1/feedback"); pref("loop.feedback.product", "Loop"); pref("loop.debug.loglevel", "Error"); diff --git a/browser/components/loop/MozLoopPushHandler.jsm b/browser/components/loop/MozLoopPushHandler.jsm index b4fd3392fdfb..7dfc93d8be7f 100644 --- a/browser/components/loop/MozLoopPushHandler.jsm +++ b/browser/components/loop/MozLoopPushHandler.jsm @@ -9,12 +9,308 @@ const { classes: Cc, interfaces: Ci, utils: Cu } = Components; Cu.import("resource://gre/modules/Services.jsm"); Cu.import("resource://gre/modules/XPCOMUtils.jsm"); Cu.import("resource://gre/modules/Timer.jsm"); -Cu.import("resource://gre/modules/Promise.jsm"); + +const {MozLoopService} = Cu.import("resource:///modules/loop/MozLoopService.jsm", {}); +const consoleLog = MozLoopService.log; this.EXPORTED_SYMBOLS = ["MozLoopPushHandler"]; -XPCOMUtils.defineLazyModuleGetter(this, "console", - "resource://gre/modules/devtools/Console.jsm"); +const CONNECTION_STATE_CLOSED = 0; +const CONNECTION_STATE_CONNECTING = 1; +const CONNECTION_STATE_OPEN = 2; + +const SERVICE_STATE_OFFLINE = 0; +const SERVICE_STATE_PENDING = 1; +const SERVICE_STATE_ACTIVE = 2; + +function PushSocket(webSocket = null) { + this._websocket = webSocket; +} + +PushSocket.prototype = { + + /** + * Open push-notification websocket. + * + * @param {String} pushUri + * @param {Function} onMsg(aMsg) callback receives any incoming messages + * aMsg is constructed from the json payload; both + * text and binary message reception are mapped to this + * callback. + * @param {Function} onStart called when the socket is connected + * @param {Function} onClose(aCode, aReason) called when the socket closes; + * both near and far side close events map to this + * callback. + * aCode is any status code returned on close + * aReason is any string returned on close + */ + + connect: function(pushUri, onMsg, onStart, onClose) { + if (!pushUri || !onMsg || !onStart || !onClose) { + throw new Error("PushSocket: missing required parameter(s):" + (pushUri ? "" : " pushUri") + + (onMsg ? "" : " onMsg") + + (onStart ? "" : " onStart") + + (onClose ? "" : " onClose")); + } + + this._onMsg = onMsg; + this._onStart = onStart; + this._onClose = onClose; + + if (!this._websocket) { + this._websocket = Cc["@mozilla.org/network/protocol;1?name=wss"] + .createInstance(Ci.nsIWebSocketChannel); + } + + let uri = Services.io.newURI(pushUri, null, null); + this._websocket.protocol = "push-notification"; + this._websocket.asyncOpen(uri, pushUri, this, null); + }, + + /** + * nsIWebSocketListener method, handles the start of the websocket stream. + * + * @param {nsISupports} aContext Not used + */ + onStart: function() { + this._socketOpen = true; + this._onStart(); + }, + + /** + * nsIWebSocketListener method, called when the websocket is closed locally. + * + * @param {nsISupports} aContext Not used + * @param {nsresult} aStatusCode + */ + onStop: function(aContext, aStatusCode) { + this._socketOpen = false; + this._onClose(aStatusCode, "websocket onStop"); + }, + + /** + * nsIWebSocketListener method, called when the websocket is closed + * by the far end. + * + * @param {nsISupports} aContext Not used + * @param {integer} aCode the websocket closing handshake close code + * @param {String} aReason the websocket closing handshake close reason + */ + onServerClose: function(aContext, aCode, aReason) { + this._socketOpen = false; + this._onClose(aCode, aReason); + }, + + /** + * nsIWebSocketListener method, called when the websocket receives + * a text message (normally json encoded). + * + * @param {nsISupports} aContext Not used + * @param {String} aMsg The message data + */ + onMessageAvailable: function(aContext, aMsg) { + consoleLog.log("PushSocket: Message received: ", aMsg); + if (!this._socketOpen) { + consoleLog.error("Message received in Winsocket closed state"); + return; + } + + try { + this._onMsg(JSON.parse(aMsg)); + } + catch (error) { + consoleLog.error("PushSocket: error parsing message payload - ", error); + } + }, + + /** + * nsIWebSocketListener method, called when the websocket receives a binary message. + * This class assumes that it is connected to a SimplePushServer and therefore treats + * the message payload as json encoded. + * + * @param {nsISupports} aContext Not used + * @param {String} aMsg The message data + */ + onBinaryMessageAvailable: function(aContext, aMsg) { + consoleLog.log("PushSocket: Binary message received: ", aMsg); + if (!this._socketOpen) { + consoleLog.error("PushSocket: message receive in Winsocket closed state"); + return; + } + + try { + this._onMsg(JSON.parse(aMsg)); + } + catch (error) { + consoleLog.error("PushSocket: error parsing message payload - ", error); + } + }, + + /** + * Create a JSON encoded message payload and send via websocket. + * + * @param {Object} aMsg Message to send. + * + * @returns {Boolean} true if message has been sent, false otherwise + */ + send: function(aMsg) { + if (!this._socketOpen) { + consoleLog.error("PushSocket: attempt to send before websocket is open"); + return false; + } + + let msg; + try { + msg = JSON.stringify(aMsg); + } + catch (error) { + consoleLog.error("PushSocket: JSON generation error - ", error); + return false; + } + + try { + this._websocket.sendMsg(msg); + consoleLog.log("PushSocket: Message sent: ", msg); + } + // guard against the case that the websocket has closed before this call. + catch (e) { + consoleLog.warn("PushSocket: websocket send error", e); + return false; + } + + return true; + }, + + /** + * Close the websocket. + */ + close: function() { + if (!this._socketOpen) { + return; + } + + this._socketOpen = false; + consoleLog.info("PushSocket: websocket closing"); + + // Do not pass through any callbacks after this point. + this._onStart = function() {}; + this._onMsg = this._onStart; + this._onClose = this._onStart; + + try { + this._websocket.close(this._websocket.CLOSE_NORMAL); + } + catch (e) {} + }, +}; + + +/** + * Create a RetryManager object. Class to handle retrying a UserAgent + * to PushServer request following a retry back-off scheme managed by + * this class. The current delay mechanism is to double the delay + * each time an operation to be retried until a maximum is met. + * + * @param {Integer} startDelay The initial delay interval in milliseconds. + * @param {Integer} maxDelay Maximum time delay value in milliseconds. + */ +function RetryManager (startDelay, maxDelay) { + if (!startDelay || !maxDelay) { + throw new Error("RetryManager: missing required parameters(s)" + + (startDelay ? "" : " startDelay") + + (maxDelay ? "" : " maxDelay")); + } + + this._startDelay = startDelay; + // The maximum delay cannot be less than the starting delay. + this._maxDelay = maxDelay > startDelay ? maxDelay : startDelay; +} + +RetryManager.prototype = { + /** + * Method to handle retrying a UserAgent to PushServer request. + * + * @param {Function} delayedOp Function to call after current delay is satisfied + */ + retry: function(delayedOp) { + if (!this._timeoutID) { + this._retryDelay = this._startDelay; + } else { + clearTimeout(this._timeoutID); + let nextDelay = this._retryDelay * 2; + this._retryDelay = nextDelay > this._maxDelay ? this._maxDelay : nextDelay; + } + + this._timeoutID = setTimeout(delayedOp, this._retryDelay); + consoleLog.log("PushHandler: retry delay set for ", this._retryDelay); + }, + + /** + * Method used to reset the delay back-off logic and clear any currently + * running delay timeout. + */ + reset: function() { + if (this._timeoutID) { + clearTimeout(this._timeoutID); + this._timeoutID = null; + } + }, +}; + +/** + * Create a PingMonitor object. An object instance will periodically execute + * a ping send function and if not reset, will then execute an error function. + * + * @param {Function} pingFunc Function that is called after a ping interval + * has expired without being restart. + * @param {Function} onTimeout Function that is called after a ping timeout + * interval has expired without restart being called. + * @param {Integer} interval Timeout value in milliseconds between successive + * pings or between the last restart call and a ping. + * When this interval expires, pingFunc is called and the + * timeout interval is started. + * @param {Integer} timeout Timeout value in milliseconds between a call to + * pingFunc and a call to onTimeout unless restart is called. + * Restart will begin the ping timeout interval again. + */ +function PingMonitor(pingFunc, onTimeout, interval, timeout) { + if (!pingFunc || !onTimeout || !interval || !timeout) { + throw new Error("PingMonitor: missing required parameters"); + } + this._onTimeout = onTimeout; + this._pingFunc = pingFunc; + this._pingInterval = interval; + this._pingTimeout = timeout; +} + +PingMonitor.prototype = { + /** + * Function to restart the ping timeout and cancel any current timeout operation. + */ + restart: function () { + consoleLog.info("PushHandler: ping timeout restart"); + this.stop(); + this._pingTimerID = setTimeout(() => {this._pingSend()}, this._pingInterval); + }, + + /** + * Function to stop the PingMonitor. + */ + stop: function() { + if (this._pingTimerID){ + clearTimeout(this._pingTimerID); + this._pingTimerID = undefined; + } + }, + + _pingSend: function () { + consoleLog.info("PushHandler: ping sent"); + this._pingTimerID = setTimeout(this._onTimeout, this._pingTimeout); + this._pingFunc(); + }, +}; + /** * We don't have push notifications on desktop currently, so this is a @@ -25,31 +321,53 @@ let MozLoopPushHandler = { pushServerUri: undefined, // Records containing the registration and notification callbacks indexed by channelID. // Each channel will be registered with the PushServer. - channels: {}, + channels: new Map(), // This is the UserAgent UUID assigned by the PushServer uaID: undefined, // Each successfully registered channelID is used as a key to hold its pushEndpoint URL. registeredChannels: {}, + // Push protocol state variable + serviceState: SERVICE_STATE_OFFLINE, + // Websocket connection state variable + connectionState: CONNECTION_STATE_CLOSED, + // Contains channels that need to be registered with the PushServer + _channelsToRegister: [], - _channelsToRegister: {}, - - _minRetryDelay_ms: (() => { + get _startRetryDelay_ms() { try { - return Services.prefs.getIntPref("loop.retry_delay.start") + return Services.prefs.getIntPref("loop.retry_delay.start"); } catch (e) { - return 60000 // 1 minute + return 60000; // 1 minute } - })(), + }, - _maxRetryDelay_ms: (() => { + get _maxRetryDelay_ms() { try { - return Services.prefs.getIntPref("loop.retry_delay.limit") + return Services.prefs.getIntPref("loop.retry_delay.limit"); } catch (e) { - return 300000 // 5 minutes + return 300000; // 5 minutes } - })(), + }, + + get _pingInterval_ms() { + try { + return Services.prefs.getIntPref("loop.ping.interval"); + } + catch (e) { + return 18000000; // 30 minutes + } + }, + + get _pingTimeout_ms() { + try { + return Services.prefs.getIntPref("loop.ping.timeout"); + } + catch (e) { + return 10000; // 10 seconds + } + }, /** * Inializes the PushHandler and opens a socket with the PushServer. @@ -61,8 +379,9 @@ let MozLoopPushHandler = { * used for testing. */ initialize: function(options = {}) { + consoleLog.info("PushHandler: initialize options = ", options); if (Services.io.offline) { - console.warn("MozLoopPushHandler - IO offline"); + consoleLog.warn("PushHandler: IO offline"); return false; } @@ -71,6 +390,14 @@ let MozLoopPushHandler = { } this._initDone = true; + this._retryManager = new RetryManager(this._startRetryDelay_ms, + this._maxRetryDelay_ms); + // Send an empty json payload as a ping. + // Close the websocket and re-open if a timeout occurs. + this._pingMonitor = new PingMonitor(() => this._pushSocket.send({}), + () => this._restartConnection(), + this._pingInterval_ms, + this._pingTimeout_ms); if ("mockWebSocket" in options) { this._mockWebSocket = options.mockWebSocket; @@ -80,6 +407,42 @@ let MozLoopPushHandler = { return true; }, + /** + * Reset and clear PushServer connection. + * Returns MozLoopPushHandler to pre-initialized state. + */ + shutdown: function() { + consoleLog.info("PushHandler: shutdown"); + if (!this._initDone) { + return; + } + + this._initDone = false; + this._retryManager.reset(); + this._pingMonitor.stop(); + + // Un-register each active notification channel + if (this.connectionState === CONNECTION_STATE_OPEN) { + Object.keys(this.registeredChannels).forEach((id) => { + let unRegMsg = {messageType: "unregister", + channelID: id}; + this._pushSocket.send(unRegMsg); + }); + this.registeredChannels = {}; + } + + this.connectionState = CONNECTION_STATE_CLOSED; + this.serviceState = SERVICE_STATE_OFFLINE; + this._pushSocket.close(); + this._pushSocket = undefined; + // NOTE: this PushSocket instance will not be released until at least + // the websocket referencing it as an nsIWebSocketListener is released. + this.channels.clear(); + this.uaID = undefined; + this.pushUrl = undefined; + this.pushServerUri = undefined; + }, + /** * Start registration of a PushServer notification channel. * connection, it will automatically say hello and register the channel @@ -97,159 +460,263 @@ let MozLoopPushHandler = { * @param {String} channelID Channel ID to use in registration. * * @param {Function} onRegistered Callback to be called once we are - * registered. + * registered. + * NOTE: This function can be called multiple times if + * the PushServer generates new pushURLs due to + * re-registration due to network loss or PushServer + * initiated re-assignment. * @param {Function} onNotification Callback to be called when a - * push notification is received (may be called multiple - * times). + * push notification is received (may be called multiple + * times). */ register: function(channelID, onRegistered, onNotification) { if (!channelID || !onRegistered || !onNotification) { - throw new Error("missing required parameter(s):" - + (channelID ? "" : " channelID") - + (onRegistered ? "" : " onRegistered") - + (onNotification ? "" : " onNotification")); + throw new Error("missing required parameter(s):" + + (channelID ? "" : " channelID") + + (onRegistered ? "" : " onRegistered") + + (onNotification ? "" : " onNotification")); } + consoleLog.info("PushHandler: channel registration: ", channelID); // If the channel is already registered, callback with an error immediately // so we don't leave code hanging waiting for an onRegistered callback. - if (channelID in this.channels) { + if (this.channels.has(channelID)) { + consoleLog.error("PushHandler: channel already registered"); onRegistered("error: channel already registered: " + channelID); return; } - this.channels[channelID] = { - onRegistered: onRegistered, - onNotification: onNotification - }; + this.channels.set(channelID, {onRegistered: onRegistered, + onNotification: onNotification}); - // If registration is in progress, simply add to the work list. - // Else, re-start a registration cycle. - if (this._registrationID) { - this._channelsToRegister.push(channelID); - } else { - this._registerChannels(); - } + this._channelsToRegister.push(channelID); + this._registerChannels(); }, /** - * Listener method, handles the start of the websocket stream. + * Handles the start of the websocket stream. * Sends a hello message to the server. * - * @param {nsISupports} aContext Not used */ - onStart: function() { - this._retryEnd(); - // If a uaID has already been assigned, assume this is a re-connect - // and send the uaID in order to re-synch with the - // PushServer. If a registration has been completed, send the channelID. + _onStart: function() { + consoleLog.info("PushHandler: websocket open, sending 'hello' to PushServer"); + this.connectionState = CONNECTION_STATE_OPEN; + // If a uaID has already been assigned, assume this is a re-connect; + // send the uaID and channelIDs in order to re-synch with the + // PushServer. The PushServer does not need to accept the existing channelIDs + // and may issue new channelIDs along with new pushURLs. + this.serviceState = SERVICE_STATE_PENDING; let helloMsg = { - messageType: "hello", - uaid: this.uaID || "", - channelIDs: Object.keys(this.registeredChannels)}; - - this._retryOperation(() => this.onStart(), this._maxRetryDelay_ms); - try { // in case websocket has closed before this handler is run - this._websocket.sendMsg(JSON.stringify(helloMsg)); - } - catch (e) {console.warn("MozLoopPushHandler::onStart websocket.sendMsg() failure");} + messageType: "hello", + uaid: this.uaID || "", + channelIDs: this.uaID ? Object.keys(this.registeredChannels) : [] + }; + // The Simple PushServer spec does not allow a retry of the Hello handshake but requires that the socket + // be closed and another socket openned in order to re-attempt the handshake. + // Here, the retryManager is not set up to retry the sending another 'hello' message: the timeout will + // trigger closing the websocket and starting the connection again from the start. + this._retryManager.reset(); + this._retryManager.retry(() => this._restartConnection()); + this._pushSocket.send(helloMsg); }, /** - * Listener method, called when the websocket is closed. + * Handles websocket close callbacks. * - * @param {nsISupports} aContext Not used - * @param {nsresult} aStatusCode Reason for stopping (NS_OK = successful) + * This method will continually try to re-establish a connection + * to the PushServer unless shutdown has been called. */ - onStop: function(aContext, aStatusCode) { - Cu.reportError("Loop Push server web socket closed! Code: " + aStatusCode); - this._retryOperation(() => this._openSocket()); - }, + _onClose: function(aCode, aReason) { + this._pingMonitor.stop(); - /** - * Listener method, called when the websocket is closed by the server. - * If there are errors, onStop may be called without ever calling this - * method. - * - * @param {nsISupports} aContext Not used - * @param {integer} aCode the websocket closing handshake close code - * @param {String} aReason the websocket closing handshake close reason - */ - onServerClose: function(aContext, aCode) { - Cu.reportError("Loop Push server web socket closed (server)! Code: " + aCode); - this._retryOperation(() => this._openSocket()); - }, + switch (this.connectionState) { + case CONNECTION_STATE_OPEN: + this.connectionState = CONNECTION_STATE_CLOSED; + consoleLog.info("PushHandler: websocket closed: begin reconnect - ", aCode); + // The first retry is immediate + this._retryManager.reset(); + this._openSocket(); + break; + + case CONNECTION_STATE_CONNECTING: + // Wait before re-attempting to open the websocket. + consoleLog.info("PushHandler: websocket closed: delay and retry - ", aCode); + this._retryManager.retry(() => this._openSocket()); + break; + } + }, /** * Listener method, called when the websocket receives a message. * - * @param {nsISupports} aContext Not used - * @param {String} aMsg The message data + * @param {Object} aMsg The message data */ - onMessageAvailable: function(aContext, aMsg) { - let msg = JSON.parse(aMsg); + _onMsg: function(aMsg) { + // If an error property exists in the message object ignore the other + // properties. + if (aMsg.error) { + consoleLog.error("PushHandler: received error response msg: ", aMsg.error); + return; + } - switch(msg.messageType) { + // The recommended response to a ping message when the push server has nothing + // else to send is a blank JSON message body: {} + if (!aMsg.messageType && this.serviceState === SERVICE_STATE_ACTIVE) { + // Treat this as a ping response + this._pingMonitor.restart(); + return; + } + + switch(aMsg.messageType) { case "hello": - this._retryEnd(); - this._isConnected = true; - if (this.uaID !== msg.uaid) { - this.uaID = msg.uaid; - this.registeredChannels = {}; - this._registerChannels(); - } + this._onHello(aMsg); break; case "register": - this._onRegister(msg); + this._onRegister(aMsg); break; case "notification": - msg.updates.forEach((update) => { - if (update.channelID in this.registeredChannels) { - this.channels[update.channelID].onNotification(update.version, update.channelID); - } - }); + this._onNotification(aMsg); break; + + default: + consoleLog.warn("PushHandler: unknown message type = ", aMsg.messageType); + if (this.serviceState === SERVICE_STATE_ACTIVE) { + // Treat this as a ping response + this._pingMonitor.restart(); + } + break; + } + }, + + /** + * Handles hello message. + * + * This method will parse the hello response from the PushServer + * and determine whether registration is necessary. + * + * @param {aMsg} hello message body + */ + _onHello: function(aMsg) { + if (this.serviceState !== SERVICE_STATE_PENDING) { + consoleLog.error("PushHandler: extra 'hello' response received from PushServer"); + return; } + + // Clear any pending timeout that will restart the connection. + this._retryManager.reset(); + this.serviceState = SERVICE_STATE_ACTIVE; + consoleLog.info("PushHandler: 'hello' handshake complete"); + // Start the PushServer ping monitor + this._pingMonitor.restart(); + // If a new uaID is received, then any previous channel registrations + // are no longer valid and a Registration request is generated. + if (this.uaID !== aMsg.uaid) { + consoleLog.log("PushHandler: registering all channels"); + this.uaID = aMsg.uaid; + // Re-register all channels. + this._channelsToRegister = [...this.channels.keys()]; + this.registeredChannels = {}; + } + // Allow queued registrations to start (or all if cleared above). + this._registerChannels(); }, + /** + * Handles notification message. + * + * This method will parse the Array of updates and trigger + * the callback of any registered channel. + * This method will construct an ack message containing + * a set of channel version update notifications. + * + * @param {aMsg} notification message body + */ + _onNotification: function(aMsg) { + if (this.serviceState !== SERVICE_STATE_ACTIVE || + this.registeredChannels.length === 0) { + // Treat reception of a notification before handshake and registration + // are complete as a fatal error. + consoleLog.error("PushHandler: protocol error - notification received in wrong state"); + this._restartConnection(); + return; + } + + this._pingMonitor.restart(); + if (Array.isArray(aMsg.updates) && aMsg.updates.length > 0) { + let ackChannels = []; + aMsg.updates.forEach(update => { + if (update.channelID in this.registeredChannels) { + consoleLog.log("PushHandler: notification: version = ", update.version, + ", channelID = ", update.channelID); + this.channels.get(update.channelID) + .onNotification(update.version, update.channelID); + ackChannels.push(update); + } else { + consoleLog.error("PushHandler: notification received for unknown channelID: ", + update.channelID); + } + }); + + consoleLog.log("PushHandler: PusherServer 'ack': ", ackChannels); + this._pushSocket.send({messageType: "ack", + updates: ackChannels}); + } + }, + /** * Handles the PushServer registration response. * * @param {Object} msg PushServer to UserAgent registration response (parsed from JSON). */ _onRegister: function(msg) { - let registerNext = () => { - this._registrationID = this._channelsToRegister.shift(); - this._sendRegistration(this._registrationID); + if (this.serviceState !== SERVICE_STATE_ACTIVE || + msg.channelID != this._pendingChannelID) { + // Treat reception of a register response outside of a completed handshake + // or for a channelID not currently pending a response + // as an indication that the connections should be reset. + consoleLog.error("PushHandler: registration protocol error"); + this._restartConnection(); + return; } + this._retryManager.reset(); + this._pingMonitor.restart(); + switch (msg.status) { case 200: - if (msg.channelID == this._registrationID) { - this._retryEnd(); // reset retry mechanism - this.registeredChannels[msg.channelID] = msg.pushEndpoint; - this.channels[msg.channelID].onRegistered(null, msg.pushEndpoint, msg.channelID); - registerNext(); - } + consoleLog.info("PushHandler: channel registered: ", msg.channelID); + this.registeredChannels[msg.channelID] = msg.pushEndpoint; + this.channels.get(msg.channelID) + .onRegistered(null, msg.pushEndpoint, msg.channelID); + this._registerNext(); break; case 500: + consoleLog.info("PushHandler: eeceived a 500 retry response from the PushServer: ", + msg.channelID); // retry the registration request after a suitable delay - this._retryOperation(() => this._sendRegistration(msg.channelID)); + this._retryManager.retry(() => this._sendRegistration(msg.channelID)); break; case 409: - this.channels[this._registrationID].onRegistered( + consoleLog.error("PushHandler: received a 409 response from the PushServer: ", + msg.channelID); + this.channels.get(this._pendingChannelID).onRegistered( "error: PushServer ChannelID already in use: " + msg.channelID); - registerNext(); + // Remove this channel from the channel list. + this.channels.delete(this._pendingChannelID); + this._registerNext(); break; default: - let id = this._channelsToRegister.shift(); - this.channels[this._registrationID].onRegistered( + consoleLog.error("PushHandler: received error ", msg.status, + " from the PushServer: ", msg.channelID); + this.channels.get(this._pendingChannelID).onRegistered( "error: PushServer registration failure, status = " + msg.status); - registerNext(); + this.channels.delete(this._pendingChannelID); + this._registerNext(); break; } }, @@ -259,38 +726,39 @@ let MozLoopPushHandler = { * * A new websocket interface is used each time. If an onStop callback * was received, calling asyncOpen() on the same interface will - * trigger a "alreay open socket" exception even though the channel + * trigger an "already open socket" exception even though the channel * is logically closed. */ _openSocket: function() { - this._isConnected = false; - - if (this._mockWebSocket) { - // For tests, use the mock instance. - this._websocket = this._mockWebSocket; - } else { - this._websocket = Cc["@mozilla.org/network/protocol;1?name=wss"] - .createInstance(Ci.nsIWebSocketChannel); - } - - this._websocket.protocol = "push-notification"; + this.connectionState = CONNECTION_STATE_CONNECTING; + // For tests, use the mock instance. + this._pushSocket = new PushSocket(this._mockWebSocket); let performOpen = () => { - let uri = Services.io.newURI(this.pushServerUri, null, null); - this._websocket.asyncOpen(uri, this.pushServerUri, this, null); + consoleLog.info("PushHandler: attempt to open websocket to PushServer: ", this.pushServerUri); + this._pushSocket.connect(this.pushServerUri, + (aMsg) => this._onMsg(aMsg), + () => this._onStart(), + (aCode, aReason) => this._onClose(aCode, aReason)); } let pushServerURLFetchError = () => { - console.warn("MozLoopPushHandler - Could not retrieve push server URL from Loop server, will retry"); - this._retryOperation(() => this._openSocket()); + consoleLog.warn("PushHandler: Could not retrieve push server URL from Loop server, will retry"); + this._pushSocket = undefined; + this._retryManager.retry(() => this._openSocket()); return; } + try { + this.pushServerUri = Services.prefs.getCharPref("loop.debug.pushserver"); + } + catch (e) {} + if (!this.pushServerUri) { // Get push server to use from the Loop server let pushUrlEndpoint = Services.prefs.getCharPref("loop.server") + "/push-server-config"; let req = Cc["@mozilla.org/xmlextras/xmlhttprequest;1"]. - createInstance(Ci.nsIXMLHttpRequest); + createInstance(Ci.nsIXMLHttpRequest); req.open("GET", pushUrlEndpoint); req.onload = () => { if (req.status >= 200 && req.status < 300) { @@ -298,19 +766,19 @@ let MozLoopPushHandler = { try { pushServerConfig = JSON.parse(req.responseText); } catch (e) { - console.warn("MozLoopPushHandler - Error parsing JSON response for push server URL"); + consoleLog.warn("PushHandler: Error parsing JSON response for push server URL"); pushServerURLFetchError(); } if (pushServerConfig.pushServerURI) { + this._retryManager.reset(); this.pushServerUri = pushServerConfig.pushServerURI; - this._retryEnd(); performOpen(); } else { - console.warn("MozLoopPushHandler - push server URL config lacks pushServerURI parameter"); + consoleLog.warn("PushHandler: push server URL config lacks pushServerURI parameter"); pushServerURLFetchError(); } } else { - console.warn("MozLoopPushHandler - push server URL retrieve error: " + req.status); + consoleLog.warn("PushHandler: push server URL retrieve error: " + req.status); pushServerURLFetchError(); } }; @@ -322,25 +790,43 @@ let MozLoopPushHandler = { } }, + /** + * Closes websocket and begins re-establishing a connection with the PushServer + */ + _restartConnection: function() { + this._retryManager.reset(); + this._pingMonitor.stop(); + this.serviceState = SERVICE_STATE_OFFLINE; + this._pendingChannelID = null; + + if (this.connectionState === CONNECTION_STATE_OPEN) { + // Close the current PushSocket and start the operation to open a new one. + this.connectionState = CONNECTION_STATE_CLOSED; + this._pushSocket.close(); + consoleLog.warn("PushHandler: connection error: re-establishing connection to PushServer"); + this._openSocket(); + } + }, + /** * Begins registering the channelIDs with the PushServer */ _registerChannels: function() { // Hold off registration operation until handshake is complete. - if (!this._isConnected) { + // If a registration cycle is in progress, do nothing. + if (this.serviceState !== SERVICE_STATE_ACTIVE || + this._pendingChannelID) { return; } + this._registerNext(); + }, - // If a registration is pending, do not generate a work list. - // Assume registration is in progress. - if (!this._registrationID) { - // Generate a list of channelIDs that have not yet been registered. - this._channelsToRegister = Object.keys(this.channels).filter((id) => { - return !(id in this.registeredChannels); - }); - this._registrationID = this._channelsToRegister.shift(); - this._sendRegistration(this._registrationID); - } + /** + * Gets the next channel to register from the worklist and kicks of its registration + */ + _registerNext: function() { + this._pendingChannelID = this._channelsToRegister.pop(); + this._sendRegistration(this._pendingChannelID); }, /** @@ -350,42 +836,8 @@ let MozLoopPushHandler = { */ _sendRegistration: function(channelID) { if (channelID) { - try { // in case websocket has closed - this._websocket.sendMsg(JSON.stringify({messageType: "register", - channelID: channelID})); - } - catch (e) {console.warn("MozLoopPushHandler::_registerChannel websocket.sendMsg() failure");} + this._pushSocket.send({messageType: "register", + channelID: channelID}); } }, - - /** - * Method to handle retrying UserAgent to PushServer request following - * a retry back-off scheme managed by this function. - * - * @param {function} delayedOp Function to call after current delay is satisfied - * - * @param {number} [optional] retryDelay This parameter will be used as the initial delay - */ - _retryOperation: function(delayedOp, retryDelay) { - if (!this._retryCount) { - this._retryDelay = retryDelay || this._minRetryDelay_ms; - this._retryCount = 1; - } else { - let nextDelay = this._retryDelay * 2; - this._retryDelay = nextDelay > this._maxRetryDelay_ms ? this._maxRetryDelay_ms : nextDelay; - this._retryCount += 1; - } - this._timeoutID = setTimeout(delayedOp, this._retryDelay); - }, - - /** - * Method used to reset the retry delay back-off logic. - * - */ - _retryEnd: function() { - if (this._retryCount) { - clearTimeout(this._timeoutID); - this._retryCount = 0; - } - } -}; +}