Bug 615284 - Download chunking needs to be more resilient against app shutdowns. r=mconnor

This commit is contained in:
Philipp von Weitershausen 2011-01-26 21:34:31 -08:00
Родитель 38886bdf84
Коммит 8fcbca0f65
3 изменённых файлов: 205 добавлений и 16 удалений

Просмотреть файл

@ -303,7 +303,6 @@ EngineManagerSvc.prototype = {
name = name.name || "";
let out = "Could not initialize engine '" + name + "': " + mesg;
dump(out);
this._log.error(out);
return engineObject;
@ -468,6 +467,7 @@ Engine.prototype = {
function SyncEngine(name) {
Engine.call(this, name || "SyncEngine");
this.loadToFetch();
}
SyncEngine.prototype = {
__proto__: Engine.prototype,
@ -511,6 +511,22 @@ SyncEngine.prototype = {
this.lastSyncLocal = 0;
},
get toFetch() this._toFetch,
set toFetch(val) {
this._toFetch = val;
Utils.delay(function () {
Utils.jsonSave("toFetch/" + this.name, this, val);
}, 0, this, "_toFetchDelay");
},
loadToFetch: function loadToFetch() {
// Initialize to empty if there's no file
this._toFetch = [];
Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) {
this._toFetch = toFetch;
});
},
/*
* lastSyncLocal is a timestamp in local time.
*/
@ -683,14 +699,13 @@ SyncEngine.prototype = {
}
// Mobile: check if we got the maximum that we requested; get the rest if so.
let toFetch = [];
if (handled.length == newitems.limit) {
let guidColl = new Collection(this.engineURL);
// Sort and limit so that on mobile we only get the last X records.
guidColl.limit = this.downloadLimit;
guidColl.newer = this.lastSync;
// index: Orders by the sortindex descending (highest weight first).
guidColl.sort = "index";
@ -702,18 +717,22 @@ SyncEngine.prototype = {
// were already waiting and prepend the new ones
let extra = Utils.arraySub(guids.obj, handled);
if (extra.length > 0)
toFetch = extra.concat(Utils.arraySub(toFetch, extra));
this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra));
}
// Fast-foward the lastSync timestamp since we have stored the
// remaining items in toFetch.
if (this.lastSync < this.lastModified) {
this.lastSync = this.lastModified;
}
// Mobile: process any backlog of GUIDs
while (toFetch.length) {
while (this.toFetch.length) {
// Reuse the original query, but get rid of the restricting params
// and batch remaining records.
newitems.limit = 0;
newitems.newer = 0;
// Get the first bunch of records and save the rest for later
newitems.ids = toFetch.slice(0, batchSize);
toFetch = toFetch.slice(batchSize);
newitems.ids = this.toFetch.slice(0, batchSize);
// Reuse the existing record handler set earlier
let resp = newitems.get();
@ -721,10 +740,13 @@ SyncEngine.prototype = {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
}
}
if (this.lastSync < this.lastModified)
this.lastSync = this.lastModified;
// This batch was successfully applied.
this.toFetch = this.toFetch.slice(batchSize);
if (this.lastSync < this.lastModified) {
this.lastSync = this.lastModified;
}
}
this._log.info(["Records:", count.applied, "applied,", count.reconciled,
"reconciled."].join(" "));
@ -969,6 +991,7 @@ SyncEngine.prototype = {
_resetClient: function SyncEngine__resetClient() {
this.resetLastSync();
this.toFetch = [];
},
wipeServer: function wipeServer() {

Просмотреть файл

@ -1,5 +1,6 @@
Cu.import("resource://services-sync/engines.js");
Cu.import("resource://services-sync/util.js");
Cu.import("resource://services-sync/ext/Sync.js");
function makeSteamEngine() {
return new SyncEngine('Steam');
@ -69,20 +70,52 @@ function test_lastSync() {
}
}
function test_toFetch() {
_("SyncEngine.toFetch corresponds to file on disk");
const filename = "weave/toFetch/steam.json";
let engine = makeSteamEngine();
try {
// Ensure pristine environment
do_check_eq(engine.toFetch.length, 0);
// Write file to disk
let toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
engine.toFetch = toFetch;
do_check_eq(engine.toFetch, toFetch);
// toFetch is written asynchronously
Sync.sleep(0);
let fakefile = syncTesting.fakeFilesystem.fakeContents[filename];
do_check_eq(fakefile, JSON.stringify(toFetch));
// Read file from disk
toFetch = [Utils.makeGUID(), Utils.makeGUID()];
syncTesting.fakeFilesystem.fakeContents[filename] = JSON.stringify(toFetch);
engine.loadToFetch();
do_check_eq(engine.toFetch.length, 2);
do_check_eq(engine.toFetch[0], toFetch[0]);
do_check_eq(engine.toFetch[1], toFetch[1]);
} finally {
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
}
}
function test_resetClient() {
_("SyncEngine.resetClient resets lastSync");
_("SyncEngine.resetClient resets lastSync and toFetch");
let engine = makeSteamEngine();
try {
// Ensure pristine environment
do_check_eq(Svc.Prefs.get("steam.lastSync"), undefined);
do_check_eq(Svc.Prefs.get("steam.lastSyncLocal"), undefined);
do_check_eq(engine.toFetch.length, 0);
engine.lastSync = 123.45;
engine.lastSyncLocal = 67890;
engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
engine.resetClient();
do_check_eq(engine.lastSync, 0);
do_check_eq(engine.lastSyncLocal, 0);
do_check_eq(engine.toFetch.length, 0);
} finally {
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
Svc.Prefs.resetBranch("");
@ -104,11 +137,13 @@ function test_wipeServer() {
try {
// Some data to reset.
engine.lastSync = 123.45;
engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
_("Wipe server data and reset client.");
engine.wipeServer();
do_check_eq(steamCollection.payload, undefined);
do_check_eq(engine.lastSync, 0);
do_check_eq(engine.toFetch.length, 0);
} finally {
server.stop(do_test_finished);
@ -121,6 +156,7 @@ function run_test() {
test_url_attributes();
test_syncID();
test_lastSync();
test_toFetch();
test_resetClient();
test_wipeServer();
}

Просмотреть файл

@ -462,7 +462,6 @@ function test_processIncoming_mobile_batchSize() {
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo");
Svc.Prefs.set("client.type", "mobile");
let crypto_steam = new ServerWBO('steam');
// A collection that logs each GET
let collection = new ServerCollection();
@ -529,6 +528,137 @@ function test_processIncoming_mobile_batchSize() {
}
}
function test_processIncoming_store_toFetch() {
_("If processIncoming fails in the middle of a batch on mobile, state is saved in toFetch and lastSync.");
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo");
Svc.Prefs.set("client.type", "mobile");
// A collection that throws at the fourth get.
let collection = new ServerCollection();
collection._get_calls = 0;
collection._get = collection.get;
collection.get = function() {
this._get_calls += 1;
if (this._get_calls > 3) {
throw "Abort on fourth call!";
}
return this._get.apply(this, arguments);
};
// Let's create three batches worth of server side records.
for (var i = 0; i < MOBILE_BATCH_SIZE * 3; i++) {
let id = 'record-no-' + i;
let payload = encryptPayload({id: id, denomination: "Record No. " + id});
let wbo = new ServerWBO(id, payload);
wbo.modified = Date.now()/1000 + 60 * (i - MOBILE_BATCH_SIZE * 3);
collection.wbos[id] = wbo;
}
let engine = makeSteamEngine();
engine.enabled = true;
let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
meta_global.payload.engines = {steam: {version: engine.version,
syncID: engine.syncID}};
let server = sync_httpd_setup({
"/1.0/foo/storage/steam": collection.handler()
});
do_test_pending();
try {
// Confirm initial environment
do_check_eq(engine.lastSync, 0);
do_check_eq([id for (id in engine._store.items)].length, 0);
let error;
try {
engine.sync();
} catch (ex) {
error = ex;
}
do_check_true(!!error);
// Only the first two batches have been applied.
do_check_eq([id for (id in engine._store.items)].length,
MOBILE_BATCH_SIZE * 2);
// The third batch is stuck in toFetch. lastSync has been moved forward to
// the last successful item's timestamp.
do_check_eq(engine.toFetch.length, MOBILE_BATCH_SIZE);
do_check_eq(engine.lastSync, collection.wbos["record-no-99"].modified);
} finally {
server.stop(do_test_finished);
Svc.Prefs.resetBranch("");
Records.clearCache();
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
}
}
function test_processIncoming_resume_toFetch() {
_("toFetch items left over from previous syncs are fetched on the next sync, along with new items.");
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo");
const LASTSYNC = Date.now() / 1000;
// Server records that will be downloaded
let collection = new ServerCollection();
collection.wbos.flying = new ServerWBO(
'flying', encryptPayload({id: 'flying',
denomination: "LNER Class A3 4472"}));
collection.wbos.scotsman = new ServerWBO(
'scotsman', encryptPayload({id: 'scotsman',
denomination: "Flying Scotsman"}));
collection.wbos.rekolok = new ServerWBO(
'rekolok', encryptPayload({id: 'rekolok',
denomination: "Rekonstruktionslokomotive"}));
collection.wbos.flying.modified = collection.wbos.scotsman.modified
= LASTSYNC - 10;
collection.wbos.rekolok.modified = LASTSYNC + 10;
// Time travel 10 seconds into the future but still download the above WBOs.
let engine = makeSteamEngine();
engine.lastSync = LASTSYNC;
engine.toFetch = ["flying", "scotsman"];
let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
meta_global.payload.engines = {steam: {version: engine.version,
syncID: engine.syncID}};
let server = sync_httpd_setup({
"/1.0/foo/storage/steam": collection.handler()
});
do_test_pending();
try {
// Confirm initial environment
do_check_eq(engine._store.items.flying, undefined);
do_check_eq(engine._store.items.scotsman, undefined);
do_check_eq(engine._store.items.rekolok, undefined);
engine._syncStartup();
engine._processIncoming();
// Local records have been created from the server data.
do_check_eq(engine._store.items.flying, "LNER Class A3 4472");
do_check_eq(engine._store.items.scotsman, "Flying Scotsman");
do_check_eq(engine._store.items.rekolok, "Rekonstruktionslokomotive");
} finally {
server.stop(do_test_finished);
Svc.Prefs.resetBranch("");
Records.clearCache();
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
}
}
function test_uploadOutgoing_toEmptyServer() {
_("SyncEngine._uploadOutgoing uploads new records to server");
@ -850,10 +980,8 @@ function test_sync_partialUpload() {
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
Svc.Prefs.set("username", "foo");
let crypto_steam = new ServerWBO('steam');
let collection = new ServerCollection();
let server = sync_httpd_setup({
"/1.0/foo/storage/crypto/steam": crypto_steam.handler(),
"/1.0/foo/storage/steam": collection.handler()
});
do_test_pending();
@ -998,6 +1126,8 @@ function run_test() {
test_processIncoming_createFromServer();
test_processIncoming_reconcile();
test_processIncoming_mobile_batchSize();
test_processIncoming_store_toFetch();
test_processIncoming_resume_toFetch();
test_uploadOutgoing_toEmptyServer();
test_uploadOutgoing_failed();
test_uploadOutgoing_MAX_UPLOAD_RECORDS();