зеркало из https://github.com/mozilla/gecko-dev.git
1040 строки
33 KiB
JavaScript
1040 строки
33 KiB
JavaScript
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
this.EXPORTED_SYMBOLS = [
|
|
"WBORecord",
|
|
"RecordManager",
|
|
"CryptoWrapper",
|
|
"CollectionKeyManager",
|
|
"Collection",
|
|
];
|
|
|
|
var Cc = Components.classes;
|
|
var Ci = Components.interfaces;
|
|
var Cr = Components.results;
|
|
var Cu = Components.utils;
|
|
|
|
const CRYPTO_COLLECTION = "crypto";
|
|
const KEYS_WBO = "keys";
|
|
|
|
Cu.import("resource://gre/modules/Log.jsm");
|
|
Cu.import("resource://services-sync/constants.js");
|
|
Cu.import("resource://services-sync/keys.js");
|
|
Cu.import("resource://services-sync/resource.js");
|
|
Cu.import("resource://services-sync/util.js");
|
|
Cu.import("resource://services-common/async.js");
|
|
|
|
this.WBORecord = function WBORecord(collection, id) {
|
|
this.data = {};
|
|
this.payload = {};
|
|
this.collection = collection; // Optional.
|
|
this.id = id; // Optional.
|
|
}
|
|
WBORecord.prototype = {
|
|
_logName: "Sync.Record.WBO",
|
|
|
|
get sortindex() {
|
|
if (this.data.sortindex)
|
|
return this.data.sortindex;
|
|
return 0;
|
|
},
|
|
|
|
// Get thyself from your URI, then deserialize.
|
|
// Set thine 'response' field.
|
|
fetch: function fetch(resource) {
|
|
if (!(resource instanceof Resource)) {
|
|
throw new Error("First argument must be a Resource instance.");
|
|
}
|
|
|
|
let r = resource.get();
|
|
if (r.success) {
|
|
this.deserialize(r); // Warning! Muffles exceptions!
|
|
}
|
|
this.response = r;
|
|
return this;
|
|
},
|
|
|
|
upload: function upload(resource) {
|
|
if (!(resource instanceof Resource)) {
|
|
throw new Error("First argument must be a Resource instance.");
|
|
}
|
|
|
|
return resource.put(this);
|
|
},
|
|
|
|
// Take a base URI string, with trailing slash, and return the URI of this
|
|
// WBO based on collection and ID.
|
|
uri(base) {
|
|
if (this.collection && this.id) {
|
|
let url = Utils.makeURI(base + this.collection + "/" + this.id);
|
|
url.QueryInterface(Ci.nsIURL);
|
|
return url;
|
|
}
|
|
return null;
|
|
},
|
|
|
|
deserialize: function deserialize(json) {
|
|
this.data = json.constructor.toString() == String ? JSON.parse(json) : json;
|
|
|
|
try {
|
|
// The payload is likely to be JSON, but if not, keep it as a string
|
|
this.payload = JSON.parse(this.payload);
|
|
} catch (ex) {}
|
|
},
|
|
|
|
toJSON: function toJSON() {
|
|
// Copy fields from data to be stringified, making sure payload is a string
|
|
let obj = {};
|
|
for (let [key, val] of Object.entries(this.data))
|
|
obj[key] = key == "payload" ? JSON.stringify(val) : val;
|
|
if (this.ttl)
|
|
obj.ttl = this.ttl;
|
|
return obj;
|
|
},
|
|
|
|
toString: function toString() {
|
|
return "{ " +
|
|
"id: " + this.id + " " +
|
|
"index: " + this.sortindex + " " +
|
|
"modified: " + this.modified + " " +
|
|
"ttl: " + this.ttl + " " +
|
|
"payload: " + JSON.stringify(this.payload) +
|
|
" }";
|
|
}
|
|
};
|
|
|
|
Utils.deferGetSet(WBORecord, "data", ["id", "modified", "sortindex", "payload"]);
|
|
|
|
this.CryptoWrapper = function CryptoWrapper(collection, id) {
|
|
this.cleartext = {};
|
|
WBORecord.call(this, collection, id);
|
|
this.ciphertext = null;
|
|
this.id = id;
|
|
}
|
|
CryptoWrapper.prototype = {
|
|
__proto__: WBORecord.prototype,
|
|
_logName: "Sync.Record.CryptoWrapper",
|
|
|
|
ciphertextHMAC: function ciphertextHMAC(keyBundle) {
|
|
let hasher = keyBundle.sha256HMACHasher;
|
|
if (!hasher) {
|
|
throw "Cannot compute HMAC without an HMAC key.";
|
|
}
|
|
|
|
return Utils.bytesAsHex(Utils.digestUTF8(this.ciphertext, hasher));
|
|
},
|
|
|
|
/*
|
|
* Don't directly use the sync key. Instead, grab a key for this
|
|
* collection, which is decrypted with the sync key.
|
|
*
|
|
* Cache those keys; invalidate the cache if the time on the keys collection
|
|
* changes, or other auth events occur.
|
|
*
|
|
* Optional key bundle overrides the collection key lookup.
|
|
*/
|
|
encrypt: function encrypt(keyBundle) {
|
|
if (!keyBundle) {
|
|
throw new Error("A key bundle must be supplied to encrypt.");
|
|
}
|
|
|
|
this.IV = Svc.Crypto.generateRandomIV();
|
|
this.ciphertext = Svc.Crypto.encrypt(JSON.stringify(this.cleartext),
|
|
keyBundle.encryptionKeyB64, this.IV);
|
|
this.hmac = this.ciphertextHMAC(keyBundle);
|
|
this.cleartext = null;
|
|
},
|
|
|
|
// Optional key bundle.
|
|
decrypt: function decrypt(keyBundle) {
|
|
if (!this.ciphertext) {
|
|
throw "No ciphertext: nothing to decrypt?";
|
|
}
|
|
|
|
if (!keyBundle) {
|
|
throw new Error("A key bundle must be supplied to decrypt.");
|
|
}
|
|
|
|
// Authenticate the encrypted blob with the expected HMAC
|
|
let computedHMAC = this.ciphertextHMAC(keyBundle);
|
|
|
|
if (computedHMAC != this.hmac) {
|
|
Utils.throwHMACMismatch(this.hmac, computedHMAC);
|
|
}
|
|
|
|
// Handle invalid data here. Elsewhere we assume that cleartext is an object.
|
|
let cleartext = Svc.Crypto.decrypt(this.ciphertext,
|
|
keyBundle.encryptionKeyB64, this.IV);
|
|
let json_result = JSON.parse(cleartext);
|
|
|
|
if (json_result && (json_result instanceof Object)) {
|
|
this.cleartext = json_result;
|
|
this.ciphertext = null;
|
|
} else {
|
|
throw "Decryption failed: result is <" + json_result + ">, not an object.";
|
|
}
|
|
|
|
// Verify that the encrypted id matches the requested record's id.
|
|
if (this.cleartext.id != this.id)
|
|
throw "Record id mismatch: " + this.cleartext.id + " != " + this.id;
|
|
|
|
return this.cleartext;
|
|
},
|
|
|
|
toString: function toString() {
|
|
let payload = this.deleted ? "DELETED" : JSON.stringify(this.cleartext);
|
|
|
|
return "{ " +
|
|
"id: " + this.id + " " +
|
|
"index: " + this.sortindex + " " +
|
|
"modified: " + this.modified + " " +
|
|
"ttl: " + this.ttl + " " +
|
|
"payload: " + payload + " " +
|
|
"collection: " + (this.collection || "undefined") +
|
|
" }";
|
|
},
|
|
|
|
// The custom setter below masks the parent's getter, so explicitly call it :(
|
|
get id() {
|
|
return WBORecord.prototype.__lookupGetter__("id").call(this);
|
|
},
|
|
|
|
// Keep both plaintext and encrypted versions of the id to verify integrity
|
|
set id(val) {
|
|
WBORecord.prototype.__lookupSetter__("id").call(this, val);
|
|
return this.cleartext.id = val;
|
|
},
|
|
};
|
|
|
|
Utils.deferGetSet(CryptoWrapper, "payload", ["ciphertext", "IV", "hmac"]);
|
|
Utils.deferGetSet(CryptoWrapper, "cleartext", "deleted");
|
|
|
|
/**
|
|
* An interface and caching layer for records.
|
|
*/
|
|
this.RecordManager = function RecordManager(service) {
|
|
this.service = service;
|
|
|
|
this._log = Log.repository.getLogger(this._logName);
|
|
this._records = {};
|
|
}
|
|
RecordManager.prototype = {
|
|
_recordType: CryptoWrapper,
|
|
_logName: "Sync.RecordManager",
|
|
|
|
import: function RecordMgr_import(url) {
|
|
this._log.trace("Importing record: " + (url.spec ? url.spec : url));
|
|
try {
|
|
// Clear out the last response with empty object if GET fails
|
|
this.response = {};
|
|
this.response = this.service.resource(url).get();
|
|
|
|
// Don't parse and save the record on failure
|
|
if (!this.response.success)
|
|
return null;
|
|
|
|
let record = new this._recordType(url);
|
|
record.deserialize(this.response);
|
|
|
|
return this.set(url, record);
|
|
} catch (ex) {
|
|
if (Async.isShutdownException(ex)) {
|
|
throw ex;
|
|
}
|
|
this._log.debug("Failed to import record", ex);
|
|
return null;
|
|
}
|
|
},
|
|
|
|
get: function RecordMgr_get(url) {
|
|
// Use a url string as the key to the hash
|
|
let spec = url.spec ? url.spec : url;
|
|
if (spec in this._records)
|
|
return this._records[spec];
|
|
return this.import(url);
|
|
},
|
|
|
|
set: function RecordMgr_set(url, record) {
|
|
let spec = url.spec ? url.spec : url;
|
|
return this._records[spec] = record;
|
|
},
|
|
|
|
contains: function RecordMgr_contains(url) {
|
|
if ((url.spec || url) in this._records)
|
|
return true;
|
|
return false;
|
|
},
|
|
|
|
clearCache: function recordMgr_clearCache() {
|
|
this._records = {};
|
|
},
|
|
|
|
del: function RecordMgr_del(url) {
|
|
delete this._records[url];
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Keeps track of mappings between collection names ('tabs') and KeyBundles.
|
|
*
|
|
* You can update this thing simply by giving it /info/collections. It'll
|
|
* use the last modified time to bring itself up to date.
|
|
*/
|
|
this.CollectionKeyManager = function CollectionKeyManager(lastModified, default_, collections) {
|
|
this.lastModified = lastModified || 0;
|
|
this._default = default_ || null;
|
|
this._collections = collections || {};
|
|
|
|
this._log = Log.repository.getLogger("Sync.CollectionKeyManager");
|
|
}
|
|
|
|
// TODO: persist this locally as an Identity. Bug 610913.
|
|
// Note that the last modified time needs to be preserved.
|
|
CollectionKeyManager.prototype = {
|
|
|
|
/**
|
|
* Generate a new CollectionKeyManager that has the same attributes
|
|
* as this one.
|
|
*/
|
|
clone() {
|
|
const newCollections = {};
|
|
for (let c in this._collections) {
|
|
newCollections[c] = this._collections[c];
|
|
}
|
|
|
|
return new CollectionKeyManager(this.lastModified, this._default, newCollections);
|
|
},
|
|
|
|
// Return information about old vs new keys:
|
|
// * same: true if two collections are equal
|
|
// * changed: an array of collection names that changed.
|
|
_compareKeyBundleCollections: function _compareKeyBundleCollections(m1, m2) {
|
|
let changed = [];
|
|
|
|
function process(m1, m2) {
|
|
for (let k1 in m1) {
|
|
let v1 = m1[k1];
|
|
let v2 = m2[k1];
|
|
if (!(v1 && v2 && v1.equals(v2)))
|
|
changed.push(k1);
|
|
}
|
|
}
|
|
|
|
// Diffs both ways.
|
|
process(m1, m2);
|
|
process(m2, m1);
|
|
|
|
// Return a sorted, unique array.
|
|
changed.sort();
|
|
let last;
|
|
changed = changed.filter(x => (x != last) && (last = x));
|
|
return {same: changed.length == 0,
|
|
changed};
|
|
},
|
|
|
|
get isClear() {
|
|
return !this._default;
|
|
},
|
|
|
|
clear: function clear() {
|
|
this._log.info("Clearing collection keys...");
|
|
this.lastModified = 0;
|
|
this._collections = {};
|
|
this._default = null;
|
|
},
|
|
|
|
keyForCollection(collection) {
|
|
if (collection && this._collections[collection])
|
|
return this._collections[collection];
|
|
|
|
return this._default;
|
|
},
|
|
|
|
/**
|
|
* If `collections` (an array of strings) is provided, iterate
|
|
* over it and generate random keys for each collection.
|
|
* Create a WBO for the given data.
|
|
*/
|
|
_makeWBO(collections, defaultBundle) {
|
|
let wbo = new CryptoWrapper(CRYPTO_COLLECTION, KEYS_WBO);
|
|
let c = {};
|
|
for (let k in collections) {
|
|
c[k] = collections[k].keyPairB64;
|
|
}
|
|
wbo.cleartext = {
|
|
"default": defaultBundle ? defaultBundle.keyPairB64 : null,
|
|
"collections": c,
|
|
"collection": CRYPTO_COLLECTION,
|
|
"id": KEYS_WBO
|
|
};
|
|
return wbo;
|
|
},
|
|
|
|
/**
|
|
* Create a WBO for the current keys.
|
|
*/
|
|
asWBO(collection, id) {
|
|
return this._makeWBO(this._collections, this._default);
|
|
},
|
|
|
|
/**
|
|
* Compute a new default key, and new keys for any specified collections.
|
|
*/
|
|
newKeys(collections) {
|
|
let newDefaultKeyBundle = this.newDefaultKeyBundle();
|
|
|
|
let newColls = {};
|
|
if (collections) {
|
|
collections.forEach(function(c) {
|
|
let b = new BulkKeyBundle(c);
|
|
b.generateRandom();
|
|
newColls[c] = b;
|
|
});
|
|
}
|
|
return [newDefaultKeyBundle, newColls];
|
|
},
|
|
|
|
/**
|
|
* Generates new keys, but does not replace our local copy. Use this to
|
|
* verify an upload before storing.
|
|
*/
|
|
generateNewKeysWBO(collections) {
|
|
let newDefaultKey, newColls;
|
|
[newDefaultKey, newColls] = this.newKeys(collections);
|
|
|
|
return this._makeWBO(newColls, newDefaultKey);
|
|
},
|
|
|
|
/**
|
|
* Create a new default key.
|
|
*
|
|
* @returns {BulkKeyBundle}
|
|
*/
|
|
newDefaultKeyBundle() {
|
|
const key = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME);
|
|
key.generateRandom();
|
|
return key;
|
|
},
|
|
|
|
/**
|
|
* Create a new default key and store it as this._default, since without one you cannot use setContents.
|
|
*/
|
|
generateDefaultKey() {
|
|
this._default = this.newDefaultKeyBundle();
|
|
},
|
|
|
|
/**
|
|
* Return true if keys are already present for each of the given
|
|
* collections.
|
|
*/
|
|
hasKeysFor(collections) {
|
|
// We can't use filter() here because sometimes collections is an iterator.
|
|
for (let collection of collections) {
|
|
if (!this._collections[collection]) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
},
|
|
|
|
/**
|
|
* Return a new CollectionKeyManager that has keys for each of the
|
|
* given collections (creating new ones for collections where we
|
|
* don't already have keys).
|
|
*/
|
|
ensureKeysFor(collections) {
|
|
const newKeys = Object.assign({}, this._collections);
|
|
for (let c of collections) {
|
|
if (newKeys[c]) {
|
|
continue; // don't replace existing keys
|
|
}
|
|
|
|
const b = new BulkKeyBundle(c);
|
|
b.generateRandom();
|
|
newKeys[c] = b;
|
|
}
|
|
return new CollectionKeyManager(this.lastModified, this._default, newKeys);
|
|
},
|
|
|
|
// Take the fetched info/collections WBO, checking the change
|
|
// time of the crypto collection.
|
|
updateNeeded(info_collections) {
|
|
|
|
this._log.info("Testing for updateNeeded. Last modified: " + this.lastModified);
|
|
|
|
// No local record of modification time? Need an update.
|
|
if (!this.lastModified)
|
|
return true;
|
|
|
|
// No keys on the server? We need an update, though our
|
|
// update handling will be a little more drastic...
|
|
if (!(CRYPTO_COLLECTION in info_collections))
|
|
return true;
|
|
|
|
// Otherwise, we need an update if our modification time is stale.
|
|
return (info_collections[CRYPTO_COLLECTION] > this.lastModified);
|
|
},
|
|
|
|
//
|
|
// Set our keys and modified time to the values fetched from the server.
|
|
// Returns one of three values:
|
|
//
|
|
// * If the default key was modified, return true.
|
|
// * If the default key was not modified, but per-collection keys were,
|
|
// return an array of such.
|
|
// * Otherwise, return false -- we were up-to-date.
|
|
//
|
|
setContents: function setContents(payload, modified) {
|
|
|
|
let self = this;
|
|
|
|
this._log.info("Setting collection keys contents. Our last modified: " +
|
|
this.lastModified + ", input modified: " + modified + ".");
|
|
|
|
if (!payload)
|
|
throw "No payload in CollectionKeyManager.setContents().";
|
|
|
|
if (!payload.default) {
|
|
this._log.warn("No downloaded default key: this should not occur.");
|
|
this._log.warn("Not clearing local keys.");
|
|
throw "No default key in CollectionKeyManager.setContents(). Cannot proceed.";
|
|
}
|
|
|
|
// Process the incoming default key.
|
|
let b = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME);
|
|
b.keyPairB64 = payload.default;
|
|
let newDefault = b;
|
|
|
|
// Process the incoming collections.
|
|
let newCollections = {};
|
|
if ("collections" in payload) {
|
|
this._log.info("Processing downloaded per-collection keys.");
|
|
let colls = payload.collections;
|
|
for (let k in colls) {
|
|
let v = colls[k];
|
|
if (v) {
|
|
let keyObj = new BulkKeyBundle(k);
|
|
keyObj.keyPairB64 = v;
|
|
newCollections[k] = keyObj;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check to see if these are already our keys.
|
|
let sameDefault = (this._default && this._default.equals(newDefault));
|
|
let collComparison = this._compareKeyBundleCollections(newCollections, this._collections);
|
|
let sameColls = collComparison.same;
|
|
|
|
if (sameDefault && sameColls) {
|
|
self._log.info("New keys are the same as our old keys!");
|
|
if (modified) {
|
|
self._log.info("Bumped local modified time.");
|
|
self.lastModified = modified;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// Make sure things are nice and tidy before we set.
|
|
this.clear();
|
|
|
|
this._log.info("Saving downloaded keys.");
|
|
this._default = newDefault;
|
|
this._collections = newCollections;
|
|
|
|
// Always trust the server.
|
|
if (modified) {
|
|
self._log.info("Bumping last modified to " + modified);
|
|
self.lastModified = modified;
|
|
}
|
|
|
|
return sameDefault ? collComparison.changed : true;
|
|
},
|
|
|
|
updateContents: function updateContents(syncKeyBundle, storage_keys) {
|
|
let log = this._log;
|
|
log.info("Updating collection keys...");
|
|
|
|
// storage_keys is a WBO, fetched from storage/crypto/keys.
|
|
// Its payload is the default key, and a map of collections to keys.
|
|
// We lazily compute the key objects from the strings we're given.
|
|
|
|
let payload;
|
|
try {
|
|
payload = storage_keys.decrypt(syncKeyBundle);
|
|
} catch (ex) {
|
|
log.warn("Got exception \"" + ex + "\" decrypting storage keys with sync key.");
|
|
log.info("Aborting updateContents. Rethrowing.");
|
|
throw ex;
|
|
}
|
|
|
|
let r = this.setContents(payload, storage_keys.modified);
|
|
log.info("Collection keys updated.");
|
|
return r;
|
|
}
|
|
}
|
|
|
|
this.Collection = function Collection(uri, recordObj, service) {
|
|
if (!service) {
|
|
throw new Error("Collection constructor requires a service.");
|
|
}
|
|
|
|
Resource.call(this, uri);
|
|
|
|
// This is a bit hacky, but gets the job done.
|
|
let res = service.resource(uri);
|
|
this.authenticator = res.authenticator;
|
|
|
|
this._recordObj = recordObj;
|
|
this._service = service;
|
|
|
|
this._full = false;
|
|
this._ids = null;
|
|
this._limit = 0;
|
|
this._older = 0;
|
|
this._newer = 0;
|
|
this._data = [];
|
|
// optional members used by batch upload operations.
|
|
this._batch = null;
|
|
this._commit = false;
|
|
// Used for batch download operations -- note that this is explicitly an
|
|
// opaque value and not (necessarily) a number.
|
|
this._offset = null;
|
|
}
|
|
Collection.prototype = {
|
|
__proto__: Resource.prototype,
|
|
_logName: "Sync.Collection",
|
|
|
|
_rebuildURL: function Coll__rebuildURL() {
|
|
// XXX should consider what happens if it's not a URL...
|
|
this.uri.QueryInterface(Ci.nsIURL);
|
|
|
|
let args = [];
|
|
if (this.older)
|
|
args.push("older=" + this.older);
|
|
else if (this.newer) {
|
|
args.push("newer=" + this.newer);
|
|
}
|
|
if (this.full)
|
|
args.push("full=1");
|
|
if (this.sort)
|
|
args.push("sort=" + this.sort);
|
|
if (this.ids != null)
|
|
args.push("ids=" + this.ids);
|
|
if (this.limit > 0 && this.limit != Infinity)
|
|
args.push("limit=" + this.limit);
|
|
if (this._batch)
|
|
args.push("batch=" + encodeURIComponent(this._batch));
|
|
if (this._commit)
|
|
args.push("commit=true");
|
|
if (this._offset)
|
|
args.push("offset=" + encodeURIComponent(this._offset));
|
|
|
|
this.uri.query = (args.length > 0) ? "?" + args.join("&") : "";
|
|
},
|
|
|
|
// get full items
|
|
get full() { return this._full; },
|
|
set full(value) {
|
|
this._full = value;
|
|
this._rebuildURL();
|
|
},
|
|
|
|
// Apply the action to a certain set of ids
|
|
get ids() { return this._ids; },
|
|
set ids(value) {
|
|
this._ids = value;
|
|
this._rebuildURL();
|
|
},
|
|
|
|
// Limit how many records to get
|
|
get limit() { return this._limit; },
|
|
set limit(value) {
|
|
this._limit = value;
|
|
this._rebuildURL();
|
|
},
|
|
|
|
// get only items modified before some date
|
|
get older() { return this._older; },
|
|
set older(value) {
|
|
this._older = value;
|
|
this._rebuildURL();
|
|
},
|
|
|
|
// get only items modified since some date
|
|
get newer() { return this._newer; },
|
|
set newer(value) {
|
|
this._newer = value;
|
|
this._rebuildURL();
|
|
},
|
|
|
|
// get items sorted by some criteria. valid values:
|
|
// oldest (oldest first)
|
|
// newest (newest first)
|
|
// index
|
|
get sort() { return this._sort; },
|
|
set sort(value) {
|
|
this._sort = value;
|
|
this._rebuildURL();
|
|
},
|
|
|
|
get offset() { return this._offset; },
|
|
set offset(value) {
|
|
this._offset = value;
|
|
this._rebuildURL();
|
|
},
|
|
|
|
// Set information about the batch for this request.
|
|
get batch() { return this._batch; },
|
|
set batch(value) {
|
|
this._batch = value;
|
|
this._rebuildURL();
|
|
},
|
|
|
|
get commit() { return this._commit; },
|
|
set commit(value) {
|
|
this._commit = value && true;
|
|
this._rebuildURL();
|
|
},
|
|
|
|
// Similar to get(), but will page through the items `batchSize` at a time,
|
|
// deferring calling the record handler until we've gotten them all.
|
|
//
|
|
// Returns the last response processed, and doesn't run the record handler
|
|
// on any items if a non-success status is received while downloading the
|
|
// records (or if a network error occurs).
|
|
getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) {
|
|
let totalLimit = Number(this.limit) || Infinity;
|
|
if (batchSize <= 0 || batchSize >= totalLimit) {
|
|
// Invalid batch sizes should arguably be an error, but they're easy to handle
|
|
return this.get();
|
|
}
|
|
|
|
if (!this.full) {
|
|
throw new Error("getBatched is unimplemented for guid-only GETs");
|
|
}
|
|
|
|
// _onComplete and _onProgress are reset after each `get` by AsyncResource.
|
|
// We overwrite _onRecord to something that stores the data in an array
|
|
// until the end.
|
|
let { _onComplete, _onProgress, _onRecord } = this;
|
|
let recordBuffer = [];
|
|
let resp;
|
|
try {
|
|
this._onRecord = r => recordBuffer.push(r);
|
|
let lastModifiedTime;
|
|
this.limit = batchSize;
|
|
|
|
do {
|
|
this._onProgress = _onProgress;
|
|
this._onComplete = _onComplete;
|
|
if (batchSize + recordBuffer.length > totalLimit) {
|
|
this.limit = totalLimit - recordBuffer.length;
|
|
}
|
|
this._log.trace("Performing batched GET", { limit: this.limit, offset: this.offset });
|
|
// Actually perform the request
|
|
resp = this.get();
|
|
if (!resp.success) {
|
|
break;
|
|
}
|
|
|
|
// Initialize last modified, or check that something broken isn't happening.
|
|
let lastModified = resp.headers["x-last-modified"];
|
|
if (!lastModifiedTime) {
|
|
lastModifiedTime = lastModified;
|
|
this.setHeader("X-If-Unmodified-Since", lastModified);
|
|
} else if (lastModified != lastModifiedTime) {
|
|
// Should be impossible -- We'd get a 412 in this case.
|
|
throw new Error("X-Last-Modified changed in the middle of a download batch! " +
|
|
`${lastModified} => ${lastModifiedTime}`)
|
|
}
|
|
|
|
// If this is missing, we're finished.
|
|
this.offset = resp.headers["x-weave-next-offset"];
|
|
} while (this.offset && totalLimit > recordBuffer.length);
|
|
} finally {
|
|
// Ensure we undo any temporary state so that subsequent calls to get()
|
|
// or getBatched() work properly. We do this before calling the record
|
|
// handler so that we can more convincingly pretend to be a normal get()
|
|
// call. Note: we're resetting these to the values they had before this
|
|
// function was called.
|
|
this._onRecord = _onRecord;
|
|
this._limit = totalLimit;
|
|
this._offset = null;
|
|
delete this._headers["x-if-unmodified-since"];
|
|
this._rebuildURL();
|
|
}
|
|
if (resp.success && Async.checkAppReady()) {
|
|
// call the original _onRecord (e.g. the user supplied record handler)
|
|
// for each record we've stored
|
|
for (let record of recordBuffer) {
|
|
this._onRecord(record);
|
|
}
|
|
}
|
|
return resp;
|
|
},
|
|
|
|
set recordHandler(onRecord) {
|
|
// Save this because onProgress is called with this as the ChannelListener
|
|
let coll = this;
|
|
|
|
// Switch to newline separated records for incremental parsing
|
|
coll.setHeader("Accept", "application/newlines");
|
|
|
|
this._onRecord = onRecord;
|
|
|
|
this._onProgress = function() {
|
|
let newline;
|
|
while ((newline = this._data.indexOf("\n")) > 0) {
|
|
// Split the json record from the rest of the data
|
|
let json = this._data.slice(0, newline);
|
|
this._data = this._data.slice(newline + 1);
|
|
|
|
// Deserialize a record from json and give it to the callback
|
|
let record = new coll._recordObj();
|
|
record.deserialize(json);
|
|
coll._onRecord(record);
|
|
}
|
|
};
|
|
},
|
|
|
|
// This object only supports posting via the postQueue object.
|
|
post() {
|
|
throw new Error("Don't directly post to a collection - use newPostQueue instead");
|
|
},
|
|
|
|
newPostQueue(log, timestamp, postCallback) {
|
|
let poster = (data, headers, batch, commit) => {
|
|
this.batch = batch;
|
|
this.commit = commit;
|
|
for (let [header, value] of headers) {
|
|
this.setHeader(header, value);
|
|
}
|
|
return Resource.prototype.post.call(this, data);
|
|
}
|
|
let getConfig = (name, defaultVal) => {
|
|
if (this._service.serverConfiguration && this._service.serverConfiguration.hasOwnProperty(name)) {
|
|
return this._service.serverConfiguration[name];
|
|
}
|
|
return defaultVal;
|
|
}
|
|
|
|
let config = {
|
|
max_post_bytes: getConfig("max_post_bytes", MAX_UPLOAD_BYTES),
|
|
max_post_records: getConfig("max_post_records", MAX_UPLOAD_RECORDS),
|
|
|
|
max_batch_bytes: getConfig("max_total_bytes", Infinity),
|
|
max_batch_records: getConfig("max_total_records", Infinity),
|
|
}
|
|
|
|
// Handle config edge cases
|
|
if (config.max_post_records <= 0) { config.max_post_records = MAX_UPLOAD_RECORDS; }
|
|
if (config.max_batch_records <= 0) { config.max_batch_records = Infinity; }
|
|
if (config.max_post_bytes <= 0) { config.max_post_bytes = MAX_UPLOAD_BYTES; }
|
|
if (config.max_batch_bytes <= 0) { config.max_batch_bytes = Infinity; }
|
|
|
|
// Max size of BSO payload is 256k. This assumes at most 4k of overhead,
|
|
// which sounds like plenty. If the server says it can't handle this, we
|
|
// might have valid records we can't sync, so we give up on syncing.
|
|
let requiredMax = 260 * 1024;
|
|
if (config.max_post_bytes < requiredMax) {
|
|
this._log.error("Server configuration max_post_bytes is too low", config);
|
|
throw new Error("Server configuration max_post_bytes is too low");
|
|
}
|
|
|
|
return new PostQueue(poster, timestamp, config, log, postCallback);
|
|
},
|
|
};
|
|
|
|
/* A helper to manage the posting of records while respecting the various
|
|
size limits.
|
|
|
|
This supports the concept of a server-side "batch". The general idea is:
|
|
* We queue as many records as allowed in memory, then make a single POST.
|
|
* This first POST (optionally) gives us a batch ID, which we use for
|
|
all subsequent posts, until...
|
|
* At some point we hit a batch-maximum, and jump through a few hoops to
|
|
commit the current batch (ie, all previous POSTs) and start a new one.
|
|
* Eventually commit the final batch.
|
|
|
|
In most cases we expect there to be exactly 1 batch consisting of possibly
|
|
multiple POSTs.
|
|
*/
|
|
function PostQueue(poster, timestamp, config, log, postCallback) {
|
|
// The "post" function we should use when it comes time to do the post.
|
|
this.poster = poster;
|
|
this.log = log;
|
|
|
|
// The config we use. We expect it to have fields "max_post_records",
|
|
// "max_batch_records", "max_post_bytes", and "max_batch_bytes"
|
|
this.config = config;
|
|
|
|
// The callback we make with the response when we do get around to making the
|
|
// post (which could be during any of the enqueue() calls or the final flush())
|
|
// This callback may be called multiple times and must not add new items to
|
|
// the queue.
|
|
// The second argument passed to this callback is a boolean value that is true
|
|
// if we're in the middle of a batch, and false if either the batch is
|
|
// complete, or it's a post to a server that does not understand batching.
|
|
this.postCallback = postCallback;
|
|
|
|
// The string where we are capturing the stringified version of the records
|
|
// queued so far. It will always be invalid JSON as it is always missing the
|
|
// closing bracket.
|
|
this.queued = "";
|
|
|
|
// The number of records we've queued so far but are yet to POST.
|
|
this.numQueued = 0;
|
|
|
|
// The number of records/bytes we've processed in previous POSTs for our
|
|
// current batch. Does *not* include records currently queued for the next POST.
|
|
this.numAlreadyBatched = 0;
|
|
this.bytesAlreadyBatched = 0;
|
|
|
|
// The ID of our current batch. Can be undefined (meaning we are yet to make
|
|
// the first post of a patch, so don't know if we have a batch), null (meaning
|
|
// we've made the first post but the server response indicated no batching
|
|
// semantics), otherwise we have made the first post and it holds the batch ID
|
|
// returned from the server.
|
|
this.batchID = undefined;
|
|
|
|
// Time used for X-If-Unmodified-Since -- should be the timestamp from the last GET.
|
|
this.lastModified = timestamp;
|
|
}
|
|
|
|
PostQueue.prototype = {
|
|
enqueue(record) {
|
|
// We want to ensure the record has a .toJSON() method defined - even
|
|
// though JSON.stringify() would implicitly call it, the stringify might
|
|
// still work even if it isn't defined, which isn't what we want.
|
|
let jsonRepr = record.toJSON();
|
|
if (!jsonRepr) {
|
|
throw new Error("You must only call this with objects that explicitly support JSON");
|
|
}
|
|
let bytes = JSON.stringify(jsonRepr);
|
|
|
|
// Do a flush if we can't add this record without exceeding our single-request
|
|
// limits, or without exceeding the total limit for a single batch.
|
|
let newLength = this.queued.length + bytes.length + 2; // extras for leading "[" / "," and trailing "]"
|
|
|
|
let maxAllowedBytes = Math.min(256 * 1024, this.config.max_post_bytes);
|
|
|
|
let postSizeExceeded = this.numQueued >= this.config.max_post_records ||
|
|
newLength >= maxAllowedBytes;
|
|
|
|
let batchSizeExceeded = (this.numQueued + this.numAlreadyBatched) >= this.config.max_batch_records ||
|
|
(newLength + this.bytesAlreadyBatched) >= this.config.max_batch_bytes;
|
|
|
|
let singleRecordTooBig = bytes.length + 2 > maxAllowedBytes;
|
|
|
|
if (postSizeExceeded || batchSizeExceeded) {
|
|
this.log.trace(`PostQueue flushing due to postSizeExceeded=${postSizeExceeded}, batchSizeExceeded=${batchSizeExceeded}` +
|
|
`, max_batch_bytes: ${this.config.max_batch_bytes}, max_post_bytes: ${this.config.max_post_bytes}`);
|
|
|
|
if (singleRecordTooBig) {
|
|
return { enqueued: false, error: new Error("Single record too large to submit to server") };
|
|
}
|
|
|
|
// We need to write the queue out before handling this one, but we only
|
|
// commit the batch (and thus start a new one) if the batch is full.
|
|
// Note that if a single record is too big for the batch or post, then
|
|
// the batch may be empty, and so we don't flush in that case.
|
|
if (this.numQueued) {
|
|
this.flush(batchSizeExceeded || singleRecordTooBig);
|
|
}
|
|
}
|
|
// Either a ',' or a '[' depending on whether this is the first record.
|
|
this.queued += this.numQueued ? "," : "[";
|
|
this.queued += bytes;
|
|
this.numQueued++;
|
|
return { enqueued: true };
|
|
},
|
|
|
|
flush(finalBatchPost) {
|
|
if (!this.queued) {
|
|
// nothing queued - we can't be in a batch, and something has gone very
|
|
// bad if we think we are.
|
|
if (this.batchID) {
|
|
throw new Error(`Flush called when no queued records but we are in a batch ${this.batchID}`);
|
|
}
|
|
return;
|
|
}
|
|
// the batch query-param and headers we'll send.
|
|
let batch;
|
|
let headers = [];
|
|
if (this.batchID === undefined) {
|
|
// First commit in a (possible) batch.
|
|
batch = "true";
|
|
} else if (this.batchID) {
|
|
// We have an existing batch.
|
|
batch = this.batchID;
|
|
} else {
|
|
// Not the first post and we know we have no batch semantics.
|
|
batch = null;
|
|
}
|
|
|
|
headers.push(["x-if-unmodified-since", this.lastModified]);
|
|
|
|
this.log.info(`Posting ${this.numQueued} records of ${this.queued.length + 1} bytes with batch=${batch}`);
|
|
let queued = this.queued + "]";
|
|
if (finalBatchPost) {
|
|
this.bytesAlreadyBatched = 0;
|
|
this.numAlreadyBatched = 0;
|
|
} else {
|
|
this.bytesAlreadyBatched += queued.length;
|
|
this.numAlreadyBatched += this.numQueued;
|
|
}
|
|
this.queued = "";
|
|
this.numQueued = 0;
|
|
let response = this.poster(queued, headers, batch, !!(finalBatchPost && this.batchID !== null));
|
|
|
|
if (!response.success) {
|
|
this.log.trace("Server error response during a batch", response);
|
|
// not clear what we should do here - we expect the consumer of this to
|
|
// abort by throwing in the postCallback below.
|
|
return this.postCallback(response, !finalBatchPost);
|
|
}
|
|
|
|
if (finalBatchPost) {
|
|
this.log.trace("Committed batch", this.batchID);
|
|
this.batchID = undefined; // we are now in "first post for the batch" state.
|
|
this.lastModified = response.headers["x-last-modified"];
|
|
return this.postCallback(response, false);
|
|
}
|
|
|
|
if (response.status != 202) {
|
|
if (this.batchID) {
|
|
throw new Error("Server responded non-202 success code while a batch was in progress");
|
|
}
|
|
this.batchID = null; // no batch semantics are in place.
|
|
this.lastModified = response.headers["x-last-modified"];
|
|
return this.postCallback(response, false);
|
|
}
|
|
|
|
// this response is saying the server has batch semantics - we should
|
|
// always have a batch ID in the response.
|
|
let responseBatchID = response.obj.batch;
|
|
this.log.trace("Server responsed 202 with batch", responseBatchID);
|
|
if (!responseBatchID) {
|
|
this.log.error("Invalid server response: 202 without a batch ID", response);
|
|
throw new Error("Invalid server response: 202 without a batch ID");
|
|
}
|
|
|
|
if (this.batchID === undefined) {
|
|
this.batchID = responseBatchID;
|
|
if (!this.lastModified) {
|
|
this.lastModified = response.headers["x-last-modified"];
|
|
if (!this.lastModified) {
|
|
throw new Error("Batch response without x-last-modified");
|
|
}
|
|
}
|
|
}
|
|
|
|
if (this.batchID != responseBatchID) {
|
|
throw new Error(`Invalid client/server batch state - client has ${this.batchID}, server has ${responseBatchID}`);
|
|
}
|
|
|
|
this.postCallback(response, true);
|
|
},
|
|
}
|