diff --git a/.gitignore b/.gitignore index 191c2c2..9f6262e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ config - +src/clusterParser.js db.json # Logs diff --git a/package.json b/package.json index 636623d..4c1895e 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,8 @@ "description": "Machine learning job scheduler", "main": "app.js", "scripts": { - "start": "node src/service.js" + "start": "node src/service.js", + "postinstall": "node node_modules/pegjs/bin/pegjs src/clusterParser.pegjs" }, "repository": { "type": "git", @@ -33,11 +34,11 @@ "dependencies": { "bcrypt": "^1.0.3", "body-parser": "^1.18.2", - "combined-stream": "^1.0.6", "commander": "^2.14.1", "express": "^4.16.2", "jsonwebtoken": "^8.1.1", "morgan": "^1.9.0", + "pegjs": "^0.10.0", "process": "^0.11.10", "readable-stream": "^2.3.5", "request": "^2.83.0" diff --git a/src/aliases.js b/src/aliases.js index 0805ae7..e425080 100644 --- a/src/aliases.js +++ b/src/aliases.js @@ -17,10 +17,11 @@ exports.initApp = function(app) { app.put('/aliases/:id', function(req, res) { if (req.user.admin) { - if (req.body && req.body.model) { + console.log(req.body) + if (req.body && req.body.name) { db.aliases[req.params.id] = { id: req.params.id, - model: req.body.model + name: req.body.name } res.status(200).send() } else { diff --git a/src/clusterParser.pegjs b/src/clusterParser.pegjs new file mode 100644 index 0000000..ba2d136 --- /dev/null +++ b/src/clusterParser.pegjs @@ -0,0 +1,27 @@ +start + = cluster + +cluster + = left:processGroup "," right:cluster { return left.concat(right); } + / solo:processGroup { return [solo]; } + +processGroup + = left:integer ":" right:process { return { count: left, process: right }; } + / solo:process { return { count: 1, process: solo } } + +process + = "[" solo:resourceList "]" { return solo } + +resourceList + = left:resourceGroup "," right:resourceList { return left.concat(right); } + / solo:resourceGroup { return [solo]; } + +resourceGroup + = left:integer ":" right:resource { return { count: left, name: right }; } + / solo:resource { return { count: 1, name: solo } } + +resource + = chars:[a-zA-Z]+[a-zA-Z0-9]* { return chars.join(""); } + +integer + = digits:[0-9]+ { return parseInt(digits.join(""), 10); } diff --git a/src/jobs.js b/src/jobs.js index b929218..98a9882 100644 --- a/src/jobs.js +++ b/src/jobs.js @@ -1,5 +1,6 @@ const store = require('./store.js') -const node = require('./nodes.js') +const nodes = require('./nodes.js') +const parseClusterRequest = require('./clusterParser.js').parse var exports = module.exports = {} var db = store.root @@ -17,62 +18,128 @@ exports.initDb = function() { } function _getRunningJobs() { - var jobs = [] - Object.keys(db.nodes).forEach(id => { - let node = db.nodes[id] - if (node.state >= nodes.STATE_ACTIVE) { - let gpuCounter = numGpus - gpuReservation = [] - for(let gpu = 0; gpu < node.gpus.length; gpu++) { - if (node.gpus[gpu].job == 0 || state == 0) { - gpuReservation.push(gpu) - gpuCounter-- - if (gpuCounter == 0) { - reservation.push({ node: id, gpuReservation: gpuReservation }) - nodeCounter-- - if (nodeCounter == 0) return reservation - gpuCounter = numGpus - gpuReservation = [] - } + var jobs = {} + Object.keys(db.nodes).forEach(nodeId => { + let node = db.nodes[nodeId] + if (node.state >= nodes.STATE_ONLINE) { + Object.keys(node.resources).forEach(resourceType => { + let resource = node.resources[resourceType] + if (resource.job) { + jobs[resource.job] = db.jobs[resource.job] } - } - gpuCounter = numGpus + }) } }) return jobs } -function _reserve(numNodes, numGpus, state) { - let reservation = [] - let nodeCounter = numNodes - Object.keys(db.nodes).forEach(id => { - let node = db.nodes[id] - if (node.state >= state) { - let gpuCounter = numGpus - gpuReservation = [] - for(let gpu = 0; gpu < node.gpus.length; gpu++) { - if (node.gpus[gpu].job == 0 || state == 0) { - gpuReservation.push(gpu) - gpuCounter-- - if (gpuCounter == 0) { - reservation.push({ node: id, gpuReservation: gpuReservation }) - nodeCounter-- - if (nodeCounter == 0) return reservation - gpuCounter = numGpus - gpuReservation = [] +function _getJobProcesses(job) { + var processes = {} + Object.keys(db.nodes).forEach(nodeId => { + let node = db.nodes[nodeId] + if (node.state >= nodes.STATE_ONLINE) { + let nodeProcesses = {} + Object.keys(node.resources).forEach(resourceType => { + let resource = node.resources[resourceType] + if (resource.job == job.id && resource.pid) { + nodeProcesses[resource.pid] = true + } + }) + Object.keys(nodeProcesses).forEach(pid => { + let pids = processes[nodeId] = processes[nodeId] || [] + pids.push(pid) + }) + } + }) + return processes +} + +function _mergeReservation(target, source) { + Object.keys(source).forEach(key => { + if (!target[key]) { + target[key] = source[key] + } else if (typeof target[key] === 'object') { + _mergeReservation(target[key], source[key]) + } + }) +} + +function _reserve(reservation, nodeId, resourceType, resourceIndex) { + let node = reservation[nodeId] = reservation[nodeId] || {} + let resource = node[resourceType] = node[resourceType] || {} + resource[resourceIndex] = true +} + +function _isReserved(reservation, nodeId, resourceType, resourceIndex) { + return reservation[nodeId] && reservation[nodeId][resourceType] && reservation[nodeId][resourceType][resourceIndex] +} + +function _reserveProcessOnNode(node, reservation, resourceList) { + var nodeReservation = {} + if (!node || !node.resources) { + return null + } + for (let resource of resourceList) { + let resourceCounter = resource.count + let name = db.aliases[resource.name] ? db.aliases[resource.name].name : resource.name + Object.keys(node.resources).forEach(resourceType => { + if (resourceCounter > 0) { + let nodeResources = node.resources[resourceType] + for(let resourceIndex = 0; resourceIndex < nodeResources.length && resourceCounter > 0; resourceIndex++) { + let nodeResource = nodeResources[resourceIndex] + if (nodeResource.name == name && + !_isReserved(reservation, node.id, resourceType, resourceIndex) && + (!nodeResource.job || state == 0)) { + _reserve(nodeReservation, node.id, resourceType, resourceIndex) + resourceCounter-- } } } - gpuCounter = numGpus + }) + } + return nodeReservation +} + +function _reserveProcess(reservation, resourceList, state) { + Object.keys(db.nodes).forEach(nodeId => { + let node = db.nodes[nodeId] + if (node.state >= state) { + let nodeReservation = _reserveProcessOnNode(node, reservation, resourceList) + if (nodeReservation) { + return nodeReservation + } } }) - return false + return null +} + +function _reserveCluster(clusterRequest, state) { + let reservation = {} + clusterRequest.forEach(processRequest => { + for(let i=0; i { - var node = db.nodes[instanceReservation.node] - instanceReservation.gpuReservation.forEach(reservedGpu => node.gpus[reservedGpu].job = jobNumber) + Object.keys(reservation).forEach(nodeId => { + let node = db.nodes[nodeId] + Object.keys(reservation[nodeId]).forEach(resourceType => { + let resources = node[resourceType] + Object.keys(reservation[nodeId][resourceType]).forEach(resourceIndex => { + resources[resourceIndex].job = jobNumber + if (jobNumber == 0) { + resources[resourceIndex].pid = 0 + } + }) + }) }) } @@ -80,6 +147,14 @@ function _deallocate(reservation) { _allocate(reservation, 0) } +function _startJob(job) { + +} + +function _stopJob(job) { + +} + exports.initApp = function(app) { app.get('/jobs/:state', function(req, res) { res.status(200).send() @@ -87,18 +162,25 @@ exports.initApp = function(app) { app.post('/jobs', function(req, res) { store.lockAutoRelease('jobs', function() { - var id = db.jobIdCounter++ - var job = req.body - var allocation = _getAllocation(job.numNodes, job.numGpus, _getEmptyClusterAllocation()) - if (allocation) { + let id = db.jobIdCounter++ + let job = req.body + var clusterRequest + try { + clusterRequest = parseClusterRequest(job.clusterRequest) + } catch (ex) { + console.log(ex) + res.status(400).send({ message: 'Problem parsing allocation' }) + return + } + let reservation = _reserveCluster(clusterRequest, nodes.STATE_UNKNOWN) + if (reservation) { db.jobs[id] = { id: id, + user: req.user.id, origin: job.origin, hash: job.hash, diff: job.diff, description: job.description || (req.user.id + ' - ' + new Date().toISOString()), - numNodes: job.numNodes, - numGpus: job.numGpus } db.schedule.push(id) res.status(200).send({ id: id }) @@ -117,7 +199,22 @@ exports.initApp = function(app) { }) app.delete('/jobs/:id', function(req, res) { - res.status(200).send() + var id = Number(req.params.id) + var dbjob = db.jobs[id] + if (dbjob) { + if (req.user.id == dbjob.id || req.user.admin) { + delete db.jobs[id] + let scheduleIndex = db.schedule.indexOf(id) + if (scheduleIndex >= 0) { + db.schedule.splice(scheduleIndex, 1) + } + res.status(200).send() + } else { + res.status(403).send() + } + } else { + res.status(404).send() + } }) }