Bug 659107 - Only report failure for newly failed items. r=philikon,rnewman

This commit is contained in:
Marina Samuel 2011-05-27 17:32:13 -07:00
Родитель e8709ce0a1
Коммит 3c5655ca3e
3 изменённых файлов: 288 добавлений и 24 удалений

Просмотреть файл

@ -417,6 +417,7 @@ Engine.prototype = {
// Constructor for engines syncing against the storage server. Delegates
// common setup to Engine, then restores the two persisted GUID lists so a
// sync can resume fetching (toFetch) and retry records that failed to
// apply on the previous sync (previousFailed).
function SyncEngine(name) {
  Engine.call(this, name || "SyncEngine");
  this.loadToFetch();
  // Restore GUIDs that failed to apply last sync so they are retried.
  this.loadPreviousFailed();
}
// Enumeration to define approaches to handling bad records.
@ -483,6 +484,10 @@ SyncEngine.prototype = {
get toFetch() this._toFetch,
set toFetch(val) {
// Coerce the array to a string for more efficient comparison.
if (val + "" == this._toFetch) {
return;
}
this._toFetch = val;
Utils.delay(function () {
Utils.jsonSave("toFetch/" + this.name, this, val);
@ -499,6 +504,28 @@ SyncEngine.prototype = {
});
},
// previousFailed: array of GUIDs of records that failed to apply during
// the previous sync. Persisted to the "failed/<engine name>" JSON file so
// they can be refetched on the next sync.
get previousFailed() this._previousFailed,
set previousFailed(val) {
  // Coerce the array to a string for more efficient comparison.
  if (val + "" == this._previousFailed) {
    return;
  }
  this._previousFailed = val;
  // Persist asynchronously; the delay timer handle is kept on
  // this._previousFailedDelay (mirrors the toFetch setter above).
  Utils.delay(function () {
    Utils.jsonSave("failed/" + this.name, this, val);
  }, 0, this, "_previousFailedDelay");
},
// Populate this._previousFailed from the "failed/<engine name>" JSON file.
// Assigns the backing field directly (not the previousFailed setter) so
// loading does not trigger a redundant save back to disk.
loadPreviousFailed: function loadPreviousFailed() {
  // Initialize to empty if there's no file
  this._previousFailed = [];
  Utils.jsonLoad("failed/" + this.name, this, function(previousFailed) {
    if (previousFailed) {
      this._previousFailed = previousFailed;
    }
  });
},
/*
* lastSyncLocal is a timestamp in local time.
*/
@ -618,12 +645,19 @@ SyncEngine.prototype = {
newitems.newer = this.lastSync;
newitems.full = true;
newitems.limit = batchSize;
let count = {applied: 0, failed: 0, reconciled: 0};
// applied => number of items that should be applied.
// failed => number of items that failed in this sync.
// newFailed => number of items that failed for the first time in this sync.
// reconciled => number of items that were reconciled.
let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0};
let handled = [];
let applyBatch = [];
let failed = [];
let fetchBatch = this.toFetch;
let failedInPreviousSync = this.previousFailed;
let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync);
// Reset previousFailed for each sync since previously failed items may not fail again.
this.previousFailed = [];
function doApplyBatch() {
this._tracker.ignoreAll = true;
@ -639,7 +673,7 @@ SyncEngine.prototype = {
}
// Persist failed items so we refetch them.
if (failed.length) {
this.toFetch = Utils.arrayUnion(failed, this.toFetch);
this.previousFailed = Utils.arrayUnion(failed, this.previousFailed);
count.failed += failed.length;
this._log.debug("Records that failed to apply: " + failed);
failed = [];
@ -787,10 +821,12 @@ SyncEngine.prototype = {
// This batch was successfully applied. Not using
// doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
fetchBatch = fetchBatch.slice(batchSize);
let newToFetch = Utils.arraySub(this.toFetch, newitems.ids);
this.toFetch = Utils.arrayUnion(newToFetch, failed);
count.failed += failed.length;
this._log.debug("Records that failed to apply: " + failed);
this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
if (failed.length) {
count.failed += failed.length;
this._log.debug("Records that failed to apply: " + failed);
}
failed = [];
if (this.lastSync < this.lastModified) {
this.lastSync = this.lastModified;
@ -800,7 +836,8 @@ SyncEngine.prototype = {
// Apply remaining items.
doApplyBatchAndPersistFailed.call(this);
if (count.failed) {
count.newFailed = Utils.arraySub(this.previousFailed, failedInPreviousSync).length;
if (count.newFailed) {
// Notify observers if records failed to apply. Pass the count object
// along so that they can make an informed decision on what to do.
Observers.notify("weave:engine:sync:apply-failed", count, this.name);
@ -808,6 +845,7 @@ SyncEngine.prototype = {
this._log.info(["Records:",
count.applied, "applied,",
count.failed, "failed to apply,",
count.newFailed, "newly failed to apply,",
count.reconciled, "reconciled."].join(" "));
},
@ -1051,6 +1089,7 @@ SyncEngine.prototype = {
// Reset this client's sync state: clear the last-sync timestamp and drop
// both persisted record lists (records still to fetch and records that
// previously failed to apply). The empty-array assignments go through the
// setters above, so the on-disk JSON files are rewritten too.
_resetClient: function SyncEngine__resetClient() {
  this.resetLastSync();
  this.previousFailed = [];
  this.toFetch = [];
},

Просмотреть файл

@ -99,6 +99,36 @@ function test_toFetch() {
}
}
// Verify that previousFailed round-trips through its on-disk JSON file:
// assigning the property writes "weave/failed/steam.json" (asynchronously),
// and loadPreviousFailed() reads the file back into the property.
function test_previousFailed() {
  _("SyncEngine.previousFailed corresponds to file on disk");
  let syncTesting = new SyncTestingInfrastructure();
  const filename = "weave/failed/steam.json";
  let engine = makeSteamEngine();
  try {
    // Nothing should be recorded as failed on a fresh engine.
    do_check_eq(engine.previousFailed.length, 0);

    // Assigning the property should serialize the array to disk.
    let guids = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
    engine.previousFailed = guids;
    do_check_eq(engine.previousFailed, guids);
    // The save happens asynchronously; let the delayed write run.
    engine._store._sleep(0);
    let written = syncTesting.fakeFilesystem.fakeContents[filename];
    do_check_eq(written, JSON.stringify(guids));

    // Seed the fake filesystem and confirm loadPreviousFailed() reads it.
    guids = [Utils.makeGUID(), Utils.makeGUID()];
    syncTesting.fakeFilesystem.fakeContents[filename] = JSON.stringify(guids);
    engine.loadPreviousFailed();
    do_check_eq(engine.previousFailed.length, 2);
    for (let i = 0; i < guids.length; i++) {
      do_check_eq(engine.previousFailed[i], guids[i]);
    }
  } finally {
    syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
  }
}
function test_resetClient() {
_("SyncEngine.resetClient resets lastSync and toFetch");
let syncTesting = new SyncTestingInfrastructure();
@ -112,11 +142,13 @@ function test_resetClient() {
engine.lastSync = 123.45;
engine.lastSyncLocal = 67890;
engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
engine.previousFailed = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
engine.resetClient();
do_check_eq(engine.lastSync, 0);
do_check_eq(engine.lastSyncLocal, 0);
do_check_eq(engine.toFetch.length, 0);
do_check_eq(engine.previousFailed.length, 0);
} finally {
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
Svc.Prefs.resetBranch("");
@ -159,6 +191,7 @@ function run_test() {
test_syncID();
test_lastSync();
test_toFetch();
test_previousFailed();
test_resetClient();
test_wipeServer();
}

Просмотреть файл

@ -598,7 +598,7 @@ function test_processIncoming_store_toFetch() {
function test_processIncoming_resume_toFetch() {
_("toFetch items left over from previous syncs are fetched on the next sync, along with new items.");
_("toFetch and previousFailed items left over from previous syncs are fetched on the next sync, along with new items.");
let syncTesting = new SyncTestingInfrastructure();
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo");
@ -616,6 +616,13 @@ function test_processIncoming_resume_toFetch() {
collection.wbos.rekolok = new ServerWBO(
'rekolok', encryptPayload({id: 'rekolok',
denomination: "Rekonstruktionslokomotive"}));
for (var i = 0; i < 3; i++) {
let id = 'failed' + i;
let payload = encryptPayload({id: id, denomination: "Record No. " + i});
let wbo = new ServerWBO(id, payload);
wbo.modified = LASTSYNC - 10;
collection.wbos[id] = wbo;
}
collection.wbos.flying.modified = collection.wbos.scotsman.modified
= LASTSYNC - 10;
@ -625,6 +632,7 @@ function test_processIncoming_resume_toFetch() {
let engine = makeSteamEngine();
engine.lastSync = LASTSYNC;
engine.toFetch = ["flying", "scotsman"];
engine.previousFailed = ["failed0", "failed1", "failed2"];
let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
meta_global.payload.engines = {steam: {version: engine.version,
@ -648,7 +656,10 @@ function test_processIncoming_resume_toFetch() {
do_check_eq(engine._store.items.flying, "LNER Class A3 4472");
do_check_eq(engine._store.items.scotsman, "Flying Scotsman");
do_check_eq(engine._store.items.rekolok, "Rekonstruktionslokomotive");
do_check_eq(engine._store.items.failed0, "Record No. 0");
do_check_eq(engine._store.items.failed1, "Record No. 1");
do_check_eq(engine._store.items.failed2, "Record No. 2");
do_check_eq(engine.previousFailed.length, 0);
} finally {
server.stop(do_test_finished);
Svc.Prefs.resetBranch("");
@ -699,12 +710,13 @@ function test_processIncoming_applyIncomingBatchSize_smaller() {
engine._syncStartup();
engine._processIncoming();
// Records have been applied.
// Records have been applied and the expected failures have failed.
do_check_eq([id for (id in engine._store.items)].length,
APPLY_BATCH_SIZE - 1 - 2);
do_check_eq(engine.toFetch.length, 2);
do_check_eq(engine.toFetch[0], "record-no-0");
do_check_eq(engine.toFetch[1], "record-no-8");
do_check_eq(engine.toFetch.length, 0);
do_check_eq(engine.previousFailed.length, 2);
do_check_eq(engine.previousFailed[0], "record-no-0");
do_check_eq(engine.previousFailed[1], "record-no-8");
} finally {
server.stop(do_test_finished);
@ -770,6 +782,182 @@ function test_processIncoming_applyIncomingBatchSize_multiple() {
}
// Verifies bug 659107: the "weave:engine:sync:apply-failed" notification
// fires only when a sync produces *newly* failed records (count.newFailed
// > 0), not when the same records merely fail again on a later sync.
function test_processIncoming_failed_items_reported_once() {
  _("Ensure that failed records are reported only once.");
  let syncTesting = new SyncTestingInfrastructure();
  Svc.Prefs.set("clusterURL", "http://localhost:8080/");
  Svc.Prefs.set("username", "foo");

  const APPLY_BATCH_SIZE = 5;
  const NUMBER_OF_RECORDS = 15;

  // Engine that fails the first record.
  let engine = makeSteamEngine();
  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
  // Wrap applyIncomingBatch: apply everything except the first record of
  // each batch, and report that first record's id as failed.
  engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
  engine._store.applyIncomingBatch = function (records) {
    engine._store._applyIncomingBatch(records.slice(1));
    return [records[0].id];
  };

  // Create a batch of server side records.
  let collection = new ServerCollection();
  for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
    let id = 'record-no-' + i;
    let payload = encryptPayload({id: id, denomination: "Record No. " + id});
    collection.wbos[id] = new ServerWBO(id, payload);
  }

  let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
  meta_global.payload.engines = {steam: {version: engine.version,
                                         syncID: engine.syncID}};
  let server = sync_httpd_setup({
    "/1.1/foo/storage/steam": collection.handler()
  });
  do_test_pending();

  try {
    let called = 0;  // how many apply-failed notifications were observed
    let counts;      // the count object from the most recent notification

    // Confirm initial environment.
    do_check_eq(engine.lastSync, 0);
    do_check_eq(engine.toFetch.length, 0);
    do_check_eq(engine.previousFailed.length, 0);
    do_check_eq([id for (id in engine._store.items)].length, 0);

    Svc.Obs.add("weave:engine:sync:apply-failed", function(count) {
      _("Called with " + JSON.stringify(counts));
      counts = count;
      called++;
    });

    // Do sync.
    engine._syncStartup();
    engine._processIncoming();

    // Confirm failures: 15 records in batches of 5 gives 3 batches, and
    // the first record of each batch fails (records 0, 5 and 10).
    do_check_eq([id for (id in engine._store.items)].length, 12);
    do_check_eq(engine.previousFailed.length, 3);
    do_check_eq(engine.previousFailed[0], "record-no-0");
    do_check_eq(engine.previousFailed[1], "record-no-5");
    do_check_eq(engine.previousFailed[2], "record-no-10");

    // There are newly failed records and they are reported.
    do_check_eq(called, 1);
    do_check_eq(counts.failed, 3);
    do_check_eq(counts.applied, 15);
    do_check_eq(counts.newFailed, 3);

    // Sync again, 1 of the failed items are the same, the rest didn't fail.
    // Only the 3 previousFailed records are refetched; they form a single
    // batch whose first record (record-no-0) fails again.
    engine._processIncoming();

    // Confirming removed failures.
    do_check_eq([id for (id in engine._store.items)].length, 14);
    do_check_eq(engine.previousFailed.length, 1);
    do_check_eq(engine.previousFailed[0], "record-no-0");

    // Failures weren't notified again because there were no newly failed items.
    do_check_eq(called, 1);
    do_check_eq(counts.failed, 3);
    do_check_eq(counts.applied, 15);
    do_check_eq(counts.newFailed, 3);
  } finally {
    server.stop(do_test_finished);
    Svc.Prefs.resetBranch("");
    Records.clearCache();
  }
}
// Exercises retry of previousFailed records across two syncs on a mobile
// profile: stale previousFailed entries are reset at the start of a sync,
// previously failed GUIDs are refetched, records that fail again remain in
// previousFailed, and records that now apply are removed from it.
function test_processIncoming_previousFailed() {
  _("Ensure that failed records are retried.");
  let syncTesting = new SyncTestingInfrastructure();
  Svc.Prefs.set("clusterURL", "http://localhost:8080/");
  Svc.Prefs.set("username", "foo");
  Svc.Prefs.set("client.type", "mobile");

  const APPLY_BATCH_SIZE = 4;
  const NUMBER_OF_RECORDS = 14;

  // Engine that fails the first 2 records.
  let engine = makeSteamEngine();
  engine.mobileGUIDFetchBatchSize = engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
  // Wrap applyIncomingBatch: apply everything except the first two records
  // of each batch, and report those two ids as failed.
  engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
  engine._store.applyIncomingBatch = function (records) {
    engine._store._applyIncomingBatch(records.slice(2));
    return [records[0].id, records[1].id];
  };

  // Create a batch of server side records.
  let collection = new ServerCollection();
  for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
    let id = 'record-no-' + i;
    let payload = encryptPayload({id: id, denomination: "Record No. " + i});
    collection.wbos[id] = new ServerWBO(id, payload);
  }

  let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
  meta_global.payload.engines = {steam: {version: engine.version,
                                         syncID: engine.syncID}};
  let server = sync_httpd_setup({
    "/1.1/foo/storage/steam": collection.handler()
  });
  do_test_pending();

  try {
    // Confirm initial environment.
    do_check_eq(engine.lastSync, 0);
    do_check_eq(engine.toFetch.length, 0);
    do_check_eq(engine.previousFailed.length, 0);
    do_check_eq([id for (id in engine._store.items)].length, 0);

    // Initial failed items in previousFailed to be reset.
    // These bogus GUIDs should be discarded by the sync, not retried.
    let previousFailed = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
    engine.previousFailed = previousFailed;
    do_check_eq(engine.previousFailed, previousFailed);

    // Do sync.
    engine._syncStartup();
    engine._processIncoming();

    // Expected result: 4 sync batches with 2 failures each => 8 failures
    do_check_eq([id for (id in engine._store.items)].length, 6);
    do_check_eq(engine.previousFailed.length, 8);
    do_check_eq(engine.previousFailed[0], "record-no-0");
    do_check_eq(engine.previousFailed[1], "record-no-1");
    do_check_eq(engine.previousFailed[2], "record-no-4");
    do_check_eq(engine.previousFailed[3], "record-no-5");
    do_check_eq(engine.previousFailed[4], "record-no-8");
    do_check_eq(engine.previousFailed[5], "record-no-9");
    do_check_eq(engine.previousFailed[6], "record-no-12");
    do_check_eq(engine.previousFailed[7], "record-no-13");

    // Sync again with the same failed items (records 0, 1, 8, 9).
    engine._processIncoming();

    // A second sync with the same failed items should not add the same items again.
    // Items that did not fail a second time should no longer be in previousFailed.
    do_check_eq([id for (id in engine._store.items)].length, 10);
    do_check_eq(engine.previousFailed.length, 4);
    do_check_eq(engine.previousFailed[0], "record-no-0");
    do_check_eq(engine.previousFailed[1], "record-no-1");
    do_check_eq(engine.previousFailed[2], "record-no-8");
    do_check_eq(engine.previousFailed[3], "record-no-9");

    // Refetched items that didn't fail the second time are in engine._store.items.
    do_check_eq(engine._store.items['record-no-4'], "Record No. 4");
    do_check_eq(engine._store.items['record-no-5'], "Record No. 5");
    do_check_eq(engine._store.items['record-no-12'], "Record No. 12");
    do_check_eq(engine._store.items['record-no-13'], "Record No. 13");
  } finally {
    server.stop(do_test_finished);
    Svc.Prefs.resetBranch("");
    Records.clearCache();
  }
}
function test_processIncoming_failed_records() {
_("Ensure that failed records from _reconcile and applyIncomingBatch are refetched.");
let syncTesting = new SyncTestingInfrastructure();
@ -841,6 +1029,7 @@ function test_processIncoming_failed_records() {
// Confirm initial environment
do_check_eq(engine.lastSync, 0);
do_check_eq(engine.toFetch.length, 0);
do_check_eq(engine.previousFailed.length, 0);
do_check_eq([id for (id in engine._store.items)].length, 0);
let observerSubject;
@ -860,11 +1049,11 @@ function test_processIncoming_failed_records() {
NUMBER_OF_RECORDS - BOGUS_RECORDS.length);
// Ensure that the bogus records will be fetched again on the next sync.
do_check_eq(engine.toFetch.length, BOGUS_RECORDS.length);
engine.toFetch.sort();
do_check_eq(engine.previousFailed.length, BOGUS_RECORDS.length);
engine.previousFailed.sort();
BOGUS_RECORDS.sort();
for (let i = 0; i < engine.toFetch.length; i++) {
do_check_eq(engine.toFetch[i], BOGUS_RECORDS[i]);
for (let i = 0; i < engine.previousFailed.length; i++) {
do_check_eq(engine.previousFailed[i], BOGUS_RECORDS[i]);
}
// Ensure the observer was notified
@ -952,6 +1141,7 @@ function test_processIncoming_decrypt_failed() {
// Confirm initial state
do_check_eq(engine.toFetch.length, 0);
do_check_eq(engine.previousFailed.length, 0);
let observerSubject;
let observerData;
@ -965,11 +1155,11 @@ function test_processIncoming_decrypt_failed() {
engine.lastSync = collection.wbos.nojson.modified - 1;
engine.sync();
do_check_eq(engine.toFetch.length, 4);
do_check_eq(engine.toFetch[0], "nojson");
do_check_eq(engine.toFetch[1], "nojson2");
do_check_eq(engine.toFetch[2], "nodecrypt");
do_check_eq(engine.toFetch[3], "nodecrypt2");
do_check_eq(engine.previousFailed.length, 4);
do_check_eq(engine.previousFailed[0], "nojson");
do_check_eq(engine.previousFailed[1], "nojson2");
do_check_eq(engine.previousFailed[2], "nodecrypt");
do_check_eq(engine.previousFailed[3], "nodecrypt2");
// Ensure the observer was notified
do_check_eq(observerData, engine.name);
@ -1457,6 +1647,8 @@ function run_test() {
test_processIncoming_resume_toFetch();
test_processIncoming_applyIncomingBatchSize_smaller();
test_processIncoming_applyIncomingBatchSize_multiple();
test_processIncoming_failed_items_reported_once();
test_processIncoming_previousFailed();
test_processIncoming_failed_records();
test_processIncoming_decrypt_failed();
test_uploadOutgoing_toEmptyServer();