Bug 591126 - Handle upload interruption gracefully [r=mconnor]

Philipp von Weitershausen 2010-11-23 21:21:31 -08:00
Parent be8d50e72a
Commit fe95dc94ad
3 changed files with 82 additions and 74 deletions

View file

@@ -436,13 +436,15 @@ SyncEngine.prototype = {
       CryptoMetas.set(meta.uri, meta);
     }
-    this._maybeLastSyncLocal = Date.now();
+    // Save objects that need to be uploaded in this._modified. We also save
+    // the timestamp of this fetch in this.lastSyncLocal. As we successfully
+    // upload objects we remove them from this._modified. If an error occurs
+    // or any objects fail to upload, they will remain in this._modified. At
+    // the end of a sync, or after an error, we add all objects remaining in
+    // this._modified to the tracker.
+    this.lastSyncLocal = Date.now();
     if (this.lastSync) {
       this._modified = this.getChangedIDs();
-      // Clear the tracker now but remember the changed IDs in case we
-      // need to roll back.
-      this._backupChangedIDs = this._tracker.changedIDs;
-      this._tracker.clearChangedIDs();
     } else {
       // Mark all items to be uploaded, but treat them as changed from long ago
       this._log.debug("First sync, uploading all items");
@@ -450,9 +452,15 @@ SyncEngine.prototype = {
       for (let id in this._store.getAllIDs())
         this._modified[id] = 0;
     }
-    let outnum = [i for (i in this._modified)].length;
-    this._log.info(outnum + " outgoing items pre-reconciliation");
+    // Clear the tracker now. If the sync fails we'll add the ones we failed
+    // to upload back.
+    this._tracker.clearChangedIDs();
+    // Array of just the IDs from this._modified. This is what we iterate over
+    // so we can modify this._modified during the iteration.
+    this._modifiedIDs = [id for (id in this._modified)];
+    this._log.info(this._modifiedIDs.length +
+                   " outgoing items pre-reconciliation");
     // Keep track of what to delete at the end of sync
     this._delete = {};
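
Taken together, the two hunks above replace the old backup-and-rollback scheme with a snapshot: _syncStartup copies the changed IDs into this._modified, clears the tracker right away, and keeps this._modifiedIDs as a stable list to iterate while this._modified shrinks. A minimal standalone sketch of that lifecycle, with the engine and tracker reduced to plain objects (all names below are invented for illustration, not the real engine API):

// Sketch only: models the snapshot/cleanup bookkeeping, not the real engine.
// trackerChangedIDs stands in for this._tracker.changedIDs (id -> timestamp).
function BookkeepingSketch(trackerChangedIDs) {
  this.trackerChangedIDs = trackerChangedIDs;
  this._modified = null;
}
BookkeepingSketch.prototype = {
  syncStartup: function() {
    // Snapshot outgoing items and clear the tracker immediately.
    this._modified = this.trackerChangedIDs;
    this.trackerChangedIDs = {};
  },
  uploaded: function(id) {
    // Every record the server accepts leaves the snapshot.
    delete this._modified[id];
  },
  syncCleanup: function() {
    // Whatever is left (rejected or never attempted) is re-tracked.
    for (let id in this._modified)
      this.trackerChangedIDs[id] = this._modified[id];
    this._modified = null;
  }
};

// A sync that dies after the first record leaves "scotsman" tracked again:
let sketch = new BookkeepingSketch({flying: 12345, scotsman: 23456});
sketch.syncStartup();
sketch.uploaded("flying");
sketch.syncCleanup();   // sketch.trackerChangedIDs is now {scotsman: 23456}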
@ -622,7 +630,7 @@ SyncEngine.prototype = {
if (item.id in this._modified) {
// If the incoming and local changes are the same, skip
if (this._isEqual(item)) {
this._tracker.removeChangedID(item.id);
delete this._modified[item.id];
return false;
}
@@ -654,10 +662,9 @@ SyncEngine.prototype = {
   // Upload outgoing records
   _uploadOutgoing: function SyncEngine__uploadOutgoing() {
-    let failed = {};
-    let outnum = [i for (i in this._modified)].length;
-    if (outnum) {
-      this._log.trace("Preparing " + outnum + " outgoing records");
+    if (this._modifiedIDs.length) {
+      this._log.trace("Preparing " + this._modifiedIDs.length +
+                      " outgoing records");
       // collection we'll upload
       let up = new Collection(this.engineURL);
@@ -665,7 +672,8 @@ SyncEngine.prototype = {
       // Upload what we've got so far in the collection
       let doUpload = Utils.bind2(this, function(desc) {
-        this._log.info("Uploading " + desc + " of " + outnum + " records");
+        this._log.info("Uploading " + desc + " of " +
+                       this._modifiedIDs.length + " records");
         let resp = up.post();
         if (!resp.success) {
           this._log.debug("Uploading records failed: " + resp);
@@ -678,22 +686,21 @@ SyncEngine.prototype = {
         if (modified > this.lastSync)
           this.lastSync = modified;
-        // Remember changed IDs and timestamp of failed items so we
-        // can mark them changed again.
-        let failed_ids = [];
-        for (let id in resp.obj.failed) {
-          failed[id] = this._modified[id];
-          failed_ids.push(id);
-        }
+        let failed_ids = [id for (id in resp.obj.failed)];
         if (failed_ids.length)
           this._log.debug("Records that will be uploaded again because "
                           + "the server couldn't store them: "
                           + failed_ids.join(", "));
+        // Clear successfully uploaded objects.
+        for each (let id in resp.obj.success) {
+          delete this._modified[id];
+        }
         up.clearRecords();
       });
-      for (let id in this._modified) {
+      for each (let id in this._modifiedIDs) {
         try {
           let out = this._createRecord(id);
           if (this._log.level <= Log4Moz.Level.Trace)
@@ -717,16 +724,6 @@ SyncEngine.prototype = {
       if (count % MAX_UPLOAD_RECORDS > 0)
         doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
     }
-    // Update local timestamp.
-    this.lastSyncLocal = this._maybeLastSyncLocal;
-    delete this._modified;
-    delete this._backupChangedIDs;
-    // Mark failed WBOs as changed again so they are reuploaded next time.
-    for (let id in failed) {
-      this._tracker.addChangedID(id, failed[id]);
-    }
   },
   // Any cleanup necessary.
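
The doUpload handler above relies on the shape of the storage server's batch POST response: success lists the ids that were stored, failed maps rejected ids to reasons, and whatever then remains in this._modified is re-tracked by _syncCleanup. A self-contained illustration of that shape and the resulting bookkeeping (ids, timestamps, and reason strings are made up for the example):

// Illustration only: an invented batch POST response body and the
// success/failed bookkeeping it drives.
let modified = {"record-no-1": 100, "record-no-23": 123, "record-no-42": 142};
let resp = {obj: {
  modified: 1290500000.25,                      // server timestamp for the batch
  success: ["record-no-1"],                     // ids the server stored
  failed: {"record-no-23": ["invalid payload"], // rejected ids -> reasons (made up)
           "record-no-42": ["retry later"]}
}};

let failed_ids = [];
for (let id in resp.obj.failed)           // rejected ids stay in the snapshot
  failed_ids.push(id);
for (let i = 0; i < resp.obj.success.length; i++)
  delete modified[resp.obj.success[i]];   // accepted ids are dropped

// modified is now {"record-no-23": 123, "record-no-42": 142}: exactly the ids
// that _syncCleanup() will hand back to the tracker.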
@@ -758,13 +755,16 @@ SyncEngine.prototype = {
     }
   },
-  _rollback: function _rollback() {
-    if (!this._backupChangedIDs)
+  _syncCleanup: function _syncCleanup() {
+    if (!this._modified)
       return;
-    for (let [id, when] in Iterator(this._backupChangedIDs)) {
+    // Mark failed WBOs as changed again so they are reuploaded next time.
+    for (let [id, when] in Iterator(this._modified)) {
       this._tracker.addChangedID(id, when);
     }
+    delete this._modified;
+    delete this._modifiedIDs;
   },
 
   _sync: function SyncEngine__sync() {
@@ -775,11 +775,8 @@ SyncEngine.prototype = {
       Observers.notify("weave:engine:sync:status", "upload-outgoing");
       this._uploadOutgoing();
       this._syncFinish();
-    }
-    catch (e) {
-      this._rollback();
-      this._log.warn("Sync failed");
-      throw e;
+    } finally {
+      this._syncCleanup();
     }
   },
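
The _sync change above is the crux of the fix: the old catch block only rolled back when the sync threw, whereas the finally block runs _syncCleanup on every exit path, so partially uploaded changes are re-tracked whether the sync finishes, is rejected by the server, or dies mid-upload. A trimmed-down illustration of the pattern (the function names here are placeholders, not the real engine API):

// Sketch: cleanup in a finally block runs on success and on failure alike,
// before any exception propagates to the caller.
function syncOnce(uploadOutgoing, syncCleanup) {
  try {
    uploadOutgoing();   // may throw part-way through a batch
  } finally {
    syncCleanup();      // leftover items are re-tracked either way
  }
}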

View file

@@ -1572,7 +1572,7 @@ WeaveSvc.prototype = {
       Status.engines = [engine.name, e.failureCode || ENGINE_UNKNOWN_FAIL];
       this._syncError = true;
-      this._log.debug(Utils.exceptionStr(e));
+      this._log.debug(engine.name + " failed: " + Utils.exceptionStr(e));
       return true;
     }
     finally {

View file

@@ -191,7 +191,6 @@ function test_syncStartup_emptyOrOutdatedGlobalsResetsSync() {
     // Sync was reset and server data was wiped
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.lastSyncLocal, 0);
     do_check_eq(collection.wbos.flying.payload, undefined);
     do_check_eq(collection.wbos.scotsman.payload, undefined);
@@ -252,7 +251,6 @@ function test_syncStartup_metaGet404() {
     _("Sync was reset and server data was wiped");
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.lastSyncLocal, 0);
     do_check_eq(collection.wbos.flying.payload, undefined);
     do_check_eq(collection.wbos.scotsman.payload, undefined);
@@ -372,7 +370,6 @@ function test_syncStartup_syncIDMismatchResetsClient() {
     // Sync was reset
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.lastSyncLocal, 0);
   } finally {
     server.stop(do_test_finished);
@@ -438,7 +435,6 @@ function test_syncStartup_badKeyWipesServerData() {
     // Sync was reset and server data was wiped
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.lastSyncLocal, 0);
     do_check_eq(collection.wbos.flying.payload, undefined);
     do_check_eq(collection.wbos.scotsman.payload, undefined);
@@ -877,8 +873,8 @@ function test_uploadOutgoing_failed() {
     do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
     do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
-    engine._syncStartup();
-    engine._uploadOutgoing();
+    engine.enabled = true;
+    engine.sync();
     // Local timestamp has been set.
     do_check_true(engine.lastSyncLocal > 0);
@@ -1095,15 +1091,18 @@ function test_syncFinish_deleteLotsInBatches() {
   }
 }
-function test_sync_rollback() {
-  _("SyncEngine.sync() rolls back tracker's changedIDs when syncing fails.");
+function test_sync_partialUpload() {
+  _("SyncEngine.sync() keeps changedIDs that couldn't be uploaded.");
   Svc.Prefs.set("clusterURL", "http://localhost:8080/");
   Svc.Prefs.set("username", "foo");
-  // Set up a server without a "steam" collection handler, so sync will fail.
   let crypto_steam = new ServerWBO('steam');
+  let collection = new ServerCollection();
   let server = sync_httpd_setup({
-    "/1.0/foo/storage/crypto/steam": crypto_steam.handler()
+    "/1.0/foo/storage/crypto/steam": crypto_steam.handler(),
+    "/1.0/foo/storage/steam": collection.handler()
   });
   do_test_pending();
   createAndUploadKeypair();
@@ -1112,31 +1111,34 @@
   let engine = makeSteamEngine();
   engine.lastSync = 123; // needs to be non-zero so that tracker is queried
   engine.lastSyncLocal = 456;
-  engine._store.items = {flying: "LNER Class A3 4472",
-                         scotsman: "Flying Scotsman",
-                         peppercorn: "Peppercorn Class"};
-  // Mark these records as changed
-  const FLYING_CHANGED = 12345;
-  const SCOTSMAN_CHANGED = 23456;
-  const PEPPERCORN_CHANGED = 34567;
-  engine._tracker.addChangedID('flying', FLYING_CHANGED);
-  engine._tracker.addChangedID('scotsman', SCOTSMAN_CHANGED);
-  engine._tracker.addChangedID('peppercorn', PEPPERCORN_CHANGED);
+  // Let the third upload fail completely
+  var noOfUploads = 0;
+  collection.post = (function(orig) {
+    return function() {
+      if (noOfUploads == 2)
+        throw "FAIL!";
+      noOfUploads++;
+      return orig.apply(this, arguments);
+    };
+  }(collection.post));
+  // Create a bunch of records (and server side handlers)
+  for (let i = 0; i < 234; i++) {
+    let id = 'record-no-' + i;
+    engine._store.items[id] = "Record No. " + i;
+    engine._tracker.addChangedID(id, i);
+    // Let two items in the first upload batch fail.
+    if ((i != 23) && (i != 42))
+      collection.wbos[id] = new ServerWBO(id);
+  }
   let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
   meta_global.payload.engines = {steam: {version: engine.version,
                                          syncID: engine.syncID}};
   try {
     // Confirm initial environment
     do_check_eq(engine.lastSyncLocal, 456);
-    do_check_eq(engine._tracker.changedIDs['flying'], FLYING_CHANGED);
-    do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
-    do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
     engine.enabled = true;
     let error;
     try {
@@ -1146,11 +1148,20 @@
     }
     do_check_true(!!error);
-    // Verify that the tracker state and local timestamp has been rolled back.
-    do_check_eq(engine.lastSyncLocal, 456);
-    do_check_eq(engine._tracker.changedIDs['flying'], FLYING_CHANGED);
-    do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
-    do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
+    // The timestamp has been updated.
+    do_check_true(engine.lastSyncLocal > 456);
+    for (let i = 0; i < 234; i++) {
+      let id = 'record-no-' + i;
+      // Ensure failed records are back in the tracker:
+      // * records no. 23 and 42 were rejected by the server,
+      // * records no. 200 and higher couldn't be uploaded because we failed
+      //   hard on the 3rd upload.
+      if ((i == 23) || (i == 42) || (i >= 200))
+        do_check_eq(engine._tracker.changedIDs[id], i);
+      else
+        do_check_false(id in engine._tracker.changedIDs);
+    }
   } finally {
     server.stop(do_test_finished);
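
The new test drives the partial-upload path by monkey-patching the test server: collection.post is wrapped so the third batch throws, and records 23 and 42 get no ServerWBO so the first batch reports them as failed. The wrapper, extracted as a reusable sketch (failOnThirdCall is an invented helper name, not part of the test harness):

// Sketch of the wrap-and-count pattern the test uses to kill the 3rd upload.
function failOnThirdCall(orig) {
  let calls = 0;
  return function() {
    if (calls == 2)
      throw "FAIL!";             // simulate the connection dying mid-sync
    calls++;
    return orig.apply(this, arguments);
  };
}
// Usage in the test, paraphrased: collection.post = failOnThirdCall(collection.post);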
@@ -1243,7 +1254,7 @@ function run_test() {
   test_syncFinish_noDelete();
   test_syncFinish_deleteByIds();
   test_syncFinish_deleteLotsInBatches();
-  test_sync_rollback();
+  test_sync_partialUpload();
   test_canDecrypt_noCryptoMeta();
   test_canDecrypt_true();
 }