diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R
index df1a03d..bce97ee 100644
--- a/R/doAzureParallel.R
+++ b/R/doAzureParallel.R
@@ -335,8 +335,8 @@ setHttpTraffic <- function(value = FALSE) {
 
   resourceFiles <- append(resourceFiles, requiredJobResourceFiles)
 
-  enableCloudCombine <-
-    list(name = "enableCloudCombine", value = "TRUE")
+  enableCloudCombineKeyValuePair <-
+    list(name = "enableCloudCombine", value = as.character(enableCloudCombine))
 
   chunkSize <- 1
 
@@ -352,13 +352,22 @@ setHttpTraffic <- function(value = FALSE) {
     chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
   }
 
+  chunkSizeKeyValuePair <- list(name = "chunkSize", value = as.character(chunkSize))
+
   if (is.null(obj$packages)) {
-    metadata <- list(enableCloudCombine, chunkSizeKeyValuePair)
+    metadata <-
+      list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
   } else {
-    metadata <- list(enableCloudCombine, chunkSizeKeyValuePair, obj$packages)
+    packagesKeyValuePair <-
+      list(name = "packages",
+           value = paste(obj$packages, collapse = ";"))
+
+    metadata <-
+      list(enableCloudCombineKeyValuePair,
+           chunkSizeKeyValuePair,
+           packagesKeyValuePair)
   }
 
   response <- .addJob(
diff --git a/R/utility.R b/R/utility.R
index 7f039d4..47560e7 100644
--- a/R/utility.R
+++ b/R/utility.R
@@ -63,8 +63,15 @@ getJobList <- function(filter = NULL) {
 
   if (!is.null(filter)) {
     if (!is.null(filter$state)) {
+      for (i in 1:length(filter$state)) {
+        filterClause <-
+          paste0(filterClause,
+                 sprintf("state eq '%s'", filter$state[i]),
+                 " or ")
+      }
+
       filterClause <-
-        paste0(filterClause, sprintf("state eq '%s'", filter$state))
+        substr(filterClause, 1, nchar(filterClause) - 3)
     }
   }
 
@@ -78,6 +85,9 @@ getJobList <- function(filter = NULL) {
   totalTasks <- integer(length(jobs$value))
 
   if (length(jobs$value) > 0) {
+    if (is.null(jobs$value[[1]]$id)) {
+      stop(jobs$value)
+    }
     for (j in 1:length(jobs$value)) {
       id[j] <- jobs$value[[j]]$id
       state[j] <- jobs$value[[j]]$state
@@ -456,9 +466,9 @@ waitForTasksToComplete <-
 
         tasksFailureWarningLabel <-
           sprintf(paste("%i task(s) failed while running the job.",
-                        "This caused the job to terminate automatically.",
-                        "To disable this behavior and continue on failure, set .errorHandling='remove | pass'",
-                        "in the foreach loop\n"), taskCounts$failed)
+            "This caused the job to terminate automatically.",
+            "To disable this behavior and continue on failure, set .errorHandling='remove | pass'",
+            "in the foreach loop\n"), taskCounts$failed)
 
         for (i in 1:length(failedTasks$value)) {
 
@@ -477,11 +487,11 @@ waitForTasksToComplete <-
 
         stop(sprintf(
           paste("Errors have occurred while running the job '%s'.",
-                "Error handling is set to 'stop' and has proceeded to terminate the job.",
-                "The user will have to handle deleting the job.",
-                "If this is not the correct behavior, change the errorHandling property to 'pass'",
-                " or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs.",
-                "For more information about getting job logs, follow this link:",
+            "Error handling is set to 'stop' and has proceeded to terminate the job.",
+            "The user will have to handle deleting the job.",
+            "If this is not the correct behavior, change the errorHandling property to 'pass'",
+            " or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs.",
+            "For more information about getting job logs, follow this link:",
             paste0("https://github.com/Azure/doAzureParallel/blob/master/docs/",
                    "40-troubleshooting.md#viewing-files-directly-from-compute-node")),
           jobId
@@ -490,11 +500,11 @@ waitForTasksToComplete <-
 
       if (Sys.time() > timeToTimeout) {
         stop(sprintf(paste("Timeout has occurred while waiting for tasks to complete.",
-                           "Users will have to manually track the job '%s' and get the results.",
-                           "Use the getJobResults function to obtain the results and getJobList for",
-                           "tracking job status. To change the timeout, set 'timeout' property in the",
+          "Users will have to manually track the job '%s' and get the results.",
+          "Use the getJobResults function to obtain the results and getJobList for",
+          "tracking job status. To change the timeout, set 'timeout' property in the",
           "foreach's options.azure.")),
-                    jobId)
+          jobId)
       }
 
       if (taskCounts$completed >= totalTasks &&
diff --git a/README.md b/README.md
index 74b737a..7d655a9 100644
--- a/README.md
+++ b/README.md
@@ -295,12 +295,12 @@ job_id <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... }
 results <- getJobResult(job_id)
 ```
 
-Finally, you may also want to track the status of jobs by state (active/running/completed/failed/succeeded):
+Finally, you may also want to track the status of jobs by state (active, completed, etc.):
 ```R
-# List jobs in running state:
+# List jobs in active or completed state:
 filter <- list()
-filter$state <- "running"
+filter$state <- list("active", "completed")
 jobList <- getJobList(filter)
 View(jobList)
 ```
 
@@ -388,4 +388,4 @@ Note: User defined functions for the merge task is on our list of features that
 
 ## Next Steps
 
-For more information, please visit [our documentation](./docs/README.md).
\ No newline at end of file
+For more information, please visit [our documentation](./docs/README.md).
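To make the README change above concrete, here is an end-to-end sketch of the non-blocking workflow it documents. `wait = FALSE`, the list-valued `filter$state`, `getJobList`, and `getJobResult` all come from this diff; `number_of_iterations` appears in the README context, and `myParallelAlgorithm` is an illustrative placeholder.

```R
# Submit a long-running job without blocking the R session
opt <- list(wait = FALSE)
job_id <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% {
  myParallelAlgorithm()  # placeholder for the user's own per-iteration work
}

# Track jobs in either the active or completed state
filter <- list()
filter$state <- list("active", "completed")
jobList <- getJobList(filter)
View(jobList)

# Once the job shows as completed, fetch its output
results <- getJobResult(job_id)
```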
diff --git a/man/createOutputFile.Rd b/man/createOutputFile.Rd
index 4a51a50..1869b28 100644
--- a/man/createOutputFile.Rd
+++ b/man/createOutputFile.Rd
@@ -2,24 +2,15 @@
 % Please edit documentation in R/utility.R
 \name{createOutputFile}
 \alias{createOutputFile}
-\alias{createOutputFile}
 \title{Utility function for creating an output file}
 \usage{
-createOutputFile(filePattern, url)
-
 createOutputFile(filePattern, url)
 }
 \arguments{
 \item{filePattern}{a pattern indicating which file(s) to upload}
 
-\item{url}{the destination blob or virtual directory within the Azure Storage container}
-
-\item{filePattern}{a pattern indicating which file(s) to upload}
-
 \item{url}{the destination blob or virtual directory within the Azure Storage container}
 }
 \description{
-Utility function for creating an output file
-
 Utility function for creating an output file
 }
diff --git a/man/waitForTasksToComplete.Rd b/man/waitForTasksToComplete.Rd
index 9d373cf..d220732 100644
--- a/man/waitForTasksToComplete.Rd
+++ b/man/waitForTasksToComplete.Rd
@@ -2,13 +2,10 @@
 % Please edit documentation in R/utility.R
 \name{waitForTasksToComplete}
 \alias{waitForTasksToComplete}
-\alias{waitForTasksToComplete}
 \title{Wait for current tasks to complete}
 \usage{
-waitForTasksToComplete(jobId, timeout, errorhandling = "stop")
+waitForTasksToComplete(jobId, timeout, errorHandling = "stop")
 }
 \description{
-Wait for current tasks to complete
-
 Wait for current tasks to complete
 }
diff --git a/tests/testthat/test-long-running-job.R b/tests/testthat/test-long-running-job.R
new file mode 100644
index 0000000..0dac053
--- /dev/null
+++ b/tests/testthat/test-long-running-job.R
@@ -0,0 +1,55 @@
+# Run this test for users to make sure the long running job feature
+# of doAzureParallel is still working
+context("long running job scenario test")
+test_that("Long Running Job Test", {
+  testthat::skip_on_travis()
+  credentialsFileName <- "credentials.json"
+  clusterFileName <- "cluster.json"
+
+  doAzureParallel::generateCredentialsConfig(credentialsFileName)
+  doAzureParallel::generateClusterConfig(clusterFileName)
+
+  # set your credentials
+  doAzureParallel::setCredentials(credentialsFileName)
+  cluster <- doAzureParallel::makeCluster(clusterFileName)
+  doAzureParallel::registerDoAzureParallel(cluster)
+
+  opt <- list(wait = FALSE)
+  '%dopar%' <- foreach::'%dopar%'
+  res <-
+    foreach::foreach(
+      i = 1:4,
+      .packages = c('httr'),
+      .options.azure = opt
+    ) %dopar% {
+      mean(1:3)
+    }
+
+  job <- getJob(res)
+
+  # get the list of jobs in the active state
+  filter <- list()
+  filter$state <- list("active")
+  getJobList(filter)
+
+  # get job list for all jobs
+  getJobList()
+
+  # wait 2 minutes for the job to finish
+  Sys.sleep(120)
+
+  # get the job result
+  jobResult <- getJobResult(res)
+
+  doAzureParallel::stopCluster(cluster)
+
+  # verify the job result is correct
+  testthat::expect_equal(length(jobResult),
+                         4)
+
+  testthat::expect_equal(jobResult,
+                         list(2, 2, 2, 2))
+
+  # delete the job
+  rAzureBatch::deleteJob(res)
+})
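Since `getJobList` now accepts a list of states, it may help to see the OData clause the new loop in `R/utility.R` produces. This standalone sketch mirrors the diff's logic; the two states are example inputs.

```R
# Mirrors the clause-building loop added to getJobList in R/utility.R
states <- list("active", "completed")

filterClause <- ""
for (i in 1:length(states)) {
  filterClause <-
    paste0(filterClause,
           sprintf("state eq '%s'", states[[i]]),
           " or ")
}

# Trim the trailing separator; removing 3 of the 4 characters of " or "
# leaves a harmless trailing space, matching the diff's substr() call
filterClause <- substr(filterClause, 1, nchar(filterClause) - 3)

filterClause
#> [1] "state eq 'active' or state eq 'completed' "
```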
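Likewise, the `R/doAzureParallel.R` change replaces the hard-coded `enableCloudCombine` metadata with three name/value pairs built from the job's settings. Below is a minimal sketch of the resulting shape, with illustrative values standing in for what the foreach object and `.doAzureBatchGlobals` supply at submission time.

```R
# Illustrative inputs; in the package these come from the foreach object
# and .doAzureBatchGlobals rather than being set by hand
enableCloudCombine <- TRUE
chunkSize <- 2
packages <- c("httr", "jsonlite")

# Shape of the job metadata assembled in the diff above
metadata <- list(
  list(name = "enableCloudCombine", value = as.character(enableCloudCombine)),
  list(name = "chunkSize", value = as.character(chunkSize)),
  list(name = "packages", value = paste(packages, collapse = ";"))
)

str(metadata)
```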