From 3dadf0803ee8228d7b5869705d1e580eab729d2d Mon Sep 17 00:00:00 2001 From: Brian Date: Thu, 28 Sep 2017 15:18:06 -0700 Subject: [PATCH] Fixed worker and merger scripts (#116) * Fixed worker and merger scripts * Fixed verbose logs based on PR comments * Added documentation on error handling * Fixed header on table markdown * Fixed based on PR comments --- R/doAzureParallel.R | 121 +++--- R/helpers.R | 11 +- R/utility.R | 162 ++++---- README.md | 721 ++++++++++++++++++---------------- inst/startup/install_github.R | 31 +- inst/startup/merger.R | 112 ++++-- inst/startup/worker.R | 30 +- man/waitForTasksToComplete.Rd | 2 +- 8 files changed, 667 insertions(+), 523 deletions(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 02bcb2d..b42e719 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -186,7 +186,6 @@ setHttpTraffic <- function(value = FALSE) { } pkgName <- if (exists("packageName", mode = "function")) - packageName(envir) else NULL @@ -414,16 +413,10 @@ setHttpTraffic <- function(value = FALSE) { taskId <- paste0(id, "-task", i) .addTask( - id, + jobId = id, taskId = taskId, rCommand = sprintf( - "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R %s %s %s %s > %s.txt", - "$AZ_BATCH_JOB_PREP_WORKING_DIR", - "$AZ_BATCH_TASK_WORKING_DIR", - jobFileName, - paste0(taskId, ".rds"), - taskId - ), + "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R > $AZ_BATCH_TASK_ID.txt"), args = argsList[startIndex:endIndex], envir = .doAzureBatchGlobals, packages = obj$packages, @@ -436,17 +429,15 @@ setHttpTraffic <- function(value = FALSE) { rAzureBatch::updateJob(id) if (enableCloudCombine) { + mergeTaskId <- paste0(id, "-merge") .addTask( - id, - taskId = paste0(id, "-merge"), + jobId = id, + taskId = mergeTaskId, rCommand = sprintf( - "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/merger.R %s %s %s %s %s > %s.txt", - "$AZ_BATCH_JOB_PREP_WORKING_DIR", - "$AZ_BATCH_TASK_WORKING_DIR", - id, + "Rscript 
--vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/merger.R %s %s %s > $AZ_BATCH_TASK_ID.txt", length(tasks), - ntasks, - paste0(id, "-merge") + chunkSize, + as.character(obj$errorHandling) ), envir = .doAzureBatchGlobals, packages = obj$packages, @@ -461,61 +452,67 @@ setHttpTraffic <- function(value = FALSE) { waitForJobPreparation(id, data$poolId) } - waitForTasksToComplete(id, jobTimeout) + tryCatch({ + waitForTasksToComplete(id, jobTimeout, obj$errorHandling) - if (typeof(cloudCombine) == "list" && enableCloudCombine) { - tempFile <- tempfile("doAzureParallel", fileext = ".rds") + if (typeof(cloudCombine) == "list" && enableCloudCombine) { + tempFile <- tempfile("doAzureParallel", fileext = ".rds") - response <- - rAzureBatch::downloadBlob( - id, - paste0("result/", id, "-merge-result.rds"), - sasToken = sasToken, - accountName = storageCredentials$name, - downloadPath = tempFile, - overwrite = TRUE - ) + response <- + rAzureBatch::downloadBlob( + id, + paste0("result/", id, "-merge-result.rds"), + sasToken = sasToken, + accountName = storageCredentials$name, + downloadPath = tempFile, + overwrite = TRUE + ) - results <- readRDS(tempFile) + results <- readRDS(tempFile) - failTasks <- sapply(results, .isError) + failTasks <- sapply(results, .isError) - numberOfFailedTasks <- sum(unlist(failTasks)) + numberOfFailedTasks <- sum(unlist(failTasks)) - if (numberOfFailedTasks > 0) { - .createErrorViewerPane(id, failTasks) - } + if (numberOfFailedTasks > 0) { + .createErrorViewerPane(id, failTasks) + } - accumulator <- foreach::makeAccum(it) + accumulator <- foreach::makeAccum(it) - tryCatch( - accumulator(results, seq(along = results)), - error = function(e) { - cat("error calling combine function:\n") - print(e) + tryCatch( + accumulator(results, seq(along = results)), + error = function(e) { + cat("error calling combine function:\n") + print(e) + } + ) + + # check for errors + errorValue <- foreach::getErrorValue(it) + errorIndex <- foreach::getErrorIndex(it) + + 
cat(sprintf("Number of errors: %i", numberOfFailedTasks), + fill = TRUE) + + rAzureBatch::deleteJob(id) + + if (identical(obj$errorHandling, "stop") && + !is.null(errorValue)) { + msg <- sprintf("task %d failed - '%s'", + errorIndex, + conditionMessage(errorValue)) + stop(simpleError(msg, call = expr)) + } + else { + foreach::getResult(it) + } } - ) - - # check for errors - errorValue <- foreach::getErrorValue(it) - errorIndex <- foreach::getErrorIndex(it) - - cat(sprintf("Number of errors: %i", numberOfFailedTasks), - fill = TRUE) - - rAzureBatch::deleteJob(id) - - if (identical(obj$errorHandling, "stop") && - !is.null(errorValue)) { - msg <- sprintf("task %d failed - '%s'", - errorIndex, - conditionMessage(errorValue)) - stop(simpleError(msg, call = expr)) + }, + error = function(ex){ + message(ex) } - else { - foreach::getResult(it) - } - } + ) } else{ print( diff --git a/R/helpers.R b/R/helpers.R index d860044..5655ed4 100644 --- a/R/helpers.R +++ b/R/helpers.R @@ -29,9 +29,17 @@ resourceFiles <- list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile)) + exitConditions <- NULL if (!is.null(args$dependsOn)) { dependsOn <- list(taskIds = dependsOn) } + else { + exitConditions <- list( + default = list( + dependencyAction = "satisfy" + ) + ) + } resultFile <- paste0(taskId, "-result", ".rds") accountName <- storageCredentials$name @@ -122,7 +130,8 @@ resourceFiles = resourceFiles, commandLine = commands, dependsOn = dependsOn, - outputFiles = outputFiles + outputFiles = outputFiles, + exitConditions = exitConditions ) } diff --git a/R/utility.R b/R/utility.R index 5baa63f..42948d8 100644 --- a/R/utility.R +++ b/R/utility.R @@ -257,7 +257,8 @@ getJobResult <- function(jobId = "", ...) 
{ if (!is.null(args$container)) { results <- - rAzureBatch::downloadBlob(args$container, paste0("result/", jobId, "-merge-result.rds")) + rAzureBatch::downloadBlob(args$container, + paste0("result/", jobId, "-merge-result.rds")) } else{ results <- @@ -437,58 +438,25 @@ createOutputFile <- function(filePattern, url) { output } + #' Wait for current tasks to complete #' #' @export -waitForTasksToComplete <- function(jobId, timeout) { - cat("Waiting for tasks to complete. . .", fill = TRUE) +waitForTasksToComplete <- + function(jobId, timeout, errorHandling = "stop") { + cat("Waiting for tasks to complete. . .", fill = TRUE) - numOfTasks <- 0 - currentTasks <- rAzureBatch::listTask(jobId) - - if (is.null(currentTasks$value)) { - stop(paste0("Error: ", currentTasks$message$value)) - return() - } - - numOfTasks <- numOfTasks + length(currentTasks$value) - - # Getting the total count of tasks for progress bar - repeat { - if (is.null(currentTasks$odata.nextLink)) { - break - } - - skipTokenParameter <- - strsplit(currentTasks$odata.nextLink, "&")[[1]][2] - - skipTokenValue <- - substr(skipTokenParameter, - nchar("$skiptoken=") + 1, - nchar(skipTokenParameter)) - - currentTasks <- - rAzureBatch::listTask(jobId, skipToken = URLdecode(skipTokenValue)) - numOfTasks <- numOfTasks + length(currentTasks$value) - } - - pb <- txtProgressBar(min = 0, max = numOfTasks, style = 3) - - timeToTimeout <- Sys.time() + timeout - - while (Sys.time() < timeToTimeout) { - count <- 0 + totalTasks <- 0 currentTasks <- rAzureBatch::listTask(jobId) - taskStates <- - lapply(currentTasks$value, function(x) - x$state != "completed") - for (i in 1:length(taskStates)) { - if (taskStates[[i]] == FALSE) { - count <- count + 1 - } + if (is.null(currentTasks$value)) { + stop(paste0("Error: ", currentTasks$message$value)) + return() } + totalTasks <- totalTasks + length(currentTasks$value) + + # Getting the total count of tasks for progress bar repeat { if (is.null(currentTasks$odata.nextLink)) { break 
@@ -505,30 +473,84 @@ waitForTasksToComplete <- function(jobId, timeout) { currentTasks <- rAzureBatch::listTask(jobId, skipToken = URLdecode(skipTokenValue)) - taskStates <- - lapply(currentTasks$value, function(x) - x$state != "completed") + totalTasks <- totalTasks + length(currentTasks$value) + } - for (i in 1:length(taskStates)) { - if (taskStates[[i]] == FALSE) { - count <- count + 1 + pb <- txtProgressBar(min = 0, max = totalTasks, style = 3) + timeToTimeout <- Sys.time() + timeout + + repeat { + taskCounts <- rAzureBatch::getJobTaskCounts(jobId) + setTxtProgressBar(pb, taskCounts$completed) + + validationFlag <- + (taskCounts$validationStatus == "Validated" && + totalTasks <= 200000) || + totalTasks > 200000 + + if (taskCounts$failed > 0 && + errorHandling == "stop" && + validationFlag) { + cat("\n") + + select <- "id, executionInfo" + failedTasks <- + rAzureBatch::listTask(jobId, select = select) + + tasksFailureWarningLabel <- + sprintf(paste("%i task(s) failed while running the job.", + "This caused the job to terminate automatically.", + "To disable this behavior and continue on failure, set .errorHandling='remove | pass'", + "in the foreach loop\n"), taskCounts$failed) + + + for (i in 1:length(failedTasks$value)) { + if (failedTasks$value[[i]]$executionInfo$result == "Failure") { + tasksFailureWarningLabel <- + paste0(tasksFailureWarningLabel, + sprintf("%s\n", failedTasks$value[[i]]$id)) + } } + + warning(sprintf(tasksFailureWarningLabel, + taskCounts$failed)) + + response <- rAzureBatch::terminateJob(jobId) + httr::stop_for_status(response) + + stop(sprintf( + paste("Errors have occurred while running the job '%s'.", + "Error handling is set to 'stop' and has proceeded to terminate the job.", + "The user will have to handle deleting the job.", + "If this is not the correct behavior, change the errorHandling property to 'pass'", + " or 'remove' in the foreach object. 
Use the 'getJobFile' function to obtain the logs.", + "For more information about getting job logs, follow this link:", + paste0("https://github.com/Azure/doAzureParallel/blob/master/docs/", + "40-troubleshooting.md#viewing-files-directly-from-compute-node")), + jobId + )) } + + if (Sys.time() > timeToTimeout) { + stop(sprintf(paste("Timeout has occurred while waiting for tasks to complete.", + "Users will have to manually track the job '%s' and get the results.", + "Use the getJobResults function to obtain the results and getJobList for", + "tracking job status. To change the timeout, set 'timeout' property in the", + "foreach's options.azure.")), + jobId) + } + + if (taskCounts$completed >= totalTasks && + (taskCounts$validationStatus == "Validated" || + totalTasks >= 200000)) { + cat("\n") + return(0) + } + + Sys.sleep(10) } - - setTxtProgressBar(pb, count) - - if (all(taskStates == FALSE)) { - cat("\n") - return(0) - } - - Sys.sleep(10) } - stop("A timeout has occurred when waiting for tasks to complete.") -} - waitForJobPreparation <- function(jobId, poolId) { cat("Job Preparation Status: Package(s) being installed") @@ -556,11 +578,13 @@ waitForJobPreparation <- function(jobId, poolId) { # Verify that all the job preparation tasks are not failing if (all(FALSE %in% statuses)) { cat("\n") - stop(paste( - sprintf("Job '%s' unable to install packages.", jobId), - "Use the 'getJobFile' function to get more information about", - "job package installation." - )) + stop( + paste( + sprintf("Job '%s' unable to install packages.", jobId), + "Use the 'getJobFile' function to get more information about", + "job package installation." 
+ ) + ) } cat(".") @@ -574,6 +598,6 @@ getXmlValues <- function(xmlResponse, xmlPath) { xml2::xml_text(xml2::xml_find_all(xmlResponse, xmlPath)) } -areShallowEqual <- function(a, b){ +areShallowEqual <- function(a, b) { !is.null(a) && !is.null(b) && a == b } diff --git a/README.md b/README.md index c3478bc..e939ac6 100644 --- a/README.md +++ b/README.md @@ -1,335 +1,386 @@ -[![Build Status](https://travis-ci.org/Azure/doAzureParallel.svg?branch=master)](https://travis-ci.org/Azure/doAzureParallel) -# doAzureParallel - -```R -# set your credentials -setCredentials("credentials.json") - -# setup your cluster with a simple config file -cluster<- makeCluster("cluster.json") - -# register the cluster as your parallel backend -registerDoAzureParallel(cluster) - -# run your foreach loop on a distributed cluster in Azure -number_of_iterations <- 10 -results <- foreach(i = 1:number_of_iterations) %dopar% { - myParallelAlgorithm() -} -``` - -## Introduction - -The *doAzureParallel* package is a parallel backend for the widely popular *foreach* package. With *doAzureParallel*, each iteration of the *foreach* loop runs in parallel on an Azure Virtual Machine (VM), allowing users to scale up their R jobs to tens or hundreds of machines. - -*doAzureParallel* is built to support the *foreach* parallel computing package. The *foreach* package supports parallel execution - it can execute multiple processes across some parallel backend. With just a few lines of code, the *doAzureParallel* package helps create a cluster in Azure, register it as a parallel backend, and seamlessly connects to the *foreach* package. - -NOTE: The terms *pool* and *cluster* are used interchangably throughout this document. - -## Dependencies - -- R (>= 3.3.1) -- httr (>= 1.2.1) -- rjson (>= 0.2.15) -- RCurl (>= 1.95-4.8) -- digest (>= 0.6.9) -- foreach (>= 1.4.3) -- iterators (>= 1.0.8) -- bitops (>= 1.0.5) - -## Installation - -Install doAzureParallel directly from Github. 
- -```R -# install the package devtools -install.packages("devtools") -library(devtools) - -# install the doAzureParallel and rAzureBatch package -install_github(c("Azure/rAzureBatch", "Azure/doAzureParallel")) -``` - -## Azure Requirements - -To run your R code across a cluster in Azure, we'll need to get keys and account information. - -### Setup Azure Account -First, set up your Azure Account ([Get started for free!](https://azure.microsoft.com/en-us/free/)) - -Once you have an Azure account, you'll need to create the following two services in the Azure portal: -- Azure Batch Account ([Create an Azure Batch Account in the Portal](https://docs.microsoft.com/en-us/azure/Batch/batch-account-create-portal)) -- Azure Storage Account (this can be created with the Batch Account) - -### Get Keys and Account Information -For your Azure Batch Account, we need to get: -- Batch Account Name -- Batch Account URL -- Batch Account Access Key - -This information can be found in the Azure Portal inside your Batch Account: - -![Azure Batch Acccount in the Portal](./vignettes/doAzureParallel-azurebatch-instructions.PNG "Azure Batch Acccount in the Portal") - -For your Azure Storage Account, we need to get: -- Storage Account Name -- Storage Account Access Key - -This information can be found in the Azure Portal inside your Azure Storage Account: - -![Azure Storage Acccount in the Portal](./vignettes/doAzureParallel-azurestorage-instructions.PNG "Azure Storage Acccount in the Portal") - -Keep track of the above keys and account information as it will be used to connect your R session with Azure. - -## Getting Started - -Import the package -```R -library(doAzureParallel) -``` - -Set up your parallel backend with Azure. This is your set of Azure VMs. -```R -# 1. Generate your credential and cluster configuration files. -generateClusterConfig("cluster.json") -generateCredentialsConfig("credentials.json") - -# 2. Fill out your credential config and cluster config files. 
-# Enter your Azure Batch Account & Azure Storage keys/account-info into your credential config ("credentials.json") and configure your cluster in your cluster config ("cluster.json") - -# 3. Set your credentials - you need to give the R session your credentials to interact with Azure -setCredentials("credentials.json") - -# 4. Register the pool. This will create a new pool if your pool hasn't already been provisioned. -cluster <- makeCluster("cluster.json") - -# 5. Register the pool as your parallel backend -registerDoAzureParallel(cluster) - -# 6. Check that your parallel backend has been registered -getDoParWorkers() -``` - -Run your parallel *foreach* loop with the *%dopar%* keyword. The *foreach* function will return the results of your parallel code. - -```R -number_of_iterations <- 10 -results <- foreach(i = 1:number_of_iterations) %dopar% { - # This code is executed, in parallel, across your cluster. - myAlgorithm() -} -``` - -After you finish running your R code in Azure, you may want to shut down your cluster of VMs to make sure that you are not being charged anymore. - -```R -# shut down your pool -stopCluster(cluster) -``` - -### Configuration JSON files - -#### Credentials -Use your credential config JSON file to enter your credentials. - -```javascript -{ - "batchAccount": { - "name": , - "key": , - "url": - }, - "storageAccount": { - "name": , - "key": - } -} -``` -Learn more: - - [Batch account / Storage account](./README.md#azure-requirements) - - -#### Cluster Settings -Use your pool configuration JSON file to define your pool in Azure. 
- -```javascript -{ - "name": , // example: "myazurecluster" - "vmSize": , // example: "Standard_F2" - "maxTasksPerNode": , // example: "2" - "poolSize": { - "dedicatedNodes": { // dedicated vms - "min": 2, - "max": 2 - }, - "lowPriorityNodes": { // low priority vms - "min": 1, - "max": 10 - }, - "autoscaleFormula": "QUEUE" - }, - "rPackages": { - "cran": ["some_cran_package", "some_other_cran_package"], - "github": ["username/some_github_package", "another_username/some_other_github_package"], - "githubAuthenticationToken": {} - }, - "commandLine": [] -} -``` -NOTE: If you do **not** want your cluster to autoscale, simply set the number of min nodes equal to max nodes for low-priority and dedicated. - -Learn more: - - [Choosing VM size](./docs/10-vm-sizes.md#vm-size-table) - - [MaxTasksPerNode](./docs/22-parallelizing-cores.md) - - [LowPriorityNodes](#low-priority-vms) - - [Autoscale](./docs/11-autoscale.md) - - [PoolSize Limitations](./docs/12-quota-limitations.md) - - [rPackages](./docs/20-package-management.md) - -### Low Priority VMs -Low-priority VMs are a way to obtain and consume Azure compute at a much lower price using Azure Batch. Since doAzureParallel is built on top of Azure Batch, this package is able to take advantage of low-priority VMs and allocate compute resources from Azure's surplus capacity at up to **80% discount**. - -Low-priority VMs come with the understanding that when you request it, there is the possibility that we'll need to take some or all of it back. Hence the name *low-priority* - VMs may not be allocated or may be preempted due to higher priority allocations, which equate to full-priced VMs that have an SLA. - -And as the name suggests, this significant cost reduction is ideal for *low priority* workloads that do not have a strict performance requirement. 
- -With Azure Batch's first-class support for low-priority VMs, you can use them in conjunction with normal on-demand VMs (*dedicated VMs*) and enable job cost to be balanced with job execution flexibility: - - * Batch pools can contain both on-demand nodes and low-priority nodes. The two types can be independently scaled, either explicitly with the resize operation or automatically using auto-scale. Different configurations can be used, such as maximizing cost savings by always using low-priority nodes or spinning up on-demand nodes at full price, to maintain capacity by replacing any preempted low-priority nodes. - * If any low-priority nodes are preempted, then Batch will automatically attempt to replace the lost capacity, continually seeking to maintain the target amount of low-priority capacity in the pool. - * If tasks are interrupted when the node on which it is running is preempted, then the tasks are automatically re-queued to be re-run. - -For more information about low-priority VMs, please visit the [documentation](https://docs.microsoft.com/en-us/azure/batch/batch-low-pri-vms). - -You can also check out information on low-priority pricing [here](https://azure.microsoft.com/en-us/pricing/details/batch/). - -### Distributing Data -When developing at scale, you may also want to chunk up your data and distribute the data across your nodes. Learn more about that [here](./docs/21-distributing-data.md#chunking-data) - -### Using %do% vs %dopar% -When developing at scale, it is always recommended that you test and debug your code locally first. Switch between *%dopar%* and *%do%* to toggle between running in parallel on Azure and running in sequence on your local machine. - -```R -# run your code sequentially on your local machine -results <- foreach(i = 1:number_of_iterations) %do% { ... } - -# use the doAzureParallel backend to run your code in parallel across your Azure cluster -results <- foreach(i = 1:number_of_iterations) %dopar% { ... 
} -``` - -### Long-running Jobs + Job Management - -doAzureParallel also helps you manage your jobs so that you can run many jobs at once while managing it through a few simple methods. - - -```R -# List your jobs: -getJobList() -``` - -This will also let you run *long running jobs* easily. - -With long running jobs, you will need to keep track of your jobs as well as set your job to a non-blocking state. You can do this with the *.options.azure* options: - -```R -# set the .options.azure option in the foreach loop -opt <- list(job = 'unique_job_id', wait = FALSE) - -# NOTE - if the option wait = FALSE, foreach will return your unique job id -job_id <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar % { ... } - -# get back your job results with your unique job id -results <- getJobResult(job_id) -``` - -Finally, you may also want to track the status of jobs that you've name: - -```R -# List specific jobs: -getJobList(c('unique_job_id', 'another_job_id')) -``` - -You can learn more about how to execute long-running jobs [here](./docs/23-persistent-storage.md). - -With long-running jobs, you can take advantage of Azure's autoscaling capabilities to save time and/or money. Learn more about autoscale [here](./docs/11-autoscale.md). - -### Using the 'chunkSize' option - -doAzureParallel also supports custom chunk sizes. This option allows you to group iterations of the foreach loop together and execute them in a single R session. - -```R -# set the chunkSize option -opt <- list(chunkSize = 3) -results <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... } -``` - -You should consider using the chunkSize if each iteration in the loop executes very quickly. 
- -If you have a static cluster and want to have a single chunk for each worker, you can compute the chunkSize as follows: - -```R -# compute the chunk size -cs <- ceiling(number_of_iterations / getDoParWorkers()) - -# run the foreach loop with chunkSize optimized -opt <- list(chunkSize = cs) -results <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... } -``` - -### Resizing Your Cluster - -At some point, you may also want to resize your cluster manually. You can do this simply with the command *resizeCluster*. - -```R -cluster <- makeCluster("cluster.json") - -# resize so that we have a min of 10 dedicated nodes and a max of 20 dedicated nodes -# AND a min of 10 low priority nodes and a max of 20 low priority nodes -resizeCluster( - cluster, - dedicatedMin = 10, - dedicatedMax = 20, - lowPriorityMin = 10, - lowPriorityMax = 20, - algorithm = 'QUEUE', - timeInterval = '5m' ) -``` - -If your cluster is using autoscale but you want to set it to a static size of 10, you can also use this method: - -```R -# resize to a static cluster of 10 -resizeCluster(cluster, - dedicatedMin = 10, - dedicatedMax = 10, - lowPriorityMin = 0, - lowPriorityMax = 0) -``` - -### Setting Verbose Mode to Debug - -To debug your doAzureParallel jobs, you can set the package to operate on *verbose* mode: - -```R -# turn on verbose mode -setVerbose(TRUE) - -# turn off verbose mode -setVerbose(FALSE) -``` -### Bypassing merge task - -Skipping the merge task is useful when the tasks results don't need to be merged into a list. To bypass the merge task, you can pass the *enableMerge* flag to the foreach object: - -```R -# Enable merge task -foreach(i = 1:3, .options.azure = list(enableMerge = TRUE)) - -# Disable merge task -foreach(i = 1:3, .options.azure = list(enableMerge = FALSE)) -``` -Note: User defined functions for the merge task is on our list of features that we are planning on doing. 
- -## Next Steps - -For more information, please visit [our documentation](./docs/README.md). +[![Build Status](https://travis-ci.org/Azure/doAzureParallel.svg?branch=master)](https://travis-ci.org/Azure/doAzureParallel) +# doAzureParallel + +```R +# set your credentials +setCredentials("credentials.json") + +# setup your cluster with a simple config file +cluster<- makeCluster("cluster.json") + +# register the cluster as your parallel backend +registerDoAzureParallel(cluster) + +# run your foreach loop on a distributed cluster in Azure +number_of_iterations <- 10 +results <- foreach(i = 1:number_of_iterations) %dopar% { + myParallelAlgorithm() +} +``` + +## Introduction + +The *doAzureParallel* package is a parallel backend for the widely popular *foreach* package. With *doAzureParallel*, each iteration of the *foreach* loop runs in parallel on an Azure Virtual Machine (VM), allowing users to scale up their R jobs to tens or hundreds of machines. + +*doAzureParallel* is built to support the *foreach* parallel computing package. The *foreach* package supports parallel execution - it can execute multiple processes across some parallel backend. With just a few lines of code, the *doAzureParallel* package helps create a cluster in Azure, register it as a parallel backend, and seamlessly connects to the *foreach* package. + +NOTE: The terms *pool* and *cluster* are used interchangably throughout this document. + +## Dependencies + +- R (>= 3.3.1) +- httr (>= 1.2.1) +- rjson (>= 0.2.15) +- RCurl (>= 1.95-4.8) +- digest (>= 0.6.9) +- foreach (>= 1.4.3) +- iterators (>= 1.0.8) +- bitops (>= 1.0.5) + +## Installation + +Install doAzureParallel directly from Github. 
+ +```R +# install the package devtools +install.packages("devtools") +library(devtools) + +# install the doAzureParallel and rAzureBatch package +install_github(c("Azure/rAzureBatch", "Azure/doAzureParallel")) +``` + +## Azure Requirements + +To run your R code across a cluster in Azure, we'll need to get keys and account information. + +### Setup Azure Account +First, set up your Azure Account ([Get started for free!](https://azure.microsoft.com/en-us/free/)) + +Once you have an Azure account, you'll need to create the following two services in the Azure portal: +- Azure Batch Account ([Create an Azure Batch Account in the Portal](https://docs.microsoft.com/en-us/azure/Batch/batch-account-create-portal)) +- Azure Storage Account (this can be created with the Batch Account) + +### Get Keys and Account Information +For your Azure Batch Account, we need to get: +- Batch Account Name +- Batch Account URL +- Batch Account Access Key + +This information can be found in the Azure Portal inside your Batch Account: + +![Azure Batch Acccount in the Portal](./vignettes/doAzureParallel-azurebatch-instructions.PNG "Azure Batch Acccount in the Portal") + +For your Azure Storage Account, we need to get: +- Storage Account Name +- Storage Account Access Key + +This information can be found in the Azure Portal inside your Azure Storage Account: + +![Azure Storage Acccount in the Portal](./vignettes/doAzureParallel-azurestorage-instructions.PNG "Azure Storage Acccount in the Portal") + +Keep track of the above keys and account information as it will be used to connect your R session with Azure. + +## Getting Started + +Import the package +```R +library(doAzureParallel) +``` + +Set up your parallel backend with Azure. This is your set of Azure VMs. +```R +# 1. Generate your credential and cluster configuration files. +generateClusterConfig("cluster.json") +generateCredentialsConfig("credentials.json") + +# 2. Fill out your credential config and cluster config files. 
+# Enter your Azure Batch Account & Azure Storage keys/account-info into your credential config ("credentials.json") and configure your cluster in your cluster config ("cluster.json") + +# 3. Set your credentials - you need to give the R session your credentials to interact with Azure +setCredentials("credentials.json") + +# 4. Register the pool. This will create a new pool if your pool hasn't already been provisioned. +cluster <- makeCluster("cluster.json") + +# 5. Register the pool as your parallel backend +registerDoAzureParallel(cluster) + +# 6. Check that your parallel backend has been registered +getDoParWorkers() +``` + +Run your parallel *foreach* loop with the *%dopar%* keyword. The *foreach* function will return the results of your parallel code. + +```R +number_of_iterations <- 10 +results <- foreach(i = 1:number_of_iterations) %dopar% { + # This code is executed, in parallel, across your cluster. + myAlgorithm() +} +``` + +After you finish running your R code in Azure, you may want to shut down your cluster of VMs to make sure that you are not being charged anymore. + +```R +# shut down your pool +stopCluster(cluster) +``` + +### Configuration JSON files + +#### Credentials +Use your credential config JSON file to enter your credentials. + +```javascript +{ + "batchAccount": { + "name": , + "key": , + "url": + }, + "storageAccount": { + "name": , + "key": + } +} +``` +Learn more: + - [Batch account / Storage account](./README.md#azure-requirements) + + +#### Cluster Settings +Use your pool configuration JSON file to define your pool in Azure. 
+ +```javascript +{ + "name": , // example: "myazurecluster" + "vmSize": , // example: "Standard_F2" + "maxTasksPerNode": , // example: "2" + "poolSize": { + "dedicatedNodes": { // dedicated vms + "min": 2, + "max": 2 + }, + "lowPriorityNodes": { // low priority vms + "min": 1, + "max": 10 + }, + "autoscaleFormula": "QUEUE" + }, + "rPackages": { + "cran": ["some_cran_package", "some_other_cran_package"], + "github": ["username/some_github_package", "another_username/some_other_github_package"], + "githubAuthenticationToken": {} + }, + "commandLine": [] +} +``` +NOTE: If you do **not** want your cluster to autoscale, simply set the number of min nodes equal to max nodes for low-priority and dedicated. + +Learn more: + - [Choosing VM size](./docs/10-vm-sizes.md#vm-size-table) + - [MaxTasksPerNode](./docs/22-parallelizing-cores.md) + - [LowPriorityNodes](#low-priority-vms) + - [Autoscale](./docs/11-autoscale.md) + - [PoolSize Limitations](./docs/12-quota-limitations.md) + - [rPackages](./docs/20-package-management.md) + +### Low Priority VMs +Low-priority VMs are a way to obtain and consume Azure compute at a much lower price using Azure Batch. Since doAzureParallel is built on top of Azure Batch, this package is able to take advantage of low-priority VMs and allocate compute resources from Azure's surplus capacity at up to **80% discount**. + +Low-priority VMs come with the understanding that when you request it, there is the possibility that we'll need to take some or all of it back. Hence the name *low-priority* - VMs may not be allocated or may be preempted due to higher priority allocations, which equate to full-priced VMs that have an SLA. + +And as the name suggests, this significant cost reduction is ideal for *low priority* workloads that do not have a strict performance requirement. 
+ +With Azure Batch's first-class support for low-priority VMs, you can use them in conjunction with normal on-demand VMs (*dedicated VMs*) and enable job cost to be balanced with job execution flexibility: + + * Batch pools can contain both on-demand nodes and low-priority nodes. The two types can be independently scaled, either explicitly with the resize operation or automatically using auto-scale. Different configurations can be used, such as maximizing cost savings by always using low-priority nodes or spinning up on-demand nodes at full price, to maintain capacity by replacing any preempted low-priority nodes. + * If any low-priority nodes are preempted, then Batch will automatically attempt to replace the lost capacity, continually seeking to maintain the target amount of low-priority capacity in the pool. + * If tasks are interrupted when the node on which it is running is preempted, then the tasks are automatically re-queued to be re-run. + +For more information about low-priority VMs, please visit the [documentation](https://docs.microsoft.com/en-us/azure/batch/batch-low-pri-vms). + +You can also check out information on low-priority pricing [here](https://azure.microsoft.com/en-us/pricing/details/batch/). + +### Distributing Data +When developing at scale, you may also want to chunk up your data and distribute the data across your nodes. Learn more about that [here](./docs/21-distributing-data.md#chunking-data) + +### Using %do% vs %dopar% +When developing at scale, it is always recommended that you test and debug your code locally first. Switch between *%dopar%* and *%do%* to toggle between running in parallel on Azure and running in sequence on your local machine. + +```R +# run your code sequentially on your local machine +results <- foreach(i = 1:number_of_iterations) %do% { ... } + +# use the doAzureParallel backend to run your code in parallel across your Azure cluster +results <- foreach(i = 1:number_of_iterations) %dopar% { ... 
} +``` + +### Error Handling +The errorhandling option specifies how failed tasks should be evaluated. By default, the error handling is 'stop' to ensure users can have reproducible results. If a combine function is assigned, it must be able to handle error objects. + +Error Handling Type | Description +--- | --- +stop | The execution of the foreach will stop if an error occurs +pass | The error object of the task is included in the results +remove | The result of a failed task will not be returned + +```R +# Remove R error objects from the results +res <- foreach::foreach(i = 1:4, .errorhandling = "remove") %dopar% { +  if (i == 2 || i == 4) { +    randomObject +  } + +  mean(1:3) +} + +#> res +#[[1]] +#[1] 2 +# +#[[2]] +#[1] 2 +``` + +```R +# Passing R error objects into the results +res <- foreach::foreach(i = 1:4, .errorhandling = "pass") %dopar% { +  if (i == 2 || i == 4) { +    randomObject +  } + +  sum(i, 1) +} + +#> res +#[[1]] +#[1] 2 +# +#[[2]] +# +# +#[[3]] +#[1] 4 +# +#[[4]] +# +``` + +### Long-running Jobs + Job Management + +doAzureParallel also helps you manage your jobs so that you can run many jobs at once while managing them through a few simple methods. + + +```R +# List your jobs: +getJobList() +``` + +This will also let you run *long running jobs* easily. + +With long running jobs, you will need to keep track of your jobs as well as set your job to a non-blocking state. You can do this with the *.options.azure* options: + +```R +# set the .options.azure option in the foreach loop +opt <- list(job = 'unique_job_id', wait = FALSE) + +# NOTE - if the option wait = FALSE, foreach will return your unique job id +job_id <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... 
} + +# get back your job results with your unique job id +results <- getJobResult(job_id) +``` + +Finally, you may also want to track the status of jobs that you've named: + +```R +# List specific jobs: +getJobList(c('unique_job_id', 'another_job_id')) +``` + +You can learn more about how to execute long-running jobs [here](./docs/23-persistent-storage.md). + +With long-running jobs, you can take advantage of Azure's autoscaling capabilities to save time and/or money. Learn more about autoscale [here](./docs/11-autoscale.md). + +### Using the 'chunkSize' option + +doAzureParallel also supports custom chunk sizes. This option allows you to group iterations of the foreach loop together and execute them in a single R session. + +```R +# set the chunkSize option +opt <- list(chunkSize = 3) +results <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... } +``` + +You should consider using the chunkSize if each iteration in the loop executes very quickly. + +If you have a static cluster and want to have a single chunk for each worker, you can compute the chunkSize as follows: + +```R +# compute the chunk size +cs <- ceiling(number_of_iterations / getDoParWorkers()) + +# run the foreach loop with chunkSize optimized +opt <- list(chunkSize = cs) +results <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... } +``` + +### Resizing Your Cluster + +At some point, you may also want to resize your cluster manually. You can do this simply with the command *resizeCluster*. 
+ +```R +cluster <- makeCluster("cluster.json") + +# resize so that we have a min of 10 dedicated nodes and a max of 20 dedicated nodes +# AND a min of 10 low priority nodes and a max of 20 low priority nodes +resizeCluster( + cluster, + dedicatedMin = 10, + dedicatedMax = 20, + lowPriorityMin = 10, + lowPriorityMax = 20, + algorithm = 'QUEUE', + timeInterval = '5m' ) +``` + +If your cluster is using autoscale but you want to set it to a static size of 10, you can also use this method: + +```R +# resize to a static cluster of 10 +resizeCluster(cluster, + dedicatedMin = 10, + dedicatedMax = 10, + lowPriorityMin = 0, + lowPriorityMax = 0) +``` + +### Setting Verbose Mode to Debug + +To debug your doAzureParallel jobs, you can set the package to operate on *verbose* mode: + +```R +# turn on verbose mode +setVerbose(TRUE) + +# turn off verbose mode +setVerbose(FALSE) +``` +### Bypassing merge task + +Skipping the merge task is useful when the tasks' results don't need to be merged into a list. To bypass the merge task, you can pass the *enableMerge* flag to the foreach object: + +```R +# Enable merge task +foreach(i = 1:3, .options.azure = list(enableMerge = TRUE)) + +# Disable merge task +foreach(i = 1:3, .options.azure = list(enableMerge = FALSE)) +``` +Note: User-defined functions for the merge task are on our list of features that we are planning on doing. + +## Next Steps + +For more information, please visit [our documentation](./docs/README.md). 
\ No newline at end of file diff --git a/inst/startup/install_github.R b/inst/startup/install_github.R index 5caf40e..54f8478 100644 --- a/inst/startup/install_github.R +++ b/inst/startup/install_github.R @@ -2,17 +2,30 @@ args <- commandArgs(trailingOnly = TRUE) # Assumption: devtools is already installed based on Azure DSVM +status <- tryCatch({ + for (package in args) { + packageDirectory <- strsplit(package, "/")[[1]] + packageName <- packageDirectory[length(packageDirectory)] -for (package in args) { - packageDirectory <- strsplit(package, "/")[[1]] - packageName <- packageDirectory[length(packageDirectory)] - - if (!require(packageName, character.only = TRUE)) { - devtools::install_github(packageDirectory) - require(packageName, character.only = TRUE) + if (!require(package, character.only = TRUE)) { + devtools::install_github(packageDirectory) + require(package, character.only = TRUE) + } } -} + + return(0) +}, +error = function(e) { + cat(sprintf( + "Error getting parent environment: %s\n", + conditionMessage(e) + )) + + # Install packages doesn't return a non-exit code. + # Using '1' as the default non-exit code + return(1) +}) quit(save = "yes", - status = 0, + status = status, runLast = FALSE) diff --git a/inst/startup/merger.R b/inst/startup/merger.R index b226333..d1835fc 100644 --- a/inst/startup/merger.R +++ b/inst/startup/merger.R @@ -1,64 +1,118 @@ #!/usr/bin/Rscript args <- commandArgs(trailingOnly = TRUE) +status <- 0 -# test if there is at least one argument: if not, return an error -if (length(args) == 0) { - stop("At least one argument must be supplied (input file).n", call. 
= FALSE) -} else if (length(args) == 1) { - # default output file - args[2] <- "out.txt" +isError <- function(x) { + inherits(x, "simpleError") || inherits(x, "try-error") } -batchJobPreparationDirectory <- args[1] -batchTaskWorkingDirectory <- args[2] -batchJobId <- args[3] +batchTasksCount <- as.integer(args[1]) +chunkSize <- as.integer(args[2]) +errorHandling <- args[3] -n <- args[4] -numOfTasks <- args[5] +batchJobId <- Sys.getenv("AZ_BATCH_JOB_ID") +batchJobPreparationDirectory <- + Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR") +batchTaskWorkingDirectory <- Sys.getenv("AZ_BATCH_TASK_WORKING_DIR") azbatchenv <- readRDS(paste0(batchJobPreparationDirectory, "/", batchJobId, ".rds")) +setwd(batchTaskWorkingDirectory) + for (package in azbatchenv$packages) { library(package, character.only = TRUE) } + parent.env(azbatchenv$exportenv) <- globalenv() +sessionInfo() enableCloudCombine <- azbatchenv$enableCloudCombine cloudCombine <- azbatchenv$cloudCombine if (typeof(cloudCombine) == "list" && enableCloudCombine) { - results <- vector("list", numOfTasks) + results <- vector("list", batchTasksCount * chunkSize) count <- 1 - tryCatch({ - for (i in 1:n) { - taskResult <- - file.path( - batchTaskWorkingDirectory, - "result", - paste0(batchJobId, "-task", i, "-result.rds") - ) - task <- readRDS(taskResult) + status <- tryCatch({ + if (errorHandling == "remove" || errorHandling == "stop") { + files <- list.files(file.path(batchTaskWorkingDirectory, + "result"), + full.names = TRUE) - for (t in 1:length(task)) { - results[count] <- task[t] - count <- count + 1 + if (errorHandling == "stop" && length(files) != batchTasksCount) { + stop(paste("Error handling is set to 'stop' and there are missing results due to", + "task failures. 
If this is not the correct behavior, change the errorHandling", + "property to 'pass' or 'remove' in the foreach object.", + "For more information on troubleshooting, check", + "https://github.com/Azure/doAzureParallel/blob/master/docs/40-troubleshooting.md")) } + + results <- vector("list", length(files) * chunkSize) + + for (i in 1:length(files)) { + task <- readRDS(files[i]) + + if (isError(task)) { + if (errorHandling == "stop") { + stop("Error found") + } + else { + next + } + } + + for (t in 1:length(chunkSize)) { + results[count] <- task[t] + count <- count + 1 + } + } + + saveRDS(results, file = file.path( + batchTaskWorkingDirectory, + paste0(batchJobId, "-merge-result.rds") + )) + } + else if (errorHandling == "pass") { + for (i in 1:batchTasksCount) { + taskResult <- + file.path( + batchTaskWorkingDirectory, + "result", + paste0(batchJobId, "-task", i, "-result.rds") + ) + + if (file.exists(taskResult)) { + task <- readRDS(taskResult) + for (t in 1:length(chunkSize)) { + results[count] <- task[t] + count <- count + 1 + } + } + else { + for (t in 1:length(chunkSize)) { + results[count] <- NA + count <- count + 1 + } + } + } + + saveRDS(results, file = file.path( + batchTaskWorkingDirectory, + paste0(batchJobId, "-merge-result.rds") + )) } - saveRDS(results, file = file.path( - batchTaskWorkingDirectory, - paste0(batchJobId, "-merge-result.rds") - )) + 0 }, error = function(e) { print(e) + 1 }) } else { # Work needs to be done for utilizing custom merge functions } quit(save = "yes", - status = 0, + status = status, runLast = FALSE) diff --git a/inst/startup/worker.R b/inst/startup/worker.R index c32de70..025d4c1 100644 --- a/inst/startup/worker.R +++ b/inst/startup/worker.R @@ -1,13 +1,6 @@ #!/usr/bin/Rscript args <- commandArgs(trailingOnly = TRUE) - -# test if there is at least one argument: if not, return an error -if (length(args) == 0) { - stop("At least one argument must be supplied (input file).n", call. 
= FALSE) -} else if (length(args) == 1) { - # default output file - args[2] <- "out.txt" -} +workerErrorStatus <- 0 getparentenv <- function(pkgname) { parenv <- NULL @@ -55,10 +48,13 @@ getparentenv <- function(pkgname) { parenv } -batchJobPreparationDirectory <- args[1] -batchTaskWorkingDirectory <- args[2] -batchJobEnvironment <- args[3] -batchTaskEnvironment <- args[4] +batchJobId <- Sys.getenv("AZ_BATCH_JOB_ID") +batchTaskId <- Sys.getenv("AZ_BATCH_TASK_ID") +batchJobPreparationDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR") +batchTaskWorkingDirectory <- Sys.getenv("AZ_BATCH_TASK_WORKING_DIR") + +batchJobEnvironment <- paste0(batchJobId, ".rds") +batchTaskEnvironment <- paste0(batchTaskId, ".rds") setwd(batchTaskWorkingDirectory) @@ -87,12 +83,11 @@ result <- lapply(taskArgs, function(args) { eval(azbatchenv$expr, azbatchenv$exportenv) }, error = function(e) { - print(e) + workerErrorStatus <<- 1 + e }) }) -fileResultName <- strsplit(batchTaskEnvironment, "[.]")[[1]][1] - if (!is.null(azbatchenv$gather) && length(taskArgs) > 1) { result <- Reduce(azbatchenv$gather, result) } @@ -100,9 +95,10 @@ if (!is.null(azbatchenv$gather) && length(taskArgs) > 1) { saveRDS(result, file = file.path( batchTaskWorkingDirectory, - paste0(fileResultName, "-result.rds") + paste0(batchTaskId, "-result.rds") )) +cat(paste0("Error Code: ", workerErrorStatus, fill = TRUE)) quit(save = "yes", - status = 0, + status = workerErrorStatus, runLast = FALSE) diff --git a/man/waitForTasksToComplete.Rd b/man/waitForTasksToComplete.Rd index 4ae3b86..d41c545 100644 --- a/man/waitForTasksToComplete.Rd +++ b/man/waitForTasksToComplete.Rd @@ -4,7 +4,7 @@ \alias{waitForTasksToComplete} \title{Wait for current tasks to complete} \usage{ -waitForTasksToComplete(jobId, timeout) +waitForTasksToComplete(jobId, timeout, errorhandling = "stop") } \description{ Wait for current tasks to complete