Fixed worker and merger scripts (#116)

* Fixed worker and merger scripts

* Fixed verbose logs based on PR comments

* Added documentation on error handling

* Fixed header on table markdown

* Fixed based on PR comments
Brian 2017-09-28 15:18:06 -07:00, committed by GitHub
Parent 368eeb85c8
Commit 3dadf0803e
8 changed files: 667 additions and 523 deletions


@@ -186,7 +186,6 @@ setHttpTraffic <- function(value = FALSE) {
  }
-
  pkgName <- if (exists("packageName", mode = "function"))
    packageName(envir)
  else
    NULL
@@ -414,16 +413,10 @@ setHttpTraffic <- function(value = FALSE) {
      taskId <- paste0(id, "-task", i)

      .addTask(
-        id,
+        jobId = id,
        taskId = taskId,
        rCommand = sprintf(
-          "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R %s %s %s %s > %s.txt",
-          "$AZ_BATCH_JOB_PREP_WORKING_DIR",
-          "$AZ_BATCH_TASK_WORKING_DIR",
-          jobFileName,
-          paste0(taskId, ".rds"),
-          taskId
-        ),
+          "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R > $AZ_BATCH_TASK_ID.txt"),
        args = argsList[startIndex:endIndex],
        envir = .doAzureBatchGlobals,
        packages = obj$packages,
@@ -436,17 +429,15 @@ setHttpTraffic <- function(value = FALSE) {
    rAzureBatch::updateJob(id)

    if (enableCloudCombine) {
+      mergeTaskId <- paste0(id, "-merge")
      .addTask(
-        id,
-        taskId = paste0(id, "-merge"),
+        jobId = id,
+        taskId = mergeTaskId,
        rCommand = sprintf(
-          "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/merger.R %s %s %s %s %s > %s.txt",
-          "$AZ_BATCH_JOB_PREP_WORKING_DIR",
-          "$AZ_BATCH_TASK_WORKING_DIR",
-          id,
+          "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/merger.R %s %s %s > $AZ_BATCH_TASK_ID.txt",
          length(tasks),
-          ntasks,
-          paste0(id, "-merge")
+          chunkSize,
+          as.character(obj$errorHandling)
        ),
        envir = .doAzureBatchGlobals,
        packages = obj$packages,
@@ -461,61 +452,67 @@ setHttpTraffic <- function(value = FALSE) {
      waitForJobPreparation(id, data$poolId)
    }

-    waitForTasksToComplete(id, jobTimeout)
+    tryCatch({
+      waitForTasksToComplete(id, jobTimeout, obj$errorHandling)

      if (typeof(cloudCombine) == "list" && enableCloudCombine) {
        tempFile <- tempfile("doAzureParallel", fileext = ".rds")

        response <-
          rAzureBatch::downloadBlob(
            id,
            paste0("result/", id, "-merge-result.rds"),
            sasToken = sasToken,
            accountName = storageCredentials$name,
            downloadPath = tempFile,
            overwrite = TRUE
          )

        results <- readRDS(tempFile)

        failTasks <- sapply(results, .isError)

        numberOfFailedTasks <- sum(unlist(failTasks))

        if (numberOfFailedTasks > 0) {
          .createErrorViewerPane(id, failTasks)
        }

        accumulator <- foreach::makeAccum(it)

        tryCatch(
          accumulator(results, seq(along = results)),
          error = function(e) {
            cat("error calling combine function:\n")
            print(e)
          }
        )

        # check for errors
        errorValue <- foreach::getErrorValue(it)
        errorIndex <- foreach::getErrorIndex(it)

        cat(sprintf("Number of errors: %i", numberOfFailedTasks),
            fill = TRUE)

        rAzureBatch::deleteJob(id)

        if (identical(obj$errorHandling, "stop") &&
            !is.null(errorValue)) {
          msg <- sprintf("task %d failed - '%s'",
                         errorIndex,
                         conditionMessage(errorValue))
          stop(simpleError(msg, call = expr))
        }
        else {
          foreach::getResult(it)
        }
      }
+    },
+    error = function(ex) {
+      message(ex)
+    })
  }
  else{
    print(


@@ -29,9 +29,17 @@
  resourceFiles <-
    list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))

+  exitConditions <- NULL
  if (!is.null(args$dependsOn)) {
    dependsOn <- list(taskIds = dependsOn)
  }
+  else {
+    exitConditions <- list(
+      default = list(
+        dependencyAction = "satisfy"
+      )
+    )
+  }

  resultFile <- paste0(taskId, "-result", ".rds")
  accountName <- storageCredentials$name

@@ -122,7 +130,8 @@
    resourceFiles = resourceFiles,
    commandLine = commands,
    dependsOn = dependsOn,
-    outputFiles = outputFiles
+    outputFiles = outputFiles,
+    exitConditions = exitConditions
  )
}


@@ -257,7 +257,8 @@ getJobResult <- function(jobId = "", ...) {
  if (!is.null(args$container)) {
    results <-
-      rAzureBatch::downloadBlob(args$container, paste0("result/", jobId, "-merge-result.rds"))
+      rAzureBatch::downloadBlob(args$container,
+                                paste0("result/", jobId, "-merge-result.rds"))
  }
  else{
    results <-
@@ -437,58 +438,25 @@ createOutputFile <- function(filePattern, url) {
  output
}

#' Wait for current tasks to complete
#'
#' @export
-waitForTasksToComplete <- function(jobId, timeout) {
-  cat("Waiting for tasks to complete. . .", fill = TRUE)
-
-  numOfTasks <- 0
-  currentTasks <- rAzureBatch::listTask(jobId)
-
-  if (is.null(currentTasks$value)) {
-    stop(paste0("Error: ", currentTasks$message$value))
-    return()
-  }
-
-  numOfTasks <- numOfTasks + length(currentTasks$value)
-
-  # Getting the total count of tasks for progress bar
-  repeat {
-    if (is.null(currentTasks$odata.nextLink)) {
-      break
-    }
-
-    pb <- txtProgressBar(min = 0, max = numOfTasks, style = 3)
-
-    timeToTimeout <- Sys.time() + timeout
-
-    while (Sys.time() < timeToTimeout) {
-      count <- 0
-      currentTasks <- rAzureBatch::listTask(jobId)
-
-      taskStates <-
-        lapply(currentTasks$value, function(x)
-          x$state != "completed")
-
-      for (i in 1:length(taskStates)) {
-        if (taskStates[[i]] == FALSE) {
-          count <- count + 1
-        }
-      }
+waitForTasksToComplete <-
+  function(jobId, timeout, errorHandling = "stop") {
+    cat("Waiting for tasks to complete. . .", fill = TRUE)
+
+    totalTasks <- 0
+    currentTasks <- rAzureBatch::listTask(jobId)
+
+    if (is.null(currentTasks$value)) {
+      stop(paste0("Error: ", currentTasks$message$value))
+      return()
+    }
+
+    totalTasks <- totalTasks + length(currentTasks$value)
+
+    # Getting the total count of tasks for progress bar
+    repeat {
+      if (is.null(currentTasks$odata.nextLink)) {
+        break
@@ -505,30 +473,84 @@
      skipTokenParameter <-
        strsplit(currentTasks$odata.nextLink, "&")[[1]][2]

      skipTokenValue <-
        substr(skipTokenParameter,
               nchar("$skiptoken=") + 1,
               nchar(skipTokenParameter))

      currentTasks <-
        rAzureBatch::listTask(jobId, skipToken = URLdecode(skipTokenValue))

-    taskStates <-
-      lapply(currentTasks$value, function(x)
-        x$state != "completed")
-
-    for (i in 1:length(taskStates)) {
-      if (taskStates[[i]] == FALSE) {
-        count <- count + 1
-      }
-    }
-
-    setTxtProgressBar(pb, count)
-    if (all(taskStates == FALSE)) {
-      cat("\n")
-      return(0)
-    }
-    Sys.sleep(10)
-  }
-
-  stop("A timeout has occurred when waiting for tasks to complete.")
-}
+      totalTasks <- totalTasks + length(currentTasks$value)
+    }
+
+    pb <- txtProgressBar(min = 0, max = totalTasks, style = 3)
+    timeToTimeout <- Sys.time() + timeout
+
+    repeat {
+      taskCounts <- rAzureBatch::getJobTaskCounts(jobId)
+      setTxtProgressBar(pb, taskCounts$completed)
+
+      validationFlag <-
+        (taskCounts$validationStatus == "Validated" &&
+           totalTasks <= 200000) ||
+        totalTasks > 200000
+
+      if (taskCounts$failed > 0 &&
+          errorHandling == "stop" &&
+          validationFlag) {
+        cat("\n")
+
+        select <- "id, executionInfo"
+        failedTasks <-
+          rAzureBatch::listTask(jobId, select = select)
+
+        tasksFailureWarningLabel <-
+          sprintf(paste("%i task(s) failed while running the job.",
+                        "This caused the job to terminate automatically.",
+                        "To disable this behavior and continue on failure, set .errorHandling='remove | pass'",
+                        "in the foreach loop\n"), taskCounts$failed)
+
+        for (i in 1:length(failedTasks$value)) {
+          if (failedTasks$value[[i]]$executionInfo$result == "Failure") {
+            tasksFailureWarningLabel <-
+              paste0(tasksFailureWarningLabel,
+                     sprintf("%s\n", failedTasks$value[[i]]$id))
+          }
+        }
+
+        warning(sprintf(tasksFailureWarningLabel,
+                        taskCounts$failed))
+
+        response <- rAzureBatch::terminateJob(jobId)
+        httr::stop_for_status(response)
+
+        stop(sprintf(
+          paste("Errors have occurred while running the job '%s'.",
+                "Error handling is set to 'stop' and has proceeded to terminate the job.",
+                "The user will have to handle deleting the job.",
+                "If this is not the correct behavior, change the errorHandling property to 'pass'",
+                " or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs.",
+                "For more information about getting job logs, follow this link:",
+                paste0("https://github.com/Azure/doAzureParallel/blob/master/docs/",
+                       "40-troubleshooting.md#viewing-files-directly-from-compute-node")),
+          jobId
+        ))
+      }
+
+      if (Sys.time() > timeToTimeout) {
+        stop(sprintf(
+          paste("Timeout has occurred while waiting for tasks to complete.",
+                "Users will have to manually track the job '%s' and get the results.",
+                "Use the getJobResults function to obtain the results and getJobList for",
+                "tracking job status. To change the timeout, set 'timeout' property in the",
+                "foreach's options.azure."),
+          jobId
+        ))
+      }
+
+      if (taskCounts$completed >= totalTasks &&
+          (taskCounts$validationStatus == "Validated" ||
+             totalTasks >= 200000)) {
+        cat("\n")
+        return(0)
+      }
+
+      Sys.sleep(10)
+    }
+  }

waitForJobPreparation <- function(jobId, poolId) {
  cat("Job Preparation Status: Package(s) being installed")
@@ -556,11 +578,13 @@ waitForJobPreparation <- function(jobId, poolId) {
  # Verify that all the job preparation tasks are not failing
  if (all(FALSE %in% statuses)) {
    cat("\n")
-    stop(paste(
-      sprintf("Job '%s' unable to install packages.", jobId),
-      "Use the 'getJobFile' function to get more information about",
-      "job package installation."
-    ))
+    stop(
+      paste(
+        sprintf("Job '%s' unable to install packages.", jobId),
+        "Use the 'getJobFile' function to get more information about",
+        "job package installation."
+      )
+    )
  }

  cat(".")
@@ -574,6 +598,6 @@ getXmlValues <- function(xmlResponse, xmlPath) {
  xml2::xml_text(xml2::xml_find_all(xmlResponse, xmlPath))
}

-areShallowEqual <- function(a, b){
+areShallowEqual <- function(a, b) {
  !is.null(a) && !is.null(b) && a == b
}

README.md

@@ -1,335 +1,386 @@
[![Build Status](https://travis-ci.org/Azure/doAzureParallel.svg?branch=master)](https://travis-ci.org/Azure/doAzureParallel)
# doAzureParallel
```R
# set your credentials
setCredentials("credentials.json")
# setup your cluster with a simple config file
cluster <- makeCluster("cluster.json")
# register the cluster as your parallel backend
registerDoAzureParallel(cluster)
# run your foreach loop on a distributed cluster in Azure
number_of_iterations <- 10
results <- foreach(i = 1:number_of_iterations) %dopar% {
myParallelAlgorithm()
}
```
## Introduction
The *doAzureParallel* package is a parallel backend for the widely popular *foreach* package. With *doAzureParallel*, each iteration of the *foreach* loop runs in parallel on an Azure Virtual Machine (VM), allowing users to scale up their R jobs to tens or hundreds of machines.
*doAzureParallel* is built to support the *foreach* parallel computing package. The *foreach* package supports parallel execution - it can execute multiple processes across some parallel backend. With just a few lines of code, the *doAzureParallel* package helps you create a cluster in Azure, register it as your parallel backend, and seamlessly connect it to the *foreach* package.
NOTE: The terms *pool* and *cluster* are used interchangeably throughout this document.
## Dependencies
- R (>= 3.3.1)
- httr (>= 1.2.1)
- rjson (>= 0.2.15)
- RCurl (>= 1.95-4.8)
- digest (>= 0.6.9)
- foreach (>= 1.4.3)
- iterators (>= 1.0.8)
- bitops (>= 1.0.5)
## Installation
Install doAzureParallel directly from Github.
```R
# install the package devtools
install.packages("devtools")
library(devtools)
# install the doAzureParallel and rAzureBatch package
install_github(c("Azure/rAzureBatch", "Azure/doAzureParallel"))
```
## Azure Requirements
To run your R code across a cluster in Azure, we'll need to get keys and account information.
### Setup Azure Account
First, set up your Azure Account ([Get started for free!](https://azure.microsoft.com/en-us/free/))
Once you have an Azure account, you'll need to create the following two services in the Azure portal:
- Azure Batch Account ([Create an Azure Batch Account in the Portal](https://docs.microsoft.com/en-us/azure/Batch/batch-account-create-portal))
- Azure Storage Account (this can be created with the Batch Account)
### Get Keys and Account Information
For your Azure Batch Account, we need to get:
- Batch Account Name
- Batch Account URL
- Batch Account Access Key
This information can be found in the Azure Portal inside your Batch Account:
![Azure Batch Account in the Portal](./vignettes/doAzureParallel-azurebatch-instructions.PNG "Azure Batch Account in the Portal")
For your Azure Storage Account, we need to get:
- Storage Account Name
- Storage Account Access Key
This information can be found in the Azure Portal inside your Azure Storage Account:
![Azure Storage Account in the Portal](./vignettes/doAzureParallel-azurestorage-instructions.PNG "Azure Storage Account in the Portal")
Keep track of the above keys and account information as they will be used to connect your R session with Azure.
## Getting Started
Import the package
```R
library(doAzureParallel)
```
Set up your parallel backend with Azure. This is your set of Azure VMs.
```R
# 1. Generate your credential and cluster configuration files.
generateClusterConfig("cluster.json")
generateCredentialsConfig("credentials.json")
# 2. Fill out your credential config and cluster config files.
# Enter your Azure Batch Account & Azure Storage keys/account-info into your credential config ("credentials.json") and configure your cluster in your cluster config ("cluster.json")
# 3. Set your credentials - you need to give the R session your credentials to interact with Azure
setCredentials("credentials.json")
# 4. Register the pool. This will create a new pool if your pool hasn't already been provisioned.
cluster <- makeCluster("cluster.json")
# 5. Register the pool as your parallel backend
registerDoAzureParallel(cluster)
# 6. Check that your parallel backend has been registered
getDoParWorkers()
```
Run your parallel *foreach* loop with the *%dopar%* keyword. The *foreach* function will return the results of your parallel code.
```R
number_of_iterations <- 10
results <- foreach(i = 1:number_of_iterations) %dopar% {
# This code is executed, in parallel, across your cluster.
myAlgorithm()
}
```
After you finish running your R code in Azure, you may want to shut down your cluster of VMs to make sure that you are not being charged anymore.
```R
# shut down your pool
stopCluster(cluster)
```
### Configuration JSON files
#### Credentials
Use your credential config JSON file to enter your credentials.
```javascript
{
"batchAccount": {
"name": <Azure Batch Account Name>,
"key": <Azure Batch Account Key>,
"url": <Azure Batch Account URL>
},
"storageAccount": {
"name": <Azure Storage Account Name>,
"key": <Azure Storage Account Key>
}
}
```
Learn more:
- [Batch account / Storage account](./README.md#azure-requirements)
#### Cluster Settings
Use your pool configuration JSON file to define your pool in Azure.
```javascript
{
"name": <your pool name>, // example: "myazurecluster"
"vmSize": <your pool VM size name>, // example: "Standard_F2"
"maxTasksPerNode": <num tasks to allocate to each node>, // example: "2"
"poolSize": {
"dedicatedNodes": { // dedicated vms
"min": 2,
"max": 2
},
"lowPriorityNodes": { // low priority vms
"min": 1,
"max": 10
},
"autoscaleFormula": "QUEUE"
},
"rPackages": {
"cran": ["some_cran_package", "some_other_cran_package"],
"github": ["username/some_github_package", "another_username/some_other_github_package"],
"githubAuthenticationToken": {}
},
"commandLine": []
}
```
NOTE: If you do **not** want your cluster to autoscale, simply set the number of min nodes equal to max nodes for low-priority and dedicated.
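For example, a static cluster of four dedicated nodes and no low-priority nodes could be configured as follows (a hypothetical sketch showing only the `poolSize` block; the node counts are illustrative):
```javascript
"poolSize": {
  "dedicatedNodes": { // always exactly 4 dedicated nodes
    "min": 4,
    "max": 4
  },
  "lowPriorityNodes": { // no low-priority nodes
    "min": 0,
    "max": 0
  },
  "autoscaleFormula": "QUEUE"
}
```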
Learn more:
- [Choosing VM size](./docs/10-vm-sizes.md#vm-size-table)
- [MaxTasksPerNode](./docs/22-parallelizing-cores.md)
- [LowPriorityNodes](#low-priority-vms)
- [Autoscale](./docs/11-autoscale.md)
- [PoolSize Limitations](./docs/12-quota-limitations.md)
- [rPackages](./docs/20-package-management.md)
### Low Priority VMs
Low-priority VMs are a way to obtain and consume Azure compute at a much lower price using Azure Batch. Since doAzureParallel is built on top of Azure Batch, this package is able to take advantage of low-priority VMs and allocate compute resources from Azure's surplus capacity at up to **80% discount**.
Low-priority VMs come with the understanding that when you request them, Azure may need to take some or all of them back. Hence the name *low-priority* - VMs may not be allocated, or may be preempted, due to higher-priority allocations, which equate to full-priced VMs that carry an SLA.
And as the name suggests, this significant cost reduction is ideal for *low priority* workloads that do not have a strict performance requirement.
With Azure Batch's first-class support for low-priority VMs, you can use them in conjunction with normal on-demand VMs (*dedicated VMs*) and enable job cost to be balanced with job execution flexibility:
* Batch pools can contain both on-demand nodes and low-priority nodes. The two types can be independently scaled, either explicitly with the resize operation or automatically using auto-scale. Different configurations can be used, such as maximizing cost savings by always using low-priority nodes or spinning up on-demand nodes at full price, to maintain capacity by replacing any preempted low-priority nodes.
* If any low-priority nodes are preempted, then Batch will automatically attempt to replace the lost capacity, continually seeking to maintain the target amount of low-priority capacity in the pool.
* If a task is interrupted when the node on which it is running is preempted, the task is automatically re-queued to be re-run.
For more information about low-priority VMs, please visit the [documentation](https://docs.microsoft.com/en-us/azure/batch/batch-low-pri-vms).
You can also check out information on low-priority pricing [here](https://azure.microsoft.com/en-us/pricing/details/batch/).
### Distributing Data
When developing at scale, you may also want to chunk up your data and distribute the data across your nodes. Learn more about that [here](./docs/21-distributing-data.md#chunking-data)
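As a minimal sketch (assuming a hypothetical data frame `bigData`; the chunking strategy itself is covered in the linked doc), you could split the data into one chunk per worker and iterate over the chunks:
```R
# split a (hypothetical) large data frame into one chunk per worker
chunks <- split(bigData,
                cut(seq_len(nrow(bigData)), getDoParWorkers(), labels = FALSE))

results <- foreach(chunk = chunks) %dopar% {
  # each Azure node receives and processes a single chunk
  colMeans(chunk[sapply(chunk, is.numeric)])
}
```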
### Using %do% vs %dopar%
When developing at scale, it is always recommended that you test and debug your code locally first. Switch between *%dopar%* and *%do%* to toggle between running in parallel on Azure and running in sequence on your local machine.
```R
# run your code sequentially on your local machine
results <- foreach(i = 1:number_of_iterations) %do% { ... }
# use the doAzureParallel backend to run your code in parallel across your Azure cluster
results <- foreach(i = 1:number_of_iterations) %dopar% { ... }
```
### Error Handling
The *.errorhandling* option specifies how failed tasks should be evaluated. By default, error handling is set to 'stop' so that users get reproducible results. If a combine function is assigned, it must be able to handle error objects.
Error Handling Type | Description
--- | ---
stop | The execution of the foreach will stop if an error occurs
pass | The error object of the task is included in the results
remove | The result of a failed task will not be returned
```R
# Remove R error objects from the results
res <- foreach::foreach(i = 1:4, .errorhandling = "remove") %dopar% {
if (i == 2 || i == 4) {
randomObject
}
mean(1:3)
}
#> res
#[[1]]
#[1] 2
#
#[[2]]
#[1] 2
```
```R
# Passing R error objects into the results
res <- foreach::foreach(i = 1:4, .errorhandling = "pass") %dopar% {
if (i == 2 || i == 4) {
randomObject
}
sum(i, 1)
}
#> res
#[[1]]
#[1] 2
#
#[[2]]
#<simpleError in eval(expr, envir, enclos): object 'randomObject' not found>
#
#[[3]]
#[1] 4
#
#[[4]]
#<simpleError in eval(expr, envir, enclos): object 'randomObject' not found>
```
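Because 'pass' places error objects in the results, a custom *.combine* function must tolerate them. A minimal sketch (the `safeSum` helper is illustrative, not part of the package, and assumes the first result is not itself an error):
```R
# A combine function that skips error objects before accumulating
safeSum <- function(a, b) {
  if (inherits(b, "error")) return(a)  # drop results from failed tasks
  a + b
}

res <- foreach::foreach(i = 1:4, .combine = safeSum, .errorhandling = "pass") %dopar% {
  if (i == 2) {
    stop("simulated failure")
  }
  i
}
#> res
#[1] 8  # 1 + 3 + 4; the failed iteration contributes nothing
```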
### Long-running Jobs + Job Management
doAzureParallel also helps you manage your jobs so that you can run many jobs at once while managing it through a few simple methods.
```R
# List your jobs:
getJobList()
```
This will also let you run *long-running jobs* easily.
With long-running jobs, you will need to keep track of your jobs as well as set your job to a non-blocking state. You can do this with the *.options.azure* option:
```R
# set the .options.azure option in the foreach loop
opt <- list(job = 'unique_job_id', wait = FALSE)
# NOTE - if the option wait = FALSE, foreach will return your unique job id
job_id <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... }
# get back your job results with your unique job id
results <- getJobResult(job_id)
```
Finally, you may also want to track the status of jobs that you've named:
```R
# List specific jobs:
getJobList(c('unique_job_id', 'another_job_id'))
```
You can learn more about how to execute long-running jobs [here](./docs/23-persistent-storage.md).
With long-running jobs, you can take advantage of Azure's autoscaling capabilities to save time and/or money. Learn more about autoscale [here](./docs/11-autoscale.md).
### Using the 'chunkSize' option
doAzureParallel also supports custom chunk sizes. This option allows you to group iterations of the foreach loop together and execute them in a single R session.
```R
# set the chunkSize option
opt <- list(chunkSize = 3)
results <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... }
```
You should consider using the chunkSize option if each iteration in the loop executes very quickly.
If you have a static cluster and want to have a single chunk for each worker, you can compute the chunkSize as follows:
```R
# compute the chunk size
cs <- ceiling(number_of_iterations / getDoParWorkers())
# run the foreach loop with chunkSize optimized
opt <- list(chunkSize = cs)
results <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... }
```
### Resizing Your Cluster
At some point, you may also want to resize your cluster manually. You can do this simply with the command *resizeCluster*.
```R
cluster <- makeCluster("cluster.json")
# resize so that we have a min of 10 dedicated nodes and a max of 20 dedicated nodes
# AND a min of 10 low priority nodes and a max of 20 low priority nodes
resizeCluster(
cluster,
dedicatedMin = 10,
dedicatedMax = 20,
lowPriorityMin = 10,
lowPriorityMax = 20,
algorithm = 'QUEUE',
timeInterval = '5m' )
```
If your cluster is using autoscale but you want to set it to a static size of 10, you can also use this method:
```R
# resize to a static cluster of 10
resizeCluster(cluster,
dedicatedMin = 10,
dedicatedMax = 10,
lowPriorityMin = 0,
lowPriorityMax = 0)
```
### Setting Verbose Mode to Debug
To debug your doAzureParallel jobs, you can set the package to operate on *verbose* mode:
```R
# turn on verbose mode
setVerbose(TRUE)
# turn off verbose mode
setVerbose(FALSE)
```
### Bypassing merge task
Skipping the merge task is useful when the task results don't need to be merged into a list. To bypass the merge task, you can pass the *enableMerge* flag to the foreach object:
```R
# Enable merge task
foreach(i = 1:3, .options.azure = list(enableMerge = TRUE))
# Disable merge task
foreach(i = 1:3, .options.azure = list(enableMerge = FALSE))
```
Note: Support for user-defined merge functions is on our list of planned features.
## Next Steps
For more information, please visit [our documentation](./docs/README.md).


@@ -2,17 +2,30 @@
args <- commandArgs(trailingOnly = TRUE)

# Assumption: devtools is already installed based on Azure DSVM
-for (package in args) {
-  packageDirectory <- strsplit(package, "/")[[1]]
-  packageName <- packageDirectory[length(packageDirectory)]
-
-  if (!require(packageName, character.only = TRUE)) {
-    devtools::install_github(packageDirectory)
-    require(packageName, character.only = TRUE)
-  }
-}
+status <- tryCatch({
+  for (package in args) {
+    packageDirectory <- strsplit(package, "/")[[1]]
+    packageName <- packageDirectory[length(packageDirectory)]
+
+    if (!require(package, character.only = TRUE)) {
+      devtools::install_github(packageDirectory)
+      require(package, character.only = TRUE)
+    }
+  }
+
+  return(0)
+},
+error = function(e) {
+  cat(sprintf(
+    "Error getting parent environment: %s\n",
+    conditionMessage(e)
+  ))
+
+  # Install packages doesn't return a non-exit code.
+  # Using '1' as the default non-exit code
+  return(1)
+})

quit(save = "yes",
-     status = 0,
+     status = status,
     runLast = FALSE)


@@ -1,64 +1,118 @@
#!/usr/bin/Rscript
args <- commandArgs(trailingOnly = TRUE)

-# test if there is at least one argument: if not, return an error
-if (length(args) == 0) {
-  stop("At least one argument must be supplied (input file).n", call. = FALSE)
-} else if (length(args) == 1) {
-  # default output file
-  args[2] <- "out.txt"
-}
+status <- 0
+
+isError <- function(x) {
+  inherits(x, "simpleError") || inherits(x, "try-error")
+}

-batchJobPreparationDirectory <- args[1]
-batchTaskWorkingDirectory <- args[2]
-batchJobId <- args[3]
-
-n <- args[4]
-numOfTasks <- args[5]
+batchTasksCount <- as.integer(args[1])
+chunkSize <- as.integer(args[2])
+errorHandling <- args[3]
+
+batchJobId <- Sys.getenv("AZ_BATCH_JOB_ID")
+batchJobPreparationDirectory <-
+  Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
+batchTaskWorkingDirectory <- Sys.getenv("AZ_BATCH_TASK_WORKING_DIR")

azbatchenv <-
  readRDS(paste0(batchJobPreparationDirectory, "/", batchJobId, ".rds"))

setwd(batchTaskWorkingDirectory)

for (package in azbatchenv$packages) {
  library(package, character.only = TRUE)
}
parent.env(azbatchenv$exportenv) <- globalenv()
sessionInfo()

enableCloudCombine <- azbatchenv$enableCloudCombine
cloudCombine <- azbatchenv$cloudCombine

if (typeof(cloudCombine) == "list" && enableCloudCombine) {
-  results <- vector("list", numOfTasks)
+  results <- vector("list", batchTasksCount * chunkSize)
  count <- 1

-  tryCatch({
-    for (i in 1:n) {
-      taskResult <-
-        file.path(
-          batchTaskWorkingDirectory,
-          "result",
-          paste0(batchJobId, "-task", i, "-result.rds")
-        )
-
-      task <- readRDS(taskResult)
-
-      for (t in 1:length(task)) {
-        results[count] <- task[t]
-        count <- count + 1
-      }
-    }
-
-    saveRDS(results, file = file.path(
-      batchTaskWorkingDirectory,
-      paste0(batchJobId, "-merge-result.rds")
-    ))
+  status <- tryCatch({
+    if (errorHandling == "remove" || errorHandling == "stop") {
+      files <- list.files(file.path(batchTaskWorkingDirectory,
+                                    "result"),
+                          full.names = TRUE)
+
+      if (errorHandling == "stop" && length(files) != batchTasksCount) {
+        stop(paste("Error handling is set to 'stop' and there are missing results due to",
+                   "task failures. If this is not the correct behavior, change the errorHandling",
+                   "property to 'pass' or 'remove' in the foreach object.",
+                   "For more information on troubleshooting, check",
+                   "https://github.com/Azure/doAzureParallel/blob/master/docs/40-troubleshooting.md"))
+      }
+
+      results <- vector("list", length(files) * chunkSize)
+
+      for (i in 1:length(files)) {
+        task <- readRDS(files[i])
+
+        if (isError(task)) {
+          if (errorHandling == "stop") {
+            stop("Error found")
+          }
+          else {
+            next
+          }
+        }
+
+        for (t in 1:length(chunkSize)) {
+          results[count] <- task[t]
+          count <- count + 1
+        }
+      }
+
+      saveRDS(results, file = file.path(
+        batchTaskWorkingDirectory,
+        paste0(batchJobId, "-merge-result.rds")
+      ))
+    }
+    else if (errorHandling == "pass") {
+      for (i in 1:batchTasksCount) {
+        taskResult <-
+          file.path(
+            batchTaskWorkingDirectory,
+            "result",
+            paste0(batchJobId, "-task", i, "-result.rds")
+          )
+
+        if (file.exists(taskResult)) {
+          task <- readRDS(taskResult)
+          for (t in 1:length(chunkSize)) {
+            results[count] <- task[t]
+            count <- count + 1
+          }
+        }
+        else {
+          for (t in 1:length(chunkSize)) {
+            results[count] <- NA
+            count <- count + 1
+          }
+        }
+      }
+
+      saveRDS(results, file = file.path(
+        batchTaskWorkingDirectory,
+        paste0(batchJobId, "-merge-result.rds")
+      ))
+    }
+
+    0
  },
  error = function(e) {
    print(e)
+    1
  })
} else {
  # Work needs to be done for utilizing custom merge functions
}

quit(save = "yes",
-     status = 0,
+     status = status,
     runLast = FALSE)


@@ -1,13 +1,6 @@
#!/usr/bin/Rscript
args <- commandArgs(trailingOnly = TRUE)

-# test if there is at least one argument: if not, return an error
-if (length(args) == 0) {
-  stop("At least one argument must be supplied (input file).n", call. = FALSE)
-} else if (length(args) == 1) {
-  # default output file
-  args[2] <- "out.txt"
-}
+workerErrorStatus <- 0

getparentenv <- function(pkgname) {
  parenv <- NULL
@@ -55,10 +48,13 @@ getparentenv <- function(pkgname) {
  parenv
}

-batchJobPreparationDirectory <- args[1]
-batchTaskWorkingDirectory <- args[2]
-batchJobEnvironment <- args[3]
-batchTaskEnvironment <- args[4]
+batchJobId <- Sys.getenv("AZ_BATCH_JOB_ID")
+batchTaskId <- Sys.getenv("AZ_BATCH_TASK_ID")
+
+batchJobPreparationDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
+batchTaskWorkingDirectory <- Sys.getenv("AZ_BATCH_TASK_WORKING_DIR")
+
+batchJobEnvironment <- paste0(batchJobId, ".rds")
+batchTaskEnvironment <- paste0(batchTaskId, ".rds")

setwd(batchTaskWorkingDirectory)
@@ -87,12 +83,11 @@ result <- lapply(taskArgs, function(args) {
    eval(azbatchenv$expr, azbatchenv$exportenv)
  },
  error = function(e) {
-    print(e)
+    workerErrorStatus <<- 1
    e
  })
})

-fileResultName <- strsplit(batchTaskEnvironment, "[.]")[[1]][1]
-
if (!is.null(azbatchenv$gather) && length(taskArgs) > 1) {
  result <- Reduce(azbatchenv$gather, result)
}
@@ -100,9 +95,10 @@ if (!is.null(azbatchenv$gather) && length(taskArgs) > 1) {
saveRDS(result,
        file = file.path(
          batchTaskWorkingDirectory,
-          paste0(fileResultName, "-result.rds")
+          paste0(batchTaskId, "-result.rds")
        ))

+cat(paste0("Error Code: ", workerErrorStatus, fill = TRUE))
+
quit(save = "yes",
-     status = 0,
+     status = workerErrorStatus,
     runLast = FALSE)

@@ -4,7 +4,7 @@
\alias{waitForTasksToComplete}
\title{Wait for current tasks to complete}
\usage{
-waitForTasksToComplete(jobId, timeout)
+waitForTasksToComplete(jobId, timeout, errorhandling = "stop")
}
\description{
Wait for current tasks to complete