* force add PATH to current user

* checkin docker setup script

* Update cluster_setup.sh

* install docker and start container on cluster setup

* WIP: Run task in container

* fix merge conflict

* run tasks and merge task from within container

* refactor code to proper docker commands and make a single R container per job

* refactor command line utils into its own file

* refactor job utilities into its own file

* move cluster setup script to inst folder

* remove unnecessary curl installs

* remove starting container from setup script

* check in WIP

* add apt_install file

* make required directories

* update cluster setup files as needed

* include libxml2 packages in apt installs

* working cluster create with cran and github dependencies

* update job prep to install apt-get and not each task

* use rocker containers instead of r-base

* remove unused & commented code

* remove unused install function

* address several lintr issues

* initial test dockerfile

* add spacing between commands

* temporarily point wget to feature branch

* update bioconductor install for non-jobPrep installs

* Delete Dockerfile

* minor changes to install bioc

* resolve merge conflicts

* update cluster to correctly install BioC packages using install_bioconductor

* fix issue where some packages were not getting installed

* add missing BioConductorCommand initializer

* remove print lines

* initial dockerfile implementations

* update docker files

* Only install packages if they are required

* Remove requirement on bioconductor installer script on start task

* remove duplicate environment variable entry

* update docs for container support

* update version to 0.6.0

* refactor changes updates

* remove poorly formatted whitespaces

* add full path to packages directory

* fix docker command line

* update file share sample

* update azure files cluster name

* update mandelbrot sample

* update package management sample

* update plyr samples

* make montecarlo sample more consistent

* update montecarlo sample

* remove plyr example

* fix bad environment pointer

* fix linter issues

* more linter fixes

* more linter issues

* use latest rAzureBatch version

* update resource files example

* remove reference to deleted sample

* pr feedback

* PR docs feedback

* Print errors from worker (#154)

* Fixed pool package command line lintr test

* Package installation tests fixed - too long lines

* Fixed json in customize cluster docs

* Fix: Typos in customize cluster docs

* Cleaning up files

* Feature/githubbiopackage (#150)

* install github package worked for foreach loop

* fix lintr error

* tests for github and bioc packages installation

* lintr fix

* add back lost code due to merge and update docs

* The Travis CI build failed for feature/githubbiopackage

* remove incorrect parameter for install_github

* Updated job prep task to have default command

* Use the latest version of rAzureBatch

* Updated description + Generate cluster config

* Fix: Bioconductor and Github packages installation (#155)

* Added multiple package install test and fix obj reading args

* Fixed naming for packages install

* Replaced validation exclusion for linter

* Fixed test validate test

* Fixing all interactive tests with skip

* Fixed renaming validation

* Removed default test - cannot be tested

* Removed  in validation

* Added cluster package install tests (#156)
Pablo Selem 2017-11-03 10:06:40 -07:00, committed by GitHub
Parent 4eb3773738
Commit a6e51c964e
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
50 changed files: 1717 additions and 931 deletions

.lintr (2 changed lines)

@@ -1 +1 @@
exclusions: list("R/validators.R")
exclusions: list("R/validationUtilities.R")


@@ -1,7 +1,7 @@
Package: doAzureParallel
Type: Package
Title: doAzureParallel
Version: 0.5.0
Version: 0.6.0
Author: Brian Hoang
Maintainer: Brian Hoang <brhoan@microsoft.com>
Description: The project is for data experts who use R at scale. The project
@@ -17,7 +17,7 @@ Depends:
foreach (>= 1.4.3),
iterators (>= 1.0.8)
Imports:
rAzureBatch (>= 0.5.1),
rAzureBatch (>= 0.5.3),
jsonlite,
rjson,
xml2,
@@ -27,5 +27,5 @@ Suggests:
caret,
plyr,
lintr
Remotes: Azure/rAzureBatch@v0.5.1
Remotes: Azure/rAzureBatch@v0.5.3
RoxygenNote: 6.0.1


@@ -86,9 +86,11 @@ generateClusterConfig <- function(fileName) {
max = 3),
autoscaleFormula = "QUEUE"
),
containerImage = "rocker/tidyverse:latest",
rPackages = list(
cran = vector(),
github = vector(),
bioconductor = vector(),
githubAuthenticationToken = ""
),
commandLine = vector()
@@ -143,6 +145,7 @@ makeCluster <-
installCranCommand <- NULL
installGithubCommand <- NULL
installBioconductorCommand <- NULL
if (!is.null(poolConfig$rPackages) &&
!is.null(poolConfig$rPackages$cran) &&
@@ -158,21 +161,63 @@ makeCluster <-
getPoolPackageInstallationCommand("github", poolConfig$rPackages$github)
}
packages <- NULL
if (!is.null(installCranCommand)) {
packages <- installCranCommand
if (!is.null(poolConfig$rPackages) &&
!is.null(poolConfig$rPackages$bioconductor) &&
length(poolConfig$rPackages$bioconductor) > 0) {
installBioconductorCommand <-
getPoolPackageInstallationCommand("bioconductor", poolConfig$rPackages$bioconductor)
}
if (!is.null(installGithubCommand) && is.null(packages)) {
packages <- installGithubCommand
packages <- c()
if (!is.null(installCranCommand)) {
packages <- c(installCranCommand, packages)
}
else if (!is.null(installGithubCommand) && !is.null(packages)) {
packages <- c(installCranCommand, installGithubCommand)
if (!is.null(installGithubCommand)) {
packages <- c(installGithubCommand, packages)
}
if (!is.null(installBioconductorCommand)) {
packages <- c(installBioconductorCommand, packages)
}
if (length(packages) == 0) {
packages <- NULL
}
commandLine <- NULL
# install docker and create docker container
dockerImage <- "rocker/tidyverse:latest"
if (!is.null(poolConfig$containerImage)) {
dockerImage <- poolConfig$containerImage
}
config$containerImage <- dockerImage
installAndStartContainerCommand <- paste("cluster_setup.sh",
dockerImage,
sep = " ")
containerInstallCommand <- c(
# TODO: Update branch to point at master!
paste0(
"wget https://raw.githubusercontent.com/Azure/doAzureParallel/",
"feature/container_wip/inst/startup/cluster_setup.sh"),
"chmod u+x cluster_setup.sh",
paste0(
"wget https://raw.githubusercontent.com/Azure/doAzureParallel/",
"feature/container_wip/inst/startup/install_bioconductor.R"),
"chmod u+x install_bioconductor.R",
installAndStartContainerCommand
)
if (!is.null(poolConfig$commandLine)) {
commandLine <- poolConfig$commandLine
commandLine <- c(containerInstallCommand, poolConfig$commandLine)
}
if (!is.null(packages)) {
# install packages
commandLine <-
c(commandLine,
dockerRunCommand(dockerImage, packages, NULL, FALSE, FALSE))
}
environmentSettings <- NULL
@@ -189,17 +234,17 @@ makeCluster <-
}
if (!is.null(poolConfig[["pool"]])) {
validateDeprecatedClusterConfig(clusterSetting)
validation$isValidDeprecatedClusterConfig(clusterSetting)
poolConfig <- poolConfig[["pool"]]
}
else {
validateClusterConfig(clusterSetting)
validation$isValidClusterConfig(clusterSetting)
}
tryCatch({
`Validators`$isValidPoolName(poolConfig$name)
validation$isValidPoolName(poolConfig$name)
},
error = function(e){
error = function(e) {
stop(paste("Invalid pool name: \n",
e))
})
@@ -219,19 +264,19 @@ makeCluster <-
if (grepl("PoolBeingDeleted", response)) {
pool <- rAzureBatch::getPool(poolConfig$name)
cat(
sprintf(
paste("Cluster '%s' already exists and is being deleted.",
"Another cluster with the same name cannot be created",
"until it is deleted. Please wait for the cluster to be deleted",
"or create one with a different name"),
poolConfig$name
cat(sprintf(
paste(
"Cluster '%s' already exists and is being deleted.",
"Another cluster with the same name cannot be created",
"until it is deleted. Please wait for the cluster to be deleted",
"or create one with a different name"
),
fill = TRUE
)
poolConfig$name
),
fill = TRUE)
while (areShallowEqual(rAzureBatch::getPool(poolConfig$name)$state,
"deleting")) {
"deleting")) {
cat(".")
Sys.sleep(10)
}

R/commandLineUtilities.R (new file, 126 additions)

@@ -0,0 +1,126 @@
getJobPackageInstallationCommand <- function(type, packages) {
script <- ""
if (type == "cran") {
script <- "Rscript $AZ_BATCH_JOB_PREP_WORKING_DIR/install_cran.R"
}
else if (type == "github") {
script <- "Rscript $AZ_BATCH_JOB_PREP_WORKING_DIR/install_github.R"
}
else if (type == "bioconductor") {
script <-
"Rscript $AZ_BATCH_JOB_PREP_WORKING_DIR/install_bioconductor.R"
}
else {
stop("Using an incorrect package source")
}
if (!is.null(packages) && length(packages) > 0) {
packageCommands <- paste0(packages, collapse = " ")
script <- paste0(script, " ", packageCommands)
}
}
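# Illustrative usage sketch (hypothetical package names):
# getJobPackageInstallationCommand("cran", c("dplyr", "xml2"))
# returns "Rscript $AZ_BATCH_JOB_PREP_WORKING_DIR/install_cran.R dplyr xml2"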
getPoolPackageInstallationCommand <- function(type, packages) {
poolInstallationCommand <- character(length(packages))
sharedPackagesDirectory <- "/mnt/batch/tasks/shared/R/packages"
libPathsCommand <- paste0('\'.libPaths( c( \\\"',
sharedPackagesDirectory,
'\\\", .libPaths()));')
installCommand <-
paste("Rscript -e \'args <- commandArgs(TRUE)\'",
"-e \'options(warn=2)\'")
# At this point we cannot use install_cran.R and install_github.R because they are not yet available.
if (type == "cran") {
script <-
paste(installCommand,
paste("-e",
libPathsCommand,
"install.packages(args[1])\' %s")
)
}
else if (type == "github") {
script <-
paste(
installCommand,
paste(
"-e",
libPathsCommand,
"devtools::install_github(args[1])\' %s"
)
)
}
else if (type == "bioconductor") {
script <- "Rscript /mnt/batch/tasks/startup/wd/install_bioconductor.R %s"
}
else {
stop("Using an incorrect package source")
}
for (i in 1:length(packages)) {
poolInstallationCommand[i] <- sprintf(script, packages[i])
}
poolInstallationCommand
}
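# Illustrative usage sketch (hypothetical package name); one command is
# generated per package:
# getPoolPackageInstallationCommand("github", c("Azure/rAzureBatch"))
# returns a character vector whose single element is a full Rscript
# invocation ending in "Azure/rAzureBatch".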
dockerRunCommand <-
function(containerImage,
command,
containerName = NULL,
runAsDaemon = FALSE,
includeEnvironmentVariables = TRUE) {
dockerOptions <- paste(
"--rm",
"-v $AZ_BATCH_NODE_ROOT_DIR:$AZ_BATCH_NODE_ROOT_DIR",
"-e AZ_BATCH_NODE_ROOT_DIR=$AZ_BATCH_NODE_ROOT_DIR",
"-e AZ_BATCH_NODE_STARTUP_DIR=$AZ_BATCH_NODE_STARTUP_DIR",
sep = " "
)
if (runAsDaemon) {
dockerOptions <- paste(dockerOptions, "-d", sep = " ")
}
if (!is.null(containerName)) {
dockerOptions <-
paste(dockerOptions, "--name", containerName, sep = " ")
}
if (includeEnvironmentVariables) {
dockerOptions <-
paste(
dockerOptions,
"-e AZ_BATCH_TASK_ID=$AZ_BATCH_TASK_ID",
"-e AZ_BATCH_JOB_ID=$AZ_BATCH_JOB_ID",
"-e AZ_BATCH_TASK_WORKING_DIR=$AZ_BATCH_TASK_WORKING_DIR",
"-e AZ_BATCH_JOB_PREP_WORKING_DIR=$AZ_BATCH_JOB_PREP_WORKING_DIR",
"-e BLOBXFER_SASKEY=$BLOBXFER_SASKEY",
sep = " "
)
}
dockerRunCommand <-
paste("docker run", dockerOptions, containerImage, command, sep = " ")
dockerRunCommand
}
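# Illustrative usage sketch (hypothetical image and container name); this
# builds the command string only, it does not invoke docker:
# dockerRunCommand("rocker/tidyverse:latest",
#                  "Rscript -e 'sessionInfo()'",
#                  containerName = "job-001")
# returns "docker run --rm -v $AZ_BATCH_NODE_ROOT_DIR:... --name job-001 ...
#          rocker/tidyverse:latest Rscript -e 'sessionInfo()'"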
linuxWrapCommands <- function(commands = c()) {
# Sanitize the vector and don't allow empty values
cleanCommands <- commands[lapply(commands, length) > 0]
commandLine <- ""
if (length(cleanCommands) > 0) {
# Absolute paths are not allowed (enforced by lintr)
commandLine <-
sprintf("/bin/bash -c \"set -e; set -o pipefail; %s wait\"",
paste0(paste(
cleanCommands, sep = " ", collapse = "; "
), ";"))
}
commandLine
}
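# Illustrative usage sketch: two shell commands chained into one command line.
# linuxWrapCommands(c("mkdir -p /mnt/data", "echo done"))
# returns "/bin/bash -c \"set -e; set -o pipefail; mkdir -p /mnt/data; echo done; wait\""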


@@ -11,7 +11,8 @@ registerDoAzureParallel <- function(cluster) {
fun = .doAzureParallel,
data = list(
config = list(cluster$batchAccount, cluster$storageAccount),
poolId = cluster$poolId
poolId = cluster$poolId,
containerImage = cluster$containerImage
),
info = .info
)
@@ -128,6 +129,18 @@ setHttpTraffic <- function(value = FALSE) {
.doAzureParallel <- function(obj, expr, envir, data) {
stopifnot(inherits(obj, "foreach"))
githubPackages <- eval(obj$args$github)
bioconductorPackages <- eval(obj$args$bioconductor)
# Remove special arguments, github and bioconductor, from args list
if (!is.null(obj$args[["github"]])) {
obj$args[["github"]] <- NULL
}
if (!is.null(obj$args[["bioconductor"]])) {
obj$args[["bioconductor"]] <- NULL
}
storageCredentials <- rAzureBatch::getStorageCredentials()
it <- iterators::iter(obj)
@@ -193,6 +206,8 @@ setHttpTraffic <- function(value = FALSE) {
assign("expr", expr, .doAzureBatchGlobals)
assign("exportenv", exportenv, .doAzureBatchGlobals)
assign("packages", obj$packages, .doAzureBatchGlobals)
assign("github", githubPackages, .doAzureBatchGlobals)
assign("bioconductor", bioconductorPackages, .doAzureBatchGlobals)
assign("pkgName", pkgName, .doAzureBatchGlobals)
if (!is.null(obj$options$azure$job)) {
@@ -204,8 +219,8 @@
}
tryCatch({
`Validators`$isValidStorageContainerName(id)
`Validators`$isValidJobName(id)
validation$isValidStorageContainerName(id)
validation$isValidJobName(id)
},
error = function(e){
stop(paste("Invalid job name: \n",
@@ -394,7 +409,10 @@ setHttpTraffic <- function(value = FALSE) {
poolId = data$poolId,
resourceFiles = resourceFiles,
metadata = metadata,
packages = obj$packages
packages = obj$packages,
github = githubPackages,
bioconductor = bioconductorPackages,
containerImage = data$containerImage
)
if (response$status_code == 201) {
@@ -466,7 +484,8 @@ setHttpTraffic <- function(value = FALSE) {
args = argsList[startIndex:endIndex],
envir = .doAzureBatchGlobals,
packages = obj$packages,
outputFiles = obj$options$azure$outputFiles
outputFiles = obj$options$azure$outputFiles,
containerImage = data$containerImage
)
return(taskId)
@@ -489,12 +508,15 @@ setHttpTraffic <- function(value = FALSE) {
packages = obj$packages,
dependsOn = tasks,
cloudCombine = cloudCombine,
outputFiles = obj$options$azure$outputFiles
outputFiles = obj$options$azure$outputFiles,
containerImage = data$containerImage
)
}
if (wait) {
if (!is.null(obj$packages)) {
if (!is.null(obj$packages) ||
!is.null(githubPackages) ||
!is.null(bioconductorPackages)) {
waitForJobPreparation(id, data$poolId)
}


@@ -7,13 +7,31 @@
dependsOn <- args$dependsOn
cloudCombine <- args$cloudCombine
userOutputFiles <- args$outputFiles
containerImage <- args$containerImage
resultFile <- paste0(taskId, "-result", ".rds")
accountName <- storageCredentials$name
if (!is.null(argsList)) {
assign("argsList", argsList, .doAzureBatchGlobals)
}
# Only use the download command if cloudCombine is enabled
# Otherwise just leave it empty
commands <- c()
if (!is.null(cloudCombine)) {
assign("cloudCombine", cloudCombine, .doAzureBatchGlobals)
copyCommand <- sprintf(
"%s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include result/*.rds",
accountName,
jobId,
"$AZ_BATCH_TASK_WORKING_DIR"
)
downloadCommand <-
dockerRunCommand("alfpark/blobxfer:0.12.1", copyCommand, "blobxfer", FALSE)
commands <- c(downloadCommand)
}
envFile <- paste0(taskId, ".rds")
@@ -34,25 +52,9 @@
dependsOn <- list(taskIds = dependsOn)
}
else {
exitConditions <- list(
default = list(
dependencyAction = "satisfy"
)
)
exitConditions <- list(default = list(dependencyAction = "satisfy"))
}
resultFile <- paste0(taskId, "-result", ".rds")
accountName <- storageCredentials$name
downloadCommand <-
sprintf(
paste("/anaconda/envs/py35/bin/blobxfer %s %s %s --download --saskey $BLOBXFER_SASKEY",
"--remoteresource . --include result/*.rds"),
accountName,
jobId,
"$AZ_BATCH_TASK_WORKING_DIR"
)
containerUrl <-
rAzureBatch::createBlobUrl(
storageAccount = storageCredentials$name,
@@ -96,9 +98,10 @@
)
outputFiles <- append(outputFiles, userOutputFiles)
commands <-
c(downloadCommand,
rCommand)
c(commands,
dockerRunCommand(containerImage, rCommand, taskId))
commands <- linuxWrapCommands(commands)
@@ -142,12 +145,35 @@
...) {
args <- list(...)
packages <- args$packages
github <- args$github
bioconductor <- args$bioconductor
containerImage <- args$containerImage
poolInfo <- list("poolId" = poolId)
# Default command for job preparation task
commands <- c("ls")
if (!is.null(packages)) {
jobPackages <- getJobPackageInstallationCommand("cran", packages)
jobPackages <-
dockerRunCommand(containerImage,
getJobPackageInstallationCommand("cran", packages),
jobId)
commands <- c(commands, jobPackages)
}
if (!is.null(github) && length(github) > 0) {
jobPackages <-
dockerRunCommand(containerImage,
getJobPackageInstallationCommand("github", github),
jobId)
commands <- c(commands, jobPackages)
}
if (!is.null(bioconductor) &&
length(bioconductor) > 0) {
jobPackages <-
dockerRunCommand(containerImage,
getJobPackageInstallationCommand("bioconductor", bioconductor),
jobId)
commands <- c(commands, jobPackages)
}
@@ -176,66 +202,64 @@
return(response)
}
.addPool <- function(pool, packages, environmentSettings, resourceFiles, ...) {
args <- list(...)
.addPool <-
function(pool,
packages,
environmentSettings,
resourceFiles,
...) {
args <- list(...)
commands <- c()
commands <- c(
"/anaconda/envs/py35/bin/pip install --no-dependencies blobxfer"
)
if (!is.null(args$commandLine)) {
commands <- c(commands, args$commandLine)
}
if (!is.null(args$commandLine)) {
commands <- c(commands, args$commandLine)
startTask <- list(
commandLine = linuxWrapCommands(commands),
userIdentity = list(autoUser = list(
scope = "pool",
elevationLevel = "admin"
)),
waitForSuccess = TRUE
)
if (!is.null(environmentSettings)) {
startTask$environmentSettings <- environmentSettings
}
if (length(resourceFiles) > 0) {
startTask$resourceFiles <- resourceFiles
}
virtualMachineConfiguration <- list(
imageReference = list(
publisher = "Canonical",
offer = "UbuntuServer",
sku = "16.04-LTS",
version = "latest"
),
nodeAgentSKUId = "batch.node.ubuntu 16.04"
)
response <- rAzureBatch::addPool(
pool$name,
pool$vmSize,
startTask = startTask,
virtualMachineConfiguration = virtualMachineConfiguration,
enableAutoScale = TRUE,
autoscaleFormula = getAutoscaleFormula(
pool$poolSize$autoscaleFormula,
pool$poolSize$dedicatedNodes$min,
pool$poolSize$dedicatedNodes$max,
pool$poolSize$lowPriorityNodes$min,
pool$poolSize$lowPriorityNodes$max,
maxTasksPerNode = pool$maxTasksPerNode
),
autoScaleEvaluationInterval = "PT5M",
maxTasksPerNode = pool$maxTasksPerNode,
content = "text"
)
return(response)
}
if (!is.null(packages)) {
commands <- c(commands, packages)
}
startTask <- list(
commandLine = linuxWrapCommands(commands),
userIdentity = list(autoUser = list(
scope = "pool",
elevationLevel = "admin"
)),
waitForSuccess = TRUE
)
if (!is.null(environmentSettings)) {
startTask$environmentSettings <- environmentSettings
}
if (length(resourceFiles) > 0) {
startTask$resourceFiles <- resourceFiles
}
virtualMachineConfiguration <- list(
imageReference = list(
publisher = "microsoft-ads",
offer = "linux-data-science-vm",
sku = "linuxdsvm",
version = "latest"
),
nodeAgentSKUId = "batch.node.centos 7"
)
response <- rAzureBatch::addPool(
pool$name,
pool$vmSize,
startTask = startTask,
virtualMachineConfiguration = virtualMachineConfiguration,
enableAutoScale = TRUE,
autoscaleFormula = getAutoscaleFormula(
pool$poolSize$autoscaleFormula,
pool$poolSize$dedicatedNodes$min,
pool$poolSize$dedicatedNodes$max,
pool$poolSize$lowPriorityNodes$min,
pool$poolSize$lowPriorityNodes$max,
maxTasksPerNode = pool$maxTasksPerNode
),
autoScaleEvaluationInterval = "PT5M",
maxTasksPerNode = pool$maxTasksPerNode,
content = "text"
)
return(response)
}

R/jobUtilities.R (new file, 344 additions)

@@ -0,0 +1,344 @@
#' Get a job for the given job id
#'
#' @param jobId A job id
#' @param verbose show verbose log output
#'
#' @examples
#' \dontrun{
#' getJob("job-001", FALSE)
#' }
#' @export
getJob <- function(jobId, verbose = TRUE) {
if (is.null(jobId)) {
stop("must specify the jobId parameter")
}
job <- rAzureBatch::getJob(jobId = jobId)
metadata <-
list(
chunkSize = 1,
enableCloudCombine = "TRUE",
packages = ""
)
if (!is.null(job$metadata)) {
for (i in 1:length(job$metadata)) {
metadata[[job$metadata[[i]]$name]] <- job$metadata[[i]]$value
}
}
if (verbose == TRUE) {
cat(sprintf("Job Id: %s", job$id), fill = TRUE)
cat("\njob metadata:", fill = TRUE)
cat(sprintf("\tchunkSize: %s", metadata$chunkSize),
fill = TRUE)
cat(sprintf("\tenableCloudCombine: %s", metadata$enableCloudCombine),
fill = TRUE)
cat(sprintf("\tpackages: %s", metadata$packages),
fill = TRUE)
}
taskCounts <- rAzureBatch::getJobTaskCounts(jobId = jobId)
tasks <- list(
active = taskCounts$active,
running = taskCounts$running,
completed = taskCounts$completed,
succeeded = taskCounts$succeeded,
failed = taskCounts$failed
)
if (verbose == TRUE) {
cat("\ntasks:", fill = TRUE)
cat(sprintf("\tactive: %s", taskCounts$active), fill = TRUE)
cat(sprintf("\trunning: %s", taskCounts$running), fill = TRUE)
cat(sprintf("\tcompleted: %s", taskCounts$completed), fill = TRUE)
cat(sprintf("\t\tsucceeded: %s", taskCounts$succeeded), fill = TRUE)
cat(sprintf("\t\tfailed: %s", taskCounts$failed), fill = TRUE)
cat(
sprintf(
"\ttotal: %s",
taskCounts$active + taskCounts$running + taskCounts$completed
),
fill = TRUE
)
}
jobObj <- list(jobId = job$id,
metadata = metadata,
tasks = tasks)
return(jobObj)
}
#' Get a list of job statuses from the given filter
#'
#' @param filter A filter containing job state
#'
#' @examples
#' \dontrun{
#' getJobList()
#' }
#' @export
getJobList <- function(filter = NULL) {
filterClause <- ""
if (!is.null(filter)) {
if (!is.null(filter$state)) {
for (i in 1:length(filter$state)) {
filterClause <-
paste0(filterClause,
sprintf("state eq '%s'", filter$state[i]),
" or ")
}
filterClause <-
substr(filterClause, 1, nchar(filterClause) - 3)
}
}
jobs <-
rAzureBatch::listJobs(query = list("$filter" = filterClause, "$select" = "id,state"))
id <- character(length(jobs$value))
state <- character(length(jobs$value))
status <- character(length(jobs$value))
failedTasks <- integer(length(jobs$value))
totalTasks <- integer(length(jobs$value))
if (length(jobs$value) > 0) {
if (is.null(jobs$value[[1]]$id)) {
stop(jobs$value)
}
for (j in 1:length(jobs$value)) {
id[j] <- jobs$value[[j]]$id
state[j] <- jobs$value[[j]]$state
taskCounts <-
rAzureBatch::getJobTaskCounts(jobId = jobs$value[[j]]$id)
failedTasks[j] <-
as.integer(taskCounts$failed)
totalTasks[j] <-
as.integer(taskCounts$active + taskCounts$running + taskCounts$completed)
completed <- as.integer(taskCounts$completed)
if (totalTasks[j] > 0) {
status[j] <-
sprintf("%s %%", ceiling(completed / totalTasks[j] * 100))
}
else {
status[j] <- "No tasks in the job"
}
}
}
return (
data.frame(
Id = id,
State = state,
Status = status,
FailedTasks = failedTasks,
TotalTasks = totalTasks
)
)
}
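# Illustrative usage sketch (hypothetical filter): list only active and
# completed jobs.
# getJobList(filter = list(state = c("active", "completed")))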
#' Download the results of the job
#' @param jobId The jobId to download from
#'
#' @return The results from the job.
#' @examples
#' \dontrun{
#' getJobResult(jobId = "job-001")
#' }
#' @export
getJobResult <- function(jobId) {
cat("Getting job results...", fill = TRUE)
if (nchar(jobId) < 3) {
stop("jobId must contain at least 3 characters.")
}
tempFile <- tempfile("getJobResult", fileext = ".rds")
results <- rAzureBatch::downloadBlob(
jobId,
paste0("result/", jobId, "-merge-result.rds"),
downloadPath = tempFile,
overwrite = TRUE
)
if (is.vector(results)) {
results <- readRDS(tempFile)
}
return(results)
}
#' Wait for current tasks to complete
#'
#' @export
waitForTasksToComplete <-
function(jobId, timeout, errorHandling = "stop") {
cat("Waiting for tasks to complete. . .", fill = TRUE)
totalTasks <- 0
currentTasks <- rAzureBatch::listTask(jobId)
if (is.null(currentTasks$value)) {
stop(paste0("Error: ", currentTasks$message$value))
return()
}
totalTasks <- totalTasks + length(currentTasks$value)
# Getting the total count of tasks for progress bar
repeat {
if (is.null(currentTasks$odata.nextLink)) {
break
}
skipTokenParameter <-
strsplit(currentTasks$odata.nextLink, "&")[[1]][2]
skipTokenValue <-
substr(skipTokenParameter,
nchar("$skiptoken=") + 1,
nchar(skipTokenParameter))
currentTasks <-
rAzureBatch::listTask(jobId, skipToken = URLdecode(skipTokenValue))
totalTasks <- totalTasks + length(currentTasks$value)
}
pb <- txtProgressBar(min = 0, max = totalTasks, style = 3)
timeToTimeout <- Sys.time() + timeout
repeat {
taskCounts <- rAzureBatch::getJobTaskCounts(jobId)
setTxtProgressBar(pb, taskCounts$completed)
validationFlag <-
(taskCounts$validationStatus == "Validated" &&
totalTasks <= 200000) ||
totalTasks > 200000
if (taskCounts$failed > 0 &&
errorHandling == "stop" &&
validationFlag) {
cat("\n")
select <- "id, executionInfo"
failedTasks <-
rAzureBatch::listTask(jobId, select = select)
tasksFailureWarningLabel <-
sprintf(
paste(
"%i task(s) failed while running the job.",
"This caused the job to terminate automatically.",
"To disable this behavior and continue on failure, set .errorHandling='remove | pass'",
"in the foreach loop\n"
),
taskCounts$failed
)
for (i in 1:length(failedTasks$value)) {
if (failedTasks$value[[i]]$executionInfo$result == "Failure") {
tasksFailureWarningLabel <-
paste0(tasksFailureWarningLabel,
sprintf("%s\n", failedTasks$value[[i]]$id))
}
}
warning(sprintf(tasksFailureWarningLabel,
taskCounts$failed))
response <- rAzureBatch::terminateJob(jobId)
httr::stop_for_status(response)
stop(sprintf(
paste(
"Errors have occurred while running the job '%s'.",
"Error handling is set to 'stop' and has proceeded to terminate the job.",
"The user will have to handle deleting the job.",
"If this is not the correct behavior, change the errorHandling property to 'pass'",
" or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs.",
"For more information about getting job logs, follow this link:",
paste0(
"https://github.com/Azure/doAzureParallel/blob/master/docs/",
"40-troubleshooting.md#viewing-files-directly-from-compute-node"
)
),
jobId
))
}
if (Sys.time() > timeToTimeout) {
stop(sprintf(
paste(
"Timeout has occurred while waiting for tasks to complete.",
"Users will have to manually track the job '%s' and get the results.",
"Use the getJobResults function to obtain the results and getJobList for",
"tracking job status. To change the timeout, set 'timeout' property in the",
"foreach's options.azure."
)
),
jobId)
}
if (taskCounts$completed >= totalTasks &&
(taskCounts$validationStatus == "Validated" ||
totalTasks >= 200000)) {
cat("\n")
return(0)
}
Sys.sleep(10)
}
}
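# Illustrative usage sketch (hypothetical job id; timeout is in seconds):
# waitForTasksToComplete("job-001", timeout = 60 * 60 * 24, errorHandling = "stop")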
waitForJobPreparation <- function(jobId, poolId) {
cat("Job Preparation Status: Package(s) being installed")
filter <- paste(
sprintf("poolId eq '%s' and", poolId),
"jobPreparationTaskExecutionInfo/state eq 'completed'"
)
select <- "jobPreparationTaskExecutionInfo"
repeat {
statuses <- rAzureBatch::getJobPreparationStatus(jobId,
content = "parsed",
filter = filter,
select = select)
statuses <- sapply(statuses$value, function(x) {
x$jobPreparationTaskExecutionInfo$result == "Success"
})
if (TRUE %in% statuses) {
break
}
# Verify that all the job preparation tasks are not failing
if (all(FALSE %in% statuses)) {
cat("\n")
stop(
paste(
sprintf("Job '%s' unable to install packages.", jobId),
"Use the 'getJobFile' function to get more information about",
"job package installation."
)
)
}
cat(".")
Sys.sleep(10)
}
cat("\n")
}


@@ -1,207 +1,3 @@
getJobPackageInstallationCommand <- function(type, packages) {
script <- ""
if (type == "cran") {
script <- "Rscript $AZ_BATCH_JOB_PREP_WORKING_DIR/install_cran.R"
}
else if (type == "github") {
script <- "Rscript $AZ_BATCH_JOB_PREP_WORKING_DIR/install_github.R"
}
else if (type == "bioconductor") {
script <- "Rscript $AZ_BATCH_JOB_PREP_WORKING_DIR/install_bioconductor.R"
}
else {
stop("Using an incorrect package source")
}
if (!is.null(packages) && length(packages) > 0) {
packageCommands <- paste0(packages, collapse = " ")
script <- paste0(script, " ", packageCommands)
}
}
getPoolPackageInstallationCommand <- function(type, packages) {
poolInstallationCommand <- character(length(packages))
if (type == "cran") {
script <-
"Rscript -e \'args <- commandArgs(TRUE)\' -e \'options(warn=2)\' -e \'install.packages(args[1])\' %s"
}
else if (type == "github") {
script <-
"Rscript -e \'args <- commandArgs(TRUE)\' -e \'options(warn=2)\' -e \'devtools::install_github(args[1])\' %s"
}
else if (type == "bioconductor") {
script <-
"Rscript -e \'args <- commandArgs(TRUE)\' -e \'options(warn=2)\' -e \'BiocInstaller::biocLite(args[1])\' %s"
}
else {
stop("Using an incorrect package source")
}
for (i in 1:length(packages)) {
poolInstallationCommand[i] <- sprintf(script, packages[i])
}
poolInstallationCommand
}
linuxWrapCommands <- function(commands = c()) {
# Do not allow absolute paths is enforced in lintr
commandLine <-
sprintf("/bin/bash -c \"set -e; set -o pipefail; %s wait\"",
paste0(paste(
commands, sep = " ", collapse = "; "
), ";"))
commandLine
}
#' Get a list of job statuses from the given filter
#'
#' @param filter A filter containing job state
#'
#' @examples
#' \dontrun{
#' getJobList()
#' }
#' @export
getJobList <- function(filter = NULL) {
filterClause <- ""
if (!is.null(filter)) {
if (!is.null(filter$state)) {
for (i in 1:length(filter$state)) {
filterClause <-
paste0(filterClause,
sprintf("state eq '%s'", filter$state[i]),
" or ")
}
filterClause <-
substr(filterClause, 1, nchar(filterClause) - 3)
}
}
jobs <-
rAzureBatch::listJobs(query = list("$filter" = filterClause, "$select" = "id,state"))
id <- character(length(jobs$value))
state <- character(length(jobs$value))
status <- character(length(jobs$value))
failedTasks <- integer(length(jobs$value))
totalTasks <- integer(length(jobs$value))
if (length(jobs$value) > 0) {
if (is.null(jobs$value[[1]]$id)) {
stop(jobs$value)
}
for (j in 1:length(jobs$value)) {
id[j] <- jobs$value[[j]]$id
state[j] <- jobs$value[[j]]$state
taskCounts <-
rAzureBatch::getJobTaskCounts(jobId = jobs$value[[j]]$id)
failedTasks[j] <-
as.integer(taskCounts$failed)
totalTasks[j] <-
as.integer(taskCounts$active + taskCounts$running + taskCounts$completed)
completed <- as.integer(taskCounts$completed)
if (totalTasks[j] > 0) {
status[j] <-
sprintf("%s %%", ceiling(completed / totalTasks[j] * 100))
}
else {
status[j] <- "No tasks in the job"
}
}
}
return (
data.frame(
Id = id,
State = state,
Status = status,
FailedTasks = failedTasks,
TotalTasks = totalTasks
)
)
}
#' Get a job for the given job id
#'
#' @param jobId A job id
#' @param verbose show verbose log output
#'
#' @examples
#' \dontrun{
#' getJob("job-001", FALSE)
#' }
#' @export
getJob <- function(jobId, verbose = TRUE) {
if (is.null(jobId)) {
stop("must specify the jobId parameter")
}
job <- rAzureBatch::getJob(jobId = jobId)
metadata <-
list(
chunkSize = 1,
enableCloudCombine = "TRUE",
packages = ""
)
if (!is.null(job$metadata)) {
for (i in 1:length(job$metadata)) {
metadata[[job$metadata[[i]]$name]] <- job$metadata[[i]]$value
}
}
if (verbose == TRUE) {
cat(sprintf("Job Id: %s", job$id), fill = TRUE)
cat("\njob metadata:", fill = TRUE)
cat(sprintf("\tchunkSize: %s", metadata$chunkSize),
fill = TRUE)
cat(sprintf("\tenableCloudCombine: %s", metadata$enableCloudCombine),
fill = TRUE)
cat(sprintf("\tpackages: %s", metadata$packages),
fill = TRUE)
}
taskCounts <- rAzureBatch::getJobTaskCounts(jobId = jobId)
tasks <- list(
active = taskCounts$active,
running = taskCounts$running,
completed = taskCounts$completed,
succeeded = taskCounts$succeeded,
failed = taskCounts$failed
)
if (verbose == TRUE) {
cat("\ntasks:", fill = TRUE)
cat(sprintf("\tactive: %s", taskCounts$active), fill = TRUE)
cat(sprintf("\trunning: %s", taskCounts$running), fill = TRUE)
cat(sprintf("\tcompleted: %s", taskCounts$completed), fill = TRUE)
cat(sprintf("\t\tsucceeded: %s", taskCounts$succeeded), fill = TRUE)
cat(sprintf("\t\tfailed: %s", taskCounts$failed), fill = TRUE)
cat(
sprintf(
"\ttotal: %s",
taskCounts$active + taskCounts$running + taskCounts$completed
),
fill = TRUE
)
}
jobObj <- list(jobId = job$id,
metadata = metadata,
tasks = tasks)
return(jobObj)
}
#' Polling method to check status of cluster boot up
#'
#' @param poolId The cluster name to poll for
@@ -214,7 +10,6 @@ getJob <- function(jobId, verbose = TRUE) {
#' @export
waitForNodesToComplete <- function(poolId, timeout = 86400) {
cat("Booting compute nodes. . . ", fill = TRUE)
pool <- rAzureBatch::getPool(poolId)
# Validate the getPool request first, before setting the progress bar
@@ -332,38 +127,6 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) {
stop("Timeout expired")
}
#' Download the results of the job
#' @param jobId The jobId to download from
#'
#' @return The results from the job.
#' @examples
#' \dontrun{
#' getJobResult(jobId = "job-001")
#' }
#' @export
getJobResult <- function(jobId) {
cat("Getting job results...", fill = TRUE)
if (nchar(jobId) < 3) {
stop("jobId must contain at least 3 characters.")
}
tempFile <- tempFile <- tempfile("getJobResult", fileext = ".rds")
results <- rAzureBatch::downloadBlob(
jobId,
paste0("result/", jobId, "-merge-result.rds"),
downloadPath = tempFile,
overwrite = TRUE
)
if (is.vector(results)) {
results <- readRDS(tempFile)
}
return(results)
}
#' Utility function for creating an output file
#'
#' @param filePattern a pattern indicating which file(s) to upload
@@ -412,160 +175,6 @@ createOutputFile <- function(filePattern, url) {
output
}
#' Wait for current tasks to complete
#'
#' @export
waitForTasksToComplete <-
function(jobId, timeout, errorHandling = "stop") {
cat("Waiting for tasks to complete. . .", fill = TRUE)
totalTasks <- 0
currentTasks <- rAzureBatch::listTask(jobId)
if (is.null(currentTasks$value)) {
stop(paste0("Error: ", currentTasks$message$value))
return()
}
totalTasks <- totalTasks + length(currentTasks$value)
# Getting the total count of tasks for progress bar
repeat {
if (is.null(currentTasks$odata.nextLink)) {
break
}
skipTokenParameter <-
strsplit(currentTasks$odata.nextLink, "&")[[1]][2]
skipTokenValue <-
substr(skipTokenParameter,
nchar("$skiptoken=") + 1,
nchar(skipTokenParameter))
currentTasks <-
rAzureBatch::listTask(jobId, skipToken = URLdecode(skipTokenValue))
totalTasks <- totalTasks + length(currentTasks$value)
}
pb <- txtProgressBar(min = 0, max = totalTasks, style = 3)
timeToTimeout <- Sys.time() + timeout
repeat {
taskCounts <- rAzureBatch::getJobTaskCounts(jobId)
setTxtProgressBar(pb, taskCounts$completed)
validationFlag <-
(taskCounts$validationStatus == "Validated" &&
totalTasks <= 200000) ||
totalTasks > 200000
if (taskCounts$failed > 0 &&
errorHandling == "stop" &&
validationFlag) {
cat("\n")
select <- "id, executionInfo"
failedTasks <-
rAzureBatch::listTask(jobId, select = select)
tasksFailureWarningLabel <-
sprintf(paste("%i task(s) failed while running the job.",
"This caused the job to terminate automatically.",
"To disable this behavior and continue on failure, set .errorHandling='remove | pass'",
"in the foreach loop\n"), taskCounts$failed)
for (i in 1:length(failedTasks$value)) {
if (failedTasks$value[[i]]$executionInfo$result == "Failure") {
tasksFailureWarningLabel <-
paste0(tasksFailureWarningLabel,
sprintf("%s\n", failedTasks$value[[i]]$id))
}
}
warning(sprintf(tasksFailureWarningLabel,
taskCounts$failed))
response <- rAzureBatch::terminateJob(jobId)
httr::stop_for_status(response)
stop(sprintf(
paste("Errors have occurred while running the job '%s'.",
"Error handling is set to 'stop' and has proceeded to terminate the job.",
"The user will have to handle deleting the job.",
"If this is not the correct behavior, change the errorHandling property to 'pass'",
" or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs.",
"For more information about getting job logs, follow this link:",
paste0("https://github.com/Azure/doAzureParallel/blob/master/docs/",
"40-troubleshooting.md#viewing-files-directly-from-compute-node")),
jobId
))
}
if (Sys.time() > timeToTimeout) {
stop(sprintf(paste("Timeout has occurred while waiting for tasks to complete.",
"Users will have to manually track the job '%s' and get the results.",
"Use the getJobResults function to obtain the results and getJobList for",
"tracking job status. To change the timeout, set 'timeout' property in the",
"foreach's options.azure.")),
jobId)
}
if (taskCounts$completed >= totalTasks &&
(taskCounts$validationStatus == "Validated" ||
totalTasks >= 200000)) {
cat("\n")
return(0)
}
Sys.sleep(10)
}
}
waitForJobPreparation <- function(jobId, poolId) {
cat("Job Preparation Status: Package(s) being installed")
filter <- paste(
sprintf("poolId eq '%s' and", poolId),
"jobPreparationTaskExecutionInfo/state eq 'completed'"
)
select <- "jobPreparationTaskExecutionInfo"
repeat {
statuses <- rAzureBatch::getJobPreparationStatus(jobId,
content = "parsed",
filter = filter,
select = select)
statuses <- sapply(statuses$value, function(x) {
x$jobPreparationTaskExecutionInfo$result == "Success"
})
if (TRUE %in% statuses) {
break
}
# Verify that all the job preparation tasks are not failing
if (all(FALSE %in% statuses)) {
cat("\n")
stop(
paste(
sprintf("Job '%s' unable to install packages.", jobId),
"Use the 'getJobFile' function to get more information about",
"job package installation."
)
)
}
cat(".")
Sys.sleep(10)
}
cat("\n")
}
getXmlValues <- function(xmlResponse, xmlPath) {
xml2::xml_text(xml2::xml_find_all(xmlResponse, xmlPath))
}


@@ -1,121 +1,148 @@
validateClusterConfig <- function(clusterFilePath) {
if (file.exists(clusterFilePath)) {
pool <- rjson::fromJSON(file = clusterFilePath)
}
else{
pool <- rjson::fromJSON(file = file.path(getwd(), clusterFilePath))
}
validationClass <- R6::R6Class(
"validationClass",
lock_objects = TRUE,
public = list(
isValidStorageContainerName = function(storageContainerName) {
if (!grepl("^([a-z]|[0-9]|[-]){3,64}$", storageContainerName)) {
stop(paste("Storage Container names can contain only lowercase letters, numbers,",
"and the dash (-) character. Names must be 3 through 64 characters long."))
}
},
isValidPoolName = function(poolName) {
if (!grepl("^([a-zA-Z0-9]|[-]|[_]){1,64}$", poolName)) {
stop(paste("The pool name can contain any combination of alphanumeric characters",
"including hyphens and underscores, and cannot contain more",
"than 64 characters."))
}
},
isValidJobName = function(jobName) {
if (!grepl("^([a-zA-Z0-9]|[-]|[_]){1,64}$", jobName)) {
stop(paste("The job name can contain any combination of alphanumeric characters",
"including hyphens and underscores, and cannot contain more",
"than 64 characters."))
}
},
# Validating cluster configuration files below doAzureParallel version 0.3.2
isValidDeprecatedClusterConfig = function(clusterFilePath) {
if (file.exists(clusterFilePath)) {
poolConfig <- rjson::fromJSON(file = clusterFilePath)
}
else{
poolConfig <-
rjson::fromJSON(file = file.path(getwd(), clusterFilePath))
}
if (is.null(pool$poolSize)) {
stop("Missing poolSize entry")
}
if (is.null(poolConfig$pool$poolSize)) {
stop("Missing poolSize entry")
}
if (is.null(pool$poolSize$dedicatedNodes)) {
stop("Missing dedicatedNodes entry")
}
if (is.null(poolConfig$pool$poolSize$dedicatedNodes)) {
stop("Missing dedicatedNodes entry")
}
if (is.null(pool$poolSize$lowPriorityNodes)) {
stop("Missing lowPriorityNodes entry")
}
if (is.null(poolConfig$pool$poolSize$lowPriorityNodes)) {
stop("Missing lowPriorityNodes entry")
}
if (is.null(pool$poolSize$autoscaleFormula)) {
stop("Missing autoscaleFormula entry")
}
if (is.null(poolConfig$pool$poolSize$autoscaleFormula)) {
stop("Missing autoscaleFormula entry")
}
if (is.null(pool$poolSize$dedicatedNodes$min)) {
stop("Missing dedicatedNodes$min entry")
}
if (is.null(poolConfig$pool$poolSize$dedicatedNodes$min)) {
stop("Missing dedicatedNodes$min entry")
}
if (is.null(pool$poolSize$dedicatedNodes$max)) {
stop("Missing dedicatedNodes$max entry")
}
if (is.null(poolConfig$pool$poolSize$dedicatedNodes$max)) {
stop("Missing dedicatedNodes$max entry")
}
if (is.null(pool$poolSize$lowPriorityNodes$min)) {
stop("Missing lowPriorityNodes$min entry")
}
if (is.null(poolConfig$pool$poolSize$lowPriorityNodes$min)) {
stop("Missing lowPriorityNodes$min entry")
}
if (is.null(pool$poolSize$lowPriorityNodes$max)) {
stop("Missing lowPriorityNodes$max entry")
}
if (is.null(poolConfig$pool$poolSize$lowPriorityNodes$max)) {
stop("Missing lowPriorityNodes$max entry")
}
stopifnot(is.character(pool$name))
stopifnot(is.character(pool$vmSize))
stopifnot(is.character(pool$poolSize$autoscaleFormula))
stopifnot(pool$poolSize$autoscaleFormula %in% names(autoscaleFormula))
stopifnot(is.character(poolConfig$pool$name))
stopifnot(is.character(poolConfig$pool$vmSize))
stopifnot(is.character(poolConfig$pool$poolSize$autoscaleFormula))
stopifnot(poolConfig$pool$poolSize$autoscaleFormula %in% names(autoscaleFormula))
stopifnot(pool$poolSize$dedicatedNodes$min <= pool$poolSize$dedicatedNodes$max)
stopifnot(pool$poolSize$lowPriorityNodes$min <= pool$poolSize$lowPriorityNodes$max)
stopifnot(pool$maxTasksPerNode >= 1)
stopifnot(
poolConfig$pool$poolSize$dedicatedNodes$min <= poolConfig$pool$poolSize$dedicatedNodes$max
)
stopifnot(
poolConfig$pool$poolSize$lowPriorityNodes$min <= poolConfig$pool$poolSize$lowPriorityNodes$max
)
stopifnot(poolConfig$pool$maxTasksPerNode >= 1)
stopifnot(is.double(pool$poolSize$dedicatedNodes$min))
stopifnot(is.double(pool$poolSize$dedicatedNodes$max))
stopifnot(is.double(pool$poolSize$lowPriorityNodes$min))
stopifnot(is.double(pool$poolSize$lowPriorityNodes$max))
stopifnot(is.double(pool$maxTasksPerNode))
stopifnot(is.double(poolConfig$pool$poolSize$dedicatedNodes$min))
stopifnot(is.double(poolConfig$pool$poolSize$dedicatedNodes$max))
stopifnot(is.double(poolConfig$pool$poolSize$lowPriorityNodes$min))
stopifnot(is.double(poolConfig$pool$poolSize$lowPriorityNodes$max))
stopifnot(is.double(poolConfig$pool$maxTasksPerNode))
TRUE
}
TRUE
},
isValidClusterConfig = function(clusterFilePath) {
if (file.exists(clusterFilePath)) {
pool <- rjson::fromJSON(file = clusterFilePath)
}
else{
pool <- rjson::fromJSON(file = file.path(getwd(), clusterFilePath))
}
# Validating cluster configuration files below doAzureParallel version 0.3.2
validateDeprecatedClusterConfig <- function(clusterFilePath) {
if (file.exists(clusterFilePath)) {
poolConfig <- rjson::fromJSON(file = clusterFilePath)
}
else{
poolConfig <-
rjson::fromJSON(file = file.path(getwd(), clusterFilePath))
}
if (is.null(pool$poolSize)) {
stop("Missing poolSize entry")
}
if (is.null(poolConfig$pool$poolSize)) {
stop("Missing poolSize entry")
}
if (is.null(pool$poolSize$dedicatedNodes)) {
stop("Missing dedicatedNodes entry")
}
if (is.null(poolConfig$pool$poolSize$dedicatedNodes)) {
stop("Missing dedicatedNodes entry")
}
if (is.null(pool$poolSize$lowPriorityNodes)) {
stop("Missing lowPriorityNodes entry")
}
if (is.null(poolConfig$pool$poolSize$lowPriorityNodes)) {
stop("Missing lowPriorityNodes entry")
}
if (is.null(pool$poolSize$autoscaleFormula)) {
stop("Missing autoscaleFormula entry")
}
if (is.null(poolConfig$pool$poolSize$autoscaleFormula)) {
stop("Missing autoscaleFormula entry")
}
if (is.null(pool$poolSize$dedicatedNodes$min)) {
stop("Missing dedicatedNodes$min entry")
}
if (is.null(poolConfig$pool$poolSize$dedicatedNodes$min)) {
stop("Missing dedicatedNodes$min entry")
}
if (is.null(pool$poolSize$dedicatedNodes$max)) {
stop("Missing dedicatedNodes$max entry")
}
if (is.null(poolConfig$pool$poolSize$dedicatedNodes$max)) {
stop("Missing dedicatedNodes$max entry")
}
if (is.null(pool$poolSize$lowPriorityNodes$min)) {
stop("Missing lowPriorityNodes$min entry")
}
if (is.null(poolConfig$pool$poolSize$lowPriorityNodes$min)) {
stop("Missing lowPriorityNodes$min entry")
}
if (is.null(pool$poolSize$lowPriorityNodes$max)) {
stop("Missing lowPriorityNodes$max entry")
}
if (is.null(poolConfig$pool$poolSize$lowPriorityNodes$max)) {
stop("Missing lowPriorityNodes$max entry")
}
stopifnot(is.character(pool$name))
stopifnot(is.character(pool$vmSize))
stopifnot(is.character(pool$poolSize$autoscaleFormula))
stopifnot(pool$poolSize$autoscaleFormula %in% names(autoscaleFormula))
stopifnot(is.character(poolConfig$pool$name))
stopifnot(is.character(poolConfig$pool$vmSize))
stopifnot(is.character(poolConfig$pool$poolSize$autoscaleFormula))
stopifnot(poolConfig$pool$poolSize$autoscaleFormula %in% names(autoscaleFormula))
stopifnot(pool$poolSize$dedicatedNodes$min <= pool$poolSize$dedicatedNodes$max)
stopifnot(pool$poolSize$lowPriorityNodes$min <= pool$poolSize$lowPriorityNodes$max)
stopifnot(pool$maxTasksPerNode >= 1)
stopifnot(
poolConfig$pool$poolSize$dedicatedNodes$min <= poolConfig$pool$poolSize$dedicatedNodes$max
stopifnot(is.double(pool$poolSize$dedicatedNodes$min))
stopifnot(is.double(pool$poolSize$dedicatedNodes$max))
stopifnot(is.double(pool$poolSize$lowPriorityNodes$min))
stopifnot(is.double(pool$poolSize$lowPriorityNodes$max))
stopifnot(is.double(pool$maxTasksPerNode))
TRUE
}
)
stopifnot(
poolConfig$pool$poolSize$lowPriorityNodes$min <= poolConfig$pool$poolSize$lowPriorityNodes$max
)
stopifnot(poolConfig$pool$maxTasksPerNode >= 1)
)
stopifnot(is.double(poolConfig$pool$poolSize$dedicatedNodes$min))
stopifnot(is.double(poolConfig$pool$poolSize$dedicatedNodes$max))
stopifnot(is.double(poolConfig$pool$poolSize$lowPriorityNodes$min))
stopifnot(is.double(poolConfig$pool$poolSize$lowPriorityNodes$max))
stopifnot(is.double(poolConfig$pool$maxTasksPerNode))
TRUE
}
`validation` <- validationClass$new()
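# Illustrative usage sketch (hypothetical names):
# validation$isValidPoolName("my-pool_01")   # passes silently
# validation$isValidPoolName("bad name!")    # stop()s with an explanatory error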


@@ -1,28 +0,0 @@
Validators <- R6::R6Class(
"Validators",
lock_objects = TRUE,
public = list(
isValidStorageContainerName = function(storageContainerName) {
if (!grepl("^([a-z]|[0-9]|[-]){3,64}$", storageContainerName)) {
stop(paste("Storage Container names can contain only lowercase letters, numbers,",
"and the dash (-) character. Names must be 3 through 64 characters long."))
}
},
isValidPoolName = function(poolName) {
if (!grepl("^([a-zA-Z0-9]|[-]|[_]){1,64}$", poolName)) {
stop(paste("The pool name can contain any combination of alphanumeric characters",
"including hyphens and underscores, and cannot contain more",
"than 64 characters."))
}
},
isValidJobName = function(jobName) {
if (!grepl("^([a-zA-Z0-9]|[-]|[_]){1,64}$", jobName)) {
stop(paste("The job name can contain any combination of alphanumeric characters",
"including hyphens and underscores, and cannot contain more",
"than 64 characters."))
}
}
)
)
`Validators` <- Validators$new()


@@ -44,10 +44,9 @@ Install doAzureParallel directly from Github.
```R
# install the package devtools
install.packages("devtools")
library(devtools)
# install the doAzureParallel and rAzureBatch package
install_github(c("Azure/rAzureBatch", "Azure/doAzureParallel"))
devtools::install_github("Azure/doAzureParallel")
```
## Azure Requirements


@@ -0,0 +1,21 @@
FROM ubuntu:16.04
# Install minimum requirements
RUN apt-get update -y
RUN apt-get install -y wget
RUN apt-get install -y build-essential
# Download MRO
RUN wget https://mran.microsoft.com/install/mro/3.4.1/microsoft-r-open-3.4.1.tar.gz
# Untar the file
RUN tar -xf microsoft-r-open-3.4.1.tar.gz
# Install
RUN ./microsoft-r-open/install.sh
# Clean up
RUN rm ./microsoft-r-open-3.4.1.tar.gz
RUN rm ./microsoft-r-open/install.sh
CMD ["R"]


@@ -0,0 +1,28 @@
FROM mro-base:3.4.1
# Install basic apt packages
RUN apt-get update && apt-get -y --no-install-recommends install \
file \
git \
libapparmor1 \
libcurl4-openssl-dev \
libedit2 \
libssl-dev \
lsb-release \
psmisc \
python-setuptools \
sudo \
wget \
libxml2-dev \
libcairo2-dev \
libsqlite-dev \
libmariadbd-dev \
libmariadb-client-lgpl-dev \
libpq-dev \
libssh2-1-dev
# Install basic R packages
RUN R -e "install.packages(c('devtools', 'ggplot2'))"
# Install bioconductor
RUN R -e "source('https://bioconductor.org/biocLite.R')"


@@ -12,7 +12,8 @@ You can install packages by specifying the package(s) in your JSON pool configur
...
"rPackages": {
"cran": ["some_cran_package_name", "some_other_cran_package_name"],
"github": ["github_username/github_package_name", "another_github_username/another_github_package_name"]
"github": ["github_username/github_package_name", "another_github_username/another_github_package_name"],
"bioconductor": ["IRanges"]
},
...
}
@@ -44,7 +45,7 @@ When the cluster is created the token is passed in as an environment variable ca
"rPackages": {
"cran": [],
"github": ["<project/some_private_repository>"],
"githubAuthenticationToken": "<github_authentication_token>"
"bioconductor": []
},
"commandLine": []
}
@@ -54,43 +55,77 @@ When the cluster is created the token is passed in as an environment variable ca
_More information regarding github authentication tokens can be found [here](https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/)_
## Installing Packages per-*foreach* Loop
You can also install packages by using the **.packages** option in the *foreach* loop. Instead of installing packages during pool creation, packages (and it's dependencies) can be installed before each iteration in the loop is run on your Azure cluster.
You can also install cran packages by using the **.packages** option in the *foreach* loop, and github/bioconductor packages by using the **github** and **bioconductor** options. Instead of installing packages during pool creation, packages (and their dependencies) can be installed before each iteration in the loop is run on your Azure cluster.
To install a single package:
To install a single cran package:
```R
number_of_iterations <- 10
results <- foreach(i = 1:number_of_iterations, .packages='some_package') %dopar% { ... }
```
To install multiple packages:
To install multiple cran packages:
```R
number_of_iterations <- 10
results <- foreach(i = 1:number_of_iterations, .packages=c('package_1', 'package_2')) %dopar% { ... }
```
Installing packages from github using this method is not yet supported.
## Installing Packages from BioConductor
Currently there is no native support for Bioconductor package installation, but it can be achieved by installing the packages directly in your environment or using the 'commandLine' feature in the cluster configuration. We recommend using the 'commandLine' to install the base BioConductor package and then install additional packages through the 'commandLine'.
### Installing BioConductor using the 'commandLine'
We recommend using the [script provided in the samples](../samples/package_management/bioc_setup.sh) section of this project which will install the required pre-requisites for BioConductor as well as BioConductor itself.
In the example below, the script will install BioConductor and install the GenomeInfoDB and IRanges packages. Simply update your cluster configuration commandLine as follows:
```json
"commandLine": [
"wget https://raw.githubusercontent.com/Azure/doAzureParallel/master/samples/package_management/bioc_setup.sh",
"chmod u+x ./bioc_setup.sh",
"./bioc_setup.sh",
"wget https://raw.githubusercontent.com/Azure/doAzureParallel/master/inst/startup/install_bioconductor.R",
"chmod u+x ./install_bioconductor.R",
"Rscript install_bioconductor.R GenomeInfoDb IRange"]
To install a single github package:
```R
number_of_iterations <- 10
results <- foreach(i = 1:number_of_iterations, github='azure/rAzureBatch') %dopar% { ... }
```
Installing bioconductor packages within the _foreach_ code block is not supported, and should be specified and installed in the cluster config.
Please do not use "https://github.com/" as a prefix for the github package name above.
A [working sample](../samples/package_management/bioconductor_cluster.json) can be found in the samples directory.
To install multiple github packages:
```R
number_of_iterations <- 10
results <- foreach(i = 1:number_of_iterations, github=c('package_1', 'package_2')) %dopar% { ... }
```
To install a single bioconductor package:
```R
number_of_iterations <- 10
results <- foreach(i = 1:number_of_iterations, bioconductor='some_package') %dopar% { ... }
```
To install multiple bioconductor packages:
```R
number_of_iterations <- 10
results <- foreach(i = 1:number_of_iterations, bioconductor=c('package_1', 'package_2')) %dopar% { ... }
```
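The options can also be combined in one loop; a sketch (package names are placeholders):
```R
number_of_iterations <- 10
results <- foreach(i = 1:number_of_iterations,
                   .packages = 'package_1',
                   github = 'github_username/github_package_name',
                   bioconductor = 'package_2') %dopar% { ... }
```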
## Installing Packages from BioConductor
The default deployment of R used in the cluster (see [Customizing the cluster](./30-customize-cluster.md) for more information) includes the Bioconductor installer by default. Simply add packages to the cluster by listing them in the bioconductor array.
```json
{
"name": <your pool name>,
"vmSize": <your pool VM size name>,
"maxTasksPerNode": <num tasks to allocate to each node>,
"poolSize": {
"dedicatedNodes": {
"min": 2,
"max": 2
},
"lowPriorityNodes": {
"min": 1,
"max": 10
},
"autoscaleFormula": "QUEUE"
},
"rPackages": {
"cran": [],
"github": [],
"bioconductor": ["IRanges"]
},
"commandLine": []
}
```
Note: Container images that are not based on rocker/tidyverse do not support Bioconductor installs out of the box. If you choose another container, you must make sure that Bioconductor is installed.
## Uninstalling packages
Uninstalling packages from your pool is not supported. However, you may consider rebuilding your pool.


@@ -29,7 +29,7 @@ results <- foreach(chunk = iter(chunks)) %dopar% {
Some workloads may require data pre-loaded into the cluster as soon as the cluster is provisioned. doAzureParallel supports this with the concept of a *resource file* - a file that is automatically downloaded to each node of the cluster after the cluster is created.
**NOTE** The default setting for storage containers is _private_. You can either use a [SAS](../samples/resource_files/sas_resource_files_example.R) to access the resources or [make the container public using the Azure Portal](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-manage-access-to-resources).
**NOTE** The default setting for storage containers is _private_. You can either use a [SAS](https://docs.microsoft.com/en-us/azure/storage/common/storage-dotnet-shared-access-signature-part-1) to access the resources or [make the container public using the Azure Portal](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-manage-access-to-resources).
**IMPORTANT** Public storage containers can be read by anyone who knows the URL. We do not recommend storing any private or sensitive information in public storage containers!


@ -1,19 +1,127 @@
# Running Commands when the Cluster Starts
# Customizing the cluster
There are several ways to control what gets deployed to a cluster. The most flexible and powerful method is to manage the docker container image that is used to provision the cluster. By default doAzureParallel uses containers to provision the R environment. Anything within the container will be available on all nodes in the cluster. The default container used in doAzureParallel is [rocker/tidyverse:latest](https://hub.docker.com/r/rocker/tidyverse/), developed and maintained by the rocker organization.
## Modifying the default docker container
Specifying a docker container is done by updating your cluster.json file. Simply setting the 'containerImage' property to a docker image reference will cause all new clusters to deploy that container. doAzureParallel will use the version of R specified in the container.
```json
{
"name": "myPool",
"vmSize": "Standard_F2",
"maxTasksPerNode": 1,
"poolSize": {
"dedicatedNodes": {
"min": 0,
"max": 0
},
"lowPriorityNodes": {
"min": 1,
"max": 1
},
"autoscaleFormula": "QUEUE"
},
"containerImage": "rocker/tidyverse:3.4.1",
"rPackages": {
"cran": [],
"github": [],
"bioconductor": [],
"githubAuthenticationToken": ""
},
"commandLine": []
}
```
Note: \_If no 'containerImage' property is set, rocker/tidyverse:latest will be used. This usually points to one of the latest versions of R.\_
### Building your own container
Building your own container gives you the flexibility to package any specific requirements, packages or data you require for running your workloads. We recommend using a Debian-based OS such as Debian or Ubuntu to build your containers, and pointing to where R is in the final CMD command. For example:
```dockerfile
FROM debian:stretch
...
CMD ["/usr/bin/R"]
```
Or alternatively,
```dockerfile
FROM ubuntu:16.04
...
CMD ["R"]
```
There is no requirement to be Debian based, though it is recommended for consistency with other packages. Please note that the container **must be based off a Linux distribution as Windows is not supported**.
### List of tested container images
The following containers were tested and cover the most common cases for end users.
Container Image | R type | Description
--- | --- | ---
[rocker/tidyverse](https://hub.docker.com/r/rocker/r-ver/) | Open source R | Tidyverse is provided by the rocker org and uses a standard version of R developed by the open source community. rocker/tidyverse typically keeps up with the latest releases of R quite quickly and has versions back to R 3.1
[nuest/mro](https://hub.docker.com/r/nuest/mro/) | Microsoft R Open | [Microsoft R Open](https://mran.microsoft.com/open/) is an open source SKU of R that provides out of the box support for math packages, versioned package support with MRAN and improved performance over standard Open Source R.
* We recommend reading the details of each image before using it to make sure you understand any limitations or requirements of using the container images.
## Running Commands when the Cluster Starts
The commandLine property in the cluster configuration file allows users to prepare the nodes' environment; for example, installing applications that your foreach loop requires.
Note: Batch clusters are run with Centos-OS Azure DSVMs.
Note: Batch clusters are provisioned with Ubuntu 16.04.
Note: All commands are already run as the sudo user, so there is no need to append sudo to your command line. _Commands may fail if you add the sudo user as part of the command._
```javascript
Note: All commands are run on the host node, not from within the container. This provides the most flexibility but also requires some understanding of how to run code from within R and how to load directories correctly. See below for exposed environment variables, directories and examples.
```json
{
  ...
  "commandLine": [
    "yum install -y gdal gdal-devel",
    "yum install -y proj-devel",
    "yum install -y proj-nad",
    "yum install -y proj-epsg"
    "apt-get install -y wget",
    "apt-get install -y libcurl4-openssl-dev",
    "apt-get install -y curl"
  ]
}
```
### Environment variables for containers
The following Azure Batch environment variables are exposed into the container.
Environment Variable | Description
--- | ---
AZ\_BATCH\_NODE\_ROOT\_DIR | Root directory for all files on the node
AZ\_BATCH\_JOB\_ID | Job ID for the foreach loop
AZ\_BATCH\_TASK\_ID | Task ID for the task running the R loop instance
AZ\_BATCH\_TASK\_WORKING\_DIR | Working directory where all files for the R process are logged
AZ\_BATCH\_JOB\_PREP\_WORKING\_DIR | Working directory where all files for packages in the foreach loop are logged
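For example, a task can read these variables with Sys.getenv() to locate its own working directory (a small sketch):
```R
foreach(i = 1:2) %dopar% {
  # Each task resolves its own working directory from the exposed variables
  taskDir <- Sys.getenv("AZ_BATCH_TASK_WORKING_DIR")
  log <- sprintf("task %s processed iteration %d",
                 Sys.getenv("AZ_BATCH_TASK_ID"), i)
  writeLines(log, file.path(taskDir, "iteration.log"))
  i
}
```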
### Directories for containers
The following directories are mounted into the container.
Directory | Description
--- | ---
$AZ\_BATCH\_NODE\_ROOT\_DIR | Root directory for all files
$AZ\_BATCH\_NODE\_ROOT\_DIR/shared/R/packages | Shared directory where all packages are installed to by default.
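Inside the container, an R session makes those cluster-level packages visible by prepending the shared directory to its library search path, as the startup scripts in this repository do:
```R
# Make packages installed at the cluster level visible to this R session
.libPaths(c("/mnt/batch/tasks/shared/R/packages", .libPaths()))
```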
### Examples
The following examples show how to configure the host node and how to consume its resources from R running in the container.
#### Installing apt-get packages or configuring the host node
Configuring the host node is not a common operation but is sometimes required. This can include installing packages, downloading data or setting up directories. The example below shows how to mount an Azure File Share on the node and expose it to the Azure Batch shared directory so it can be consumed by any R process running in the containers.
```json
{
  "commandLine": [
    "mkdir /mnt/batch/tasks/shared/fileshare",
    "mount -t cifs //<STORAGE_ACCOUNT_NAME>.file.core.windows.net/<FILE_SHARE_NAME> /mnt/batch/tasks/shared/fileshare -o vers=3.0,username=<STORAGE_ACCOUNT_NAME>,password=<STORAGE_ACCOUNT_KEY>==,dir_mode=0777,file_mode=0777,sec=ntlmssp"
  ]
}
```
Within the container, you can now access that directory using the environment variable **AZ\_BATCH\_NODE\_ROOT\_DIR**, for example $AZ\_BATCH\_NODE\_ROOT\_DIR/shared/fileshare
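From R, the mounted share can then be read like any local directory (a sketch, assuming the mount in the commandLine example above succeeded):
```R
fileshareDir <- file.path(Sys.getenv("AZ_BATCH_NODE_ROOT_DIR"),
                          "shared", "fileshare")
# Every node that ran the mount command sees the same files here
list.files(fileshareDir)
```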


@ -0,0 +1,30 @@
#!/bin/bash
# Entry point for the start task. It will install the docker runtime and pull down the required docker images
# Usage:
# setup_node.sh [container_name]
container_name=$1
apt-get -y install linux-image-extra-$(uname -r) linux-image-extra-virtual
apt-get -y install apt-transport-https
apt-get -y install curl
apt-get -y install ca-certificates
apt-get -y install software-properties-common
# Install docker
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
apt-get -y update
apt-get -y install docker-ce
docker pull $container_name
# Check docker is running
docker info > /dev/null 2>&1
if [ $? -ne 0 ]; then
echo "UNKNOWN - Unable to talk to the docker daemon"
exit 3
fi
# Create required directories
mkdir -p /mnt/batch/tasks/shared/R/packages
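A start task would invoke the script with the container image as its only argument, for example:
```sh
# Install docker, pull the image, and create the shared package directory
./setup_node.sh rocker/tidyverse:latest
```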


@ -1,7 +1,14 @@
#!/usr/bin/Rscript
args <- commandArgs(trailingOnly = TRUE)
jobPrepDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
.libPaths(c("/mnt/batch/tasks/shared/R/packages", .libPaths()))
if (jobPrepDirectory != "") {
.libPaths(c(jobPrepDirectory, .libPaths()))
}
status <- tryCatch({
library(BiocInstaller)
for (package in args) {
if (!require(package, character.only = TRUE)) {


@ -2,6 +2,8 @@
args <- commandArgs(trailingOnly = TRUE)
status <- tryCatch({
jobPrepDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
.libPaths(c(jobPrepDirectory, "/mnt/batch/tasks/shared/R/packages", .libPaths()))
for (package in args) {
if (!require(package, character.only = TRUE)) {
install.packages(pkgs = package)


@ -1,19 +1,21 @@
#!/usr/bin/Rscript
args <- commandArgs(trailingOnly = TRUE)
# Assumption: devtools is already installed based on Azure DSVM
# Assumption: devtools is already installed in the container
jobPrepDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
.libPaths(c(jobPrepDirectory, "/mnt/batch/tasks/shared/R/packages", .libPaths()))
status <- tryCatch({
for (package in args) {
packageDirectory <- strsplit(package, "/")[[1]]
packageName <- packageDirectory[length(packageDirectory)]
for (package in args) {
packageDirectory <- strsplit(package, "/")[[1]]
packageName <- packageDirectory[length(packageDirectory)]
if (!require(package, character.only = TRUE)) {
devtools::install_github(packageDirectory)
require(package, character.only = TRUE)
if (!require(packageName, character.only = TRUE)) {
devtools::install_github(package)
require(packageName, character.only = TRUE)
}
}
return(0)
0
},
error = function(e) {
cat(sprintf(
@ -23,7 +25,7 @@ error = function(e) {
# install.packages doesn't return an exit code.
# Using '1' as the default error exit code
return(1)
1
})
quit(save = "yes",


@ -2,6 +2,13 @@
args <- commandArgs(trailingOnly = TRUE)
status <- 0
jobPrepDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
.libPaths(c(
jobPrepDirectory,
"/mnt/batch/tasks/shared/R/packages",
.libPaths()
))
isError <- function(x) {
inherits(x, "simpleError") || inherits(x, "try-error")
}
@ -40,12 +47,17 @@ if (typeof(cloudCombine) == "list" && enableCloudCombine) {
"result"),
full.names = TRUE)
if (errorHandling == "stop" && length(files) != batchTasksCount) {
stop(paste("Error handling is set to 'stop' and there are missing results due to",
"task failures. If this is not the correct behavior, change the errorHandling",
"property to 'pass' or 'remove' in the foreach object.",
"For more information on troubleshooting, check",
"https://github.com/Azure/doAzureParallel/blob/master/docs/40-troubleshooting.md"))
if (errorHandling == "stop" &&
length(files) != batchTasksCount) {
stop(
paste(
"Error handling is set to 'stop' and there are missing results due to",
"task failures. If this is not the correct behavior, change the errorHandling",
"property to 'pass' or 'remove' in the foreach object.",
"For more information on troubleshooting, check",
"https://github.com/Azure/doAzureParallel/blob/master/docs/40-troubleshooting.md"
)
)
}
results <- vector("list", length(files))


@ -2,6 +2,13 @@
args <- commandArgs(trailingOnly = TRUE)
workerErrorStatus <- 0
jobPrepDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
.libPaths(c(
jobPrepDirectory,
"/mnt/batch/tasks/shared/R/packages",
.libPaths()
))
getparentenv <- function(pkgname) {
parenv <- NULL
@ -50,7 +57,8 @@ getparentenv <- function(pkgname) {
batchJobId <- Sys.getenv("AZ_BATCH_JOB_ID")
batchTaskId <- Sys.getenv("AZ_BATCH_TASK_ID")
batchJobPreparationDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
batchJobPreparationDirectory <-
Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
batchTaskWorkingDirectory <- Sys.getenv("AZ_BATCH_TASK_WORKING_DIR")
batchJobEnvironment <- paste0(batchJobId, ".rds")
@ -84,6 +92,9 @@ result <- lapply(taskArgs, function(args) {
},
error = function(e) {
workerErrorStatus <<- 1
print(e)
traceback()
e
})
})
@ -98,7 +109,8 @@ saveRDS(result,
paste0(batchTaskId, "-result.rds")
))
cat(paste0("Error Code: ", workerErrorStatus, fill = TRUE))
cat(paste0("Error Code: ", workerErrorStatus), fill = TRUE)
quit(save = "yes",
status = workerErrorStatus,
runLast = FALSE)


@ -14,7 +14,7 @@ If you would like to see more samples, please reach out to [razurebatch@microsof
This sample uses the built-in email dataset to evaluate whether or not an email is spam. Using caret, the code runs through random search using a 10-fold cross validation with 10 repeats. The classification algorithm used in the sample is Random Forest ('rf'), and each run is evaluated for ROC. Using doAzureParallel to create the backend, caret is able to distribute work to Azure and significantly speed up the work.
3. **Mandelbrot Simulation Benchmark** [(link)](./mandlebrot/mandlebrot_performance_test.ipynb)
3. **Mandelbrot Simulation Benchmark** [(link)](./mandelbrot/mandelbrot_performance_test.ipynb)
This sample uses doAzureParallel to compute the mandelbrot set. The code benchmarks the difference in performance for running local and running on a doAzureParallel cluster size of 10, 20, 40, and 80 cores.
@ -36,4 +36,4 @@ If you would like to see more samples, please reach out to [razurebatch@microsof
7. **Using Azure Files** [(link)](./azure_files/readme.md)
A quick introduction to setting up a distributed file system with Azure Files across all nodes in the cluster


@ -16,7 +16,7 @@ generateCredentialsConfig("credentials.json")
setCredentials("credentials.json")
# Create your cluster if it does not exist
cluster <- makeCluster("sample_cluster.json")
cluster <- makeCluster("azure_files_cluster.json")
# register your parallel backend
registerDoAzureParallel(cluster)
@ -33,11 +33,11 @@ getDoParWorkers()
# run on a different node. The output should be that both tasks output
# the same file list for each node.
files <- foreach(i = 1:2, .combine='rbind') %dopar% {
setwd('/mnt/data')
list.files()
setwd('/mnt/batch/tasks/shared/data')
x <- list.files()
return (x)
}
# Print result
files


@ -16,9 +16,10 @@
"rPackages": {
"cran": [],
"github": [],
"bioconductor": [],
"githubAuthenticationToken": ""
},
"commandLine": [
"mkdir /mnt/data",
"mount -t cifs //<STORAGE_ACCOUNT_NAME>.file.core.windows.net/<FILE_SHARE_NAME> /mnt/data -o vers=3.0,username=<STORAGE_ACCOUNT_NAME>,password=<STORAGE_ACCOUNT_KEY>==,dir_mode=0777,file_mode=0777,sec=ntlmssp"]
}
"mkdir /mnt/batch/tasks/shared/data",
"mount -t cifs //<STORAGE_ACCOUNT_NAME>.file.core.windows.net/<FILE_SHARE_NAME> /mnt/batch/tasks/shared/data -o vers=3.0,username=<STORAGE_ACCOUNT_NAME>,password=<STORAGE_ACCOUNT_KEY>==,dir_mode=0777,file_mode=0777,sec=ntlmssp"]
}


@ -6,8 +6,10 @@ This samples shows how to update the cluster configuration to create a new mount
**IMPORTANT** The cluster configuration file requires code to set up the file share. The exact command string to mount the drive can be found [here](https://docs.microsoft.com/en-us/azure/storage/files/storage-how-to-use-files-portal#connect-to-file-share) but remember to _remove_ the 'sudo' part of the command. All custom commands in a cluster are automatically run with elevated permissions, and adding sudo will cause an error at node setup time.
**IMPORTANT** Since all of your processes are run within a container on the node, the number of directories mounted into the container is limited. Currently, only /mnt/batch/tasks is mounted into the container, so when you mount a drive it must be under that path, for example /mnt/batch/tasks/my/file/share. Note that any new directories under /mnt/batch/tasks __must first be created__ before mounting. Please see the provided azure\_files\_cluster.json as an example.
**IMPORTANT** Mounting Azure Files on non-Azure machines has limited support. This service should be used for creating a shared file system in your doAzureParallel cluster. For managing files from your local machine we recommend [Azure Storage Explorer](https://azure.microsoft.com/en-us/features/storage-explorer/)
For large data sets or large traffic applications be sure to review the Azure Files [scalability and performance targets](https://docs.microsoft.com/en-us/azure/storage/common/storage-scalability-targets#scalability-targets-for-blobs-queues-tables-and-files).
For very large data sets we recommend using Azure Blobs. You can learn more in the [persistent storage](../../docs/23-persistent-storage.md) and [distributed data](../../docs/21-distributing-data.md) docs.


@ -0,0 +1,61 @@
# =================
# ===== Setup =====
# =================
# install packages
library(devtools)
install_github("azure/doazureparallel")
# import the doAzureParallel library and its dependencies
library(doAzureParallel)
# generate a credentials json file
generateCredentialsConfig("credentials.json")
# set your credentials
setCredentials("credentials.json")
# Create your cluster if it does not exist
cluster <- makeCluster("mandelbrot_cluster.json")
# register your parallel backend
registerDoAzureParallel(cluster)
# check that your workers are up
getDoParWorkers()
# ======================================
# ===== Compute the Mandelbrot Set =====
# ======================================
# Define Mandelbrot function
vmandelbrot <- function(xvec, y0, lim)
{
mandelbrot <- function(x0,y0,lim)
{
x <- x0; y <- y0
iter <- 0
while (x^2 + y^2 < 4 && iter < lim)
{
xtemp <- x^2 - y^2 + x0
y <- 2 * x * y + y0
x <- xtemp
iter <- iter + 1
}
iter
}
unlist(lapply(xvec, mandelbrot, y0=y0, lim=lim))
}
# Calculate Mandelbrot
x.in <- seq(-2.0, 0.6, length.out=240)
y.in <- seq(-1.3, 1.3, length.out=240)
m <- 100
mset <- foreach(i=y.in, .combine=rbind, .options.azure = list(chunkSize=10)) %dopar% {
vmandelbrot(x.in, i, m)
}
# Plot image
image(x.in, y.in, t(mset), col=c(rainbow(m), '#000000'), useRaster=TRUE)


@ -0,0 +1,23 @@
{
"name": "mandelbrot",
"vmSize": "Standard_F4",
"maxTasksPerNode": 4,
"poolSize": {
"dedicatedNodes": {
"min": 0,
"max": 0
},
"lowPriorityNodes": {
"min": 2,
"max": 2
},
"autoscaleFormula": "QUEUE"
},
"rPackages": {
"cran": [],
"github": [],
"bioconductor": [],
"githubAuthenticationToken": ""
},
"commandLine": []
}


@ -0,0 +1,5 @@
# Mandelbrot
Calculating the Mandelbrot set is an embarrassingly parallel problem that can easily be done using doAzureParallel. This sample shows how to set up a simple cluster of two nodes, generate the Mandelbrot set and render an image of it on the screen.
Also included in this directory is a notebook with a benchmark sample to show the performance difference of large Mandelbrot computations on your local workstation vs using doAzureParallel. This is a good sample to use if you would like to test out different VM sizes, maxTasksPerNode or chunk size settings to try to optimize your cluster.


@ -0,0 +1,5 @@
# Monte Carlo
Using the Monte Carlo algorithm is a popular option for many financial modelling scenarios. In this sample we run multiple pricing simulations for the closing price of a security. The sample first runs locally without a parallel backend, then uses the cloud to leverage a cluster for the same work, to show the speed-up.
To speed up the algorithm significantly, play around with the number of nodes in the cluster and the chunk size for the foreach loop. Currently the chunk size is set to 13 because we have 2 nodes with 4 cores each (8 cores total) and we want to run 100 iterations of the loop: 100 / 8 ~= 13, so we set the chunk size to 13. If we had 32 cores, we might set the chunk size to 4 to spread out the work as evenly as possible across all the nodes and improve the total execution time.
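The same arithmetic can be computed rather than hard-coded (a small sketch, using the node and core counts assumed above):
```R
iterations <- 100
nodes <- 2
coresPerNode <- 4

# Round up so the iterations are spread as evenly as possible across all cores
chunkSize <- ceiling(iterations / (nodes * coresPerNode))
chunkSize  # 13
```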


@ -0,0 +1,22 @@
{
"name": "montecarlo",
"vmSize": "Standard_F4",
"maxTasksPerNode": 4,
"poolSize": {
"dedicatedNodes": {
"min": 0,
"max": 0
},
"lowPriorityNodes": {
"min": 2,
"max": 2
},
"autoscaleFormula": "QUEUE"
},
"rPackages": {
"cran": [],
"github": [],
"bioconductor": []
},
"commandLine": []
}


@ -4,23 +4,16 @@
# install packages
library(devtools)
install_github("azure/razurebatch")
install_github("azure/doazureparallel")
# import the doAzureParallel library and its dependencies
library(doAzureParallel)
# generate a credentials json file
generateCredentialsConfig("credentials.json")
# set your credentials
setCredentials("credentials.json")
# generate a cluster config file
generateClusterConfig("cluster.json")
# Create your cluster if it does not exist
cluster <- makeCluster("cluster.json")
cluster <- makeCluster("montecarlo_cluster.json")
# register your parallel backend
registerDoAzureParallel(cluster)
@ -37,18 +30,6 @@ mean_change = 1.001
volatility = 0.01
opening_price = 100
# define a function to simulate the movement of the stock price for one possible outcome over 5 years
simulateMovement <- function() {
days <- 1825 # ~ 5 years
movement <- rnorm(days, mean=mean_change, sd=volatility)
path <- cumprod(c(opening_price, movement))
return(path)
}
# run and plot 30 simulations
simulations <- replicate(30, simulateMovement())
matplot(simulations, type='l')
# define a new function to simulate closing prices
getClosingPrice <- function() {
days <- 1825 # ~ 5 years
@ -58,11 +39,35 @@ getClosingPrice <- function() {
return(closingPrice)
}
# Run 5 million simulations with doAzureParallel - we will run 50 iterations where each iteration executes 100000 simulations
closingPrices <- foreach(i = 1:50, .combine='c') %dopar% {
start_s <- Sys.time()
# Run 10,000 simulations in series
closingPrices_s <- foreach(i = 1:10, .combine='c') %do% {
replicate(1000, getClosingPrice())
}
end_s <- Sys.time()
# plot the 10,000 closing prices in a histogram to show the distribution of outcomes
hist(closingPrices_s)
# How long did it take?
difftime(end_s, start_s)
# Estimate runtime for 10 million (linear approximation)
1000 * difftime(end_s, start_s, unit = "min")
# Run 10 million simulations with doAzureParallel
# We will run 100 iterations where each iteration executes 100,000 simulations
opt <- list(chunkSize = 13) # optimize runtime. Chunking allows us to run multiple iterations on a single instance of R.
start_p <- Sys.time()
closingPrices_p <- foreach(i = 1:100, .combine='c', .options.azure = opt) %dopar% {
replicate(100000, getClosingPrice())
}
end_p <- Sys.time()
# plot the 5 million closing prices in a histogram to show the distribution of outcomes
hist(closingPrices)
# How long did it take?
difftime(end_p, start_p, unit = "min")
# plot the 10 million closing prices in a histogram to show the distribution of outcomes
hist(closingPrices_p)


@ -1,14 +1,69 @@
# Using package management
## BioConductor
doAzureParallel supports installing packages at either the cluster level or during the execution of the foreach loop. Packages installed at the cluster level benefit from only needing to be installed once per node. Each iteration of the foreach can load the library without needing to install them again. Packages installed in the foreach benefit from specifying any specific dependencies required only for that instance of the loop.
Currently, Bioconductor is not natively supported in doAzureParallel but enabling it only requires updating the cluster configuration. In the Bioconductor sample you can simply create a cluster using the bioconductor_cluster.json file and a cluster will be set up ready to go.
## Cluster level packages
Within your foreach loop, simply reference the Bioconductor library before running your algorithms.
Cluster level packages support CRAN, GitHub and BioConductor packages. The packages are installed in a shared directory on the node. Note that you must explicitly load any packages installed at the cluster level within the foreach loop. For example, if you installed xml2 on the cluster, you must explicitly load it before using it.
```R
# Load the bioconductor libraries you want to use.
library(BiocInstaller)
foreach (i = 1:4) %dopar% {
  # Load the libraries you want to use.
  library(xml2)
  xml2::as_list(...)
}
```
**IMPORTANT:** Using Bioconductor in doAzureParallel requires updating the default version of R on the nodes. The cluster setup scripts will download and install [Microsoft R Open version 3.4.0](https://mran.microsoft.com/download/) which is compatible with Bioconductor 3.4.
### CRAN
CRAN packages can be installed on the cluster by adding them to the collection of _cran_ packages in the cluster specification.
```json
"rPackages": {
"cran": ["package1", "package2", "..."],
"github": [],
"bioconductor": []
}
```
### GitHub
GitHub packages can be installed on the cluster by adding them to the collection of _github_ packages in the cluster specification.
```json
"rPackages": {
"cran": [],
"github": ["repo1/name1", "repo1/name2", "repo2/name1", "..."],
"bioconductor": []
}
```
**NOTE** When using packages from a private GitHub repository, you must add your GitHub authentication token to your credentials.json file.
### BioConductor
Installing Bioconductor packages is now supported via the cluster configuration. Simply add the list of packages you want installed to the cluster configuration file and they will be installed automatically.
```json
"rPackages": {
"cran": [],
"github": [],
"bioconductor": ["IRanges", "GenomeInofDb"]
}
```
**IMPORTANT** doAzureParallel uses the rocker/tidyverse Docker images by default, which comes with BioConductor pre-installed. If you use a different container image, make sure that bioconductor is installed on it.
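If you build your own image, one way to satisfy that requirement is to bake the Bioconductor installer into the image when it is built (a sketch; sourcing biocLite.R installs the BiocInstaller package that the startup scripts load):
```dockerfile
FROM rocker/tidyverse:3.4.1

# Install the Bioconductor installer (BiocInstaller) into the image
RUN R -e 'source("https://bioconductor.org/biocLite.R")'
```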
## Foreach level packages
Foreach level packages currently only support CRAN packages. Unlike cluster level packages, packages specified in the foreach loop will be automatically installed _and loaded_ for use.
### CRAN
```R
foreach(i = 1:4, .packages = c("xml2")) %dopar% {
  # xml2 is automatically loaded and can be used without calling library(xml2)
  xml2::as_list(...)
}
```


@ -1,18 +0,0 @@
yum erase microsoft-r-open-mro-3.3* --assumeyes
if [ ! -d "microsoft-r-open" ]; then
# Download R
wget https://mran.microsoft.com/install/mro/3.4.0/microsoft-r-open-3.4.0.tar.gz
# Untar the file
tar -xf microsoft-r-open-3.4.0.tar.gz
# Install
./microsoft-r-open/install.sh
fi
# Update PATH on the node permanently
echo "export PATH=/usr/lib64/microsoft-r/3.4/lib64/R/bin:$PATH" >> /etc/environment
# Install bioconductor
Rscript -e 'source("https://bioconductor.org/biocLite.R")'


@ -18,7 +18,12 @@ registerDoAzureParallel(cluster)
getDoParWorkers()
summary <- foreach(i = 1:1) %dopar% {
library(GenomeInofDb) # Already installed as part of the cluster configuration
library(GenomeInfoDb) # Already installed as part of the cluster configuration
library(IRanges) # Already installed as part of the cluster configuration
# You algorithm
}
sessionInfo()
# Your algorithm
}
summary


@ -1,28 +0,0 @@
{
"name": "bioconductor",
"vmSize": "Standard_A2_v2",
"maxTasksPerNode": 1,
"poolSize": {
"dedicatedNodes": {
"min": 0,
"max": 0
},
"lowPriorityNodes": {
"min": 1,
"max": 1
},
"autoscaleFormula": "QUEUE"
},
"rPackages": {
"cran": [],
"github": [],
"githubAuthenticationToken": ""
},
"commandLine": [
"wget https://raw.githubusercontent.com/Azure/doAzureParallel/master/samples/package_management/bioc_setup.sh",
"chmod u+x ./bioc_setup.sh",
"./bioc_setup.sh",
"wget https://raw.githubusercontent.com/Azure/doAzureParallel/master/inst/startup/install_bioconductor.R",
"chmod u+x ./install_bioconductor.R",
"Rscript install_bioconductor.R GenomeInfoDb IRange"]
}


@ -0,0 +1,23 @@
{
"name": "package_management",
"vmSize": "Standard_A2_v2",
"maxTasksPerNode": 1,
"poolSize": {
"dedicatedNodes": {
"min": 0,
"max": 0
},
"lowPriorityNodes": {
"min": 1,
"max": 1
},
"autoscaleFormula": "QUEUE"
},
"rPackages": {
"cran": ["xml2"],
"github": ["azure/rAzureBatch"],
"bioconductor": ["GenomeInfoDb", "IRange"],
"githubAuthenticationToken": ""
},
"commandLine": []
}


@ -1,41 +0,0 @@
# =============
# === Setup ===
# =============
# install packages from github
library(devtools)
install_github("azure/razurebatch")
install_github("azure/doazureparallel")
# import packages
library(doAzureParallel)
# create credentials config files
generateCredentialsConfig("credentials.json")
# set azure credentials
setCredentials("credentials.json")
# generate cluster config json file
generateClusterConfig("cluster.json")
# Creating an Azure parallel backend
cluster <- makeCluster(clusterSetting = "cluster.json")
# Register your Azure parallel backend to the foreach implementation
registerDoAzureParallel(cluster)
# ==========================================================
# === Using plyr with doAzureParallel's parallel backend ===
# ==========================================================
# import plyr
library(plyr)
# For more information on plyr, https://github.com/hadley/plyr
dlply(iris, .(Species), function(x)
lm(x$Sepal.Width ~ x$Petal.Length, data=x),
.parallel=TRUE, .paropts = list(.packages = NULL,.export="iris"))
# de-provision your cluster in Azure
stopCluster(cluster)


@ -0,0 +1,11 @@
# Resource Files
The following two samples show how to use resource files to move data onto and off of the nodes in doAzureParallel. Good data movement techniques, especially for large data, are critical to get your code running quickly and in a scalable fashion.
## Resource Files example
The resource files example is a good starting point on how to manage your files in the cloud and use them in your doAzureParallel cluster. The doAzureParallel package exposes Azure Storage methods to allow you to create, upload and download files from cloud storage.
This sample shows how to work with the well-known NYC Yellow Taxi trip data set. It partitions the data set into monthly sets and then iterates over each month individually to create a map of all the pick-up locations in NYC. The final result is then uploaded to cloud storage as an image, and can be downloaded using any standard tools or viewed in a browser.
NOTE: _This sample may cause the cluster to take a bit of time to set up because it needs to download a large amount of data on each node._


@ -0,0 +1,22 @@
{
"name": "resource_files",
"vmSize": "Standard_D11_v2",
"maxTasksPerNode": 1,
"poolSize": {
"dedicatedNodes": {
"min": 0,
"max": 0
},
"lowPriorityNodes": {
"min": 3,
"max": 3
},
"autoscaleFormula": "QUEUE"
},
"rPackages": {
"cran": ["data.table", "ggplot2"],
"github": ["azure/rAzureBatch"],
"bioconductor": []
},
"commandLine": []
}


@ -1,23 +1,15 @@
# =======================================
# === Setup / Install and Credentials ===
# =======================================
# install packages from github
library(devtools)
install_github("azure/razurebatch")
install_github("azure/doazureparallel")
devtools::install_github("azure/doAzureParallel")
# import packages
library(doAzureParallel)
# create credentials config files
generateCredentialsConfig("credentials.json")
# set azure credentials
setCredentials("credentials.json")
# create credentials config files
generateClusterConfig("cluster_settings.json")
doAzureParallel::setCredentials("credentials.json")
# Add data.table package to the CRAN packages and Azure/rAzureBatch to the Github packages
# in order to install the packages to all of the nodes
@ -42,22 +34,22 @@ generateClusterConfig("cluster_settings.json")
# Using the NYC taxi datasets, http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml
azureStorageUrl <- "http://playdatastore.blob.core.windows.net/nyc-taxi-dataset"
resource_files <- list(
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-1.csv"), fileName = "yellow_tripdata_2016-1.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-2.csv"), fileName = "yellow_tripdata_2016-2.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-3.csv"), fileName = "yellow_tripdata_2016-3.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-4.csv"), fileName = "yellow_tripdata_2016-4.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-5.csv"), fileName = "yellow_tripdata_2016-5.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-6.csv"), fileName = "yellow_tripdata_2016-6.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-7.csv"), fileName = "yellow_tripdata_2016-7.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-8.csv"), fileName = "yellow_tripdata_2016-8.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-9.csv"), fileName = "yellow_tripdata_2016-9.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-10.csv"), fileName = "yellow_tripdata_2016-10.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-11.csv"), fileName = "yellow_tripdata_2016-11.csv"),
createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-12.csv"), fileName = "yellow_tripdata_2016-12.csv")
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-1.csv"), fileName = "yellow_tripdata_2016-1.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-2.csv"), fileName = "yellow_tripdata_2016-2.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-3.csv"), fileName = "yellow_tripdata_2016-3.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-4.csv"), fileName = "yellow_tripdata_2016-4.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-5.csv"), fileName = "yellow_tripdata_2016-5.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-6.csv"), fileName = "yellow_tripdata_2016-6.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-7.csv"), fileName = "yellow_tripdata_2016-7.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-8.csv"), fileName = "yellow_tripdata_2016-8.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-9.csv"), fileName = "yellow_tripdata_2016-9.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-10.csv"), fileName = "yellow_tripdata_2016-10.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-11.csv"), fileName = "yellow_tripdata_2016-11.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-12.csv"), fileName = "yellow_tripdata_2016-12.csv")
)
# add the parameter 'resourceFiles' to download files to nodes
cluster <- makeCluster("cluster_settings.json", resourceFiles = resource_files)
cluster <- makeCluster("resource_files_cluster.json", resourceFiles = resource_files)
# when the cluster is provisioned, register the cluster as your parallel backend
registerDoAzureParallel(cluster)
@ -77,18 +69,25 @@ registerDoAzureParallel(cluster)
#
storageAccountName <- "mystorageaccount"
outputsContainer <- "nyc-taxi-graphs"
createContainer(outputsContainer)
outputSas <- createSasToken(permission = "w", sr = "c", outputsContainer)
rAzureBatch::createContainer(outputsContainer)
# permissions: r = read, w = write.
outputSas <- rAzureBatch::createSasToken(permission = "rw", sr = "c", outputsContainer)
# =======================================================
# === Foreach with resourceFiles & writing to storage ===
# =======================================================
results <- foreach(i = 1:12, .packages = c("data.table", "ggplot2", "rAzureBatch")) %dopar% {
results <- foreach(i = 1:12) %dopar% {
library(data.table)
library(ggplot2)
library(rAzureBatch)
# To get access to your azure resource files, user needs to use the special
# environment variable to get the directory
fileDirectory <- paste0(Sys.getenv("AZ_BATCH_NODE_STARTUP_DIR"), "/wd")
print(fileDirectory)
# columns to keep for the dataframe
colsToKeep <- c("pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "tip_amount", "trip_distance")
@ -115,12 +114,16 @@ results <- foreach(i = 1:12, .packages = c("data.table", "ggplot2", "rAzureBatch
ggsave(image)
# save image to the storage account using the Sas token we created above
uploadBlob(containerName = outputsContainer,
blob <- rAzureBatch::uploadBlob(containerName = outputsContainer,
image,
sasToken = outputSas,
accountName = storageAccountName)
NULL
# return the blob url
blob$url
}
results
# deprovision your cluster after your work is complete
stopCluster(cluster)


@ -1,50 +0,0 @@
library(doAzureParallel)
setCredentials("credentials.json")
setVerbose(TRUE)
storageAccountName <- "mystorageaccount"
inputContainerName <- "datasets"
# Generate a sas token with the createSasToken function
writeSasToken <- rAzureBatch::createSasToken(permission = "w", sr = "c", inputContainerName)
readSasToken <- rAzureBatch::createSasToken(permission = "r", sr = "c", inputContainerName)
# Upload blobs with a write sasToken
rAzureBatch::uploadBlob(inputContainerName,
fileDirectory = "1989.csv",
sasToken = writeSasToken,
accountName = storageAccountName)
rAzureBatch::uploadBlob(inputContainerName,
fileDirectory = "1990.csv",
sasToken = writeSasToken,
accountName = storageAccountName)
csvFileUrl1 <- rAzureBatch::createBlobUrl(storageAccount = storageAccountName,
containerName = inputContainerName,
sasToken = readSasToken,
fileName = "1989.csv")
csvFileUrl2 <- rAzureBatch::createBlobUrl(storageAccount = storageAccountName,
containerName = inputContainerName,
sasToken = readSasToken,
fileName = "1990.csv")
azure_files = list(
rAzureBatch::createResourceFile(url = csvFileUrl1, fileName = "1989.csv"),
rAzureBatch::createResourceFile(url = csvFileUrl2, fileName = "1990.csv")
)
cluster <- doAzureParallel::makeCluster("cluster_settings.json", resourceFiles = azure_files)
registerDoAzureParallel(cluster)
# To get access to your azure resource files, user needs to use the special
# environment variable to get the directory
listFiles <- foreach(i = 1989:1990, .combine = 'c') %dopar% {
fileDirectory <- paste0(Sys.getenv("AZ_BATCH_NODE_STARTUP_DIR"), "/wd")
return(list.files(fileDirectory))
}
stopCluster(cluster)


@ -9,4 +9,4 @@
library(testthat)
library(doAzureParallel)
test_check("doAzureParallel")
test_check("doAzureParallel")


@ -21,7 +21,7 @@ test_that("validating a cluster config file with bad autoscale formula property"
configJson <- jsonlite::toJSON(config, auto_unbox = TRUE, pretty = TRUE)
write(configJson, file = paste0(getwd(), "/", clusterConfig))
expect_error(validateClusterConfig(clusterConfig))
expect_error(validation$isValidClusterConfig(clusterConfig))
on.exit(file.remove(clusterConfig))
})
@ -38,21 +38,7 @@ test_that("validating a cluster config file with incorrect data types", {
configJson <- jsonlite::toJSON(config, auto_unbox = TRUE, pretty = TRUE)
write(configJson, file = paste0(getwd(), "/", clusterConfig))
expect_error(validateClusterConfig(clusterConfig))
on.exit(file.remove(clusterConfig))
})
test_that("validating a cluster config file with default values", {
clusterConfig <- "default.json"
generateClusterConfig(clusterConfig)
config <- jsonlite::fromJSON(clusterConfig)
configJson <- jsonlite::toJSON(config, auto_unbox = TRUE, pretty = TRUE)
write(configJson, file = paste0(getwd(), "/", clusterConfig))
expect_equal(validateClusterConfig(clusterConfig), TRUE)
expect_error(validation$isValidClusterConfig(clusterConfig))
on.exit(file.remove(clusterConfig))
})
@ -68,7 +54,7 @@ test_that("validating a cluster config file with null values", {
configJson <- jsonlite::toJSON(config, auto_unbox = TRUE, pretty = TRUE)
write(configJson, file = paste0(getwd(), "/", clusterConfig))
expect_error(validateClusterConfig(clusterConfig))
expect_error(validation$isValidClusterConfig(clusterConfig))
on.exit(file.remove(clusterConfig))
})


@ -1,10 +1,11 @@
# Run this test for users to make sure the core features
# of doAzureParallel are still working
context("live scenario test")
test_that("Scenario Test", {
test_that("Basic scenario test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "cluster.json"
clusterFileName <- "test_cluster.json"
doAzureParallel::generateCredentialsConfig(credentialsFileName)
doAzureParallel::generateClusterConfig(clusterFileName)
@ -15,20 +16,19 @@ test_that("Scenario Test", {
doAzureParallel::registerDoAzureParallel(cluster)
'%dopar%' <- foreach::'%dopar%'
res <- foreach::foreach(i = 1:4) %dopar% {
mean(1:3)
}
res <-
foreach::foreach(i = 1:4) %dopar% {
mean(1:3)
}
doAzureParallel::stopCluster(cluster)
res
testthat::expect_equal(length(res),
4)
testthat::expect_equal(res,
list(2, 2, 2, 2))
testthat::expect_equal(length(res), 4)
testthat::expect_equal(res, list(2, 2, 2, 2))
})
test_that("Chunksize Test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "cluster.json"
@ -42,12 +42,11 @@ test_that("Chunksize Test", {
'%dopar%' <- foreach::'%dopar%'
res <-
foreach::foreach(i = 1:10, .options.azure = list(chunkSize = 3)) %dopar% {
foreach::foreach(i = 1:10,
.options.azure = list(chunkSize = 3)) %dopar% {
i
}
doAzureParallel::stopCluster(cluster)
testthat::expect_equal(length(res),
10)


@ -2,6 +2,7 @@
# of doAzureParallel are still working
context("long running job scenario test")
test_that("Long Running Job Test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "cluster.json"


@ -0,0 +1,118 @@
# Run this test for users to make sure the bioconductor package
# install feature of doAzureParallel is still working
context("bioconductor package install scenario test")
test_that("job single bioconductor package install Test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "cluster.json"
doAzureParallel::generateCredentialsConfig(credentialsFileName)
doAzureParallel::generateClusterConfig(clusterFileName)
# set your credentials
doAzureParallel::setCredentials(credentialsFileName)
cluster <- doAzureParallel::makeCluster(clusterFileName)
doAzureParallel::registerDoAzureParallel(cluster)
opt <- list(wait = TRUE)
'%dopar%' <- foreach::'%dopar%'
bioconductor <- 'AMOUNTAIN'
res <-
foreach::foreach(
i = 1:4,
bioconductor = bioconductor,
.options.azure = opt
) %dopar% {
"AMOUNTAIN" %in% rownames(installed.packages())
}
# verify the job result is correct
testthat::expect_equal(length(res),
4)
testthat::expect_equal(res,
list(TRUE, TRUE, TRUE, TRUE))
})
test_that("job multiple bioconductor package install Test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "cluster.json"
doAzureParallel::generateCredentialsConfig(credentialsFileName)
doAzureParallel::generateClusterConfig(clusterFileName)
# set your credentials
doAzureParallel::setCredentials(credentialsFileName)
cluster <- doAzureParallel::makeCluster(clusterFileName)
doAzureParallel::registerDoAzureParallel(cluster)
opt <- list(wait = TRUE)
'%dopar%' <- foreach::'%dopar%'
bioconductor <- c('AgiMicroRna', 'biobroom', 'BiocParallel')
res <-
foreach::foreach(i = 1:4,
bioconductor = bioconductor,
.options.azure = opt) %dopar% {
c("AgiMicroRna" %in% rownames(installed.packages()),
"biobroom" %in% rownames(installed.packages()),
"BiocParallel" %in% rownames(installed.packages()))
}
# verify the job result is correct
testthat::expect_equal(length(res),
4)
testthat::expect_equal(res,
list(
c(TRUE, TRUE, TRUE),
c(TRUE, TRUE, TRUE),
c(TRUE, TRUE, TRUE),
c(TRUE, TRUE, TRUE)))
})
test_that("pool multiple bioconductor package install Test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "cluster.json"
doAzureParallel::generateCredentialsConfig(credentialsFileName)
doAzureParallel::generateClusterConfig(clusterFileName)
config <- jsonlite::fromJSON(clusterFileName)
config$name <- "bioconductorPackages1"
config$poolSize$dedicatedNodes$min <- 0
config$poolSize$dedicatedNodes$max <- 0
config$poolSize$lowPriorityNodes$min <- 1
config$poolSize$lowPriorityNodes$max <- 1
config$rPackages$bioconductor <- c('AgiMicroRna', 'biobroom', 'BiocParallel')
configJson <- jsonlite::toJSON(config, auto_unbox = TRUE, pretty = TRUE)
write(configJson, file = paste0(getwd(), "/", clusterFileName))
# set your credentials
doAzureParallel::setCredentials(credentialsFileName)
cluster <- doAzureParallel::makeCluster(clusterFileName)
doAzureParallel::registerDoAzureParallel(cluster)
'%dopar%' <- foreach::'%dopar%'
res <-
foreach::foreach(i = 1:2) %dopar% {
c("AgiMicroRna" %in% rownames(installed.packages()),
"biobroom" %in% rownames(installed.packages()),
"BiocParallel" %in% rownames(installed.packages()))
}
# verify the job result is correct
testthat::expect_equal(length(res),
2)
testthat::expect_equal(res,
list(
c(TRUE, TRUE, TRUE),
c(TRUE, TRUE, TRUE)))
doAzureParallel::stopCluster(cluster)
})


@ -0,0 +1,121 @@
# Run this test for users to make sure the github package
# install feature of doAzureParallel is still working
context("github package install scenario test")
test_that("single github package install Test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "cluster.json"
doAzureParallel::generateCredentialsConfig(credentialsFileName)
doAzureParallel::generateClusterConfig(clusterFileName)
# set your credentials
doAzureParallel::setCredentials(credentialsFileName)
cluster <- doAzureParallel::makeCluster(clusterFileName)
doAzureParallel::registerDoAzureParallel(cluster)
opt <- list(wait = TRUE)
'%dopar%' <- foreach::'%dopar%'
githubPackages <- 'Azure/doAzureParallel'
res <-
foreach::foreach(
i = 1:4,
github = githubPackages,
.options.azure = opt
) %dopar% {
"doAzureParallel" %in% rownames(installed.packages()) &&
"rAzureBatch" %in% rownames(installed.packages())
}
# verify the job result is correct
testthat::expect_equal(length(res),
4)
testthat::expect_equal(res,
list(TRUE, TRUE, TRUE, TRUE))
})
test_that("multiple github package install Test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "cluster.json"
doAzureParallel::generateCredentialsConfig(credentialsFileName)
doAzureParallel::generateClusterConfig(clusterFileName)
# set your credentials
doAzureParallel::setCredentials(credentialsFileName)
cluster <- doAzureParallel::makeCluster(clusterFileName)
doAzureParallel::registerDoAzureParallel(cluster)
opt <- list(wait = TRUE)
'%dopar%' <- foreach::'%dopar%'
githubPackages <- c('Azure/doAzureParallel', 'twitter/AnomalyDetection', 'hadley/dplyr')
res <-
foreach::foreach(
i = 1:3,
github = githubPackages,
.options.azure = opt
) %dopar% {
c("doAzureParallel" %in% rownames(installed.packages()),
"AnomalyDetection" %in% rownames(installed.packages()),
"dplyr" %in% rownames(installed.packages()))
}
# verify the job result is correct
testthat::expect_equal(length(res),
3)
testthat::expect_equal(res,
list(c(TRUE, TRUE, TRUE),
c(TRUE, TRUE, TRUE),
c(TRUE, TRUE, TRUE)))
})
test_that("pool multiple github package install Test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "cluster.json"
githubPackages <- c('Azure/doAzureParallel', 'twitter/AnomalyDetection', 'hadley/dplyr')
doAzureParallel::generateCredentialsConfig(credentialsFileName)
doAzureParallel::generateClusterConfig(clusterFileName)
config <- jsonlite::fromJSON(clusterFileName)
config$name <- "multipleGithubPackage"
config$poolSize$dedicatedNodes$min <- 0
config$poolSize$dedicatedNodes$max <- 0
config$poolSize$lowPriorityNodes$min <- 1
config$poolSize$lowPriorityNodes$max <- 1
config$rPackages$github <- c('Azure/doAzureParallel', 'twitter/AnomalyDetection', 'hadley/dplyr')
configJson <- jsonlite::toJSON(config, auto_unbox = TRUE, pretty = TRUE)
write(configJson, file = paste0(getwd(), "/", clusterFileName))
# set your credentials
doAzureParallel::setCredentials(credentialsFileName)
cluster <- doAzureParallel::makeCluster(clusterFileName)
doAzureParallel::registerDoAzureParallel(cluster)
'%dopar%' <- foreach::'%dopar%'
res <-
foreach::foreach(i = 1:3) %dopar% {
c("doAzureParallel" %in% rownames(installed.packages()),
"AnomalyDetection" %in% rownames(installed.packages()),
"dplyr" %in% rownames(installed.packages()))
}
# verify the job result is correct
testthat::expect_equal(length(res),
3)
testthat::expect_equal(res,
list(c(TRUE, TRUE, TRUE),
c(TRUE, TRUE, TRUE),
c(TRUE, TRUE, TRUE)))
doAzureParallel::stopCluster(cluster)
})


@ -21,11 +21,18 @@ test_that("successfully create cran pool package command line", {
poolInstallation <-
getPoolPackageInstallationCommand("cran", c("hts", "lubridate", "tidyr"))
expect_equal(length(poolInstallation), 3)
libPathCommand <-
paste(
"Rscript -e \'args <- commandArgs(TRUE)\' -e 'options(warn=2)'",
"-e \'.libPaths( c( \\\"/mnt/batch/tasks/shared/R/packages\\\", .libPaths()));"
)
expected <-
c(
"Rscript -e \'args <- commandArgs(TRUE)\' -e 'options(warn=2)' -e \'install.packages(args[1])\' hts",
"Rscript -e \'args <- commandArgs(TRUE)\' -e 'options(warn=2)' -e \'install.packages(args[1])\' lubridate",
"Rscript -e \'args <- commandArgs(TRUE)\' -e 'options(warn=2)' -e \'install.packages(args[1])\' tidyr"
paste(libPathCommand, "install.packages(args[1])\' hts"),
paste(libPathCommand, "install.packages(args[1])\' lubridate"),
paste(libPathCommand, "install.packages(args[1])\' tidyr")
)
expect_equal(poolInstallation, expected)
@ -36,12 +43,35 @@ test_that("successfully create github pool package command line", {
getPoolPackageInstallationCommand("github", c("Azure/doAzureParallel", "Azure/rAzureBatch"))
expect_equal(length(poolInstallation), 2)
libPathCommand <-
paste(
"Rscript -e \'args <- commandArgs(TRUE)\' -e 'options(warn=2)'",
"-e \'.libPaths( c( \\\"/mnt/batch/tasks/shared/R/packages\\\", .libPaths()));"
)
expected <-
c(
paste0("Rscript -e \'args <- commandArgs(TRUE)\' -e 'options(warn=2)' ",
"-e \'devtools::install_github(args[1])\' Azure/doAzureParallel"),
paste0("Rscript -e \'args <- commandArgs(TRUE)\' -e 'options(warn=2)' ",
"-e \'devtools::install_github(args[1])\' Azure/rAzureBatch")
paste(libPathCommand, "devtools::install_github(args[1])\' Azure/doAzureParallel"),
paste(libPathCommand, "devtools::install_github(args[1])\' Azure/rAzureBatch")
)
expect_equal(poolInstallation, expected)
})
test_that("successfully create bioconductor pool package command line", {
poolInstallation <-
getPoolPackageInstallationCommand("bioconductor", c("IRanges", "a4"))
cat(poolInstallation)
expect_equal(length(poolInstallation), 2)
expected <-
c(
paste("Rscript /mnt/batch/tasks/startup/wd/install_bioconductor.R",
"IRanges",
sep = " "),
paste("Rscript /mnt/batch/tasks/startup/wd/install_bioconductor.R",
"a4",
sep = " ")
)
expect_equal(poolInstallation, expected)