* parallel upload for blobs

* more parallel blob transfer

* another header fix

* document

* blob download to memory

* all download to memory

* add multiple transfer tests

* cluster -> pool

* file multitransfer

* finetune file multitransfer

* fixing adls multi

* adls multi transfer working

* check for no files

* doc fixup

* more doc fixup

* yet more doc fixup
Hong Ooi 2019-01-20 00:09:10 +11:00, committed by GitHub
Parent 9a883bb125
Commit 0f304abc70
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
19 changed files: 950 additions and 186 deletions
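For orientation, the new multi-transfer API added by this commit can be exercised as in the sketch below. It is assembled from the examples in the updated help pages; the endpoint URL, access key, container name, and file paths are placeholders, not values from this commit.

library(AzureStor)

# connect to a (hypothetical) blob endpoint and container
bl <- storage_endpoint("https://mystorage.blob.core.windows.net", key="access_key")
cont <- blob_container(bl, "mycontainer")

# upload everything matching a wildcard; dest is unused for blob uploads
multiupload_blob(cont, "/data/logfiles/*.zip")

# download matching blobs into a directory
multidownload_blob(cont, "logfile*.zip", "/data/backup")

# download a single blob into memory as a raw vector
rawvec <- download_blob(cont, "iris.json", NULL)
rawToChar(rawvec)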

DESCRIPTION

@@ -14,6 +14,7 @@ Depends:
R (>= 3.3),
Imports:
utils,
parallel,
R6,
httr,
mime,

NAMESPACE

@@ -59,6 +59,7 @@ export(delete_azure_file)
export(delete_blob)
export(delete_blob_container)
export(delete_file_share)
export(delete_pool)
export(download_adls_file)
export(download_azure_file)
export(download_blob)
@@ -69,12 +70,19 @@ export(get_azure_dir_properties)
export(get_azure_file_properties)
export(get_blob_properties)
export(get_storage_properties)
export(init_pool)
export(list_adls_files)
export(list_adls_filesystems)
export(list_azure_files)
export(list_blob_containers)
export(list_blobs)
export(list_file_shares)
export(multidownload_adls_file)
export(multidownload_azure_file)
export(multidownload_blob)
export(multiupload_adls_file)
export(multiupload_azure_file)
export(multiupload_blob)
export(release_lease)
export(renew_lease)
export(storage_endpoint)
@@ -84,5 +92,6 @@ export(upload_blob)
export(upload_to_url)
import(AzureRMR)
importFrom(utils,URLencode)
importFrom(utils,glob2rx)
importFrom(utils,modifyList)
importFrom(utils,packageVersion)


@@ -1,5 +1,58 @@
#' @import AzureRMR
#' @importFrom utils URLencode modifyList packageVersion
#' @importFrom utils URLencode modifyList packageVersion glob2rx
NULL
globalVariables("self", "AzureStor")
globalVariables(c("self", "pool"), "AzureStor")
.AzureStor <- new.env()
.onLoad <- function(libname, pkgname)
{
# .AzureStor$azcopy <- find_azcopy()
# all methods extending classes in external package must go in .onLoad
add_methods()
}
.onUnload <- function(libpath)
{
if(exists("pool", envir=.AzureStor))
try(parallel::stopCluster(.AzureStor$pool), silent=TRUE)
}
# .onAttach <- function(libname, pkgname)
# {
# if(.AzureStor$azcopy != "")
# packageStartupMessage("azcopy version 10+ binary found at ", .AzureStor$azcopy)
# }
# find_azcopy <- function()
# {
# path <- Sys.which("azcopy")
# if(path != "")
# {
# # we need version 10 or later
# ver <- system2(path, "--version", stdout=TRUE)
# if(!grepl("version 1[[:digit:]]", ver, ignore.case=TRUE))
# path <- ""
# }
# unname(path)
# }
# set_azcopy_path <- function(path)
# {
# if(Sys.which(path) == "")
# stop("azcopy binary not found")
# ver <- system2(path, "--version", stdout=TRUE)
# if(!grepl("version 1[[:digit:]]", ver, ignore.case=TRUE))
# stop("azcopy version 10+ required but not found")
# .AzureStor$azcopy <- path
# invisible(path)
# }


@@ -148,8 +148,7 @@ NULL
NULL
# all methods extending classes in external package must go in .onLoad
.onLoad <- function(libname, pkgname)
add_methods <- function()
{
api <- "2018-03-28"
adls_api <- "2018-06-17"

R/adls_client_funcs.R

@@ -7,11 +7,11 @@
#' @param api_version If an endpoint object is not supplied, the storage API version to use when interacting with the host. Currently defaults to `"2018-06-17"`.
#' @param name The name of the filesystem to get, create, or delete.
#' @param confirm For deleting a filesystem, whether to ask for confirmation.
#' @param x For the print method, a file share object.
#' @param x For the print method, a filesystem object.
#' @param ... Further arguments passed to lower-level functions.
#'
#' @details
#' You can call these functions in a couple of ways: by passing the full URL of the share, or by passing the endpoint object and the name of the share as a string.
#' You can call these functions in a couple of ways: by passing the full URL of the filesystem, or by passing the endpoint object and the name of the filesystem as a string.
#'
#' If authenticating via AAD, you can supply the token either as a string, or as an object of class [AzureRMR::AzureToken], created via [AzureRMR::get_azure_token]. The latter is the recommended way of doing it, as it allows for automatic refreshing of expired tokens.
#'
@@ -213,11 +213,22 @@ delete_adls_filesystem.adls_endpoint <- function(endpoint, name, confirm=TRUE, .
#' @param blocksize The number of bytes to upload per HTTP(S) request.
#' @param lease The lease for a file, if present.
#' @param overwrite When downloading, whether to overwrite an existing destination file.
#' @param use_azcopy Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R. Not yet implemented.
#' @param max_concurrent_transfers For `multiupload_adls_file` and `multidownload_adls_file`, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.
#' @param recursive For `list_adls_files`, and `delete_adls_dir`, whether the operation should recurse through subdirectories. For `delete_adls_dir`, this must be TRUE to delete a non-empty directory.
#'
#' @details
#' `upload_adls_file` and `download_adls_file` are the workhorse file transfer functions for ADLSgen2 storage. They each take as inputs a _single_ filename or connection as the source for uploading/downloading, and a single filename as the destination.
#'
#' `multiupload_adls_file` and `multidownload_adls_file` are functions for uploading and downloading _multiple_ files at once. They parallelise file transfers by deploying a pool of R processes in the background, which can lead to significantly greater efficiency when transferring many small files. They take as input a _wildcard_ pattern as the source, which expands to one or more files. The `dest` argument should be a directory.
#'
#' The file transfer functions also support working with connections to allow transferring R objects without creating temporary files. For uploading, `src` can be a [textConnection] or [rawConnection] object. For downloading, `dest` can be NULL or a `rawConnection` object. In the former case, the downloaded data is returned as a raw vector, and for the latter, it will be placed into the connection. See the examples below.
#'
#' @return
#' For `list_adls_files`, if `info="name"`, a vector of file/directory names. If `info="all"`, a data frame giving the file size and whether each object is a file or directory.
#'
#' For `download_adls_file`, if `dest=NULL`, the contents of the downloaded file as a raw vector.
#'
#' @seealso
#' [adls_filesystem], [az_storage]
#'
@@ -236,6 +247,10 @@ delete_adls_filesystem.adls_endpoint <- function(endpoint, name, confirm=TRUE, .
#' delete_adls_file(fs, "/newdir/bigfile.zip")
#' delete_adls_dir(fs, "/newdir")
#'
#' # uploading/downloading multiple files at once
#' multiupload_adls_file(fs, "/data/logfiles/*.zip")
#' multidownload_adls_file(fs, "/monthly/jan*.*", "/data/january")
#'
#' # uploading serialized R objects via connections
#' json <- jsonlite::toJSON(iris, pretty=TRUE, auto_unbox=TRUE)
#' con <- textConnection(json)
@@ -245,6 +260,14 @@ delete_adls_filesystem.adls_endpoint <- function(endpoint, name, confirm=TRUE, .
#' con <- rawConnection(rds)
#' upload_adls_file(fs, con, "iris.rds")
#'
#' # downloading files into memory: as a raw vector, and via a connection
#' rawvec <- download_adls_file(fs, "iris.json", NULL)
#' rawToChar(rawvec)
#'
#' con <- rawConnection(raw(0), "r+")
#' download_adls_file(fs, "iris.json", con)
#' unserialize(con)
#'
#' }
#' @rdname adls
#' @export
@@ -296,56 +319,47 @@ list_adls_files <- function(filesystem, dir="/", info=c("all", "name"),
#' @rdname adls
#' @export
upload_adls_file <- function(filesystem, src, dest, blocksize=2^24, lease=NULL)
multiupload_adls_file <- function(filesystem, src, dest, blocksize=2^22, lease=NULL,
use_azcopy=FALSE,
max_concurrent_transfers=10)
{
con <- if(inherits(src, "textConnection"))
rawConnection(charToRaw(paste0(readLines(src), collapse="\n")))
else if(inherits(src, "rawConnection"))
src
else file(src, open="rb")
on.exit(close(con))
# create the file
content_type <- if(inherits(src, "connection"))
"application/octet-stream"
else mime::guess_type(src)
headers <- list(`x-ms-content-type`=content_type)
#if(!is.null(lease))
#headers[["x-ms-lease-id"]] <- as.character(lease)
do_container_op(filesystem, dest, options=list(resource="file"), headers=headers, http_verb="PUT")
# transfer the contents
blocklist <- list()
pos <- 0
while(1)
{
body <- readBin(con, "raw", blocksize)
thisblock <- length(body)
if(thisblock == 0)
break
headers <- list(
`content-type`="application/octet-stream",
`content-length`=sprintf("%.0f", thisblock)
)
opts <- list(action="append", position=sprintf("%.0f", pos))
do_container_op(filesystem, dest, options=opts, headers=headers, body=body, http_verb="PATCH")
pos <- pos + thisblock
}
# flush contents
do_container_op(filesystem, dest,
options=list(action="flush", position=sprintf("%.0f", pos)),
http_verb="PATCH")
if(use_azcopy)
call_azcopy_upload(filesystem, src, dest, blocksize=blocksize, lease=lease)
else multiupload_adls_file_internal(filesystem, src, dest, blocksize=blocksize, lease=lease,
max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname adls
#' @export
download_adls_file <- function(filesystem, src, dest, overwrite=FALSE)
upload_adls_file <- function(filesystem, src, dest, blocksize=2^24, lease=NULL, use_azcopy=FALSE)
{
do_container_op(filesystem, src, config=httr::write_disk(dest, overwrite))
if(use_azcopy)
call_azcopy_upload(filesystem, src, dest, blocksize=blocksize, lease=lease)
else upload_adls_file_internal(filesystem, src, dest, blocksize=blocksize, lease=lease)
}
#' @rdname adls
#' @export
multidownload_adls_file <- function(filesystem, src, dest, overwrite=FALSE,
use_azcopy=FALSE,
max_concurrent_transfers=10)
{
if(use_azcopy)
call_azcopy_download(filesystem, src, dest, overwrite=overwrite)
else multidownload_adls_file_internal(filesystem, src, dest, overwrite=overwrite,
max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname adls
#' @export
download_adls_file <- function(filesystem, src, dest, overwrite=FALSE, use_azcopy=FALSE)
{
if(use_azcopy)
call_azcopy_download(filesystem, src, dest, overwrite=overwrite)
else download_adls_file_internal(filesystem, src, dest, overwrite=overwrite)
}

R/adls_transfer_internal.R (new file, 119 lines)

@@ -0,0 +1,119 @@
multiupload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^22, lease=NULL,
max_concurrent_transfers=10)
{
src_files <- glob2rx(basename(src))
src_dir <- dirname(src)
src <- dir(src_dir, pattern=src_files, full.names=TRUE)
if(length(src) == 0)
stop("No files to transfer", call.=FALSE)
if(length(src) == 1)
return(upload_adls_file(filesystem, src, dest, blocksize=blocksize, lease=lease))
init_pool(max_concurrent_transfers)
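# ship the fixed arguments to the worker processes; each worker then uploads its share of the files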
parallel::clusterExport(.AzureStor$pool,
c("filesystem", "dest", "blocksize"),
envir=environment())
parallel::parLapply(.AzureStor$pool, src, function(f)
{
dest <- sub("//", "/", file.path(dest, basename(f))) # API too dumb to handle //'s
AzureStor::upload_adls_file(filesystem, f, dest, blocksize=blocksize, lease=lease)
})
invisible(NULL)
}
upload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, lease=NULL)
{
con <- if(inherits(src, "textConnection"))
rawConnection(charToRaw(paste0(readLines(src), collapse="\n")))
else if(inherits(src, "rawConnection"))
src
else file(src, open="rb")
on.exit(close(con))
# create the file
content_type <- if(inherits(src, "connection"))
"application/octet-stream"
else mime::guess_type(src)
headers <- list(`x-ms-content-type`=content_type)
#if(!is.null(lease))
#headers[["x-ms-lease-id"]] <- as.character(lease)
do_container_op(filesystem, dest, options=list(resource="file"), headers=headers, http_verb="PUT")
# transfer the contents
blocklist <- list()
pos <- 0
while(1)
{
body <- readBin(con, "raw", blocksize)
thisblock <- length(body)
if(thisblock == 0)
break
headers <- list(
`content-type`="application/octet-stream",
`content-length`=sprintf("%.0f", thisblock)
)
opts <- list(action="append", position=sprintf("%.0f", pos))
do_container_op(filesystem, dest, options=opts, headers=headers, body=body, http_verb="PATCH")
pos <- pos + thisblock
}
# flush contents
do_container_op(filesystem, dest,
options=list(action="flush", position=sprintf("%.0f", pos)),
http_verb="PATCH")
}
multidownload_adls_file_internal <- function(filesystem, src, dest, overwrite=FALSE, max_concurrent_transfers=10)
{
src_dir <- dirname(src)
if(src_dir == ".")
src_dir <- "/"
files <- list_adls_files(filesystem, src_dir, info="name")
src <- grep(glob2rx(src), files, value=TRUE) # file listing on ADLS includes directory name
if(length(src) == 0)
stop("No files to transfer", call.=FALSE)
if(length(src) == 1)
return(download_adls_file(filesystem, src, dest, overwrite=overwrite))
init_pool(max_concurrent_transfers)
parallel::clusterExport(.AzureStor$pool,
c("filesystem", "dest", "overwrite"),
envir=environment())
parallel::parLapply(.AzureStor$pool, src, function(f)
{
dest <- file.path(dest, basename(f))
AzureStor::download_adls_file(filesystem, f, dest, overwrite=overwrite)
})
invisible(NULL)
}
download_adls_file_internal <- function(filesystem, src, dest, overwrite=FALSE)
{
if(is.character(dest))
return(do_container_op(filesystem, src, config=httr::write_disk(dest, overwrite)))
# if dest is NULL or a raw connection, return the transferred data in memory as raw bytes
cont <- httr::content(do_container_op(filesystem, src, http_status_handler="pass"),
as="raw")
if(is.null(dest))
return(cont)
if(inherits(dest, "rawConnection"))
{
writeBin(cont, dest)
seek(dest, 0)
invisible(NULL)
}
else stop("Unrecognised dest argument", call.=FALSE)
}


@@ -218,27 +218,38 @@ delete_blob_container.blob_endpoint <- function(endpoint, name, confirm=TRUE, le
}
#' Operations on a blob container
#' Operations on a blob container or blob
#'
#' Upload, download, or delete a blob; list blobs in a container.
#'
#' @param container A blob container object.
#' @param blob A string naming a blob.
#' @param src,dest The source and destination files for uploading and downloading. For uploading, `src` can also be a [textConnection] or [rawConnection] object to allow transferring in-memory R objects without creating a temporary file.
#' @param src,dest The source and destination files for uploading and downloading. See 'Details' below. For uploading, `src` can also be a [textConnection] or [rawConnection] object to allow transferring in-memory R objects without creating a temporary file. For downloading, `dest` can also be NULL or a `rawConnection` object.
#' @param info For `list_blobs`, level of detail about each blob to return: a vector of names only; the name, size and last-modified date (default); or all information.
#' @param confirm Whether to ask for confirmation on deleting a blob.
#' @param blocksize The number of bytes to upload per HTTP(S) request.
#' @param lease The lease for a blob, if present.
#' @param type When uploading, the type of blob to create. Currently only block blobs are supported.
#' @param overwrite When downloading, whether to overwrite an existing destination file.
#' @param use_azcopy Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R. Not yet implemented.
#' @param max_concurrent_transfers For `multiupload_blob` and `multidownload_blob`, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.
#' @param prefix For `list_blobs`, filters the result to return only blobs whose name begins with this prefix.
#'
#' @details
#' `upload_blob` and `download_blob` are the workhorse file transfer functions for blobs. They each take as inputs a _single_ filename or connection as the source for uploading/downloading, and a single filename as the destination.
#'
#' `multiupload_blob` and `multidownload_blob` are functions for uploading and downloading _multiple_ blobs at once. They parallelise file transfers by deploying a pool of R processes in the background, which can lead to significantly greater efficiency when transferring many small files. They take as input a wildcard pattern as the source, which expands to one or more files. The `dest` argument should be a directory for downloading, and is not used for uploading.
#'
#' The file transfer functions also support working with connections to allow transferring R objects without creating temporary files. For uploading, `src` can be a [textConnection] or [rawConnection] object. For downloading, `dest` can be NULL or a `rawConnection` object. In the former case, the downloaded data is returned as a raw vector, and for the latter, it will be placed into the connection. See the examples below.
#'
#' @return
#' For `list_blobs`, details on the blobs in the container.
#' For `list_blobs`, details on the blobs in the container. For `download_blob`, if `dest=NULL`, the contents of the downloaded blob as a raw vector.
#'
#' @seealso
#' [blob_container], [az_storage]
#'
#' [AzCopy version 10 on GitHub](https://github.com/Azure/azure-storage-azcopy)
#'
#' @examples
#' \dontrun{
#'
@@ -251,6 +262,10 @@ delete_blob_container.blob_endpoint <- function(endpoint, name, confirm=TRUE, le
#'
#' delete_blob(cont, "bigfile.zip")
#'
#' # uploading/downloading multiple files at once
#' multiupload_blob(cont, "/data/logfiles/*.zip")
#' multidownload_blob(cont, "jan*.*", "/data/january")
#'
#' # uploading serialized R objects via connections
#' json <- jsonlite::toJSON(iris, pretty=TRUE, auto_unbox=TRUE)
#' con <- textConnection(json)
@@ -260,6 +275,14 @@ delete_blob_container.blob_endpoint <- function(endpoint, name, confirm=TRUE, le
#' con <- rawConnection(rds)
#' upload_blob(cont, con, "iris.rds")
#'
#' # downloading files into memory: as a raw vector, and via a connection
#' rawvec <- download_blob(cont, "iris.json", NULL)
#' rawToChar(rawvec)
#'
#' con <- rawConnection(raw(0), "r+")
#' download_blob(cont, "iris.json", con)
#' unserialize(con)
#'
#' }
#' @rdname blob
#' @export
@@ -306,66 +329,45 @@ list_blobs <- function(container, info=c("partial", "name", "all"),
#' @rdname blob
#' @export
upload_blob <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL)
upload_blob <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL,
use_azcopy=FALSE)
{
if(type != "BlockBlob")
stop("Only block blobs currently supported")
content_type <- if(inherits(src, "connection"))
"application/octet-stream"
else mime::guess_type(src)
headers <- list("x-ms-blob-type"=type)
if(!is.null(lease))
headers[["x-ms-lease-id"]] <- as.character(lease)
con <- if(inherits(src, "textConnection"))
rawConnection(charToRaw(paste0(readLines(src), collapse="\n")))
else if(inherits(src, "rawConnection"))
src
else file(src, open="rb")
on.exit(close(con))
# upload each block
blocklist <- list()
i <- 1
while(1)
{
body <- readBin(con, "raw", blocksize)
thisblock <- length(body)
if(thisblock == 0)
break
# ensure content-length is never exponential notation
headers[["content-length"]] <- sprintf("%.0f", thisblock)
id <- openssl::base64_encode(sprintf("%s-%010d", dest, i))
opts <- list(comp="block", blockid=id)
do_container_op(container, dest, headers=headers, body=body, options=opts, http_verb="PUT")
blocklist <- c(blocklist, list(Latest=list(id)))
i <- i + 1
}
# update block list
body <- as.character(xml2::as_xml_document(list(BlockList=blocklist)))
headers <- list("content-length"=sprintf("%.0f", nchar(body)))
do_container_op(container, dest, headers=headers, body=body, options=list(comp="blocklist"),
http_verb="PUT")
# set content type
do_container_op(container, dest, headers=list("x-ms-blob-content-type"=content_type),
options=list(comp="properties"),
http_verb="PUT")
if(use_azcopy)
call_azcopy_upload(container, src, dest, type=type, blocksize=blocksize, lease=lease)
else upload_blob_internal(container, src, dest, type=type, blocksize=blocksize, lease=lease)
}
#' @rdname blob
#' @export
download_blob <- function(container, src, dest, overwrite=FALSE, lease=NULL)
multiupload_blob <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL,
use_azcopy=FALSE,
max_concurrent_transfers=10)
{
headers <- list()
if(!is.null(lease))
headers[["x-ms-lease-id"]] <- as.character(lease)
do_container_op(container, src, headers=headers, config=httr::write_disk(dest, overwrite))
if(use_azcopy)
call_azcopy_upload(container, src, dest, type=type, blocksize=blocksize, lease=lease)
else multiupload_blob_internal(container, src, dest, type=type, blocksize=blocksize, lease=lease,
max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname blob
#' @export
download_blob <- function(container, src, dest, overwrite=FALSE, lease=NULL, use_azcopy=FALSE)
{
if(use_azcopy)
call_azcopy_download(container, src, dest, overwrite=overwrite, lease=lease)
else download_blob_internal(container, src, dest, overwrite=overwrite, lease=lease)
}
#' @rdname blob
#' @export
multidownload_blob <- function(container, src, dest, overwrite=FALSE, lease=NULL,
use_azcopy=FALSE,
max_concurrent_transfers=10)
{
if(use_azcopy)
call_azcopy_download(container, src, dest, overwrite=overwrite, lease=lease)
else multidownload_blob_internal(container, src, dest, overwrite=overwrite, lease=lease,
max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname blob

R/blob_transfer_internal.R (new file, 147 lines)

@@ -0,0 +1,147 @@
multiupload_blob_internal <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL,
max_concurrent_transfers=10)
{
src_dir <- dirname(src)
src_files <- glob2rx(basename(src))
src <- dir(src_dir, pattern=src_files, full.names=TRUE)
if(length(src) == 0)
stop("No files to transfer", call.=FALSE)
if(length(src) == 1)
return(upload_blob(container, src, dest, type=type, blocksize=blocksize, lease=lease))
if(!missing(dest))
warning("Internal multiupload_blob implementation does not use the 'dest' argument")
init_pool(max_concurrent_transfers)
parallel::clusterExport(.AzureStor$pool,
c("container", "type", "blocksize", "lease"),
envir=environment())
parallel::parLapply(.AzureStor$pool, src, function(f)
{
AzureStor::upload_blob(container, f, basename(f), type=type, blocksize=blocksize, lease=lease)
})
invisible(NULL)
}
upload_blob_internal <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL)
{
if(type != "BlockBlob")
stop("Only block blobs currently supported")
content_type <- if(inherits(src, "connection"))
"application/octet-stream"
else mime::guess_type(src)
headers <- list("x-ms-blob-type"=type)
if(!is.null(lease))
headers[["x-ms-lease-id"]] <- as.character(lease)
con <- if(inherits(src, "textConnection"))
rawConnection(charToRaw(paste0(readLines(src), collapse="\n")))
else if(inherits(src, "rawConnection"))
src
else file(src, open="rb")
on.exit(close(con))
# upload each block
blocklist <- list()
i <- 1
while(1)
{
body <- readBin(con, "raw", blocksize)
thisblock <- length(body)
if(thisblock == 0)
break
# ensure content-length is never exponential notation
headers[["content-length"]] <- sprintf("%.0f", thisblock)
id <- openssl::base64_encode(sprintf("%s-%010d", dest, i))
opts <- list(comp="block", blockid=id)
do_container_op(container, dest, headers=headers, body=body, options=opts, http_verb="PUT")
blocklist <- c(blocklist, list(Latest=list(id)))
i <- i + 1
}
# update block list
body <- as.character(xml2::as_xml_document(list(BlockList=blocklist)))
headers <- list("content-length"=sprintf("%.0f", nchar(body)))
do_container_op(container, dest, headers=headers, body=body, options=list(comp="blocklist"),
http_verb="PUT")
# set content type
do_container_op(container, dest, headers=list("x-ms-blob-content-type"=content_type),
options=list(comp="properties"),
http_verb="PUT")
}
multidownload_blob_internal <- function(container, src, dest, overwrite=FALSE, lease=NULL,
max_concurrent_transfers=10)
{
files <- list_blobs(container, info="name")
src_files <- glob2rx(basename(src))
src <- grep(src_files, files, value=TRUE)
if(length(src) == 0)
stop("No files to transfer", call.=FALSE)
if(length(src) == 1)
return(download_blob(container, src, dest, overwrite=overwrite, lease=lease))
init_pool(max_concurrent_transfers)
parallel::clusterExport(.AzureStor$pool,
c("container", "dest", "overwrite", "lease"),
envir=environment())
parallel::parLapply(.AzureStor$pool, src, function(f)
{
dest <- file.path(dest, basename(f))
AzureStor::download_blob(container, f, dest, overwrite=overwrite, lease=lease)
})
invisible(NULL)
}
download_blob_internal <- function(container, src, dest, overwrite=FALSE, lease=NULL)
{
headers <- list()
if(!is.null(lease))
headers[["x-ms-lease-id"]] <- as.character(lease)
if(is.character(dest))
return(do_container_op(container, src, headers=headers, config=httr::write_disk(dest, overwrite)))
# if dest is NULL or a raw connection, return the transferred data in memory as raw bytes
cont <- httr::content(do_container_op(container, src, headers=headers, http_status_handler="pass"),
as="raw")
if(is.null(dest))
return(cont)
if(inherits(dest, "rawConnection"))
{
writeBin(cont, dest)
seek(dest, 0)
invisible(NULL)
}
else stop("Unrecognised dest argument", call.=FALSE)
}
call_azcopy_upload <- function(...)
{
if(is.null(.AzureStor$azcopy) || .AzureStor$azcopy == "")
stop("azcopy version 10+ required but not found")
else stop("Not yet implemented")
}
call_azcopy_download <- function(...)
{
if(is.null(.AzureStor$azcopy) || .AzureStor$azcopy == "")
stop("azcopy version 10+ required but not found")
else stop("Not yet implemented")
}


@@ -198,14 +198,27 @@ delete_file_share.file_endpoint <- function(endpoint, name, confirm=TRUE, ...)
#' @param confirm Whether to ask for confirmation on deleting a file or directory.
#' @param blocksize The number of bytes to upload per HTTP(S) request.
#' @param overwrite When downloading, whether to overwrite an existing destination file.
#' @param use_azcopy Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R. Not yet implemented.
#' @param max_concurrent_transfers For `multiupload_azure_file` and `multidownload_azure_file`, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.
#' @param prefix For `list_azure_files`, filters the result to return only files and directories whose name begins with this prefix.
#'
#' @details
#' `upload_azure_file` and `download_azure_file` are the workhorse file transfer functions for file storage. They each take as inputs a _single_ filename or connection as the source for uploading/downloading, and a single filename as the destination.
#'
#' `multiupload_azure_file` and `multidownload_azure_file` are functions for uploading and downloading _multiple_ files at once. They parallelise file transfers by deploying a pool of R processes in the background, which can lead to significantly greater efficiency when transferring many small files. They take as input a _wildcard_ pattern as the source, which expands to one or more files. The `dest` argument should be a directory.
#'
#' The file transfer functions also support working with connections to allow transferring R objects without creating temporary files. For uploading, `src` can be a [textConnection] or [rawConnection] object. For downloading, `dest` can be NULL or a `rawConnection` object. In the former case, the downloaded data is returned as a raw vector, and for the latter, it will be placed into the connection. See the examples below.
#'
#' @return
#' For `list_azure_files`, if `info="name"`, a vector of file/directory names. If `info="all"`, a data frame giving the file size and whether each object is a file or directory.
#'
#' For `download_azure_file`, if `dest=NULL`, the contents of the downloaded file as a raw vector.
#'
#' @seealso
#' [file_share], [az_storage]
#'
#' [AzCopy version 10 on GitHub](https://github.com/Azure/azure-storage-azcopy)
#'
#' @examples
#' \dontrun{
#'
@@ -221,6 +234,10 @@ delete_file_share.file_endpoint <- function(endpoint, name, confirm=TRUE, ...)
#' delete_azure_file(share, "/newdir/bigfile.zip")
#' delete_azure_dir(share, "/newdir")
#'
#' # uploading/downloading multiple files at once
#' multiupload_azure_file(share, "/data/logfiles/*.zip")
#' multidownload_azure_file(share, "/monthly/jan*.*", "/data/january")
#'
#' # uploading serialized R objects via connections
#' json <- jsonlite::toJSON(iris, pretty=TRUE, auto_unbox=TRUE)
#' con <- textConnection(json)
@@ -230,6 +247,14 @@ delete_file_share.file_endpoint <- function(endpoint, name, confirm=TRUE, ...)
#' con <- rawConnection(rds)
#' upload_azure_file(share, con, "iris.rds")
#'
#' # downloading files into memory: as a raw vector, and via a connection
#' rawvec <- download_azure_file(share, "iris.json", NULL)
#' rawToChar(rawvec)
#'
#' con <- rawConnection(raw(0), "r+")
#' download_azure_file(share, "iris.json", con)
#' unserialize(con)
#'
#' }
#' @rdname file
#' @export
@@ -250,7 +275,8 @@ list_azure_files <- function(share, dir, info=c("all", "name"),
type <- if(is_empty(name)) character(0) else names(name)
size <- vapply(lst$Entries,
function(ent) if(is_empty(ent$Properties)) NA_character_ else ent$Properties$`Content-Length`[[1]],
function(ent) if(is_empty(ent$Properties)) NA_character_
else ent$Properties$`Content-Length`[[1]],
FUN.VALUE=character(1))
data.frame(name=name, type=type, size=as.numeric(size), stringsAsFactors=FALSE)
@@ -258,80 +284,44 @@ list_azure_files <- function(share, dir, info=c("all", "name"),
#' @rdname file
#' @export
upload_azure_file <- function(share, src, dest, blocksize=2^22)
upload_azure_file <- function(share, src, dest, blocksize=2^22, use_azcopy=FALSE)
{
# set content type
content_type <- if(inherits(src, "connection"))
"application/octet-stream"
else mime::guess_type(src)
if(inherits(src, "textConnection"))
{
src <- charToRaw(paste0(readLines(src), collapse="\n"))
nbytes <- length(src)
con <- rawConnection(src)
}
else if(inherits(src, "rawConnection"))
{
con <- src
# need to read the data to get object size (!)
nbytes <- 0
repeat
{
x <- readBin(con, "raw", n=blocksize)
if(length(x) == 0)
break
nbytes <- nbytes + length(x)
}
seek(con, 0) # reposition connection after reading
}
else
{
con <- file(src, open="rb")
nbytes <- file.info(src)$size
}
on.exit(close(con))
# first, create the file
# ensure content-length is never exponential notation
headers <- list("x-ms-type"="file",
"x-ms-content-length"=sprintf("%.0f", nbytes))
do_container_op(share, dest, headers=headers, http_verb="PUT")
# then write the bytes into it, one block at a time
options <- list(comp="range")
headers <- list("x-ms-write"="Update")
# upload each block
blocklist <- list()
range_begin <- 0
while(range_begin < nbytes)
{
body <- readBin(con, "raw", blocksize)
thisblock <- length(body)
if(thisblock == 0) # sanity check
break
# ensure content-length and range are never exponential notation
headers[["content-length"]] <- sprintf("%.0f", thisblock)
headers[["range"]] <- sprintf("bytes=%.0f-%.0f", range_begin, range_begin + thisblock - 1)
do_container_op(share, dest, headers=headers, body=body, options=options, http_verb="PUT")
range_begin <- range_begin + thisblock
}
do_container_op(share, dest, headers=list("x-ms-content-type"=content_type),
options=list(comp="properties"),
http_verb="PUT")
invisible(NULL)
if(use_azcopy)
call_azcopy_upload(share, src, dest, blocksize=blocksize)
else upload_azure_file_internal(share, src, dest, blocksize=blocksize)
}
#' @rdname file
#' @export
download_azure_file <- function(share, src, dest, overwrite=FALSE)
multiupload_azure_file <- function(share, src, dest, blocksize=2^22,
use_azcopy=FALSE,
max_concurrent_transfers=10)
{
do_container_op(share, src, config=httr::write_disk(dest, overwrite))
if(use_azcopy)
call_azcopy_upload(share, src, dest, blocksize=blocksize)
else multiupload_azure_file_internal(share, src, dest, blocksize=blocksize,
max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname file
#' @export
download_azure_file <- function(share, src, dest, overwrite=FALSE, use_azcopy=FALSE)
{
if(use_azcopy)
call_azcopy_download(share, src, dest, overwrite=overwrite)
else download_azure_file_internal(share, src, dest, overwrite=overwrite)
}
#' @rdname file
#' @export
multidownload_azure_file <- function(share, src, dest, overwrite=FALSE,
use_azcopy=FALSE,
max_concurrent_transfers=10)
{
if(use_azcopy)
call_azcopy_download(share, src, dest, overwrite=overwrite)
else multidownload_azure_file_internal(share, src, dest, overwrite=overwrite,
max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname file

R/file_transfer_internal.R (new file, 143 lines)

@@ -0,0 +1,143 @@
multiupload_azure_file_internal <- function(share, src, dest, blocksize=2^22, max_concurrent_transfers=10)
{
src_dir <- dirname(src)
src_files <- glob2rx(basename(src))
src <- dir(src_dir, pattern=src_files, full.names=TRUE)
if(length(src) == 0)
stop("No files to transfer", call.=FALSE)
if(length(src) == 1)
return(upload_azure_file(share, src, dest, blocksize=blocksize))
init_pool(max_concurrent_transfers)
parallel::clusterExport(.AzureStor$pool,
c("share", "dest", "blocksize"),
envir=environment())
parallel::parLapply(.AzureStor$pool, src, function(f)
{
dest <- sub("//", "/", file.path(dest, basename(f))) # API too dumb to handle //'s
AzureStor::upload_azure_file(share, f, dest, blocksize=blocksize)
})
invisible(NULL)
}
upload_azure_file_internal <- function(share, src, dest, blocksize=2^22)
{
# set content type
content_type <- if(inherits(src, "connection"))
"application/octet-stream"
else mime::guess_type(src)
if(inherits(src, "textConnection"))
{
src <- charToRaw(paste0(readLines(src), collapse="\n"))
nbytes <- length(src)
con <- rawConnection(src)
}
else if(inherits(src, "rawConnection"))
{
con <- src
# need to read the data to get object size (!)
nbytes <- 0
repeat
{
x <- readBin(con, "raw", n=blocksize)
if(length(x) == 0)
break
nbytes <- nbytes + length(x)
}
seek(con, 0) # reposition connection after reading
}
else
{
con <- file(src, open="rb")
nbytes <- file.info(src)$size
}
on.exit(close(con))
# first, create the file
# ensure content-length is never exponential notation
headers <- list("x-ms-type"="file",
"x-ms-content-length"=sprintf("%.0f", nbytes))
do_container_op(share, dest, headers=headers, http_verb="PUT")
# then write the bytes into it, one block at a time
options <- list(comp="range")
headers <- list("x-ms-write"="Update")
# upload each block
blocklist <- list()
range_begin <- 0
while(range_begin < nbytes)
{
body <- readBin(con, "raw", blocksize)
thisblock <- length(body)
if(thisblock == 0) # sanity check
break
# ensure content-length and range are never exponential notation
headers[["content-length"]] <- sprintf("%.0f", thisblock)
headers[["range"]] <- sprintf("bytes=%.0f-%.0f", range_begin, range_begin + thisblock - 1)
do_container_op(share, dest, headers=headers, body=body, options=options, http_verb="PUT")
range_begin <- range_begin + thisblock
}
do_container_op(share, dest, headers=list("x-ms-content-type"=content_type),
options=list(comp="properties"),
http_verb="PUT")
invisible(NULL)
}
multidownload_azure_file_internal <- function(share, src, dest, overwrite=FALSE, max_concurrent_transfers=10)
{
src_files <- glob2rx(basename(src))
src_dir <- dirname(src)
if(src_dir == ".")
src_dir <- "/"
files <- list_azure_files(share, src_dir, info="name")
src <- sub("//", "/", file.path(src_dir, grep(src_files, files, value=TRUE)))
if(length(src) == 0)
stop("No files to transfer", call.=FALSE)
if(length(src) == 1)
return(download_azure_file(share, src, dest, overwrite=overwrite))
init_pool(max_concurrent_transfers)
parallel::clusterExport(.AzureStor$pool,
c("share", "dest", "overwrite"),
envir=environment())
parallel::parLapply(.AzureStor$pool, src, function(f)
{
dest <- file.path(dest, basename(f))
AzureStor::download_azure_file(share, f, dest, overwrite=overwrite)
})
invisible(NULL)
}
download_azure_file_internal <- function(share, src, dest, overwrite=FALSE)
{
if(is.character(dest))
return(do_container_op(share, src, config=httr::write_disk(dest, overwrite)))
# if dest is NULL or a raw connection, return the transferred data in memory as raw bytes
cont <- httr::content(do_container_op(share, src, http_status_handler="pass"),
as="raw")
if(is.null(dest))
return(cont)
if(inherits(dest, "rawConnection"))
{
writeBin(cont, dest)
seek(dest, 0)
invisible(NULL)
}
else stop("Unrecognised dest argument", call.=FALSE)
}

R/pool.R (new file, 52 lines)

@@ -0,0 +1,52 @@
#' Parallelise multiple file transfers in the background
#'
#' @param max_concurrent_transfers The maximum number of concurrent file transfers to support, which translates into the number of background R processes to create. Each concurrent transfer requires a separate R process, so limit this if you are low on memory.
#' @param restart For `init_pool`, whether to terminate an already running pool first.
#' @param ... Other arguments passed on to `parallel::makeCluster`.
#'
#' @details
#' AzureStor can parallelise file transfers by utilizing a pool of R processes in the background. This often leads to significant speedups when transferring multiple small files. The pool is created by calling `init_pool`, or automatically the first time that a multiple file transfer is begun. It remains persistent for the session or until terminated by `delete_pool`.
#'
#' If `init_pool` is called and the current pool is smaller than `max_concurrent_transfers`, it is resized.
#'
#' @seealso
#' [multiupload_blob], [multidownload_blob], [parallel::makeCluster]
#' @rdname pool
#' @export
init_pool <- function(max_concurrent_transfers=10, restart=FALSE, ...)
{
if(restart)
delete_pool()
if(!exists("pool", envir=.AzureStor) || length(.AzureStor$pool) < max_concurrent_transfers)
{
delete_pool()
message("Creating background pool")
.AzureStor$pool <- parallel::makeCluster(max_concurrent_transfers)
parallel::clusterEvalQ(.AzureStor$pool, loadNamespace("AzureStor"))
}
else
{
# restore original state, set working directory to master working directory
parallel::clusterCall(.AzureStor$pool, function(wd)
{
setwd(wd)
rm(list=ls(all.names=TRUE), envir=.GlobalEnv)
}, wd=getwd())
}
invisible(NULL)
}
#' @rdname pool
#' @export
delete_pool <- function()
{
if(!exists("pool", envir=.AzureStor))
return()
message("Deleting background pool")
parallel::stopCluster(.AzureStor$pool)
rm(pool, envir=.AzureStor)
}
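For reference, a typical session using the pool directly might look like the sketch below. Calling `init_pool` up front is optional, since the multi-transfer functions create the pool automatically on first use; the container object `cont` is a placeholder for a blob container created elsewhere.

# create (or resize) the background pool explicitly...
init_pool(max_concurrent_transfers=4)

# ...reuse it across any number of multi-transfers...
multiupload_blob(cont, "/data/logfiles/*.zip")

# ...and shut the workers down when finished
delete_pool()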

man/adls.Rd

@@ -2,7 +2,9 @@
% Please edit documentation in R/adls_client_funcs.R
\name{list_adls_files}
\alias{list_adls_files}
\alias{multiupload_adls_file}
\alias{upload_adls_file}
\alias{multidownload_adls_file}
\alias{download_adls_file}
\alias{delete_adls_file}
\alias{create_adls_dir}
@@ -12,9 +14,17 @@
list_adls_files(filesystem, dir = "/", info = c("all", "name"),
recursive = FALSE)
upload_adls_file(filesystem, src, dest, blocksize = 2^24, lease = NULL)
multiupload_adls_file(filesystem, src, dest, blocksize = 2^22,
lease = NULL, use_azcopy = FALSE, max_concurrent_transfers = 10)
download_adls_file(filesystem, src, dest, overwrite = FALSE)
upload_adls_file(filesystem, src, dest, blocksize = 2^24, lease = NULL,
use_azcopy = FALSE)
multidownload_adls_file(filesystem, src, dest, overwrite = FALSE,
use_azcopy = FALSE, max_concurrent_transfers = 10)
download_adls_file(filesystem, src, dest, overwrite = FALSE,
use_azcopy = FALSE)
delete_adls_file(filesystem, file, confirm = TRUE)
@@ -37,16 +47,29 @@ delete_adls_dir(filesystem, dir, recursive = FALSE, confirm = TRUE)
\item{lease}{The lease for a file, if present.}
\item{use_azcopy}{Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R. Not yet implemented.}
\item{max_concurrent_transfers}{For \code{multiupload_adls_file} and \code{multidownload_adls_file}, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.}
\item{overwrite}{When downloading, whether to overwrite an existing destination file.}
\item{confirm}{Whether to ask for confirmation on deleting a file or directory.}
}
\value{
For \code{list_adls_files}, if \code{info="name"}, a vector of file/directory names. If \code{info="all"}, a data frame giving the file size and whether each object is a file or directory.
For \code{download_adls_file}, if \code{dest=NULL}, the contents of the downloaded file as a raw vector.
}
\description{
Upload, download, or delete a file; list files in a directory; create or delete directories.
}
\details{
\code{upload_adls_file} and \code{download_adls_file} are the workhorse file transfer functions for ADLSgen2 storage. They each take as inputs a \emph{single} filename or connection as the source for uploading/downloading, and a single filename as the destination.
\code{multiupload_adls_file} and \code{multidownload_adls_file} are functions for uploading and downloading \emph{multiple} files at once. They parallelise file transfers by deploying a pool of R processes in the background, which can lead to significantly greater efficiency when transferring many small files. They take as input a \emph{wildcard} pattern as the source, which expands to one or more files. The \code{dest} argument should be a directory.
The file transfer functions also support working with connections to allow transferring R objects without creating temporary files. For uploading, \code{src} can be a \link{textConnection} or \link{rawConnection} object. For downloading, \code{dest} can be NULL or a \code{rawConnection} object. In the former case, the downloaded data is returned as a raw vector, and for the latter, it will be placed into the connection. See the examples below.
}
\examples{
\dontrun{
@@ -62,6 +85,10 @@ download_adls_file(fs, "/newdir/bigfile.zip", dest="~/bigfile_downloaded.zip")
delete_adls_file(fs, "/newdir/bigfile.zip")
delete_adls_dir(fs, "/newdir")
# uploading/downloading multiple files at once
multiupload_adls_file(fs, "/data/logfiles/*.zip")
multidownload_adls_file(fs, "/monthly/jan*.*", "/data/january")
# uploading serialized R objects via connections
json <- jsonlite::toJSON(iris, pretty=TRUE, auto_unbox=TRUE)
con <- textConnection(json)
@@ -71,6 +98,14 @@ rds <- serialize(iris, NULL)
con <- rawConnection(rds)
upload_adls_file(fs, con, "iris.rds")
# downloading files into memory: as a raw vector, and via a connection
rawvec <- download_adls_file(fs, "iris.json", NULL)
rawToChar(rawvec)
con <- rawConnection(raw(0), "r+")
download_adls_file(fs, "iris.json", con)
unserialize(con)
}
}
\seealso{


@@ -68,7 +68,7 @@ delete_adls_filesystem(endpoint, ...)
\item{name}{The name of the filesystem to get, create, or delete.}
\item{x}{For the print method, a file share object.}
\item{x}{For the print method, a filesystem object.}
\item{confirm}{For deleting a filesystem, whether to ask for confirmation.}
}
@@ -81,7 +81,7 @@ For \code{list_adls_filesystems}, a list of such objects.
Get, list, create, or delete ADLSgen2 filesystems. Currently (as of December 2018) ADLSgen2 is in general-access public preview.
}
\details{
You can call these functions in a couple of ways: by passing the full URL of the share, or by passing the endpoint object and the name of the share as a string.
You can call these functions in a couple of ways: by passing the full URL of the filesystem, or by passing the endpoint object and the name of the filesystem as a string.
If authenticating via AAD, you can supply the token either as a string, or as an object of class \link[AzureRMR:AzureToken]{AzureRMR::AzureToken}, created via \link[AzureRMR:get_azure_token]{AzureRMR::get_azure_token}. The latter is the recommended way of doing it, as it allows for automatic refreshing of expired tokens.

man/blob.Rd

@@ -3,17 +3,27 @@
\name{list_blobs}
\alias{list_blobs}
\alias{upload_blob}
\alias{multiupload_blob}
\alias{download_blob}
\alias{multidownload_blob}
\alias{delete_blob}
\title{Operations on a blob container}
\title{Operations on a blob container or blob}
\usage{
list_blobs(container, info = c("partial", "name", "all"),
prefix = NULL)
upload_blob(container, src, dest, type = "BlockBlob", blocksize = 2^24,
lease = NULL)
lease = NULL, use_azcopy = FALSE)
download_blob(container, src, dest, overwrite = FALSE, lease = NULL)
multiupload_blob(container, src, dest, type = "BlockBlob",
blocksize = 2^24, lease = NULL, use_azcopy = FALSE,
max_concurrent_transfers = 10)
download_blob(container, src, dest, overwrite = FALSE, lease = NULL,
use_azcopy = FALSE)
multidownload_blob(container, src, dest, overwrite = FALSE,
lease = NULL, use_azcopy = FALSE, max_concurrent_transfers = 10)
delete_blob(container, blob, confirm = TRUE)
}
@@ -24,7 +34,7 @@ delete_blob(container, blob, confirm = TRUE)
\item{prefix}{For \code{list_blobs}, filters the result to return only blobs whose name begins with this prefix.}
\item{src, dest}{The source and destination files for uploading and downloading. For uploading, \code{src} can also be a \link{textConnection} or \link{rawConnection} object to allow transferring in-memory R objects without creating a temporary file.}
\item{src, dest}{The source and destination files for uploading and downloading. See 'Details' below. For uploading, \code{src} can also be a \link{textConnection} or \link{rawConnection} object to allow transferring in-memory R objects without creating a temporary file. For downloading, \code{dest} can also be NULL or a \code{rawConnection} object.}
\item{type}{When uploading, the type of blob to create. Currently only block blobs are supported.}
@@ -32,6 +42,10 @@ delete_blob(container, blob, confirm = TRUE)
\item{lease}{The lease for a blob, if present.}
\item{use_azcopy}{Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R. Not yet implemented.}
\item{max_concurrent_transfers}{For \code{multiupload_blob} and \code{multidownload_blob}, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.}
\item{overwrite}{When downloading, whether to overwrite an existing destination file.}
\item{blob}{A string naming a blob.}
@@ -39,11 +53,18 @@ delete_blob(container, blob, confirm = TRUE)
\item{confirm}{Whether to ask for confirmation on deleting a blob.}
}
\value{
For \code{list_blobs}, details on the blobs in the container.
For \code{list_blobs}, details on the blobs in the container. For \code{download_blob}, if \code{dest=NULL}, the contents of the downloaded blob as a raw vector.
}
\description{
Upload, download, or delete a blob; list blobs in a container.
}
\details{
\code{upload_blob} and \code{download_blob} are the workhorse file transfer functions for blobs. They each take as inputs a \emph{single} filename or connection as the source for uploading/downloading, and a single filename as the destination.
\code{multiupload_blob} and \code{multidownload_blob} are functions for uploading and downloading \emph{multiple} blobs at once. They parallelise file transfers by deploying a pool of R processes in the background, which can lead to significantly greater efficiency when transferring many small files. They take as input a wildcard pattern as the source, which expands to one or more files. The \code{dest} argument should be a directory for downloading, and is not used for uploading.
The file transfer functions also support working with connections to allow transferring R objects without creating temporary files. For uploading, \code{src} can be a \link{textConnection} or \link{rawConnection} object. For downloading, \code{dest} can be NULL or a \code{rawConnection} object. In the former case, the downloaded data is returned as a raw vector, and for the latter, it will be placed into the connection. See the examples below.
}
\examples{
\dontrun{
@@ -56,6 +77,10 @@ download_blob(cont, "bigfile.zip", dest="~/bigfile_downloaded.zip")
delete_blob(cont, "bigfile.zip")
# uploading/downloading multiple files at once
multiupload_blob(cont, "/data/logfiles/*.zip")
multidownload_blob(cont, "jan*.*", "/data/january")
# uploading serialized R objects via connections
json <- jsonlite::toJSON(iris, pretty=TRUE, auto_unbox=TRUE)
con <- textConnection(json)
@@ -65,8 +90,18 @@ rds <- serialize(iris, NULL)
con <- rawConnection(rds)
upload_blob(cont, con, "iris.rds")
# downloading files into memory: as a raw vector, and via a connection
rawvec <- download_blob(cont, "iris.json", NULL)
rawToChar(rawvec)
con <- rawConnection(raw(0), "r+")
download_blob(cont, "iris.json", con)
unserialize(con)
}
}
\seealso{
\link{blob_container}, \link{az_storage}
\href{https://github.com/Azure/azure-storage-azcopy}{AzCopy version 10 on GitHub}
}

man/file.Rd

@@ -3,7 +3,9 @@
\name{list_azure_files}
\alias{list_azure_files}
\alias{upload_azure_file}
\alias{multiupload_azure_file}
\alias{download_azure_file}
\alias{multidownload_azure_file}
\alias{delete_azure_file}
\alias{create_azure_dir}
\alias{delete_azure_dir}
@@ -11,9 +13,17 @@
\usage{
list_azure_files(share, dir, info = c("all", "name"), prefix = NULL)
upload_azure_file(share, src, dest, blocksize = 2^22)
upload_azure_file(share, src, dest, blocksize = 2^22,
use_azcopy = FALSE)
download_azure_file(share, src, dest, overwrite = FALSE)
multiupload_azure_file(share, src, dest, blocksize = 2^22,
use_azcopy = FALSE, max_concurrent_transfers = 10)
download_azure_file(share, src, dest, overwrite = FALSE,
use_azcopy = FALSE)
multidownload_azure_file(share, src, dest, overwrite = FALSE,
use_azcopy = FALSE, max_concurrent_transfers = 10)
delete_azure_file(share, file, confirm = TRUE)
@@ -34,16 +44,29 @@ delete_azure_dir(share, dir, confirm = TRUE)
\item{blocksize}{The number of bytes to upload per HTTP(S) request.}
\item{use_azcopy}{Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R. Not yet implemented.}
\item{max_concurrent_transfers}{For \code{multiupload_azure_file} and \code{multidownload_azure_file}, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.}
\item{overwrite}{When downloading, whether to overwrite an existing destination file.}
\item{confirm}{Whether to ask for confirmation on deleting a file or directory.}
}
\value{
For \code{list_azure_files}, if \code{info="name"}, a vector of file/directory names. If \code{info="all"}, a data frame giving the file size and whether each object is a file or directory.
For \code{download_azure_file}, if \code{dest=NULL}, the contents of the downloaded file as a raw vector.
}
\description{
Upload, download, or delete a file; list files in a directory; create or delete directories.
}
\details{
\code{upload_azure_file} and \code{download_azure_file} are the workhorse file transfer functions for file storage. They each take as inputs a \emph{single} filename or connection as the source for uploading/downloading, and a single filename as the destination.
\code{multiupload_azure_file} and \code{multidownload_azure_file} are functions for uploading and downloading \emph{multiple} files at once. They parallelise file transfers by deploying a pool of R processes in the background, which can lead to significantly greater efficiency when transferring many small files. They take as input a \emph{wildcard} pattern as the source, which expands to one or more files. The \code{dest} argument should be a directory.
The file transfer functions also support working with connections to allow transferring R objects without creating temporary files. For uploading, \code{src} can be a \link{textConnection} or \link{rawConnection} object. For downloading, \code{dest} can be NULL or a \code{rawConnection} object. In the former case, the downloaded data is returned as a raw vector, and for the latter, it will be placed into the connection. See the examples below.
}
\examples{
\dontrun{
@@ -59,6 +82,10 @@ download_azure_file(share, "/newdir/bigfile.zip", dest="~/bigfile_downloaded.zip
delete_azure_file(share, "/newdir/bigfile.zip")
delete_azure_dir(share, "/newdir")
# uploading/downloading multiple files at once
multiupload_azure_file(share, "/data/logfiles/*.zip")
multidownload_azure_file(share, "/monthly/jan*.*", "/data/january")
# uploading serialized R objects via connections
json <- jsonlite::toJSON(iris, pretty=TRUE, auto_unbox=TRUE)
con <- textConnection(json)
@@ -68,8 +95,18 @@ rds <- serialize(iris, NULL)
con <- rawConnection(rds)
upload_azure_file(share, con, "iris.rds")
# downloading files into memory: as a raw vector, and via a connection
rawvec <- download_azure_file(share, "iris.json", NULL)
rawToChar(rawvec)
con <- rawConnection(raw(0), "r+")
download_azure_file(share, "iris.json", con)
unserialize(con)
}
}
\seealso{
\link{file_share}, \link{az_storage}
\href{https://github.com/Azure/azure-storage-azcopy}{AzCopy version 10 on GitHub}
}

man/pool.Rd (new file, 29 lines)

@@ -0,0 +1,29 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/pool.R
\name{init_pool}
\alias{init_pool}
\alias{delete_pool}
\title{Parallelise multiple file transfers in the background}
\usage{
init_pool(max_concurrent_transfers = 10, restart = FALSE, ...)
delete_pool()
}
\arguments{
\item{max_concurrent_transfers}{The maximum number of concurrent file transfers to support, which translates into the number of background R processes to create. Each concurrent transfer requires a separate R process, so limit this if you are low on memory.}
\item{restart}{For \code{init_pool}, whether to terminate an already running pool first.}
\item{...}{Other arguments passed on to \code{parallel::makeCluster}.}
}
\description{
Parallelise multiple file transfers in the background
}
\details{
AzureStor can parallelise file transfers by utilizing a pool of R processes in the background. This often leads to significant speedups when transferring multiple small files. The pool is created by calling \code{init_pool}, or automatically the first time that a multiple file transfer is begun. It remains persistent for the session or until terminated by \code{delete_pool}.
If \code{init_pool} is called and the current pool is smaller than \code{max_concurrent_transfers}, it is resized.
}
\seealso{
\link{multiupload_blob}, \link{multidownload_blob}, \link[parallel:makeCluster]{parallel::makeCluster}
}


@@ -126,6 +126,37 @@ test_that("Blob client interface works",
download_blob(cont, "iris.rds", con_dl2)
expect_identical(readBin("../resources/iris.rds", "raw", n=1e5), readBin(con_dl2, "raw", n=1e5))
# download to memory
rawvec <- download_blob(cont, "iris.rds", NULL)
iris2 <- unserialize(rawvec)
expect_identical(iris, iris2)
con <- rawConnection(raw(0), open="r+")
download_blob(cont, "iris.json", con)
iris3 <- as.data.frame(jsonlite::fromJSON(con))
expect_identical(iris, iris3)
# multiple file transfers
files <- lapply(1:10, function(f) paste0(sample(letters, 1000, replace=TRUE), collapse=""))
filenames <- sapply(1:10, function(n) file.path(tempdir(), sprintf("multitransfer_%d", n)))
suppressWarnings(file.remove(filenames))
mapply(writeLines, files, filenames)
multiupload_blob(cont, file.path(tempdir(), "multitransfer_*"))
expect_warning(multiupload_blob(cont, file.path(tempdir(), "multitransfer_*"), "newnames"))
dest_dir <- file.path(tempdir(), "blob_multitransfer")
suppressWarnings(unlink(dest_dir, recursive=TRUE))
dir.create(dest_dir)
multidownload_blob(cont, "multitransfer_*", dest_dir, overwrite=TRUE)
expect_true(all(sapply(filenames, function(f)
{
src <- readBin(f, "raw", n=1e5)
dest <- readBin(file.path(dest_dir, basename(f)), "raw", n=1e5)
identical(src, dest)
})))
# ways of deleting a container
delete_blob_container(cont, confirm=FALSE)
delete_blob_container(bl, "newcontainer2", confirm=FALSE)
@@ -133,6 +164,8 @@ test_that("Blob client interface works",
delete_blob_container(paste0(bl$url, "newcontainer3"), key=bl$key, confirm=FALSE)
Sys.sleep(5)
expect_true(is_empty(list_blob_containers(bl)))
close(con)
})
rg$delete(confirm=FALSE)


@@ -131,6 +131,37 @@ test_that("File client interface works",
download_azure_file(share, "iris.rds", con_dl2)
expect_identical(readBin("../resources/iris.rds", "raw", n=1e5), readBin(con_dl2, "raw", n=1e5))
# download to memory
rawvec <- download_azure_file(share, "iris.rds", NULL)
iris2 <- unserialize(rawvec)
expect_identical(iris, iris2)
con <- rawConnection(raw(0), open="r+")
download_azure_file(share, "iris.json", con)
iris3 <- as.data.frame(jsonlite::fromJSON(con))
expect_identical(iris, iris3)
# multiple file transfers
files <- lapply(1:10, function(f) paste0(sample(letters, 1000, replace=TRUE), collapse=""))
filenames <- sapply(1:10, function(n) file.path(tempdir(), sprintf("multitransfer_%d", n)))
suppressWarnings(file.remove(filenames))
mapply(writeLines, files, filenames)
create_azure_dir(share, "multi")
multiupload_azure_file(share, file.path(tempdir(), "multitransfer_*"), "multi")
dest_dir <- file.path(tempdir(), "file_multitransfer")
suppressWarnings(unlink(dest_dir, recursive=TRUE))
dir.create(dest_dir)
multidownload_azure_file(share, "multi/multitransfer_*", dest_dir, overwrite=TRUE)
expect_true(all(sapply(filenames, function(f)
{
src <- readBin(f, "raw", n=1e5)
dest <- readBin(file.path(dest_dir, basename(f)), "raw", n=1e5)
identical(src, dest)
})))
# ways of deleting a share
delete_file_share(share, confirm=FALSE)
delete_file_share(fl, "newshare2", confirm=FALSE)
@@ -138,6 +169,8 @@ test_that("File client interface works",
delete_file_share(paste0(fl$url, "newshare3"), key=fl$key, confirm=FALSE)
Sys.sleep(5)
expect_true(is_empty(list_file_shares(fl)))
close(con)
})
rg$delete(confirm=FALSE)


@@ -120,12 +120,45 @@ test_that("ADLSgen2 client interface works",
download_adls_file(fs, "iris.rds", con_dl2)
expect_identical(readBin("../resources/iris.rds", "raw", n=1e5), readBin(con_dl2, "raw", n=1e5))
# download to memory
rawvec <- download_adls_file(fs, "iris.rds", NULL)
iris2 <- unserialize(rawvec)
expect_identical(iris, iris2)
con <- rawConnection(raw(0), open="r+")
download_adls_file(fs, "iris.json", con)
iris3 <- as.data.frame(jsonlite::fromJSON(con))
expect_identical(iris, iris3)
# multiple file transfers
files <- lapply(1:10, function(f) paste0(sample(letters, 1000, replace=TRUE), collapse=""))
filenames <- sapply(1:10, function(n) file.path(tempdir(), sprintf("multitransfer_%d", n)))
suppressWarnings(file.remove(filenames))
mapply(writeLines, files, filenames)
create_adls_dir(fs, "multi")
multiupload_adls_file(fs, file.path(tempdir(), "multitransfer_*"), "multi")
dest_dir <- file.path(tempdir(), "adls_multitransfer")
suppressWarnings(unlink(dest_dir, recursive=TRUE))
dir.create(dest_dir)
multidownload_adls_file(fs, "multi/multitransfer_*", dest_dir, overwrite=TRUE)
expect_true(all(sapply(filenames, function(f)
{
src <- readBin(f, "raw", n=1e5)
dest <- readBin(file.path(dest_dir, basename(f)), "raw", n=1e5)
identical(src, dest)
})))
# ways of deleting a filesystem
delete_adls_filesystem(fs, confirm=FALSE)
delete_adls_filesystem(ad, "newfs2", confirm=FALSE)
delete_adls_filesystem(paste0(ad$url, "newfs3"), key=ad$key, confirm=FALSE)
Sys.sleep(5)
expect_true(is_empty(list_adls_filesystems(ad)))
close(con)
})
rg$delete(confirm=FALSE)