зеркало из https://github.com/mozilla/gecko-dev.git
Bug 769500 - Add batching APIs to storage service client; r=rnewman
This commit is contained in:
Родитель
60c58a84c7
Коммит
47efadcd26
|
@ -870,9 +870,12 @@ StorageServiceRequest.prototype = {
|
|||
return;
|
||||
}
|
||||
|
||||
if (response.status == 503) {
|
||||
if (response.status >= 500 && response.status <= 599) {
|
||||
this._log.error(response.status + " seen from server!");
|
||||
this._error = new StorageServiceRequestError();
|
||||
this._error.server = new Error("503 Received.");
|
||||
this._error.server = new Error(response.status + " status code.");
|
||||
callOnComplete();
|
||||
return;
|
||||
}
|
||||
|
||||
callOnComplete();
|
||||
|
@ -1084,15 +1087,21 @@ StorageCollectionGetRequest.prototype = {
|
|||
function StorageCollectionSetRequest() {
|
||||
StorageServiceRequest.call(this);
|
||||
|
||||
this._lines = [];
|
||||
this._size = 0;
|
||||
this.size = 0;
|
||||
|
||||
this.successfulIDs = new Set();
|
||||
this.failures = new Map();
|
||||
// TODO Bug 775781 convert to Set and Map once iterable.
|
||||
this.successfulIDs = [];
|
||||
this.failures = {};
|
||||
|
||||
this._lines = [];
|
||||
}
|
||||
StorageCollectionSetRequest.prototype = {
|
||||
__proto__: StorageServiceRequest.prototype,
|
||||
|
||||
get count() {
|
||||
return this._lines.length;
|
||||
},
|
||||
|
||||
/**
|
||||
* Add a BasicStorageObject to this request.
|
||||
*
|
||||
|
@ -1112,33 +1121,384 @@ StorageCollectionSetRequest.prototype = {
|
|||
throw new Error("Passed BSO must have id defined.");
|
||||
}
|
||||
|
||||
let line = JSON.stringify(bso).replace("\n", "\u000a");
|
||||
this.addLine(JSON.stringify(bso));
|
||||
},
|
||||
|
||||
/**
|
||||
* Add a BSO (represented by its serialized newline-delimited form).
|
||||
*
|
||||
* You probably shouldn't use this. It is used for batching.
|
||||
*/
|
||||
addLine: function addLine(line) {
|
||||
// This is off by 1 in the larger direction. We don't care.
|
||||
this._size += line.length + "\n".length;
|
||||
this.size += line.length + 1;
|
||||
this._lines.push(line);
|
||||
},
|
||||
|
||||
_onDispatch: function _onDispatch() {
|
||||
this._data = this._lines.join("\n");
|
||||
this.size = this._data.length;
|
||||
},
|
||||
|
||||
_completeParser: function _completeParser(response) {
|
||||
let result = JSON.parse(response.body);
|
||||
|
||||
for (let id of result.success) {
|
||||
this.successfulIDs.add(id);
|
||||
this.successfulIDs.push(id);
|
||||
}
|
||||
|
||||
this.allSucceeded = true;
|
||||
|
||||
for (let [id, reasons] in result.failed) {
|
||||
for (let [id, reasons] in Iterator(result.failed)) {
|
||||
this.failures[id] = reasons;
|
||||
this.allSucceeded = false;
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
/**
|
||||
* Represents a batch upload of BSOs to an individual collection.
|
||||
*
|
||||
* This is a more intelligent way to upload may BSOs to the server. It will
|
||||
* split the uploaded data into multiple requests so size limits, etc aren't
|
||||
* exceeded.
|
||||
*
|
||||
* Once a client obtains an instance of this type, it calls `addBSO` for each
|
||||
* BSO to be uploaded. When the client is done providing BSOs to be uploaded,
|
||||
* it calls `finish`. When `finish` is called, no more BSOs can be added to the
|
||||
* batch. When all requests created from this batch have finished, the callback
|
||||
* provided to `finish` will be invoked.
|
||||
*
|
||||
* Clients can also explicitly flush pending outgoing BSOs via `flush`. This
|
||||
* allows callers to control their own batching/chunking.
|
||||
*
|
||||
* Interally, this maintains a queue of StorageCollectionSetRequest to be
|
||||
* issued. At most one request is allowed to be in-flight at once. This is to
|
||||
* avoid potential conflicts on the server. And, in the case of conditional
|
||||
* requests, it prevents requests from being declined due to the server being
|
||||
* updated by another request issued by us.
|
||||
*
|
||||
* If a request errors for any reason, all queued uploads are abandoned and the
|
||||
* `finish` callback is invoked as soon as possible. The `successfulIDs` and
|
||||
* `failures` properties will contain data from all requests that had this
|
||||
* response data. In other words, the IDs have BSOs that were never sent to the
|
||||
* server are not lumped in to either property.
|
||||
*
|
||||
* Requests can be made conditional by setting `locallyModifiedVersion` to the
|
||||
* most recent version of server data. As responses from the server are seen,
|
||||
* the last server version is carried forward to subsequent requests.
|
||||
*
|
||||
* The server version from the last request is available in the
|
||||
* `serverModifiedVersion` property. It should only be accessed during or
|
||||
* after the callback passed to `finish`.
|
||||
*
|
||||
* @param client
|
||||
* (StorageServiceClient) Client instance to use for uploading.
|
||||
*
|
||||
* @param collection
|
||||
* (string) Collection the batch operation will upload to.
|
||||
*/
|
||||
function StorageCollectionBatchedSet(client, collection) {
|
||||
this.client = client;
|
||||
this.collection = collection;
|
||||
|
||||
this._log = client._log;
|
||||
|
||||
this.locallyModifiedVersion = null;
|
||||
this.serverModifiedVersion = null;
|
||||
|
||||
// TODO Bug 775781 convert to Set and Map once iterable.
|
||||
this.successfulIDs = [];
|
||||
this.failures = {};
|
||||
|
||||
// Request currently being populated.
|
||||
this._stagingRequest = client.setBSOs(this.collection);
|
||||
|
||||
// Requests ready to be sent over the wire.
|
||||
this._outgoingRequests = [];
|
||||
|
||||
// Whether we are waiting for a response.
|
||||
this._requestInFlight = false;
|
||||
|
||||
this._onFinishCallback = null;
|
||||
this._finished = false;
|
||||
this._errorEncountered = false;
|
||||
}
|
||||
StorageCollectionBatchedSet.prototype = {
|
||||
/**
|
||||
* Add a BSO to be uploaded as part of this batch.
|
||||
*/
|
||||
addBSO: function addBSO(bso) {
|
||||
if (this._errorEncountered) {
|
||||
return;
|
||||
}
|
||||
|
||||
let line = JSON.stringify(bso);
|
||||
|
||||
if (line.length > this.client.REQUEST_SIZE_LIMIT) {
|
||||
throw new Error("BSO is larger than allowed limit: " + line.length +
|
||||
" > " + this.client.REQUEST_SIZE_LIMIT);
|
||||
}
|
||||
|
||||
if (this._stagingRequest.size + line.length > this.client.REQUEST_SIZE_LIMIT) {
|
||||
this._log.debug("Sending request because payload size would be exceeded");
|
||||
this._finishStagedRequest();
|
||||
|
||||
this._stagingRequest.addLine(line);
|
||||
return;
|
||||
}
|
||||
|
||||
// We are guaranteed to fit within size limits.
|
||||
this._stagingRequest.addLine(line);
|
||||
|
||||
if (this._stagingRequest.count >= this.client.REQUEST_BSO_COUNT_LIMIT) {
|
||||
this._log.debug("Sending request because BSO count threshold reached.");
|
||||
this._finishStagedRequest();
|
||||
return;
|
||||
}
|
||||
},
|
||||
|
||||
finish: function finish(cb) {
|
||||
if (this._finished) {
|
||||
throw new Error("Batch request has already been finished.");
|
||||
}
|
||||
|
||||
this.flush();
|
||||
|
||||
this._onFinishCallback = cb;
|
||||
this._finished = true;
|
||||
this._stagingRequest = null;
|
||||
},
|
||||
|
||||
flush: function flush() {
|
||||
if (this._finished) {
|
||||
throw new Error("Batch request has been finished.");
|
||||
}
|
||||
|
||||
if (!this._stagingRequest.count) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._finishStagedRequest();
|
||||
},
|
||||
|
||||
_finishStagedRequest: function _finishStagedRequest() {
|
||||
this._outgoingRequests.push(this._stagingRequest);
|
||||
this._sendOutgoingRequest();
|
||||
this._stagingRequest = this.client.setBSOs(this.collection);
|
||||
},
|
||||
|
||||
_sendOutgoingRequest: function _sendOutgoingRequest() {
|
||||
if (this._requestInFlight || this._errorEncountered) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this._outgoingRequests.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
let request = this._outgoingRequests.shift();
|
||||
|
||||
if (this.locallyModifiedVersion) {
|
||||
request.locallyModifiedVersion = this.locallyModifiedVersion;
|
||||
}
|
||||
|
||||
request.dispatch(this._onBatchComplete.bind(this));
|
||||
this._requestInFlight = true;
|
||||
},
|
||||
|
||||
_onBatchComplete: function _onBatchComplete(error, request) {
|
||||
this._requestInFlight = false;
|
||||
|
||||
this.serverModifiedVersion = request.serverTime;
|
||||
|
||||
// Only update if we had a value before. Otherwise, this breaks
|
||||
// unconditional requests!
|
||||
if (this.locallyModifiedVersion) {
|
||||
this.locallyModifiedVersion = request.serverTime;
|
||||
}
|
||||
|
||||
for (let id of request.successfulIDs) {
|
||||
this.successfulIDs.push(id);
|
||||
}
|
||||
|
||||
for (let [id, reason] in Iterator(request.failures)) {
|
||||
this.failures[id] = reason;
|
||||
}
|
||||
|
||||
if (request.error) {
|
||||
this._errorEncountered = true;
|
||||
}
|
||||
|
||||
this._checkFinish();
|
||||
},
|
||||
|
||||
_checkFinish: function _checkFinish() {
|
||||
if (this._outgoingRequests.length && !this._errorEncountered) {
|
||||
this._sendOutgoingRequest();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this._onFinishCallback) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this._onFinishCallback(this);
|
||||
} catch (ex) {
|
||||
this._log.warn("Exception when calling finished callback: " +
|
||||
CommonUtils.exceptionStr(ex));
|
||||
}
|
||||
},
|
||||
};
|
||||
Object.freeze(StorageCollectionBatchedSet.prototype);
|
||||
|
||||
/**
|
||||
* Manages a batch of BSO deletion requests.
|
||||
*
|
||||
* A single instance of this virtual request allows deletion of many individual
|
||||
* BSOs without having to worry about server limits.
|
||||
*
|
||||
* Instances are obtained by calling `deleteBSOsBatching` on
|
||||
* StorageServiceClient.
|
||||
*
|
||||
* Usage is roughly the same as StorageCollectionBatchedSet. Callers obtain
|
||||
* an instance and select individual BSOs for deletion by calling `addID`.
|
||||
* When the caller is finished marking BSOs for deletion, they call `finish`
|
||||
* with a callback which will be invoked when all deletion requests finish.
|
||||
*
|
||||
* When the finished callback is invoked, any encountered errors will be stored
|
||||
* in the `errors` property of this instance (which is passed to the callback).
|
||||
* This will be an empty array if no errors were encountered. Else, it will
|
||||
* contain the errors from the `onComplete` handler of request instances. The
|
||||
* set of succeeded and failed IDs is not currently available.
|
||||
*
|
||||
* Deletes can be made conditional by setting `locallyModifiedVersion`. The
|
||||
* behavior is the same as request types. The only difference is that the
|
||||
* updated version from the server as a result of requests is carried forward
|
||||
* to subsequent requests.
|
||||
*
|
||||
* The server version from the last request is stored in the
|
||||
* `serverModifiedVersion` property. It is not safe to access this until the
|
||||
* callback from `finish`.
|
||||
*
|
||||
* Like StorageCollectionBatchedSet, requests are issued serially to avoid
|
||||
* race conditions on the server.
|
||||
*
|
||||
* @param client
|
||||
* (StorageServiceClient) Client request is associated with.
|
||||
* @param collection
|
||||
* (string) Collection being operated on.
|
||||
*/
|
||||
function StorageCollectionBatchedDelete(client, collection) {
|
||||
this.client = client;
|
||||
this.collection = collection;
|
||||
|
||||
this._log = client._log;
|
||||
|
||||
this.locallyModifiedVersion = null;
|
||||
this.serverModifiedVersion = null;
|
||||
this.errors = [];
|
||||
|
||||
this._pendingIDs = [];
|
||||
this._requestInFlight = false;
|
||||
this._finished = false;
|
||||
this._finishedCallback = null;
|
||||
}
|
||||
StorageCollectionBatchedDelete.prototype = {
|
||||
addID: function addID(id) {
|
||||
if (this._finished) {
|
||||
throw new Error("Cannot add IDs to a finished instance.");
|
||||
}
|
||||
|
||||
// If we saw errors already, don't do any work. This is an optimization
|
||||
// and isn't strictly required, as _sendRequest() should no-op.
|
||||
if (this.errors.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._pendingIDs.push(id);
|
||||
|
||||
if (this._pendingIDs.length >= this.client.REQUEST_BSO_DELETE_LIMIT) {
|
||||
this._sendRequest();
|
||||
}
|
||||
},
|
||||
|
||||
/**
|
||||
* Finish this batch operation.
|
||||
*
|
||||
* No more IDs can be added to this operation. Existing IDs are flushed as
|
||||
* a request. The passed callback will be called when all requests have
|
||||
* finished.
|
||||
*/
|
||||
finish: function finish(cb) {
|
||||
if (this._finished) {
|
||||
throw new Error("Batch delete instance has already been finished.");
|
||||
}
|
||||
|
||||
this._finished = true;
|
||||
this._finishedCallback = cb;
|
||||
|
||||
if (this._pendingIDs.length) {
|
||||
this._sendRequest();
|
||||
}
|
||||
},
|
||||
|
||||
_sendRequest: function _sendRequest() {
|
||||
// Only allow 1 active request at a time and don't send additional
|
||||
// requests if one has failed.
|
||||
if (this._requestInFlight || this.errors.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
let ids = this._pendingIDs.splice(0, this.client.REQUEST_BSO_DELETE_LIMIT);
|
||||
let request = this.client.deleteBSOs(this.collection, ids);
|
||||
|
||||
if (this.locallyModifiedVersion) {
|
||||
request.locallyModifiedVersion = this.locallyModifiedVersion;
|
||||
}
|
||||
|
||||
request.dispatch(this._onRequestComplete.bind(this));
|
||||
this._requestInFlight = true;
|
||||
},
|
||||
|
||||
_onRequestComplete: function _onRequestComplete(error, request) {
|
||||
this._requestInFlight = false;
|
||||
|
||||
if (error) {
|
||||
// We don't currently track metadata of what failed. This is an obvious
|
||||
// feature that could be added.
|
||||
this._log.warn("Error received from server: " + error);
|
||||
this.errors.push(error);
|
||||
}
|
||||
|
||||
this.serverModifiedVersion = request.serverTime;
|
||||
|
||||
// If performing conditional requests, carry forward the new server version
|
||||
// so subsequent conditional requests work.
|
||||
if (this.locallyModifiedVersion) {
|
||||
this.locallyModifiedVersion = request.serverTime;
|
||||
}
|
||||
|
||||
if (this._pendingIDs.length && !this.errors.length) {
|
||||
this._sendRequest();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this._finishedCallback) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this._finishedCallback(this);
|
||||
} catch (ex) {
|
||||
this._log.warn("Exception when invoking finished callback: " +
|
||||
CommonUtils.exceptionStr(ex));
|
||||
}
|
||||
},
|
||||
};
|
||||
Object.freeze(StorageCollectionBatchedDelete.prototype);
|
||||
|
||||
/**
|
||||
* Construct a new client for the SyncStorage API, version 2.0.
|
||||
*
|
||||
|
@ -1195,6 +1555,27 @@ StorageServiceClient.prototype = {
|
|||
*/
|
||||
userAgent: "StorageServiceClient",
|
||||
|
||||
/**
|
||||
* Maximum size of entity bodies.
|
||||
*
|
||||
* TODO this should come from the server somehow. See bug 769759.
|
||||
*/
|
||||
REQUEST_SIZE_LIMIT: 512000,
|
||||
|
||||
/**
|
||||
* Maximum number of BSOs in requests.
|
||||
*
|
||||
* TODO this should come from the server somehow. See bug 769759.
|
||||
*/
|
||||
REQUEST_BSO_COUNT_LIMIT: 100,
|
||||
|
||||
/**
|
||||
* Maximum number of BSOs that can be deleted in a single DELETE.
|
||||
*
|
||||
* TODO this should come from the server. See bug 769759.
|
||||
*/
|
||||
REQUEST_BSO_DELETE_LIMIT: 100,
|
||||
|
||||
_baseURI: null,
|
||||
_log: null,
|
||||
|
||||
|
@ -1584,11 +1965,9 @@ StorageServiceClient.prototype = {
|
|||
* has additional functions and properties specific to this operation. See
|
||||
* its documentation for more.
|
||||
*
|
||||
* Future improvement: support streaming of uploaded records. Currently, data
|
||||
* is buffered in the client before going over the wire. Ideally, we'd support
|
||||
* sending over the wire as soon as data is available. This will require
|
||||
* support in RESTRequest, which doesn't support streaming on requests, only
|
||||
* responses.
|
||||
* Most consumers interested in submitting multiple BSOs to the server will
|
||||
* want to use `setBSOsBatching` instead. That API intelligently splits up
|
||||
* requests as necessary, etc.
|
||||
*
|
||||
* Example usage:
|
||||
*
|
||||
|
@ -1635,6 +2014,30 @@ StorageServiceClient.prototype = {
|
|||
return request;
|
||||
},
|
||||
|
||||
/**
|
||||
* This is a batching variant of setBSOs.
|
||||
*
|
||||
* Whereas `setBSOs` is a 1:1 mapping between function calls and HTTP
|
||||
* requests issued, this one is a 1:N mapping. It will intelligently break
|
||||
* up outgoing BSOs into multiple requests so size limits, etc aren't
|
||||
* exceeded.
|
||||
*
|
||||
* Please see the documentation for `StorageCollectionBatchedSet` for
|
||||
* usage info.
|
||||
*
|
||||
* @param collection
|
||||
* (string) Collection to operate on.
|
||||
* @return
|
||||
* (StorageCollectionBatchedSet) Batched set instance.
|
||||
*/
|
||||
setBSOsBatching: function setBSOsBatching(collection) {
|
||||
if (!collection) {
|
||||
throw new Error("collection argument must be defined.");
|
||||
}
|
||||
|
||||
return new StorageCollectionBatchedSet(this, collection);
|
||||
},
|
||||
|
||||
/**
|
||||
* Deletes a single BSO from a collection.
|
||||
*
|
||||
|
@ -1670,6 +2073,10 @@ StorageServiceClient.prototype = {
|
|||
* The request can be made conditional by setting `locallyModifiedVersion`
|
||||
* on the returned request instance.
|
||||
*
|
||||
* If the number of BSOs to delete is potentially large, it is preferred to
|
||||
* use `deleteBSOsBatching`. That API automatically splits the operation into
|
||||
* multiple requests so server limits aren't exceeded.
|
||||
*
|
||||
* @param collection
|
||||
* (string) Name of collection to delete BSOs from.
|
||||
* @param ids
|
||||
|
@ -1688,6 +2095,24 @@ StorageServiceClient.prototype = {
|
|||
});
|
||||
},
|
||||
|
||||
/**
|
||||
* Bulk deletion of BSOs with no size limit.
|
||||
*
|
||||
* This allows a large amount of BSOs to be deleted easily. It will formulate
|
||||
* multiple `deleteBSOs` queries so the client does not exceed server limits.
|
||||
*
|
||||
* @param collection
|
||||
* (string) Name of collection to delete BSOs from.
|
||||
* @return StorageCollectionBatchedDelete
|
||||
*/
|
||||
deleteBSOsBatching: function deleteBSOsBatching(collection) {
|
||||
if (!collection) {
|
||||
throw new Error("collection argument must be defined.");
|
||||
}
|
||||
|
||||
return new StorageCollectionBatchedDelete(this, collection);
|
||||
},
|
||||
|
||||
/**
|
||||
* Deletes a single collection from the server.
|
||||
*
|
||||
|
|
|
@ -12,7 +12,11 @@ function run_test() {
|
|||
run_next_test();
|
||||
}
|
||||
|
||||
function getEmptyServer(user="765", password="password") {
|
||||
function getRandomUser() {
|
||||
return "" + (Math.floor(Math.random() * 100000) + 1);
|
||||
}
|
||||
|
||||
function getEmptyServer(user=getRandomUser(), password="password") {
|
||||
let users = {};
|
||||
users[user] = password;
|
||||
|
||||
|
@ -23,7 +27,7 @@ function getEmptyServer(user="765", password="password") {
|
|||
});
|
||||
}
|
||||
|
||||
function getClient(user="765", password="password") {
|
||||
function getClient(user=getRandomUser(), password="password") {
|
||||
let client = new StorageServiceClient(BASE_URI + "/" + user);
|
||||
client.addListener({
|
||||
onDispatch: function onDispatch(request) {
|
||||
|
@ -35,7 +39,7 @@ function getClient(user="765", password="password") {
|
|||
return client;
|
||||
}
|
||||
|
||||
function getServerAndClient(user="765", password="password") {
|
||||
function getServerAndClient(user=getRandomUser(), password="password") {
|
||||
let server = getEmptyServer(user, password);
|
||||
let client = getClient(user, password);
|
||||
|
||||
|
@ -644,9 +648,9 @@ add_test(function test_set_bsos_simple() {
|
|||
do_check_null(error);
|
||||
|
||||
let successful = req.successfulIDs;
|
||||
do_check_eq(successful.size(), 2);
|
||||
do_check_true(successful.has(bso0.id));
|
||||
do_check_true(successful.has(bso1.id));
|
||||
do_check_eq(successful.length, 2);
|
||||
do_check_eq(successful.indexOf(bso0.id), 0);
|
||||
do_check_true(successful.indexOf(bso1.id), 1);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
|
@ -701,7 +705,7 @@ add_test(function test_set_bsos_newline() {
|
|||
|
||||
request.dispatch(function onComplete(error, request) {
|
||||
do_check_null(error);
|
||||
do_check_eq(request.successfulIDs.size(), 2);
|
||||
do_check_eq(request.successfulIDs.length, 2);
|
||||
|
||||
let coll = user.collection("testcoll");
|
||||
do_check_eq(coll.bso("bso0").payload, bso0.payload);
|
||||
|
@ -965,3 +969,409 @@ add_test(function test_network_error_listener() {
|
|||
run_next_test();
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batching_set_too_large() {
|
||||
_("Ensure we throw when attempting to add a BSO that is too large to fit.");
|
||||
|
||||
let [server, client, username] = getServerAndClient();
|
||||
|
||||
let request = client.setBSOsBatching("testcoll");
|
||||
let payload = "";
|
||||
|
||||
// The actual length of the payload is a little less. But, this ensures we
|
||||
// exceed it.
|
||||
for (let i = 0; i < client.REQUEST_SIZE_LIMIT; i++) {
|
||||
payload += i;
|
||||
}
|
||||
|
||||
let bso = new BasicStorageObject("bso");
|
||||
bso.payload = payload;
|
||||
do_check_throws(function add() { request.addBSO(bso); });
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
|
||||
add_test(function test_batching_set_basic() {
|
||||
_("Ensure batching set works with single requests.");
|
||||
|
||||
let [server, client, username] = getServerAndClient();
|
||||
|
||||
let request = client.setBSOsBatching("testcoll");
|
||||
for (let i = 0; i < 10; i++) {
|
||||
let bso = new BasicStorageObject("bso" + i);
|
||||
bso.payload = "payload" + i;
|
||||
request.addBSO(bso);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(request.successfulIDs.length, 10);
|
||||
|
||||
let collection = server.user(username).collection("testcoll");
|
||||
do_check_eq(collection.timestamp, request.serverModifiedVersion);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batching_set_batch_count() {
|
||||
_("Ensure multiple outgoing request batching works when count is exceeded.");
|
||||
|
||||
let [server, client, username] = getServerAndClient();
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount++;
|
||||
}
|
||||
|
||||
let request = client.setBSOsBatching("testcoll");
|
||||
for (let i = 1; i <= 300; i++) {
|
||||
let bso = new BasicStorageObject("bso" + i);
|
||||
bso.payload = "XXXXXXX";
|
||||
request.addBSO(bso);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(request.successfulIDs.length, 300);
|
||||
do_check_eq(requestCount, 3);
|
||||
|
||||
let collection = server.user(username).collection("testcoll");
|
||||
do_check_eq(collection.timestamp, request.serverModifiedVersion);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batching_set_batch_size() {
|
||||
_("Ensure outgoing requests batch when size is exceeded.");
|
||||
|
||||
let [server, client, username] = getServerAndClient();
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount++;
|
||||
};
|
||||
|
||||
let limit = client.REQUEST_SIZE_LIMIT;
|
||||
|
||||
let request = client.setBSOsBatching("testcoll");
|
||||
|
||||
// JavaScript: Y U NO EASY REPETITION FUNCTIONALITY?
|
||||
let data = [];
|
||||
for (let i = (limit / 2) - 100; i; i -= 1) {
|
||||
data.push("X");
|
||||
}
|
||||
|
||||
let payload = data.join("");
|
||||
|
||||
for (let i = 0; i < 4; i++) {
|
||||
let bso = new BasicStorageObject("bso" + i);
|
||||
bso.payload = payload;
|
||||
request.addBSO(bso);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(request.successfulIDs.length, 4);
|
||||
do_check_eq(requestCount, 2);
|
||||
|
||||
let collection = server.user(username).collection("testcoll");
|
||||
do_check_eq(collection.timestamp, request.serverModifiedVersion);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batching_set_flush() {
|
||||
_("Ensure flushing batch sets works.");
|
||||
|
||||
let [server, client, username] = getServerAndClient();
|
||||
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount++;
|
||||
}
|
||||
|
||||
let request = client.setBSOsBatching("testcoll");
|
||||
for (let i = 1; i < 101; i++) {
|
||||
let bso = new BasicStorageObject("bso" + i);
|
||||
bso.payload = "foo";
|
||||
request.addBSO(bso);
|
||||
|
||||
if (i % 10 == 0) {
|
||||
request.flush();
|
||||
}
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(request.successfulIDs.length, 100);
|
||||
do_check_eq(requestCount, 10);
|
||||
|
||||
let collection = server.user(username).collection("testcoll");
|
||||
do_check_eq(collection.timestamp, request.serverModifiedVersion);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batching_set_conditional_success() {
|
||||
_("Ensure conditional requests for batched sets work properly.");
|
||||
|
||||
let [server, client, username] = getServerAndClient();
|
||||
|
||||
let collection = server.user(username).createCollection("testcoll");
|
||||
|
||||
let lastServerVersion = Date.now();
|
||||
collection.insertBSO(new ServerBSO("foo", "bar", lastServerVersion));
|
||||
do_check_eq(collection.timestamp, lastServerVersion);
|
||||
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount++;
|
||||
}
|
||||
|
||||
let request = client.setBSOsBatching("testcoll");
|
||||
request.locallyModifiedVersion = collection.timestamp;
|
||||
|
||||
for (let i = 1; i < 251; i++) {
|
||||
let bso = new BasicStorageObject("bso" + i);
|
||||
bso.payload = "foo" + i;
|
||||
request.addBSO(bso);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(requestCount, 3);
|
||||
|
||||
do_check_eq(collection.timestamp, request.serverModifiedVersion);
|
||||
do_check_eq(collection.timestamp, request.locallyModifiedVersion);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batching_set_initial_failure() {
|
||||
_("Ensure that an initial request failure setting BSOs is handled properly.");
|
||||
|
||||
let [server, client, username] = getServerAndClient();
|
||||
|
||||
let collection = server.user(username).createCollection("testcoll");
|
||||
collection.timestamp = Date.now();
|
||||
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount++;
|
||||
}
|
||||
|
||||
let request = client.setBSOsBatching("testcoll");
|
||||
request.locallyModifiedVersion = collection.timestamp - 1;
|
||||
|
||||
for (let i = 1; i < 250; i++) {
|
||||
let bso = new BasicStorageObject("bso" + i);
|
||||
bso.payload = "foo" + i;
|
||||
request.addBSO(bso);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(requestCount, 1);
|
||||
|
||||
do_check_eq(request.successfulIDs.length, 0);
|
||||
do_check_eq(Object.keys(request.failures).length, 0);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batching_set_subsequent_failure() {
|
||||
_("Ensure a non-initial failure during batching set is handled properly.");
|
||||
|
||||
let [server, client, username] = getServerAndClient();
|
||||
let collection = server.user(username).createCollection("testcoll");
|
||||
collection.timestamp = Date.now();
|
||||
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount++;
|
||||
|
||||
if (requestCount == 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
collection.timestamp++;
|
||||
}
|
||||
|
||||
let request = client.setBSOsBatching("testcoll");
|
||||
request.locallyModifiedVersion = collection.timestamp;
|
||||
|
||||
for (let i = 0; i < 250; i++) {
|
||||
let bso = new BasicStorageObject("bso" + i);
|
||||
bso.payload = "foo" + i;
|
||||
request.addBSO(bso);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(requestCount, 2);
|
||||
do_check_eq(request.successfulIDs.length, 100);
|
||||
do_check_eq(Object.keys(request.failures).length, 0);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
function getBatchedDeleteData(collection="testcoll") {
|
||||
let [server, client, username] = getServerAndClient();
|
||||
|
||||
let serverBSOs = {};
|
||||
for (let i = 1000; i; i -= 1) {
|
||||
serverBSOs["bso" + i] = new ServerBSO("bso" + i, "payload" + i);
|
||||
}
|
||||
|
||||
let user = server.user(username);
|
||||
user.createCollection(collection, serverBSOs);
|
||||
|
||||
return [server, client, username, collection];
|
||||
}
|
||||
|
||||
add_test(function test_batched_delete_single() {
|
||||
_("Ensure batched delete with single request works.");
|
||||
|
||||
let [server, client, username, collection] = getBatchedDeleteData();
|
||||
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount += 1;
|
||||
}
|
||||
|
||||
let request = client.deleteBSOsBatching(collection);
|
||||
for (let i = 1; i < 51; i += 1) {
|
||||
request.addID("bso" + i);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(requestCount, 1);
|
||||
do_check_eq(request.errors.length, 0);
|
||||
|
||||
let coll = server.user(username).collection(collection);
|
||||
do_check_eq(coll.count(), 950);
|
||||
|
||||
do_check_eq(request.serverModifiedVersion, coll.timestamp);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batched_delete_multiple() {
|
||||
_("Ensure batched delete splits requests properly.");
|
||||
|
||||
let [server, client, username, collection] = getBatchedDeleteData();
|
||||
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount += 1;
|
||||
}
|
||||
|
||||
let request = client.deleteBSOsBatching(collection);
|
||||
for (let i = 1; i < 251; i += 1) {
|
||||
request.addID("bso" + i);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(requestCount, 3);
|
||||
do_check_eq(request.errors.length, 0);
|
||||
|
||||
let coll = server.user(username).collection(collection);
|
||||
do_check_eq(coll.count(), 750);
|
||||
|
||||
do_check_eq(request.serverModifiedVersion, coll.timestamp);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batched_delete_conditional_success() {
|
||||
_("Ensure conditional batched delete all work.");
|
||||
|
||||
let [server, client, username, collection] = getBatchedDeleteData();
|
||||
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount++;
|
||||
}
|
||||
|
||||
let serverCollection = server.user(username).collection(collection);
|
||||
let initialTimestamp = serverCollection.timestamp;
|
||||
|
||||
let request = client.deleteBSOsBatching(collection);
|
||||
request.locallyModifiedVersion = initialTimestamp;
|
||||
|
||||
for (let i = 1; i < 251; i += 1) {
|
||||
request.addID("bso" + 1);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(requestCount, 3);
|
||||
do_check_eq(request.errors.length, 0);
|
||||
|
||||
do_check_true(request.locallyModifiedVersion > initialTimestamp);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batched_delete_conditional_initial_failure() {
|
||||
_("Ensure conditional batched delete failure on initial request works.");
|
||||
|
||||
// The client needs to issue multiple requests but the first one was
|
||||
// rejected. The client should only issue that initial request.
|
||||
let [server, client, username, collection] = getBatchedDeleteData();
|
||||
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount++;
|
||||
}
|
||||
|
||||
let serverCollection = server.user(username).collection(collection);
|
||||
let request = client.deleteBSOsBatching(collection);
|
||||
request.locallyModifiedVersion = serverCollection.timestamp - 1;
|
||||
|
||||
for (let i = 1; i < 251; i += 1) {
|
||||
request.addID("bso" + i);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(requestCount, 1);
|
||||
do_check_eq(request.errors.length, 1);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
||||
add_test(function test_batched_delete_conditional_subsequent_failure() {
|
||||
_("Ensure conditional batched delete failure on non-initial request.");
|
||||
|
||||
let [server, client, username, collection] = getBatchedDeleteData();
|
||||
|
||||
let serverCollection = server.user(username).collection(collection);
|
||||
|
||||
let requestCount = 0;
|
||||
server.callback.onRequest = function onRequest() {
|
||||
requestCount++;
|
||||
|
||||
if (requestCount <= 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Advance collection's timestamp on subsequent requests so request is
|
||||
// rejected.
|
||||
serverCollection.timestamp++;
|
||||
}
|
||||
|
||||
let request = client.deleteBSOsBatching(collection);
|
||||
request.locallyModifiedVersion = serverCollection.timestamp;
|
||||
|
||||
for (let i = 1; i < 251; i += 1) {
|
||||
request.addID("bso" + i);
|
||||
}
|
||||
|
||||
request.finish(function onFinish(request) {
|
||||
do_check_eq(requestCount, 2);
|
||||
do_check_eq(request.errors.length, 1);
|
||||
|
||||
server.stop(run_next_test);
|
||||
});
|
||||
});
|
||||
|
|
Загрузка…
Ссылка в новой задаче