зеркало из https://github.com/mozilla/gecko-dev.git
Bug 569295 - limit the number of bytes we attempt to upload to the storage servers. r=rnewman
This commit is contained in:
Родитель
6dd019d2f4
Коммит
1a720883b1
|
@ -94,10 +94,10 @@ SCORE_UPDATE_DELAY: 100,
|
|||
// observed spurious idle/back events and short enough to pre-empt user activity.
|
||||
IDLE_OBSERVER_BACK_DELAY: 100,
|
||||
|
||||
// Number of records to upload in a single POST (multiple POSTS if exceeded)
|
||||
// FIXME: Record size limit is 256k (new cluster), so this can be quite large!
|
||||
// (Bug 569295)
|
||||
// Max number of records or bytes to upload in a single POST - we'll do multiple POSTS if either
|
||||
// MAX_UPLOAD_RECORDS or MAX_UPLOAD_BYTES is hit)
|
||||
MAX_UPLOAD_RECORDS: 100,
|
||||
MAX_UPLOAD_BYTES: 1024 * 1023, // just under 1MB
|
||||
MAX_HISTORY_UPLOAD: 5000,
|
||||
MAX_HISTORY_DOWNLOAD: 5000,
|
||||
|
||||
|
|
|
@ -1434,13 +1434,7 @@ SyncEngine.prototype = {
|
|||
|
||||
// collection we'll upload
|
||||
let up = new Collection(this.engineURL, null, this.service);
|
||||
let count = 0;
|
||||
|
||||
// Upload what we've got so far in the collection
|
||||
let doUpload = Utils.bind2(this, function(desc) {
|
||||
this._log.info("Uploading " + desc + " of " + modifiedIDs.length +
|
||||
" records");
|
||||
let resp = up.post();
|
||||
let handleResponse = resp => {
|
||||
if (!resp.success) {
|
||||
this._log.debug("Uploading records failed: " + resp);
|
||||
resp.failureCode = ENGINE_UPLOAD_FAIL;
|
||||
|
@ -1463,32 +1457,29 @@ SyncEngine.prototype = {
|
|||
let id = resp.obj.success[key];
|
||||
delete this._modified[id];
|
||||
}
|
||||
}
|
||||
|
||||
up.clearRecords();
|
||||
});
|
||||
let postQueue = up.newPostQueue(this._log, handleResponse);
|
||||
|
||||
for (let id of modifiedIDs) {
|
||||
let out;
|
||||
let ok = false;
|
||||
try {
|
||||
let out = this._createRecord(id);
|
||||
out = this._createRecord(id);
|
||||
if (this._log.level <= Log.Level.Trace)
|
||||
this._log.trace("Outgoing: " + out);
|
||||
|
||||
out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
|
||||
up.pushData(out);
|
||||
ok = true;
|
||||
} catch (ex if !Async.isShutdownException(ex)) {
|
||||
this._log.warn("Error creating record", ex);
|
||||
}
|
||||
|
||||
// Partial upload
|
||||
if ((++count % MAX_UPLOAD_RECORDS) == 0)
|
||||
doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out");
|
||||
|
||||
if (ok) {
|
||||
postQueue.enqueue(out);
|
||||
}
|
||||
this._store._sleep(0);
|
||||
}
|
||||
|
||||
// Final upload
|
||||
if (count % MAX_UPLOAD_RECORDS > 0)
|
||||
doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
|
||||
postQueue.flush();
|
||||
}
|
||||
},
|
||||
|
||||
|
|
|
@ -600,14 +600,6 @@ Collection.prototype = {
|
|||
this._rebuildURL();
|
||||
},
|
||||
|
||||
pushData: function Coll_pushData(data) {
|
||||
this._data.push(data);
|
||||
},
|
||||
|
||||
clearRecords: function Coll_clearRecords() {
|
||||
this._data = [];
|
||||
},
|
||||
|
||||
set recordHandler(onRecord) {
|
||||
// Save this because onProgress is called with this as the ChannelListener
|
||||
let coll = this;
|
||||
|
@ -629,4 +621,82 @@ Collection.prototype = {
|
|||
}
|
||||
};
|
||||
},
|
||||
|
||||
// This object only supports posting via the postQueue object.
|
||||
post() {
|
||||
throw new Error("Don't directly post to a collection - use newPostQueue instead");
|
||||
},
|
||||
|
||||
newPostQueue(log, postCallback) {
|
||||
let poster = data => {
|
||||
return Resource.prototype.post.call(this, data);
|
||||
}
|
||||
return new PostQueue(poster, log, postCallback);
|
||||
},
|
||||
};
|
||||
|
||||
/* A helper to manage the posting of records while respecting the various
|
||||
size limits.
|
||||
*/
|
||||
function PostQueue(poster, log, postCallback) {
|
||||
// The "post" function we should use when it comes time to do the post.
|
||||
this.poster = poster;
|
||||
this.log = log;
|
||||
|
||||
// The callback we make with the response when we do get around to making the
|
||||
// post (which could be during any of the enqueue() calls or the final flush())
|
||||
// This callback may be called multiple times and must not add new items to
|
||||
// the queue.
|
||||
this.postCallback = postCallback;
|
||||
|
||||
// The string where we are capturing the stringified version of the records
|
||||
// queued so far. It will always be invalid JSON as it is always missing the
|
||||
// close bracket.
|
||||
this.queued = "";
|
||||
|
||||
// The number of records we've queued so far.
|
||||
this.numQueued = 0;
|
||||
}
|
||||
|
||||
PostQueue.prototype = {
|
||||
enqueue(record) {
|
||||
// We want to ensure the record has a .toJSON() method defined - even
|
||||
// though JSON.stringify() would implicitly call it, the stringify might
|
||||
// still work even if it isn't defined, which isn't what we want.
|
||||
let jsonRepr = record.toJSON();
|
||||
if (!jsonRepr) {
|
||||
throw new Error("You must only call this with objects that explicitly support JSON");
|
||||
}
|
||||
let bytes = JSON.stringify(jsonRepr);
|
||||
// Note that we purposely don't check if a single record would exceed our
|
||||
// limit - we still attempt the post and if it sees a 413 like we think it
|
||||
// will, we just let that do whatever it does (which is probably cause
|
||||
// ongoing sync failures for that engine - bug 1241356 exists to fix this)
|
||||
// (Note that counter-intuitively, the post of the oversized record will
|
||||
// not happen here but on the next .enqueue/.flush.)
|
||||
|
||||
// Do a flush if we can't add this record without exceeding our limits.
|
||||
let newLength = this.queued.length + bytes.length + 1; // extra 1 for trailing "]"
|
||||
if (this.numQueued >= MAX_UPLOAD_RECORDS || newLength >= MAX_UPLOAD_BYTES) {
|
||||
this.log.trace("PostQueue flushing"); // flush logs more info...
|
||||
// We need to write the queue out before handling this one.
|
||||
this.flush();
|
||||
}
|
||||
// Either a ',' or a '[' depending on whether this is the first record.
|
||||
this.queued += this.numQueued ? "," : "[";
|
||||
this.queued += bytes;
|
||||
this.numQueued++;
|
||||
},
|
||||
|
||||
flush() {
|
||||
if (!this.queued) {
|
||||
// nothing queued.
|
||||
return;
|
||||
}
|
||||
this.log.info(`Posting ${this.numQueued} records of ${this.queued.length+1} bytes`);
|
||||
let queued = this.queued + "]";
|
||||
this.queued = "";
|
||||
this.numQueued = 0;
|
||||
this.postCallback(this.poster(queued));
|
||||
},
|
||||
}
|
||||
|
|
|
@ -1486,6 +1486,99 @@ add_test(function test_uploadOutgoing_MAX_UPLOAD_RECORDS() {
|
|||
}
|
||||
});
|
||||
|
||||
add_test(function test_uploadOutgoing_MAX_UPLOAD_BYTES() {
|
||||
_("SyncEngine._uploadOutgoing uploads in batches of MAX_UPLOAD_BYTES");
|
||||
|
||||
Service.identity.username = "foo";
|
||||
let collection = new ServerCollection();
|
||||
|
||||
// Let's count how many times the client posts to the server
|
||||
let uploadCounts = [];
|
||||
collection.post = (function(orig) {
|
||||
return function(data) {
|
||||
uploadCounts.push(JSON.parse(data).length);
|
||||
return orig.call(this, data);
|
||||
};
|
||||
}(collection.post));
|
||||
|
||||
let engine = makeRotaryEngine();
|
||||
|
||||
// A helper function that calculates the overhead of a record as uploaded
|
||||
// to the server - it returns the size of a record with an empty string.
|
||||
// This is so we can calculate exactly how many records we can fit into a
|
||||
// batch (ie, we expect the record size that's actually uploaded to be the
|
||||
// result of this function + the length of the data)
|
||||
let calculateRecordOverhead = function() {
|
||||
engine._store.items["string-no-x"] = "";
|
||||
let x = engine._createRecord("string-no-x");
|
||||
x.encrypt(Service.collectionKeys.keyForCollection(engine.name));
|
||||
delete engine._store.items["string-no-x"];
|
||||
return JSON.stringify(x).length;
|
||||
}
|
||||
|
||||
let allIds = [];
|
||||
// Create a bunch of records (and server side handlers) - make 20 that will
|
||||
// fit inside our byte limit.
|
||||
let fullItemSize = (MAX_UPLOAD_BYTES - 2) / 20;
|
||||
// fullItemSize includes the "," between records and quote characters (as we
|
||||
// will use strings)
|
||||
let itemSize = fullItemSize - calculateRecordOverhead() - (3 * 20);
|
||||
// Add 21 of this size - the first 20 should fit in the first batch.
|
||||
for (let i = 0; i < 21; i++) {
|
||||
let id = 'string-no-' + i;
|
||||
engine._store.items[id] = "X".repeat(itemSize);
|
||||
engine._tracker.addChangedID(id, 0);
|
||||
collection.insert(id);
|
||||
allIds.push(id);
|
||||
}
|
||||
// Now a single large item that's greater than MAX_UPLOAD_BYTES. This should
|
||||
// cause the 1 item that didn't fit in the previous batch to be uploaded
|
||||
// by itself, then this large one by itself.
|
||||
engine._store.items["large-item"] = "Y".repeat(MAX_UPLOAD_BYTES*2);
|
||||
engine._tracker.addChangedID("large-item", 0);
|
||||
collection.insert("large-item");
|
||||
allIds.push("large-item");
|
||||
// And a few more small items - these should all be uploaded together.
|
||||
for (let i = 0; i < 20; i++) {
|
||||
let id = 'small-no-' + i;
|
||||
engine._store.items[id] = "ZZZZ";
|
||||
engine._tracker.addChangedID(id, 0);
|
||||
collection.insert(id);
|
||||
allIds.push(id);
|
||||
}
|
||||
|
||||
let meta_global = Service.recordManager.set(engine.metaURL,
|
||||
new WBORecord(engine.metaURL));
|
||||
meta_global.payload.engines = {rotary: {version: engine.version,
|
||||
syncID: engine.syncID}};
|
||||
|
||||
let server = sync_httpd_setup({
|
||||
"/1.1/foo/storage/rotary": collection.handler()
|
||||
});
|
||||
|
||||
let syncTesting = new SyncTestingInfrastructure(server);
|
||||
|
||||
try {
|
||||
|
||||
// Confirm initial environment.
|
||||
do_check_eq(uploadCounts.length, 0);
|
||||
|
||||
engine._syncStartup();
|
||||
engine._uploadOutgoing();
|
||||
|
||||
// Ensure all records have been uploaded.
|
||||
for (let checkId of allIds) {
|
||||
do_check_true(!!collection.payload(checkId));
|
||||
}
|
||||
|
||||
// Ensure that the uploads were performed in the batch sizes we expect.
|
||||
Assert.deepEqual(uploadCounts, [20, 1, 1, 20]);
|
||||
|
||||
} finally {
|
||||
cleanAndGo(server);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
add_test(function test_syncFinish_noDelete() {
|
||||
_("SyncEngine._syncFinish resets tracker's score");
|
||||
|
|
Загрузка…
Ссылка в новой задаче