Bug 622762 - Add batch API for incoming records, deal with exceptions in SyncEngine's recordHandler. r=mconnor,rnewman

This commit is contained in:
Philipp von Weitershausen 2011-01-28 09:39:12 -08:00
Родитель b48638a57d
Коммит 034dbc550a
8 изменённых файлов: 456 добавлений и 39 удалений

Просмотреть файл

@ -88,6 +88,9 @@ MASTER_PASSWORD_LOCKED_RETRY_INTERVAL: 15 * 60 * 1000, // 15 minutes
// (GUIDs can be up to 64 chars long)
MOBILE_BATCH_SIZE: 50,
// Default batch size for applying incoming records.
DEFAULT_STORE_BATCH_SIZE: 1,
// score thresholds for early syncs
SINGLE_USER_THRESHOLD: 1000,
MULTI_DESKTOP_THRESHOLD: 500,
@ -151,6 +154,7 @@ RESPONSE_OVER_QUOTA: "14",
ENGINE_UPLOAD_FAIL: "error.engine.reason.record_upload_fail",
ENGINE_DOWNLOAD_FAIL: "error.engine.reason.record_download_fail",
ENGINE_UNKNOWN_FAIL: "error.engine.reason.unknown_fail",
ENGINE_APPLY_FAIL: "error.engine.reason.apply_fail",
ENGINE_METARECORD_DOWNLOAD_FAIL: "error.engine.reason.metarecord_download_fail",
ENGINE_METARECORD_UPLOAD_FAIL: "error.engine.reason.metarecord_upload_fail",

Просмотреть файл

@ -195,6 +195,21 @@ function Store(name) {
this._log.level = Log4Moz.Level[level];
}
Store.prototype = {
applyIncomingBatch: function applyIncomingBatch(records) {
let failed = [];
records.forEach(function (record) {
try {
this.applyIncoming(record);
} catch (ex) {
this._log.warn("Failed to apply incoming record " + record.id);
this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
failed.push(record.id);
}
}, this);
return failed;
},
applyIncoming: function Store_applyIncoming(record) {
if (record.deleted)
this.remove(record);
@ -321,7 +336,6 @@ EngineManagerSvc.prototype = {
function Engine(name) {
this.Name = name || "Unnamed";
this.name = name.toLowerCase();
this.downloadLimit = null;
this._notify = Utils.notify("weave:engine:");
this._log = Log4Moz.repository.getLogger("Engine." + this.Name);
@ -475,6 +489,8 @@ SyncEngine.prototype = {
__proto__: Engine.prototype,
_recordObj: CryptoWrapper,
version: 1,
downloadLimit: null,
applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE,
get storageURL() Svc.Prefs.get("clusterURL") + Svc.Prefs.get("storageAPI") +
"/" + ID.get("WeaveID").username + "/storage/",
@ -649,8 +665,33 @@ SyncEngine.prototype = {
newitems.full = true;
newitems.limit = batchSize;
let count = {applied: 0, reconciled: 0};
let count = {applied: 0, failed: 0, reconciled: 0};
let handled = [];
let applyBatch = [];
let failed = [];
let fetchBatch = this.toFetch;
// Flush the accumulated applyBatch to the store in one call.
// Tracker notifications are suppressed while writing so locally-applied
// incoming records don't get re-uploaded; IDs the store reports as
// failed are appended to the shared `failed` closure variable.
// Plain function: must be invoked as doApplyBatch.call(this).
function doApplyBatch() {
this._tracker.ignoreAll = true;
failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
this._tracker.ignoreAll = false;
applyBatch = [];
}
// Apply whatever is left in applyBatch, then persist the IDs of records
// that failed (to this.toFetch, so the next sync refetches them) and
// account for them in count.failed. Also invoked with .call(this).
function doApplyBatchAndPersistFailed() {
// Apply remaining batch.
if (applyBatch.length) {
doApplyBatch.call(this);
}
// Persist failed items so we refetch them.
if (failed.length) {
this.toFetch = Utils.arrayUnion(failed, this.toFetch);
count.failed += failed.length;
this._log.debug("Records that failed to apply: " + failed);
failed = [];
}
}
newitems.recordHandler = Utils.bind2(this, function(item) {
// Grab a later last modified if possible
if (this.lastModified == null || item.modified > this.lastModified)
@ -672,30 +713,41 @@ SyncEngine.prototype = {
// we've got new keys.
this._log.info("Trying decrypt again...");
item.decrypt();
}
if (this._reconcile(item)) {
count.applied++;
this._tracker.ignoreAll = true;
this._store.applyIncoming(item);
} else {
count.reconciled++;
this._log.trace("Skipping reconciled incoming item " + item.id);
}
} catch (ex if (Utils.isHMACMismatch(ex))) {
this._log.warn("Error processing record: " + Utils.exceptionStr(ex));
// Upload a new record to replace the bad one if we have it
if (this._store.itemExists(item.id))
this._modified[item.id] = 0;
}
} catch (ex) {
this._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
failed.push(item.id);
return;
}
let shouldApply;
try {
shouldApply = this._reconcile(item);
} catch (ex) {
this._log.warn("Failed to reconcile incoming record " + item.id);
this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
failed.push(item.id);
return;
}
if (shouldApply) {
count.applied++;
applyBatch.push(item);
} else {
count.reconciled++;
this._log.trace("Skipping reconciled incoming item " + item.id);
}
if (applyBatch.length == this.applyIncomingBatchSize) {
doApplyBatch.call(this);
}
this._tracker.ignoreAll = false;
Sync.sleep(0);
});
// Only bother getting data from the server if there's new things
if (this.lastModified == null || this.lastModified > this.lastSync) {
let resp = newitems.get();
doApplyBatchAndPersistFailed.call(this);
if (!resp.success) {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
@ -720,8 +772,10 @@ SyncEngine.prototype = {
// Figure out which guids weren't just fetched then remove any guids that
// were already waiting and prepend the new ones
let extra = Utils.arraySub(guids.obj, handled);
if (extra.length > 0)
this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra));
if (extra.length > 0) {
fetchBatch = Utils.arrayUnion(extra, fetchBatch);
this.toFetch = Utils.arrayUnion(extra, this.toFetch);
}
}
// Fast-foward the lastSync timestamp since we have stored the
@ -731,12 +785,12 @@ SyncEngine.prototype = {
}
// Mobile: process any backlog of GUIDs
while (this.toFetch.length) {
while (fetchBatch.length) {
// Reuse the original query, but get rid of the restricting params
// and batch remaining records.
newitems.limit = 0;
newitems.newer = 0;
newitems.ids = this.toFetch.slice(0, batchSize);
newitems.ids = fetchBatch.slice(0, batchSize);
// Reuse the existing record handler set earlier
let resp = newitems.get();
@ -745,15 +799,31 @@ SyncEngine.prototype = {
throw resp;
}
// This batch was successfully applied.
this.toFetch = this.toFetch.slice(batchSize);
// This batch was successfully applied. Not using
// doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
fetchBatch = fetchBatch.slice(batchSize);
let newToFetch = Utils.arraySub(this.toFetch, newitems.ids);
this.toFetch = Utils.arrayUnion(newToFetch, failed);
count.failed += failed.length;
this._log.debug("Records that failed to apply: " + failed);
failed = [];
if (this.lastSync < this.lastModified) {
this.lastSync = this.lastModified;
}
}
this._log.info(["Records:", count.applied, "applied,", count.reconciled,
"reconciled."].join(" "));
// Apply remaining items.
doApplyBatchAndPersistFailed.call(this);
if (count.failed) {
// Notify observers if records failed to apply. Pass the count object
// along so that they can make an informed decision on what to do.
Observers.notify("weave:engine:sync:apply-failed", count, this.name);
}
this._log.info(["Records:",
count.applied, "applied,",
count.failed, "failed to apply,",
count.reconciled, "reconciled."].join(" "));
},
/**

Просмотреть файл

@ -68,13 +68,13 @@ Utils.deferGetSet(HistoryRec, "cleartext", ["histUri", "title", "visits"]);
function HistoryEngine() {
SyncEngine.call(this, "History");
this.downloadLimit = MAX_HISTORY_DOWNLOAD;
}
HistoryEngine.prototype = {
__proto__: SyncEngine.prototype,
_recordObj: HistoryRec,
_storeObj: HistoryStore,
_trackerObj: HistoryTracker,
downloadLimit: MAX_HISTORY_DOWNLOAD,
_sync: Utils.batchSync("History", SyncEngine),

Просмотреть файл

@ -401,6 +401,7 @@ WeaveSvc.prototype = {
Svc.Obs.add("weave:service:sync:error", this);
Svc.Obs.add("weave:service:backoff:interval", this);
Svc.Obs.add("weave:engine:score:updated", this);
Svc.Obs.add("weave:engine:sync:apply-failed", this);
Svc.Obs.add("weave:resource:status:401", this);
Svc.Prefs.observe("engine.", this);
@ -559,6 +560,14 @@ WeaveSvc.prototype = {
case "weave:engine:score:updated":
this._handleScoreUpdate();
break;
case "weave:engine:sync:apply-failed":
// An engine isn't able to apply one or more incoming records.
// We don't fail hard on this, but it usually indicates a bug,
// so for now treat it as sync error (c.f. Service._syncEngine())
Status.engines = [data, ENGINE_APPLY_FAIL];
this._syncError = true;
this._log.debug(data + " failed to apply some records.");
break;
case "weave:resource:status:401":
this._handleResource401(subject);
break;

Просмотреть файл

@ -1451,6 +1451,13 @@ let Utils = {
return minuend.filter(function(i) subtrahend.indexOf(i) == -1);
},
/**
* Build the union of two arrays.
*/
arrayUnion: function arrayUnion(foo, bar) {
return foo.concat(Utils.arraySub(bar, foo));
},
bind2: function Async_bind2(object, method) {
return function innerBind() { return method.apply(object, arguments); };
},

Просмотреть файл

@ -74,7 +74,6 @@ function test_processIncoming_error_orderChildren() {
"/1.0/foo/storage/bookmarks": collection.handler()
});
try {
let folder1_id = Svc.Bookmark.createFolder(
@ -98,17 +97,19 @@ function test_processIncoming_error_orderChildren() {
collection.wbos[folder1_guid] = new ServerWBO(
folder1_guid, encryptPayload(folder1_payload));
// Also create a bogus server record (no parent) to provoke an exception.
// Create a bogus record that when synced down will provoke a
// network error which in turn provokes an exception in _processIncoming.
const BOGUS_GUID = "zzzzzzzzzzzz";
collection.wbos[BOGUS_GUID] = new ServerWBO(
BOGUS_GUID, encryptPayload({
id: BOGUS_GUID,
type: "folder",
title: "Bogus Folder",
parentid: null,
parentName: null,
children: []
}));
let bogus_record = collection.wbos[BOGUS_GUID]
= new ServerWBO(BOGUS_GUID, "I'm a bogus record!");
bogus_record.get = function get() {
throw "Sync this!";
};
// Make the 10 minutes old so it will only be synced in the toFetch phase.
bogus_record.modified = Date.now() / 1000 - 60 * 10;
engine.lastSync = Date.now() / 1000 - 60;
engine.toFetch = [BOGUS_GUID];
let error;
try {

Просмотреть файл

@ -135,6 +135,35 @@ function test_overQuota() {
}
}
// This test is slightly misplaced here: it doesn't exercise
// checkServerError itself, but the observer for
// "weave:engine:sync:apply-failed".
function test_engine_applyFailed() {
  let server = sync_httpd_setup();
  do_test_pending();
  setUp();

  // Register a throwaway engine whose sync just fires the notification.
  Engines.register(CatapultEngine);
  let engine = Engines.get("catapult");
  engine.enabled = true;
  engine.sync = function sync() {
    Svc.Obs.notify("weave:engine:sync:apply-failed", {}, "steam");
  };

  try {
    // No failure recorded for the engine before we sync.
    do_check_eq(Status.engines["steam"], undefined);

    Service.login();
    Service.sync();

    // The service's observer should have flagged the engine.
    do_check_eq(Status.engines["steam"], ENGINE_APPLY_FAIL);
  } finally {
    server.stop(do_test_finished);
    Engines.unregister("catapult");
    Status.resetSync();
    Service.startOver();
  }
}
function run_test() {
if (DISABLE_TESTS_BUG_604565)
return;
@ -142,4 +171,5 @@ function run_test() {
test_backoff500();
test_backoff503();
test_overQuota();
test_engine_applyFailed();
}

Просмотреть файл

@ -659,6 +659,298 @@ function test_processIncoming_resume_toFetch() {
}
// Verify that a partial (smaller-than-batch-size) final batch is still
// applied, and that IDs the store reports as failed end up in toFetch.
function test_processIncoming_applyIncomingBatchSize_smaller() {
_("Ensure that a number of incoming items less than applyIncomingBatchSize is still applied.");
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo");
// Engine that doesn't like the first and last record it's given.
const APPLY_BATCH_SIZE = 10;
let engine = makeSteamEngine();
engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
// Wrap the real applyIncomingBatch: drop the first and last record,
// apply the rest, and report the dropped two as failed.
engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
engine._store.applyIncomingBatch = function (records) {
let failed1 = records.shift();
let failed2 = records.pop();
this._applyIncomingBatch(records);
return [failed1.id, failed2.id];
};
// Let's create less than a batch worth of server side records.
let collection = new ServerCollection();
for (let i = 0; i < APPLY_BATCH_SIZE - 1; i++) {
let id = 'record-no-' + i;
let payload = encryptPayload({id: id, denomination: "Record No. " + id});
collection.wbos[id] = new ServerWBO(id, payload);
}
let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
meta_global.payload.engines = {steam: {version: engine.version,
syncID: engine.syncID}};
let server = sync_httpd_setup({
"/1.0/foo/storage/steam": collection.handler()
});
do_test_pending();
try {
// Confirm initial environment
// (legacy SpiderMonkey array comprehension: counts store items)
do_check_eq([id for (id in engine._store.items)].length, 0);
engine._syncStartup();
engine._processIncoming();
// Records have been applied.
// 9 downloaded minus the 2 the stubbed store rejected.
do_check_eq([id for (id in engine._store.items)].length,
APPLY_BATCH_SIZE - 1 - 2);
// The two "failed" records (first and last of the batch) are queued
// for refetching.
do_check_eq(engine.toFetch.length, 2);
do_check_eq(engine.toFetch[0], "record-no-0");
do_check_eq(engine.toFetch[1], "record-no-8");
} finally {
server.stop(do_test_finished);
Svc.Prefs.resetBranch("");
Records.clearCache();
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
}
}
// Verify that incoming records are handed to the store in batches of
// exactly applyIncomingBatchSize, and that all of them get applied.
function test_processIncoming_applyIncomingBatchSize_multiple() {
_("Ensure that incoming items are applied according to applyIncomingBatchSize.");
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo");
const APPLY_BATCH_SIZE = 10;
// Engine that applies records in batches.
let engine = makeSteamEngine();
engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
let batchCalls = 0;
// Wrap the real applyIncomingBatch to count invocations and assert
// each batch is exactly APPLY_BATCH_SIZE records.
engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
engine._store.applyIncomingBatch = function (records) {
batchCalls += 1;
do_check_eq(records.length, APPLY_BATCH_SIZE);
this._applyIncomingBatch.apply(this, arguments);
};
// Let's create three batches worth of server side records.
let collection = new ServerCollection();
for (let i = 0; i < APPLY_BATCH_SIZE * 3; i++) {
let id = 'record-no-' + i;
let payload = encryptPayload({id: id, denomination: "Record No. " + id});
collection.wbos[id] = new ServerWBO(id, payload);
}
let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
meta_global.payload.engines = {steam: {version: engine.version,
syncID: engine.syncID}};
let server = sync_httpd_setup({
"/1.0/foo/storage/steam": collection.handler()
});
do_test_pending();
try {
// Confirm initial environment
// (legacy SpiderMonkey array comprehension: counts store items)
do_check_eq([id for (id in engine._store.items)].length, 0);
engine._syncStartup();
engine._processIncoming();
// Records have been applied in 3 batches.
do_check_eq(batchCalls, 3);
do_check_eq([id for (id in engine._store.items)].length,
APPLY_BATCH_SIZE * 3);
} finally {
server.stop(do_test_finished);
Svc.Prefs.resetBranch("");
Records.clearCache();
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
}
}
// Verify that records which throw in _reconcile or in the store's
// applyIncoming are collected into toFetch for the next sync, and that
// the "weave:engine:sync:apply-failed" observer fires with the count.
function test_processIncoming_failed_records() {
_("Ensure that failed records from _reconcile and applyIncomingBatch are refetched.");
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo");
// Pretend to be a mobile client so we can test failed record handling
// while batching GETs.
Svc.Prefs.set("client.type", "mobile");
// Let's create three and a bit batches worth of server side records.
let collection = new ServerCollection();
const NUMBER_OF_RECORDS = MOBILE_BATCH_SIZE * 3 + 5;
for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
let id = 'record-no-' + i;
let payload = encryptPayload({id: id, denomination: "Record No. " + id});
let wbo = new ServerWBO(id, payload);
// Stagger timestamps so the last batch is newer than lastSync.
wbo.modified = Date.now()/1000 + 60 * (i - MOBILE_BATCH_SIZE * 3);
collection.wbos[id] = wbo;
}
// Engine that batches but likes to throw on a couple of records,
// two in each batch: the even ones fail in reconcile, the odd ones
// in applyIncoming.
const BOGUS_RECORDS = ["record-no-" + 42,
"record-no-" + 23,
"record-no-" + (42 + MOBILE_BATCH_SIZE),
"record-no-" + (23 + MOBILE_BATCH_SIZE),
"record-no-" + (42 + MOBILE_BATCH_SIZE * 2),
"record-no-" + (23 + MOBILE_BATCH_SIZE * 2),
"record-no-" + (2 + MOBILE_BATCH_SIZE * 3),
"record-no-" + (1 + MOBILE_BATCH_SIZE * 3)];
let engine = makeSteamEngine();
engine.applyIncomingBatchSize = MOBILE_BATCH_SIZE;
// Note: for non-bogus ids indexOf() returns -1 and -1 % 2 == -1, so
// neither wrapper below throws for them; only bogus ids at even/odd
// positions within BOGUS_RECORDS trigger the respective throw.
engine.__reconcile = engine._reconcile;
engine._reconcile = function _reconcile(record) {
if (BOGUS_RECORDS.indexOf(record.id) % 2 == 0) {
throw "I don't like this record! Baaaaaah!";
}
return this.__reconcile.apply(this, arguments);
};
engine._store._applyIncoming = engine._store.applyIncoming;
engine._store.applyIncoming = function (record) {
if (BOGUS_RECORDS.indexOf(record.id) % 2 == 1) {
throw "I don't like this record! Baaaaaah!";
}
return this._applyIncoming.apply(this, arguments);
};
let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
meta_global.payload.engines = {steam: {version: engine.version,
syncID: engine.syncID}};
let server = sync_httpd_setup({
"/1.0/foo/storage/steam": collection.handler()
});
do_test_pending();
try {
// Confirm initial environment
do_check_eq(engine.lastSync, 0);
do_check_eq(engine.toFetch.length, 0);
do_check_eq([id for (id in engine._store.items)].length, 0);
let observerSubject;
let observerData;
Svc.Obs.add("weave:engine:sync:apply-failed",
function onApplyFailed(subject, data) {
Svc.Obs.remove("weave:engine:sync:apply-failed", onApplyFailed);
observerSubject = subject;
observerData = data;
});
engine._syncStartup();
engine._processIncoming();
// Ensure that all records but the bogus ones (8 of them) were applied.
do_check_eq([id for (id in engine._store.items)].length,
NUMBER_OF_RECORDS - BOGUS_RECORDS.length);
// Ensure that the bogus records will be fetched again on the next sync.
do_check_eq(engine.toFetch.length, BOGUS_RECORDS.length);
engine.toFetch.sort();
BOGUS_RECORDS.sort();
for (let i = 0; i < engine.toFetch.length; i++) {
do_check_eq(engine.toFetch[i], BOGUS_RECORDS[i]);
}
// Ensure the observer was notified
do_check_eq(observerData, engine.name);
do_check_eq(observerSubject.failed, BOGUS_RECORDS.length);
} finally {
server.stop(do_test_finished);
Svc.Prefs.resetBranch("");
Records.clearCache();
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
}
}
// Verify that records which fail to decrypt (bad JSON or crypto error)
// are queued in toFetch for refetching, while good records still apply
// and the apply-failed observer reports accurate counts.
function test_processIncoming_decrypt_failed() {
_("Ensure that records failing to decrypt are either replaced or refetched.");
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo");
// Some good and some bogus records. Two don't contain valid JSON,
// two more will throw during decrypt.
let collection = new ServerCollection();
collection.wbos.flying = new ServerWBO(
'flying', encryptPayload({id: 'flying',
denomination: "LNER Class A3 4472"}));
collection.wbos.nojson = new ServerWBO("nojson", "This is invalid JSON");
collection.wbos.nojson2 = new ServerWBO("nojson2", "This is invalid JSON");
collection.wbos.scotsman = new ServerWBO(
'scotsman', encryptPayload({id: 'scotsman',
denomination: "Flying Scotsman"}));
collection.wbos.nodecrypt = new ServerWBO("nodecrypt", "Decrypt this!");
collection.wbos.nodecrypt2 = new ServerWBO("nodecrypt2", "Decrypt this!");
// Patch the fake crypto service to throw on the payloads above.
Svc.Crypto._decrypt = Svc.Crypto.decrypt;
Svc.Crypto.decrypt = function (ciphertext) {
if (ciphertext == "Decrypt this!") {
throw "Derp! Cipher finalized failed. Im ur crypto destroyin ur recordz.";
}
return this._decrypt.apply(this, arguments);
};
// Some broken records also exist locally.
let engine = makeSteamEngine();
engine.enabled = true;
engine._store.items = {nojson: "Valid JSON",
nodecrypt: "Valid ciphertext"};
let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
meta_global.payload.engines = {steam: {version: engine.version,
syncID: engine.syncID}};
let server = sync_httpd_setup({
"/1.0/foo/storage/steam": collection.handler()
});
do_test_pending();
try {
// Confirm initial state
do_check_eq(engine.toFetch.length, 0);
let observerSubject;
let observerData;
Svc.Obs.add("weave:engine:sync:apply-failed",
function onApplyFailed(subject, data) {
Svc.Obs.remove("weave:engine:sync:apply-failed", onApplyFailed);
observerSubject = subject;
observerData = data;
});
engine.lastSync = collection.wbos.nojson.modified - 1;
engine.sync();
// All four undecryptable records are queued for refetch.
do_check_eq(engine.toFetch.length, 4);
do_check_eq(engine.toFetch[0], "nojson");
do_check_eq(engine.toFetch[1], "nojson2");
do_check_eq(engine.toFetch[2], "nodecrypt");
do_check_eq(engine.toFetch[3], "nodecrypt2");
// Ensure the observer was notified: 2 good records applied, 4 failed.
do_check_eq(observerData, engine.name);
do_check_eq(observerSubject.applied, 2);
do_check_eq(observerSubject.failed, 4);
} finally {
server.stop(do_test_finished);
Svc.Prefs.resetBranch("");
Records.clearCache();
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
}
}
function test_uploadOutgoing_toEmptyServer() {
_("SyncEngine._uploadOutgoing uploads new records to server");
@ -1128,6 +1420,10 @@ function run_test() {
test_processIncoming_mobile_batchSize();
test_processIncoming_store_toFetch();
test_processIncoming_resume_toFetch();
test_processIncoming_applyIncomingBatchSize_smaller();
test_processIncoming_applyIncomingBatchSize_multiple();
test_processIncoming_failed_records();
test_processIncoming_decrypt_failed();
test_uploadOutgoing_toEmptyServer();
test_uploadOutgoing_failed();
test_uploadOutgoing_MAX_UPLOAD_RECORDS();