diff --git a/services/sync/modules/engines.js b/services/sync/modules/engines.js index b4f4062f1e7c..f89112b32889 100644 --- a/services/sync/modules/engines.js +++ b/services/sync/modules/engines.js @@ -35,7 +35,7 @@ * * ***** END LICENSE BLOCK ***** */ -const EXPORTED_SYMBOLS = ['Engines', 'Engine', 'SyncEngine', 'FileEngine']; +const EXPORTED_SYMBOLS = ['Engines', 'Engine', 'SyncEngine', 'BlobEngine']; const Cc = Components.classes; const Ci = Components.interfaces; @@ -322,48 +322,30 @@ SyncEngine.prototype = { return; } - if (this._remote.status.data.GUID != this._snapshot.GUID) { - this._log.debug("Remote/local sync GUIDs do not match. " + - "Forcing initial sync."); - this._log.trace("Remote: " + this._remote.status.data.GUID); - this._log.trace("Local: " + this._snapshot.GUID); - yield this._store.resetGUIDs(self.cb); - this._snapshot.data = {}; - this._snapshot.version = -1; - this._snapshot.GUID = this._remote.status.data.GUID; - } - - this._log.info("Local snapshot version: " + this._snapshot.version); - this._log.info("Server maxVersion: " + this._remote.status.data.maxVersion); - - if ("none" != Utils.prefs.getCharPref("encryption")) - yield this._remote.keys.getKeyAndIV(self.cb, this.engineId); - // 1) Fetch server deltas this._os.notifyObservers(null, "weave:service:sync:status", "status.downloading-deltas"); - let server = {}; - let serverSnap = yield this._remote.wrap(self.cb, this._snapshot); - server.snapshot = serverSnap.data; - this._core.detectUpdates(self.cb, this._snapshot.data, server.snapshot); - server.updates = yield; + let serverSnap = yield this._remote.wrap(self.cb); + let serverUpdates = yield this._core.detectUpdates(self.cb, + this._snapshot.data, serverSnap); // 2) Generate local deltas from snapshot -> current client status this._os.notifyObservers(null, "weave:service:sync:status", "status.calculating-differences"); - let localJson = new SnapshotStore(); - localJson.data = this._store.wrap(); - this._core.detectUpdates(self.cb, this._snapshot.data, localJson.data); + let localSnap = new SnapshotStore(); + localSnap.data = this._store.wrap(); + this._core.detectUpdates(self.cb, this._snapshot.data, localSnap.data); let localUpdates = yield; - this._log.trace("local json:\n" + localJson.serialize()); + this._log.trace("local json:\n" + localSnap.serialize()); this._log.trace("Local updates: " + this._serializeCommands(localUpdates)); - this._log.trace("Server updates: " + this._serializeCommands(server.updates)); + this._log.trace("Server updates: " + this._serializeCommands(serverUpdates)); - if (server.updates.length == 0 && localUpdates.length == 0) { - this._snapshot.version = this._remote.status.data.maxVersion; + if (serverUpdates.length == 0 && localUpdates.length == 0) { this._os.notifyObservers(null, "weave:service:sync:status", "status.no-changes-required"); this._log.info("Sync complete: no changes needed on client or server"); + this._snapshot.version = this._remote.status.data.maxVersion; + this._snapshot.save(); self.done(true); return; } @@ -372,8 +354,7 @@ SyncEngine.prototype = { this._os.notifyObservers(null, "weave:service:sync:status", "status.reconciling-updates"); this._log.info("Reconciling client/server updates"); - this._core.reconcile(self.cb, localUpdates, server.updates); - let ret = yield; + let ret = yield this._core.reconcile(self.cb, localUpdates, serverUpdates); let clientChanges = ret.propagations[0]; let serverChanges = ret.propagations[1]; @@ -393,20 +374,15 @@ SyncEngine.prototype = { clientConflicts.length || serverConflicts.length)) { this._os.notifyObservers(null, "weave:service:sync:status", "status.no-changes-required"); this._log.info("Sync complete: no changes needed on client or server"); - this._snapshot.data = localJson.data; + this._snapshot.data = localSnap.data; this._snapshot.version = this._remote.status.data.maxVersion; this._snapshot.save(); self.done(true); return; } - if (clientConflicts.length || serverConflicts.length) { + if (clientConflicts.length || serverConflicts.length) this._log.warn("Conflicts found! Discarding server changes"); - } - - let savedSnap = Utils.deepCopy(this._snapshot.data); - let savedVersion = this._snapshot.version; - let newSnapshot; // 3.1) Apply server changes to local store @@ -414,29 +390,27 @@ SyncEngine.prototype = { this._log.info("Applying changes locally"); this._os.notifyObservers(null, "weave:service:sync:status", "status.applying-changes"); - // Note that we need to need to apply client changes to the - // current tree, not the saved snapshot + // apply to real store + yield this._store.applyCommands.async(this._store, self.cb, clientChanges); - localJson.applyCommands.async(localJson, self.cb, clientChanges); - yield; - this._snapshot.data = localJson.data; - this._snapshot.version = this._remote.status.data.maxVersion; - this._store.applyCommands.async(this._store, self.cb, clientChanges); - yield; - newSnapshot = this._store.wrap(); + // get the current state + let newSnap = new SnapshotStore(); + newSnap.data = this._store.wrap(); - this._core.detectUpdates(self.cb, this._snapshot.data, newSnapshot); - let diff = yield; + // apply to the snapshot we got in step 1, and compare with current state + yield localSnap.applyCommands.async(localSnap, self.cb, clientChanges); + let diff = yield this._core.detectUpdates(self.cb, + localSnap.data, newSnap.data); if (diff.length != 0) { this._log.warn("Commands did not apply correctly"); this._log.trace("Diff from snapshot+commands -> " + "new snapshot after commands:\n" + this._serializeCommands(diff)); - // FIXME: do we really want to revert the snapshot here? - this._snapshot.data = Utils.deepCopy(savedSnap); - this._snapshot.version = savedVersion; } - this._snapshot.save(); + + // update the local snap to the current state, we'll use it below + localSnap.data = newSnap.data; + localSnap.version = this._remote.status.data.maxVersion; } // 3.2) Append server delta to the delta file and upload @@ -445,10 +419,10 @@ SyncEngine.prototype = { // current client snapshot. In the case where there are no // conflicts, it should be the same as what the resolver returned - this._os.notifyObservers(null, "weave:service:sync:status", "status.calculating-differences"); - newSnapshot = this._store.wrap(); - this._core.detectUpdates(self.cb, server.snapshot, newSnapshot); - let serverDelta = yield; + this._os.notifyObservers(null, "weave:service:sync:status", + "status.calculating-differences"); + let serverDelta = yield this._core.detectUpdates(self.cb, + serverSnap, localSnap.data); // Log an error if not the same if (!(serverConflicts.length || @@ -462,46 +436,38 @@ SyncEngine.prototype = { if (serverDelta.length) { this._log.info("Uploading changes to server"); - this._snapshot.data = newSnapshot; - this._snapshot.version = ++this._remote.status.data.maxVersion; + this._os.notifyObservers(null, "weave:service:sync:status", + "status.uploading-deltas"); - // XXX don't append delta if we do a full upload? - if (this._remote.status.data.formatVersion != ENGINE_STORAGE_FORMAT_VERSION) - yield this._remote.initialize(self.cb, this._snapshot); - - let c = 0; - for (GUID in this._snapshot.data) - c++; - - this._os.notifyObservers(null, "weave:service:sync:status", "status.uploading-deltas"); - - this._remote.appendDelta(self.cb, this._snapshot, serverDelta, - {maxVersion: this._snapshot.version, - deltasEncryption: Crypto.defaultAlgorithm, - itemCount: c}); - yield; + yield this._remote.appendDelta(self.cb, localSnap, serverDelta, + {maxVersion: this._snapshot.version, + deltasEncryption: Crypto.defaultAlgorithm}); + localSnap.version = this._remote.status.data.maxVersion; this._log.info("Successfully updated deltas and status on server"); - this._snapshot.save(); } + this._snapshot.data = localSnap.data; + this._snapshot.version = localSnap.version; + this._snapshot.save(); + this._log.info("Sync complete"); self.done(true); } }; -function FileEngine() { +function BlobEngine() { // subclasses should call _init // we don't call it here because it requires serverPrefix to be set } -FileEngine.prototype = { +BlobEngine.prototype = { __proto__: new Engine(), get _profileID() { return ClientData.GUID; }, - _init: function FileEngine__init() { + _init: function BlobEngine__init() { // FIXME meep? this.__proto__.__proto__.__proto__.__proto__._init.call(this); this._keys = new Keychain(this.serverPrefix); @@ -510,8 +476,9 @@ FileEngine.prototype = { this._file.pushFilter(new CryptoFilter(this.engineId)); }, - _initialUpload: function FileEngine__initialUpload() { + _initialUpload: function BlobEngine__initialUpload() { let self = yield; + this._log.info("Initial upload to server"); yield this._keys.initialize(self.cb, this.engineId); this._file.data = {}; yield this._merge.async(this, self.cb); @@ -520,7 +487,7 @@ FileEngine.prototype = { // NOTE: Assumes this._file has latest server data // this method is separate from _sync so it's easy to override in subclasses - _merge: function FileEngine__merge() { + _merge: function BlobEngine__merge() { let self = yield; this._file.data[this._profileID] = this._store.wrap(); }, @@ -531,7 +498,7 @@ FileEngine.prototype = { // 3) Upload new merged data // NOTE: a version file will be needed in the future to optimize the case // where there are no changes - _sync: function FileEngine__sync() { + _sync: function BlobEngine__sync() { let self = yield; this._log.info("Beginning sync"); @@ -548,12 +515,45 @@ FileEngine.prototype = { yield this._file.put(self.cb); } catch (e if e.status == 404) { - this._log.info("Initial upload to server"); yield this._initialUpload.async(this, self.cb); } this._log.info("Sync complete"); - this._os.notifyObservers(null, "weave:service:sync:engine:end", this.name); + this._os.notifyObservers(null, "weave:service:sync:engine:success", this.name); self.done(true); } }; + +function HeuristicEngine() { +} +HeuristicEngine.prototype = { + __proto__: new Engine(), + + get _snapshot() { + let snap = new SnapshotStore(this.name); + this.__defineGetter__("_snapshot", function() snap); + return snap; + }, + + _resetClient: function SyncEngine__resetClient() { + let self = yield; + this._log.debug("Resetting client state"); + this._snapshot.wipe(); + this._store.wipe(); + this._log.debug("Client reset completed successfully"); + }, + + _initialUpload: function HeuristicEngine__initialUpload() { + let self = yield; + this._log.info("Initial upload to server"); + this._snapshot.data = this._store.wrap(); + this._snapshot.version = 0; + this._snapshot.GUID = null; // in case there are other snapshots out there + yield this._remote.initialize(self.cb, this._snapshot); + this._snapshot.save(); + }, + + _sync: function HeuristicEngine__sync() { + let self = yield; + } +}; diff --git a/services/sync/modules/engines/tabs.js b/services/sync/modules/engines/tabs.js index 0f6a2682de08..25c22a17c23c 100644 --- a/services/sync/modules/engines/tabs.js +++ b/services/sync/modules/engines/tabs.js @@ -55,7 +55,7 @@ function TabEngine(pbeId) { } TabEngine.prototype = { - __proto__: new FileEngine(), + __proto__: new BlobEngine(), get name() "tabs", get displayName() { return "Tabs"; }, diff --git a/services/sync/modules/remote.js b/services/sync/modules/remote.js index 81c43d079f8c..37f5bf61dea0 100644 --- a/services/sync/modules/remote.js +++ b/services/sync/modules/remote.js @@ -429,10 +429,17 @@ function RemoteStore(engine) { this._log = Log4Moz.Service.getLogger("Service.RemoteStore"); } RemoteStore.prototype = { - __proto__: new Store(), get serverPrefix() this._engine.serverPrefix, get engineId() this._engine.engineId, + __os: null, + get _os() { + if (!this.__os) + this.__os = Cc["@mozilla.org/observer-service;1"] + .getService(Ci.nsIObserverService); + return this.__os; + }, + get status() { let status = new Resource(this.serverPrefix + "status.json"); status.pushFilter(new JsonFilter()); @@ -462,7 +469,7 @@ RemoteStore.prototype = { return deltas; }, - _openSession: function RStore__openSession() { + _openSession: function RStore__openSession(lastSyncSnap) { let self = yield; if (!this.serverPrefix || !this.engineId) @@ -472,6 +479,7 @@ RemoteStore.prototype = { this.keys.data = null; this._snapshot.data = null; this._deltas.data = null; + this._lastSyncSnap = lastSyncSnap; let ret = yield DAV.MKCOL(this.serverPrefix + "deltas", self.cb); if (!ret) @@ -491,9 +499,24 @@ RemoteStore.prototype = { ENGINE_STORAGE_FORMAT_VERSION); throw "Incompatible remote store format"; } + + if (this.status.data.GUID != lastSyncSnap.GUID) { + this._log.trace("Remote GUID: " + this.status.data.GUID); + this._log.trace("Local GUID: " + lastSyncSnap.GUID); + this._log.debug("Server wipe since last sync, resetting last sync snapshot"); + lastSyncSnap.wipe(); + lastSyncSnap.GUID = this.status.data.GUID; + // yield this._store.resetGUIDs(self.cb); // XXX not sure if this is really needed (and it needs to be done from the engine if so) + } + + this._log.info("Last sync snapshot version: " + lastSyncSnap.version); + this._log.info("Server maxVersion: " + this.status.data.maxVersion); + + if ("none" != Utils.prefs.getCharPref("encryption")) + yield this.keys.getKeyAndIV(self.cb, this.engineId); }, - openSession: function RStore_openSession(onComplete) { - this._openSession.async(this, onComplete); + openSession: function RStore_openSession(onComplete, lastSyncSnap) { + this._openSession.async(this, onComplete, lastSyncSnap); }, closeSession: function RStore_closeSession() { @@ -501,6 +524,7 @@ RemoteStore.prototype = { this.keys.data = null; this._snapshot.data = null; this._deltas.data = null; + this._lastSyncSnap = null; }, // Does a fresh upload of the given snapshot to a new store @@ -511,7 +535,6 @@ RemoteStore.prototype = { yield this.keys.initialize(self.cb, this.engineId); this._os.notifyObservers(null, "weave:service:sync:status", "status.uploading-snapshot"); yield this._snapshot.put(self.cb, snapshot.data); - //yield this._deltas.put(self.cb, []); let c = 0; for (GUID in snapshot.data) @@ -533,6 +556,7 @@ RemoteStore.prototype = { }, // Removes server files - you may want to run initialize() after this + // FIXME: might want to do a PROPFIND instead (to catch all deltas in one go) _wipe: function Engine__wipe() { let self = yield; this._log.debug("Deleting remote store data"); @@ -550,57 +574,58 @@ RemoteStore.prototype = { // (snapshot + deltas) _getLatestFromScratch: function RStore__getLatestFromScratch() { let self = yield; - let status = this.status.data; this._log.info("Downloading all server data from scratch"); this._os.notifyObservers(null, "weave:service:sync:status", "status.downloading-snapshot"); let snap = new SnapshotStore(); snap.data = yield this._snapshot.get(self.cb); - snap.version = status.maxVersion; + this._os.notifyObservers(null, "weave:service:sync:status", "status.downloading-deltas"); + let status = this.status.data; for (let id = status.snapVersion + 1; id <= status.maxVersion; id++) { let delta = yield this._deltas.get(self.cb, id); yield snap.applyCommands.async(snap, self.cb, delta); } - self.done(snap); + self.done(snap.data); }, // Gets the latest server snapshot by downloading only the necessary // deltas from the given snapshot (but may fall back to a full download) - _getLatestFromSnap: function RStore__getLatestFromSnap(lastSyncSnap) { + _getLatestFromSnap: function RStore__getLatestFromSnap() { let self = yield; let deltas, snap = new SnapshotStore(); snap.version = this.status.data.maxVersion; - if (lastSyncSnap.version < this.status.data.snapVersion) { - this._log.trace("Getting latest from snap --> scratch"); - snap = yield this._getLatestFromScratch.async(this, self.cb); - self.done(snap); + if (!this._lastSyncSnap || + this._lastSyncSnap.version < this.status.data.snapVersion) { + this._log.trace("Getting latest from scratch (last sync snap too old)"); + snap.data = yield this._getLatestFromScratch.async(this, self.cb); + self.done(snap.data); return; - } else if (lastSyncSnap.version >= this.status.data.snapVersion && - lastSyncSnap.version < this.status.data.maxVersion) { + } else if (this._lastSyncSnap.version >= this.status.data.snapVersion && + this._lastSyncSnap.version < this.status.data.maxVersion) { this._log.debug("Using last sync snapshot as starting point for server snapshot"); - snap.data = Utils.deepCopy(lastSyncSnap.data); + snap.data = Utils.deepCopy(this._lastSyncSnap.data); this._log.info("Downloading server deltas"); this._os.notifyObservers(null, "weave:service:sync:status", "status.downloading-deltas"); deltas = []; - let min = lastSyncSnap.version + 1; + let min = this._lastSyncSnap.version + 1; let max = this.status.data.maxVersion; for (let id = min; id <= max; id++) { let delta = yield this._deltas.get(self.cb, id); deltas.push(delta); } - } else if (lastSyncSnap.version == this.status.data.maxVersion) { + } else if (this._lastSyncSnap.version == this.status.data.maxVersion) { this._log.debug("Using last sync snapshot as server snapshot (snap version == max version)"); this._log.trace("Local snapshot version == server maxVersion"); - snap.data = Utils.deepCopy(lastSyncSnap.data); + snap.data = Utils.deepCopy(this._lastSyncSnap.data); deltas = []; - } else { // lastSyncSnap.version > this.status.data.maxVersion + } else { // this._lastSyncSnap.version > this.status.data.maxVersion this._log.error("Server snapshot is older than local snapshot"); throw "Server snapshot is older than local snapshot"; } @@ -613,23 +638,21 @@ RemoteStore.prototype = { this._log.warn("Error applying remote deltas to saved snapshot, attempting a full download"); this._log.debug("Exception: " + Utils.exceptionStr(e)); this._log.trace("Stack:\n" + Utils.stackTrace(e)); - snap = yield this._getLatestFromScratch.async(this, self.cb); + snap.data = yield this._getLatestFromScratch.async(this, self.cb); } - self.done(snap); + self.done(snap.data); }, // get the latest server snapshot. If a snapshot is given, try to // download only the necessary deltas to get to the latest - _wrap: function RStore__wrap(snapshot) { + _wrap: function RStore__wrap() { let self = yield; - if (snapshot) - self.done(yield this._getLatestFromSnap.async(this, self.cb, snapshot)); - else - self.done(yield this._getLatestFromScratch.async(this, self.cb)); + let ret = yield this._getLatestFromSnap.async(this, self.cb); + self.done(ret); }, - wrap: function RStore_wrap(onComplete, snapshot) { - this._wrap.async(this, onComplete, snapshot); + wrap: function RStore_wrap(onComplete) { + this._wrap.async(this, onComplete); }, // Adds a new set of changes (a delta) to this store @@ -641,8 +664,12 @@ RemoteStore.prototype = { this.status.data[key] = metadata[key]; } - // FIXME: we should increment maxVersion here instead of in Engine - let id = this.status.data.maxVersion; + let c = 0; + for (item in snapshot.data) + c++; + this.status.data.itemCount = c; + + let id = ++this.status.data.maxVersion; // upload the delta even if we upload a new snapshot, so other clients // can be spared of a full re-download