This commit is contained in:
Tilman Kamp 2018-03-07 17:54:57 +01:00
Родитель 4810592047
Коммит e7ca489932
12 изменённых файлов: 273 добавлений и 155 удалений

Просмотреть файл

@ -37,6 +37,8 @@
"express": "^4.16.2",
"jsonwebtoken": "^8.1.1",
"morgan": "^1.9.0",
"process": "^0.11.10",
"readable-stream": "^2.3.5",
"request": "^2.83.0"
}
}

2
scripts/available.sh Normal file
Просмотреть файл

@ -0,0 +1,2 @@
# Availability probe: prints a marker line and exits with status 0.
echo "Available"
exit 0

0
scripts/prepare.sh Normal file
Просмотреть файл

0
scripts/run.sh Normal file
Просмотреть файл

10
src/config.js Normal file
Просмотреть файл

@ -0,0 +1,10 @@
// Loads the JSON configuration file and exports its parsed content.
// If the file cannot be read or parsed, the problem is reported on stderr and
// the module's default (empty) exports object is left in place.
const fs = require('fs')
const path = require('path')
// Configuration lives in ../config relative to this source directory
const filename = path.join(__dirname, '..', 'config', 'snakepit.config')
try {
  module.exports = JSON.parse(fs.readFileSync(filename, 'utf8'))
} catch (err) {
  // Include the underlying reason (missing file, invalid JSON, ...) in the report
  console.error('Unable to load configuration from "' + filename + '": ' + err.message)
}

Просмотреть файл

@ -1,71 +0,0 @@
const store = require('./store.js')
var exports = module.exports = {}
var db = store.root
exports.initDb = function() {
if (!db.jobIdCounter) {
db.jobIdCounter = 1
}
if (!db.jobs) {
db.jobs = {}
}
if (!db.allocation) {
db.allocation = {}
}
if (!db.schedule) {
db.schedule = []
}
}
// Stub: the body is empty, so this returns undefined.
function _getEmptyCluster() {
}
// Stub: ignores its arguments and returns undefined.
// The body only contains a commented-out condition.
function _getAllocation(numNodes, numGpus, clusterAllocation) {
//if (numNodes)
}
// Stub: ignores its arguments and returns undefined.
function _mergeAllocation(allocation, clusterAllocation) {
}
exports.initApp = function(app) {
app.get('/jobs/:state', function(req, res) {
res.status(200).send()
})
app.post('/jobs', function(req, res) {
store.lockAutoRelease('jobs', function() {
var id = db.jobIdCounter++
var job = req.body
var allocation = _getAllocation(job.numNodes, job.numGpus, _getEmptyClusterAllocation())
if (allocation) {
db.jobs[id] = {
id: id,
origin: job.origin,
hash: job.hash,
diff: job.diff,
description: job.description || (req.user.id + ' - ' + new Date().toISOString()),
resources: job.resources
}
db.schedule.push(id)
res.status(200).send({ id: id })
} else {
res.status(406).send()
}
})
})
app.get('/jobs/:id', function(req, res) {
res.status(200).send()
})
app.get('/jobs/:id/watch', function(req, res) {
res.status(200).send()
})
app.delete('/jobs/:id', function(req, res) {
res.status(200).send()
})
}

124
src/jobs.js Normal file
Просмотреть файл

@ -0,0 +1,124 @@
const store = require('./store.js')
const node = require('./nodes.js')
var exports = module.exports = {}
var db = store.root
exports.initDb = function() {
if (!db.jobIdCounter) {
db.jobIdCounter = 1
}
if (!db.jobs) {
db.jobs = {}
}
if (!db.schedule) {
db.schedule = []
}
}
// NOTE(review): this function looks unfinished - confirm the intended behavior before use.
//  - `jobs` is never added to, so whenever the loop completes an empty array is returned.
//  - `nodes`, `numGpus`, `state`, `reservation` and `nodeCounter` are not declared in this
//    function (nodes.js is required under the name `node`), so evaluating them would raise
//    a ReferenceError unless they exist as globals.
//  - `gpuReservation` is assigned without a declaration.
//  - The loop body appears to be copied from _reserve(); presumably it is meant to be
//    replaced by logic that collects the jobs occupying GPUs - TODO confirm.
function _getRunningJobs() {
var jobs = []
for (let [id, node] of Object.entries(db.nodes)) {
if (node.state >= nodes.STATE_ACTIVE) {
let gpuCounter = numGpus
gpuReservation = []
for(let gpu = 0; gpu < node.gpus.length; gpu++) {
if (node.gpus[gpu].job == 0 || state == 0) {
gpuReservation.push(gpu)
gpuCounter--
if (gpuCounter == 0) {
reservation.push({ node: id, gpuReservation: gpuReservation })
nodeCounter--
if (nodeCounter == 0) return reservation
gpuCounter = numGpus
gpuReservation = []
}
}
}
gpuCounter = numGpus
}
}
return jobs
}
/**
 * Tries to find `numNodes` groups of `numGpus` GPUs each among the nodes in `db.nodes`.
 * A group never spans nodes, but one node can contribute several groups.
 *
 * @param {number} numNodes number of GPU groups (instances) required
 * @param {number} numGpus number of GPUs required per group
 * @param {number} state minimum `node.state` a node needs to be considered;
 *   when 0, GPUs are counted no matter whether a job occupies them
 * @returns {Array<{node: string, gpuReservation: number[]}>|false} the reservation,
 *   or false if the request cannot be satisfied
 */
function _reserve(numNodes, numGpus, state) {
  // Non-positive (or missing) counts can never be satisfied by the counting below
  if (!(numNodes > 0) || !(numGpus > 0)) {
    return false
  }
  const reservation = []
  let nodeCounter = numNodes
  for (const [id, node] of Object.entries(db.nodes)) {
    // Skip nodes below the required state and nodes registered without a GPU list
    if (!(node.state >= state) || !Array.isArray(node.gpus)) {
      continue
    }
    let gpuCounter = numGpus
    // Declared locally - this used to leak into the global scope (and throws in strict mode)
    let gpuReservation = []
    for (let gpu = 0; gpu < node.gpus.length; gpu++) {
      // A GPU is free if it carries no job number (0 or never assigned)
      if (state == 0 || !node.gpus[gpu].job) {
        gpuReservation.push(gpu)
        gpuCounter--
        if (gpuCounter == 0) {
          reservation.push({ node: id, gpuReservation: gpuReservation })
          nodeCounter--
          if (nodeCounter == 0) return reservation
          // Start collecting the next group on the remaining GPUs of this node
          gpuCounter = numGpus
          gpuReservation = []
        }
      }
    }
  }
  return false
}
// Marks every GPU listed in the reservation as used by the given job number.
// `reservation` is a list of { node, gpuReservation } entries, where `node` is a key
// of db.nodes and `gpuReservation` holds indices into that node's `gpus` array.
function _allocate(reservation, jobNumber) {
  for (const entry of reservation) {
    const target = db.nodes[entry.node]
    for (const gpuIndex of entry.gpuReservation) {
      target.gpus[gpuIndex].job = jobNumber
    }
  }
}
// Releases the GPUs of a reservation by assigning them job number 0.
function _deallocate(reservation) {
  const noJob = 0
  _allocate(reservation, noJob)
}
exports.initApp = function(app) {
app.get('/jobs/:state', function(req, res) {
res.status(200).send()
})
app.post('/jobs', function(req, res) {
store.lockAutoRelease('jobs', function() {
var id = db.jobIdCounter++
var job = req.body
var allocation = _getAllocation(job.numNodes, job.numGpus, _getEmptyClusterAllocation())
if (allocation) {
db.jobs[id] = {
id: id,
origin: job.origin,
hash: job.hash,
diff: job.diff,
description: job.description || (req.user.id + ' - ' + new Date().toISOString()),
numNodes: job.numNodes,
numGpus: job.numGpus
}
db.schedule.push(id)
res.status(200).send({ id: id })
} else {
res.status(406).send()
}
})
})
app.get('/jobs/:id', function(req, res) {
res.status(200).send()
})
app.get('/jobs/:id/watch', function(req, res) {
res.status(200).send()
})
app.delete('/jobs/:id', function(req, res) {
res.status(200).send()
})
}
// Tick hook of the jobs module; currently a no-op.
exports.tick = function() {
}

Просмотреть файл

@ -1,58 +0,0 @@
const store = require('./store.js')
var exports = module.exports = {}
var db = store.root
exports.initDb = function() {
if (!db.nodes) {
db.nodes = {}
}
}
exports.initApp = function(app) {
app.put('/nodes/:id', function(req, res) {
if (req.user.admin) {
var id = req.params.id
node = req.body
dbnode = db.nodes[id] || {}
newnode = {
id: id,
address: node.address || dbnode.address,
port: node.port || dbnode.port || 22,
gpus: node.gpus || dbnode.gpus,
user: node.user || dbnode.user || 'pitmaster'
}
db.nodes[id] = newnode
res.status(200).send()
} else {
res.status(403).send()
}
})
app.get('/nodes', function(req, res) {
res.status(200).send(Object.keys(db.nodes))
})
app.get('/nodes/:id', function(req, res) {
var node = db.nodes[req.params.id]
if (node) {
res.status(200).json(node)
} else {
res.status(404).send()
}
})
app.delete('/nodes/:id', function(req, res) {
if (req.user.admin) {
var id = req.params.id
if (db.nodes[id]) {
delete db.nodes[id]
res.status(200).send()
} else {
res.status(404).send()
}
} else {
res.status(403).send()
}
})
}

102
src/nodes.js Normal file
Просмотреть файл

@ -0,0 +1,102 @@
const fs = require('fs')
const path = require('path')
const { exec, execFile, spawn } = require('child_process')
const store = require('./store.js')
var exports = module.exports = {}
var db = store.root
const STATE_UNKNOWN = exports.STATE_UNKNOWN = 0
const STATE_OFFLINE = exports.STATE_OFFLINE = 1
const STATE_ACTIVE = exports.STATE_ACTIVE = 2
/**
 * Runs a script from the local `scripts` directory on a remote node by streaming
 * its content into `bash -s` over ssh.
 *
 * @param {{user: string, address: string, port: (number|string)}} node connection data
 * @param {string} scriptName file name of the script inside the scripts directory
 * @param {function(?Error, string, string)} callback execFile-style callback receiving
 *   (err, stdout, stderr) once the ssh process has finished
 */
function _runScript(node, scriptName, callback) {
  const scriptPath = path.join(__dirname, '..', 'scripts', scriptName)
  const address = node.user + '@' + node.address
  console.log('Running script "' + scriptPath + '" on "' + address + '"')
  // Declared locally - the child process handle used to leak into the global scope
  const child = execFile(
    'ssh',
    // argv entries are documented as strings, so the port is converted explicitly
    [address, '-p', String(node.port), 'bash -s'],
    null,
    callback
  )
  if (!child.stdin) {
    return
  }
  // If ssh exits before the script is fully written, the pipe reports an error here;
  // the failure itself reaches the caller through the execFile callback.
  child.stdin.on('error', () => {})
  fs.createReadStream(scriptPath)
    .on('error', err => {
      // An unreadable script would otherwise crash the process and leave ssh waiting
      // for input; killing the child makes execFile invoke the callback with an error.
      console.error('Unable to read script "' + scriptPath + '": ' + err.message)
      child.kill()
    })
    .pipe(child.stdin)
}
// Runs 'available.sh' on the node and reports the outcome as a boolean:
// callback(true) if the script ran without error, callback(false) otherwise.
// The script's stdout is always logged; stderr is logged on failure.
function _checkAvailability(node, callback) {
  const onFinished = (err, stdout, stderr) => {
    console.log(stdout)
    if (!err) {
      callback(true)
      return
    }
    console.error(stderr)
    callback(false)
  }
  _runScript(node, 'available.sh', onFinished)
}
exports.initDb = function() {
if (!db.nodes) {
db.nodes = {}
}
}
exports.initApp = function(app) {
app.put('/nodes/:id', function(req, res) {
if (req.user.admin) {
var id = req.params.id
node = req.body
dbnode = db.nodes[id] || {}
newnode = {
id: id,
address: node.address || dbnode.address,
port: node.port || dbnode.port || 22,
gpus: node.hasOwnProperty('gpus') ? node.gpus : dbnode.gpus,
user: node.user || dbnode.user || 'pitmaster',
state: STATE_UNKNOWN
}
if (newnode.address) {
_checkAvailability(newnode, available => {
if (available) {
db.nodes[id] = newnode
res.status(200).send()
} else {
res.status(400).send({ message: 'Node not available' })
}
})
} else {
res.status(400).send()
}
} else {
res.status(403).send()
}
})
app.get('/nodes', function(req, res) {
res.status(200).send(Object.keys(db.nodes))
})
app.get('/nodes/:id', function(req, res) {
var node = db.nodes[req.params.id]
if (node) {
res.status(200).json(node)
} else {
res.status(404).send()
}
})
app.delete('/nodes/:id', function(req, res) {
if (req.user.admin) {
var id = req.params.id
if (db.nodes[id]) {
delete db.nodes[id]
res.status(200).send()
} else {
res.status(404).send()
}
} else {
res.status(403).send()
}
})
}

Просмотреть файл

@ -2,7 +2,8 @@ const fs = require('fs')
const path = require('path')
const cluster = require('cluster')
const cpus = require('os').cpus().length
const modules = 'user node job'.split(' ').map(name => require('./' + name + '.js'))
const config = require('./config.js')
const modules = 'users nodes jobs'.split(' ').map(name => require('./' + name + '.js'))
function readConfigFile(name) {
var filename = path.join('config', name)
@ -17,10 +18,11 @@ if (cluster.isMaster) {
process.exit(100) // Preventing fork-loop on startup problems
}
var worker = cluster.fork();
console.log('Worker ' + deadWorker.process.pid + ' died.');
console.log('Worker ' + worker.process.pid + ' born.');
console.log('Worker ' + deadWorker.process.pid + ' died.')
console.log('Worker ' + worker.process.pid + ' born.')
})
modules.forEach(module => module.initDb())
modules.forEach(module => (module.initDb || Function)())
modules.forEach(module => (module.tick || Function)())
} else {
try {
const url = require('url')
@ -29,16 +31,13 @@ if (cluster.isMaster) {
const morgan = require('morgan')
const bodyParser = require('body-parser')
const config = JSON.parse(readConfigFile('snakepit.config'))
var app = express()
app.set('tokenSecret', readConfigFile('token-secret.txt'))
app.set('config', config)
app.use(bodyParser.urlencoded({ extended: false }))
app.use(bodyParser.json())
app.use(morgan('dev'))
modules.forEach(module => module.initApp(app))
modules.forEach(module => (module.initApp || Function)(app))
var credentials = {
key: readConfigFile('key.pem'),

Просмотреть файл

@ -8,7 +8,7 @@ const NAME_SYMBOL = Symbol('name')
const DB_PATH = process.env.SNAKEPIT_DB || 'db.json'
var rawRoot = (fs.existsSync(DB_PATH) && fs.statSync(DB_PATH).isFile()) ? JSON.parse(fs.readFileSync(DB_PATH).toString()) : {}
var dirty = false
var storeLog = []
var locks = {}
var callbackIdCounter = 0
var callbacks = {}
@ -74,7 +74,6 @@ var observer = {
if (this != 'skip') {
_broadcast({ storeOperation: 'set', path: path, args: [name, value] })
}
dirty = true
return Reflect.set(target, name, value)
},
deleteProperty: function(target, name) {
@ -83,7 +82,6 @@ var observer = {
if (this != 'skip') {
_broadcast({ storeOperation: 'deleteProperty', path: path, args: [name] })
}
dirty = true
return Reflect.deleteProperty(...arguments)
}
}
@ -103,6 +101,7 @@ function _handle_message(msg, sender) {
if (msg.storeOperation) {
observer[msg.storeOperation].apply('skip', [_getObject(msg.path)].concat(msg.args))
if (cluster.isMaster) {
storeLog.push(msg)
_broadcast(msg, sender)
}
} else if (msg.askLock) {
@ -148,31 +147,38 @@ function _lock(target, callback, sync) {
_send(null, { askLock: target, id: callbackIdCounter })
}
// Persists the current store content to DB_PATH if store operations have been
// logged since the last successful write. Does nothing when the log is empty.
function _writeDb() {
  if (storeLog.length === 0) {
    return
  }
  // Remember which operations are covered by the snapshot serialized below.
  // Operations logged while the write is in flight must stay in the log.
  const flushed = new Set(storeLog)
  fs.writeFile(DB_PATH, JSON.stringify(rawRoot, null, '\t'), function(err) {
    if (err) {
      // Fixed: `console.err` does not exist and threw a TypeError on write failures
      return console.error(err)
    }
    //log('Wrote db!')
    storeLog = storeLog.filter(msg => !flushed.has(msg))
  })
}
// Schedules the next run of _tick one second from now.
function _tickOn() {
  const tickIntervalMs = 1000
  setTimeout(_tick, tickIntervalMs)
}
// Periodic tick: writes the store to disk and re-arms the timer through _tickOn().
// NOTE(review): this span appears to contain both the removed `dirty`-based write and
// the new `_writeDb()` call from the diff - confirm which lines are live.
// NOTE(review): `console.err` is not a console method (`console.error` is), so that
// error path would throw if it were reached.
function _tick() {
//console.log('Tick...')
if (dirty) {
fs.writeFile(DB_PATH, JSON.stringify(rawRoot, null, '\t'), function(err) {
if(err)
return console.err(err);
//log('Wrote db!')
dirty = false
})
}
_writeDb()
_tickOn()
}
if (cluster.isMaster) {
cluster.on('fork', worker => worker.on('message', msg => _handle_message(msg, worker)))
cluster.on('fork', worker => {
worker.on('message', msg => _handle_message(msg, worker))
storeLog.forEach(msg => worker.send(msg))
})
cluster.on('exit', function(worker, code, signal) {
for (lockName in locks) {
for (let lockName in locks) {
if (locks.hasOwnProperty(lockName)) {
var waiting = locks[lockName]
let waiting = locks[lockName]
if (waiting && waiting.length > 0) {
var first = waiting[0]
let first = waiting[0]
locks[lockName] = waiting = waiting.filter(entry => entry.sender == worker)
if (waiting.length > 0 && first != waiting[0]) {
_send(waiting[0].sender, { gotLock: lockName, id: waiting[0].id })
@ -182,8 +188,9 @@ if (cluster.isMaster) {
}
})
_tickOn()
} else
} else {
process.on('message', _handle_message)
}
exports.root = new Proxy(rawRoot, observer)

Просмотреть файл

@ -1,6 +1,7 @@
const bcrypt = require('bcrypt')
const jwt = require('jsonwebtoken')
const store = require('./store.js')
const config = require('./config.js')
var exports = module.exports = {}
var db = store.root
@ -77,7 +78,7 @@ exports.initApp = function(app) {
res.status(200).send()
}
if (user.password) {
bcrypt.hash(user.password, app.get('config').hashRounds || 10, function(err, hash) {
bcrypt.hash(user.password, config.hashRounds || 10, function(err, hash) {
if(err) {
res.status(500).send()
} else {
@ -102,7 +103,7 @@ exports.initApp = function(app) {
jwt.sign(
{ user: id },
app.get('tokenSecret'),
{ expiresIn: app.get('config').tokenTTL },
{ expiresIn: config.tokenTTL },
function(err, token) {
if (err) {
res.status(500).send()