move some code from the engine to remote.js; rename FileEngine to BlobEngine (since it doesn't actually sync complete files); clean up SyncEngine's sync method

This commit is contained in:
Dan Mills 2008-08-08 14:42:57 -07:00
Родитель 7785d82950
Коммит dea1506c58
3 изменённых файлов: 143 добавлений и 116 удалений

Просмотреть файл

@ -35,7 +35,7 @@
*
* ***** END LICENSE BLOCK ***** */
const EXPORTED_SYMBOLS = ['Engines', 'Engine', 'SyncEngine', 'FileEngine'];
const EXPORTED_SYMBOLS = ['Engines', 'Engine', 'SyncEngine', 'BlobEngine'];
const Cc = Components.classes;
const Ci = Components.interfaces;
@ -322,48 +322,30 @@ SyncEngine.prototype = {
return;
}
if (this._remote.status.data.GUID != this._snapshot.GUID) {
this._log.debug("Remote/local sync GUIDs do not match. " +
"Forcing initial sync.");
this._log.trace("Remote: " + this._remote.status.data.GUID);
this._log.trace("Local: " + this._snapshot.GUID);
yield this._store.resetGUIDs(self.cb);
this._snapshot.data = {};
this._snapshot.version = -1;
this._snapshot.GUID = this._remote.status.data.GUID;
}
this._log.info("Local snapshot version: " + this._snapshot.version);
this._log.info("Server maxVersion: " + this._remote.status.data.maxVersion);
if ("none" != Utils.prefs.getCharPref("encryption"))
yield this._remote.keys.getKeyAndIV(self.cb, this.engineId);
// 1) Fetch server deltas
this._os.notifyObservers(null, "weave:service:sync:status", "status.downloading-deltas");
let server = {};
let serverSnap = yield this._remote.wrap(self.cb, this._snapshot);
server.snapshot = serverSnap.data;
this._core.detectUpdates(self.cb, this._snapshot.data, server.snapshot);
server.updates = yield;
let serverSnap = yield this._remote.wrap(self.cb);
let serverUpdates = yield this._core.detectUpdates(self.cb,
this._snapshot.data, serverSnap);
// 2) Generate local deltas from snapshot -> current client status
this._os.notifyObservers(null, "weave:service:sync:status", "status.calculating-differences");
let localJson = new SnapshotStore();
localJson.data = this._store.wrap();
this._core.detectUpdates(self.cb, this._snapshot.data, localJson.data);
let localSnap = new SnapshotStore();
localSnap.data = this._store.wrap();
this._core.detectUpdates(self.cb, this._snapshot.data, localSnap.data);
let localUpdates = yield;
this._log.trace("local json:\n" + localJson.serialize());
this._log.trace("local json:\n" + localSnap.serialize());
this._log.trace("Local updates: " + this._serializeCommands(localUpdates));
this._log.trace("Server updates: " + this._serializeCommands(server.updates));
this._log.trace("Server updates: " + this._serializeCommands(serverUpdates));
if (server.updates.length == 0 && localUpdates.length == 0) {
this._snapshot.version = this._remote.status.data.maxVersion;
if (serverUpdates.length == 0 && localUpdates.length == 0) {
this._os.notifyObservers(null, "weave:service:sync:status", "status.no-changes-required");
this._log.info("Sync complete: no changes needed on client or server");
this._snapshot.version = this._remote.status.data.maxVersion;
this._snapshot.save();
self.done(true);
return;
}
@ -372,8 +354,7 @@ SyncEngine.prototype = {
this._os.notifyObservers(null, "weave:service:sync:status", "status.reconciling-updates");
this._log.info("Reconciling client/server updates");
this._core.reconcile(self.cb, localUpdates, server.updates);
let ret = yield;
let ret = yield this._core.reconcile(self.cb, localUpdates, serverUpdates);
let clientChanges = ret.propagations[0];
let serverChanges = ret.propagations[1];
@ -393,20 +374,15 @@ SyncEngine.prototype = {
clientConflicts.length || serverConflicts.length)) {
this._os.notifyObservers(null, "weave:service:sync:status", "status.no-changes-required");
this._log.info("Sync complete: no changes needed on client or server");
this._snapshot.data = localJson.data;
this._snapshot.data = localSnap.data;
this._snapshot.version = this._remote.status.data.maxVersion;
this._snapshot.save();
self.done(true);
return;
}
if (clientConflicts.length || serverConflicts.length) {
if (clientConflicts.length || serverConflicts.length)
this._log.warn("Conflicts found! Discarding server changes");
}
let savedSnap = Utils.deepCopy(this._snapshot.data);
let savedVersion = this._snapshot.version;
let newSnapshot;
// 3.1) Apply server changes to local store
@ -414,29 +390,27 @@ SyncEngine.prototype = {
this._log.info("Applying changes locally");
this._os.notifyObservers(null, "weave:service:sync:status", "status.applying-changes");
// Note that we need to need to apply client changes to the
// current tree, not the saved snapshot
// apply to real store
yield this._store.applyCommands.async(this._store, self.cb, clientChanges);
localJson.applyCommands.async(localJson, self.cb, clientChanges);
yield;
this._snapshot.data = localJson.data;
this._snapshot.version = this._remote.status.data.maxVersion;
this._store.applyCommands.async(this._store, self.cb, clientChanges);
yield;
newSnapshot = this._store.wrap();
// get the current state
let newSnap = new SnapshotStore();
newSnap.data = this._store.wrap();
this._core.detectUpdates(self.cb, this._snapshot.data, newSnapshot);
let diff = yield;
// apply to the snapshot we got in step 1, and compare with current state
yield localSnap.applyCommands.async(localSnap, self.cb, clientChanges);
let diff = yield this._core.detectUpdates(self.cb,
localSnap.data, newSnap.data);
if (diff.length != 0) {
this._log.warn("Commands did not apply correctly");
this._log.trace("Diff from snapshot+commands -> " +
"new snapshot after commands:\n" +
this._serializeCommands(diff));
// FIXME: do we really want to revert the snapshot here?
this._snapshot.data = Utils.deepCopy(savedSnap);
this._snapshot.version = savedVersion;
}
this._snapshot.save();
// update the local snap to the current state, we'll use it below
localSnap.data = newSnap.data;
localSnap.version = this._remote.status.data.maxVersion;
}
// 3.2) Append server delta to the delta file and upload
@ -445,10 +419,10 @@ SyncEngine.prototype = {
// current client snapshot. In the case where there are no
// conflicts, it should be the same as what the resolver returned
this._os.notifyObservers(null, "weave:service:sync:status", "status.calculating-differences");
newSnapshot = this._store.wrap();
this._core.detectUpdates(self.cb, server.snapshot, newSnapshot);
let serverDelta = yield;
this._os.notifyObservers(null, "weave:service:sync:status",
"status.calculating-differences");
let serverDelta = yield this._core.detectUpdates(self.cb,
serverSnap, localSnap.data);
// Log an error if not the same
if (!(serverConflicts.length ||
@ -462,46 +436,38 @@ SyncEngine.prototype = {
if (serverDelta.length) {
this._log.info("Uploading changes to server");
this._snapshot.data = newSnapshot;
this._snapshot.version = ++this._remote.status.data.maxVersion;
this._os.notifyObservers(null, "weave:service:sync:status",
"status.uploading-deltas");
// XXX don't append delta if we do a full upload?
if (this._remote.status.data.formatVersion != ENGINE_STORAGE_FORMAT_VERSION)
yield this._remote.initialize(self.cb, this._snapshot);
let c = 0;
for (GUID in this._snapshot.data)
c++;
this._os.notifyObservers(null, "weave:service:sync:status", "status.uploading-deltas");
this._remote.appendDelta(self.cb, this._snapshot, serverDelta,
{maxVersion: this._snapshot.version,
deltasEncryption: Crypto.defaultAlgorithm,
itemCount: c});
yield;
yield this._remote.appendDelta(self.cb, localSnap, serverDelta,
{maxVersion: this._snapshot.version,
deltasEncryption: Crypto.defaultAlgorithm});
localSnap.version = this._remote.status.data.maxVersion;
this._log.info("Successfully updated deltas and status on server");
this._snapshot.save();
}
this._snapshot.data = localSnap.data;
this._snapshot.version = localSnap.version;
this._snapshot.save();
this._log.info("Sync complete");
self.done(true);
}
};
function FileEngine() {
function BlobEngine() {
// subclasses should call _init
// we don't call it here because it requires serverPrefix to be set
}
FileEngine.prototype = {
BlobEngine.prototype = {
__proto__: new Engine(),
get _profileID() {
return ClientData.GUID;
},
_init: function FileEngine__init() {
_init: function BlobEngine__init() {
// FIXME meep?
this.__proto__.__proto__.__proto__.__proto__._init.call(this);
this._keys = new Keychain(this.serverPrefix);
@ -510,8 +476,9 @@ FileEngine.prototype = {
this._file.pushFilter(new CryptoFilter(this.engineId));
},
_initialUpload: function FileEngine__initialUpload() {
_initialUpload: function BlobEngine__initialUpload() {
let self = yield;
this._log.info("Initial upload to server");
yield this._keys.initialize(self.cb, this.engineId);
this._file.data = {};
yield this._merge.async(this, self.cb);
@ -520,7 +487,7 @@ FileEngine.prototype = {
// NOTE: Assumes this._file has latest server data
// this method is separate from _sync so it's easy to override in subclasses
_merge: function FileEngine__merge() {
_merge: function BlobEngine__merge() {
let self = yield;
this._file.data[this._profileID] = this._store.wrap();
},
@ -531,7 +498,7 @@ FileEngine.prototype = {
// 3) Upload new merged data
// NOTE: a version file will be needed in the future to optimize the case
// where there are no changes
_sync: function FileEngine__sync() {
_sync: function BlobEngine__sync() {
let self = yield;
this._log.info("Beginning sync");
@ -548,12 +515,45 @@ FileEngine.prototype = {
yield this._file.put(self.cb);
} catch (e if e.status == 404) {
this._log.info("Initial upload to server");
yield this._initialUpload.async(this, self.cb);
}
this._log.info("Sync complete");
this._os.notifyObservers(null, "weave:service:sync:engine:end", this.name);
this._os.notifyObservers(null, "weave:service:sync:engine:success", this.name);
self.done(true);
}
};
function HeuristicEngine() {
}
HeuristicEngine.prototype = {
__proto__: new Engine(),
get _snapshot() {
let snap = new SnapshotStore(this.name);
this.__defineGetter__("_snapshot", function() snap);
return snap;
},
_resetClient: function SyncEngine__resetClient() {
let self = yield;
this._log.debug("Resetting client state");
this._snapshot.wipe();
this._store.wipe();
this._log.debug("Client reset completed successfully");
},
_initialUpload: function HeuristicEngine__initialUpload() {
let self = yield;
this._log.info("Initial upload to server");
this._snapshot.data = this._store.wrap();
this._snapshot.version = 0;
this._snapshot.GUID = null; // in case there are other snapshots out there
yield this._remote.initialize(self.cb, this._snapshot);
this._snapshot.save();
},
_sync: function HeuristicEngine__sync() {
let self = yield;
}
};

Просмотреть файл

@ -55,7 +55,7 @@ function TabEngine(pbeId) {
}
TabEngine.prototype = {
__proto__: new FileEngine(),
__proto__: new BlobEngine(),
get name() "tabs",
get displayName() { return "Tabs"; },

Просмотреть файл

@ -429,10 +429,17 @@ function RemoteStore(engine) {
this._log = Log4Moz.Service.getLogger("Service.RemoteStore");
}
RemoteStore.prototype = {
__proto__: new Store(),
get serverPrefix() this._engine.serverPrefix,
get engineId() this._engine.engineId,
__os: null,
get _os() {
if (!this.__os)
this.__os = Cc["@mozilla.org/observer-service;1"]
.getService(Ci.nsIObserverService);
return this.__os;
},
get status() {
let status = new Resource(this.serverPrefix + "status.json");
status.pushFilter(new JsonFilter());
@ -462,7 +469,7 @@ RemoteStore.prototype = {
return deltas;
},
_openSession: function RStore__openSession() {
_openSession: function RStore__openSession(lastSyncSnap) {
let self = yield;
if (!this.serverPrefix || !this.engineId)
@ -472,6 +479,7 @@ RemoteStore.prototype = {
this.keys.data = null;
this._snapshot.data = null;
this._deltas.data = null;
this._lastSyncSnap = lastSyncSnap;
let ret = yield DAV.MKCOL(this.serverPrefix + "deltas", self.cb);
if (!ret)
@ -491,9 +499,24 @@ RemoteStore.prototype = {
ENGINE_STORAGE_FORMAT_VERSION);
throw "Incompatible remote store format";
}
if (this.status.data.GUID != lastSyncSnap.GUID) {
this._log.trace("Remote GUID: " + this.status.data.GUID);
this._log.trace("Local GUID: " + lastSyncSnap.GUID);
this._log.debug("Server wipe since last sync, resetting last sync snapshot");
lastSyncSnap.wipe();
lastSyncSnap.GUID = this.status.data.GUID;
// yield this._store.resetGUIDs(self.cb); // XXX not sure if this is really needed (and it needs to be done from the engine if so)
}
this._log.info("Last sync snapshot version: " + lastSyncSnap.version);
this._log.info("Server maxVersion: " + this.status.data.maxVersion);
if ("none" != Utils.prefs.getCharPref("encryption"))
yield this.keys.getKeyAndIV(self.cb, this.engineId);
},
openSession: function RStore_openSession(onComplete) {
this._openSession.async(this, onComplete);
openSession: function RStore_openSession(onComplete, lastSyncSnap) {
this._openSession.async(this, onComplete, lastSyncSnap);
},
closeSession: function RStore_closeSession() {
@ -501,6 +524,7 @@ RemoteStore.prototype = {
this.keys.data = null;
this._snapshot.data = null;
this._deltas.data = null;
this._lastSyncSnap = null;
},
// Does a fresh upload of the given snapshot to a new store
@ -511,7 +535,6 @@ RemoteStore.prototype = {
yield this.keys.initialize(self.cb, this.engineId);
this._os.notifyObservers(null, "weave:service:sync:status", "status.uploading-snapshot");
yield this._snapshot.put(self.cb, snapshot.data);
//yield this._deltas.put(self.cb, []);
let c = 0;
for (GUID in snapshot.data)
@ -533,6 +556,7 @@ RemoteStore.prototype = {
},
// Removes server files - you may want to run initialize() after this
// FIXME: might want to do a PROPFIND instead (to catch all deltas in one go)
_wipe: function Engine__wipe() {
let self = yield;
this._log.debug("Deleting remote store data");
@ -550,57 +574,58 @@ RemoteStore.prototype = {
// (snapshot + deltas)
_getLatestFromScratch: function RStore__getLatestFromScratch() {
let self = yield;
let status = this.status.data;
this._log.info("Downloading all server data from scratch");
this._os.notifyObservers(null, "weave:service:sync:status", "status.downloading-snapshot");
let snap = new SnapshotStore();
snap.data = yield this._snapshot.get(self.cb);
snap.version = status.maxVersion;
this._os.notifyObservers(null, "weave:service:sync:status", "status.downloading-deltas");
let status = this.status.data;
for (let id = status.snapVersion + 1; id <= status.maxVersion; id++) {
let delta = yield this._deltas.get(self.cb, id);
yield snap.applyCommands.async(snap, self.cb, delta);
}
self.done(snap);
self.done(snap.data);
},
// Gets the latest server snapshot by downloading only the necessary
// deltas from the given snapshot (but may fall back to a full download)
_getLatestFromSnap: function RStore__getLatestFromSnap(lastSyncSnap) {
_getLatestFromSnap: function RStore__getLatestFromSnap() {
let self = yield;
let deltas, snap = new SnapshotStore();
snap.version = this.status.data.maxVersion;
if (lastSyncSnap.version < this.status.data.snapVersion) {
this._log.trace("Getting latest from snap --> scratch");
snap = yield this._getLatestFromScratch.async(this, self.cb);
self.done(snap);
if (!this._lastSyncSnap ||
this._lastSyncSnap.version < this.status.data.snapVersion) {
this._log.trace("Getting latest from scratch (last sync snap too old)");
snap.data = yield this._getLatestFromScratch.async(this, self.cb);
self.done(snap.data);
return;
} else if (lastSyncSnap.version >= this.status.data.snapVersion &&
lastSyncSnap.version < this.status.data.maxVersion) {
} else if (this._lastSyncSnap.version >= this.status.data.snapVersion &&
this._lastSyncSnap.version < this.status.data.maxVersion) {
this._log.debug("Using last sync snapshot as starting point for server snapshot");
snap.data = Utils.deepCopy(lastSyncSnap.data);
snap.data = Utils.deepCopy(this._lastSyncSnap.data);
this._log.info("Downloading server deltas");
this._os.notifyObservers(null, "weave:service:sync:status", "status.downloading-deltas");
deltas = [];
let min = lastSyncSnap.version + 1;
let min = this._lastSyncSnap.version + 1;
let max = this.status.data.maxVersion;
for (let id = min; id <= max; id++) {
let delta = yield this._deltas.get(self.cb, id);
deltas.push(delta);
}
} else if (lastSyncSnap.version == this.status.data.maxVersion) {
} else if (this._lastSyncSnap.version == this.status.data.maxVersion) {
this._log.debug("Using last sync snapshot as server snapshot (snap version == max version)");
this._log.trace("Local snapshot version == server maxVersion");
snap.data = Utils.deepCopy(lastSyncSnap.data);
snap.data = Utils.deepCopy(this._lastSyncSnap.data);
deltas = [];
} else { // lastSyncSnap.version > this.status.data.maxVersion
} else { // this._lastSyncSnap.version > this.status.data.maxVersion
this._log.error("Server snapshot is older than local snapshot");
throw "Server snapshot is older than local snapshot";
}
@ -613,23 +638,21 @@ RemoteStore.prototype = {
this._log.warn("Error applying remote deltas to saved snapshot, attempting a full download");
this._log.debug("Exception: " + Utils.exceptionStr(e));
this._log.trace("Stack:\n" + Utils.stackTrace(e));
snap = yield this._getLatestFromScratch.async(this, self.cb);
snap.data = yield this._getLatestFromScratch.async(this, self.cb);
}
self.done(snap);
self.done(snap.data);
},
// get the latest server snapshot. If a snapshot is given, try to
// download only the necessary deltas to get to the latest
_wrap: function RStore__wrap(snapshot) {
_wrap: function RStore__wrap() {
let self = yield;
if (snapshot)
self.done(yield this._getLatestFromSnap.async(this, self.cb, snapshot));
else
self.done(yield this._getLatestFromScratch.async(this, self.cb));
let ret = yield this._getLatestFromSnap.async(this, self.cb);
self.done(ret);
},
wrap: function RStore_wrap(onComplete, snapshot) {
this._wrap.async(this, onComplete, snapshot);
wrap: function RStore_wrap(onComplete) {
this._wrap.async(this, onComplete);
},
// Adds a new set of changes (a delta) to this store
@ -641,8 +664,12 @@ RemoteStore.prototype = {
this.status.data[key] = metadata[key];
}
// FIXME: we should increment maxVersion here instead of in Engine
let id = this.status.data.maxVersion;
let c = 0;
for (item in snapshot.data)
c++;
this.status.data.itemCount = c;
let id = ++this.status.data.maxVersion;
// upload the delta even if we upload a new snapshot, so other clients
// can be spared of a full re-download