Bug 664865 - More considered handling of node reassignment mid-sync. r=philikon

This commit is contained in:
Richard Newman 2011-06-25 14:54:50 +01:00
Родитель d18eb30170
Коммит c3efa6201b
6 изменённых файлов: 288 добавлений и 132 удалений

Просмотреть файл

@ -389,25 +389,6 @@ AsyncResource.prototype = {
// actual fetch, so be warned!
XPCOMUtils.defineLazyGetter(ret, "obj", function() JSON.parse(ret));
// Notify if we get a 401 to maybe try again with a new URI.
// TODO: more retry logic.
if (status == 401) {
// Create an object to allow observers to decide if we should try again.
let subject = {
newUri: "",
resource: this,
response: ret
Observers.notify("weave:resource:status:401", subject);
// Do the same type of request but with the new URI.
if (subject.newUri != "") {
this.uri = subject.newUri;
this._doRequest(action, this._data, this._callback);
this._callback(null, ret);

Просмотреть файл

@ -635,21 +635,6 @@ WeaveSvc.prototype = {
_handleResource401: function _handleResource401(request) {
// Only handle 401s that are hitting the current cluster
let spec = request.resource.spec;
let cluster = this.clusterURL;
if (spec.indexOf(cluster) != 0)
// Nothing to do if the cluster isn't changing
if (!this._setCluster())
// Replace the old cluster with the new one to retry the request
request.newUri = this.clusterURL + spec.slice(cluster.length);
// gets cluster from central LDAP server and returns it, or null on error
_findCluster: function _findCluster() {
this._log.debug("Finding cluster for user " + this.username);
@ -687,7 +672,7 @@ WeaveSvc.prototype = {
_setCluster: function _setCluster() {
// Make sure we didn't get some unexpected response for the cluster
let cluster = this._findCluster();
this._log.debug("cluster value = " + cluster);
this._log.debug("Cluster value = " + cluster);
if (cluster == null)
return false;
@ -695,19 +680,20 @@ WeaveSvc.prototype = {
if (cluster == this.clusterURL)
return false;
this._log.debug("Setting cluster to " + cluster);
this.clusterURL = cluster;
Svc.Prefs.set("lastClusterUpdate", Date.now().toString());
return true;
// update cluster if required. returns false if the update was not required
// Update cluster if required.
// Returns false if the update was not required.
_updateCluster: function _updateCluster() {
this._log.info("Updating cluster.");
let cTime = Date.now();
let lastUp = parseFloat(Svc.Prefs.get("lastClusterUpdate"));
if (!lastUp || ((cTime - lastUp) >= CLUSTER_BACKOFF)) {
if (this._setCluster()) {
Svc.Prefs.set("lastClusterUpdate", cTime.toString());
return true;
return this._setCluster();
return false;
@ -1779,8 +1765,13 @@ WeaveSvc.prototype = {
throw "aborting sync, remote setup failed";
// Make sure we have an up-to-date list of clients before sending commands
this._log.trace("Refreshing client list");
this._log.debug("Refreshing client list.");
if (!this._syncEngine(Clients)) {
// Clients is an engine like any other; it can fail with a 401,
// and we can elect to abort the sync.
this._log.warn("Client engine sync failed. Aborting.");
// Wipe data in the desired direction if necessary
switch (Svc.Prefs.get("firstSync")) {
@ -1808,7 +1799,10 @@ WeaveSvc.prototype = {
throw "aborting sync, remote setup failed after processing commands";
finally {
// Always immediately push back the local client (now without commands)
// Always immediately attempt to push back the local client (now
// without commands).
// Note that we don't abort here; if there's a 401 because we've
// been reassigned, we'll handle it around another engine.
@ -1826,6 +1820,15 @@ WeaveSvc.prototype = {
// If _syncEngine fails for a 401, we might not have a cluster URL here.
// If that's the case, break out of this immediately, rather than
// throwing an exception when trying to fetch metaURL.
if (!this.clusterURL) {
this._log.debug("Aborting sync, no cluster URL: " +
"not uploading new meta/global.");
// Upload meta/global if any engines changed anything
let meta = Records.get(this.metaURL);
if (meta.isNew || meta.changed) {
@ -1834,9 +1837,9 @@ WeaveSvc.prototype = {
delete meta.changed;
if (this._syncError)
if (this._syncError) {
throw "Some engines did not sync correctly";
else {
} else {
Svc.Prefs.set("lastSync", new Date().toString());
Status.sync = SYNC_SUCCEEDED;
let syncTime = ((Date.now() - syncStartTime) / 1000).toFixed(2);
@ -1936,17 +1939,25 @@ WeaveSvc.prototype = {
this._ignorePrefObserver = false;
// returns true if sync should proceed
// false / no return value means sync should be aborted
// Returns true if sync should proceed.
// false / no return value means sync should be aborted.
_syncEngine: function WeaveSvc__syncEngine(engine) {
try {
return true;
catch(e) {
// maybe a 401, cluster update needed?
if (e.status == 401 && this._updateCluster())
return this._syncEngine(engine);
// Maybe a 401, cluster update perhaps needed?
if (e.status == 401) {
// Log out and clear the cluster URL pref. That will make us perform
// cluster detection and password check on next sync, which handles
// both causes of 401s; in either case, we won't proceed with this
// sync, so return false, but kick off a sync for next time.
Utils.nextTick(this.sync, this);
return false;

Просмотреть файл

@ -0,0 +1,245 @@
// Track HMAC error counts.
let hmacErrorCount = 0;
(function () {
let hHE = Service.handleHMACEvent;
Service.handleHMACEvent = function () {
return hHE.call(Service);
function shared_setup() {
hmacErrorCount = 0;
// Do not instantiate SyncTestingInfrastructure; we need real crypto.
Service.serverURL = "http://localhost:8080/";
Service.clusterURL = "http://localhost:8080/";
Service.username = "foo";
Service.password = "foo";
Service.passphrase = "aabcdeabcdeabcdeabcdeabcde";
// Make sure RotaryEngine is the only one we sync.
Engines._engines = {};
let engine = Engines.get("rotary");
engine.enabled = true;
engine.lastSync = 123; // Needs to be non-zero so that tracker is queried.
engine._store.items = {flying: "LNER Class A3 4472",
scotsman: "Flying Scotsman"};
engine._tracker.addChangedID('scotsman', 0);
do_check_eq(1, Engines.getEnabled().length);
let engines = {rotary: {version: engine.version,
syncID: engine.syncID},
clients: {version: Clients.version,
syncID: Clients.syncID}};
// Common server objects.
let global = new ServerWBO("global", {engines: engines});
let keysWBO = new ServerWBO("keys");
let rotaryColl = new ServerCollection({}, true);
let clientsColl = new ServerCollection({}, true);
return [engine, rotaryColl, clientsColl, keysWBO, global];
add_test(function hmac_error_during_404() {
_("Attempt to replicate the HMAC error setup.");
let [engine, rotaryColl, clientsColl, keysWBO, global] = shared_setup();
// Hand out 404s for crypto/keys.
let keysHandler = keysWBO.handler();
let key404Counter = 0;
let keys404Handler = function (request, response) {
if (key404Counter > 0) {
let body = "Not Found";
response.setStatusLine(request.httpVersion, 404, body);
response.bodyOutputStream.write(body, body.length);
keysHandler(request, response);
let collectionsHelper = track_collections_helper();
let upd = collectionsHelper.with_updated_collection;
let collections = collectionsHelper.collections;
let handlers = {
"/1.1/foo/info/collections": collectionsHelper.handler,
"/1.1/foo/storage/meta/global": upd("meta", global.handler()),
"/1.1/foo/storage/crypto/keys": upd("crypto", keys404Handler),
"/1.1/foo/storage/clients": upd("clients", clientsColl.handler()),
"/1.1/foo/storage/rotary": upd("rotary", rotaryColl.handler())
let server = sync_httpd_setup(handlers);
try {
_("Partially resetting client, as if after a restart, and forcing redownload.");
engine.lastSync = 0; // So that we redownload records.
key404Counter = 1;
// Two rotary items, one client record... no errors.
do_check_eq(hmacErrorCount, 0)
} finally {
add_test(function hmac_error_during_node_reassignment() {
_("Attempt to replicate an HMAC error during node reassignment.");
let [engine, rotaryColl, clientsColl, keysWBO, global] = shared_setup();
let collectionsHelper = track_collections_helper();
let upd = collectionsHelper.with_updated_collection;
// We'll provide a 401 mid-way through the sync. This function
// simulates shifting to a node which has no data.
function on401() {
_("Deleting server data...");
delete collectionsHelper.collections.rotary;
delete collectionsHelper.collections.crypto;
delete collectionsHelper.collections.clients;
_("Deleted server data.");
let should401 = false;
function upd401(coll, handler) {
return function (request, response) {
if (should401 && (request.method != "DELETE")) {
should401 = false;
let body = "\"reassigned!\"";
response.setStatusLine(request.httpVersion, 401, "Node reassignment.");
response.bodyOutputStream.write(body, body.length);
handler(request, response);
function sameNodeHandler(request, response) {
// Set this so that _setCluster will think we've really changed.
let url = Service.serverURL.replace("localhost", "LOCALHOST");
_("Client requesting reassignment; pointing them to " + url);
response.setStatusLine(request.httpVersion, 200, "OK");
response.bodyOutputStream.write(url, url.length);
let handlers = {
"/user/1.0/foo/node/weave": sameNodeHandler,
"/1.1/foo/info/collections": collectionsHelper.handler,
"/1.1/foo/storage/meta/global": upd("meta", global.handler()),
"/1.1/foo/storage/crypto/keys": upd("crypto", keysWBO.handler()),
"/1.1/foo/storage/clients": upd401("clients", clientsColl.handler()),
"/1.1/foo/storage/rotary": upd("rotary", rotaryColl.handler())
let server = sync_httpd_setup(handlers);
// First hit of clients will 401. This will happen after meta/global and
// keys -- i.e., in the middle of the sync, but before RotaryEngine.
should401 = true;
// Use observers to perform actions when our sync finishes.
// This allows us to observe the automatic next-tick sync that occurs after
// an abort.
function onSyncError() {
do_throw("Should not get a sync error!");
function onSyncFinished() {}
let obs = {
observe: function observe(subject, topic, data) {
switch (topic) {
case "weave:service:sync:error":
case "weave:service:sync:finish":
Svc.Obs.add("weave:service:sync:finish", obs);
Svc.Obs.add("weave:service:sync:error", obs);
// This kicks off the actual test. Split into a function here to allow this
// source file to broadly follow actual execution order.
function onwards() {
_("== Invoking first sync.");
_("We should not simultaneously have data but no keys on the server.");
let hasData = rotaryColl.wbos["flying"] ||
let hasKeys = keysWBO.modified;
_("We correctly handle 401s by aborting the sync and starting again.");
do_check_true(!hasData == !hasKeys);
_("Be prepared for the second (automatic) sync...");
_("Make sure that syncing again causes recovery.");
onSyncFinished = function() {
_("== First sync done.");
onSyncFinished = function() {
_("== Second (automatic) sync done.");
hasData = rotaryColl.wbos["flying"] ||
hasKeys = keysWBO.modified;
do_check_true(!hasData == !hasKeys);
// Kick off another sync. Can't just call it, because we're inside the
// lock...
Utils.nextTick(function() {
_("Now a fresh sync will get no HMAC errors.");
_("Partially resetting client, as if after a restart, and forcing redownload.");
engine.lastSync = 0;
hmacErrorCount = 0;
onSyncFinished = function() {
// Two rotary items, one client record... no errors.
do_check_eq(hmacErrorCount, 0)
Svc.Obs.remove("weave:service:sync:finish", obs);
Svc.Obs.remove("weave:service:sync:error", obs);
function run_test() {

Просмотреть файл

@ -185,9 +185,6 @@ function run_test() {
let did401 = false;
Observers.add("weave:resource:status:401", function() did401 = true);
_("Test that the BasicAuthenticator doesn't screw up header case.");
let res1 = new Resource("http://localhost:8080/foo");
res1.setHeader("Authorization", "Basic foobar");
@ -208,7 +205,6 @@ function run_test() {
_("GET a password protected resource (test that it'll fail w/o pass, no throw)");
let res2 = new Resource("http://localhost:8080/protected");
content = res2.get();
do_check_eq(content, "This path exists and is protected - failed");
do_check_eq(content.status, 401);
@ -393,38 +389,6 @@ function run_test() {
do_check_eq(error.message, "NS_ERROR_CONNECTION_REFUSED");
do_check_eq(typeof error.stack, "string");
let redirRequest;
let redirToOpen = function(subject) {
subject.newUri = "http://localhost:8080/open";
redirRequest = subject;
Observers.add("weave:resource:status:401", redirToOpen);
_("Notification of 401 can redirect to another uri");
did401 = false;
let res12 = new Resource("http://localhost:8080/protected");
content = res12.get();
do_check_eq(res12.spec, "http://localhost:8080/open");
do_check_eq(content, "This path exists");
do_check_eq(content.status, 200);
do_check_eq(res.data, content);
do_check_eq(redirRequest.response, "This path exists and is protected - failed");
do_check_eq(redirRequest.response.status, 401);
Observers.remove("weave:resource:status:401", redirToOpen);
_("Removing the observer should result in the original 401");
did401 = false;
let res13 = new Resource("http://localhost:8080/protected");
content = res13.get();
do_check_eq(content, "This path exists and is protected - failed");
do_check_eq(content.status, 401);
_("Checking handling of errors in onProgress.");
let res18 = new Resource("http://localhost:8080/json");
let onProgress = function(rec) {

Просмотреть файл

@ -134,9 +134,6 @@ function server_headers(metadata, response) {
response.bodyOutputStream.write(body, body.length);
let did401 = false;
Observers.add("weave:resource:status:401", function() { did401 = true; });
let quotaValue;
function (subject) { quotaValue = subject; });
@ -232,7 +229,6 @@ add_test(function test_get_protected_fail() {
let res2 = new AsyncResource("http://localhost:8080/protected");
res2.get(function (error, content) {
do_check_eq(error, null);
do_check_eq(content, "This path exists and is protected - failed");
do_check_eq(content.status, 401);
@ -549,48 +545,6 @@ add_test(function test_preserve_exceptions() {
add_test(function test_401_redirect() {
let redirRequest;
let redirToOpen = function(subject) {
subject.newUri = "http://localhost:8080/open";
redirRequest = subject;
Observers.add("weave:resource:status:401", redirToOpen);
_("Notification of 401 can redirect to another uri");
did401 = false;
let res12 = new AsyncResource("http://localhost:8080/protected");
res12.get(function (error, content) {
do_check_eq(error, null);
do_check_eq(res12.spec, "http://localhost:8080/open");
do_check_eq(content, "This path exists");
do_check_eq(content.status, 200);
do_check_eq(res12.data, content);
do_check_eq(redirRequest.response, "This path exists and is protected - failed");
do_check_eq(redirRequest.response.status, 401);
Observers.remove("weave:resource:status:401", redirToOpen);
add_test(function test_401_no_redirect() {
_("Removing the observer should result in the original 401");
did401 = false;
let res13 = new AsyncResource("http://localhost:8080/protected");
res13.get(function (error, content) {
do_check_eq(error, null);
do_check_eq(content, "This path exists and is protected - failed");
do_check_eq(content.status, 401);
add_test(function test_xpc_exception_handling() {
_("Exception handling inside fetches.");
let res14 = new AsyncResource("http://localhost:8080/json");

Просмотреть файл

@ -30,6 +30,7 @@ tail =