diff --git a/R/batch-api.R b/R/batch-api.R index 235b466..7d7f8de 100644 --- a/R/batch-api.R +++ b/R/batch-api.R @@ -19,7 +19,7 @@ BatchUtilities <- R6::R6Class( accountName <- storageClient$authentication$name - resourceFiles <- NULL + resourceFiles <- args$resourceFiles if (!is.null(argsList)) { envFile <- paste0(taskId, ".rds") saveRDS(argsList, file = envFile) @@ -37,8 +37,18 @@ BatchUtilities <- R6::R6Class( envFile, readToken, config$endpointSuffix) - resourceFiles <- - list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile)) + + environmentResourceFile <- + rAzureBatch::createResourceFile(filePath = envFile, httpUrl = envFileUrl) + + if (is.null(resourceFiles)) + { + resourceFiles <- + list(environmentResourceFile) + } + else { + resourceFiles <- append(resourceFiles, environmentResourceFile) + } } # Only use the download command if cloudCombine is enabled @@ -52,17 +62,6 @@ BatchUtilities <- R6::R6Class( if (!is.null(cloudCombine)) { assign("cloudCombine", cloudCombine, .doAzureBatchGlobals) - containerSettings$imageName <- "brianlovedocker/doazureparallel-merge-dockerfile:0.12.1" - - copyCommand <- sprintf( - "%s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include results/*.rds --endpoint %s", - accountName, - jobId, - "$AZ_BATCH_TASK_WORKING_DIR", - config$endpointSuffix - ) - - commands <- c(paste("blobxfer", copyCommand)) } exitConditions <- NULL diff --git a/R/cluster.R b/R/cluster.R index 0651d77..3536fad 100644 --- a/R/cluster.R +++ b/R/cluster.R @@ -123,7 +123,7 @@ makeCluster <- # install docker containerConfiguration <- list( - type = "docker" + type = "dockerCompatible" ) dockerImage <- "rocker/tidyverse:latest" diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 4302b76..aedf7da 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -474,12 +474,12 @@ setHttpTraffic <- function(value = FALSE) { storageEndpointSuffix = config$endpointSuffix) requiredJobResourceFiles <- list( - rAzureBatch::createResourceFile(url = workerScriptUrl, fileName = "worker.R"), - rAzureBatch::createResourceFile(url = mergerScriptUrl, fileName = "merger.R"), - rAzureBatch::createResourceFile(url = installGithubScriptUrl, fileName = "install_github.R"), - rAzureBatch::createResourceFile(url = installCranScriptUrl, fileName = "install_cran.R"), - rAzureBatch::createResourceFile(url = installBioConductorScriptUrl, fileName = "install_bioconductor.R"), - rAzureBatch::createResourceFile(url = jobCommonFileUrl, fileName = jobFileName) + rAzureBatch::createResourceFile(filePath = "worker.R", httpUrl = workerScriptUrl), + rAzureBatch::createResourceFile(filePath = "merger.R", httpUrl = mergerScriptUrl), + rAzureBatch::createResourceFile(filePath = "install_github.R", httpUrl = installGithubScriptUrl), + rAzureBatch::createResourceFile(filePath = "install_cran.R", httpUrl = installCranScriptUrl), + rAzureBatch::createResourceFile(filePath = "install_bioconductor.R", httpUrl = installBioConductorScriptUrl), + rAzureBatch::createResourceFile(filePath = jobFileName, httpUrl = jobCommonFileUrl) ) resourceFiles <- @@ -669,6 +669,21 @@ setHttpTraffic <- function(value = FALSE) { ) ) + mergeReadSasToken <- storageClient$generateSasToken("rl", "c", id) + mergeResourceFileUrl <- + rAzureBatch::createBlobUrl( + storageAccount = storageClient$authentication$name, + containerName = id, + sasToken = mergeReadSasToken, + storageEndpointSuffix = config$endpointSuffix + ) + + mergeResources <- + list( + rAzureBatch::createResourceFile( + storageContainerUrl = mergeResourceFileUrl, + blobPrefix = "results")) + BatchUtilitiesOperations$addTask( jobId = id, taskId = "merge", @@ -684,7 +699,8 @@ setHttpTraffic <- function(value = FALSE) { dependsOn = taskDependencies, cloudCombine = cloudCombine, outputFiles = append(obj$options$azure$outputFiles, mergeOutput), - containerImage = data$containerImage + containerImage = data$containerImage, + resourceFiles = mergeResources ) cat(". . .") @@ -726,7 +742,7 @@ setHttpTraffic <- function(value = FALSE) { } if (!identical(function(a, ...) c(a, list(...)), - obj$combineInfo$fun, ignore.environment = TRUE)){ + obj$combineInfo$fun, ignore.environment = TRUE)) { tryCatch({ accumulator <- foreach::makeAccum(it) accumulator(results, as.numeric(names(results))) diff --git a/R/utility-job.R b/R/utility-job.R index dfe0398..1788632 100644 --- a/R/utility-job.R +++ b/R/utility-job.R @@ -472,19 +472,14 @@ waitForTasksToComplete <- flush.console() - validationFlag <- - (taskCounts$validationStatus == "Validated" && - totalTasks <= 200000) || - totalTasks > 200000 - if (taskCounts$failed > 0 && - errorHandling == "stop" && - validationFlag) { + errorHandling == "stop") { cat("\n") select <- "id, executionInfo" + filter <- "executionInfo/result eq 'failure'" failedTasks <- - batchClient$taskOperations$list(jobId, select = select) + batchClient$taskOperations$list(jobId, select = select, filter = filter) tasksFailureWarningLabel <- sprintf( @@ -498,14 +493,9 @@ waitForTasksToComplete <- ) for (i in 1:length(failedTasks$value)) { - if (!is.null(failedTasks$value[[i]]$executionInfo$result) && - grepl(failedTasks$value[[i]]$executionInfo$result, - "failure", - ignore.case = TRUE)) { tasksFailureWarningLabel <- paste0(tasksFailureWarningLabel, sprintf("%s\n", failedTasks$value[[i]]$id)) - } } warning(sprintf(tasksFailureWarningLabel, @@ -533,9 +523,10 @@ waitForTasksToComplete <- jobId) } - if (taskCounts$completed >= totalTasks && - (taskCounts$validationStatus == "Validated" || - totalTasks >= 200000)) { + jobInfo <- getJob(jobId, verbose = FALSE) + if (taskCounts$completed >= totalTasks || + jobInfo$jobState == "completed" || + jobInfo$jobState == "terminating") { cat("\n") break } diff --git a/docs/71-distributing-data.md b/docs/71-distributing-data.md index 3546c6b..081c2c8 100644 --- a/docs/71-distributing-data.md +++ b/docs/71-distributing-data.md @@ -39,12 +39,12 @@ Here's an example that uses data stored in a public location on Azure Blob Stora # define where to download data from resource_files = list( rAzureBatch::createResourceFile( - url = "https://.blob.core.windows.net//2010.csv", - fileName = "2010.csv" + httpUrl = "https://.blob.core.windows.net//2010.csv", + filePath = "2010.csv" ), rAzureBatch::createResourceFile( - url = "https://.blob.core.windows.net//2011.csv", - fileName = "2011.csv" + httpUrl = "https://.blob.core.windows.net//2011.csv", + filePath = "2011.csv" ) ) diff --git a/samples/resource_files/resource_files_example.R b/samples/resource_files/resource_files_example.R index 37fa6ac..b3544c8 100644 --- a/samples/resource_files/resource_files_example.R +++ b/samples/resource_files/resource_files_example.R @@ -34,12 +34,12 @@ doAzureParallel::setCredentials("credentials.json") # Using the NYC taxi datasets, http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml azureStorageUrl <- "http://playdatastore.blob.core.windows.net/nyc-taxi-dataset" resource_files <- list( - rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-1.csv"), fileName = "yellow_tripdata_2016-1.csv"), - rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-2.csv"), fileName = "yellow_tripdata_2016-2.csv"), - rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-3.csv"), fileName = "yellow_tripdata_2016-3.csv"), - rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-4.csv"), fileName = "yellow_tripdata_2016-4.csv"), - rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-5.csv"), fileName = "yellow_tripdata_2016-5.csv"), - rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-6.csv"), fileName = "yellow_tripdata_2016-6.csv") + rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-1.csv"), filePath = "yellow_tripdata_2016-1.csv"), + rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-2.csv"), filePath = "yellow_tripdata_2016-2.csv"), + rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-3.csv"), filePath = "yellow_tripdata_2016-3.csv"), + rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-4.csv"), filePath = "yellow_tripdata_2016-4.csv"), + rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-5.csv"), filePath = "yellow_tripdata_2016-5.csv"), + rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-6.csv"), filePath = "yellow_tripdata_2016-6.csv") ) # add the parameter 'resourceFiles' to download files to nodes diff --git a/samples/sas_resource_files/sas_resources_files_example.R b/samples/sas_resource_files/sas_resources_files_example.R index adf3da2..37ee93a 100644 --- a/samples/sas_resource_files/sas_resources_files_example.R +++ b/samples/sas_resource_files/sas_resources_files_example.R @@ -56,8 +56,8 @@ csvFileUrl2 <- rAzureBatch::createBlobUrl(storageAccount = storageAccountName, # Create a list of files to download to the cluster using read-only permissions # Place the files in a directory called 'data' resource_files = list( - rAzureBatch::createResourceFile(url = csvFileUrl1, fileName = "data/1989.csv"), - rAzureBatch::createResourceFile(url = csvFileUrl2, fileName = "data/1990.csv") + rAzureBatch::createResourceFile(httpUrl = csvFileUrl1, filePath = "data/1989.csv"), + rAzureBatch::createResourceFile(httpUrl = csvFileUrl2, filePath = "data/1990.csv") ) # Create the cluster