From c3efa6201b91d95108e7be3518b434f3ebe52578 Mon Sep 17 00:00:00 2001 From: Richard Newman Date: Sat, 25 Jun 2011 14:54:50 +0100 Subject: [PATCH] Bug 664865 - More considered handling of node reassignment mid-sync. r=philikon --- services/sync/modules/resource.js | 19 -- services/sync/modules/service.js | 73 +++--- services/sync/tests/unit/test_hmac_error.js | 245 ++++++++++++++++++ services/sync/tests/unit/test_resource.js | 36 --- .../sync/tests/unit/test_resource_async.js | 46 ---- services/sync/tests/unit/xpcshell.ini | 1 + 6 files changed, 288 insertions(+), 132 deletions(-) create mode 100644 services/sync/tests/unit/test_hmac_error.js diff --git a/services/sync/modules/resource.js b/services/sync/modules/resource.js index ae3f9d3b93e..36332aeb82f 100644 --- a/services/sync/modules/resource.js +++ b/services/sync/modules/resource.js @@ -389,25 +389,6 @@ AsyncResource.prototype = { // actual fetch, so be warned! XPCOMUtils.defineLazyGetter(ret, "obj", function() JSON.parse(ret)); - // Notify if we get a 401 to maybe try again with a new URI. - // TODO: more retry logic. - if (status == 401) { - // Create an object to allow observers to decide if we should try again. - let subject = { - newUri: "", - resource: this, - response: ret - } - Observers.notify("weave:resource:status:401", subject); - - // Do the same type of request but with the new URI. - if (subject.newUri != "") { - this.uri = subject.newUri; - this._doRequest(action, this._data, this._callback); - return; - } - } - this._callback(null, ret); }, diff --git a/services/sync/modules/service.js b/services/sync/modules/service.js index 90c066b4f98..9c5eb540eeb 100644 --- a/services/sync/modules/service.js +++ b/services/sync/modules/service.js @@ -635,21 +635,6 @@ WeaveSvc.prototype = { } }, - _handleResource401: function _handleResource401(request) { - // Only handle 401s that are hitting the current cluster - let spec = request.resource.spec; - let cluster = this.clusterURL; - if (spec.indexOf(cluster) != 0) - return; - - // Nothing to do if the cluster isn't changing - if (!this._setCluster()) - return; - - // Replace the old cluster with the new one to retry the request - request.newUri = this.clusterURL + spec.slice(cluster.length); - }, - // gets cluster from central LDAP server and returns it, or null on error _findCluster: function _findCluster() { this._log.debug("Finding cluster for user " + this.username); @@ -687,7 +672,7 @@ WeaveSvc.prototype = { _setCluster: function _setCluster() { // Make sure we didn't get some unexpected response for the cluster let cluster = this._findCluster(); - this._log.debug("cluster value = " + cluster); + this._log.debug("Cluster value = " + cluster); if (cluster == null) return false; @@ -695,19 +680,20 @@ WeaveSvc.prototype = { if (cluster == this.clusterURL) return false; + this._log.debug("Setting cluster to " + cluster); this.clusterURL = cluster; + Svc.Prefs.set("lastClusterUpdate", Date.now().toString()); return true; }, - // update cluster if required. returns false if the update was not required + // Update cluster if required. + // Returns false if the update was not required. _updateCluster: function _updateCluster() { + this._log.info("Updating cluster."); let cTime = Date.now(); let lastUp = parseFloat(Svc.Prefs.get("lastClusterUpdate")); if (!lastUp || ((cTime - lastUp) >= CLUSTER_BACKOFF)) { - if (this._setCluster()) { - Svc.Prefs.set("lastClusterUpdate", cTime.toString()); - return true; - } + return this._setCluster(); } return false; }, @@ -1779,8 +1765,13 @@ WeaveSvc.prototype = { throw "aborting sync, remote setup failed"; // Make sure we have an up-to-date list of clients before sending commands - this._log.trace("Refreshing client list"); - this._syncEngine(Clients); + this._log.debug("Refreshing client list."); + if (!this._syncEngine(Clients)) { + // Clients is an engine like any other; it can fail with a 401, + // and we can elect to abort the sync. + this._log.warn("Client engine sync failed. Aborting."); + return; + } // Wipe data in the desired direction if necessary switch (Svc.Prefs.get("firstSync")) { @@ -1808,7 +1799,10 @@ WeaveSvc.prototype = { throw "aborting sync, remote setup failed after processing commands"; } finally { - // Always immediately push back the local client (now without commands) + // Always immediately attempt to push back the local client (now + // without commands). + // Note that we don't abort here; if there's a 401 because we've + // been reassigned, we'll handle it around another engine. this._syncEngine(Clients); } } @@ -1826,6 +1820,15 @@ WeaveSvc.prototype = { } } + // If _syncEngine fails for a 401, we might not have a cluster URL here. + // If that's the case, break out of this immediately, rather than + // throwing an exception when trying to fetch metaURL. + if (!this.clusterURL) { + this._log.debug("Aborting sync, no cluster URL: " + + "not uploading new meta/global."); + return; + } + // Upload meta/global if any engines changed anything let meta = Records.get(this.metaURL); if (meta.isNew || meta.changed) { @@ -1834,9 +1837,9 @@ WeaveSvc.prototype = { delete meta.changed; } - if (this._syncError) + if (this._syncError) { throw "Some engines did not sync correctly"; - else { + } else { Svc.Prefs.set("lastSync", new Date().toString()); Status.sync = SYNC_SUCCEEDED; let syncTime = ((Date.now() - syncStartTime) / 1000).toFixed(2); @@ -1936,17 +1939,25 @@ WeaveSvc.prototype = { this._ignorePrefObserver = false; }, - // returns true if sync should proceed - // false / no return value means sync should be aborted + // Returns true if sync should proceed. + // false / no return value means sync should be aborted. _syncEngine: function WeaveSvc__syncEngine(engine) { try { engine.sync(); return true; } catch(e) { - // maybe a 401, cluster update needed? - if (e.status == 401 && this._updateCluster()) - return this._syncEngine(engine); + // Maybe a 401, cluster update perhaps needed? + if (e.status == 401) { + // Log out and clear the cluster URL pref. That will make us perform + // cluster detection and password check on next sync, which handles + // both causes of 401s; in either case, we won't proceed with this + // sync, so return false, but kick off a sync for next time. + this.logout(); + Svc.Prefs.reset("clusterURL"); + Utils.nextTick(this.sync, this); + return false; + } this._checkServerError(e); diff --git a/services/sync/tests/unit/test_hmac_error.js b/services/sync/tests/unit/test_hmac_error.js new file mode 100644 index 00000000000..720fa73b740 --- /dev/null +++ b/services/sync/tests/unit/test_hmac_error.js @@ -0,0 +1,245 @@ +Cu.import("resource://services-sync/engines.js"); +Cu.import("resource://services-sync/engines/clients.js"); +Cu.import("resource://services-sync/record.js"); +Cu.import("resource://services-sync/service.js"); +Cu.import("resource://services-sync/util.js"); + +// Track HMAC error counts. +let hmacErrorCount = 0; +(function () { + let hHE = Service.handleHMACEvent; + Service.handleHMACEvent = function () { + hmacErrorCount++; + return hHE.call(Service); + }; +})(); + +function shared_setup() { + hmacErrorCount = 0; + + // Do not instantiate SyncTestingInfrastructure; we need real crypto. + Service.serverURL = "http://localhost:8080/"; + Service.clusterURL = "http://localhost:8080/"; + Service.username = "foo"; + Service.password = "foo"; + Service.passphrase = "aabcdeabcdeabcdeabcdeabcde"; + + // Make sure RotaryEngine is the only one we sync. + Engines._engines = {}; + Engines.register(RotaryEngine); + let engine = Engines.get("rotary"); + engine.enabled = true; + engine.lastSync = 123; // Needs to be non-zero so that tracker is queried. + engine._store.items = {flying: "LNER Class A3 4472", + scotsman: "Flying Scotsman"}; + engine._tracker.addChangedID('scotsman', 0); + do_check_eq(1, Engines.getEnabled().length); + + let engines = {rotary: {version: engine.version, + syncID: engine.syncID}, + clients: {version: Clients.version, + syncID: Clients.syncID}}; + + // Common server objects. + let global = new ServerWBO("global", {engines: engines}); + let keysWBO = new ServerWBO("keys"); + let rotaryColl = new ServerCollection({}, true); + let clientsColl = new ServerCollection({}, true); + + return [engine, rotaryColl, clientsColl, keysWBO, global]; +} + +add_test(function hmac_error_during_404() { + _("Attempt to replicate the HMAC error setup."); + let [engine, rotaryColl, clientsColl, keysWBO, global] = shared_setup(); + + // Hand out 404s for crypto/keys. + let keysHandler = keysWBO.handler(); + let key404Counter = 0; + let keys404Handler = function (request, response) { + if (key404Counter > 0) { + let body = "Not Found"; + response.setStatusLine(request.httpVersion, 404, body); + response.bodyOutputStream.write(body, body.length); + key404Counter--; + return; + } + keysHandler(request, response); + }; + + let collectionsHelper = track_collections_helper(); + let upd = collectionsHelper.with_updated_collection; + let collections = collectionsHelper.collections; + let handlers = { + "/1.1/foo/info/collections": collectionsHelper.handler, + "/1.1/foo/storage/meta/global": upd("meta", global.handler()), + "/1.1/foo/storage/crypto/keys": upd("crypto", keys404Handler), + "/1.1/foo/storage/clients": upd("clients", clientsColl.handler()), + "/1.1/foo/storage/rotary": upd("rotary", rotaryColl.handler()) + }; + + let server = sync_httpd_setup(handlers); + + try { + _("Syncing."); + Service.sync(); + _("Partially resetting client, as if after a restart, and forcing redownload."); + CollectionKeys.clear(); + engine.lastSync = 0; // So that we redownload records. + key404Counter = 1; + _("---------------------------"); + Service.sync(); + _("---------------------------"); + + // Two rotary items, one client record... no errors. + do_check_eq(hmacErrorCount, 0) + } finally { + Svc.Prefs.resetBranch(""); + Records.clearCache(); + server.stop(run_next_test); + } +}); + +add_test(function hmac_error_during_node_reassignment() { + _("Attempt to replicate an HMAC error during node reassignment."); + let [engine, rotaryColl, clientsColl, keysWBO, global] = shared_setup(); + + let collectionsHelper = track_collections_helper(); + let upd = collectionsHelper.with_updated_collection; + + // We'll provide a 401 mid-way through the sync. This function + // simulates shifting to a node which has no data. + function on401() { + _("Deleting server data..."); + global.delete(); + rotaryColl.delete(); + keysWBO.delete(); + clientsColl.delete(); + delete collectionsHelper.collections.rotary; + delete collectionsHelper.collections.crypto; + delete collectionsHelper.collections.clients; + _("Deleted server data."); + } + + let should401 = false; + function upd401(coll, handler) { + return function (request, response) { + if (should401 && (request.method != "DELETE")) { + on401(); + should401 = false; + let body = "\"reassigned!\""; + response.setStatusLine(request.httpVersion, 401, "Node reassignment."); + response.bodyOutputStream.write(body, body.length); + return; + } + handler(request, response); + }; + } + + function sameNodeHandler(request, response) { + // Set this so that _setCluster will think we've really changed. + let url = Service.serverURL.replace("localhost", "LOCALHOST"); + _("Client requesting reassignment; pointing them to " + url); + response.setStatusLine(request.httpVersion, 200, "OK"); + response.bodyOutputStream.write(url, url.length); + } + + let handlers = { + "/user/1.0/foo/node/weave": sameNodeHandler, + "/1.1/foo/info/collections": collectionsHelper.handler, + "/1.1/foo/storage/meta/global": upd("meta", global.handler()), + "/1.1/foo/storage/crypto/keys": upd("crypto", keysWBO.handler()), + "/1.1/foo/storage/clients": upd401("clients", clientsColl.handler()), + "/1.1/foo/storage/rotary": upd("rotary", rotaryColl.handler()) + }; + + let server = sync_httpd_setup(handlers); + _("Syncing."); + // First hit of clients will 401. This will happen after meta/global and + // keys -- i.e., in the middle of the sync, but before RotaryEngine. + should401 = true; + + // Use observers to perform actions when our sync finishes. + // This allows us to observe the automatic next-tick sync that occurs after + // an abort. + function onSyncError() { + do_throw("Should not get a sync error!"); + } + function onSyncFinished() {} + let obs = { + observe: function observe(subject, topic, data) { + switch (topic) { + case "weave:service:sync:error": + onSyncError(); + break; + case "weave:service:sync:finish": + onSyncFinished(); + break; + } + } + }; + + Svc.Obs.add("weave:service:sync:finish", obs); + Svc.Obs.add("weave:service:sync:error", obs); + + // This kicks off the actual test. Split into a function here to allow this + // source file to broadly follow actual execution order. + function onwards() { + _("== Invoking first sync."); + Service.sync(); + _("We should not simultaneously have data but no keys on the server."); + let hasData = rotaryColl.wbos["flying"] || + rotaryColl.wbos["scotsman"]; + let hasKeys = keysWBO.modified; + + _("We correctly handle 401s by aborting the sync and starting again."); + do_check_true(!hasData == !hasKeys); + + _("Be prepared for the second (automatic) sync..."); + } + + _("Make sure that syncing again causes recovery."); + onSyncFinished = function() { + _("== First sync done."); + _("---------------------------"); + onSyncFinished = function() { + _("== Second (automatic) sync done."); + hasData = rotaryColl.wbos["flying"] || + rotaryColl.wbos["scotsman"]; + hasKeys = keysWBO.modified; + do_check_true(!hasData == !hasKeys); + + // Kick off another sync. Can't just call it, because we're inside the + // lock... + Utils.nextTick(function() { + _("Now a fresh sync will get no HMAC errors."); + _("Partially resetting client, as if after a restart, and forcing redownload."); + CollectionKeys.clear(); + engine.lastSync = 0; + hmacErrorCount = 0; + + onSyncFinished = function() { + // Two rotary items, one client record... no errors. + do_check_eq(hmacErrorCount, 0) + + Svc.Obs.remove("weave:service:sync:finish", obs); + Svc.Obs.remove("weave:service:sync:error", obs); + + Svc.Prefs.resetBranch(""); + Records.clearCache(); + server.stop(run_next_test); + }; + + Service.sync(); + }, + this); + }; + }; + + onwards(); +}); + +function run_test() { + initTestLogging("Trace"); + run_next_test(); +} diff --git a/services/sync/tests/unit/test_resource.js b/services/sync/tests/unit/test_resource.js index d170e5e5835..a5d22980eb4 100644 --- a/services/sync/tests/unit/test_resource.js +++ b/services/sync/tests/unit/test_resource.js @@ -185,9 +185,6 @@ function run_test() { } do_check_true(didThrow); - let did401 = false; - Observers.add("weave:resource:status:401", function() did401 = true); - _("Test that the BasicAuthenticator doesn't screw up header case."); let res1 = new Resource("http://localhost:8080/foo"); res1.setHeader("Authorization", "Basic foobar"); @@ -208,7 +205,6 @@ function run_test() { _("GET a password protected resource (test that it'll fail w/o pass, no throw)"); let res2 = new Resource("http://localhost:8080/protected"); content = res2.get(); - do_check_true(did401); do_check_eq(content, "This path exists and is protected - failed"); do_check_eq(content.status, 401); do_check_false(content.success); @@ -393,38 +389,6 @@ function run_test() { do_check_eq(error.message, "NS_ERROR_CONNECTION_REFUSED"); do_check_eq(typeof error.stack, "string"); - let redirRequest; - let redirToOpen = function(subject) { - subject.newUri = "http://localhost:8080/open"; - redirRequest = subject; - }; - Observers.add("weave:resource:status:401", redirToOpen); - - _("Notification of 401 can redirect to another uri"); - did401 = false; - let res12 = new Resource("http://localhost:8080/protected"); - content = res12.get(); - do_check_eq(res12.spec, "http://localhost:8080/open"); - do_check_eq(content, "This path exists"); - do_check_eq(content.status, 200); - do_check_true(content.success); - do_check_eq(res.data, content); - do_check_true(did401); - do_check_eq(redirRequest.response, "This path exists and is protected - failed"); - do_check_eq(redirRequest.response.status, 401); - do_check_false(redirRequest.response.success); - - Observers.remove("weave:resource:status:401", redirToOpen); - - _("Removing the observer should result in the original 401"); - did401 = false; - let res13 = new Resource("http://localhost:8080/protected"); - content = res13.get(); - do_check_true(did401); - do_check_eq(content, "This path exists and is protected - failed"); - do_check_eq(content.status, 401); - do_check_false(content.success); - _("Checking handling of errors in onProgress."); let res18 = new Resource("http://localhost:8080/json"); let onProgress = function(rec) { diff --git a/services/sync/tests/unit/test_resource_async.js b/services/sync/tests/unit/test_resource_async.js index cd35fd3cfad..793a4d55a29 100644 --- a/services/sync/tests/unit/test_resource_async.js +++ b/services/sync/tests/unit/test_resource_async.js @@ -134,9 +134,6 @@ function server_headers(metadata, response) { response.bodyOutputStream.write(body, body.length); } -let did401 = false; -Observers.add("weave:resource:status:401", function() { did401 = true; }); - let quotaValue; Observers.add("weave:service:quota:remaining", function (subject) { quotaValue = subject; }); @@ -232,7 +229,6 @@ add_test(function test_get_protected_fail() { let res2 = new AsyncResource("http://localhost:8080/protected"); res2.get(function (error, content) { do_check_eq(error, null); - do_check_true(did401); do_check_eq(content, "This path exists and is protected - failed"); do_check_eq(content.status, 401); do_check_false(content.success); @@ -549,48 +545,6 @@ add_test(function test_preserve_exceptions() { }); }); -add_test(function test_401_redirect() { - let redirRequest; - let redirToOpen = function(subject) { - subject.newUri = "http://localhost:8080/open"; - redirRequest = subject; - }; - Observers.add("weave:resource:status:401", redirToOpen); - - _("Notification of 401 can redirect to another uri"); - did401 = false; - let res12 = new AsyncResource("http://localhost:8080/protected"); - res12.get(function (error, content) { - do_check_eq(error, null); - do_check_eq(res12.spec, "http://localhost:8080/open"); - do_check_eq(content, "This path exists"); - do_check_eq(content.status, 200); - do_check_true(content.success); - do_check_eq(res12.data, content); - do_check_true(did401); - do_check_eq(redirRequest.response, "This path exists and is protected - failed"); - do_check_eq(redirRequest.response.status, 401); - do_check_false(redirRequest.response.success); - - Observers.remove("weave:resource:status:401", redirToOpen); - run_next_test(); - }); -}); - -add_test(function test_401_no_redirect() { - _("Removing the observer should result in the original 401"); - did401 = false; - let res13 = new AsyncResource("http://localhost:8080/protected"); - res13.get(function (error, content) { - do_check_eq(error, null); - do_check_true(did401); - do_check_eq(content, "This path exists and is protected - failed"); - do_check_eq(content.status, 401); - do_check_false(content.success); - run_next_test(); - }); -}); - add_test(function test_xpc_exception_handling() { _("Exception handling inside fetches."); let res14 = new AsyncResource("http://localhost:8080/json"); diff --git a/services/sync/tests/unit/xpcshell.ini b/services/sync/tests/unit/xpcshell.ini index 006e355a66f..d76f956a65f 100644 --- a/services/sync/tests/unit/xpcshell.ini +++ b/services/sync/tests/unit/xpcshell.ini @@ -30,6 +30,7 @@ tail = [test_history_engine.js] [test_history_store.js] [test_history_tracker.js] +[test_hmac_error.js] [test_jpakeclient.js] [test_keys.js] [test_load_modules.js]