* Refactored the major batch/storage REST proxy for future use

* Added the httpTraffic flag back to Azure calls

* Split file operations into R/file_operations.R

* Removed unneeded files that were moved to doAzureParallel

* Revised lintr rules

* Separated container operations into R/container_operations.R

* Added a progress bar to storage operations and moved operations to their appropriate files

* Fixed the upload functions

* Fixed uploadBlob to pass the httpTrafficFlag correctly

* Bumped DESCRIPTION to v0.5.0
This commit is contained in:
Brian 2017-09-07 12:57:06 -07:00, committed by GitHub
Parent 9a4e12024b
Commit a811d2a286
21 changed files: 916 additions and 1075 deletions

@@ -7,6 +7,3 @@ warnings_are_errors: false
r_github_packages:
- jimhester/lintr
after_success:
- Rscript -e 'lintr::lint_package()'

CHANGELOG.md (new file)

@@ -1,7 +1,7 @@
Package: rAzureBatch
Type: Package
Title: rAzureBatch
Version: 0.4.0
Version: 0.5.0
Author: Brian Hoang
Maintainer: Brian Hoang <brhoan@microsoft.com>
Description: The project is for data experts who use R at scale. The project

@@ -1,41 +1,179 @@
createSignature <- function(requestMethod, headerList=character()) {
headers <- c('Content-Encoding',
'Content-Language',
'Content-Length',
'Content-MD5',
'Content-Type',
'Date',
'If-Modified-Since',
'If-Match',
'If-None-Match',
'If-Unmodified-Since',
'Range' )
createSignature <-
function(requestMethod, headerList = character()) {
headers <- c(
'Content-Encoding',
'Content-Language',
'Content-Length',
'Content-MD5',
'Content-Type',
'Date',
'If-Modified-Since',
'If-Match',
'If-None-Match',
'If-Unmodified-Since',
'Range'
)
stringToSign <- paste0(requestMethod, "\n")
for (name in headers) {
temp <- ifelse(!is.na(headerList[name]), headerList[name], "")
stringToSign <- paste0(stringToSign, temp, "\n")
}
return(stringToSign)
}
AzureRequest <- setRefClass(
"AzureRequest",
fields = list(
method = "character",
path = "character",
headers = "character",
query = "list",
body = "list",
url = "character"
),
methods = list(
signString = function(x, key) {
undecodedKey <- RCurl::base64Decode(key, mode = "raw")
RCurl::base64(digest::hmac(
key = undecodedKey,
object = enc2utf8(x),
algo = "sha256",
raw = TRUE
))
}
)
)
signAzureRequest <- function(request, resource, key, prefix) {
headers <- request$headers
canonicalizedHeaders <- ""
for (name in sort(names(headers))) {
if (grepl(prefix, name)) {
canonicalizedHeaders <-
paste0(canonicalizedHeaders, name, ":", headers[name], "\n")
}
}
canonicalizedResource <- paste0("/", resource, request$path, "\n")
if (!is.null(names(request$query))) {
for (name in sort(names(request$query))) {
canonicalizedResource <-
paste0(canonicalizedResource, name, ":", request$query[[name]], "\n")
}
}
canonicalizedResource <-
substr(canonicalizedResource, 1, nchar(canonicalizedResource) - 1)
stringToSign <- createSignature(request$method, request$headers)
stringToSign <- paste0(stringToSign, canonicalizedHeaders)
stringToSign <- paste0(stringToSign, canonicalizedResource)
# sign the request
authorizationString <-
paste0("SharedKey ",
resource,
":",
request$signString(stringToSign, key))
return(authorizationString)
}
AzureRequest <- setRefClass("AzureRequest",
fields = list(
method = "character",
path = "character",
headers = "character",
query = "list"),
executeAzureRequest <- function(request, ...) {
args <- list(...)
methods = list(
signString = function(x, key){
undecodedKey <- RCurl::base64Decode(key, mode="raw")
newString<-RCurl::base64(
digest::hmac(key=undecodedKey,
object=enc2utf8(x),
algo= "sha256", raw=TRUE)
)
}
))
body <- NULL
httpTraffic <- NULL
write <- NULL
progressBar <- NULL
httpTrafficFlag <- getOption("azureHttpTraffic")
if (length(request$body) != 0) {
body <- request$body
}
if (hasArg("uploadFile")) {
body <- args$uploadFile
}
if (hasArg("body")) {
body <- args$body
}
if (hasArg("write")) {
write <- args$write
}
if (hasArg("progress") && args$progress) {
progressBar <- httr::progress()
}
if (!is.null(httpTrafficFlag) && httpTrafficFlag) {
httpTraffic <- httr::verbose()
}
requestHeaders <- httr::add_headers(request$headers)
# Execute request with http method
if (request$method == "GET" ||
request$method == "POST" ||
request$method == "DELETE" ||
request$method == "PUT" ||
request$method == "PATCH") {
httr::VERB(
request$method,
request$url,
config = requestHeaders,
body = body,
query = request$query,
encode = "json",
write,
httpTraffic,
progressBar
)
}
else if (request$method == "HEAD") {
httr::HEAD(
request$url,
config = requestHeaders,
body = body,
query = request$query,
encode = "json",
write,
httpTraffic,
progressBar
)
}
else {
stop(
sprintf(
"This HTTP Verb is not found: %s - Please try again with GET, POST, HEAD, PUT, PATCH or DELETE",
request$method
)
)
}
}
extractAzureResponse <- function(response, content) {
if (is.null(content)) {
httr::content(response, encoding = "UTF-8")
}
else if (content %in% c("raw", "text", "parsed")) {
httr::content(response, content, encoding = "UTF-8")
}
else if (content == "response") {
response
}
# Legacy code: By default it will, automatically attempt
# figure out which one is most appropriate
else {
httr::content(response, encoding = "UTF-8")
}
}
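
The request pipeline above is the core of this refactor: build an AzureRequest, sign it, execute it, extract the body. A minimal sketch of driving it by hand, assuming placeholder account details (the account name, base64 key, and container path below are illustrative, not from this commit):

# Hedged sketch: the account name, key, and path are hypothetical placeholders.
# options(azureHttpTraffic = TRUE) turns on httr's verbose tracing for every call.
request <- AzureRequest$new(
  method = "GET",
  path = "/my-container",
  query = list("restype" = "container", "comp" = "list")
)

# The prefix picks which canonicalized headers get signed:
# "x-ms-" for storage, "ocp-" for batch.
request$headers["Authorization"] <-
  signAzureRequest(request, "myaccount", "bXlrZXk=", "x-ms-")
request$url <- "https://myaccount.blob.core.windows.net/my-container"

response <- executeAzureRequest(request)
result <- extractAzureResponse(response, content = "parsed")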

@@ -1,101 +1,69 @@
apiVersion <- "2017-05-01.5.0"
getBatchCredentials <- function(configPath = "az_config.json", ...){
getBatchCredentials <- function(configPath = "az_config.json", ...) {
config <- getOption("az_config")
if(!is.null(config) &&!is.null(config$batchAccount)){
if (!is.null(config) && !is.null(config$batchAccount)) {
batchAccount <- config$batchAccount
credentials <- BatchCredentials$new(name=batchAccount$name, key=batchAccount$key, url=batchAccount$url)
credentials <-
BatchCredentials$new(name = batchAccount$name,
key = batchAccount$key,
url = batchAccount$url)
}
else{
config <- rjson::fromJSON(file = configPath)
credentials <- BatchCredentials$new(name=config$batchAccount$name, key=config$batchAccount$key, url=config$batchAccount$url)
credentials <-
BatchCredentials$new(
name = config$batchAccount$name,
key = config$batchAccount$key,
url = config$batchAccount$url
)
}
credentials
}
BatchCredentials <- setRefClass("BatchCredentials",
fields = list(name = "character", key = "character", url="character"),
methods = list(
signString = function(x){
undecodedKey <- RCurl::base64Decode(key, mode="raw")
newString<-RCurl::base64(
digest::hmac(key=undecodedKey,
object=enc2utf8(x),
algo= "sha256", raw=TRUE)
)
}
))
callBatchService <- function(request, credentials, body = NULL, writeFlag = FALSE, verbose = FALSE, ...){
args <- list(...)
contentType = args$contentType
requestdate <- httr::http_date(Sys.time())
headers <- request$headers
headers['ocp-date'] <- requestdate
canonicalizedHeaders <- ""
for(name in sort(names(headers))){
if(grepl('ocp-', name)){
canonicalizedHeaders <- paste0(canonicalizedHeaders, name,":", headers[name], "\n")
BatchCredentials <- setRefClass(
"BatchCredentials",
fields = list(name = "character", key = "character", url = "character"),
methods = list(
signString = function(x) {
undecodedKey <- RCurl::base64Decode(key, mode = "raw")
RCurl::base64(digest::hmac(
key = undecodedKey,
object = enc2utf8(x),
algo = "sha256",
raw = TRUE
))
}
}
)
)
canonicalizedHeaders <- substr(canonicalizedHeaders, 1, nchar(canonicalizedHeaders) - 1)
prepareBatchRequest <- function(request, credentials) {
requestdate <- httr::http_date(Sys.time())
request$headers['ocp-date'] <- requestdate
canonicalizedResource <- paste0("/", credentials$name, request$path, "\n")
for (name in sort(names(request$query))) {
canonicalizedResource <- paste0(canonicalizedResource, name,":", request$query[[name]], "\n")
}
authorizationHeader <-
signAzureRequest(request, credentials$name, credentials$key, 'ocp-')
canonicalizedResource <- substr(canonicalizedResource, 1, nchar(canonicalizedResource) - 1)
request$headers['Authorization'] <- authorizationHeader
request$headers['User-Agent'] <-
paste0(
"rAzureBatch/",
packageVersion("rAzureBatch"),
";",
"doAzureParallel/",
packageVersion("doAzureParallel")
)
stringToSign <- createSignature(request$method, request$headers)
stringToSign <- paste0(stringToSign, canonicalizedHeaders, "\n")
stringToSign <- paste0(stringToSign, canonicalizedResource)
request$url <- paste0(credentials$url, request$path)
authString <- sprintf("SharedKey %s:%s", credentials$name, credentials$signString(stringToSign))
headers['Authorization'] <- authString
requestHeaders <- httr::add_headers(.headers = headers, "User-Agent"=paste0("rAzureBatch/", packageVersion("rAzureBatch"), ";", "doAzureParallel/", packageVersion("doAzureParallel")))
response <- ""
url <- paste0(credentials$url, request$path)
config <- getOption("az_config")
verbose <- ifelse(!is.null(config) && !is.null(config$settings),
config$settings$verbose,
getOption("verbose"))
write <- if(writeFlag) { httr::write_memory() } else { NULL }
verboseMode <- if(getOption("verbose")){ httr::verbose() } else { NULL }
if (verbose) {
print(stringToSign)
print(url)
print(paste0("Auth String: ", authString))
print(requestHeaders)
}
response <- httr::VERB(request$method,
url,
config = requestHeaders,
verboseMode,
write,
query = request$query,
body = body,
encode = "json")
if(!is.null(contentType) && contentType){
httr::content(response, as = "text")
}
else{
httr::content(response)
}
request
}
callBatchService <- function(request, credentials, content, ...){
request <- prepareBatchRequest(request, credentials)
response <- executeAzureRequest(request, ...)
extractAzureResponse(response, content)
}
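
With prepareBatchRequest handling dating, signing, and the User-Agent header, every Batch operation below reduces to building an AzureRequest and handing it to callBatchService. A sketch of the pattern (the job id is illustrative):

# Credentials come from the az_config option or az_config.json.
batchCredentials <- getBatchCredentials()

request <- AzureRequest$new(
  method = "GET",
  path = paste0("/jobs/", "job-001"),       # illustrative id
  query = list("api-version" = apiVersion)
)

# content picks the extraction mode: "parsed", "text", "raw",
# or "response" for the untouched httr response object.
job <- callBatchService(request, batchCredentials, content = "parsed")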

R/blob_operations.R (new file)
@@ -0,0 +1,91 @@
listBlobs <- function(containerName, prefix = "", content = "parsed", ...) {
query <- list('restype' = "container", 'comp' = "list", 'prefix' = prefix)
request <- AzureRequest$new(
method = "GET",
path = paste0("/", containerName),
query = query
)
callStorage(request, content, ...)
}
deleteBlob <-
function(containerName, blobName, content = "parsed", ...) {
request <- AzureRequest$new(method = "DELETE",
path = paste0("/", containerName, "/", blobName))
callStorage(request, content, ...)
}
uploadBlob <-
function(containerName,
fileDirectory,
parallelThreads = 1,
content = "response",
...) {
args <- list(...)
if (file.exists(fileDirectory)) {
fileSize <- file.size(fileDirectory)
}
else if (file.exists(paste0(getwd(), "/", fileDirectory))) {
fileDirectory <- paste0(getwd(), "/", fileDirectory)
fileSize <- file.size(fileDirectory)
}
else{
stop("The given file does not exist.")
}
# file size is less than 64 mb
if (fileSize < (1024 * 1024 * 64)) {
endFile <- httr::upload_file(fileDirectory)
headers <- c()
headers['Content-Length'] <- fileSize
headers['Content-Type'] <- endFile$type
headers['x-ms-blob-type'] <- 'BlockBlob'
blobName <- basename(fileDirectory)
if (!is.null(args$remoteName)) {
blobName <- args$remoteName
}
request <- AzureRequest$new(
method = "PUT",
path = paste0("/", containerName, "/", blobName),
headers = headers
)
callStorage(request, content, uploadFile = endFile, ...)
}
else {
uploadChunk(containerName, fileDirectory, content = content, parallelThreads = parallelThreads, ...)
}
}
downloadBlob <- function(containerName,
blobName,
overwrite = FALSE,
downloadPath = NULL,
progress = FALSE,
...) {
if (!is.null(downloadPath)) {
write <- httr::write_disk(downloadPath, overwrite)
}
else {
write <- httr::write_memory()
}
request <- AzureRequest$new(method = "GET",
path = paste0("/", containerName, "/", blobName))
if (grepl(".txt", blobName)) {
content = "text"
}
else {
content = NULL
}
callStorage(request, content = content, write = write, progress = progress, ...)
}
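
A typical round trip with these helpers, assuming credentials are already registered via the az_config option; the container and file names are illustrative:

# Upload, list, and download a blob with SharedKey credentials.
uploadBlob("my-container", "data/input.csv", remoteName = "input.csv")
blobs <- listBlobs("my-container", prefix = "input")

# Files of 64 MB or more are routed through uploadChunk automatically;
# a .txt blob is extracted as text, anything else as the raw response.
downloadBlob("my-container", "input.csv",
             downloadPath = "input-copy.csv",
             overwrite = TRUE,
             progress = TRUE)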

R/container_operations.R (new file)
@@ -0,0 +1,34 @@
listContainers <- function(prefix = "", content = "parsed", ...) {
query <- list('comp' = "list", 'prefix' = prefix)
request <- AzureRequest$new(method = "GET",
path = paste0("/"),
query = query)
callStorage(request, content, ...)
}
deleteContainer <- function(containerName, content = "parsed", ...) {
query <- list('restype' = "container")
request <- AzureRequest$new(
method = "DELETE",
path = paste0("/", containerName),
query = query
)
callStorage(request, content, ...)
}
createContainer <-
function(containerName, content = "parsed", ...) {
query <- list('restype' = "container")
request <- AzureRequest$new(
method = "PUT",
path = paste0("/", containerName),
query = query
)
callStorage(request, content, ...)
}
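
The container lifecycle built from these three helpers; the names are illustrative:

createContainer("job-20170907")                 # PUT /job-20170907?restype=container
containers <- listContainers(prefix = "job")    # server-side prefix filter
deleteContainer("job-20170907")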

R/file_operations.R (new file)
@@ -0,0 +1,83 @@
getNodeFile <-
function(poolId,
nodeId,
filePath,
content = "parsed",
downloadPath = NULL,
overwrite = FALSE,
...) {
batchCredentials <- getBatchCredentials()
args <- list(...)
verb <- "GET"
if (!is.null(args$verb) && args$verb == "HEAD") {
verb <- args$verb
}
if (!is.null(downloadPath)) {
write <- httr::write_disk(downloadPath, overwrite)
}
else {
write <- httr::write_memory()
}
progress <- NULL
if (!is.null(args$progress)) {
progress <- args$progress
}
request <- AzureRequest$new(
method = verb,
path = paste0("/pools/", poolId, "/nodes/", nodeId, "/files/", filePath),
query = list("api-version" = apiVersion)
)
callBatchService(request,
batchCredentials,
content,
write = write,
progress = progress,
...)
}
getTaskFile <-
function(jobId,
taskId,
filePath,
content = "parsed",
downloadPath = NULL,
overwrite = FALSE,
...) {
batchCredentials <- getBatchCredentials()
args <- list(...)
verb <- "GET"
if (!is.null(args$verb) && args$verb == "HEAD") {
verb <- args$verb
}
progress <- NULL
if (!is.null(args$progress)) {
progress <- args$progress
}
if (!is.null(downloadPath)) {
write <- httr::write_disk(downloadPath, overwrite)
}
else {
write <- httr::write_memory()
}
request <- AzureRequest$new(
method = verb,
path = paste0("/jobs/", jobId, "/tasks/", taskId, "/files/", filePath),
query = list("api-version" = apiVersion)
)
callBatchService(request,
batchCredentials,
content,
write = write,
progress = progress,
...)
}
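
Both helpers accept verb = "HEAD" through ..., which turns the call into an existence check without downloading the body. A sketch with illustrative ids and paths:

# Stream a task's stdout to disk with a progress bar.
getTaskFile("job-001", "task-001", "stdout.txt",
            downloadPath = "stdout-task-001.txt",
            overwrite = TRUE,
            progress = TRUE)

# HEAD request: headers only, no file content transferred.
getNodeFile("pool-001", "tvm-1234", "startup/stdout.txt", verb = "HEAD")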

@@ -1,25 +1,10 @@
#' Add a job to the specified pool.
#'
#' @param jobId A string that uniquely identifies the job within the account.
#' @param ... Further named parameters
#' \itemize{
#' \item{"resourceFiles"}: {A list of files that the Batch service will download to the compute node before running the command line.}
#' \item{"args"}: {Arguments in the foreach parameters that will be used for the task running.}
#' \item{"packages"}: {A list of packages that the Batch service will download to the compute node.}
#' \item{"envir"}: {The R environment that the task will run under.}
#'}
#' @return The request to the Batch service was successful.
#' @export
addJob <- function(jobId,
poolInfo,
jobPreparationTask = NULL,
usesTaskDependencies = FALSE,
raw = FALSE,
content = "parsed",
...) {
args <- list(...)
batchCredentials <- getBatchCredentials()
storageCredentials <- getStorageCredentials()
body <- list(
id = jobId,
@@ -42,21 +27,14 @@ addJob <- function(jobId,
method = "POST",
path = "/jobs",
query = list("api-version" = apiVersion),
headers = headers
headers = headers,
body = body
)
callBatchService(request, batchCredentials, body, contentType = raw)
callBatchService(request, batchCredentials, content)
}
#' Gets information about the specified job.
#'
#' @param jobId The id of the job.
#'
#' @return A response containing the job.
#' @examples
#' getJob(job-001)
#' @export
getJob <- function(jobId){
getJob <- function(jobId, content = "parsed") {
batchCredentials <- getBatchCredentials()
request <- AzureRequest$new(
@@ -65,18 +43,10 @@ getJob <- function(jobId){
query = list("api-version" = apiVersion)
)
callBatchService(request, batchCredentials)
callBatchService(request, batchCredentials, content)
}
#' Deletes a job.
#' @details Deleting a job also deletes all tasks that are part of that job, and all job statistics. This also overrides the retention period for task data; that is, if the job contains tasks which are still retained on compute nodes, the Batch services deletes those tasks' working directories and all their contents.
#' @param jobId The id of the job to delete..
#'
#' @return The request to the Batch service was successful.
#' @examples
#' deleteJob(job-001)
#' @export
deleteJob <- function(jobId){
deleteJob <- function(jobId, content = "parsed") {
batchCredentials <- getBatchCredentials()
headers <- c()
@@ -89,7 +59,7 @@ deleteJob <- function(jobId){
headers = headers
)
callBatchService(request, batchCredentials)
callBatchService(request, batchCredentials, content)
}
#' Updates the properties of the specified job.
@@ -98,24 +68,50 @@ deleteJob <- function(jobId){
#' @param ... Additional parameters to customize update the job
#' @return The request to the Batch service was successful.
#' @export
updateJob <- function(jobId, ...) {
updateJob <- function(jobId, content = "parsed", ...) {
batchCredentials <- getBatchCredentials()
headers <- character()
body = list(onAllTasksComplete = "terminatejob")
body <- list(onAllTasksComplete = "terminatejob")
size <-
nchar(jsonlite::toJSON(body, method = "C", auto_unbox = TRUE))
headers['Content-Length'] <- size
headers['Content-Type'] <-
'application/json;odata=minimalmetadata'
request <- AzureRequest$new(
method = "PATCH",
path = paste0("/jobs/", jobId),
query = list("api-version" = apiVersion),
headers = headers
headers = headers,
body = body
)
callBatchService(request, batchCredentials, body)
callBatchService(request, batchCredentials, content)
}
listJobs <- function(query = list(), content = "parsed") {
batchCredentials <- getBatchCredentials()
request <- AzureRequest$new(
method = "GET",
path = paste0("/jobs"),
query = append(list("api-version" = apiVersion), query)
)
callBatchService(request, batchCredentials, content)
}
getJobPreparationStatus <- function(jobId, content = "parsed") {
batchCredentials <- getBatchCredentials()
request <- AzureRequest$new(
method = "GET",
path = paste0("/jobs/", jobId, "jobpreparationandreleasetaskstatus"),
query = list("api-version" = apiVersion)
)
callBatchService(request, batchCredentials, content)
}
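
Since every helper now takes a content argument, callers can poll with parsed bodies directly. A hedged sketch of a completion poll; the job id and the OData filter value are illustrative assumptions:

repeat {
  job <- getJob("job-001")
  if (!is.null(job$state) && job$state == "completed") break
  Sys.sleep(10)
}

# listJobs appends extra query parameters after api-version,
# e.g. an OData filter on job state.
activeJobs <- listJobs(query = list("$filter" = "state eq 'active'"))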

@@ -1,45 +1,42 @@
addPool <- function(poolId, vmSize, ...){
addPool <- function(poolId, vmSize, content = "parsed", ...) {
args <- list(...)
raw <- FALSE
if(!is.null(args$raw)){
raw <- args$raw
}
commands <- c(
"export PATH=/anaconda/envs/py35/bin:$PATH",
"env PATH=$PATH pip install --no-dependencies blobxfer"
)
commands <- c("export PATH=/anaconda/envs/py35/bin:$PATH",
"env PATH=$PATH pip install --no-dependencies blobxfer")
if(!is.null(args$packages)){
if (!is.null(args$packages)) {
commands <- c(commands, args$packages)
}
autoscaleFormula <- ""
if(!is.null(args$autoscaleFormula)){
if (!is.null(args$autoscaleFormula)) {
autoscaleFormula <- args$autoscaleFormula
}
startTask <- NULL
if(!is.null(args$startTask)){
if (!is.null(args$startTask)) {
startTask <- args$startTask
}
virtualMachineConfiguration <- NULL
if(!is.null(args$virtualMachineConfiguration)){
if (!is.null(args$virtualMachineConfiguration)) {
virtualMachineConfiguration <- args$virtualMachineConfiguration
}
maxTasksPerNode <- ""
if(!is.null(args$maxTasksPerNode)){
if (!is.null(args$maxTasksPerNode)) {
maxTasksPerNode <- args$maxTasksPerNode
}
enableAutoScale <- FALSE
if(!is.null(args$enableAutoScale)){
if (!is.null(args$enableAutoScale)) {
enableAutoScale <- args$enableAutoScale
}
autoScaleEvaluationInterval <- "PT5M"
if(!is.null(args$autoScaleEvaluationInterval)){
if (!is.null(args$autoScaleEvaluationInterval)) {
autoScaleEvaluationInterval <- args$autoScaleEvaluationInterval
}
@@ -47,34 +44,39 @@ addPool <- function(poolId, vmSize, ...){
batchCredentials <- getBatchCredentials()
body = list(vmSize = vmSize,
id = poolId,
startTask = startTask,
virtualMachineConfiguration = virtualMachineConfiguration,
enableAutoScale = enableAutoScale,
autoScaleFormula = autoscaleFormula,
autoScaleEvaluationInterval = autoScaleEvaluationInterval,
maxTasksPerNode = maxTasksPerNode)
body <- list(
vmSize = vmSize,
id = poolId,
startTask = startTask,
virtualMachineConfiguration = virtualMachineConfiguration,
enableAutoScale = enableAutoScale,
autoScaleFormula = autoscaleFormula,
autoScaleEvaluationInterval = autoScaleEvaluationInterval,
maxTasksPerNode = maxTasksPerNode
)
body <- Filter(length, body)
size <- nchar(jsonlite::toJSON(body, method="C", auto_unbox = TRUE))
size <-
nchar(jsonlite::toJSON(body, method = "C", auto_unbox = TRUE))
headers <- c()
headers['Content-Length'] <- size
headers['Content-Type'] <- 'application/json;odata=minimalmetadata'
headers['Content-Type'] <-
'application/json;odata=minimalmetadata'
request <- AzureRequest$new(
method = "POST",
path = "/pools",
query = list("api-version" = apiVersion),
headers = headers
headers = headers,
body = body
)
callBatchService(request, batchCredentials, body, contentType = raw)
callBatchService(request, batchCredentials, content)
}
deletePool <- function(poolId = ""){
deletePool <- function(poolId, content = "parsed") {
batchCredentials <- getBatchCredentials()
headers <- c()
@@ -87,51 +89,51 @@ deletePool <- function(poolId = ""){
headers = headers
)
callBatchService(request, batchCredentials)
callBatchService(request, batchCredentials, content)
}
getPool <- function(poolId){
batchCredentials = getBatchCredentials()
getPool <- function(poolId, content = "parsed") {
batchCredentials <- getBatchCredentials()
request <- AzureRequest$new(
method = "GET",
path = paste0("/pools/", poolId),
query = list("api-version" = apiVersion))
query = list("api-version" = apiVersion)
)
callBatchService(request, batchCredentials)
callBatchService(request, batchCredentials, content)
}
resizePool <- function(poolId, ...){
batchCredentials = getBatchCredentials()
args = list(...)
resizePool <- function(poolId, content = "parsed", ...) {
batchCredentials <- getBatchCredentials()
args <- list(...)
autoscaleFormula <- ""
if(!is.null(args$autoscaleFormula)){
autoscaleFormula <- .getFormula(args$autoscaleFormula)
}
autoscaleInterval <- ""
if(!is.null(args$autoscaleInterval)){
autoscaleFormula <- .getFormula(args$autoscaleInterval)
if (!is.null(args$autoscaleFormula)) {
autoscaleFormula <- args$autoscaleFormula
}
body <- list("autoScaleFormula" = autoscaleFormula)
size <- nchar(jsonlite::toJSON(body, method="C", auto_unbox = TRUE))
size <-
nchar(jsonlite::toJSON(body, method = "C", auto_unbox = TRUE))
headers <- character()
headers['Content-Type'] <- 'application/json;odata=minimalmetadata'
headers['Content-Type'] <-
'application/json;odata=minimalmetadata'
headers['Content-Length'] <- size
request <- AzureRequest$new(
method = "POST",
path = paste0("/pools/", poolId, "/evaluateautoscale"),
query = list("api-version" = apiVersion),
headers = headers)
headers = headers,
body = body
)
callBatchService(request, batchCredentials, body)
callBatchService(request, batchCredentials, content)
}
listPoolNodes <- function(poolId, ...){
listPoolNodes <- function(poolId, content = "parsed", ...) {
batchCredentials <- getBatchCredentials()
request <- AzureRequest$new(
@@ -140,18 +142,29 @@ listPoolNodes <- function(poolId, ...){
query = list("api-version" = apiVersion)
)
callBatchService(request, batchCredentials)
callBatchService(request, batchCredentials, content)
}
listJobs <- function(query = list()){
rebootNode <- function(poolId, nodeId, content = "parsed", ...) {
batchCredentials <- getBatchCredentials()
request <- AzureRequest$new(
method = "GET",
path = paste0("/jobs"),
query = append(list("api-version" = apiVersion), query)
method = "POST",
path = paste0("/pools/", poolId, "/nodes/", nodeId, "/reboot"),
query = list("api-version" = apiVersion)
)
callBatchService(request, batchCredentials)
callBatchService(request, batchCredentials, content)
}
reimageNode <- function(poolId, nodeId, content = "parsed", ...) {
batchCredentials <- getBatchCredentials()
request <- AzureRequest$new(
method = "POST",
path = paste0("/pools/", poolId, "/nodes/", nodeId, "/reimage"),
query = list("api-version" = apiVersion)
)
callBatchService(request, batchCredentials, content)
}
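
A sketch of the pool lifecycle with these helpers; the VM size and autoscale formula are illustrative values, passed through the args handling above:

addPool("pool-001", "Standard_D2_v2",
        enableAutoScale = TRUE,
        autoscaleFormula = "$TargetDedicated = 4;")

# resizePool posts the formula to /pools/{id}/evaluateautoscale.
resizePool("pool-001", autoscaleFormula = "$TargetDedicated = 8;")

nodes <- listPoolNodes("pool-001")
rebootNode("pool-001", nodes$value[[1]]$id)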

@@ -16,11 +16,13 @@ signed_services <- 'ss'
signed_ip <- 'si'
signed_version <- 'sv'
createSasToken <- function(permission, sr, path,
start = Sys.time() - 60*60*24*1,
end = Sys.time() + 60*60*24*2){
createSasToken <- function(permission,
sr,
path,
start = Sys.time() - 60 * 60 * 24 * 1,
end = Sys.time() + 60 * 60 * 24 * 2) {
myList <- list()
query = c()
query <- c()
startTime <- as.POSIXlt(start, "UTC", "%Y-%m-%dT%H:%M:%S")
startTime <- paste0(strftime(startTime, "%Y-%m-%dT%H:%I:%SZ"))
@@ -42,85 +44,120 @@ createSasToken <- function(permission, sr, path,
myList[[signed_permission]] <- permission
storageCredentials <- getStorageCredentials()
canonicalizedResource <- paste0("/blob/", storageCredentials$name, "/", path, "\n")
canonicalizedResource <-
paste0("/blob/", storageCredentials$name, "/", path, "\n")
stringToSign <- paste0(getValueFromQuery(query, signed_permission))
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_start))
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_expiry))
stringToSign <-
paste0(getValueFromQuery(query, signed_permission))
stringToSign <-
paste0(stringToSign, getValueFromQuery(query, signed_start))
stringToSign <-
paste0(stringToSign, getValueFromQuery(query, signed_expiry))
stringToSign <- paste0(stringToSign, canonicalizedResource)
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_identifier))
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_ip))
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_protocol))
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_version))
stringToSign <-
paste0(stringToSign, getValueFromQuery(query, signed_identifier))
stringToSign <-
paste0(stringToSign, getValueFromQuery(query, signed_ip))
stringToSign <-
paste0(stringToSign, getValueFromQuery(query, signed_protocol))
stringToSign <-
paste0(stringToSign, getValueFromQuery(query, signed_version))
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_cache_control))
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_content_disposition))
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_content_encoding))
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_content_language))
stringToSign <- paste0(stringToSign, getValueFromQuery(query, signed_content_type))
stringToSign <-
paste0(stringToSign,
getValueFromQuery(query, signed_cache_control))
stringToSign <-
paste0(stringToSign,
getValueFromQuery(query, signed_content_disposition))
stringToSign <-
paste0(stringToSign,
getValueFromQuery(query, signed_content_encoding))
stringToSign <-
paste0(stringToSign,
getValueFromQuery(query, signed_content_language))
stringToSign <-
paste0(stringToSign,
getValueFromQuery(query, signed_content_type))
stringToSign <- substr(stringToSign, 1, nchar(stringToSign) - 1)
config <- getOption("az_config")
if(!is.null(config) && !is.null(config$settings)){
if (!is.null(config) && !is.null(config$settings)) {
verbose <- config$settings$verbose
}
else{
verbose <- getOption("verbose")
}
if(verbose){
if (verbose) {
print(stringToSign)
}
undecodedKey <- RCurl::base64Decode(storageCredentials$key, mode="raw")
encString <- RCurl::base64(
digest::hmac(key=undecodedKey,
object=enc2utf8(stringToSign),
algo= "sha256", raw=TRUE)
)
undecodedKey <-
RCurl::base64Decode(storageCredentials$key, mode = "raw")
encString <- RCurl::base64(digest::hmac(
key = undecodedKey,
object = enc2utf8(stringToSign),
algo = "sha256",
raw = TRUE
))
myList[[signed_signature]] <- encString
myList
}
getValueFromQuery <- function(query, header){
getValueFromQuery <- function(query, header) {
value <- "\n"
if(!is.na(query[header])){
if (!is.na(query[header])) {
value <- paste0(query[header], "\n")
}
value
}
createResourceFile <- function(url, fileName){
resourceFile <-list(
blobSource = url,
filePath = fileName
)
createResourceFile <- function(url, fileName) {
list(blobSource = url,
filePath = fileName)
}
createBlobUrl <- function(storageAccount, containerName, fileName = NULL, sasToken){
if(is.null(fileName)){
url <- sprintf("https://%s.blob.core.windows.net/%s", storageAccount, containerName)
createBlobUrl <-
function(storageAccount,
containerName,
fileName = NULL,
sasToken) {
if (is.null(fileName)) {
url <-
sprintf("https://%s.blob.core.windows.net/%s",
storageAccount,
containerName)
}
else {
url <-
sprintf(
"https://%s.blob.core.windows.net/%s/%s",
storageAccount,
containerName,
fileName
)
}
queryParameterUrl <- "?"
for (query in names(sasToken)) {
queryParameterUrl <-
paste0(queryParameterUrl,
query,
"=",
RCurl::curlEscape(sasToken[[query]]),
"&")
}
queryParameterUrl <-
substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1)
url <- paste0(url, queryParameterUrl)
return(url)
}
else {
url <- sprintf("https://%s.blob.core.windows.net/%s/%s", storageAccount, containerName, fileName)
}
queryParameterUrl <- "?"
for(query in names(sasToken)){
queryParameterUrl <- paste0(queryParameterUrl, query, "=", RCurl::curlEscape(sasToken[[query]]), "&")
}
queryParameterUrl <- substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1)
url <- paste0(url, queryParameterUrl)
return(url)
}
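
Together these helpers produce the SAS-authenticated URLs that resource files hand to compute nodes. A hedged example; the account, container, and blob names are placeholders:

# Read-only token for a single blob, using the default validity window
# (one day back to two days ahead).
sasToken <- createSasToken(permission = "r", sr = "b",
                           path = "my-container/input.csv")

url <- createBlobUrl("myaccount", "my-container", "input.csv", sasToken)

# The node downloads the blob as "input.csv" before the task starts.
resourceFile <- createResourceFile(url, "input.csv")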

@@ -1,508 +1,137 @@
storageVersion <- "2016-05-31"
getStorageCredentials <- function(configName = "az_config.json", ...){
config <- getOption("az_config")
getStorageCredentials <-
function(configName = "az_config.json", ...) {
config <- getOption("az_config")
if(!is.null(config) && !is.null(config$storageAccount)){
storageAccount <- config$storageAccount
credentials <- StorageCredentials$new(name=storageAccount$name, key=storageAccount$key)
}
else{
config <- rjson::fromJSON(file = paste0(getwd(), "/", configName))
credentials <- StorageCredentials$new(name=config$storageAccount$name, key=config$storageAccount$key)
if (!is.null(config) && !is.null(config$storageAccount)) {
storageAccount <- config$storageAccount
credentials <-
StorageCredentials$new(name = storageAccount$name, key = storageAccount$key)
}
else{
config <- rjson::fromJSON(file = paste0(getwd(), "/", configName))
credentials <-
StorageCredentials$new(name = config$storageAccount$name,
key = config$storageAccount$key)
}
credentials
}
credentials
}
StorageCredentials <- setRefClass(
"StorageCredentials",
fields = list(name = "character", key = "character"),
methods = list(
signString = function(x) {
undecodedKey <- RCurl::base64Decode(key, mode = "raw")
RCurl::base64(digest::hmac(
key = undecodedKey,
object = enc2utf8(x),
algo = "sha256",
raw = TRUE
))
}
)
)
StorageCredentials <- setRefClass("StorageCredentials",
fields = list(name = "character", key = "character"),
methods = list(
signString = function(x){
undecodedKey <- RCurl::base64Decode(key, mode="raw")
RCurl::base64(
digest::hmac(key=undecodedKey,
object=enc2utf8(x),
algo= "sha256", raw=TRUE)
)
}
))
callStorageSas <- function(request, accountName, body = NULL, sas_params, ...){
callStorageSas <- function(request, accountName, sasToken, ...) {
args <- list(...)
requestdate <- httr::http_date(Sys.time())
url <- sprintf("https://%s.blob.core.windows.net%s", accountName, request$path)
url <-
sprintf("https://%s.blob.core.windows.net%s",
accountName,
request$path)
headers <- request$headers
headers['x-ms-date'] <- requestdate
headers['x-ms-version'] <- storageVersion
request$query <- append(request$query, sas_params)
request$query <- append(request$query, sasToken)
requestHeaders <- httr::add_headers(.headers = headers)
response <- ""
config <- getOption("az_config")
body <- NULL
httpTraffic <- NULL
write <- NULL
verbose <- ifelse(!is.null(config) && !is.null(config$settings),
config$settings$verbose,
getOption("verbose"))
httpTrafficFlag <- getOption("azureHttpTraffic")
verbose <- getOption("azureVerbose")
verboseMode <- NULL
if(verbose){
if (!is.null(verbose) && verbose) {
print(headers)
print(paste0("URL: ", url))
#cat(content(response, "text"), "\n")
verboseMode <- verbose()
}
write <- NULL
if(!is.null(args$write)){
if (!is.null(httpTrafficFlag) && httpTrafficFlag) {
httpTraffic <- httr::verbose()
}
if (length(request$body) != 0) {
body <- request$body
}
if (hasArg("uploadFile")) {
body <- args$uploadFile
}
if (hasArg("body")) {
body <- args$body
}
if (!is.null(args$write)) {
write <- args$write
}
response <- httr::VERB(request$method,
url,
verboseMode,
write,
query = request$query,
config = requestHeaders,
body = body)
response <- httr::VERB(
request$method,
url,
httpTraffic,
write,
query = request$query,
config = requestHeaders,
body = body
)
httr::stop_for_status(response)
}
callStorage <- function(request, credentials, body = NULL, ...){
args <- list(...)
contentType = args$contentType
stringToSign <- createSignature(request$method, request$headers)
url <- sprintf("https://%s.blob.core.windows.net%s", credentials$name, request$path)
prepareStorageRequest <- function(request, credentials) {
requestdate <- httr::http_date(Sys.time())
request$headers['x-ms-date'] <- requestdate
request$headers['x-ms-version'] <- storageVersion
headers <- request$headers
headers['x-ms-date'] <- requestdate
headers['x-ms-version'] <- storageVersion
authorizationHeader <-
signAzureRequest(request, credentials$name, credentials$key, 'x-ms-')
canonicalizedHeaders <- ""
for(name in sort(names(headers))){
if(grepl('x-ms', name)){
canonicalizedHeaders <- paste0(canonicalizedHeaders, name,":", headers[name], "\n")
}
}
request$headers['Authorization'] <- authorizationHeader
request$headers['User-Agent'] <-
paste0("rAzureBatch/",
packageVersion("rAzureBatch"))
canonicalizedResource <- paste0("/", credentials$name, request$path, "\n")
if(!is.null(names(request$query))){
for(name in sort(names(request$query))){
canonicalizedResource <- paste0(canonicalizedResource, name,":", request$query[[name]], "\n")
}
}
request$url <-
sprintf("https://%s.blob.core.windows.net%s",
credentials$name,
request$path)
canonicalizedResource <- substr(canonicalizedResource, 1, nchar(canonicalizedResource) - 1)
stringToSign <- paste0(stringToSign, canonicalizedHeaders)
stringToSign <- paste0(stringToSign, canonicalizedResource)
# sign the request
authString<-paste0("SharedKey ", credentials$name, ":", credentials$signString(stringToSign))
headers['Authorization'] <- authString
requestHeaders <- httr::add_headers(.headers = headers, "User-Agent"="rAzureBatch/0.2.0")
config <- getOption("az_config")
if(!is.null(config) && !is.null(config$settings)){
verbose <- config$settings$verbose
}
else{
verbose <- getOption("verbose")
}
response <- ""
if(verbose){
print(paste0("Resource String: ", canonicalizedResource))
print(stringToSign)
print(headers)
print(paste0("URL: ", url))
response <- httr::VERB(request$method, url, query = request$query, config = requestHeaders, body=body, verbose())
cat(httr::content(response, "text"), "\n")
}
else{
response <- httr::VERB(request$method, url, query = request$query, config = requestHeaders, body=body)
}
if(!is.null(contentType) && contentType){
httr::content(response, as = "text")
}
else{
httr::content(response)
}
request
}
listBlobs <- function(containerName, sasToken = NULL, ...){
callStorage <- function(request, content = NULL, ...) {
args <- list(...)
if(!is.null(args$accountName)){
name <- args$accountName
}
else{
storageCredentials <- getStorageCredentials()
name <- storageCredentials$name
}
query <- list('restype' = "container", 'comp' = "list")
request <- AzureRequest$new(
method = "GET",
path = paste0("/", containerName),
query = query)
if(is.null(sasToken)){
callStorage(request, storageCredentials)
}
else{
callStorageSas(request, accountName = name, sas_params = sasToken)
}
}
listContainers <- function(){
storageCredentials <- getStorageCredentials()
query <- list('comp' = "list")
request <- AzureRequest$new(
method = "GET",
path = paste0("/"),
query = query)
callStorage(request, storageCredentials)
}
deleteContainer <- function(containerName){
storageCredentials <- getStorageCredentials()
query <- list('restype' = "container")
request <- AzureRequest$new(
method = "DELETE",
path = paste0("/", containerName),
query = query)
callStorage(request, storageCredentials)
}
createContainer <- function(containerName, ...){
args <- list(...)
raw <- FALSE
if(!is.null(args$raw)){
raw <- args$raw
}
storageCredentials <- getStorageCredentials()
query <- list('restype' = "container")
request <- AzureRequest$new(
method = "PUT",
path = paste0("/", containerName),
query = query)
callStorage(request, storageCredentials, contentType = raw)
}
deleteBlob <- function(containerName, blobName, sasToken = NULL, ...){
request <- AzureRequest$new(
method = "DELETE",
path = paste0("/", containerName, "/", blobName))
if(!is.null(sasToken)){
callStorageSas(request, args$accountName, sas_params = sasToken)
if (!is.null(args$sasToken) && !is.null(args$accountName)) {
response <-
callStorageSas(request, args$accountName, args$sasToken, ...)
}
else {
storageCredentials <- getStorageCredentials()
callStorage(request, storageCredentials)
}
}
uploadBlob <- function(containerName, fileDirectory, sasToken = NULL, parallelThreads = 1, ...){
args <- list(...)
if(!is.null(args$accountName)){
name <- args$accountName
}
else{
storageCredentials <- getStorageCredentials()
name <- storageCredentials$name
}
if(file.exists(fileDirectory)){
fileSize <- file.size(fileDirectory)
}
else if(file.exists(paste0(getwd(), "/", fileDirectory))){
fileDirectory <- paste0(getwd(), "/", fileDirectory)
fileSize <- file.size(fileDirectory)
}
else{
stop("The given file does not exist.")
}
# file size is less than 64 mb
if(fileSize < (1024 * 1024 * 64)){
endFile <- httr::upload_file(fileDirectory)
headers <- c()
headers['Content-Length'] <- fileSize
headers['Content-Type'] <- endFile$type
headers['x-ms-blob-type'] <- 'BlockBlob'
blobName <- basename(fileDirectory)
if(!is.null(args$remoteName)){
blobName <- args$remoteName
}
request <- AzureRequest$new(
method = "PUT",
path = paste0("/", containerName, "/", blobName),
headers = headers)
if(!is.null(sasToken)){
callStorageSas(request, name, body = endFile, sas_params = sasToken)
}
else {
callStorage(request, storageCredentials, body = endFile)
}
}
else{
if(!is.null(sasToken)){
uploadChunk(containerName, fileDirectory, parallelThreads = parallelThreads, sasToken = sasToken, accountName = name)
}
else {
uploadChunk(containerName, fileDirectory, parallelThreads = parallelThreads, ...)
}
}
}
uploadChunk <- function(containerName, fileDirectory, sasToken = NULL, ...){
args <- list(...)
if(!is.null(args$accountName)){
name <- args$accountName
}
else{
storageCredentials <- getStorageCredentials()
name <- storageCredentials$name
}
finfo <- file.info(fileDirectory)
to.read <- file(fileDirectory, "rb")
defaultSize <- 50000000
numOfChunks <- ceiling(finfo$size / defaultSize)
blockList <- c()
filePath <- strsplit(fileDirectory, "/")
filePath <- unlist(filePath)
blobName <- filePath[length(filePath)]
blobName <- basename(fileDirectory)
if(!is.null(args$remoteName)){
blobName <- args$remoteName
}
pb <- txtProgressBar(min = 0, max = numOfChunks, style = 3)
`%fun%` <- foreach::`%do%`
parallelThreads <- 1
if(!is.null(args$parallelThreads) && args$parallelThreads > 1){
require(doParallel)
parallelThreads <- args$parallelThreads
registerDoParallel(parallelThreads)
`%fun%` <- foreach::`%dopar%`
}
# Initialize the current indices for chunks and blockList
currentChunk <- 0
blockList <- ""
while(currentChunk < numOfChunks){
count <- 1
if(currentChunk + parallelThreads >= numOfChunks){
count <- numOfChunks - currentChunk
}
else{
count <- parallelThreads
}
chunk <- readBin(to.read, raw(), n = defaultSize * count)
accountName <- name
results <- foreach::foreach(i = 0:(count - 1), .export = c("sasToken", "accountName")) %fun% {
if(i == count - 1){
data <- chunk[((i*defaultSize) + 1) : length(chunk)]
}
else{
data <- chunk[((i*defaultSize) + 1) : ((i*defaultSize) + defaultSize)]
}
blockId <- currentChunk + i
currLength <- 8 - nchar(blockId)
for(j in 1:currLength)
{
blockId <- paste0(blockId, 0)
}
blockId <- RCurl::base64Encode(enc2utf8(blockId))
headers <- c()
headers['Content-Length'] <- as.character(length(data))
headers['x-ms-blob-type'] <- 'BlockBlob'
request <- AzureRequest$new(
method = "PUT",
path = paste0("/", containerName, "/", blobName),
headers = headers,
query=list('comp'="block",
'blockid'=blockId))
if(is.null(sasToken)){
storageCredentials <- getStorageCredentials()
callStorage(request, credentials = storageCredentials, body = data)
}
else{
callStorageSas(request, accountName = accountName, body = data, sas_params = sasToken)
}
return(paste0("<Latest>", blockId, "</Latest>"))
}
if(!is.null(args$parallelThreads) && args$parallelThreads > 1){
require(doParallel)
doParallel::stopImplicitCluster()
foreach::registerDoSEQ()
}
for(j in 1:length(results)){
blockList <- paste0(blockList, results[[j]])
}
currentChunk <- currentChunk + count
setTxtProgressBar(pb, currentChunk)
}
close(to.read)
httpBodyRequest <- paste0("<BlockList>", blockList, "</BlockList>")
httpBodyRequest <- paste0("<?xml version='1.0' encoding='utf-8'?>", httpBodyRequest)
if(is.null(sasToken)){
putBlockList(containerName, blobName, httpBodyRequest)
}
else{
putBlockList(containerName, blobName, body = httpBodyRequest, sasToken = sasToken, accountName = name)
}
}
putBlockList <- function(containerName, fileName, body, sasToken = NULL, ...){
args <- list(...)
if(is.null(args$accountName)){
storageCredentials <- getStorageCredentials()
}
headers <- c()
headers['Content-Length'] <- nchar(body)
headers['Content-Type'] <- 'text/xml'
request <- AzureRequest$new(
method = "PUT",
path = paste0("/", containerName, "/", fileName),
headers = headers,
query = list('comp'="blocklist")
)
if(!is.null(sasToken)){
callStorageSas(request, accountName = args$accountName, sas_params = sasToken, body = body)
}
else {
callStorage(request, storageCredentials, body)
}
}
getBlockList <- function(containerName, fileName, sasToken = NULL, ...){
if(!is.null(args$accountName)){
name <- args$accountName
}
else{
storageCredentials <- getStorageCredentials()
name <- storageCredentials$name
}
request <- AzureRequest$new(
method = "GET",
path = paste0("/", containerName, "/", fileName),
query=list('comp'="blocklist",
'blocklisttype'="all")
)
if(!is.null(sasToken)){
callStorageSas(request, name, sas_params = sasToken)
}
else {
callStorage(request, storageCredentials)
}
}
uploadDirectory <- function(containerName, fileDirectory, ...){
args <- list(...)
if(is.null(args$storageCredentials)){
storageCredentials <- getStorageCredentials()
}
else{
storageCredentials <- args$storageCredentials
}
files = list.files(fileDirectory, full.names = TRUE, recursive = TRUE)
fileName = list.files(fileDirectory, recursive = TRUE)
for(i in 1:length(files))
{
uploadBlob(containerName, files[i], remoteName = fileName[i], ...)
}
}
downloadBlob <- function(containerName,
blobName,
sasToken = NULL,
overwrite = FALSE,
...){
args <- list(...)
if(!is.null(args$localDest)){
write <- httr::write_disk(args$localDest, overwrite)
}
else {
write <- httr::write_memory()
}
if(is.null(args$accountName)){
storageCredentials <- getStorageCredentials()
}
request <- AzureRequest$new(
method = "GET",
path = paste0("/", containerName, "/", blobName))
if(!is.null(sasToken)){
callStorageSas(request, args$accountName, sas_params = sasToken, write = write)
}
else {
callStorage(request, storageCredentials, write = write)
credentials <- getStorageCredentials()
request <- prepareStorageRequest(request, credentials)
response <- executeAzureRequest(request, ...)
}
extractAzureResponse(response, content)
}
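
callStorage now chooses its authentication path from the ... arguments: given sasToken and accountName it goes through callStorageSas and appends the token as query parameters; otherwise it signs with SharedKey credentials from getStorageCredentials. A sketch of both paths (names illustrative):

request <- AzureRequest$new(
  method = "GET",
  path = "/my-container",
  query = list("restype" = "container", "comp" = "list")
)

# SharedKey path: credentials resolved from the az_config option.
parsed <- callStorage(request, content = "parsed")

# SAS path: no account key needed on the calling machine.
parsed <- callStorage(request, content = "parsed",
                      sasToken = sasToken,      # e.g. from createSasToken()
                      accountName = "myaccount")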

@@ -1,20 +1,5 @@
#' Add a task to the specified job.
#'
#' @param jobId The id of the job to which the task is to be added.
#' @param taskId A string that uniquely identifies the task within the job.
#' @param ... Further named parameters
#' \itemize{
#' \item{"resourceFiles"}: {A list of files that the Batch service will download to the compute node before running the command line.}
#' \item{"args"}: {Arguments in the foreach parameters that will be used for the task running.}
#' \item{"packages"}: {A list of packages that the Batch service will download to the compute node.}
#' \item{"envir"}: {The R environment that the task will run under.}
#'}
#' @return The response of task
#' @examples
#' addTask(job-001, task-001)
addTask <- function(jobId, taskId = "default", ...){
addTask <- function(jobId, taskId = "default", content = "parsed", ...){
batchCredentials <- getBatchCredentials()
storageCredentials <- getStorageCredentials()
args <- list(...)
environmentSettings <- args$environmentSettings
@@ -23,11 +8,11 @@ addTask <- function(jobId, taskId = "default", ...){
dependsOn <- args$dependsOn
outputFiles <- args$outputFiles
if(is.null(commandLine)){
if (is.null(commandLine)) {
stop("Task requires a command line.")
}
body = list(id = taskId,
body <- list(id = taskId,
commandLine = commandLine,
userIdentity = list(
autoUser = list(
@@ -45,7 +30,7 @@ addTask <- function(jobId, taskId = "default", ...){
body <- Filter(length, body)
size <- nchar(rjson::toJSON(body, method="C"))
size <- nchar(rjson::toJSON(body, method = "C"))
headers <- c()
headers['Content-Length'] <- size
@@ -55,13 +40,14 @@ addTask <- function(jobId, taskId = "default", ...){
method = "POST",
path = paste0("/jobs/", jobId, "/tasks"),
query = list("api-version" = apiVersion),
headers = headers
headers = headers,
body = body
)
callBatchService(request, batchCredentials, body)
callBatchService(request, batchCredentials, content)
}
listTask <- function(jobId, ...){
listTask <- function(jobId, content = "parsed", ...){
batchCredentials <- getBatchCredentials()
args <- list(...)
@@ -81,5 +67,5 @@ listTask <- function(jobId, ...){
query = query
)
callBatchService(request, batchCredentials)
callBatchService(request, batchCredentials, content)
}

R/upload_blob_operations.R (new file)
@@ -0,0 +1,190 @@
uploadChunk <-
function(containerName,
fileDirectory,
content = "parsed",
...) {
args <- list(...)
finfo <- file.info(fileDirectory)
readBytes <- file(fileDirectory, "rb")
defaultSize <- 50000000
numOfChunks <- ceiling(finfo$size / defaultSize)
blockList <- c()
filePath <- strsplit(fileDirectory, "/")
filePath <- unlist(filePath)
blobName <- filePath[length(filePath)]
blobName <- basename(fileDirectory)
if (!is.null(args$remoteName)) {
blobName <- args$remoteName
}
pb <- txtProgressBar(min = 0, max = numOfChunks, style = 3)
sasToken <- args$sasToken
accountName <- args$accountName
config <- getOption("az_config")
if (is.null(config) &&
(is.null(sasToken) || is.null(accountName))) {
stop(
paste(
"Missing authentication information: Use",
"setCredentials or provide sasToken and accountName"
)
)
}
`%fun%` <- foreach::`%do%`
parallelThreads <- 1
if (!is.null(args$parallelThreads) &&
args$parallelThreads > 1) {
require(doParallel)
parallelThreads <- args$parallelThreads
doParallel::registerDoParallel(parallelThreads)
`%fun%` <- foreach::`%dopar%`
}
# Initialize the current indices for chunks and blockList
currentChunk <- 0
blockList <- ""
while (currentChunk < numOfChunks) {
count <- 1
if (currentChunk + parallelThreads >= numOfChunks) {
count <- numOfChunks - currentChunk
}
else{
count <- parallelThreads
}
chunk <- readBin(readBytes, raw(), n = defaultSize * count)
results <-
foreach::foreach(
i = 0:(count - 1),
.export = c("sasToken", "accountName", "content")
) %fun% {
options("az_config" = config)
blockSize <- i * defaultSize
if (i == count - 1) {
data <- chunk[(blockSize + 1):length(chunk)]
}
else {
data <-
chunk[(blockSize + 1):(blockSize + defaultSize)]
}
blockId <- currentChunk + i
currLength <- 8 - nchar(blockId)
for (j in 1:currLength)
{
blockId <- paste0(blockId, 0)
}
blockId <-
RCurl::base64Encode(enc2utf8(blockId))
headers <- c()
headers['Content-Length'] <-
as.character(length(data))
headers['x-ms-blob-type'] <- 'BlockBlob'
request <- AzureRequest$new(
method = "PUT",
path = paste0("/", containerName, "/", blobName),
headers = headers,
query = list('comp' = "block",
'blockid' = blockId)
)
callStorage(
request,
content = NULL,
body = data,
progress = TRUE,
...
)
return(paste0("<Latest>", blockId, "</Latest>"))
}
if (!is.null(args$parallelThreads) &&
args$parallelThreads > 1) {
require(doParallel)
doParallel::stopImplicitCluster()
foreach::registerDoSEQ()
}
for (j in 1:length(results)) {
blockList <- paste0(blockList, results[[j]])
}
currentChunk <- currentChunk + count
setTxtProgressBar(pb, currentChunk)
}
close(readBytes)
httpBodyRequest <-
paste0("<BlockList>", blockList, "</BlockList>")
httpBodyRequest <-
paste0("<?xml version='1.0' encoding='utf-8'?>", httpBodyRequest)
putBlockList(containerName,
blobName,
content = "response",
body = httpBodyRequest,
...)
}
putBlockList <-
function(containerName,
fileName,
body,
content = "text",
...) {
headers <- c()
headers['Content-Length'] <- nchar(body)
headers['Content-Type'] <- 'text/xml'
request <- AzureRequest$new(
method = "PUT",
path = paste0("/", containerName, "/", fileName),
headers = headers,
query = list('comp' = "blocklist")
)
callStorage(request, content, body = body, ...)
}
getBlockList <-
function(containerName, fileName, content = "parsed", ...) {
request <- AzureRequest$new(
method = "GET",
path = paste0("/", containerName, "/", fileName),
query = list('comp' = "blocklist",
'blocklisttype' = "all")
)
callStorage(request, content, ...)
}
uploadDirectory <- function(containerName, fileDirectory, ...) {
files <-
list.files(fileDirectory, full.names = TRUE, recursive = TRUE)
fileName <- list.files(fileDirectory, recursive = TRUE)
for (i in 1:length(files))
{
uploadBlob(containerName,
files[i],
remoteName = fileName[i],
content = "parsed",
...)
}
}
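
uploadChunk is normally reached through uploadBlob once a file crosses the 64 MB threshold, while uploadDirectory walks a tree and uploads each file under its relative path. Illustrative usage; the container and paths are placeholders:

# Upload every file under ./results, keeping relative paths as blob names.
uploadDirectory("my-container", "results")

# Large files go through the block API in 50 MB chunks,
# optionally uploading blocks in parallel.
uploadBlob("my-container", "results/simulation.rds", parallelThreads = 4)

# Inspect the committed block list afterwards.
blocks <- getBlockList("my-container", "simulation.rds")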

@@ -1,55 +0,0 @@
library(xml2)
library(AzureBatch)
library(shiny)
library(jsonlite)
shinyServer(function(input, output, session) {
autoInvalidate <- reactiveTimer(10000)
output$ex1 <- DT::renderDataTable(
DT::datatable(iris, options = list(pageLength = 25), style = 'bootstrap')
)
output$cluster <- DT::renderDataTable({
autoInvalidate()
setPoolOption(input$directory, fullName = TRUE)
config <- getOption("az_config")
pool <- config$batchAccount$pool
nodes <- listPoolNodes(pool$name)
state <- c()
id <- c()
if(length(nodes$value) != 0){
for(i in 1:length(nodes$value))
{
id <- c(id, nodes$value[[i]]$id)
state <- c(state, nodes$value[[i]]$state)
}
}
return(data.frame(id, state))
}, options = list(style = 'bootstrap'))
output$table <- renderDataTable({
#autoInvalidate()
a <- "debug"
setPoolOption(input$directory, fullName = TRUE)
tasks <- if(input$sessionId == "") c() else listTask(input$sessionId, creds)
state <- c()
id <- c()
commandLine <- c()
for(i in 1:length(tasks$value))
{
state <- c(state, tasks$value[[i]]$state)
id <- c(id, tasks$value[[i]]$id)
commandLine <- c(commandLine, tasks$value[[i]]$commandLine)
}
data.frame(id, state, commandLine)
})
})

@@ -1,34 +0,0 @@
# This is the user-interface definition of a Shiny web application.
# You can find out more about building applications with Shiny here:
#
# http://shiny.rstudio.com
#
library(shinydashboard)
library(AzureBatch)
library(shiny)
header <- dashboardHeader(
title = "Azure Batch"
)
body <- dashboardBody(
fluidRow(
column(width=9,
tabBox(side = "left", height = "500px", width = NULL,
tabPanel('Iris', DT::dataTableOutput('ex1')),
tabPanel('Nodes', DT::dataTableOutput('cluster')),
tabPanel('Tasks', DT::dataTableOutput('table')))),
column(width=3,
box(width = NULL, solidHeader = TRUE,
textInput("directory", "Choose Configuration File")))
)
)
dashboardPage(
header,
dashboardSidebar(disable = TRUE),
body
)

@@ -1,30 +0,0 @@
#!/usr/bin/Rscript
args = commandArgs(trailingOnly=TRUE)
# test if there is at least one argument: if not, return an error
if (length(args) == 0) {
stop("At least one argument must be supplied (input file).n", call.=FALSE)
} else if (length(args)==1) {
# default output file
args[2] = "out.txt"
}
REPO <- args[1]
if(mode != "GITHUB"){
for(i in 2:length(args)){
install.packages(args[i], repos = REPO, dependencies = TRUE)
}
}
if(mode == "GITHUB"){
if (!require("devtools")) install.packages("devtools", dependencies = TRUE)
library(devtools)
for(i in 2:length(args)){
install_github(args[i])
}
}
quit(save = "yes", status = 0, runLast = FALSE)

@@ -1,45 +0,0 @@
#!/usr/bin/Rscript
args = commandArgs(trailingOnly=TRUE)
# test if there is at least one argument: if not, return an error
if (length(args)==0) {
stop("At least one argument must be supplied (input file).n", call.=FALSE)
} else if (length(args)==1) {
# default output file
args[2] = "out.txt"
}
AZ_BATCH_TASK_WORKING_DIR <- args[1]
AZ_BATCH_TASK_ENV <- args[2]
N <- args[3]
JOB_ID <- args[4]
numOfTasks <- args[5]
wd <- paste0(AZ_BATCH_TASK_WORKING_DIR, "/", AZ_BATCH_TASK_ENV)
azbatchenv <- readRDS(wd)
for(package in azbatchenv$packages){
library(package, character.only = TRUE)
}
parent.env(azbatchenv$exportenv) <- globalenv()
results <- vector("list", numOfTasks)
count <- 1
tryCatch({
for(i in 1:N){
task_result <- paste0(AZ_BATCH_TASK_WORKING_DIR, "/result/", JOB_ID, "-task", i, "-result.rds")
task <- readRDS(task_result)
for(t in 1 : length(task)){
results[count] <- task[t]
count <- count + 1
}
}
file_result_name <- strsplit(AZ_BATCH_TASK_ENV, "[.]")[[1]][1]
saveRDS(results, file = paste0(AZ_BATCH_TASK_WORKING_DIR, "/", file_result_name, "-result.rds"))
}, error = function(e) {
print(e)
})
quit(save = "yes", status = 0, runLast = FALSE)

@@ -1,68 +0,0 @@
AZ_BATCH_TASK_WORKING_DIR <- args[1]
AZ_BATCH_TASK_ENV <- args[2]
SPLIT_FUNC <- args[2]
FILE_INPUT <- args[3]
FILE_EXTENSION <- args[4]
LEVEL <- args[4]
file_extension <- strsplit(FILE_INPUT, "[.]")[[1]];
df <- data.frame(Date=as.Date(character()),
File=character(),
User=character(),
stringsAsFactors=FALSE)
if(file_extension[-1] == "txt"){
df <- read.table(paste0(AZ_BATCH_JOB_PREP_WORKING_DIR, '/', FILE_INPUT))
}
if(file_extension[-1] == "csv"){
df <- read.csv(paste0(AZ_BATCH_JOB_PREP_WORKING_DIR, '/', FILE_INPUT))
}
eval(azbatchenv$expr, azbatchenv$exportenv)
wd <- paste0(AZ_BATCH_JOB_PREP_WORKING_DIR, "/environment.RData")
e <- readRDS(wd)
chunks <- e[[SPLIT_FUNC]](df)
for(i in 1:length(chunks)){
file_name <- paste0(file_extension[1], "_", LEVEL, '_', i, '.', file_extension[-1])
if(file_extension[-1] == "txt"){
write.table(chunks[[i]], file=file_name)
}
else{
write.csv(chunks[[i]], file=file_name)
}
cat(file_name)
cat(';')
}
AZ_BATCH_TASK_WORKING_DIR <- args[1]
AZ_BATCH_TASK_ENV <- args[2]
N <- args[3]
JOB_ID <- args[4]
wd <- paste0(AZ_BATCH_TASK_WORKING_DIR, "/", AZ_BATCH_TASK_ENV)
azbatchenv <- readRDS(wd)
for(package in azbatchenv$packages){
library(package, character.only = TRUE)
}
parent.env(azbatchenv$exportenv) <- globalenv()
results <- list()
for(i in 1:N){
task_result <- paste0(AZ_BATCH_TASK_WORKING_DIR, "/result/", JOB_ID, "-task", i, "-result.rds")
results[[i]] <- readRDS(task_result)
}
file_result_name <- strsplit(AZ_BATCH_TASK_ENV, "[.]")[[1]][1]
saveRDS(results, file = paste0(AZ_BATCH_TASK_WORKING_DIR, "/", file_result_name, "-result.rds"))
quit(save = "yes", status = 0, runLast = FALSE)

@@ -1,86 +0,0 @@
#!/usr/bin/Rscript
args = commandArgs(trailingOnly=TRUE)
# test if there is at least one argument: if not, return an error
if (length(args)==0) {
stop("At least one argument must be supplied (input file).n", call.=FALSE)
} else if (length(args)==1) {
# default output file
args[2] = "out.txt"
}
getparentenv <- function(pkgname) {
parenv <- NULL
# if anything goes wrong, print the error object and return
# the global environment
tryCatch({
# pkgname is NULL in many cases, as when the foreach loop
# is executed interactively or in an R script
if (is.character(pkgname)) {
# load the specified package
if (require(pkgname, character.only=TRUE)) {
# search for any function in the package
pkgenv <- as.environment(paste0('package:', pkgname))
for (sym in ls(pkgenv)) {
fun <- get(sym, pkgenv, inherits=FALSE)
if (is.function(fun)) {
env <- environment(fun)
if (is.environment(env)) {
parenv <- env
break
}
}
}
if (is.null(parenv)) {
stop('loaded ', pkgname, ', but parent search failed', call.=FALSE)
} else {
message('loaded ', pkgname, ' and set parent environment')
}
}
}
},
error=function(e) {
cat(sprintf('Error getting parent environment: %s\n',
conditionMessage(e)))
})
# return the global environment by default
if (is.null(parenv)) globalenv() else parenv
}
AZ_BATCH_TASK_WORKING_DIR <- args[1]
AZ_BATCH_TASK_ENV <- args[2]
setwd(AZ_BATCH_TASK_WORKING_DIR)
wd <- paste0(AZ_BATCH_TASK_ENV)
azbatchenv <- readRDS(wd)
for(package in azbatchenv$packages){
library(package, character.only = TRUE)
}
ls(azbatchenv)
parent.env(azbatchenv$exportenv) <- getparentenv(azbatchenv$pkgName)
azbatchenv$pkgName
sessionInfo()
if(!is.null(azbatchenv$inputs)){
options("az_config" = list(container = azbatchenv$inputs))
}
result <- lapply(azbatchenv$argsList, function(args){
tryCatch({
lapply(names(args), function(n)
assign(n, args[[n]], pos=azbatchenv$exportenv))
eval(azbatchenv$expr, azbatchenv$exportenv)
}, error = function(e) {
print(e)
})
})
file_result_name <- strsplit(AZ_BATCH_TASK_ENV, "[.]")[[1]][1]
saveRDS(result, file = paste0(AZ_BATCH_TASK_WORKING_DIR, "/", file_result_name, "-result.rds"))
quit(save = "yes", status = 0, runLast = FALSE)

@@ -1,7 +1,6 @@
if (requireNamespace("lintr", quietly = TRUE)) {
context("lints")
test_that("Package Style", {
linters <- list(
absolute_path_linter = lintr::absolute_path_linter,
assignment_linter = lintr::assignment_linter,
@@ -12,10 +11,8 @@ if (requireNamespace("lintr", quietly = TRUE)) {
line_length_linter = lintr::line_length_linter(120),
no_tab_linter = lintr::no_tab_linter,
object_usage_linter = lintr::object_usage_linter,
object_name_linter = lintr::object_name_linter(style = "lowerCamelCase"),
object_length_linter = lintr::object_length_linter,
open_curly_linter = lintr::open_curly_linter,
single_quotes_linter = lintr::single_quotes_linter,
spaces_inside_linter = lintr::spaces_inside_linter,
spaces_left_parentheses_linter = lintr::spaces_left_parentheses_linter,
trailing_blank_lines_linter = lintr::trailing_blank_lines_linter,