Bug 1370985 - Change getBatched() to return records directly instead of using a callback. r=markh,tcsc

MozReview-Commit-ID: HfwPe8jSH66

--HG--
extra : rebase_source : 12c8800f1fcc090589c2a94a0fc1608b36c54181
This commit is contained in:
Edouard Oger 2017-05-30 13:23:57 -04:00
Родитель 0312c4f54a
Коммит 1826fefb4c
11 изменённых файлов: 86 добавлений и 321 удалений

Просмотреть файл

@ -823,16 +823,14 @@ class BookmarkValidator {
let collection = engine.itemSource();
let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
collection.full = true;
let items = [];
collection.recordHandler = function(item) {
item.decrypt(collectionKey);
items.push(item.cleartext);
};
let resp = await collection.getBatched();
if (!resp.success) {
throw resp;
let result = await collection.getBatched();
if (!result.response.success) {
throw result.response;
}
return items;
return result.records.map(record => {
record.decrypt(collectionKey);
return record.cleartext;
});
}
async validate(engine) {

Просмотреть файл

@ -69,16 +69,14 @@ class CollectionValidator {
let collection = engine.itemSource();
let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
collection.full = true;
let items = [];
collection.recordHandler = function(item) {
item.decrypt(collectionKey);
items.push(item.cleartext);
};
let resp = await collection.getBatched();
if (!resp.success) {
throw resp;
let result = await collection.getBatched();
if (!result.response.success) {
throw result.response;
}
return items;
return result.records.map(record => {
record.decrypt(collectionKey);
return record.cleartext;
});
}
// Should return a promise that resolves to an array of client items.

Просмотреть файл

@ -1123,7 +1123,7 @@ SyncEngine.prototype = {
// called for every incoming record.
let self = this;
newitems.recordHandler = function(item) {
let recordHandler = function(item) {
if (aborting) {
return;
}
@ -1231,13 +1231,17 @@ SyncEngine.prototype = {
// Only bother getting data from the server if there's new things
if (this.lastModified == null || this.lastModified > this.lastSync) {
let resp = Async.promiseSpinningly(newitems.getBatched());
doApplyBatchAndPersistFailed.call(this);
if (!resp.success) {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
let { response, records } = Async.promiseSpinningly(newitems.getBatched());
if (!response.success) {
response.failureCode = ENGINE_DOWNLOAD_FAIL;
throw response;
}
for (let record of records) {
recordHandler(record);
}
doApplyBatchAndPersistFailed.call(this);
if (aborting) {
throw aborting;
}
@ -1285,13 +1289,18 @@ SyncEngine.prototype = {
newitems.newer = 0;
newitems.ids = fetchBatch.slice(0, batchSize);
// Reuse the existing record handler set earlier
let resp = Async.promiseSpinningly(newitems.get());
if (!resp.success) {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
}
for (let json of resp.obj) {
let record = new this._recordObj();
record.deserialize(json);
recordHandler(record);
}
// This batch was successfully applied. Not using
// doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
fetchBatch = fetchBatch.slice(batchSize);
@ -1815,15 +1824,15 @@ SyncEngine.prototype = {
test.full = true;
let key = this.service.collectionKeys.keyForCollection(this.name);
test.recordHandler = function recordHandler(record) {
record.decrypt(key);
canDecrypt = true;
};
// Any failure fetching/decrypting will just result in false
try {
this._log.trace("Trying to decrypt a record from the server..");
Async.promiseSpinningly(test.get());
let json = Async.promiseSpinningly(test.get()).obj[0];
let record = new this._recordObj();
record.deserialize(json);
record.decrypt(key);
canDecrypt = true;
} catch (ex) {
if (Async.isShutdownException(ex)) {
throw ex;

Просмотреть файл

@ -707,8 +707,7 @@ Collection.prototype = {
async getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) {
let totalLimit = Number(this.limit) || Infinity;
if (batchSize <= 0 || batchSize >= totalLimit) {
// Invalid batch sizes should arguably be an error, but they're easy to handle
return this.get();
throw new Error("Invalid batch size");
}
if (!this.full) {
@ -716,13 +715,10 @@ Collection.prototype = {
}
// _onComplete and _onProgress are reset after each `get` by AsyncResource.
// We overwrite _onRecord to something that stores the data in an array
// until the end.
let { _onComplete, _onProgress, _onRecord } = this;
let { _onComplete, _onProgress } = this;
let recordBuffer = [];
let resp;
try {
this._onRecord = r => recordBuffer.push(r);
let lastModifiedTime;
this.limit = batchSize;
@ -736,8 +732,14 @@ Collection.prototype = {
// Actually perform the request
resp = await this.get();
if (!resp.success) {
recordBuffer = [];
break;
}
for (let json of resp.obj) {
let record = new this._recordObj();
record.deserialize(json);
recordBuffer.push(record);
}
// Initialize last modified, or check that something broken isn't happening.
let lastModified = resp.headers["x-last-modified"];
@ -759,54 +761,12 @@ Collection.prototype = {
// handler so that we can more convincingly pretend to be a normal get()
// call. Note: we're resetting these to the values they had before this
// function was called.
this._onRecord = _onRecord;
this._limit = totalLimit;
this._offset = null;
delete this._headers["x-if-unmodified-since"];
this._rebuildURL();
}
if (resp.success && Async.checkAppReady()) {
// call the original _onRecord (e.g. the user supplied record handler)
// for each record we've stored
for (let record of recordBuffer) {
this._onRecord(record);
}
}
return resp;
},
set recordHandler(onRecord) {
// Save this because onProgress is called with this as the ChannelListener
let coll = this;
// Switch to newline separated records for incremental parsing
coll.setHeader("Accept", "application/newlines");
this._onRecord = onRecord;
this._onProgress = function(httpChannel) {
let newline, length = 0, contentLength = "unknown";
try {
// Content-Length of the value of this response header
contentLength = httpChannel.getResponseHeader("Content-Length");
} catch (ex) { }
while ((newline = this._data.indexOf("\n")) > 0) {
// Split the json record from the rest of the data
let json = this._data.slice(0, newline);
this._data = this._data.slice(newline + 1);
length += json.length;
coll._log.trace("Record: Content-Length = " + contentLength +
", ByteCount = " + length);
// Deserialize a record from json and give it to the callback
let record = new coll._recordObj();
record.deserialize(json);
coll._onRecord(record);
}
};
return { response: resp, records: recordBuffer };
},
// This object only supports posting via the postQueue object.

Просмотреть файл

@ -296,7 +296,7 @@ ServerCollection.prototype = {
return c;
},
get(options) {
get(options, request) {
let result;
if (options.full) {
let data = [];
@ -317,8 +317,13 @@ ServerCollection.prototype = {
} else if (start) {
data = data.slice(start);
}
// Our implementation of application/newlines.
result = data.join("\n") + "\n";
if (request && request.getHeader("accept") == "application/newlines") {
this._log.error("Error: client requesting application/newlines content");
throw new Error("This server should not serve application/newlines content");
} else {
result = JSON.stringify(data);
}
// Use options as a backchannel to report count.
options.recordCount = data.length;

Просмотреть файл

@ -71,7 +71,7 @@ async function createBookmark(parentId, url, title, index = bms.DEFAULT_INDEX) {
function getServerRecord(collection, id) {
let wbo = collection.get({ full: true, ids: [id] });
// Whew - lots of json strings inside strings.
return JSON.parse(JSON.parse(JSON.parse(wbo).payload).ciphertext);
return JSON.parse(JSON.parse(JSON.parse(JSON.parse(wbo)[0]).payload).ciphertext);
}
async function promiseNoLocalItem(guid) {

Просмотреть файл

@ -535,23 +535,9 @@ add_task(async function test_misreconciled_root() {
};
let rec = new FakeRecord(BookmarkFolder, to_apply);
let encrypted = encryptPayload(rec.cleartext);
encrypted.decrypt = function() {
for (let x in rec) {
encrypted[x] = rec[x];
}
};
_("Applying record.");
engine._processIncoming({
getBatched() {
return this.get();
},
async get() {
this.recordHandler(encrypted);
return {success: true}
},
});
store.applyIncoming(rec);
// Ensure that afterwards, toolbar is still there.
// As of 2012-12-05, this only passes because Places doesn't use "toolbar" as

Просмотреть файл

@ -15,7 +15,7 @@ function recordRange(lim, offset, total) {
for (let i = offset; i < Math.min(lim + offset, total); ++i) {
res.push(JSON.stringify({ id: String(i), payload: "test:" + i }));
}
return res.join("\n") + "\n";
return res;
}
function get_test_collection_info({ totalRecords, batchSize, lastModified,
@ -25,9 +25,7 @@ function get_test_collection_info({ totalRecords, batchSize, lastModified,
coll.full = true;
let requests = [];
let responses = [];
let sawRecord = false;
coll.get = async function() {
ok(!sawRecord); // make sure we call record handler after all requests.
let limit = +this.limit;
let offset = 0;
if (this.offset) {
@ -44,9 +42,8 @@ function get_test_collection_info({ totalRecords, batchSize, lastModified,
throw "Some Network Error";
}
let body = recordRange(limit, offset, totalRecords);
this._onProgress.call({ _data: body });
let response = {
body,
obj: body,
success: true,
status: 200,
headers: {}
@ -64,33 +61,24 @@ function get_test_collection_info({ totalRecords, batchSize, lastModified,
responses.push(response);
return response;
};
let records = [];
coll.recordHandler = function(record) {
sawRecord = true;
// ensure records are coming in in the right order
equal(record.id, String(records.length));
equal(record.payload, "test:" + records.length);
records.push(record);
};
return { records, responses, requests, coll };
return { responses, requests, coll };
}
add_task(async function test_success() {
const totalRecords = 11;
const batchSize = 2;
const lastModified = "111111";
let { records, responses, requests, coll } = get_test_collection_info({
let { responses, requests, coll } = get_test_collection_info({
totalRecords,
batchSize,
lastModified,
});
let response = await coll.getBatched(batchSize);
let { response, records } = await coll.getBatched(batchSize);
equal(requests.length, Math.ceil(totalRecords / batchSize));
// records are mostly checked in recordHandler, we just care about the length
equal(records.length, totalRecords);
checkRecordsOrder(records);
// ensure we're returning the last response
equal(responses[responses.length - 1], response);
@ -124,13 +112,14 @@ add_task(async function test_total_limit() {
const recordLimit = 11;
const batchSize = 2;
const lastModified = "111111";
let { records, requests, coll } = get_test_collection_info({
let { requests, coll } = get_test_collection_info({
totalRecords,
batchSize,
lastModified,
});
coll.limit = recordLimit;
await coll.getBatched(batchSize);
let { records } = await coll.getBatched(batchSize);
checkRecordsOrder(records);
equal(requests.length, Math.ceil(recordLimit / batchSize));
equal(records.length, recordLimit);
@ -152,16 +141,16 @@ add_task(async function test_412() {
const totalRecords = 11;
const batchSize = 2;
const lastModified = "111111";
let { records, responses, requests, coll } = get_test_collection_info({
let { responses, requests, coll } = get_test_collection_info({
totalRecords,
batchSize,
lastModified,
interruptedAfter: 3
});
let response = await coll.getBatched(batchSize);
let { response, records } = await coll.getBatched(batchSize);
equal(requests.length, 3);
equal(records.length, 0); // record handler shouldn't be called for anything
equal(records.length, 0); // we should not get any records
// ensure we're returning the last response
equal(responses[responses.length - 1], response);
@ -171,19 +160,26 @@ add_task(async function test_412() {
});
add_task(async function test_get_throws() {
_("We shouldn't record records if get() throws for some reason");
_("getBatched() should throw if a get() throws");
const totalRecords = 11;
const batchSize = 2;
const lastModified = "111111";
let { records, requests, coll } = get_test_collection_info({
let { requests, coll } = get_test_collection_info({
totalRecords,
batchSize,
lastModified,
throwAfter: 3
});
await Assert.rejects(coll.getBatched(batchSize), "Some Network Error");
await Assert.rejects(coll.getBatched(batchSize), /Some Network Error/);
equal(requests.length, 3);
equal(records.length, 0);
});
function checkRecordsOrder(records) {
ok(records.length > 0)
for (let i = 0; i < records.length; i++) {
equal(records[i].id, String(i));
equal(records[i].payload, "test:" + i);
}
}

Просмотреть файл

@ -1,188 +0,0 @@
/* Any copyright is dedicated to the Public Domain.
http://creativecommons.org/publicdomain/zero/1.0/ */
_("Make sure Collection can correctly incrementally parse GET requests");
Cu.import("resource://services-sync/record.js");
Cu.import("resource://services-sync/service.js");
function run_test() {
let base = "http://fake/";
let coll = new Collection("http://fake/uri/", WBORecord, Service);
let stream = { _data: "" };
let called, recCount, sum;
_("Not-JSON, string payloads are strings");
called = false;
stream._data = '{"id":"hello","payload":"world"}\n';
coll.recordHandler = function(rec) {
called = true;
_("Got record:", JSON.stringify(rec));
rec.collection = "uri"; // This would be done by an engine, so do it here.
do_check_eq(rec.collection, "uri");
do_check_eq(rec.id, "hello");
do_check_eq(rec.uri(base).spec, "http://fake/uri/hello");
do_check_eq(rec.payload, "world");
};
coll._onProgress.call(stream);
do_check_eq(stream._data, "");
do_check_true(called);
_("\n");
_("Parse record with payload");
called = false;
stream._data = '{"payload":"{\\"value\\":123}"}\n';
coll.recordHandler = function(rec) {
called = true;
_("Got record:", JSON.stringify(rec));
do_check_eq(rec.payload.value, 123);
};
coll._onProgress.call(stream);
do_check_eq(stream._data, "");
do_check_true(called);
_("\n");
_("Parse multiple records in one go");
called = false;
recCount = 0;
sum = 0;
stream._data = '{"id":"hundred","payload":"{\\"value\\":100}"}\n{"id":"ten","payload":"{\\"value\\":10}"}\n{"id":"one","payload":"{\\"value\\":1}"}\n';
coll.recordHandler = function(rec) {
called = true;
_("Got record:", JSON.stringify(rec));
recCount++;
sum += rec.payload.value;
_("Incremental status: count", recCount, "sum", sum);
rec.collection = "uri";
switch (recCount) {
case 1:
do_check_eq(rec.id, "hundred");
do_check_eq(rec.uri(base).spec, "http://fake/uri/hundred");
do_check_eq(rec.payload.value, 100);
do_check_eq(sum, 100);
break;
case 2:
do_check_eq(rec.id, "ten");
do_check_eq(rec.uri(base).spec, "http://fake/uri/ten");
do_check_eq(rec.payload.value, 10);
do_check_eq(sum, 110);
break;
case 3:
do_check_eq(rec.id, "one");
do_check_eq(rec.uri(base).spec, "http://fake/uri/one");
do_check_eq(rec.payload.value, 1);
do_check_eq(sum, 111);
break;
default:
do_throw("unexpected number of record counts", recCount);
break;
}
};
coll._onProgress.call(stream);
do_check_eq(recCount, 3);
do_check_eq(sum, 111);
do_check_eq(stream._data, "");
do_check_true(called);
_("\n");
_("Handle incremental data incoming");
called = false;
recCount = 0;
sum = 0;
stream._data = '{"payl';
coll.recordHandler = function(rec) {
called = true;
do_throw("shouldn't have gotten a record..");
};
coll._onProgress.call(stream);
_("shouldn't have gotten anything yet");
do_check_eq(recCount, 0);
do_check_eq(sum, 0);
_("leading array bracket should have been trimmed");
do_check_eq(stream._data, '{"payl');
do_check_false(called);
_();
_("adding more data enough for one record..");
called = false;
stream._data += 'oad":"{\\"value\\":100}"}\n';
coll.recordHandler = function(rec) {
called = true;
_("Got record:", JSON.stringify(rec));
recCount++;
sum += rec.payload.value;
};
coll._onProgress.call(stream);
_("should have 1 record with sum 100");
do_check_eq(recCount, 1);
do_check_eq(sum, 100);
_("all data should have been consumed including trailing comma");
do_check_eq(stream._data, "");
do_check_true(called);
_();
_("adding more data..");
called = false;
stream._data += '{"payload":"{\\"value\\":10}"';
coll.recordHandler = function(rec) {
called = true;
do_throw("shouldn't have gotten a record..");
};
coll._onProgress.call(stream);
_("should still have 1 record with sum 100");
do_check_eq(recCount, 1);
do_check_eq(sum, 100);
_("should almost have a record");
do_check_eq(stream._data, '{"payload":"{\\"value\\":10}"');
do_check_false(called);
_();
_("add data for two records..");
called = false;
stream._data += '}\n{"payload":"{\\"value\\":1}"}\n';
coll.recordHandler = function(rec) {
called = true;
_("Got record:", JSON.stringify(rec));
recCount++;
sum += rec.payload.value;
switch (recCount) {
case 2:
do_check_eq(rec.payload.value, 10);
do_check_eq(sum, 110);
break;
case 3:
do_check_eq(rec.payload.value, 1);
do_check_eq(sum, 111);
break;
default:
do_throw("unexpected number of record counts", recCount);
break;
}
};
coll._onProgress.call(stream);
_("should have gotten all 3 records with sum 111");
do_check_eq(recCount, 3);
do_check_eq(sum, 111);
_("should have consumed all data");
do_check_eq(stream._data, "");
do_check_true(called);
_();
_("add no extra data");
called = false;
stream._data += "";
coll.recordHandler = function(rec) {
called = true;
do_throw("shouldn't have gotten a record..");
};
coll._onProgress.call(stream);
_("should still have 3 records with sum 111");
do_check_eq(recCount, 3);
do_check_eq(sum, 111);
_("should have consumed nothing but still have nothing");
do_check_eq(stream._data, "");
do_check_false(called);
_("\n");
}

Просмотреть файл

@ -49,7 +49,6 @@ tags = addons
# Generic Sync types.
[test_browserid_identity.js]
[test_collection_inc_get.js]
[test_collection_getBatched.js]
[test_collections_recovery.js]
[test_keys.js]

Просмотреть файл

@ -615,17 +615,19 @@ var TPS = {
*/
ValidateBookmarks() {
let getServerBookmarkState = () => {
let getServerBookmarkState = async () => {
let bookmarkEngine = Weave.Service.engineManager.get("bookmarks");
let collection = bookmarkEngine.itemSource();
let collectionKey = bookmarkEngine.service.collectionKeys.keyForCollection(bookmarkEngine.name);
collection.full = true;
let items = [];
collection.recordHandler = function(item) {
item.decrypt(collectionKey);
items.push(item.cleartext);
};
Async.promiseSpinningly(collection.get());
let resp = await collection.get();
for (let json of resp.obj) {
let record = new collection._recordObj();
record.deserialize(json);
record.decrypt(collectionKey);
items.push(record.cleartext);
}
return items;
};
let serverRecordDumpStr;
@ -634,7 +636,7 @@ var TPS = {
let clientTree = Async.promiseSpinningly(PlacesUtils.promiseBookmarksTree("", {
includeItemIds: true
}));
let serverRecords = getServerBookmarkState();
let serverRecords = Async.promiseSpinningly(getServerBookmarkState());
// We can't wait until catch to stringify this, since at that point it will have cycles.
serverRecordDumpStr = JSON.stringify(serverRecords);