From f53559f1c327c366d86c1c591cfe4c02b1bf6a75 Mon Sep 17 00:00:00 2001 From: Tilman Kamp <5991088+tilmankamp@users.noreply.github.com> Date: Thu, 10 Jan 2019 14:19:21 +0100 Subject: [PATCH] WIP --- bin/db-init.sh | 3 - sql/create.sql | 141 ------------------------ src/models/Job-model.js | 32 +++--- src/models/Resource-model.js | 29 +++-- src/models/User-model.js | 5 +- src/models/index.js | 12 ++- src/pitRunner.js | 6 +- src/reservations.js | 6 -- src/routes/groups.js | 69 +++++++----- src/routes/jobs.js | 22 ++-- src/routes/nodes.js | 140 ++++++++++-------------- src/routes/users.js | 8 +- src/scheduler.js | 8 +- src/service.js | 15 +-- src/store.js | 204 ----------------------------------- 15 files changed, 170 insertions(+), 530 deletions(-) delete mode 100644 sql/create.sql delete mode 100644 src/store.js diff --git a/bin/db-init.sh b/bin/db-init.sh index 59b5c84..38f8e01 100755 --- a/bin/db-init.sh +++ b/bin/db-init.sh @@ -6,7 +6,4 @@ fi echo "Creating snakepit DB..." createdb -U postgres snakepit -echo "Done." -echo "Preparing tables..." -psql -q -U postgres -d snakepit -f /code/sql/create.sql echo "Done." \ No newline at end of file diff --git a/sql/create.sql b/sql/create.sql deleted file mode 100644 index afbe8a8..0000000 --- a/sql/create.sql +++ /dev/null @@ -1,141 +0,0 @@ - -CREATE TABLE users ( - username VARCHAR (10) NOT NULL PRIMARY KEY, - password VARCHAR (1024) NOT NULL, - email VARCHAR (320) NOT NULL, - admin BOOLEAN NOT NULL -); - -CREATE TABLE user_groups ( - username VARCHAR (10) NOT NULL - CONSTRAINT user_group__ref__users - REFERENCES users - ON UPDATE CASCADE - ON DELETE CASCADE - DEFERRABLE, - groupname VARCHAR (10) NOT NULL, - UNIQUE (username, groupname) -); - -CREATE TABLE nodes ( - nodename VARCHAR (10) NOT NULL PRIMARY KEY, - endpoint VARCHAR (1024) NOT NULL, - nodestate INTEGER NOT NULL -); - -CREATE TABLE resources ( - resnumber SERIAL NOT NULL PRIMARY KEY, - nodename VARCHAR (10) NOT NULL - CONSTRAINT resource__ref__nodes - REFERENCES nodes - ON UPDATE CASCADE - ON DELETE CASCADE - DEFERRABLE, - restype VARCHAR (10) NOT NULL, - resindex INTEGER NOT NULL, - resname VARCHAR (255) NOT NULL, - UNIQUE (nodename, restype, resindex) -); - -CREATE TABLE resource_groups ( - resnumber INTEGER NOT NULL - CONSTRAINT resource_group__ref__resources - REFERENCES resources - ON UPDATE CASCADE - ON DELETE CASCADE - DEFERRABLE, - groupname VARCHAR (10) NOT NULL, - UNIQUE (resnumber, groupname) -); - -CREATE TABLE aliases ( - alias VARCHAR (10) NOT NULL PRIMARY KEY, - name VARCHAR (255) NOT NULL -); - -CREATE SEQUENCE pits START 1; - -CREATE TABLE jobs ( - jobnumber SERIAL NOT NULL PRIMARY KEY, - jobdesc VARCHAR (20) NOT NULL, - username VARCHAR (10) NOT NULL, - jobstate INTEGER NOT NULL, - resrequest VARCHAR (1024) NOT NULL, - provisioning VARCHAR (1024) NOT NULL -); - -CREATE TABLE job_groups ( - jobnumber INTEGER NOT NULL - CONSTRAINT job_group__ref__jobs - REFERENCES jobs - ON UPDATE CASCADE - ON DELETE CASCADE - DEFERRABLE, - groupname VARCHAR (10) NOT NULL, - UNIQUE (jobnumber, groupname) -); - -CREATE TABLE job_states ( - jobnumber INTEGER NOT NULL - CONSTRAINT job_state__ref__jobs - REFERENCES jobs - ON UPDATE CASCADE - ON DELETE CASCADE - DEFERRABLE, - jobstate INTEGER NOT NULL, - since DATE NOT NULL, - reason VARCHAR (1024), - UNIQUE (jobnumber, jobstate) -); - -CREATE TABLE process_groups ( - groupnumber SERIAL NOT NULL PRIMARY KEY, - jobnumber INTEGER NOT NULL - CONSTRAINT process_group__ref__jobs - REFERENCES jobs - ON UPDATE CASCADE - ON DELETE CASCADE - 
DEFERRABLE, - groupindex INTEGER NOT NULL, - UNIQUE (jobnumber, groupindex) -); - -CREATE TABLE processes ( - procnumber SERIAL NOT NULL PRIMARY KEY, - groupnumber INTEGER NOT NULL - CONSTRAINT process__ref__process_groups - REFERENCES process_groups - ON UPDATE CASCADE - ON DELETE CASCADE - DEFERRABLE, - procindex INTEGER NOT NULL, - exitstatus INTEGER, - result VARCHAR (20), - UNIQUE (groupnumber, procindex) -); - -CREATE TABLE allocations ( - allocnumber SERIAL NOT NULL PRIMARY KEY, - procnumber INTEGER NOT NULL - CONSTRAINT allocation__ref__processes - REFERENCES processes - ON UPDATE CASCADE - ON DELETE CASCADE - DEFERRABLE, - resnumber INTEGER NOT NULL, - UNIQUE (procnumber, resnumber) -); - -CREATE TABLE utilizations ( - allocnumber INTEGER NOT NULL - CONSTRAINT utilization__ref__allocations - REFERENCES allocations - ON UPDATE CASCADE - ON DELETE CASCADE - DEFERRABLE, - utiltype VARCHAR (10) NOT NULL, - aggregated INTEGER NOT NULL, - samples INTEGER NOT NULL, - current INTEGER NOT NULL, - UNIQUE (allocnumber, utiltype) -); diff --git a/src/models/Job-model.js b/src/models/Job-model.js index bd2bd71..1ccde69 100644 --- a/src/models/Job-model.js +++ b/src/models/Job-model.js @@ -1,7 +1,8 @@ const Sequelize = require('sequelize') const sequelize = require('./db.js') -const Group = require('./Pit-model.js') +const Pit = require('./Pit-model.js') const Group = require('./Group-model.js') +const User = require('./User-model.js') const State = require('./State-model.js') const ProcessGroup = require('./ProcessGroup-model.js') @@ -18,25 +19,22 @@ Job.hasMany(State) Job.hasMany(ProcessGroup) -Job.belongsToMany(Group, { through: 'JobGroup' }) -Group.belongsToMany(Job, { through: 'JobGroup' }) +var JobGroup = sequelize.define('jobgroup') +Job.belongsToMany(Group, { through: JobGroup }) +Group.belongsToMany(Job, { through: JobGroup }) -User.prototype.canAccessJob = async (resource) => { - // TODO: Implement DB based decision - /* - if (this.admin || this.id == job.user) { +User.prototype.canAccessJob = async (job) => { + if (this.admin || await job.hasUser(this)) { return true } - if (job.groups && this.groups) { - for (let group of this.groups) { - if (job.groups.includes(group)) { - return true - } - } - } - return false - */ - return true + return (await Job.count({ + where: { id: job.id }, + include: [ + { model: JobGroup }, + { model: User.UserGroup }, + { model: User, where: { id: this.id } } + ] + }) > 0) } Job.prototype.getJobDir = () => Pit.getPitDir(this.id) diff --git a/src/models/Resource-model.js b/src/models/Resource-model.js index 9b82d8c..111df34 100644 --- a/src/models/Resource-model.js +++ b/src/models/Resource-model.js @@ -1,6 +1,7 @@ const Sequelize = require('sequelize') const sequelize = require('./db.js') const Group = require('./Group-model.js') +const User = require('./User-model.js') var Resource = sequelize.define('resource', { id: { type: Sequelize.INTEGER, autoIncrement: true, primaryKey: true }, @@ -9,24 +10,22 @@ var Resource = sequelize.define('resource', { name: { type: Sequelize.STRING, allowNull: false } }) -User.belongsToMany(Group, { through: 'ResourceGroup' }) -Group.belongsToMany(User, { through: 'ResourceGroup' }) +var ResourceGroup = sequelize.define('resourcegroup') +Resource.belongsToMany(Group, { through: ResourceGroup }) +Group.belongsToMany(Resource, { through: ResourceGroup }) User.prototype.canAccessResource = async (resource) => { - // TODO: Implement DB based decision - /* - if (resource.groups) { - if (this.groups) { - for (let group of 
this.groups) { - if (resource.groups.includes(group)) { - return true - } - } - } - return false + if (await resource.countGroups() == 0) { + return true } - */ - return true + return (await Resource.count({ + where: { id: resource.id }, + include: [ + { model: ResourceGroup }, + { model: User.UserGroup }, + { model: User, where: { id: this.id } } + ] + }) > 0) } module.exports = Resource diff --git a/src/models/User-model.js b/src/models/User-model.js index 299d22b..54b090f 100644 --- a/src/models/User-model.js +++ b/src/models/User-model.js @@ -12,8 +12,9 @@ var User = sequelize.define('user', { email: { type: Sequelize.STRING, allowNull: true } }) -User.belongsToMany(Group, { through: 'UserGroup' }) -Group.belongsToMany(User, { through: 'UserGroup' }) +var UserGroup = User.UserGroup = sequelize.define('usergroup') +User.belongsToMany(Group, { through: UserGroup }) +Group.belongsToMany(User, { through: UserGroup }) const userPrefix = '/data/home/' diff --git a/src/models/index.js b/src/models/index.js index 8ae9795..fdbb68d 100644 --- a/src/models/index.js +++ b/src/models/index.js @@ -1,8 +1,12 @@ -const glob = require( 'glob' ) -const path = require( 'path' ) +const glob = require('glob') +const path = require('path') + +var exports = module.exports = { all: [] } glob.sync('./*-model.js').forEach(moduleName => { let modelName = moduleName.substr(0, moduleName.indexOf('-')) - module[modelName] = require(path.resolve(moduleName)) + let model = require(path.resolve(moduleName)) + exports[modelName] = model + exports.all.push(model) }) -module.sequelize = require('./db.js') \ No newline at end of file +exports.sequelize = require('./db.js') \ No newline at end of file diff --git a/src/pitRunner.js b/src/pitRunner.js index 179052a..d86c743 100644 --- a/src/pitRunner.js +++ b/src/pitRunner.js @@ -5,12 +5,12 @@ const axios = require('axios') const assign = require('assign-deep') const Parallel = require('async-parallel') -const log = require('./logger.js') -const store = require('./store.js') -const config = require('./config.js') +const log = require('./utils/logger.js') const clusterEvents = require('./utils/clusterEvents.js') const { to, getScript, envToScript } = require('./utils/utils.js') +const config = require('./config.js') + const lxdStatus = { created: 100, started: 101, diff --git a/src/reservations.js b/src/reservations.js index e723db4..4d2b8e6 100644 --- a/src/reservations.js +++ b/src/reservations.js @@ -1,13 +1,7 @@ const { MultiRange } = require('multi-integer-range') -const store = require('./store.js') -const groupsModule = require('./groups.js') -const nodesModule = require('./nodes.js') - var exports = module.exports = {} -var db = store.root - function _isReserved(clusterReservation, nodeId, resourceId) { return [].concat.apply([], clusterReservation).reduce( (result, reservation) => diff --git a/src/routes/groups.js b/src/routes/groups.js index 258feef..191f753 100644 --- a/src/routes/groups.js +++ b/src/routes/groups.js @@ -8,33 +8,46 @@ const router = module.exports = new Router() router.use(ensureSignedIn) -exports.initApp = function(app) { - app.get('/groups', function(req, res) { - if (req.user.admin) { - let groups = {} - for (let node of Object.keys(db.nodes).map(k => db.nodes[k])) { - _getGroups(groups, node.resources || {}) - } - _getGroups(groups, db.users) - _getGroups(groups, db.jobs) - res.status(200).json(Object.keys(groups)) - } else { - res.status(403).send() - } - }) +router.get('/', async (req, res) => { + res.send((await Group.findAll()).map(group => 
group.id)) +}) - app.post('/groups/:group/fs', function(req, res) { - let group = req.params.group - if (req.user.groups.includes(group)) { - let chunks = [] - req.on('data', chunk => chunks.push(chunk)); - req.on('end', () => fslib.serve( - fslib.real(exports.getGroupDir(group)), - Buffer.concat(chunks), - result => res.send(result), config.debugJobFS) - ) - } else { - res.status(403).send() - } - }) +router.use(ensureAdmin) + +router.put('/:id', async (req, res) => { + if (req.body && req.body.title) { + await Group.create({ + id: req.params.id, + title: req.body.title + }) + res.send() + } else { + res.status(400).send() + } +}) + +function targetGroup (req, res, next) { + req.targetGroup = Group.findById(req.params.id) + req.targetGroup ? next() : res.status(404).send() } + +router.use(targetGroup) + +router.delete('/:id', async (req, res) => { + await req.targetGroup.destroy() + res.send() +}) + +router.post('/:id/fs', async (req, res) => { + if (await req.user.hasGroup(req.targetGroup)) { + let chunks = [] + req.on('data', chunk => chunks.push(chunk)); + req.on('end', () => fslib.serve( + fslib.real(req.targetGroup.getGroupDir()), + Buffer.concat(chunks), + result => res.send(result), config.debugJobFS) + ) + } else { + res.status(403).send() + } +}) diff --git a/src/routes/jobs.js b/src/routes/jobs.js index be6d385..2a99197 100644 --- a/src/routes/jobs.js +++ b/src/routes/jobs.js @@ -202,17 +202,21 @@ router.post('/', async (req, res) => { } }) -router.put('/:id/groups/:group', async (req, res) => { - _addGroup(db.jobs[req.params.job], req, res, entity => { - _emitEntityChange('job', entity) - }) +function targetGroup (req, res, next) { + req.targetGroup = Group.findById(req.params.group) + req.targetGroup ? next() : res.status(404).send() +} + +router.put('/:id/groups/:group', targetGroup, async (req, res) => { + await req.targetJob.addGroup(req.targetGroup) + res.send() + clusterEvents.emit('moreJobRights', req.targetUser.id) }) -router.delete('/:id/groups/:group', async (req, res) => { - _removeGroup(db.jobs[req.params.job], req, res, entity => { - _emitEntityChange('job', entity) - _emitRestricted() - }) +router.delete('/:id/groups/:group', targetGroup, async (req, res) => { + await req.targetJob.removeGroup(req.targetGroup) + res.send() + clusterEvents.emit('lessJobRights', req.targetUser.id) }) router.get('/', async (req, res) => { diff --git a/src/routes/nodes.js b/src/routes/nodes.js index 7ed056d..1a724ff 100644 --- a/src/routes/nodes.js +++ b/src/routes/nodes.js @@ -1,26 +1,9 @@ const { getAlias } = require('../models/Alias-model.js') +const Group = require('../models/Group-model.js') +const Resource = require('../models/Resource-model.js') +const { ensureSignedIn, ensureAdmin } = require('./users.js') -router.put('/:id', async (req, res) => { - if (req.user.admin) { - let id = req.params.id - let node = req.body - let dbnode = db.nodes[id] - if (dbnode) { - res.status(400).send({ message: 'Node with same id already registered' }) - } else if (node.endpoint && node.password) { - addNode(id, node.endpoint, node.password).then(newNode => { - setNodeState(newNode, nodeStates.ONLINE) - res.status(200).send() - }).catch(err => { - res.status(400).send({ message: 'Problem adding node:\n' + err }) - }) - } else { - res.status(400).send() - } - } else { - res.status(403).send() - } -}) +router.use(ensureSignedIn) router.get('/', async (req, res) => { res.status(200).send(Object.keys(db.nodes)) @@ -32,7 +15,7 @@ router.get('/:id', async (req, res) => { res.status(200).json({ id: 
node.id, endpoint: node.endpoint, - state: node.state, + online: node.online, since: node.since, resources: Object.keys(node.resources).map(resourceId => { let dbResource = node.resources[resourceId] @@ -56,76 +39,67 @@ router.get('/:id', async (req, res) => { } }) +router.use(ensureAdmin) + +router.put('/:id', async (req, res) => { + let id = req.params.id + let node = req.body + let dbnode = db.nodes[id] + if (dbnode) { + res.status(400).send({ message: 'Node with same id already registered' }) + } else if (node.endpoint && node.password) { + addNode(id, node.endpoint, node.password).then(newNode => { + setNodeState(newNode, nodeStates.ONLINE) + res.status(200).send() + }).catch(err => { + res.status(400).send({ message: 'Problem adding node:\n' + err }) + }) + } else { + res.status(400).send() + } +}) + router.delete('/:id', async (req, res) => { - if (req.user.admin) { - let node = db.nodes[req.params.id] - if (node) { - removeNode(node) - .then(() => res.status(404).send()) - .catch(err => res.status(500).send({ message: 'Problem removing node:\n' + err })) - } else { - res.status(404).send() - } + let node = db.nodes[req.params.id] + if (node) { + removeNode(node) + .then(() => res.status(404).send()) + .catch(err => res.status(500).send({ message: 'Problem removing node:\n' + err })) } else { - res.status(403).send() + res.status(404).send() } }) -router.put('/:id/groups/:group', (req, res) => { - if (req.user.admin) { - let node = db.nodes[req.params.node] - if (node) { - let group = req.params.group - for (let resource of Object.keys(node.resources).map(k => node.resources[k])) { - if (resource.groups) { - if (!resource.groups.includes(group)) { - resource.groups.push(group) - _emitEntityChange('resource', resource) - } - } else { - resource.groups = [ group ] - } - } - res.status(200).send() - } else { - res.status(404).send() - } - } else { - res.status(403).send() - } +function targetGroup (req, res, next) { + req.targetGroup = Group.findById(req.params.group) + req.targetGroup ? next() : res.status(404).send() +} + +router.put('/:id/groups/:group', targetGroup, async (req, res) => { + await req.targetNode.addGroup(req.targetGroup) + res.send() + clusterEvents.emit('changedNodeRights', req.targetNode.id) }) -router.delete('/:id/groups/:group', (req, res) => { - if (req.user.admin) { - let node = db.nodes[req.params.node] - if (node) { - let group = req.params.group - for (let resource of Object.keys(node.resources).map(k => node.resources[k])) { - let index = resource.groups ? resource.groups.indexOf(group) : -1 - if (index >= 0) { - _removeGroupByIndex(resource, index) - _emitEntityChange('resource', resource) - } - } - res.status(200).send() - } else { - res.status(404).send() - } - } else { - res.status(403).send() - } - _emitRestricted() +router.delete('/:id/groups/:group', targetGroup, async (req, res) => { + await req.targetNode.removeGroup(req.targetGroup) + res.send() + clusterEvents.emit('changedNodeRights', req.targetNode.id) }) -router.put('/:id/resources/:resource/groups/:group', async (req, res) => { - _addGroup(_getResource(req), req, res, entity => { - _emitEntityChange('resource', entity) - }) +function targetResource (req, res, next) { + req.targetResource = Resource.findOne({ where: { node: req.targetNode, index: req.params.resource } }) + req.targetResource ? 
next() : res.status(404).send() +} + +router.put('/:id/resources/:resource/groups/:group', targetResource, targetGroup, async (req, res) => { + await req.targetResource.addGroup(req.targetGroup) + res.send() + clusterEvents.emit('changedNodeRights', req.targetNode.id) }) -router.delete('/:id/resources/:resource/groups/:group', async (req, res) => { - _removeGroup(_getResource(req), req, res, entity => { - _emitEntityChange('resource', entity) - _emitRestricted() - }) +router.delete('/:id/resources/:resource/groups/:group', targetResource, targetGroup, async (req, res) => { + await req.targetResource.removeGroup(req.targetGroup) + res.send() + clusterEvents.emit('changedNodeRights', req.targetNode.id) }) diff --git a/src/routes/users.js b/src/routes/users.js index a051be7..b6ef965 100644 --- a/src/routes/users.js +++ b/src/routes/users.js @@ -4,6 +4,7 @@ const Router = require('express-promise-router') const config = require('../config.js') const fslib = require('../utils/httpfs.js') +const clusterEvents = require('../utils/clusterEvents.js') const User = require('../models/User-model.js') const Group = require('../models/Group-model.js') @@ -172,15 +173,16 @@ function targetGroup (req, res, next) { req.targetGroup ? next() : res.status(404).send() } -router.put('/:id/groups/:group', targetGroup, async (req, res) => { +router.put('/:id/groups/:group', router.ensureAdmin, targetGroup, async (req, res) => { await req.targetUser.addGroup(req.targetGroup) res.send() + clusterEvents.emit('moreUserRights', req.targetUser.id) }) -router.delete('/:id/groups/:group', targetGroup, async (req, res) => { +router.delete('/:id/groups/:group', router.ensureAdmin, targetGroup, async (req, res) => { await req.targetUser.removeGroup(req.targetGroup) - clusterEvents.emit('userGotRestricted', req.targetUser.id) res.send() + clusterEvents.emit('lessUserRights', req.targetUser.id) }) router.post('/:id/fs', async (req, res) => { diff --git a/src/scheduler.js b/src/scheduler.js index 890ccea..361848d 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -1,15 +1,11 @@ const fs = require('fs-extra') const path = require('path') -const log = require('../logger.js') -const store = require('./store.js') +const log = require('./utils/logger.js') const utils = require('./utils/utils.js') - const config = require('./config.js') -const nodesModule = require('../nodes.js') -const groupsModule = require('./routes/groups.js') -const parseClusterRequest = require('./clusterParser.js').parse const reservations = require('./reservations.js') +const parseClusterRequest = require('./clusterParser.js').parse var exports = module.exports = {} diff --git a/src/service.js b/src/service.js index 4b9a485..7262c9a 100644 --- a/src/service.js +++ b/src/service.js @@ -1,13 +1,16 @@ const cluster = require('cluster') const cpus = require('os').cpus().length -const log = require('./logger.js') +const log = require('./utils/logger.js') const config = require('./config.js') -const modules = 'users groups nodes jobs aliases' - .split(' ').map(name => require('./' + name + '.js')) +const models = require('./models') +const scheduler = require('./scheduler.js') +const pitRunner = require('./pitRunner.js') if (cluster.isMaster) { - modules.forEach(module => (module.initDb || Function)()) - modules.forEach(module => (module.tick || Function)()) + models.sequelize.sync() + models.all.forEach(model => (model.startup || Function)()) + pitRunner.tick() + scheduler.tick() for (let i = 0; i < cpus; i++) { cluster.fork() } @@ -32,7 +35,7 @@ if 
(cluster.isMaster) { skip: (req, res) => res.statusCode < 400 && !config.debugHttp })) - modules.forEach(module => (module.initApp || Function)(app)) + app.use(require('./routes')) app.use(function (err, req, res, next) { console.error(err.stack) diff --git a/src/store.js b/src/store.js deleted file mode 100644 index b25ca60..0000000 --- a/src/store.js +++ /dev/null @@ -1,204 +0,0 @@ -const fs = require('fs') -const cluster = require('cluster') - -var exports = module.exports = {} - -const PARENT_SYMBOL = Symbol('parent') -const NAME_SYMBOL = Symbol('name') -const DB_PATH = '/data/db.json' - -var rawRoot = (fs.existsSync(DB_PATH) && fs.statSync(DB_PATH).isFile()) ? JSON.parse(fs.readFileSync(DB_PATH).toString()) : {} -var storeLog = [] -var locks = {} -var callbackIdCounter = 0 -var callbacks = {} - -_parentify(rawRoot, null, '') - -function log(msg) { - var entity = cluster.worker ? ('Worker ' + cluster.worker.id) : 'Master' - console.log(entity + ': ' + msg) -} - -function _broadcast(msg, skip_worker) { - //log('sending message:' + JSON.stringify(msg)) - if (cluster.isMaster) { - storeLog.push(msg) - for(var wid in cluster.workers) { - var worker = cluster.workers[wid] - if (worker !== skip_worker) - worker.send(msg) - } - } else { - process.send(msg) - } -} - -function _getPath(obj) { - if(!obj) return '' - var parent = obj[PARENT_SYMBOL] - var name = obj[NAME_SYMBOL] - var path = _getPath(parent) - return path.length > 0 ? (path + '.' + name) : name -} - -function _getObject(path) { - var obj = rawRoot - path.split('.').filter(x => x).forEach(name => { obj = obj[name] }) - return obj -} - -function _parentify(obj, parent, name) { - obj[PARENT_SYMBOL] = parent - obj[NAME_SYMBOL] = name - for (var k in obj) { - if (obj.hasOwnProperty(k)) { - var v = obj[k] - if (typeof(v) === 'object' && v !== null) - _parentify(v, obj, k) - } - } -} - -var observer = { - get: function(target, name) { - value = target[name] - return (typeof(value) === 'object' && value !== null) ? 
new Proxy(value, observer) : value - }, - set: function(target, name, value) { - var path = _getPath(target) - var value_str = JSON.stringify(value) - if(typeof(value) === 'object' && value !== null) { - value = JSON.parse(value_str) - _parentify(value, target, name) - } - //log('Setting property "' + name + '" of object "' + path + '" to value "' + value_str + '"') - if (this != 'skip') { - _broadcast({ storeOperation: 'set', path: path, args: [name, value] }) - } - return Reflect.set(target, name, value) - }, - deleteProperty: function(target, name) { - var path = _getPath(target) - //log('Deleting property "' + name + '" of object "' + path + '"') - if (this != 'skip') { - _broadcast({ storeOperation: 'deleteProperty', path: path, args: [name] }) - } - return Reflect.deleteProperty(...arguments) - } -} - -function _send(recipient, msg) { - if (recipient) { - recipient.send(msg) - } else if (cluster.isMaster) { - _handle_message(msg, null) - } else { - process.send(msg) - } -} - -function _handle_message(msg, sender) { - //log('Got a message ' + JSON.stringify(msg)) - if (msg.storeOperation) { - observer[msg.storeOperation].apply('skip', [_getObject(msg.path)].concat(msg.args)) - if (cluster.isMaster) { - _broadcast(msg, sender) - } - } else if (msg.askLock) { - var waiting = locks[msg.askLock] - var entry = { sender: sender, id: msg.id } - if (waiting && waiting.length > 0) { - waiting.push(entry) - } else { - locks[msg.askLock] = [entry] - _send(sender, { gotLock: msg.askLock, id: msg.id }) - } - //log('asked for lock') - } else if (msg.gotLock) { - var callback = callbacks[msg.id] - delete callbacks[msg.id] - if (callback.sync) { - try { - callback.fun() - } finally { - _send(sender, { freeLock: msg.gotLock }) - } - } else { - callback.fun(function() { - _send(sender, { freeLock: msg.gotLock }) - }) - } - //log('got lock') - } else if (msg.freeLock) { - var waiting = locks[msg.freeLock] - if (waiting && waiting.length > 0) { - waiting.shift() - } - if (waiting && waiting.length > 0) { - _send(waiting[0].sender, { gotLock: msg.freeLock, id: waiting[0].id }) - } - //log('freed lock') - } -} - -function _lock(target, callback, sync) { - callbackIdCounter += 1 - callbacks[callbackIdCounter] = { fun: callback, sync: !!sync } - _send(null, { askLock: target, id: callbackIdCounter }) -} - -function _writeDb() { - if (storeLog.length > 0) { - storeLog = [] - fs.writeFile(DB_PATH, JSON.stringify(rawRoot, null, '\t'), function(err) { - if(err) - return console.error(err); - //log('Wrote db!') - }) - } -} - -function _tickOn() { - setTimeout(_tick, 1000) -} - -function _tick() { - //console.log('Tick...') - _writeDb() - _tickOn() -} - -if (cluster.isMaster) { - cluster.on('fork', worker => { - worker.on('message', msg => _handle_message(msg, worker)) - storeLog.forEach(msg => worker.send(msg)) - }) - cluster.on('exit', function(worker, code, signal) { - for (let lockName in locks) { - if (locks.hasOwnProperty(lockName)) { - let waiting = locks[lockName] - if (waiting && waiting.length > 0) { - let first = waiting[0] - locks[lockName] = waiting = waiting.filter(entry => entry.sender == worker) - if (waiting.length > 0 && first != waiting[0]) { - _send(waiting[0].sender, { gotLock: lockName, id: waiting[0].id }) - } - } - } - } - }) - _tickOn() -} else { - process.on('message', _handle_message) -} - -exports.root = new Proxy(rawRoot, observer) - -exports.lockAutoRelease = function(target, callback) { - _lock(target, callback, true) -} - -exports.lockAsyncRelease = function (target, callback) { - 
_lock(target, callback, false) -} \ No newline at end of file
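
Note (illustrative, not part of the applied diff): the new routes lean on small "target" middlewares plus Sequelize group associations for access control. Below is a minimal sketch of that pattern, assuming the associations declared in the models above, Sequelize v4's promise-based findById, and express-promise-router as already used in the routes. Names such as targetJob and the nested-include query are hypothetical illustrations, not lines taken from this patch.

    const Router = require('express-promise-router')
    const Group = require('../models/Group-model.js')
    const User = require('../models/User-model.js')
    const Job = require('../models/Job-model.js')

    const router = module.exports = new Router()

    // findById returns a promise, so the middleware resolves it before branching.
    async function targetGroup (req, res, next) {
        req.targetGroup = await Group.findById(req.params.group)
        req.targetGroup ? next() : res.status(404).send()
    }

    router.put('/:id/groups/:group', targetGroup, async (req, res) => {
        // req.targetJob is assumed to be set by an earlier job-lookup middleware.
        await req.targetJob.addGroup(req.targetGroup)
        res.send()
    })

    // One way to express the group-based visibility check: a regular function is
    // used (rather than an arrow function) so that `this` is the user instance.
    // A non-admin user can access a job if they own it or share a group with it.
    User.prototype.canAccessJob = async function (job) {
        if (this.admin || await job.hasUser(this)) {
            return true
        }
        const matches = await Job.count({
            where: { id: job.id },
            include: [{
                model: Group,
                include: [{ model: User, where: { id: this.id } }]
            }]
        })
        return matches > 0
    }

The same shape applies to the targetResource and targetUser lookups and to canAccessResource: resolve the Sequelize query, 404 if nothing is found, and let the association helpers (addGroup, removeGroup, hasGroup) do the join-table work.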