* Added task id range

* Removed upload blob methods

* Removed upload blob

* Fixed trailing whitespace

* Dropped job id from merge task id name

* Added chunk logic for argsList

* Added check for args containing data sets

* Removed container name from docker run command for all tasks

* Added test for hasDataSet

* Fixed .travis.yml

* Added before_install for R

* Removed before_install, added GitHub package nycflights13
Brian 2018-01-09 18:15:35 -08:00 committed by GitHub
Parent 9d50403846
Commit afde92fe01
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 94 additions and 35 deletions

View file

@@ -8,3 +8,4 @@ warnings_are_errors: false
 r_github_packages:
   - Azure/rAzureBatch
   - jimhester/lintr
+  - hadley/nycflights13
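
Note: this entry installs nycflights13 from GitHub on CI so the new hasDataSet test below can run. A rough local equivalent, assuming the devtools package is available:

    devtools::install_github("hadley/nycflights13")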

View file

@@ -224,6 +224,12 @@ setHttpTraffic <- function(value = FALSE) {
   assign("bioconductor", bioconductorPackages, .doAzureBatchGlobals)
   assign("pkgName", pkgName, .doAzureBatchGlobals)
 
+  isDataSet <- hasDataSet(argsList)
+  if (!isDataSet) {
+    assign("argsList", argsList, .doAzureBatchGlobals)
+  }
+
   if (!is.null(obj$options$azure$job)) {
     id <- obj$options$azure$job
   }
@@ -528,18 +534,26 @@ setHttpTraffic <- function(value = FALSE) {
   tasks <- lapply(1:length(endIndices), function(i) {
     startIndex <- startIndices[i]
     endIndex <- endIndices[i]
-    taskId <- paste0(id, "-task", i)
+    taskId <- as.character(i)
+
+    args <- NULL
+    if (isDataSet) {
+      args <- argsList[startIndex:endIndex]
+    }
 
     .addTask(
       jobId = id,
       taskId = taskId,
       rCommand = sprintf(
-        "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R > $AZ_BATCH_TASK_ID.txt"),
-      args = argsList[startIndex:endIndex],
+        "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R %i %i %i > $AZ_BATCH_TASK_ID.txt",
+        startIndex,
+        endIndex,
+        isDataSet),
       envir = .doAzureBatchGlobals,
       packages = obj$packages,
       outputFiles = obj$options$azure$outputFiles,
-      containerImage = data$containerImage
+      containerImage = data$containerImage,
+      args = args
     )
 
     cat("\r", sprintf("Submitting tasks (%s/%s)", i, length(endIndices)), sep = "")
@@ -548,14 +562,11 @@ setHttpTraffic <- function(value = FALSE) {
     return(taskId)
   })
 
-  rAzureBatch::updateJob(id)
-
   if (enableCloudCombine) {
     cat("\nSubmitting merge task")
-    mergeTaskId <- paste0(id, "-merge")
-
     .addTask(
       jobId = id,
-      taskId = mergeTaskId,
+      taskId = "merge",
@@ -564,7 +575,7 @@ setHttpTraffic <- function(value = FALSE) {
       ),
       envir = .doAzureBatchGlobals,
       packages = obj$packages,
-      dependsOn = tasks,
+      dependsOn = list(taskIdRanges = list(list(start = 1, end = length(tasks)))),
       cloudCombine = cloudCombine,
       outputFiles = obj$options$azure$outputFiles,
       containerImage = data$containerImage
@@ -572,6 +583,9 @@ setHttpTraffic <- function(value = FALSE) {
     cat(". . .")
   }
 
+  # Updating the job to terminate after all tasks are completed
+  rAzureBatch::updateJob(id)
+
   if (wait) {
     if (!is.null(obj$packages) ||
         !is.null(githubPackages) ||
@@ -588,7 +602,7 @@ setHttpTraffic <- function(value = FALSE) {
       response <-
         rAzureBatch::downloadBlob(
           id,
-          paste0("result/", id, "-merge-result.rds"),
+          paste0("result/", "merge-result.rds"),
           sasToken = sasToken,
           accountName = storageCredentials$name,
           downloadPath = tempFile,
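
Note: naming tasks by their bare index ("1".."N") is what lets the merge task's dependency be declared as one contiguous range instead of enumerating every task id. A minimal sketch of the dependency object now passed to .addTask (nTasks stands in for length(tasks)):

    # Depend on all chunk tasks at once via a single id range
    dependsOn <- list(taskIdRanges = list(list(start = 1, end = nTasks)))

This keeps the dependency payload constant-size no matter how many tasks the job fans out to.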

View file

@@ -3,8 +3,8 @@
   args <- list(...)
   .doAzureBatchGlobals <- args$envir
-  argsList <- args$args
   dependsOn <- args$dependsOn
+  argsList <- args$args
   cloudCombine <- args$cloudCombine
   userOutputFiles <- args$outputFiles
   containerImage <- args$containerImage
@@ -12,8 +12,18 @@
   resultFile <- paste0(taskId, "-result", ".rds")
   accountName <- storageCredentials$name
 
   resourceFiles <- NULL
+  if (!is.null(argsList)) {
+    assign("argsList", argsList, .doAzureBatchGlobals)
+    envFile <- paste0(taskId, ".rds")
+    saveRDS(argsList, file = envFile)
+    rAzureBatch::uploadBlob(jobId, file.path(getwd(), envFile))
+    file.remove(envFile)
+    readToken <- rAzureBatch::createSasToken("r", "c", jobId)
+    envFileUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, jobId, envFile, readToken)
+    resourceFiles <-
+      list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
+  }
 
   # Only use the download command if cloudCombine is enabled
@@ -34,22 +44,9 @@
     commands <- c(downloadCommand)
   }
 
-  envFile <- paste0(taskId, ".rds")
-  saveRDS(argsList, file = envFile)
-  rAzureBatch::uploadBlob(jobId, paste0(getwd(), "/", envFile))
-  file.remove(envFile)
-
-  sasToken <- rAzureBatch::createSasToken("r", "c", jobId)
-  writeToken <- rAzureBatch::createSasToken("w", "c", jobId)
-
-  envFileUrl <-
-    rAzureBatch::createBlobUrl(storageCredentials$name, jobId, envFile, sasToken)
-  resourceFiles <-
-    list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
-
   exitConditions <- NULL
   if (!is.null(args$dependsOn)) {
-    dependsOn <- list(taskIds = dependsOn)
+    dependsOn <- args$dependsOn
   }
   else {
     exitConditions <- list(default = list(dependencyAction = "satisfy"))
@@ -59,7 +56,7 @@
     rAzureBatch::createBlobUrl(
       storageAccount = storageCredentials$name,
       containerName = jobId,
-      sasToken = writeToken
+      sasToken = rAzureBatch::createSasToken("w", "c", jobId)
     )
 
   outputFiles <- list(
@@ -101,7 +98,7 @@
   commands <-
     c(commands,
-      dockerRunCommand(containerImage, rCommand, taskId))
+      dockerRunCommand(containerImage, rCommand))
 
   commands <- linuxWrapCommands(commands)
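
Note: .addTask no longer wraps the caller's value in list(taskIds = ...); callers now hand over the complete Batch dependency object, which is what permits the taskIdRanges form used by the merge task. A sketch of the two shapes (the ids and nTasks are illustrative):

    # Before: .addTask built an enumerated dependency from a vector of ids
    dependsOn <- list(taskIds = c("1", "2", "3"))
    # After: the caller supplies the dependency object directly, e.g. a range
    dependsOn <- list(taskIdRanges = list(list(start = 1, end = nTasks)))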

View file

@@ -464,7 +464,7 @@ waitForTasksToComplete <-
       # Wait for merge task to complete
       repeat {
         # Verify that the merge cloud task didn't have any errors
-        mergeTask <- rAzureBatch::getTask(jobId, paste0(jobId, "-merge"))
+        mergeTask <- rAzureBatch::getTask(jobId, "merge")
 
         # This test needs to go first as Batch service will not return an execution info as null

View file

@@ -243,3 +243,16 @@ readMetadataBlob <- function(jobId) {
 areShallowEqual <- function(a, b) {
   !is.null(a) && !is.null(b) && a == b
 }
+
+hasDataSet <- function(list) {
+  if (length(list) > 0) {
+    for (arg in list[[1]]) {
+      # Data frames are shown as list in the foreach iterator
+      if (typeof(arg) == "list") {
+        return(TRUE)
+      }
+    }
+  }
+
+  return(FALSE)
+}
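
Note: hasDataSet inspects only the first element of the args list and returns TRUE as soon as one of its components has typeof "list", which is how a data frame chunk presents itself. A minimal illustration with made-up values:

    hasDataSet(list(list(data.frame(x = 1:3))))  # TRUE: a component is a list (a data frame)
    hasDataSet(list(1, 2, 3))                    # FALSE: components are atomic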

View file

@@ -18,6 +18,7 @@ chunkSize <- as.integer(args[2])
 errorHandling <- args[3]
 
 batchJobId <- Sys.getenv("AZ_BATCH_JOB_ID")
+batchTaskId <- Sys.getenv("AZ_BATCH_TASK_ID")
 batchJobPreparationDirectory <-
   Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
 batchTaskWorkingDirectory <- Sys.getenv("AZ_BATCH_TASK_WORKING_DIR")
@@ -75,14 +76,14 @@ if (typeof(cloudCombine) == "list" && enableCloudCombine) {
     }
 
     for (t in 1:length(task)) {
-      results[count] <- task[t]
+      results[[count]] <- task[[t]]
       count <- count + 1
     }
   }
 
   saveRDS(results, file = file.path(
     batchTaskWorkingDirectory,
-    paste0(batchJobId, "-merge-result.rds")
+    paste0(batchTaskId, "-result.rds")
   ))
 }
else if (errorHandling == "pass") {
@@ -111,7 +112,7 @@ if (typeof(cloudCombine) == "list" && enableCloudCombine) {
   saveRDS(results, file = file.path(
     batchTaskWorkingDirectory,
-    paste0(batchJobId, "-merge-result.rds")
+    paste0(batchTaskId, "-result.rds")
   ))
 }
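
Note: because the merge task's id is now the literal "merge", AZ_BATCH_TASK_ID resolves to "merge" inside this script, so the output name lines up with the result/merge-result.rds path the driver downloads in doAzureParallel.R above:

    batchTaskId <- Sys.getenv("AZ_BATCH_TASK_ID")  # "merge" for the merge task
    paste0(batchTaskId, "-result.rds")             # yields "merge-result.rds"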

View file

@@ -2,6 +2,10 @@
 args <- commandArgs(trailingOnly = TRUE)
 workerErrorStatus <- 0
 
+startIndex <- as.integer(args[1])
+endIndex <- as.integer(args[2])
+isDataSet <- as.logical(as.integer(args[3]))
+
 jobPrepDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
 .libPaths(c(
   jobPrepDirectory,
@@ -68,7 +72,12 @@ setwd(batchTaskWorkingDirectory)
 azbatchenv <-
   readRDS(paste0(batchJobPreparationDirectory, "/", batchJobEnvironment))
-taskArgs <- readRDS(batchTaskEnvironment)
+
+if (isDataSet) {
+  argsList <- readRDS(batchTaskEnvironment)
+} else {
+  argsList <- azbatchenv$argsList[startIndex:endIndex]
+}
 
 for (package in azbatchenv$packages) {
   library(package, character.only = TRUE)
@@ -83,7 +92,7 @@ if (!is.null(azbatchenv$inputs)) {
   options("az_config" = list(container = azbatchenv$inputs))
 }
 
-result <- lapply(taskArgs, function(args) {
+result <- lapply(argsList, function(args) {
   tryCatch({
     lapply(names(args), function(n)
       assign(n, args[[n]], pos = azbatchenv$exportenv))
@@ -99,7 +108,7 @@ result <- lapply(taskArgs, function(args) {
   })
 })
 
-if (!is.null(azbatchenv$gather) && length(taskArgs) > 1) {
+if (!is.null(azbatchenv$gather) && length(argsList) > 1) {
   result <- Reduce(azbatchenv$gather, result)
 }
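
Note: each worker now receives its chunk bounds on the command line (the %i %i %i arguments in the rCommand above) instead of a per-task copy of its arguments. For example, a task covering items 11 through 20 of a shared (non-data-set) argsList would be launched roughly as:

    # Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R 11 20 0
    startIndex <- as.integer("11")              # 11
    endIndex <- as.integer("20")                # 20
    isDataSet <- as.logical(as.integer("0"))    # FALSE: slice azbatchenv$argsList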

View file

@@ -0,0 +1,24 @@
+if (requireNamespace("nycflights13", quietly = TRUE)) {
+  context("hasDataSet function")
+
+  test_that("Arguments contains data set", {
+    byCarrierList <- split(nycflights13::flights, nycflights13::flights$carrier)
+    it <- iterators::iter(byCarrierList)
+    argsList <- as.list(it)
+
+    hasDataSet <- hasDataSet(argsList)
+    expect_equal(hasDataSet, TRUE)
+  })
+
+  test_that("Arguments does not contain data set", {
+    args <- seq(1:10)
+    it <- iterators::iter(args)
+    argsList <- as.list(it)
+
+    hasDataSet <- hasDataSet(argsList)
+    expect_equal(hasDataSet, FALSE)
+  })
+}