use node-kvstore module
This commit is contained in:
Zach Carter 2013-05-20 14:07:16 -07:00
Родитель 6ce67eea44 867391ff68
Коммит ef131dc644
10 изменённых файлов: 42 добавлений и 710 удалений

Просмотреть файл

@ -1,15 +1,14 @@
const uuid = require('uuid');
const async = require('async');
const Hapi = require('hapi');
const kvstore = require('./kvstore');
const config = require('./config');
const kv = require('./kv');
const util = require('./util');
var internalError = Hapi.Error.internal;
var badRequest = Hapi.Error.badRequest;
var notFound = Hapi.Error.notFound;
var kv = kvstore.connect(config.get('kvstore'));
/* user account model
*
@ -40,20 +39,20 @@ exports.create = function(data, cb) {
async.waterfall([
// ensure that an account doesn't already exist for the email
function(cb) {
kv.get(data.email + '/uid', function (err, doc) {
kv.store.get(data.email + '/uid', function (err, doc) {
if (doc) return cb(badRequest('AccountExistsForEmail'));
cb(null);
});
},
// link email to userid
function(cb) {
kv.set(data.email + '/uid', userId, cb);
kv.store.set(data.email + '/uid', userId, cb);
},
// get new class A key
util.getKA,
// create user account
function(key, cb) {
kv.set(userKey, {
kv.store.set(userKey, {
params: data.params,
verifier: data.verifier,
kA: key,
@ -75,7 +74,7 @@ exports.startLogin = function(email, cb) {
// eventually will store SRP state
// and expiration time
kv.set(sid + '/session', {
kv.store.set(sid + '/session', {
uid: uid
}, function (err) {
// return sessionID
@ -93,7 +92,7 @@ exports.finishLogin = function(sessionId, verifier, cb) {
async.waterfall([
// get session doc
function(cb) {
kv.get(sessKey, function(err, session) {
kv.store.get(sessKey, function(err, session) {
if (err) return cb(err);
if (!session) return cb(notFound('UnknownSession'));
cb(null, session.value);
@ -120,11 +119,11 @@ exports.finishLogin = function(sessionId, verifier, cb) {
// create temporary account token doc
function(token, cb) {
accountToken = token;
kv.set(token + '/accountToken', { uid: uid }, cb);
kv.store.set(token + '/accountToken', { uid: uid }, cb);
},
// delete session doc
function(cb) {
kv.delete(sessKey, cb);
kv.store.delete(sessKey, cb);
},
// return info
function(cb) {
@ -176,7 +175,7 @@ exports.getSignToken = function(accountToken, cb) {
// This method returns the userId currently associated with an email address.
exports.getId = function(email, cb) {
kv.get(email + '/uid', function(err, result) {
kv.store.get(email + '/uid', function(err, result) {
if (err) return cb(internalError(err));
if (!result) return cb(notFound('UnknownUser'));
cb(null, result.value);
@ -185,7 +184,7 @@ exports.getId = function(email, cb) {
// get meta data associated with a user
exports.getUser = function(userId, cb) {
kv.get(userId + '/user', function(err, doc) {
kv.store.get(userId + '/user', function(err, doc) {
if (err) return cb(internalError(err));
if (!doc) return cb(notFound('UnknownUser'));
cb(null, doc.value);

Просмотреть файл

@ -32,6 +32,11 @@ var conf = module.exports = convict({
default: "./config/public-key.json"
},
kvstore: {
cache: {
format: AVAILABLE_BACKENDS,
default: 'memory',
env: 'KVSTORE_CACHE'
},
backend: {
format: AVAILABLE_BACKENDS,
default: "memory",
@ -115,7 +120,7 @@ if (process.env.CONFIG_FILES) {
conf.set('domain', url.parse(conf.get('public_url')).host);
if (conf.get('env') === 'test') {
if (conf.get('kvstore.backend') === 'mysql') {
if (conf.get('kvstore.backend') === 'mysql' || conf.get('kvstore.cache') === 'mysql') {
conf.set('mysql.database', 'test');
}
}

Просмотреть файл

@ -0,0 +1,7 @@
var config = require('./config');
var kvstore = require('kvstore')(config.root());
module.exports = {
cache: kvstore.connect({ backend: config.get('kvstore.cache') }),
store: kvstore.connect({ backend: config.get('kvstore.backend' )})
};

Просмотреть файл

@ -1,190 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/**
* Very lightly abstracted key-value storage for PiCL projects.
*
* This module provides a simple key-value storage API that abstracts away
* the details of the underlying storage server. It explicitly mirrors the
* model used by the memcache protocol. In production it's currently intended
* to be mysql; for local development you can use an in-memory store.
*
* To obtain a database connection, call the connect() function:
*
* var kvstore = require('lib/kvstore');
* var db = kvstore.connect({<options>});
*
* This function takes an options hash to specify details of the underlying
* storage backend, and will fill in default options from runtime configuration
* data. It returns a connection object with the following methods:
*
* get(key, cb(<err>, <res>)):
*
* Get the data stored for the given key. The result will be an object
* with field 'value' giving the stored value, and field 'casid' giving
* the current CAS id. If the key does not exist then the result will be
* null.
*
*
* set(key, value, cb(<err>)):
*
* Unconditionally set the data stored for the given key.
*
*
* cas(key, value, casid, cb(<err>)):
*
* Check-and-set the data stored for the given key. The 'casid' should be
* a value taken from a previous call to get() for that key, or null to
* check that the key does not exist.
*
*
* delete(key, cb(<err>)):
*
* Unconditionally delete the data stored for the given key. There is no
* conditional delete since AFAIK it's not offered by
* couchbase.
*
* Here's an example of how these methods might be used:
*
* db.get("mydata", function(err, res) {
* if(err) throw err;
* console.log("My data is currently: " + res.value);
* db.cas("mydata", res.value + "newdata", res.casid, function(err) {
* if(err) throw "oh noes there was a write conflict";
* });
* });
*
* Each of the connection methods will transparently block until the underlying
* storage backend connection is established, which allows calls to connect()
* to be made synchronously. If you need to be notified when the underlying
* connection has been established, pass a callback to connect() like so:
*
* kvstore.connect({<options>}, function(err, db) {
* ...do stuff with the db...
* }
*
*/
var config = require('./config');
var hoek = require('hoek');
// The set of default options to use for new db connections in this process.
var DEFAULT_OPTIONS = config.get('kvstore');
// The set of available backend names.
// This will be populated with the loaded sub-modules on demand.
var AVAILABLE_BACKENDS = DEFAULT_OPTIONS.available_backends.reduce(
function(map, backend) {
map[backend] = null;
return map;
}, {});
module.exports = {
ERROR_CAS_MISMATCH: 'cas mismatch',
connect: function(options, cb) {
options = hoek.applyToDefaults(DEFAULT_OPTIONS, options || {});
// Load the specified backend implementation
// if it's not already available.
var backend = AVAILABLE_BACKENDS[options.backend];
if(backend === undefined) {
cb("invalid kvstore backend: " + backend);
return;
}
if(backend === null) {
backend = require("./kvstore/" + options.backend + ".js");
AVAILABLE_BACKENDS[options.backend] = backend;
}
// Create a blocking proxy object to return from this function.
// It will act just like the underlying backend connection, but
// all method calls will block until the connection is established.
var proxy = makeBlockingProxy();
// Connect via the backend implementation, and have it unblock
// the proxy object upon completion.
backend.connect(options, function(err, db) {
proxy._unblock(err, db);
if (cb) cb(err, db);
});
return proxy;
}
};
// Function to create a blocking proxy for a yet-to-be-established connection.
// This returns an object that looks and acts just like a kvstore connection,
// but whose method calls all transparently block until a real connection (or
// connection error) is provided asynchronously.
//
function makeBlockingProxy() {
// The proxy object to return.
var proxy = {};
// Variables to hold the connection, or connection error, once established.
var dbConnection = null;
var dbError = null;
// List of calls that are blocked waiting for the connection to be provided.
var waitList = [];
// Create a transparently-blocking method that proxies to the named method
// on the underlying connection.
//
function makeBlockingMethod(methodName) {
return function() {
if (dbConnection !== null) {
// The connection is ready, immediately call the underlying method.
dbConnection[methodName].apply(dbConnection, arguments);
} else if (dbError !== null) {
// The connection errored out, call the callback with an error.
// All kvstore methods take a callback as their final argument.
arguments[arguments.length - 1].call(undefined, dbError);
} else {
// The connection is pending, add this call to the waitlist.
waitList.push({ methodName: methodName, arguments: arguments });
}
};
}
proxy.get = makeBlockingMethod("get");
proxy.set = makeBlockingMethod("set");
proxy.cas = makeBlockingMethod("cas");
proxy.delete = makeBlockingMethod("delete");
// Private method which is called to provide the connection once established.
// This will continue execution of any waiting calls.
//
proxy._unblock = function _unblock(err, db) {
// Record the connection or error into the closed-over variables.
// If the connection was successful, optimize future use of the proxy
// proxy by copying over methods from the underlying connection.
if (err) {
dbError = err;
} else {
dbConnection = db;
proxy.get = db.get.bind(db);
proxy.set = db.set.bind(db);
proxy.cas = db.cas.bind(db);
proxy.delete = db.delete.bind(db);
}
// Resume any calls that are waiting for the connection.
// By re-calling the named method on the proxy object, we avoid duplicating
// the connection-or-error fulfillment logic from makeBlockingMethod().
waitList.forEach(function(blockedCall) {
process.nextTick(function() {
proxy[blockedCall.methodName].apply(proxy, blockedCall.arguments);
});
});
// Clean up so that things can be GC'd.
waitList = null;
delete proxy._unblock;
};
return proxy;
}

Просмотреть файл

@ -1,71 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
const Hapi = require('hapi');
const config = require('../config');
const kvstore = require('../kvstore');
const Memcached = require('memcached');
function MemcachedStore(options) {
this.client = new Memcached(options.hosts, options);
this.lifetime = options.lifetime;
}
MemcachedStore.connect = function connect(options, callback) {
options = Hapi.utils.merge(options, config.get('memcached'));
callback(null, new MemcachedStore(options));
};
MemcachedStore.prototype.get = function get(key, cb) {
this.client.gets(key,
function (err, result) {
if (err) { cb(err); }
else if (!result) { cb(null, null); }
else {
cb(null,
{
value: result[key],
casid: +(result.cas)
}
);
}
}
);
};
MemcachedStore.prototype.set = function set(key, value, cb) {
this.client.set(key, value, this.lifetime,
function (err, result) {
if (err) { cb(err); }
else if (!result) { cb('NOT SET'); }
else { cb(null); }
}
);
};
MemcachedStore.prototype.cas = function cas(key, value, casid, cb) {
this.client.cas(key, value, casid, this.lifetime,
function (err, result) {
if (err) { cb(err); }
else if (!result) { cb(kvstore.ERROR_CAS_MISMATCH); }
else { cb(null); }
}
);
};
MemcachedStore.prototype.delete = function del(key, cb) {
this.client.del(
key,
function (err) {
cb(err);
}
);
};
MemcachedStore.prototype.close = function close(cb) {
this.client.end();
if (cb) cb();
};
module.exports = MemcachedStore;

Просмотреть файл

@ -1,94 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/**
* KVStore implementation using in-memory data.
*
*/
const hoek = require('hoek');
const kvstore = require('../kvstore');
// Hapi's clone function only works on objects.
// Wrap it so that other datatypes are returned unchanged.
function clone(value) {
if (typeof value !== 'object') return value;
return hoek.clone(value);
}
// This is the in-memory store for the data, shared across all connections.
// Each key maps to an object with keys 'value' and 'casid'.
// It's a very rough simulation of how memcache does its CAS.
var data = {};
module.exports = {
connect: function(options, cb) {
// Following the lead of couchbase node module, this is using closures
// and simple objects rather than instantiating a prototype connection.
function get(key, cb) {
process.nextTick(function() {
if (data[key] === undefined) {
cb(null, null);
} else {
// take a copy so caller cannot modify our internal data structures.
cb(null, {
value: clone(data[key].value),
casid: data[key].casid
});
}
});
}
function set(key, value, cb) {
value = clone(value);
process.nextTick(function() {
if (data[key] === undefined) {
data[key] = {
value: value,
casid: 1
};
} else {
data[key].value = value;
data[key].casid++;
}
cb(null);
});
}
function cas(key, value, casid, cb) {
value = clone(value);
process.nextTick(function() {
if (data[key] === undefined) {
if (casid !== null) return cb(kvstore.ERROR_CAS_MISMATCH);
data[key] = {
value: value,
casid: 1
};
} else {
if (data[key].casid !== casid) return cb(kvstore.ERROR_CAS_MISMATCH);
data[key].value = value;
data[key].casid++;
}
cb(null);
});
}
function del(key, cb) {
process.nextTick(function() {
delete data[key];
cb(null);
});
}
// use process.nextTick to make our api behave asynchronously
process.nextTick(function() {
cb(null, {get: get, set: set, cas: cas, delete: del});
});
}
};

Просмотреть файл

@ -1,156 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/**
* KVStore implementation backed by mysql.
*
*/
const mysql = require('../mysql/wrapper.js');
const config = require('../config');
const Hapi = require('hapi');
const kvstore = require('../kvstore');
var client;
const schema =
"CREATE TABLE IF NOT EXISTS kvstore (" +
"kv_key VARCHAR(255) NOT NULL UNIQUE, " +
"kv_value MEDIUMTEXT NOT NULL," +
"kv_casid INTEGER NOT NULL" +
") ENGINE=InnoDB;"
;
module.exports = {
connect: function(cfg, connectCB) {
if (client) return connectCB(null, {get: get, set: set, cas: cas, delete: del});
cfg = Hapi.utils.merge(cfg, config.get('mysql'));
var options = {};
// Get the relevant client options from the mysql config
['host', 'port', 'user', 'password', 'database'].forEach(function(param) {
options[param] = cfg[param];
});
// Helper function for connecting to MySQL.
//
// XXX TODO: connection pooling
function connect(cb) {
client = mysql.createClient(options);
client.on('error', function(err) {
if (cb) {
cb(err);
cb = null;
}
});
return cb(null);
}
function createSchema(cb) {
var database = options.database;
// don't specify the database yet -- it shouldn't exist
delete options.database;
var createClient = mysql.createClient(options);
createClient.query("CREATE DATABASE IF NOT EXISTS " + database, function(err) {
if (err) return cb(err);
createClient.useDatabase(database, function(err) {
if (err) return cb(err);
createClient.query(schema, function(err) {
if (err) return cb(err);
// reset the database name
options.database = database;
connect(cb);
});
});
});
}
function get(key, cb) {
var query = "SELECT kv_value, kv_casid FROM kvstore WHERE kv_key = ?";
client.query(query, [key], function(err, results) {
if (err) return cb(err);
if (!results.length) return cb(null, null);
var value, error = null;
try {
value = JSON.parse(results[0].kv_value);
} catch(e) {
error = e;
}
return cb(error, {
value: value,
casid: results[0].kv_casid
});
});
}
function set(key, value, cb) {
var query = "INSERT INTO kvstore (kv_key, kv_value, kv_casid)" +
" VALUES (?, ?, 0)" +
" ON DUPLICATE KEY UPDATE" +
" kv_value=VALUES(kv_value), kv_casid = kv_casid + 1";
client.query(query, [key, JSON.stringify(value)], function(err) {
return cb(err);
});
}
function cas(key, value, casid, cb) {
var query;
var args = [JSON.stringify(value), key];
if (casid === null) {
query = "INSERT INTO kvstore (kv_value, kv_key, kv_casid)" +
" VALUES (?, ?, 0)";
} else {
query = "UPDATE kvstore SET kv_value=?, kv_casid=kv_casid+1" +
" WHERE kv_key = ? and kv_casid = ?";
args.push(casid);
}
client.query(query, args, function(err, result) {
// XXX TODO: check for a constraint violation if casid == null.
if (casid !== null && result.affectedRows === 0) err = kvstore.ERROR_CAS_MISMATCH;
if (err) console.log(err);
return cb(err);
});
}
function del(key, cb) {
var query = "DELETE FROM kvstore WHERE kv_key=?";
client.query(query, [key], function(err) {
return cb(err);
});
}
function returnClientCallback (err) {
connectCB(err, {get: get, set: set, cas: cas, delete: del});
}
if (cfg.create_schema) {
createSchema(returnClientCallback);
} else {
connect(returnClientCallback);
}
},
close: function(cb) {
client.end(function(err) {
client = undefined;
if (err) console.error(err);
if (cb) cb(err);
});
},
closeAndRemove: function(cb) {
client.query("DROP DATABASE " + "test", function() {
module.exports.close(cb);
});
}
};

Просмотреть файл

@ -1,161 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/* This abstraction wraps the mysql driver and provides application level
* queueing, as well as query timing and reconnect upon an apparently "stalled"
* driver
*/
const mysql = require('mysql');
const config = require('../config.js');
exports.createClient = function(options) {
// the application level query queue
var queryQueue = [];
// The slowQueryTimer is !null when a query is running, and holds
// the result from setTimeout. This variable is both a means to
// check if a query is running (only one runs at a time), and as
// the timeout handle.
var slowQueryTimer = null;
// how many consecutive failures have we seen when running queries?
var consecutiveFailures = 0;
// a testing feature. By calling `client.stall` you can
// cause responses to be dropped which will trigger slow query detection
var stalled = false;
var client = {
stall: function(stalledState) {
stalled = stalledState;
},
realClient: null,
_resetConnection: function() {
if (this.realClient) this.realClient.destroy();
this.realClient = mysql.createClient(options);
this.realClient.on('error', function(e) {
console.log("database connection down: " + e.toString());
});
},
ping: function(client_cb) {
// ping queries are added to the front of the pending work queue. they are
// a priority, as they are used by load balancers that want to know the health
// of the system.
queryQueue.unshift({
ping: true,
cb: client_cb
});
this._runNextQuery();
},
_runNextQuery: function() {
var self = this;
if (slowQueryTimer !== null || !queryQueue.length) return;
var work = queryQueue.shift();
function invokeCallback(cb, err, rez) {
if (cb) {
process.nextTick(function() {
try {
cb(err, rez);
} catch(e) {
console.error('database query callback failed: ' + e.toString());
}
});
}
}
slowQueryTimer = setTimeout(function() {
if (++consecutiveFailures > config.get('mysql.max_reconnect_attempts')) {
// if we can't run the query multiple times in a row, we'll fail all outstanding
// queries, and reinitialize the connection, so that the process stays up and
// retries mysql connection the next time a request which requires db interaction
// comes in.
queryQueue.unshift(work);
console.log("cannot reconnect to mysql! " + queryQueue.length + " outstanding queries #fail.");
queryQueue.forEach(function(work) {
invokeCallback(work.cb, "database connection unavailable");
});
queryQueue = [];
self._resetConnection();
slowQueryTimer = null;
} else {
console.log("Query taking more than " + config.get('mysql.max_query_time_ms') + "ms! reconnecting to mysql");
// we'll fail the long running query, because we cannot
// meaningfully know whether or not it completed in the case where
// the driver is unresponsive.
invokeCallback(work.cb, "database connection unavailable");
self._resetConnection();
slowQueryTimer = null;
self._runNextQuery();
}
}, config.get('mysql.max_query_time_ms'));
if (work.ping) {
this.realClient.ping(function(err) {
if (stalled) {
return invokeCallback(work.cb, "database is intentionally stalled");
}
clearTimeout(slowQueryTimer);
slowQueryTimer = null;
consecutiveFailures = 0;
invokeCallback(work.cb, err);
self._runNextQuery();
});
} else {
this.realClient.query(work.query, work.args, function(err, r) {
// if we want to simulate a "stalled" mysql connection, we simply
// ignore the results from a query.
if (stalled) return;
clearTimeout(slowQueryTimer);
slowQueryTimer = null;
consecutiveFailures = 0;
// report query time for all queries
//var reqTime = new Date() - work.startTime;
//console.log('query_time', reqTime);
// report failed queries
if (err) console.error('failed_query', work.query, err);
invokeCallback(work.cb, err, r);
self._runNextQuery();
});
}
},
query: function() {
var client_cb;
var args = Array.prototype.slice.call(arguments);
var query = args.shift();
if (args.length && typeof args[args.length - 1] === 'function') {
client_cb = args.pop();
}
args = args.length ? args[0] : [];
queryQueue.push({
query: query,
args: args,
cb: client_cb,
// record the time .query was called by the application for
// true end to end query timing in statsd
startTime: new Date()
});
this._runNextQuery();
},
end: function(cb) {
this.realClient.end(cb);
},
on: function(ev, cb) {
this.realClient.on(ev, cb);
},
useDatabase: function(db, cb) {
this.realClient.useDatabase(db, cb);
}
};
client._resetConnection();
client.database = client.realClient.database;
return client;
};

Просмотреть файл

@ -21,19 +21,16 @@
"compute-cluster": "0.0.7",
"jwcrypto": "0.4.3",
"handlebars": "1.0.10",
"convict": "0.1.0",
"convict": "0.1.1",
"hoek": "0.8.5",
"uuid": "1.4.1",
"async": "0.2.8"
"async": "0.2.8",
"kvstore": "git://github.com/mozilla/node-kvstore.git#4b8c2f6763"
},
"devDependencies": {
"awsbox": "0.4.x",
"mocha": "1.9.x",
"request": "2.21.x",
"jshint": "0.9.1"
},
"optionalDependencies": {
"memcached": "0.2.2",
"mysql": "0.9.5"
}
}

Просмотреть файл

@ -2,11 +2,8 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
const Hapi = require('hapi');
const config = require('../lib/config');
const kvstore = require('../lib/kvstore');
const kv = require('../lib/kv');
const async = require('async');
exports.routes = [
{
@ -18,19 +15,18 @@ exports.routes = [
}
];
// heartbeat
function heartbeat() {
var response = new Hapi.response.Text();
var handler = this;
// check for database connection
kvstore.connect(config.get('kvstore'), function(err) {
if (err) {
response.message(err.toString(), 'text/plain');
return handler.reply(response);
}
response.message(err ? String(err) : 'ok', 'text/plain');
handler.reply(response);
});
function heartbeat(request) {
async.each(
[kv.store, kv.cache],
function (db, done) {
db.ping(done);
},
function (err) {
var text = 'ok';
if (err) {
text = err.toString();
}
request.reply(text).type('text/plain');
}
);
}