This commit is contained in:
Tilman Kamp 2019-05-02 19:42:38 +02:00
Родитель cba2307103
Коммит f26a532474
1 изменённых файлов: 190 добавлений и 55 удалений

Просмотреть файл

@ -113,12 +113,15 @@ function callPit(verb, resource, content, callback, callOptions) {
if (callOptions && callOptions.offset) {
headers['Range'] = 'bytes=' + callOptions.offset + '-'
}
let creq = request[verb]({
let creqoptions = {
url: pitUrl + '/' + resource,
agentOptions: agentOptions,
headers: headers,
body: content ? JSON.stringify(content) : undefined
})
headers: headers
}
if (content && (typeof content.pipe != 'function')) {
creqoptions.content = JSON.stringify(content)
}
let creq = request[verb](creqoptions)
.on('error', err => fail('Unable to reach pit: ' + err.code))
.on('response', res => {
if (res.statusCode === 401) {
@ -147,6 +150,9 @@ function callPit(verb, resource, content, callback, callOptions) {
})
}
})
if (content && (typeof content.pipe == 'function')) {
content.pipe(creq)
}
}
function authenticate(username, password, callback) {
@ -552,6 +558,44 @@ function getResourcePath (remotePath) {
return remotePath ? (remotePath.startsWith('/') ? remotePath.slice(1) : remotePath) : ''
}
function copyContent (entity, remotePath, localPath) {
let entityPath = getEntityPath(entity)
let resource = getResourcePath(remotePath)
callPit('get', entityPath + '/simplefs/stats/' + resource, (code, stats) => {
evaluateResponse(code)
if (stats.isFile) {
if (localPath) {
if (fs.existsSync(localPath)) {
let localStats = fs.statSync(localPath)
if (localStats.isDirectory()) {
let rname = remotePath.substring(remotePath.lastIndexOf('/') + 1)
if (rname.length > 0) {
localPath = path.join(localPath, rname)
} else {
fail('Cannot construct target filename.')
}
}
} else {
let dirname = path.dirname(localPath)
if (fs.existsSync(dirname)) {
if (!fs.statSync(dirname).isDirectory()) {
fail('Specified target directory is not a directory.')
}
} else {
fail('Target directory not existing.')
}
}
}
callPit('get', entityPath + '/simplefs/content/' + resource, (code, res) => {
evaluateResponse(code)
res.pipe(localPath ? fs.createWriteStream(localPath) : process.stdout)
}, { asStream: true }) // offset: offset
} else {
fail('Command only supports file transfers.')
}
})
}
program
.version('0.0.1')
@ -996,24 +1040,6 @@ program
})
})
program
.command('download <jobNumber>')
.description('downloads job directory as .tar.gz archive')
.on('--help', function() {
printIntro()
printExample('pit download 1234')
})
.action((jobNumber) => {
let filename = 'job' + jobNumber + '.tar.gz'
if (fs.existsSync(filename)) {
fail('Unable to download: File "' + filename + '" already exists')
}
callPit('get', 'jobs/' + jobNumber + '/targz', (code, res) => {
evaluateResponse(code)
res.pipe(fs.createWriteStream(filename))
}, { asStream: true })
})
program
.command('ls <entity> [remotePath]')
.description('lists contents within a job directory')
@ -1050,57 +1076,166 @@ program
})
program
.command('cp <entity> <remotePath> <fsPath>')
.description('copies contents from job directory to local file system')
.command('pull <entity> <remotePath> [localPath]')
.alias('cp')
.option('-f, --force', 'will overwrite existing target file if existing - always starting download from scratch')
.option('-c, --continue', 'will try to continue interrupted download - starting from scratch, if target is not existing')
.description('copies contents from an entity\'s file to a local file or stdout')
.on('--help', function() {
printIntro()
printExample('pit cp job:1234 keep/checkpoint-0001.bin ./checkpoint.bin')
printExample('pit cp home data/corpus.data ./corpus.data')
printExample('pit pull job:1234 keep/checkpoint-0001.bin ./checkpoint.bin')
printExample('pit pull home data/corpus.data ./corpus.data')
printLine()
printLine('"entity" is the entity whose data directory should be accessed')
printEntityHelp('home', entityUser, entityJob, entityGroup, 'shared')
printLine('"remotePath" is the source path within the remote data directory.')
printLine('"fsPath" is the destination path within local filesystem.')
printLine('"localPath" is the destination path within the local filesystem. If omitted, data will be written to stdout.')
})
.action((entity, remotePath, fsPath) => {
.action(copyContent)
program
.command('cat <entity> <remotePath>')
.description('copies contents from an entity\'s directory to stdout')
.on('--help', function() {
printIntro()
printExample('pit cat job:1234 keep/results.txt')
printExample('pit cat home data/some.txt')
printLine()
printLine('"entity" is the entity whose data directory should be accessed')
printEntityHelp('home', entityUser, entityJob, entityGroup, 'shared')
printLine('"remotePath" is the source path within the remote data directory.')
})
.action(copyContent)
program
.command('push <entity> <remotePath> [localPath]')
.option('-f, --force', 'will overwrite existing target file if existing - always starting upload from scratch')
.option('-c, --continue', 'will try to continue interrupted upload - starting from scratch, if target is not existing')
.description('copies contents from stdin or local file system to a file in an entity\'s tree')
.on('--help', function() {
printIntro()
printExample('pit push group:students some/dir/data.bin ./data.bin')
printExample('generate-some-data.py | pit push home keeping/some.data')
printLine()
printLine('"entity" is the entity whose data directory should be targeted')
printEntityHelp('home', entityUser, entityGroup)
printLine('"remotePath" is the target path within the remote entity\'s directory.')
printLine('"localPath" is the path to a source file within the local filesystem. If omitted, data will be read from stdin.')
})
.action((entity, remotePath, localPath, options) => {
let entityPath = getEntityPath(entity)
let resource = getResourcePath(remotePath)
callPit('get', entityPath + '/simplefs/stats/' + resource, (code, stats) => {
evaluateResponse(code)
if (stats.isFile) {
let offset = 0
if (fs.existsSync(fsPath)) {
let localStats = fs.statSync(fsPath)
if (localStats.isDirectory()) {
let rname = remotePath.substring(remotePath.lastIndexOf('/') + 1)
if (rname.length > 0) {
fsPath = path.join(fsPath, rname)
} else {
fail('Cannot construct target filename.')
let localStats
if (localPath) {
if (fs.existsSync(localPath)) {
localStats = fs.statSync(localPath)
} else {
fail('Source file not found.')
}
}
let transferContent = (offset) => {
let targetPath = entityPath + '/simplefs/content/' + resource
if (localStats) {
const blockSize = 1024 * 1024
let toTransfer = localStats.size - offset
let blocks = Math.floor(toTransfer / blockSize) + 1
let block = 0
let transferBlock = () => {
let blockOffset = offset + block * blockSize
let blockStream = fs.createReadStream(localPath, {
start: blockOffset,
end: blockOffset + blockSize - 1
})
block++
callPit('put', targetPath, blockStream, (code, res) => {
evaluateResponse(code)
if (block < blocks) {
transferBlock()
}
} else if (localStats.isFile()) {
offset = localStats.size
}
} else {
let dirname = path.dirname(fsPath)
if (fs.existsSync(dirname)) {
if (!fs.statSync(dirname).isDirectory()) {
fail('Target directory not a directory.')
}, { header: { 'Content-Offset': blockOffset } })
}
} else {
callPit('put', targetPath, process.stdin, (code, res) => {
evaluateResponse(code)
})
}
}
callPit('get', entityPath + '/simplefs/stats/' + resource, (code, stats) => {
if (code === 404) {
console.log('Remote file not existing - creating...')
callPit('put', entityPath + '/simplefs/stats/' + resource, (code, res) => {
evaluateResponse(code)
transferContent(0)
})
} else {
evaluateResponse(code)
if (stats.isFile) {
if (stats.size < size) {
if (options.continue) {
console.log('Remote file smaller than the local one - continuing upload...')
transferContent(stats.size)
} else {
if (options.force) {
console.log('Remote file existing - re-uploading...')
transferContent(0)
} else {
fail('Remote file existing.')
}
}
} else {
fail('Target directory not existing.')
if (options.force) {
console.log('Remote file is of same size or bigger than the local one - re-uploading...')
transferContent(0)
} else {
fail('Remote file is of same size or bigger than the local one.')
}
}
} else {
fail('Target path is a directory.')
}
callPit('get', entityPath + '/simplefs/content/' + resource, (code, res) => {
evaluateResponse(code)
res.pipe(fs.createWriteStream(fsPath))
}, { asStream: true }) // offset: offset
} else {
fail('At the moment only file copying is supported.')
}
})
})
program
.command('mkdir <entity> <remotePath>')
.description('creates an entity directory')
.on('--help', function() {
printIntro()
printExample('pit mkdir group:students some/dir')
printLine()
printLine('"entity" is the entity whose data directory should be targeted')
printEntityHelp('home', entityUser, entityGroup)
printLine('"remotePath" is the target path within the remote entity\'s tree.')
})
.action((entity, remotePath) => {
let entityPath = getEntityPath(entity)
let resource = getResourcePath(remotePath)
callPit('put', entityPath + '/simplefs/stats/' + resource, (code) => {
evaluateResponse(code)
})
})
program
.command('delete <entity> <remotePath>')
.description('deletes a file or directory within an entity\'s tree')
.on('--help', function() {
printIntro()
printExample('pit delete group:students some/dir')
printExample('pit delete home some/file.txt')
printLine()
printLine('"entity" is the entity whose data directory should be targeted')
printEntityHelp('home', entityUser, entityGroup)
printLine('"remotePath" is the target path within the remote entity\'s tree.')
})
.action((entity, remotePath) => {
let entityPath = getEntityPath(entity)
let resource = getResourcePath(remotePath)
callPit('delete', entityPath + '/simplefs/stats/' + resource, (code) => {
evaluateResponse(code)
})
})
program
.command('mount <entity> [mountpoint]')
.description('mounts the data directory of an entity to a local mountpoint')