зеркало из https://github.com/mozilla/fxa.git
use node-kvstore module
This commit is contained in:
Родитель
c0ffe969f8
Коммит
5409beafb0
|
@ -1,15 +1,14 @@
|
|||
const uuid = require('uuid');
|
||||
const async = require('async');
|
||||
const Hapi = require('hapi');
|
||||
const kvstore = require('./kvstore');
|
||||
const config = require('./config');
|
||||
const kv = require('./kv');
|
||||
const util = require('./util');
|
||||
|
||||
var internalError = Hapi.Error.internal;
|
||||
var badRequest = Hapi.Error.badRequest;
|
||||
var notFound = Hapi.Error.notFound;
|
||||
|
||||
var kv = kvstore.connect(config.get('kvstore'));
|
||||
|
||||
/* user account model
|
||||
*
|
||||
|
@ -40,20 +39,20 @@ exports.create = function(data, cb) {
|
|||
async.waterfall([
|
||||
// ensure that an account doesn't already exist for the email
|
||||
function(cb) {
|
||||
kv.get(data.email + '/uid', function (err, doc) {
|
||||
kv.store.get(data.email + '/uid', function (err, doc) {
|
||||
if (doc) return cb(badRequest('AccountExistsForEmail'));
|
||||
cb(null);
|
||||
});
|
||||
},
|
||||
// link email to userid
|
||||
function(cb) {
|
||||
kv.set(data.email + '/uid', userId, cb);
|
||||
kv.store.set(data.email + '/uid', userId, cb);
|
||||
},
|
||||
// get new class A key
|
||||
util.getKA,
|
||||
// create user account
|
||||
function(key, cb) {
|
||||
kv.set(userKey, {
|
||||
kv.store.set(userKey, {
|
||||
params: data.params,
|
||||
verifier: data.verifier,
|
||||
kA: key,
|
||||
|
@ -75,7 +74,7 @@ exports.startLogin = function(email, cb) {
|
|||
|
||||
// eventually will store SRP state
|
||||
// and expiration time
|
||||
kv.set(sid + '/session', {
|
||||
kv.store.set(sid + '/session', {
|
||||
uid: uid
|
||||
}, function (err) {
|
||||
// return sessionID
|
||||
|
@ -93,7 +92,7 @@ exports.finishLogin = function(sessionId, verifier, cb) {
|
|||
async.waterfall([
|
||||
// get session doc
|
||||
function(cb) {
|
||||
kv.get(sessKey, function(err, session) {
|
||||
kv.store.get(sessKey, function(err, session) {
|
||||
if (err) return cb(err);
|
||||
if (!session) return cb(notFound('UnknownSession'));
|
||||
cb(null, session.value);
|
||||
|
@ -120,11 +119,11 @@ exports.finishLogin = function(sessionId, verifier, cb) {
|
|||
// create temporary account token doc
|
||||
function(token, cb) {
|
||||
accountToken = token;
|
||||
kv.set(token + '/accountToken', { uid: uid }, cb);
|
||||
kv.store.set(token + '/accountToken', { uid: uid }, cb);
|
||||
},
|
||||
// delete session doc
|
||||
function(cb) {
|
||||
kv.delete(sessKey, cb);
|
||||
kv.store.delete(sessKey, cb);
|
||||
},
|
||||
// return info
|
||||
function(cb) {
|
||||
|
@ -139,7 +138,7 @@ exports.finishLogin = function(sessionId, verifier, cb) {
|
|||
|
||||
// This method returns the userId currently associated with an email address.
|
||||
exports.getId = function(email, cb) {
|
||||
kv.get(email + '/uid', function(err, result) {
|
||||
kv.store.get(email + '/uid', function(err, result) {
|
||||
if (err) return cb(internalError(err));
|
||||
if (!result) return cb(notFound('UnknownUser'));
|
||||
cb(null, result.value);
|
||||
|
@ -148,7 +147,7 @@ exports.getId = function(email, cb) {
|
|||
|
||||
// get meta data associated with a user
|
||||
exports.getUser = function(userId, cb) {
|
||||
kv.get(userId + '/user', function(err, doc) {
|
||||
kv.store.get(userId + '/user', function(err, doc) {
|
||||
if (err) return cb(internalError(err));
|
||||
if (!doc) return cb(notFound('UnknownUser'));
|
||||
cb(null, doc.value);
|
||||
|
|
|
@ -26,6 +26,11 @@ var conf = module.exports = convict({
|
|||
default: "http://127.0.0.1:9000"
|
||||
},
|
||||
kvstore: {
|
||||
cache: {
|
||||
format: AVAILABLE_BACKENDS,
|
||||
default: 'memory',
|
||||
env: 'KVSTORE_CACHE'
|
||||
},
|
||||
backend: {
|
||||
format: AVAILABLE_BACKENDS,
|
||||
default: "memory",
|
||||
|
@ -109,7 +114,7 @@ if (process.env.CONFIG_FILES) {
|
|||
conf.set('domain', url.parse(conf.get('public_url')).hostname);
|
||||
|
||||
if (conf.get('env') === 'test') {
|
||||
if (conf.get('kvstore.backend') === 'mysql') {
|
||||
if (conf.get('kvstore.backend') === 'mysql' || conf.get('kvstore.cache') === 'mysql') {
|
||||
conf.set('mysql.database', 'test');
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
var config = require('./config');
|
||||
var kvstore = require('kvstore')(config);
|
||||
|
||||
module.exports = {
|
||||
cache: kvstore.connect({ backend: config.get('kvstore.cache') }),
|
||||
store: kvstore.connect({ backend: config.get('kvstore.backend' )})
|
||||
};
|
|
@ -1,190 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
/**
|
||||
* Very lightly abstracted key-value storage for PiCL projects.
|
||||
*
|
||||
* This module provides a simple key-value storage API that abstracts away
|
||||
* the details of the underlying storage server. It explicitly mirrors the
|
||||
* model used by the memcache protocol. In production it's currently intended
|
||||
* to be mysql; for local development you can use an in-memory store.
|
||||
*
|
||||
* To obtain a database connection, call the connect() function:
|
||||
*
|
||||
* var kvstore = require('lib/kvstore');
|
||||
* var db = kvstore.connect({<options>});
|
||||
*
|
||||
* This function takes an options hash to specify details of the underlying
|
||||
* storage backend, and will fill in default options from runtime configuration
|
||||
* data. It returns a connection object with the following methods:
|
||||
*
|
||||
* get(key, cb(<err>, <res>)):
|
||||
*
|
||||
* Get the data stored for the given key. The result will be an object
|
||||
* with field 'value' giving the stored value, and field 'casid' giving
|
||||
* the current CAS id. If the key does not exist then the result will be
|
||||
* null.
|
||||
*
|
||||
*
|
||||
* set(key, value, cb(<err>)):
|
||||
*
|
||||
* Unconditionally set the data stored for the given key.
|
||||
*
|
||||
*
|
||||
* cas(key, value, casid, cb(<err>)):
|
||||
*
|
||||
* Check-and-set the data stored for the given key. The 'casid' should be
|
||||
* a value taken from a previous call to get() for that key, or null to
|
||||
* check that the key does not exist.
|
||||
*
|
||||
*
|
||||
* delete(key, cb(<err>)):
|
||||
*
|
||||
* Unconditionally delete the data stored for the given key. There is no
|
||||
* conditional delete since AFAIK it's not offered by
|
||||
* couchbase.
|
||||
*
|
||||
* Here's an example of how these methods might be used:
|
||||
*
|
||||
* db.get("mydata", function(err, res) {
|
||||
* if(err) throw err;
|
||||
* console.log("My data is currently: " + res.value);
|
||||
* db.cas("mydata", res.value + "newdata", res.casid, function(err) {
|
||||
* if(err) throw "oh noes there was a write conflict";
|
||||
* });
|
||||
* });
|
||||
*
|
||||
* Each of the connection methods will transparently block until the underlying
|
||||
* storage backend connection is established, which allows calls to connect()
|
||||
* to be made synchronously. If you need to be notified when the underlying
|
||||
* connection has been established, pass a callback to connect() like so:
|
||||
*
|
||||
* kvstore.connect({<options>}, function(err, db) {
|
||||
* ...do stuff with the db...
|
||||
* }
|
||||
*
|
||||
*/
|
||||
|
||||
var config = require('./config');
|
||||
var hoek = require('hoek');
|
||||
|
||||
|
||||
// The set of default options to use for new db connections in this process.
|
||||
var DEFAULT_OPTIONS = config.get('kvstore');
|
||||
|
||||
|
||||
// The set of available backend names.
|
||||
// This will be populated with the loaded sub-modules on demand.
|
||||
var AVAILABLE_BACKENDS = DEFAULT_OPTIONS.available_backends.reduce(
|
||||
function(map, backend) {
|
||||
map[backend] = null;
|
||||
return map;
|
||||
}, {});
|
||||
|
||||
|
||||
module.exports = {
|
||||
ERROR_CAS_MISMATCH: 'cas mismatch',
|
||||
|
||||
connect: function(options, cb) {
|
||||
options = hoek.applyToDefaults(DEFAULT_OPTIONS, options || {});
|
||||
|
||||
// Load the specified backend implementation
|
||||
// if it's not already available.
|
||||
var backend = AVAILABLE_BACKENDS[options.backend];
|
||||
if(backend === undefined) {
|
||||
cb("invalid kvstore backend: " + backend);
|
||||
return;
|
||||
}
|
||||
if(backend === null) {
|
||||
backend = require("./kvstore/" + options.backend + ".js");
|
||||
AVAILABLE_BACKENDS[options.backend] = backend;
|
||||
}
|
||||
|
||||
// Create a blocking proxy object to return from this function.
|
||||
// It will act just like the underlying backend connection, but
|
||||
// all method calls will block until the connection is established.
|
||||
var proxy = makeBlockingProxy();
|
||||
|
||||
// Connect via the backend implementation, and have it unblock
|
||||
// the proxy object upon completion.
|
||||
backend.connect(options, function(err, db) {
|
||||
proxy._unblock(err, db);
|
||||
if (cb) cb(err, db);
|
||||
});
|
||||
|
||||
return proxy;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// Function to create a blocking proxy for a yet-to-be-established connection.
|
||||
// This returns an object that looks and acts just like a kvstore connection,
|
||||
// but whose method calls all transparently block until a real connection (or
|
||||
// connection error) is provided asynchronously.
|
||||
//
|
||||
function makeBlockingProxy() {
|
||||
// The proxy object to return.
|
||||
var proxy = {};
|
||||
|
||||
// Variables to hold the connection, or connection error, once established.
|
||||
var dbConnection = null;
|
||||
var dbError = null;
|
||||
|
||||
// List of calls that are blocked waiting for the connection to be provided.
|
||||
var waitList = [];
|
||||
|
||||
// Create a transparently-blocking method that proxies to the named method
|
||||
// on the underlying connection.
|
||||
//
|
||||
function makeBlockingMethod(methodName) {
|
||||
return function() {
|
||||
if (dbConnection !== null) {
|
||||
// The connection is ready, immediately call the underlying method.
|
||||
dbConnection[methodName].apply(dbConnection, arguments);
|
||||
} else if (dbError !== null) {
|
||||
// The connection errored out, call the callback with an error.
|
||||
// All kvstore methods take a callback as their final argument.
|
||||
arguments[arguments.length - 1].call(undefined, dbError);
|
||||
} else {
|
||||
// The connection is pending, add this call to the waitlist.
|
||||
waitList.push({ methodName: methodName, arguments: arguments });
|
||||
}
|
||||
};
|
||||
}
|
||||
proxy.get = makeBlockingMethod("get");
|
||||
proxy.set = makeBlockingMethod("set");
|
||||
proxy.cas = makeBlockingMethod("cas");
|
||||
proxy.delete = makeBlockingMethod("delete");
|
||||
|
||||
// Private method which is called to provide the connection once established.
|
||||
// This will continue execution of any waiting calls.
|
||||
//
|
||||
proxy._unblock = function _unblock(err, db) {
|
||||
// Record the connection or error into the closed-over variables.
|
||||
// If the connection was successful, optimize future use of the proxy
|
||||
// proxy by copying over methods from the underlying connection.
|
||||
if (err) {
|
||||
dbError = err;
|
||||
} else {
|
||||
dbConnection = db;
|
||||
proxy.get = db.get.bind(db);
|
||||
proxy.set = db.set.bind(db);
|
||||
proxy.cas = db.cas.bind(db);
|
||||
proxy.delete = db.delete.bind(db);
|
||||
}
|
||||
// Resume any calls that are waiting for the connection.
|
||||
// By re-calling the named method on the proxy object, we avoid duplicating
|
||||
// the connection-or-error fulfillment logic from makeBlockingMethod().
|
||||
waitList.forEach(function(blockedCall) {
|
||||
process.nextTick(function() {
|
||||
proxy[blockedCall.methodName].apply(proxy, blockedCall.arguments);
|
||||
});
|
||||
});
|
||||
// Clean up so that things can be GC'd.
|
||||
waitList = null;
|
||||
delete proxy._unblock;
|
||||
};
|
||||
|
||||
return proxy;
|
||||
}
|
|
@ -1,71 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
const Hapi = require('hapi');
|
||||
const config = require('../config');
|
||||
const kvstore = require('../kvstore');
|
||||
const Memcached = require('memcached');
|
||||
|
||||
function MemcachedStore(options) {
|
||||
this.client = new Memcached(options.hosts, options);
|
||||
this.lifetime = options.lifetime;
|
||||
}
|
||||
|
||||
MemcachedStore.connect = function connect(options, callback) {
|
||||
options = Hapi.utils.merge(options, config.get('memcached'));
|
||||
callback(null, new MemcachedStore(options));
|
||||
};
|
||||
|
||||
MemcachedStore.prototype.get = function get(key, cb) {
|
||||
this.client.gets(key,
|
||||
function (err, result) {
|
||||
if (err) { cb(err); }
|
||||
else if (!result) { cb(null, null); }
|
||||
else {
|
||||
cb(null,
|
||||
{
|
||||
value: result[key],
|
||||
casid: +(result.cas)
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
MemcachedStore.prototype.set = function set(key, value, cb) {
|
||||
this.client.set(key, value, this.lifetime,
|
||||
function (err, result) {
|
||||
if (err) { cb(err); }
|
||||
else if (!result) { cb('NOT SET'); }
|
||||
else { cb(null); }
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
MemcachedStore.prototype.cas = function cas(key, value, casid, cb) {
|
||||
this.client.cas(key, value, casid, this.lifetime,
|
||||
function (err, result) {
|
||||
if (err) { cb(err); }
|
||||
else if (!result) { cb(kvstore.ERROR_CAS_MISMATCH); }
|
||||
else { cb(null); }
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
MemcachedStore.prototype.delete = function del(key, cb) {
|
||||
this.client.del(
|
||||
key,
|
||||
function (err) {
|
||||
cb(err);
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
MemcachedStore.prototype.close = function close(cb) {
|
||||
this.client.end();
|
||||
if (cb) cb();
|
||||
};
|
||||
|
||||
module.exports = MemcachedStore;
|
|
@ -1,94 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
/**
|
||||
* KVStore implementation using in-memory data.
|
||||
*
|
||||
*/
|
||||
|
||||
const hoek = require('hoek');
|
||||
const kvstore = require('../kvstore');
|
||||
|
||||
|
||||
// Hapi's clone function only works on objects.
|
||||
// Wrap it so that other datatypes are returned unchanged.
|
||||
function clone(value) {
|
||||
if (typeof value !== 'object') return value;
|
||||
return hoek.clone(value);
|
||||
}
|
||||
|
||||
// This is the in-memory store for the data, shared across all connections.
|
||||
// Each key maps to an object with keys 'value' and 'casid'.
|
||||
// It's a very rough simulation of how memcache does its CAS.
|
||||
var data = {};
|
||||
|
||||
module.exports = {
|
||||
|
||||
connect: function(options, cb) {
|
||||
|
||||
// Following the lead of couchbase node module, this is using closures
|
||||
// and simple objects rather than instantiating a prototype connection.
|
||||
|
||||
function get(key, cb) {
|
||||
process.nextTick(function() {
|
||||
if (data[key] === undefined) {
|
||||
cb(null, null);
|
||||
} else {
|
||||
// take a copy so caller cannot modify our internal data structures.
|
||||
cb(null, {
|
||||
value: clone(data[key].value),
|
||||
casid: data[key].casid
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function set(key, value, cb) {
|
||||
value = clone(value);
|
||||
process.nextTick(function() {
|
||||
if (data[key] === undefined) {
|
||||
data[key] = {
|
||||
value: value,
|
||||
casid: 1
|
||||
};
|
||||
} else {
|
||||
data[key].value = value;
|
||||
data[key].casid++;
|
||||
}
|
||||
cb(null);
|
||||
});
|
||||
}
|
||||
|
||||
function cas(key, value, casid, cb) {
|
||||
value = clone(value);
|
||||
process.nextTick(function() {
|
||||
if (data[key] === undefined) {
|
||||
if (casid !== null) return cb(kvstore.ERROR_CAS_MISMATCH);
|
||||
data[key] = {
|
||||
value: value,
|
||||
casid: 1
|
||||
};
|
||||
} else {
|
||||
if (data[key].casid !== casid) return cb(kvstore.ERROR_CAS_MISMATCH);
|
||||
data[key].value = value;
|
||||
data[key].casid++;
|
||||
}
|
||||
cb(null);
|
||||
});
|
||||
}
|
||||
|
||||
function del(key, cb) {
|
||||
process.nextTick(function() {
|
||||
delete data[key];
|
||||
cb(null);
|
||||
});
|
||||
}
|
||||
|
||||
// use process.nextTick to make our api behave asynchronously
|
||||
process.nextTick(function() {
|
||||
cb(null, {get: get, set: set, cas: cas, delete: del});
|
||||
});
|
||||
}
|
||||
|
||||
};
|
|
@ -1,156 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
/**
|
||||
* KVStore implementation backed by mysql.
|
||||
*
|
||||
*/
|
||||
|
||||
const mysql = require('../mysql/wrapper.js');
|
||||
const config = require('../config');
|
||||
const Hapi = require('hapi');
|
||||
const kvstore = require('../kvstore');
|
||||
|
||||
var client;
|
||||
|
||||
const schema =
|
||||
"CREATE TABLE IF NOT EXISTS kvstore (" +
|
||||
"kv_key VARCHAR(255) NOT NULL UNIQUE, " +
|
||||
"kv_value MEDIUMTEXT NOT NULL," +
|
||||
"kv_casid INTEGER NOT NULL" +
|
||||
") ENGINE=InnoDB;"
|
||||
;
|
||||
|
||||
module.exports = {
|
||||
|
||||
connect: function(cfg, connectCB) {
|
||||
if (client) return connectCB(null, {get: get, set: set, cas: cas, delete: del});
|
||||
|
||||
cfg = Hapi.utils.merge(cfg, config.get('mysql'));
|
||||
|
||||
var options = {};
|
||||
|
||||
// Get the relevant client options from the mysql config
|
||||
['host', 'port', 'user', 'password', 'database'].forEach(function(param) {
|
||||
options[param] = cfg[param];
|
||||
});
|
||||
|
||||
// Helper function for connecting to MySQL.
|
||||
//
|
||||
// XXX TODO: connection pooling
|
||||
function connect(cb) {
|
||||
client = mysql.createClient(options);
|
||||
|
||||
client.on('error', function(err) {
|
||||
if (cb) {
|
||||
cb(err);
|
||||
cb = null;
|
||||
}
|
||||
});
|
||||
|
||||
return cb(null);
|
||||
}
|
||||
|
||||
function createSchema(cb) {
|
||||
var database = options.database;
|
||||
// don't specify the database yet -- it shouldn't exist
|
||||
delete options.database;
|
||||
var createClient = mysql.createClient(options);
|
||||
|
||||
createClient.query("CREATE DATABASE IF NOT EXISTS " + database, function(err) {
|
||||
if (err) return cb(err);
|
||||
createClient.useDatabase(database, function(err) {
|
||||
if (err) return cb(err);
|
||||
createClient.query(schema, function(err) {
|
||||
if (err) return cb(err);
|
||||
// reset the database name
|
||||
options.database = database;
|
||||
connect(cb);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function get(key, cb) {
|
||||
var query = "SELECT kv_value, kv_casid FROM kvstore WHERE kv_key = ?";
|
||||
client.query(query, [key], function(err, results) {
|
||||
if (err) return cb(err);
|
||||
if (!results.length) return cb(null, null);
|
||||
|
||||
var value, error = null;
|
||||
|
||||
try {
|
||||
value = JSON.parse(results[0].kv_value);
|
||||
} catch(e) {
|
||||
error = e;
|
||||
}
|
||||
|
||||
return cb(error, {
|
||||
value: value,
|
||||
casid: results[0].kv_casid
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function set(key, value, cb) {
|
||||
var query = "INSERT INTO kvstore (kv_key, kv_value, kv_casid)" +
|
||||
" VALUES (?, ?, 0)" +
|
||||
" ON DUPLICATE KEY UPDATE" +
|
||||
" kv_value=VALUES(kv_value), kv_casid = kv_casid + 1";
|
||||
client.query(query, [key, JSON.stringify(value)], function(err) {
|
||||
return cb(err);
|
||||
});
|
||||
}
|
||||
|
||||
function cas(key, value, casid, cb) {
|
||||
var query;
|
||||
var args = [JSON.stringify(value), key];
|
||||
if (casid === null) {
|
||||
query = "INSERT INTO kvstore (kv_value, kv_key, kv_casid)" +
|
||||
" VALUES (?, ?, 0)";
|
||||
} else {
|
||||
query = "UPDATE kvstore SET kv_value=?, kv_casid=kv_casid+1" +
|
||||
" WHERE kv_key = ? and kv_casid = ?";
|
||||
args.push(casid);
|
||||
}
|
||||
client.query(query, args, function(err, result) {
|
||||
// XXX TODO: check for a constraint violation if casid == null.
|
||||
if (casid !== null && result.affectedRows === 0) err = kvstore.ERROR_CAS_MISMATCH;
|
||||
if (err) console.log(err);
|
||||
return cb(err);
|
||||
});
|
||||
}
|
||||
|
||||
function del(key, cb) {
|
||||
var query = "DELETE FROM kvstore WHERE kv_key=?";
|
||||
client.query(query, [key], function(err) {
|
||||
return cb(err);
|
||||
});
|
||||
}
|
||||
|
||||
function returnClientCallback (err) {
|
||||
connectCB(err, {get: get, set: set, cas: cas, delete: del});
|
||||
}
|
||||
|
||||
if (cfg.create_schema) {
|
||||
createSchema(returnClientCallback);
|
||||
} else {
|
||||
connect(returnClientCallback);
|
||||
}
|
||||
},
|
||||
|
||||
close: function(cb) {
|
||||
client.end(function(err) {
|
||||
client = undefined;
|
||||
if (err) console.error(err);
|
||||
if (cb) cb(err);
|
||||
});
|
||||
},
|
||||
|
||||
closeAndRemove: function(cb) {
|
||||
client.query("DROP DATABASE " + "test", function() {
|
||||
module.exports.close(cb);
|
||||
});
|
||||
}
|
||||
};
|
|
@ -1,161 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
/* This abstraction wraps the mysql driver and provides application level
|
||||
* queueing, as well as query timing and reconnect upon an apparently "stalled"
|
||||
* driver
|
||||
*/
|
||||
|
||||
const mysql = require('mysql');
|
||||
const config = require('../config.js');
|
||||
|
||||
exports.createClient = function(options) {
|
||||
// the application level query queue
|
||||
var queryQueue = [];
|
||||
// The slowQueryTimer is !null when a query is running, and holds
|
||||
// the result from setTimeout. This variable is both a means to
|
||||
// check if a query is running (only one runs at a time), and as
|
||||
// the timeout handle.
|
||||
var slowQueryTimer = null;
|
||||
// how many consecutive failures have we seen when running queries?
|
||||
var consecutiveFailures = 0;
|
||||
// a testing feature. By calling `client.stall` you can
|
||||
// cause responses to be dropped which will trigger slow query detection
|
||||
var stalled = false;
|
||||
|
||||
var client = {
|
||||
stall: function(stalledState) {
|
||||
stalled = stalledState;
|
||||
},
|
||||
realClient: null,
|
||||
_resetConnection: function() {
|
||||
if (this.realClient) this.realClient.destroy();
|
||||
this.realClient = mysql.createClient(options);
|
||||
this.realClient.on('error', function(e) {
|
||||
console.log("database connection down: " + e.toString());
|
||||
});
|
||||
},
|
||||
ping: function(client_cb) {
|
||||
// ping queries are added to the front of the pending work queue. they are
|
||||
// a priority, as they are used by load balancers that want to know the health
|
||||
// of the system.
|
||||
queryQueue.unshift({
|
||||
ping: true,
|
||||
cb: client_cb
|
||||
});
|
||||
this._runNextQuery();
|
||||
},
|
||||
_runNextQuery: function() {
|
||||
var self = this;
|
||||
|
||||
if (slowQueryTimer !== null || !queryQueue.length) return;
|
||||
|
||||
var work = queryQueue.shift();
|
||||
|
||||
function invokeCallback(cb, err, rez) {
|
||||
if (cb) {
|
||||
process.nextTick(function() {
|
||||
try {
|
||||
cb(err, rez);
|
||||
} catch(e) {
|
||||
console.error('database query callback failed: ' + e.toString());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
slowQueryTimer = setTimeout(function() {
|
||||
if (++consecutiveFailures > config.get('mysql.max_reconnect_attempts')) {
|
||||
// if we can't run the query multiple times in a row, we'll fail all outstanding
|
||||
// queries, and reinitialize the connection, so that the process stays up and
|
||||
// retries mysql connection the next time a request which requires db interaction
|
||||
// comes in.
|
||||
queryQueue.unshift(work);
|
||||
console.log("cannot reconnect to mysql! " + queryQueue.length + " outstanding queries #fail.");
|
||||
queryQueue.forEach(function(work) {
|
||||
invokeCallback(work.cb, "database connection unavailable");
|
||||
});
|
||||
queryQueue = [];
|
||||
self._resetConnection();
|
||||
slowQueryTimer = null;
|
||||
} else {
|
||||
console.log("Query taking more than " + config.get('mysql.max_query_time_ms') + "ms! reconnecting to mysql");
|
||||
// we'll fail the long running query, because we cannot
|
||||
// meaningfully know whether or not it completed in the case where
|
||||
// the driver is unresponsive.
|
||||
invokeCallback(work.cb, "database connection unavailable");
|
||||
self._resetConnection();
|
||||
slowQueryTimer = null;
|
||||
self._runNextQuery();
|
||||
}
|
||||
}, config.get('mysql.max_query_time_ms'));
|
||||
|
||||
if (work.ping) {
|
||||
this.realClient.ping(function(err) {
|
||||
if (stalled) {
|
||||
return invokeCallback(work.cb, "database is intentionally stalled");
|
||||
}
|
||||
|
||||
clearTimeout(slowQueryTimer);
|
||||
slowQueryTimer = null;
|
||||
consecutiveFailures = 0;
|
||||
|
||||
invokeCallback(work.cb, err);
|
||||
|
||||
self._runNextQuery();
|
||||
});
|
||||
} else {
|
||||
this.realClient.query(work.query, work.args, function(err, r) {
|
||||
// if we want to simulate a "stalled" mysql connection, we simply
|
||||
// ignore the results from a query.
|
||||
if (stalled) return;
|
||||
|
||||
clearTimeout(slowQueryTimer);
|
||||
slowQueryTimer = null;
|
||||
consecutiveFailures = 0;
|
||||
|
||||
// report query time for all queries
|
||||
//var reqTime = new Date() - work.startTime;
|
||||
//console.log('query_time', reqTime);
|
||||
|
||||
// report failed queries
|
||||
if (err) console.error('failed_query', work.query, err);
|
||||
|
||||
invokeCallback(work.cb, err, r);
|
||||
self._runNextQuery();
|
||||
});
|
||||
}
|
||||
},
|
||||
query: function() {
|
||||
var client_cb;
|
||||
var args = Array.prototype.slice.call(arguments);
|
||||
var query = args.shift();
|
||||
if (args.length && typeof args[args.length - 1] === 'function') {
|
||||
client_cb = args.pop();
|
||||
}
|
||||
args = args.length ? args[0] : [];
|
||||
queryQueue.push({
|
||||
query: query,
|
||||
args: args,
|
||||
cb: client_cb,
|
||||
// record the time .query was called by the application for
|
||||
// true end to end query timing in statsd
|
||||
startTime: new Date()
|
||||
});
|
||||
this._runNextQuery();
|
||||
},
|
||||
end: function(cb) {
|
||||
this.realClient.end(cb);
|
||||
},
|
||||
on: function(ev, cb) {
|
||||
this.realClient.on(ev, cb);
|
||||
},
|
||||
useDatabase: function(db, cb) {
|
||||
this.realClient.useDatabase(db, cb);
|
||||
}
|
||||
};
|
||||
client._resetConnection();
|
||||
client.database = client.realClient.database;
|
||||
return client;
|
||||
};
|
|
@ -24,16 +24,13 @@
|
|||
"convict": "0.1.0",
|
||||
"hoek": "0.8.5",
|
||||
"uuid": "1.4.1",
|
||||
"async": "0.2.8"
|
||||
"async": "0.2.8",
|
||||
"kvstore": "git://github.com/mozilla/node-kvstore.git#3533c23f095d"
|
||||
},
|
||||
"devDependencies": {
|
||||
"awsbox": "0.4.x",
|
||||
"mocha": "1.9.x",
|
||||
"request": "2.21.x",
|
||||
"jshint": "0.9.1"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"memcached": "0.2.2",
|
||||
"mysql": "0.9.5"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,12 +2,6 @@
|
|||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
const Hapi = require('hapi');
|
||||
|
||||
const config = require('../lib/config');
|
||||
const kvstore = require('../lib/kvstore');
|
||||
|
||||
|
||||
exports.routes = [
|
||||
{
|
||||
method: 'GET',
|
||||
|
@ -18,19 +12,6 @@ exports.routes = [
|
|||
}
|
||||
];
|
||||
|
||||
// heartbeat
|
||||
function heartbeat() {
|
||||
var response = new Hapi.response.Text();
|
||||
var handler = this;
|
||||
|
||||
// check for database connection
|
||||
kvstore.connect(config.get('kvstore'), function(err) {
|
||||
if (err) {
|
||||
response.message(err.toString(), 'text/plain');
|
||||
return handler.reply(response);
|
||||
}
|
||||
|
||||
response.message(err ? String(err) : 'ok', 'text/plain');
|
||||
handler.reply(response);
|
||||
});
|
||||
function heartbeat(request) {
|
||||
request.reply('ok');
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче