Bug 656513: part 1: provide a way for record handlers to abort incoming sync. r=philiKON

This commit is contained in:
Richard Newman 2011-06-15 00:03:32 -07:00
Родитель 472afc523c
Коммит 405156adc7
3 изменённых файлов: 106 добавлений и 2 удалений

Просмотреть файл

@ -211,6 +211,11 @@ Store.prototype = {
for each (let record in records) {
try {
this.applyIncoming(record);
} catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
// This kind of exception should have a 'cause' attribute, which is an
// originating exception.
// ex.cause will carry its stack with it when rethrown.
throw ex.cause;
} catch (ex) {
this._log.warn("Failed to apply incoming record " + record.id);
this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
@ -362,6 +367,10 @@ Engine.prototype = {
_storeObj: Store,
_trackerObj: Tracker,
// Local 'constant'.
// Signal to the engine that processing further records is pointless.
eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
get prefName() this.name,
get enabled() Svc.Prefs.get("engine." + this.prefName, false),
set enabled(val) Svc.Prefs.set("engine." + this.prefName, !!val),
@ -659,9 +668,22 @@ SyncEngine.prototype = {
// Reset previousFailed for each sync since previously failed items may not fail again.
this.previousFailed = [];
// Used (via exceptions) to allow the record handler/reconciliation/etc.
// methods to signal that they would like processing of incoming records to
// cease.
let aborting = undefined;
function doApplyBatch() {
this._tracker.ignoreAll = true;
try {
failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
} catch (ex) {
// Catch any error that escapes from applyIncomingBatch. At present
// those will all be abort events.
this._log.warn("Got exception " + Utils.exceptionStr(ex) +
", aborting processIncoming.");
aborting = ex;
}
this._tracker.ignoreAll = false;
applyBatch = [];
}
@ -684,6 +706,10 @@ SyncEngine.prototype = {
// called for every incoming record.
let self = this;
newitems.recordHandler = function(item) {
if (aborting) {
return;
}
// Grab a later last modified if possible
if (self.lastModified == null || item.modified > self.lastModified)
self.lastModified = item.modified;
@ -737,6 +763,10 @@ SyncEngine.prototype = {
let shouldApply;
try {
shouldApply = self._reconcile(item);
} catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
self._log.warn("Reconciliation failed: aborting incoming processing.");
failed.push(item.id);
aborting = ex.cause;
} catch (ex) {
self._log.warn("Failed to reconcile incoming record " + item.id);
self._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
@ -766,6 +796,10 @@ SyncEngine.prototype = {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
}
if (aborting) {
throw aborting;
}
}
// Mobile: check if we got the maximum that we requested; get the rest if so.
@ -804,7 +838,7 @@ SyncEngine.prototype = {
batchSize = isMobile ? this.mobileGUIDFetchBatchSize :
this.guidFetchBatchSize;
while (fetchBatch.length) {
while (fetchBatch.length && !aborting) {
// Reuse the original query, but get rid of the restricting params
// and batch remaining records.
newitems.limit = 0;
@ -828,6 +862,11 @@ SyncEngine.prototype = {
this._log.debug("Records that failed to apply: " + failed);
}
failed = [];
if (aborting) {
throw aborting;
}
if (this.lastSync < this.lastModified) {
this.lastSync = this.lastModified;
}

Просмотреть файл

@ -0,0 +1,64 @@
Cu.import("resource://services-sync/engines.js");
Cu.import("resource://services-sync/util.js");
add_test(function test_processIncoming_abort() {
_("An abort exception, raised in applyIncoming, will abort _processIncoming.");
let syncTesting = new SyncTestingInfrastructure();
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo");
generateNewKeys();
let engine = new RotaryEngine();
_("Create some server data.");
let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
meta_global.payload.engines = {rotary: {version: engine.version,
syncID: engine.syncID}};
let collection = new ServerCollection();
let id = Utils.makeGUID();
let payload = encryptPayload({id: id, denomination: "Record No. " + id});
collection.wbos[id] = new ServerWBO(id, payload);
let server = sync_httpd_setup({
"/1.1/foo/storage/rotary": collection.handler()
});
_("Fake applyIncoming to abort.");
engine._store.applyIncoming = function (record) {
let ex = {code: Engine.prototype.eEngineAbortApplyIncoming,
cause: "Nooo"};
_("Throwing: " + JSON.stringify(ex));
throw ex;
};
_("Trying _processIncoming. It will throw after aborting.");
let err;
try {
engine._syncStartup();
engine._processIncoming();
} catch (ex) {
err = ex;
}
do_check_eq(err, "Nooo");
err = undefined;
_("Trying engine.sync(). It will abort without error.");
try {
// This will quietly fail.
engine.sync();
} catch (ex) {
err = ex;
}
do_check_eq(err, undefined);
server.stop(run_next_test);
Svc.Prefs.resetBranch("");
Records.clearCache();
});
function run_test() {
run_next_test();
}

Просмотреть файл

@ -23,6 +23,7 @@ tail =
[test_collections_recovery.js]
[test_corrupt_keys.js]
[test_engine.js]
[test_engine_abort.js]
[test_enginemanager.js]
[test_forms_store.js]
[test_forms_tracker.js]