fix bug in metadata handling for packages and enableCloudCombine (#133)

* fix bug in metadata handling for packages and enableCloudCombine

* call long running job api in test

* update test

* add test for long running job feature

* code style fix

* update job state description in readme

* use list for job state filter
zfengms 2017-09-30 21:45:26 -07:00, committed by Brian
Parent a25c73581d
Commit 7ec1306aeb
6 changed files: 96 additions and 34 deletions

View file

@@ -335,8 +335,8 @@ setHttpTraffic <- function(value = FALSE) {
   resourceFiles <-
     append(resourceFiles, requiredJobResourceFiles)
-  enableCloudCombine <-
-    list(name = "enableCloudCombine", value = "TRUE")
+  enableCloudCombineKeyValuePair <-
+    list(name = "enableCloudCombine", value = as.character(enableCloudCombine))
   chunkSize <- 1
@@ -352,13 +352,22 @@ setHttpTraffic <- function(value = FALSE) {
     chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
   }
   chunkSizeKeyValuePair <-
     list(name = "chunkSize", value = as.character(chunkSize))
   if (is.null(obj$packages)) {
-    metadata <- list(enableCloudCombine, chunkSizeKeyValuePair)
+    metadata <-
+      list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
   } else {
-    metadata <- list(enableCloudCombine, chunkSizeKeyValuePair, obj$packages)
+    packagesKeyValuePair <-
+      list(name = "packages",
+           value = paste(obj$packages, collapse = ";"))
+    metadata <-
+      list(enableCloudCombineKeyValuePair,
+           chunkSizeKeyValuePair,
+           packagesKeyValuePair)
   }
   response <- .addJob(
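For context (a sketch, not part of the diff): after this fix, a job submitted with `enableCloudCombine = TRUE` and packages `c("httr", "jsonlite")` (illustrative values) gets well-formed name/value metadata entries, where previously `enableCloudCombine` was hardcoded to `"TRUE"` and the raw `obj$packages` character vector was inserted in place of a name/value pair:
```R
# Illustrative sketch of the metadata the fixed code builds; the variable
# names mirror the diff above, the package names are made up.
enableCloudCombine <- TRUE
packages <- c("httr", "jsonlite")

metadata <- list(
  list(name = "enableCloudCombine", value = as.character(enableCloudCombine)), # "TRUE"
  list(name = "chunkSize", value = as.character(1)),
  list(name = "packages", value = paste(packages, collapse = ";"))             # "httr;jsonlite"
)
```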

View file

@@ -63,8 +63,15 @@ getJobList <- function(filter = NULL) {
   if (!is.null(filter)) {
     if (!is.null(filter$state)) {
+      for (i in 1:length(filter$state)) {
+        filterClause <-
+          paste0(filterClause,
+                 sprintf("state eq '%s'", filter$state[i]),
+                 " or ")
+      }
       filterClause <-
-        paste0(filterClause, sprintf("state eq '%s'", filter$state))
+        substr(filterClause, 1, nchar(filterClause) - 3)
     }
   }
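In effect (standalone sketch, not part of the diff), the loop ORs together one `state eq` term per requested state and then trims the trailing `" or "`:
```R
filter <- list(state = list("active", "completed"))
filterClause <- ""
for (i in 1:length(filter$state)) {
  filterClause <-
    paste0(filterClause,
           sprintf("state eq '%s'", filter$state[i]),
           " or ")
}
filterClause <- substr(filterClause, 1, nchar(filterClause) - 3)
# filterClause is now: "state eq 'active' or state eq 'completed' "
```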
@@ -78,6 +85,9 @@ getJobList <- function(filter = NULL) {
   totalTasks <- integer(length(jobs$value))
   if (length(jobs$value) > 0) {
+    if (is.null(jobs$value[[1]]$id)) {
+      stop(jobs$value)
+    }
     for (j in 1:length(jobs$value)) {
       id[j] <- jobs$value[[j]]$id
       state[j] <- jobs$value[[j]]$state
@@ -456,9 +466,9 @@ waitForTasksToComplete <-
       tasksFailureWarningLabel <-
         sprintf(paste("%i task(s) failed while running the job.",
-          "This caused the job to terminate automatically.",
-          "To disable this behavior and continue on failure, set .errorHandling='remove | pass'",
-          "in the foreach loop\n"), taskCounts$failed)
+                      "This caused the job to terminate automatically.",
+                      "To disable this behavior and continue on failure, set .errorHandling='remove | pass'",
+                      "in the foreach loop\n"), taskCounts$failed)
       for (i in 1:length(failedTasks$value)) {
@@ -477,11 +487,11 @@ waitForTasksToComplete <-
         stop(sprintf(
           paste("Errors have occurred while running the job '%s'.",
-            "Error handling is set to 'stop' and has proceeded to terminate the job.",
-            "The user will have to handle deleting the job.",
-            "If this is not the correct behavior, change the errorHandling property to 'pass'",
-            " or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs.",
-            "For more information about getting job logs, follow this link:",
+                "Error handling is set to 'stop' and has proceeded to terminate the job.",
+                "The user will have to handle deleting the job.",
+                "If this is not the correct behavior, change the errorHandling property to 'pass'",
+                " or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs.",
+                "For more information about getting job logs, follow this link:",
           paste0("https://github.com/Azure/doAzureParallel/blob/master/docs/",
                  "40-troubleshooting.md#viewing-files-directly-from-compute-node")),
           jobId
@@ -490,11 +500,11 @@ waitForTasksToComplete <-
     if (Sys.time() > timeToTimeout) {
       stop(sprintf(paste("Timeout has occurred while waiting for tasks to complete.",
-        "Users will have to manually track the job '%s' and get the results.",
-        "Use the getJobResults function to obtain the results and getJobList for",
-        "tracking job status. To change the timeout, set 'timeout' property in the",
+                         "Users will have to manually track the job '%s' and get the results.",
+                         "Use the getJobResults function to obtain the results and getJobList for",
+                         "tracking job status. To change the timeout, set 'timeout' property in the",
                          "foreach's options.azure.")),
-        jobId)
+           jobId)
     }
     if (taskCounts$completed >= totalTasks &&
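Aside (a sketch, not part of the diff): the messages above point users at the foreach loop's error-handling mode. Note that the foreach package itself spells the argument `.errorhandling` (all lowercase); a minimal local illustration of continuing past a failed task:
```R
library(foreach)

# Run sequentially for illustration; with doAzureParallel registered,
# %dopar% would dispatch to the Batch cluster instead.
registerDoSEQ()

# 'remove' drops the results of failed tasks; 'pass' keeps the error objects.
res <- foreach(i = 1:5, .errorhandling = "remove") %dopar% {
  if (i == 3) stop("simulated task failure")
  i^2
}
length(res)  # 4 -- the failed iteration was removed
```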

View file

@@ -295,12 +295,12 @@ job_id <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { .
 results <- getJobResult(job_id)
 ```
-Finally, you may also want to track the status of jobs by state (active/running/completed/failed/succeeded):
+Finally, you may also want to track the status of jobs by state (active, completed, etc.):
 ```R
-# List jobs in running state:
+# List jobs that are active or completed:
 filter <- list()
-filter$state <- "running"
+filter$state <- list("active", "completed")
 jobList <- getJobList(filter)
 View(jobList)
 ```
@@ -388,4 +388,4 @@ Note: User defined functions for the merge task is on our list of features that
 ## Next Steps
-For more information, please visit [our documentation](./docs/README.md).
+For more information, please visit [our documentation](./docs/README.md).

View file

@@ -2,24 +2,15 @@
 % Please edit documentation in R/utility.R
 \name{createOutputFile}
 \alias{createOutputFile}
-\alias{createOutputFile}
 \title{Utility function for creating an output file}
 \usage{
 createOutputFile(filePattern, url)
-createOutputFile(filePattern, url)
 }
 \arguments{
 \item{filePattern}{a pattern indicating which file(s) to upload}
 \item{url}{the destination blob or virtual directory within the Azure Storage container}
-\item{filePattern}{a pattern indicating which file(s) to upload}
-\item{url}{the destination blob or virtual directory within the Azure Storage container}
 }
 \description{
 Utility function for creating an output file
-Utility function for creating an output file
 }

View file

@@ -2,13 +2,10 @@
 % Please edit documentation in R/utility.R
 \name{waitForTasksToComplete}
 \alias{waitForTasksToComplete}
-\alias{waitForTasksToComplete}
 \title{Wait for current tasks to complete}
 \usage{
-waitForTasksToComplete(jobId, timeout, errorhandling = "stop")
+waitForTasksToComplete(jobId, timeout, errorHandling = "stop")
 }
 \description{
 Wait for current tasks to complete
-Wait for current tasks to complete
 }

View file

@@ -0,0 +1,55 @@
+# Run this test to make sure the long running job feature
+# of doAzureParallel is still working
+context("long running job scenario test")
+test_that("Long Running Job Test", {
+  testthat::skip_on_travis()
+
+  credentialsFileName <- "credentials.json"
+  clusterFileName <- "cluster.json"
+
+  doAzureParallel::generateCredentialsConfig(credentialsFileName)
+  doAzureParallel::generateClusterConfig(clusterFileName)
+
+  # set your credentials
+  doAzureParallel::setCredentials(credentialsFileName)
+  cluster <- doAzureParallel::makeCluster(clusterFileName)
+  doAzureParallel::registerDoAzureParallel(cluster)
+
+  # submit without waiting so the job runs asynchronously
+  opt <- list(wait = FALSE)
+  '%dopar%' <- foreach::'%dopar%'
+  res <-
+    foreach::foreach(
+      i = 1:4,
+      .packages = c('httr'),
+      .options.azure = opt
+    ) %dopar% {
+      mean(1:3)
+    }
+
+  job <- getJob(res)
+
+  # get active/running job list
+  filter <- list()
+  filter$state <- list("active")
+  getJobList(filter)
+
+  # get job list for all jobs
+  getJobList()
+
+  # wait 2 minutes for the job to finish
+  Sys.sleep(120)
+
+  # get job result
+  jobResult <- getJobResult(res)
+
+  doAzureParallel::stopCluster(cluster)
+
+  # verify the job result is correct
+  testthat::expect_equal(length(jobResult), 4)
+  testthat::expect_equal(jobResult, list(2, 2, 2, 2))
+
+  # delete the job
+  rAzureBatch::deleteJob(res)
+})
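A possible refinement (a sketch only, not part of the commit): instead of a fixed `Sys.sleep(120)`, the test could poll `getJobList()` until the job leaves the active state. The column names below are guessed from the per-job `id` and `state` values collected in the `getJobList` changes above and should be verified against the actual return value:
```R
# Hypothetical helper, not part of the commit. Column names ('id', 'state')
# are assumptions -- verify them against getJobList()'s real output.
waitForJob <- function(jobId, pollIntervalSeconds = 15, maxAttempts = 40) {
  for (attempt in 1:maxAttempts) {
    jobs <- getJobList()
    state <- jobs$state[jobs$id == jobId]
    if (length(state) == 1 && !identical(state, "active")) {
      return(invisible(state))
    }
    Sys.sleep(pollIntervalSeconds)
  }
  stop(sprintf("Job '%s' still active after %d checks", jobId, maxAttempts))
}
```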