Bug 1250531 - Unconditionally sync the clients collection. r=markh

MozReview-Commit-ID: 4RHolqewNmx

--HG--
extra : rebase_source : fdde3ebdc10018c00c065fbef364f4f6fb160048
This commit is contained in:
Kit Cambridge 2016-04-01 10:55:10 -07:00
Родитель 2bfd46b860
Коммит 814c90a39d
4 изменённых файлов: 321 добавлений и 20 удалений

Просмотреть файл

@ -25,6 +25,11 @@ const CLIENTS_TTL_REFRESH = 604800; // 7 days
const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
function hasDupeCommand(commands, action) {
return commands.some(other => other.command == action.command &&
Utils.deepEquals(other.args, action.args));
}
this.ClientsRec = function ClientsRec(collection, id) {
CryptoWrapper.call(this, collection, id);
}
@ -150,6 +155,27 @@ ClientEngine.prototype = {
SyncEngine.prototype._syncStartup.call(this);
},
_processIncoming() {
// Fetch all records from the server.
this.lastSync = 0;
this._incomingClients = [];
try {
SyncEngine.prototype._processIncoming.call(this);
// Since clients are synced unconditionally, any records in the local store
// that don't exist on the server must be for disconnected clients. Remove
// them, so that we don't upload records with commands for clients that will
// never see them. We also do this to filter out stale clients from the
// tabs collection, since showing their list of tabs is confusing.
let remoteClientIDs = Object.keys(this._store._remoteClients);
let staleIDs = Utils.arraySub(remoteClientIDs, this._incomingClients);
for (let staleID of staleIDs) {
this._removeRemoteClient(staleID);
}
} finally {
this._incomingClients = null;
}
},
_syncFinish() {
// Record telemetry for our device types.
for (let [deviceType, count] of this.deviceTypes) {
@ -170,9 +196,22 @@ ClientEngine.prototype = {
SyncEngine.prototype._syncFinish.call(this);
},
// Always process incoming items because they might have commands
_reconcile: function _reconcile() {
return true;
_reconcile: function _reconcile(item) {
// Every incoming record is reconciled, so we use this to track the
// contents of the collection on the server.
this._incomingClients.push(item.id);
if (!this._store.itemExists(item.id)) {
return true;
}
// Clients are synced unconditionally, so we'll always have new records.
// Unfortunately, this will cause the scheduler to use the immediate sync
// interval for the multi-device case, instead of the active interval. We
// work around this by updating the record during reconciliation, and
// returning false to indicate that the record doesn't need to be applied
// later.
this._store.update(item);
return false;
},
// Treat reset the same as wiping for locally cached clients
@ -243,11 +282,6 @@ ClientEngine.prototype = {
throw new Error("Unknown remote client ID: '" + clientId + "'.");
}
// notDupe compares two commands and returns if they are not equal.
let notDupe = function(other) {
return other.command != command || !Utils.deepEquals(other.args, args);
};
let action = {
command: command,
args: args,
@ -257,7 +291,7 @@ ClientEngine.prototype = {
client.commands = [action];
}
// Add the new action if there are no duplicates.
else if (client.commands.every(notDupe)) {
else if (!hasDupeCommand(client.commands, action)) {
client.commands.push(action);
}
// It must be a dupe. Skip.
@ -409,7 +443,12 @@ ClientEngine.prototype = {
let subject = {uri: uri, client: clientId, title: title};
Svc.Obs.notify("weave:engine:clients:display-uri", subject);
}
},
_removeRemoteClient(id) {
delete this._store._remoteClients[id];
this._tracker.removeChangedID(id);
},
};
function ClientStore(name, engine) {
@ -426,8 +465,18 @@ ClientStore.prototype = {
// Only grab commands from the server; local name/type always wins
if (record.id == this.engine.localID)
this.engine.localCommands = record.commands;
else
else {
let currentRecord = this._remoteClients[record.id];
if (currentRecord && currentRecord.commands) {
// Merge commands.
for (let action of currentRecord.commands) {
if (!hasDupeCommand(record.cleartext.commands, action)) {
record.cleartext.commands.push(action);
}
}
}
this._remoteClients[record.id] = record.cleartext;
}
},
createRecord: function createRecord(id, collection) {

Просмотреть файл

@ -193,6 +193,79 @@ add_test(function test_properties() {
}
});
add_test(function test_full_sync() {
_("Ensure that Clients engine fetches all records for each sync.");
let now = Date.now() / 1000;
let contents = {
meta: {global: {engines: {clients: {version: engine.version,
syncID: engine.syncID}}}},
clients: {},
crypto: {}
};
let server = serverForUsers({"foo": "password"}, contents);
let user = server.user("foo");
new SyncTestingInfrastructure(server.server);
generateNewKeys(Service.collectionKeys);
let activeID = Utils.makeGUID();
server.insertWBO("foo", "clients", new ServerWBO(activeID, encryptPayload({
id: activeID,
name: "Active client",
type: "desktop",
commands: [],
version: "48",
protocols: ["1.5"],
}), now - 10));
let deletedID = Utils.makeGUID();
server.insertWBO("foo", "clients", new ServerWBO(deletedID, encryptPayload({
id: deletedID,
name: "Client to delete",
type: "desktop",
commands: [],
version: "48",
protocols: ["1.5"],
}), now - 10));
try {
let store = engine._store;
_("First sync. 2 records downloaded; our record uploaded.");
strictEqual(engine.lastRecordUpload, 0);
engine._sync();
ok(engine.lastRecordUpload > 0);
deepEqual(user.collection("clients").keys().sort(),
[activeID, deletedID, engine.localID].sort(),
"Our record should be uploaded on first sync");
deepEqual(Object.keys(store.getAllIDs()).sort(),
[activeID, deletedID, engine.localID].sort(),
"Other clients should be downloaded on first sync");
_("Delete a record, then sync again");
let collection = server.getCollection("foo", "clients");
collection.remove(deletedID);
// Simulate a timestamp update in info/collections.
engine.lastModified = now;
engine._sync();
_("Record should be updated");
deepEqual(Object.keys(store.getAllIDs()).sort(),
[activeID, engine.localID].sort(),
"Deleted client should be removed on next sync");
} finally {
Svc.Prefs.resetBranch("");
Service.recordManager.clearCache();
try {
server.deleteCollections("foo");
} finally {
server.stop(run_next_test);
}
}
});
add_test(function test_sync() {
_("Ensure that Clients engine uploads a new client record once a week.");
@ -454,18 +527,29 @@ add_test(function test_command_sync() {
}
_("Create remote client record");
let rec = new ClientsRec("clients", remoteId);
engine._store.create(rec);
let remoteRecord = engine._store.createRecord(remoteId, "clients");
engine.sendCommand("wipeAll", []);
let clientRecord = engine._store._remoteClients[remoteId];
do_check_neq(clientRecord, undefined);
do_check_eq(clientRecord.commands.length, 1);
server.insertWBO("foo", "clients", new ServerWBO(remoteId, encryptPayload({
id: remoteId,
name: "Remote client",
type: "desktop",
commands: [],
version: "48",
protocols: ["1.5"],
}), Date.now() / 1000));
try {
_("Syncing.");
engine._sync();
_("Checking remote record was downloaded.");
let clientRecord = engine._store._remoteClients[remoteId];
do_check_neq(clientRecord, undefined);
do_check_eq(clientRecord.commands.length, 0);
_("Send a command to the remote client.");
engine.sendCommand("wipeAll", []);
do_check_eq(clientRecord.commands.length, 1);
engine._sync();
_("Checking record was uploaded.");
do_check_neq(clientWBO(engine.localID).payload, undefined);
do_check_true(engine.lastRecordUpload > 0);
@ -487,7 +571,13 @@ add_test(function test_command_sync() {
} finally {
Svc.Prefs.resetBranch("");
Service.recordManager.clearCache();
server.stop(run_next_test);
try {
let collection = server.getCollection("foo", "clients");
collection.remove(remoteId);
} finally {
server.stop(run_next_test);
}
}
});
@ -575,6 +665,9 @@ add_test(function test_receive_display_uri() {
Svc.Obs.add(ev, handler);
do_check_true(engine.processIncomingCommands());
engine._resetClient();
run_next_test();
});
add_test(function test_optional_client_fields() {
@ -603,6 +696,154 @@ add_test(function test_optional_client_fields() {
run_next_test();
});
add_test(function test_merge_commands() {
_("Verifies local commands for remote clients are merged with the server's");
let now = Date.now() / 1000;
let contents = {
meta: {global: {engines: {clients: {version: engine.version,
syncID: engine.syncID}}}},
clients: {},
crypto: {}
};
let server = serverForUsers({"foo": "password"}, contents);
let user = server.user("foo");
new SyncTestingInfrastructure(server.server);
generateNewKeys(Service.collectionKeys);
let desktopID = Utils.makeGUID();
server.insertWBO("foo", "clients", new ServerWBO(desktopID, encryptPayload({
id: desktopID,
name: "Desktop client",
type: "desktop",
commands: [{
command: "displayURI",
args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
}],
version: "48",
protocols: ["1.5"],
}), now - 10));
let mobileID = Utils.makeGUID();
server.insertWBO("foo", "clients", new ServerWBO(mobileID, encryptPayload({
id: mobileID,
name: "Mobile client",
type: "mobile",
commands: [{
command: "logout",
args: [],
}],
version: "48",
protocols: ["1.5"],
}), now - 10));
try {
let store = engine._store;
_("First sync. 2 records downloaded.");
strictEqual(engine.lastRecordUpload, 0);
engine._sync();
_("Broadcast logout to all clients");
engine.sendCommand("logout", []);
engine._sync();
let collection = server.getCollection("foo", "clients");
let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
deepEqual(desktopPayload.commands, [{
command: "displayURI",
args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
}, {
command: "logout",
args: [],
}], "Should send the logout command to the desktop client");
let mobilePayload = JSON.parse(JSON.parse(collection.payload(mobileID)).ciphertext);
deepEqual(mobilePayload.commands, [{ command: "logout", args: [] }],
"Should not send a duplicate logout to the mobile client");
} finally {
Svc.Prefs.resetBranch("");
Service.recordManager.clearCache();
engine._resetClient();
try {
server.deleteCollections("foo");
} finally {
server.stop(run_next_test);
}
}
});
add_test(function test_deleted_commands() {
_("Verifies commands for a deleted client are discarded");
let now = Date.now() / 1000;
let contents = {
meta: {global: {engines: {clients: {version: engine.version,
syncID: engine.syncID}}}},
clients: {},
crypto: {}
};
let server = serverForUsers({"foo": "password"}, contents);
let user = server.user("foo");
new SyncTestingInfrastructure(server.server);
generateNewKeys(Service.collectionKeys);
let activeID = Utils.makeGUID();
server.insertWBO("foo", "clients", new ServerWBO(activeID, encryptPayload({
id: activeID,
name: "Active client",
type: "desktop",
commands: [],
version: "48",
protocols: ["1.5"],
}), now - 10));
let deletedID = Utils.makeGUID();
server.insertWBO("foo", "clients", new ServerWBO(deletedID, encryptPayload({
id: deletedID,
name: "Client to delete",
type: "desktop",
commands: [],
version: "48",
protocols: ["1.5"],
}), now - 10));
try {
let store = engine._store;
_("First sync. 2 records downloaded.");
engine._sync();
_("Delete a record on the server.");
let collection = server.getCollection("foo", "clients");
collection.remove(deletedID);
_("Broadcast a command to all clients");
engine.sendCommand("logout", []);
engine._sync();
deepEqual(collection.keys().sort(), [activeID, engine.localID].sort(),
"Should not reupload deleted clients");
let activePayload = JSON.parse(JSON.parse(collection.payload(activeID)).ciphertext);
deepEqual(activePayload.commands, [{ command: "logout", args: [] }],
"Should send the command to the active client");
} finally {
Svc.Prefs.resetBranch("");
Service.recordManager.clearCache();
engine._resetClient();
try {
server.deleteCollections("foo");
} finally {
server.stop(run_next_test);
}
}
});
function run_test() {
initTestLogging("Trace");
Log.repository.getLogger("Sync.Engine.Clients").level = Log.Level.Trace;

Просмотреть файл

@ -13,6 +13,11 @@ Cu.import("resource://services-sync/service.js");
var scheduler = Service.scheduler;
var clientsEngine = Service.clientsEngine;
// Don't remove stale clients when syncing. This is a test-only workaround
// that lets us add clients directly to the store, without losing them on
// the next sync.
clientsEngine._removeRemoteClient = id => {};
function promiseStopServer(server) {
let deferred = Promise.defer();
server.stop(deferred.resolve);

Просмотреть файл

@ -29,6 +29,11 @@ Service.engineManager.register(CatapultEngine);
var scheduler = new SyncScheduler(Service);
var clientsEngine = Service.clientsEngine;
// Don't remove stale clients when syncing. This is a test-only workaround
// that lets us add clients directly to the store, without losing them on
// the next sync.
clientsEngine._removeRemoteClient = id => {};
function sync_httpd_setup() {
let global = new ServerWBO("global", {
syncID: Service.syncID,
@ -69,6 +74,7 @@ function setUp(server) {
function cleanUpAndGo(server) {
let deferred = Promise.defer();
Utils.nextTick(function () {
clientsEngine._store.wipe();
Service.startOver();
if (server) {
server.stop(deferred.resolve);