This commit is contained in:
Tilman Kamp 2018-03-14 18:43:43 +01:00
Родитель b3d0aa0f39
Коммит 8591c7471e
5 изменённых файлов: 181 добавлений и 55 удалений

2
.gitignore поставляемый
Просмотреть файл

@ -1,5 +1,5 @@
config
src/clusterParser.js
db.json
# Logs

Просмотреть файл

@ -4,7 +4,8 @@
"description": "Machine learning job scheduler",
"main": "app.js",
"scripts": {
"start": "node src/service.js"
"start": "node src/service.js",
"postinstall": "node node_modules/pegjs/bin/pegjs src/clusterParser.pegjs"
},
"repository": {
"type": "git",
@ -33,11 +34,11 @@
"dependencies": {
"bcrypt": "^1.0.3",
"body-parser": "^1.18.2",
"combined-stream": "^1.0.6",
"commander": "^2.14.1",
"express": "^4.16.2",
"jsonwebtoken": "^8.1.1",
"morgan": "^1.9.0",
"pegjs": "^0.10.0",
"process": "^0.11.10",
"readable-stream": "^2.3.5",
"request": "^2.83.0"

Просмотреть файл

@ -17,10 +17,11 @@ exports.initApp = function(app) {
app.put('/aliases/:id', function(req, res) {
if (req.user.admin) {
if (req.body && req.body.model) {
console.log(req.body)
if (req.body && req.body.name) {
db.aliases[req.params.id] = {
id: req.params.id,
model: req.body.model
name: req.body.name
}
res.status(200).send()
} else {

27
src/clusterParser.pegjs Normal file
Просмотреть файл

@ -0,0 +1,27 @@
start
= cluster
cluster
= left:processGroup "," right:cluster { return left.concat(right); }
/ solo:processGroup { return [solo]; }
processGroup
= left:integer ":" right:process { return { count: left, process: right }; }
/ solo:process { return { count: 1, process: solo } }
process
= "[" solo:resourceList "]" { return solo }
resourceList
= left:resourceGroup "," right:resourceList { return left.concat(right); }
/ solo:resourceGroup { return [solo]; }
resourceGroup
= left:integer ":" right:resource { return { count: left, name: right }; }
/ solo:resource { return { count: 1, name: solo } }
resource
= chars:[a-zA-Z]+[a-zA-Z0-9]* { return chars.join(""); }
integer
= digits:[0-9]+ { return parseInt(digits.join(""), 10); }

Просмотреть файл

@ -1,5 +1,6 @@
const store = require('./store.js')
const node = require('./nodes.js')
const nodes = require('./nodes.js')
const parseClusterRequest = require('./clusterParser.js').parse
var exports = module.exports = {}
var db = store.root
@ -17,62 +18,128 @@ exports.initDb = function() {
}
function _getRunningJobs() {
var jobs = []
Object.keys(db.nodes).forEach(id => {
let node = db.nodes[id]
if (node.state >= nodes.STATE_ACTIVE) {
let gpuCounter = numGpus
gpuReservation = []
for(let gpu = 0; gpu < node.gpus.length; gpu++) {
if (node.gpus[gpu].job == 0 || state == 0) {
gpuReservation.push(gpu)
gpuCounter--
if (gpuCounter == 0) {
reservation.push({ node: id, gpuReservation: gpuReservation })
nodeCounter--
if (nodeCounter == 0) return reservation
gpuCounter = numGpus
gpuReservation = []
}
var jobs = {}
Object.keys(db.nodes).forEach(nodeId => {
let node = db.nodes[nodeId]
if (node.state >= nodes.STATE_ONLINE) {
Object.keys(node.resources).forEach(resourceType => {
let resource = node.resources[resourceType]
if (resource.job) {
jobs[resource.job] = db.jobs[resource.job]
}
}
gpuCounter = numGpus
})
}
})
return jobs
}
function _reserve(numNodes, numGpus, state) {
let reservation = []
let nodeCounter = numNodes
Object.keys(db.nodes).forEach(id => {
let node = db.nodes[id]
if (node.state >= state) {
let gpuCounter = numGpus
gpuReservation = []
for(let gpu = 0; gpu < node.gpus.length; gpu++) {
if (node.gpus[gpu].job == 0 || state == 0) {
gpuReservation.push(gpu)
gpuCounter--
if (gpuCounter == 0) {
reservation.push({ node: id, gpuReservation: gpuReservation })
nodeCounter--
if (nodeCounter == 0) return reservation
gpuCounter = numGpus
gpuReservation = []
function _getJobProcesses(job) {
var processes = {}
Object.keys(db.nodes).forEach(nodeId => {
let node = db.nodes[nodeId]
if (node.state >= nodes.STATE_ONLINE) {
let nodeProcesses = {}
Object.keys(node.resources).forEach(resourceType => {
let resource = node.resources[resourceType]
if (resource.job == job.id && resource.pid) {
nodeProcesses[resource.pid] = true
}
})
Object.keys(nodeProcesses).forEach(pid => {
let pids = processes[nodeId] = processes[nodeId] || []
pids.push(pid)
})
}
})
return processes
}
function _mergeReservation(target, source) {
Object.keys(source).forEach(key => {
if (!target[key]) {
target[key] = source[key]
} else if (typeof target[key] === 'object') {
_mergeReservation(target[key], source[key])
}
})
}
function _reserve(reservation, nodeId, resourceType, resourceIndex) {
let node = reservation[nodeId] = reservation[nodeId] || {}
let resource = node[resourceType] = node[resourceType] || {}
resource[resourceIndex] = true
}
function _isReserved(reservation, nodeId, resourceType, resourceIndex) {
return reservation[nodeId] && reservation[nodeId][resourceType] && reservation[nodeId][resourceType][resourceIndex]
}
function _reserveProcessOnNode(node, reservation, resourceList) {
var nodeReservation = {}
if (!node || !node.resources) {
return null
}
for (let resource of resourceList) {
let resourceCounter = resource.count
let name = db.aliases[resource.name] ? db.aliases[resource.name].name : resource.name
Object.keys(node.resources).forEach(resourceType => {
if (resourceCounter > 0) {
let nodeResources = node.resources[resourceType]
for(let resourceIndex = 0; resourceIndex < nodeResources.length && resourceCounter > 0; resourceIndex++) {
let nodeResource = nodeResources[resourceIndex]
if (nodeResource.name == name &&
!_isReserved(reservation, node.id, resourceType, resourceIndex) &&
(!nodeResource.job || state == 0)) {
_reserve(nodeReservation, node.id, resourceType, resourceIndex)
resourceCounter--
}
}
}
gpuCounter = numGpus
})
}
return nodeReservation
}
function _reserveProcess(reservation, resourceList, state) {
Object.keys(db.nodes).forEach(nodeId => {
let node = db.nodes[nodeId]
if (node.state >= state) {
let nodeReservation = _reserveProcessOnNode(node, reservation, resourceList)
if (nodeReservation) {
return nodeReservation
}
}
})
return false
return null
}
function _reserveCluster(clusterRequest, state) {
let reservation = {}
clusterRequest.forEach(processRequest => {
for(let i=0; i<processRequest.count; i++) {
let processReservation = _reserveProcess(reservation, processRequest.process, state)
if (processReservation) {
_mergeReservation(reservation, processReservation)
} else {
return null
}
}
})
return reservation
}
function _allocate(reservation, jobNumber) {
reservation.forEach(instanceReservation => {
var node = db.nodes[instanceReservation.node]
instanceReservation.gpuReservation.forEach(reservedGpu => node.gpus[reservedGpu].job = jobNumber)
Object.keys(reservation).forEach(nodeId => {
let node = db.nodes[nodeId]
Object.keys(reservation[nodeId]).forEach(resourceType => {
let resources = node[resourceType]
Object.keys(reservation[nodeId][resourceType]).forEach(resourceIndex => {
resources[resourceIndex].job = jobNumber
if (jobNumber == 0) {
resources[resourceIndex].pid = 0
}
})
})
})
}
@ -80,6 +147,14 @@ function _deallocate(reservation) {
_allocate(reservation, 0)
}
function _startJob(job) {
}
function _stopJob(job) {
}
exports.initApp = function(app) {
app.get('/jobs/:state', function(req, res) {
res.status(200).send()
@ -87,18 +162,25 @@ exports.initApp = function(app) {
app.post('/jobs', function(req, res) {
store.lockAutoRelease('jobs', function() {
var id = db.jobIdCounter++
var job = req.body
var allocation = _getAllocation(job.numNodes, job.numGpus, _getEmptyClusterAllocation())
if (allocation) {
let id = db.jobIdCounter++
let job = req.body
var clusterRequest
try {
clusterRequest = parseClusterRequest(job.clusterRequest)
} catch (ex) {
console.log(ex)
res.status(400).send({ message: 'Problem parsing allocation' })
return
}
let reservation = _reserveCluster(clusterRequest, nodes.STATE_UNKNOWN)
if (reservation) {
db.jobs[id] = {
id: id,
user: req.user.id,
origin: job.origin,
hash: job.hash,
diff: job.diff,
description: job.description || (req.user.id + ' - ' + new Date().toISOString()),
numNodes: job.numNodes,
numGpus: job.numGpus
}
db.schedule.push(id)
res.status(200).send({ id: id })
@ -117,7 +199,22 @@ exports.initApp = function(app) {
})
app.delete('/jobs/:id', function(req, res) {
res.status(200).send()
var id = Number(req.params.id)
var dbjob = db.jobs[id]
if (dbjob) {
if (req.user.id == dbjob.id || req.user.admin) {
delete db.jobs[id]
let scheduleIndex = db.schedule.indexOf(id)
if (scheduleIndex >= 0) {
db.schedule.splice(scheduleIndex, 1)
}
res.status(200).send()
} else {
res.status(403).send()
}
} else {
res.status(404).send()
}
})
}