Enhanced simple FS command support, removed download command

This commit is contained in:
Tilman Kamp 2019-05-06 20:10:06 +02:00
Родитель cba2307103
Коммит 19d9cd28be
3 изменённых файлов: 275 добавлений и 55 удалений

10
package-lock.json сгенерированный
Просмотреть файл

@ -170,6 +170,11 @@
"resolved": "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.0.0.tgz",
"integrity": "sha1-1RQsDK7msRifh9OnYREGT4bIu/I="
},
"filesize": {
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/filesize/-/filesize-4.1.2.tgz",
"integrity": "sha512-iSWteWtfNcrWQTkQw8ble2bnonSl7YJImsn9OZKpE2E4IHhXI78eASpDYUljXZZdYj36QsEKjOs/CsiDqmKMJw=="
},
"filesize-parser": {
"version": "1.5.0",
"resolved": "https://registry.npmjs.org/filesize-parser/-/filesize-parser-1.5.0.tgz",
@ -345,6 +350,11 @@
"resolved": "https://registry.npmjs.org/performance-now/-/performance-now-2.1.0.tgz",
"integrity": "sha1-Ywn04OX6kT7BxpMHrjZLSzd8nns="
},
"progress": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/progress/-/progress-2.0.3.tgz",
"integrity": "sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA=="
},
"psl": {
"version": "1.1.31",
"resolved": "https://registry.npmjs.org/psl/-/psl-1.1.31.tgz",

Просмотреть файл

@ -35,10 +35,12 @@
"buffer-serializer": "^1.1.0",
"commander": "^2.19.0",
"cryptiles": ">=4.1.3",
"filesize": "^4.1.2",
"filesize-parser": "^1.5.0",
"hoek": ">=6.1.2",
"matcher": "^1.1.1",
"mime": ">=2.4.0",
"progress": "^2.0.3",
"readline-sync": "^1.4.9",
"request": "^2.88.0",
"tmp": "^0.0.33",

Просмотреть файл

@ -7,6 +7,8 @@ const path = require('path')
const program = require('commander')
const WebSocket = require('ws')
const request = require('request')
const ProgressBar = require('progress')
const filesize = require('filesize')
const readlineSync = require('readline-sync')
const { spawn, execFileSync } = require('child_process')
@ -113,12 +115,18 @@ function callPit(verb, resource, content, callback, callOptions) {
if (callOptions && callOptions.offset) {
headers['Range'] = 'bytes=' + callOptions.offset + '-'
}
let creq = request[verb]({
if (callOptions && callOptions.headers) {
headers = Object.assign(headers, callOptions.headers)
}
let creqoptions = {
url: pitUrl + '/' + resource,
agentOptions: agentOptions,
headers: headers,
body: content ? JSON.stringify(content) : undefined
})
headers: headers
}
if (content && (typeof content.pipe != 'function')) {
creqoptions.body = JSON.stringify(content)
}
let creq = request[verb](creqoptions)
.on('error', err => fail('Unable to reach pit: ' + err.code))
.on('response', res => {
if (res.statusCode === 401) {
@ -147,6 +155,9 @@ function callPit(verb, resource, content, callback, callOptions) {
})
}
})
if (content && (typeof content.pipe == 'function')) {
content.pipe(creq)
}
}
function authenticate(username, password, callback) {
@ -552,6 +563,108 @@ function getResourcePath (remotePath) {
return remotePath ? (remotePath.startsWith('/') ? remotePath.slice(1) : remotePath) : ''
}
function createProgressBar (caption, offset, size) {
let bar = new ProgressBar(' ' + caption + ' [:bar] :percent :speed :etas', {
complete: '=',
incomplete: ' ',
width: 40,
total: size
})
bar.tick(offset)
let origTick = bar.tick
let intervalStart = Date.now()
let intervalTicks = 0
let pastTicks = [{time: intervalStart, ticks: 0}]
let speed = ''
bar.tick = function(ticks) {
let now = Date.now()
intervalTicks += ticks
if (now - intervalStart > 100) {
pastTicks.push({time: intervalStart, ticks: intervalTicks})
intervalStart = now
intervalTicks = 0
pastTicks = pastTicks.reverse().slice(0, 10).reverse()
let transfer = pastTicks.map(t => t.ticks).reduce((t, v) => t + v, 0)
let timeDiff = (now - pastTicks[0].time) / 1000
speed = filesize(transfer / timeDiff, {round: 0}) + '/s'
}
origTick.apply(bar, [ticks, { speed: speed }])
}
return bar
}
function copyContent (entity, remotePath, localPath, options) {
options = options || {}
let entityPath = getEntityPath(entity)
let resource = getResourcePath(remotePath)
callPit('get', entityPath + '/simplefs/stats/' + resource, (code, stats) => {
evaluateResponse(code)
if (stats.isFile) {
if (localPath) {
let offset = 0
if (fs.existsSync(localPath)) {
let localStats = fs.statSync(localPath)
if (localStats.isDirectory()) {
let rname = remotePath.substring(remotePath.lastIndexOf('/') + 1)
if (rname.length > 0) {
localPath = path.join(localPath, rname)
} else {
fail('Cannot construct target filename.')
}
} else if (localStats.isFile()) {
if (options.force) {
console.error('Target file existing: Re-downloading...')
} else if (localStats.size >= stats.size) {
fail('Local file already existing. Remove it or use force option to overwrite.')
} else if (options.continue) {
console.error('Local file already existing and smaller than remote file: Continuing download...')
offset = localStats.size
} else {
let answer = readlineSync.question('Remote file larger than local one. Continue interrupted download (yN)? ', {
trueValue: ['y', 'yes'],
falseValue: ['n', 'no']
})
if (answer === true) {
offset = localStats.size
} else {
fail('Aborted')
}
}
} else {
fail('Target path is neither a directory nor a file.')
}
} else {
let dirname = path.dirname(localPath)
if (fs.existsSync(dirname)) {
if (!fs.statSync(dirname).isDirectory()) {
fail('Specified target directory is not a directory.')
}
} else {
fail('Target directory not existing.')
}
}
callPit('get', entityPath + '/simplefs/content/' + resource, (code, res) => {
evaluateResponse(code)
let bar = createProgressBar('downloading', offset, stats.size)
res.on('data', buf => bar.tick(buf.length))
let target = fs.createWriteStream(localPath, {flags: offset > 0 ? 'a' : 'w'})
res.pipe(target)
}, {
asStream: true,
headers: { 'Range': 'bytes=' + offset + '-' }
})
} else {
callPit('get', entityPath + '/simplefs/content/' + resource, (code, res) => {
evaluateResponse(code)
res.pipe(process.stdout)
}, { asStream: true })
}
} else {
fail('Command only supports file transfers.')
}
})
}
program
.version('0.0.1')
@ -996,24 +1109,6 @@ program
})
})
program
.command('download <jobNumber>')
.description('downloads job directory as .tar.gz archive')
.on('--help', function() {
printIntro()
printExample('pit download 1234')
})
.action((jobNumber) => {
let filename = 'job' + jobNumber + '.tar.gz'
if (fs.existsSync(filename)) {
fail('Unable to download: File "' + filename + '" already exists')
}
callPit('get', 'jobs/' + jobNumber + '/targz', (code, res) => {
evaluateResponse(code)
res.pipe(fs.createWriteStream(filename))
}, { asStream: true })
})
program
.command('ls <entity> [remotePath]')
.description('lists contents within a job directory')
@ -1050,57 +1145,170 @@ program
})
program
.command('cp <entity> <remotePath> <fsPath>')
.description('copies contents from job directory to local file system')
.command('pull <entity> <remotePath> [localPath]')
.alias('cp')
.option('-f, --force', 'will overwrite existing target file if existing - always starting download from scratch')
.option('-c, --continue', 'will try to continue interrupted download - starting from scratch, if target is not existing')
.description('copies contents from an entity\'s file to a local file or stdout')
.on('--help', function() {
printIntro()
printExample('pit cp job:1234 keep/checkpoint-0001.bin ./checkpoint.bin')
printExample('pit cp home data/corpus.data ./corpus.data')
printExample('pit pull job:1234 keep/checkpoint-0001.bin ./checkpoint.bin')
printExample('pit pull home data/corpus.data ./corpus.data')
printLine()
printLine('"entity" is the entity whose data directory should be accessed')
printEntityHelp('home', entityUser, entityJob, entityGroup, 'shared')
printLine('"remotePath" is the source path within the remote data directory.')
printLine('"fsPath" is the destination path within local filesystem.')
printLine('"localPath" is the destination path within the local filesystem. If omitted, data will be written to stdout.')
})
.action((entity, remotePath, fsPath) => {
.action((entity, remotePath, localPath, options) => copyContent(entity, remotePath, localPath, options))
program
.command('cat <entity> <remotePath>')
.description('copies contents from an entity\'s directory to stdout')
.on('--help', function() {
printIntro()
printExample('pit cat job:1234 keep/results.txt')
printExample('pit cat home data/some.txt')
printLine()
printLine('"entity" is the entity whose data directory should be accessed')
printEntityHelp('home', entityUser, entityJob, entityGroup, 'shared')
printLine('"remotePath" is the source path within the remote data directory.')
})
.action((entity, remotePath) => copyContent(entity, remotePath))
program
.command('push <entity> <remotePath> [localPath]')
.option('-f, --force', 'will overwrite existing target file if existing - always starting upload from scratch')
.option('-c, --continue', 'will try to continue interrupted upload - starting from scratch, if target is not existing')
.description('copies contents from stdin or local file system to a file in an entity\'s tree')
.on('--help', function() {
printIntro()
printExample('pit push group:students some/dir/data.bin ./data.bin')
printExample('generate-some-data.py | pit push home keeping/some.data')
printLine()
printLine('"entity" is the entity whose data directory should be targeted')
printEntityHelp('home', entityUser, entityGroup)
printLine('"remotePath" is the target path within the remote entity\'s directory.')
printLine('"localPath" is the path to a source file within the local filesystem. If omitted, data will be read from stdin.')
})
.action((entity, remotePath, localPath, options) => {
let entityPath = getEntityPath(entity)
let resource = getResourcePath(remotePath)
callPit('get', entityPath + '/simplefs/stats/' + resource, (code, stats) => {
evaluateResponse(code)
if (stats.isFile) {
let offset = 0
if (fs.existsSync(fsPath)) {
let localStats = fs.statSync(fsPath)
if (localStats.isDirectory()) {
let rname = remotePath.substring(remotePath.lastIndexOf('/') + 1)
if (rname.length > 0) {
fsPath = path.join(fsPath, rname)
} else {
fail('Cannot construct target filename.')
}
} else if (localStats.isFile()) {
offset = localStats.size
let localStats
let size = 0
if (localPath) {
if (fs.existsSync(localPath)) {
localStats = fs.statSync(localPath)
size = localStats.size
} else {
fail('Source file not found.')
}
}
let transferContent = (offset) => {
let targetPath = entityPath + '/simplefs/content/' + resource
if (localStats) {
let stream = fs.createReadStream(localPath, { start: offset })
let bar = createProgressBar('uploading', offset, size)
stream.on('data', buf => bar.tick(buf.length))
callPit('put', targetPath, stream, (code, res) => {
evaluateResponse(code)
}, {
headers: {
'Content-Type': 'application/octet-stream',
'Content-Offset': offset
}
} else {
let dirname = path.dirname(fsPath)
if (fs.existsSync(dirname)) {
if (!fs.statSync(dirname).isDirectory()) {
fail('Target directory not a directory.')
})
} else {
callPit('put', targetPath, process.stdin, (code, res) => {
evaluateResponse(code)
}, { headers: { 'Content-Type': 'application/octet-stream' } })
}
}
let statsPath = entityPath + '/simplefs/stats/' + resource
callPit('get', statsPath, (code, stats) => {
if (code === 404) {
console.error('Remote file not existing - creating...')
callPit('put', statsPath, { type: 'file' }, (code, res) => {
evaluateResponse(code)
transferContent(0)
})
} else {
evaluateResponse(code)
if (stats.isFile) {
if (stats.size < size) {
if (options.continue) {
console.error('Remote file smaller than local one - continuing upload...')
transferContent(stats.size)
} else {
if (options.force) {
console.error('Remote file existing - re-uploading...')
transferContent(0)
} else {
let answer = readlineSync.question('Remote file smaller than local one. Continue interrupted upload (yN)? ', {
trueValue: ['y', 'yes'],
falseValue: ['n', 'no']
})
if (answer === true) {
transferContent(stats.size)
} else {
fail('Aborted')
}
}
}
} else {
fail('Target directory not existing.')
if (options.force) {
console.error('Remote file is of same size or larger than local one - re-uploading...')
transferContent(0)
} else {
fail('Remote file is of same size or larger than local one.')
}
}
} else {
fail('Target path is existing, but not a file.')
}
callPit('get', entityPath + '/simplefs/content/' + resource, (code, res) => {
evaluateResponse(code)
res.pipe(fs.createWriteStream(fsPath))
}, { asStream: true }) // offset: offset
} else {
fail('At the moment only file copying is supported.')
}
})
})
program
.command('mkdir <entity> <remotePath>')
.description('creates an entity directory')
.on('--help', function() {
printIntro()
printExample('pit mkdir group:students some/dir')
printLine()
printLine('"entity" is the entity whose data directory should be targeted')
printEntityHelp('home', entityUser, entityGroup)
printLine('"remotePath" is the target path within the remote entity\'s tree.')
})
.action((entity, remotePath) => {
let entityPath = getEntityPath(entity)
let resource = getResourcePath(remotePath)
callPit('put', entityPath + '/simplefs/stats/' + resource, { type: 'directory' }, (code) => {
evaluateResponse(code)
})
})
program
.command('delete <entity> <remotePath>')
.description('deletes a file or directory within an entity\'s tree')
.on('--help', function() {
printIntro()
printExample('pit delete group:students some/dir')
printExample('pit delete home some/file.txt')
printLine()
printLine('"entity" is the entity whose data directory should be targeted')
printEntityHelp('home', entityUser, entityGroup)
printLine('"remotePath" is the target path within the remote entity\'s tree.')
})
.action((entity, remotePath) => {
let entityPath = getEntityPath(entity)
let resource = getResourcePath(remotePath)
callPit('delete', entityPath + '/simplefs/stats/' + resource, (code) => {
evaluateResponse(code)
})
})
program
.command('mount <entity> [mountpoint]')
.description('mounts the data directory of an entity to a local mountpoint')