This commit is contained in:
Brian 2017-10-19 09:47:18 -07:00 committed by GitHub
Parent 40a2cf036a
Commit 169e75f9b4
2 changed files with 160 additions and 151 deletions

View file

@@ -253,174 +253,183 @@ setHttpTraffic <- function(value = FALSE) {
   assign("enableCloudCombine", enableCloudCombine, envir = .doAzureBatchGlobals)
   assign("cloudCombine", cloudCombine, envir = .doAzureBatchGlobals)
 
+  resourceFiles <- list()
+  if (!is.null(obj$options$azure$resourceFiles)) {
+    resourceFiles <- obj$options$azure$resourceFiles
+  }
+
+  if (!is.null(obj$options$azure$resourcefiles)) {
+    resourceFiles <- obj$options$azure$resourcefiles
+  }
+
+  enableCloudCombineKeyValuePair <-
+    list(name = "enableCloudCombine", value = as.character(enableCloudCombine))
+
+  chunkSize <- 1
+
+  if (!is.null(obj$options$azure$chunkSize)) {
+    chunkSize <- obj$options$azure$chunkSize
+  }
+
+  if (!is.null(obj$options$azure$chunksize)) {
+    chunkSize <- obj$options$azure$chunksize
+  }
+
+  if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
+    chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
+  }
+
+  chunkSizeKeyValuePair <-
+    list(name = "chunkSize", value = as.character(chunkSize))
+
+  if (is.null(obj$packages)) {
+    metadata <-
+      list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
+  } else {
+    packagesKeyValuePair <-
+      list(name = "packages",
+           value = paste(obj$packages, collapse = ";"))
+
+    metadata <-
+      list(enableCloudCombineKeyValuePair,
+           chunkSizeKeyValuePair,
+           packagesKeyValuePair)
+  }
+
   retryCounter <- 0
   maxRetryCount <- 5
   startupFolderName <- "startup"
-  containerResponse <- NULL
-  jobquotaReachedResponse <- NULL
-  while (retryCounter < maxRetryCount) {
-    sprintf("job id is: %s", id)
-    # try to submit the job. We may run into naming conflicts. If so, try again
-    tryCatch({
-      retryCounter <- retryCounter + 1
-
-      response <- rAzureBatch::createContainer(id, content = "text")
-      if (grepl("ContainerAlreadyExists", response)) {
-        if (!is.null(obj$options$azure$job)) {
-          containerResponse <- grepl("ContainerAlreadyExists", response)
-          break
-        }
-
-        stop("Container already exists. Multiple jobs may possibly be running.")
-      }
-
-      rAzureBatch::uploadBlob(id,
-                              system.file(startupFolderName, "worker.R", package = "doAzureParallel"))
-      rAzureBatch::uploadBlob(id,
-                              system.file(startupFolderName, "merger.R", package = "doAzureParallel"))
-      rAzureBatch::uploadBlob(
-        id,
-        system.file(startupFolderName, "install_github.R", package = "doAzureParallel")
-      )
-      rAzureBatch::uploadBlob(
-        id,
-        system.file(startupFolderName, "install_cran.R", package = "doAzureParallel")
-      )
-      rAzureBatch::uploadBlob(
-        id,
-        system.file(startupFolderName, "install_bioconductor.R", package = "doAzureParallel")
-      )
-
-      # Setting up common job environment for all tasks
-      jobFileName <- paste0(id, ".rds")
-      saveRDS(.doAzureBatchGlobals, file = jobFileName)
-      rAzureBatch::uploadBlob(id, paste0(getwd(), "/", jobFileName))
-      file.remove(jobFileName)
-
-      resourceFiles <- list()
-      if (!is.null(obj$options$azure$resourceFiles)) {
-        resourceFiles <- obj$options$azure$resourceFiles
-      }
-
-      if (!is.null(obj$options$azure$resourcefiles)) {
-        resourceFiles <- obj$options$azure$resourcefiles
-      }
-
-      sasToken <- rAzureBatch::createSasToken("r", "c", id)
-      workerScriptUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name, id, "worker.R", sasToken)
-      mergerScriptUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name, id, "merger.R", sasToken)
-      installGithubScriptUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name,
-                                   id,
-                                   "install_github.R",
-                                   sasToken)
-      installCranScriptUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_cran.R", sasToken)
-      installBioConductorScriptUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_bioconductor.R", sasToken)
-      jobCommonFileUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name, id, jobFileName, sasToken)
-
-      requiredJobResourceFiles <- list(
-        rAzureBatch::createResourceFile(url = workerScriptUrl, fileName = "worker.R"),
-        rAzureBatch::createResourceFile(url = mergerScriptUrl, fileName = "merger.R"),
-        rAzureBatch::createResourceFile(url = installGithubScriptUrl, fileName = "install_github.R"),
-        rAzureBatch::createResourceFile(url = installCranScriptUrl, fileName = "install_cran.R"),
-        rAzureBatch::createResourceFile(url = installBioConductorScriptUrl, fileName = "install_bioconductor.R"),
-        rAzureBatch::createResourceFile(url = jobCommonFileUrl, fileName = jobFileName)
-      )
-
-      # We need to merge any files passed by the calling lib with the resource files specified here
-      resourceFiles <-
-        append(resourceFiles, requiredJobResourceFiles)
-
-      enableCloudCombineKeyValuePair <-
-        list(name = "enableCloudCombine", value = as.character(enableCloudCombine))
-
-      chunkSize <- 1
-
-      if (!is.null(obj$options$azure$chunkSize)) {
-        chunkSize <- obj$options$azure$chunkSize
-      }
-
-      if (!is.null(obj$options$azure$chunksize)) {
-        chunkSize <- obj$options$azure$chunksize
-      }
-
-      if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
-        chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
-      }
-
-      chunkSizeKeyValuePair <-
-        list(name = "chunkSize", value = as.character(chunkSize))
-
-      if (is.null(obj$packages)) {
-        metadata <-
-          list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
-      } else {
-        packagesKeyValuePair <-
-          list(name = "packages",
-               value = paste(obj$packages, collapse = ";"))
-
-        metadata <-
-          list(enableCloudCombineKeyValuePair,
-               chunkSizeKeyValuePair,
-               packagesKeyValuePair)
-      }
-
-      response <- .addJob(
-        jobId = id,
-        poolId = data$poolId,
-        resourceFiles = resourceFiles,
-        metadata = metadata,
-        packages = obj$packages
-      )
-
-      if (grepl("ActiveJobAndScheduleQuotaReached", response)) {
-        jobquotaReachedResponse <-
-          grepl("ActiveJobAndScheduleQuotaReached", response)
-      }
-
-      if (grepl("JobExists", response)) {
-        stop("The specified job already exists.")
-      }
-
-      break
-    },
-    error = function(e) {
-      if (retryCounter == maxRetryCount) {
-        cat(sprintf("Error creating job: %s\n",
-                    conditionMessage(e)))
-      }
-
-      print(e)
-      time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
-      id <- sprintf("%s%s",
-                    "job",
-                    time)
-    })
-  }
-
-  if (!is.null(containerResponse)) {
-    stop(
-      "Aborted mission. The container has already exist with user's specific job id. Please use a different job id."
-    )
-  }
-
-  if (!is.null(jobquotaReachedResponse)) {
-    stop(
-      paste0(
-        "Aborted mission. Your active job quota has been reached. To increase your active job quota, ",
-        "go to https://docs.microsoft.com/en-us/azure/batch/batch-quota-limit"
-      )
-    )
-  }
+  repeat {
+    if (retryCounter > maxRetryCount) {
+      stop(
+        sprintf(
+          "Error creating job: Maximum number of retries (%d) exceeded",
+          maxRetryCount
+        )
+      )
+    }
+    else {
+      retryCounter <- retryCounter + 1
+    }
+
+    containerResponse <- rAzureBatch::createContainer(id, content = "response")
+    if (containerResponse$status_code >= 400 && containerResponse$status_code <= 499) {
+      containerContent <- xml2::as_list(httr::content(containerResponse))
+
+      if (!is.null(obj$options$azure$job) && containerContent$Code[[1]] == "ContainerAlreadyExists") {
+        stop(paste("Error creating job: Job's storage container already exists for an unique job id.",
+                   "Either delete the storage container or change the job argument in the foreach."))
+      }
+
+      Sys.sleep(retryCounter * retryCounter)
+
+      time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
+      id <- sprintf("%s%s",
+                    "job",
+                    time)
+      next
+    }
+    else if (containerResponse$status_code >= 500 && containerResponse$status_code <= 599) {
+      containerContent <- xml2::as_list(httr::content(containerResponse))
+      stop(paste0("Error creating job: ", containerContent$message$value))
+    }
+
+    # Uploading common job files for the worker node
+    rAzureBatch::uploadBlob(id,
+                            system.file(startupFolderName, "worker.R", package = "doAzureParallel"))
+    rAzureBatch::uploadBlob(id,
+                            system.file(startupFolderName, "merger.R", package = "doAzureParallel"))
+    rAzureBatch::uploadBlob(
+      id,
+      system.file(startupFolderName, "install_github.R", package = "doAzureParallel")
+    )
+    rAzureBatch::uploadBlob(
+      id,
+      system.file(startupFolderName, "install_cran.R", package = "doAzureParallel")
+    )
+    rAzureBatch::uploadBlob(
+      id,
+      system.file(startupFolderName, "install_bioconductor.R", package = "doAzureParallel")
+    )
+
+    # Creating common job environment for all tasks
+    jobFileName <- paste0(id, ".rds")
+    saveRDS(.doAzureBatchGlobals, file = jobFileName)
+    rAzureBatch::uploadBlob(id, paste0(getwd(), "/", jobFileName))
+    file.remove(jobFileName)
+
+    # Creating read-only SAS token blob resource file urls
+    sasToken <- rAzureBatch::createSasToken("r", "c", id)
+    workerScriptUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, id, "worker.R", sasToken)
+    mergerScriptUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, id, "merger.R", sasToken)
+    installGithubScriptUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name,
+                                 id,
+                                 "install_github.R",
+                                 sasToken)
+    installCranScriptUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_cran.R", sasToken)
+    installBioConductorScriptUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_bioconductor.R", sasToken)
+    jobCommonFileUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, id, jobFileName, sasToken)
+
+    requiredJobResourceFiles <- list(
+      rAzureBatch::createResourceFile(url = workerScriptUrl, fileName = "worker.R"),
+      rAzureBatch::createResourceFile(url = mergerScriptUrl, fileName = "merger.R"),
+      rAzureBatch::createResourceFile(url = installGithubScriptUrl, fileName = "install_github.R"),
+      rAzureBatch::createResourceFile(url = installCranScriptUrl, fileName = "install_cran.R"),
+      rAzureBatch::createResourceFile(url = installBioConductorScriptUrl, fileName = "install_bioconductor.R"),
+      rAzureBatch::createResourceFile(url = jobCommonFileUrl, fileName = jobFileName)
+    )
+
+    resourceFiles <-
+      append(resourceFiles, requiredJobResourceFiles)
+
+    response <- .addJob(
+      jobId = id,
+      poolId = data$poolId,
+      resourceFiles = resourceFiles,
+      metadata = metadata,
+      packages = obj$packages
+    )
+
+    if (response$status_code == 201) {
+      break
+    }
+    else {
+      jobContent <- httr::content(response, content = "parsed")
+
+      if (jobContent$code == "JobExists" && !is.null(obj$options$azure$job)) {
+        stop(paste("Error in creating job: Job already exists with the unique job id.",
+                   "Either delete the job or change the job argument in the foreach loop.",
+                   jobContent$message$value))
+      }
+      else if (jobContent$code == "JobExists") {
+        Sys.sleep(retryCounter * retryCounter)
+
+        time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
+        id <- sprintf("%s%s",
+                      "job",
+                      time)
+        next
+      }
+
+      if (jobContent$code == "ActiveJobAndScheduleQuotaReached") {
+        stop(
+          paste(
+            "Error in creating job: Your active job quota has been reached.",
+            "To increase your active job quota,",
+            "go to https://docs.microsoft.com/en-us/azure/batch/batch-quota-limit"
+          )
+        )
+      }
+
+      stop("Error in creating job: ", jobContent$message$value)
+    }
+  }
 
   cat("Job Summary: ", fill = TRUE)

View file

@@ -169,7 +169,7 @@
     poolInfo = poolInfo,
     jobPreparationTask = jobPreparationTask,
     usesTaskDependencies = usesTaskDependencies,
-    content = "text",
+    content = "response",
     metadata = metadata
   )
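
This one-line change is what enables the status-code handling in the first file: with content = "text" the call returns only the response body as a string, so callers had to grepl() for error names, while content = "response" hands back the full response object. A small illustration of the difference, assuming standard httr semantics (the httpbin.org URL is just a stand-in for a Batch REST call):

library(httr)

# A raw response object carries the status code and headers...
resp <- GET("https://httpbin.org/status/409")
status_code(resp)                              # 409; branch on this directly

# ...whereas extracting the body as text throws that metadata away.
body <- content(resp, as = "text", encoding = "UTF-8")

Branching on status_code(resp) is safer than pattern-matching error strings, since error messages can change while status codes are part of the API contract.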