зеркало из https://github.com/Azure/AzureRMR.git
let pool fns fall back to sequential
This commit is contained in:
Родитель
3f139277bf
Коммит
54020361c5
65
R/pool.R
65
R/pool.R
|
@ -1,3 +1,6 @@
|
|||
.AzureR <- new.env()
|
||||
|
||||
|
||||
#' Manage parallel Azure connections
|
||||
#'
|
||||
#' @param size For `init_pool`, the number of background R processes to create. Limit this if you are low on memory.
|
||||
|
@ -12,14 +15,14 @@
|
|||
#' - `delete_pool` shuts down the background processes and deletes the pool.
|
||||
#' - `pool_exists` checks for the existence of the pool, returning a TRUE/FALSE value.
|
||||
#' - `pool_size` returns the size of the pool, or zero if the pool does not exist.
|
||||
#' - `pool_export` exports variables to the pool nodes. It calls `parallel::clusterExport` with the given arguments.
|
||||
#' - `pool_lapply`, `pool_sapply` and `pool_map` carry out work on the pool. They call `parallel::parLapply`, `parallel::parSapply` and `parallel::clusterMap` with the given arguments.
|
||||
#' - `pool_call` and `pool_evalq` execute code on the pool nodes. They call `parallel::clusterCall` and `parallel::clusterEvalQ` with the given arguments.
|
||||
#' - `pool_export` exports variables to the pool nodes. It calls `parallel::clusterExport` with the given arguments if the pool exists; otherwise it does nothing.
|
||||
#' - `pool_lapply`, `pool_sapply` and `pool_map` carry out work on the pool. They call `parallel::parLapply`, `parallel::parSapply` and `parallel::clusterMap` respectively if the pool exists, or `lapply`, `sapply` and `mapply` otherwise.
|
||||
#' - `pool_call` and `pool_evalq` execute code on the pool nodes. They call `parallel::clusterCall` and `parallel::clusterEvalQ` respectively if the pool exists, or the function `func` directly and `evalq` otherwise.
|
||||
#'
|
||||
#' The pool is persistent for the session or until terminated by `delete_pool`. You should initialise the pool by calling `init_pool` before running any code on it. This restores the original state of the pool nodes by removing any objects that may be in memory, and resetting the working directory to the master working directory.
|
||||
#'
|
||||
#' @seealso
|
||||
#' [parallel::makeCluster], [parallel::clusterCall], [parallel::parLapply]
|
||||
#' [parallel::makeCluster], [parallel::clusterCall], [parallel::parLapply], [lapply], [mapply]
|
||||
#' @examples
|
||||
#' \dontrun{
|
||||
#'
|
||||
|
@ -88,9 +91,9 @@ pool_exists <- function()
|
|||
#' @export
|
||||
pool_size <- function()
|
||||
{
|
||||
if(!pool_exists())
|
||||
return(0)
|
||||
length(.AzureR$pool)
|
||||
if(pool_exists())
|
||||
length(.AzureR$pool)
|
||||
else 0
|
||||
}
|
||||
|
||||
|
||||
|
@ -98,44 +101,49 @@ pool_size <- function()
|
|||
#' @export
|
||||
pool_export <- function(...)
|
||||
{
|
||||
pool_check()
|
||||
parallel::clusterExport(cl=.AzureR$pool, ...)
|
||||
if(pool_exists())
|
||||
parallel::clusterExport(cl=.AzureR$pool, ...)
|
||||
else invisible(NULL)
|
||||
}
|
||||
|
||||
|
||||
#' @rdname pool
|
||||
#' @export
|
||||
pool_lapply <- function(...)
|
||||
pool_lapply <- function(X, func, ...)
|
||||
{
|
||||
pool_check()
|
||||
parallel::parLapply(cl=.AzureR$pool, ...)
|
||||
if(pool_exists())
|
||||
parallel::parLapply(cl=.AzureR$pool, X=X, fun=func, ...)
|
||||
else lapply(X=X, FUN=func, ...)
|
||||
}
|
||||
|
||||
|
||||
#' @rdname pool
|
||||
#' @export
|
||||
pool_sapply <- function(...)
|
||||
pool_sapply <- function(X, func, ...)
|
||||
{
|
||||
pool_check()
|
||||
parallel::parSapply(cl=.AzureR$pool, ...)
|
||||
if(pool_exists())
|
||||
parallel::parSapply(cl=.AzureR$pool, X=X, FUN=func, ...)
|
||||
else sapply(X=X, FUN=func, ...)
|
||||
}
|
||||
|
||||
|
||||
#' @rdname pool
|
||||
#' @export
|
||||
pool_map <- function(...)
|
||||
pool_map <- function(func, ..., SIMPLIFY=FALSE)
|
||||
{
|
||||
pool_check()
|
||||
parallel::clusterMap(cl=.AzureR$pool, ...)
|
||||
if(pool_exists())
|
||||
parallel::clusterMap(cl=.AzureR$pool, func, ..., SIMPLIFY=SIMPLIFY)
|
||||
else mapply(func, ..., SIMPLIFY=SIMPLIFY)
|
||||
}
|
||||
|
||||
|
||||
#' @rdname pool
|
||||
#' @export
|
||||
pool_call <- function(...)
|
||||
pool_call <- function(func, ...)
|
||||
{
|
||||
pool_check()
|
||||
parallel::clusterCall(cl=.AzureR$pool, ...)
|
||||
if(pool_exists())
|
||||
parallel::clusterCall(cl=.AzureR$pool, func, ...)
|
||||
else func(...)
|
||||
}
|
||||
|
||||
|
||||
|
@ -143,16 +151,7 @@ pool_call <- function(...)
|
|||
#' @export
|
||||
pool_evalq <- function(...)
|
||||
{
|
||||
pool_check()
|
||||
parallel::clusterEvalQ(cl=.AzureR$pool, ...)
|
||||
}
|
||||
|
||||
|
||||
.AzureR <- new.env()
|
||||
|
||||
|
||||
pool_check <- function()
|
||||
{
|
||||
if(!pool_exists())
|
||||
stop("AzureR pool does not exist; call init_pool() to create it", call.=FALSE)
|
||||
if(pool_exists())
|
||||
parallel::clusterEvalQ(cl=.AzureR$pool, ...)
|
||||
else evalq(..., envir=parent.frame())
|
||||
}
|
||||
|
|
16
man/pool.Rd
16
man/pool.Rd
|
@ -23,13 +23,13 @@ pool_size()
|
|||
|
||||
pool_export(...)
|
||||
|
||||
pool_lapply(...)
|
||||
pool_lapply(X, func, ...)
|
||||
|
||||
pool_sapply(...)
|
||||
pool_sapply(X, func, ...)
|
||||
|
||||
pool_map(...)
|
||||
pool_map(func, ..., SIMPLIFY = FALSE)
|
||||
|
||||
pool_call(...)
|
||||
pool_call(func, ...)
|
||||
|
||||
pool_evalq(...)
|
||||
}
|
||||
|
@ -52,9 +52,9 @@ A small API consisting of the following functions is currently provided for mana
|
|||
\item \code{delete_pool} shuts down the background processes and deletes the pool.
|
||||
\item \code{pool_exists} checks for the existence of the pool, returning a TRUE/FALSE value.
|
||||
\item \code{pool_size} returns the size of the pool, or zero if the pool does not exist.
|
||||
\item \code{pool_export} exports variables to the pool nodes. It calls \code{parallel::clusterExport} with the given arguments.
|
||||
\item \code{pool_lapply}, \code{pool_sapply} and \code{pool_map} carry out work on the pool. They call \code{parallel::parLapply}, \code{parallel::parSapply} and \code{parallel::clusterMap} with the given arguments.
|
||||
\item \code{pool_call} and \code{pool_evalq} execute code on the pool nodes. They call \code{parallel::clusterCall} and \code{parallel::clusterEvalQ} with the given arguments.
|
||||
\item \code{pool_export} exports variables to the pool nodes. It calls \code{parallel::clusterExport} with the given arguments if the pool exists; otherwise it does nothing.
|
||||
\item \code{pool_lapply}, \code{pool_sapply} and \code{pool_map} carry out work on the pool. They call \code{parallel::parLapply}, \code{parallel::parSapply} and \code{parallel::clusterMap} respectively if the pool exists, or \code{lapply}, \code{sapply} and \code{mapply} otherwise.
|
||||
\item \code{pool_call} and \code{pool_evalq} execute code on the pool nodes. They call \code{parallel::clusterCall} and \code{parallel::clusterEvalQ} respectively if the pool exists, or the function \code{func} directly and \code{evalq} otherwise.
|
||||
}
|
||||
|
||||
The pool is persistent for the session or until terminated by \code{delete_pool}. You should initialise the pool by calling \code{init_pool} before running any code on it. This restores the original state of the pool nodes by removing any objects that may be in memory, and resetting the working directory to the master working directory.
|
||||
|
@ -79,5 +79,5 @@ delete_pool()
|
|||
}
|
||||
}
|
||||
\seealso{
|
||||
\link[parallel:makeCluster]{parallel::makeCluster}, \link[parallel:clusterCall]{parallel::clusterCall}, \link[parallel:parLapply]{parallel::parLapply}
|
||||
\link[parallel:makeCluster]{parallel::makeCluster}, \link[parallel:clusterCall]{parallel::clusterCall}, \link[parallel:parLapply]{parallel::parLapply}, \link{lapply}, \link{mapply}
|
||||
}
|
||||
|
|
|
@ -5,8 +5,6 @@ skip_on_cran()
|
|||
test_that("Background process pool works",
|
||||
{
|
||||
expect_false(pool_exists())
|
||||
expect_error(AzureRMR:::pool_check())
|
||||
expect_error(pool_sapply(1:5, function(x) x))
|
||||
|
||||
init_pool(5)
|
||||
expect_true(pool_exists())
|
||||
|
@ -34,3 +32,27 @@ test_that("Background process pool works",
|
|||
delete_pool()
|
||||
expect_false(pool_exists())
|
||||
})
|
||||
|
||||
|
||||
test_that("Pool functionality falls back correctly if pool doesn't exist",
|
||||
{
|
||||
expect_false(pool_exists())
|
||||
|
||||
res <- pool_sapply(1:5, function(x) x)
|
||||
expect_identical(res, 1:5)
|
||||
|
||||
res2 <- pool_lapply(1:5, function(x) x)
|
||||
expect_identical(res2, list(1L, 2L, 3L, 4L, 5L))
|
||||
|
||||
res3 <- pool_map(function(x, y) x + y, 1:5, 2)
|
||||
expect_identical(res3, list(3, 4, 5, 6, 7))
|
||||
|
||||
y <- 42
|
||||
pool_export("y", environment())
|
||||
res <- pool_sapply(1:5, function(x) y)
|
||||
expect_identical(res, rep(42, 5))
|
||||
|
||||
rm(y)
|
||||
expect_false(pool_evalq(exists("y")))
|
||||
expect_error(pool_sapply(1:5, function(x) y))
|
||||
})
|
||||
|
|
Загрузка…
Ссылка в новой задаче