зеркало из https://github.com/Azure/AzureStor.git
revise retry logic
This commit is contained in:
Родитель
b8a2518ae4
Коммит
ebfe2c915c
2
NEWS.md
2
NEWS.md
|
@ -1,6 +1,6 @@
|
|||
# AzureStor 2.0.1.9000
|
||||
|
||||
- By default, the upload/download functions will now retry file transfers on encountering a network error.
|
||||
- By default, HTTP(S) requests to the storage endpoint will now be retried on encountering a network error. To change the number of retries, call `options(azure_storage_retries=N)` where N >= 0. Setting this option to zero disables retrying.
|
||||
- Downloading now proceeds in blocks, much like uploading. The default block size is set to 16MB for blob and ADLSgen2, and 4MB for file storage. While this reduces the throughput slightly (basically there is one extra REST call involved), it allows retrying a failed transfer on a per-block basis rather than having to redownload the entire file.
|
||||
|
||||
# AzureStor 2.0.1
|
||||
|
|
|
@ -12,6 +12,7 @@ globalVariables(c("self", "pool"), "AzureStor")
|
|||
options(azure_storage_api_version="2018-03-28")
|
||||
options(azure_adls_api_version="2018-06-17")
|
||||
options(azure_dl_progress_bar=TRUE)
|
||||
options(azure_storage_retries=10)
|
||||
|
||||
# all methods extending classes in external package must be run from .onLoad
|
||||
add_methods()
|
||||
|
|
|
@ -214,7 +214,6 @@ delete_adls_filesystem.adls_endpoint <- function(endpoint, name, confirm=TRUE, .
|
|||
#' @param blocksize The number of bytes to upload/download per HTTP(S) request.
|
||||
#' @param lease The lease for a file, if present.
|
||||
#' @param overwrite When downloading, whether to overwrite an existing destination file.
|
||||
#' @param retries The number of times the file transfer functions will retry when they encounter an error. Set this to 0 to disable retries. This is applied per block.
|
||||
#' @param use_azcopy Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.
|
||||
#' @param max_concurrent_transfers For `multiupload_adls_file` and `multidownload_adls_file`, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.
|
||||
#' @param recursive For `list_adls_files`, and `delete_adls_dir`, whether the operation should recurse through subdirectories. For `delete_adls_dir`, this must be TRUE to delete a non-empty directory.
|
||||
|
@ -324,48 +323,47 @@ list_adls_files <- function(filesystem, dir="/", info=c("all", "name"),
|
|||
|
||||
#' @rdname adls
|
||||
#' @export
|
||||
multiupload_adls_file <- function(filesystem, src, dest, blocksize=2^22, lease=NULL, retries=5,
|
||||
multiupload_adls_file <- function(filesystem, src, dest, blocksize=2^22, lease=NULL,
|
||||
use_azcopy=FALSE,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_upload(filesystem, src, dest, blocksize=blocksize, lease=lease)
|
||||
else multiupload_adls_file_internal(filesystem, src, dest, blocksize=blocksize, lease=lease, retries=retries,
|
||||
else multiupload_adls_file_internal(filesystem, src, dest, blocksize=blocksize, lease=lease,
|
||||
max_concurrent_transfers=max_concurrent_transfers)
|
||||
}
|
||||
|
||||
|
||||
#' @rdname adls
|
||||
#' @export
|
||||
upload_adls_file <- function(filesystem, src, dest, blocksize=2^24, lease=NULL, retries=5, use_azcopy=FALSE)
|
||||
upload_adls_file <- function(filesystem, src, dest, blocksize=2^24, lease=NULL, use_azcopy=FALSE)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_upload(filesystem, src, dest, blocksize=blocksize, lease=lease)
|
||||
else upload_adls_file_internal(filesystem, src, dest, blocksize=blocksize, lease=lease, retries=retries)
|
||||
else upload_adls_file_internal(filesystem, src, dest, blocksize=blocksize, lease=lease)
|
||||
}
|
||||
|
||||
|
||||
#' @rdname adls
|
||||
#' @export
|
||||
multidownload_adls_file <- function(filesystem, src, dest, blocksize=2^24, overwrite=FALSE, retries=5,
|
||||
multidownload_adls_file <- function(filesystem, src, dest, blocksize=2^24, overwrite=FALSE,
|
||||
use_azcopy=FALSE,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_upload(filesystem, src, dest, overwrite=overwrite)
|
||||
else multidownload_adls_file_internal(filesystem, src, dest, blocksize=blocksize, overwrite=overwrite,
|
||||
retries=retries,
|
||||
max_concurrent_transfers=max_concurrent_transfers)
|
||||
}
|
||||
|
||||
|
||||
#' @rdname adls
|
||||
#' @export
|
||||
download_adls_file <- function(filesystem, src, dest, blocksize=2^24, overwrite=FALSE, retries=5, use_azcopy=FALSE)
|
||||
download_adls_file <- function(filesystem, src, dest, blocksize=2^24, overwrite=FALSE, use_azcopy=FALSE)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_download(filesystem, src, dest, overwrite=overwrite)
|
||||
else download_adls_file_internal(filesystem, src, dest, blocksize=blocksize, overwrite=overwrite, retries=retries)
|
||||
else download_adls_file_internal(filesystem, src, dest, blocksize=blocksize, overwrite=overwrite)
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
multiupload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^22, lease=lease, retries=5,
|
||||
multiupload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^22, lease=lease,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
src_files <- glob2rx(basename(src))
|
||||
|
@ -8,7 +8,7 @@ multiupload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^22
|
|||
if(length(src) == 0)
|
||||
stop("No files to transfer", call.=FALSE)
|
||||
if(length(src) == 1)
|
||||
return(upload_adls_file(filesystem, src, dest, blocksize=blocksize, lease=lease, retries=retries))
|
||||
return(upload_adls_file(filesystem, src, dest, blocksize=blocksize, lease=lease))
|
||||
|
||||
init_pool(max_concurrent_transfers)
|
||||
|
||||
|
@ -18,13 +18,13 @@ multiupload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^22
|
|||
parallel::parLapply(.AzureStor$pool, src, function(f)
|
||||
{
|
||||
dest <- sub("//", "/", file.path(dest, basename(f))) # API too dumb to handle //'s
|
||||
AzureStor::upload_adls_file(filesystem, f, dest, blocksize=blocksize, lease=lease, retries=retries)
|
||||
AzureStor::upload_adls_file(filesystem, f, dest, blocksize=blocksize, lease=lease)
|
||||
})
|
||||
invisible(NULL)
|
||||
}
|
||||
|
||||
|
||||
upload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, lease=NULL, retries=5)
|
||||
upload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, lease=NULL)
|
||||
{
|
||||
con <- if(inherits(src, "textConnection"))
|
||||
rawConnection(charToRaw(paste0(readLines(src), collapse="\n")))
|
||||
|
@ -45,7 +45,7 @@ upload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, lea
|
|||
# transfer the contents
|
||||
blocklist <- list()
|
||||
pos <- 0
|
||||
while(1)
|
||||
repeat
|
||||
{
|
||||
body <- readBin(con, "raw", blocksize)
|
||||
thisblock <- length(body)
|
||||
|
@ -58,18 +58,7 @@ upload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, lea
|
|||
)
|
||||
opts <- list(action="append", position=sprintf("%.0f", pos))
|
||||
|
||||
for(r in seq_len(retries + 1))
|
||||
{
|
||||
res <- tryCatch(
|
||||
do_container_op(filesystem, dest, headers=headers, body=body, options=opts, http_verb="PATCH"),
|
||||
error=function(e) e
|
||||
)
|
||||
if(retry_transfer(res))
|
||||
message(retry_upload_message(src))
|
||||
else break
|
||||
}
|
||||
if(inherits(res, "error"))
|
||||
stop(res)
|
||||
do_container_op(filesystem, dest, headers=headers, body=body, options=opts, http_verb="PATCH")
|
||||
|
||||
pos <- pos + thisblock
|
||||
}
|
||||
|
@ -81,7 +70,7 @@ upload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, lea
|
|||
}
|
||||
|
||||
|
||||
multidownload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, overwrite=FALSE, retries=5,
|
||||
multidownload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, overwrite=FALSE,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
src_dir <- dirname(src)
|
||||
|
@ -94,7 +83,7 @@ multidownload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^
|
|||
if(length(src) == 0)
|
||||
stop("No files to transfer", call.=FALSE)
|
||||
if(length(src) == 1)
|
||||
return(download_adls_file(filesystem, src, dest, blocksize=blocksize, overwrite=overwrite, retries=retries))
|
||||
return(download_adls_file(filesystem, src, dest, blocksize=blocksize, overwrite=overwrite))
|
||||
|
||||
init_pool(max_concurrent_transfers)
|
||||
|
||||
|
@ -104,13 +93,13 @@ multidownload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^
|
|||
parallel::parLapply(.AzureStor$pool, src, function(f)
|
||||
{
|
||||
dest <- file.path(dest, basename(f))
|
||||
AzureStor::download_adls_file(filesystem, f, dest, blocksize=blocksize, overwrite=overwrite, retries=retries)
|
||||
AzureStor::download_adls_file(filesystem, f, dest, blocksize=blocksize, overwrite=overwrite)
|
||||
})
|
||||
invisible(NULL)
|
||||
}
|
||||
|
||||
|
||||
download_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, overwrite=FALSE, retries=5)
|
||||
download_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, overwrite=FALSE)
|
||||
{
|
||||
file_dest <- is.character(dest)
|
||||
null_dest <- is.null(dest)
|
||||
|
@ -142,19 +131,7 @@ download_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, o
|
|||
repeat
|
||||
{
|
||||
headers$Range <- sprintf("bytes=%.0f-%.0f", offset, offset + blocksize - 1)
|
||||
for(r in seq_len(retries + 1))
|
||||
{
|
||||
# retry on curl errors, not on httr errors
|
||||
res <- tryCatch(
|
||||
do_container_op(filesystem, src, headers=headers, progress="down", http_status_handler="pass"),
|
||||
error=function(e) e
|
||||
)
|
||||
if(retry_transfer(res))
|
||||
message(retry_download_message(src))
|
||||
else break
|
||||
}
|
||||
if(inherits(res, "error"))
|
||||
stop(res)
|
||||
res <- do_container_op(filesystem, src, headers=headers, progress="down", http_status_handler="pass")
|
||||
|
||||
if(httr::status_code(res) == 416) # no data, overran eof
|
||||
break
|
||||
|
|
|
@ -232,7 +232,6 @@ delete_blob_container.blob_endpoint <- function(endpoint, name, confirm=TRUE, le
|
|||
#' @param lease The lease for a blob, if present.
|
||||
#' @param type When uploading, the type of blob to create. Currently only block blobs are supported.
|
||||
#' @param overwrite When downloading, whether to overwrite an existing destination file.
|
||||
#' @param retries The number of times the file transfer functions will retry when they encounter an error. Set this to 0 to disable retries. This is applied per block.
|
||||
#' @param use_azcopy Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.
|
||||
#' @param max_concurrent_transfers For `multiupload_blob` and `multidownload_blob`, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.
|
||||
#' @param prefix For `list_blobs`, filters the result to return only blobs whose name begins with this prefix.
|
||||
|
@ -334,47 +333,45 @@ list_blobs <- function(container, info=c("partial", "name", "all"),
|
|||
|
||||
#' @rdname blob
|
||||
#' @export
|
||||
upload_blob <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL, retries=5,
|
||||
upload_blob <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL,
|
||||
use_azcopy=FALSE)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_upload(container, src, dest, type=type, blocksize=blocksize, lease=lease)
|
||||
else upload_blob_internal(container, src, dest, type=type, blocksize=blocksize, lease=lease, retries=retries)
|
||||
else upload_blob_internal(container, src, dest, type=type, blocksize=blocksize, lease=lease)
|
||||
}
|
||||
|
||||
#' @rdname blob
|
||||
#' @export
|
||||
multiupload_blob <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL, retries=5,
|
||||
multiupload_blob <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL,
|
||||
use_azcopy=FALSE,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_upload(container, src, dest, type=type, blocksize=blocksize, lease=lease)
|
||||
else multiupload_blob_internal(container, src, dest, type=type, blocksize=blocksize, lease=lease, retries=retries,
|
||||
else multiupload_blob_internal(container, src, dest, type=type, blocksize=blocksize, lease=lease,
|
||||
max_concurrent_transfers=max_concurrent_transfers)
|
||||
}
|
||||
|
||||
#' @rdname blob
|
||||
#' @export
|
||||
download_blob <- function(container, src, dest, blocksize=2^24, overwrite=FALSE, lease=NULL, retries=5,
|
||||
download_blob <- function(container, src, dest, blocksize=2^24, overwrite=FALSE, lease=NULL,
|
||||
use_azcopy=FALSE)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_download(container, src, dest, overwrite=overwrite, lease=lease)
|
||||
else download_blob_internal(container, src, dest, blocksize=blocksize, overwrite=overwrite, lease=lease,
|
||||
retries=retries)
|
||||
else download_blob_internal(container, src, dest, blocksize=blocksize, overwrite=overwrite, lease=lease)
|
||||
}
|
||||
|
||||
#' @rdname blob
|
||||
#' @export
|
||||
multidownload_blob <- function(container, src, dest, blocksize=2^24, overwrite=FALSE, lease=NULL, retries=5,
|
||||
multidownload_blob <- function(container, src, dest, blocksize=2^24, overwrite=FALSE, lease=NULL,
|
||||
use_azcopy=FALSE,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_download(container, src, dest, overwrite=overwrite, lease=lease)
|
||||
else multidownload_blob_internal(container, src, dest, blocksize=blocksize, overwrite=overwrite, lease=lease,
|
||||
retries=retries,
|
||||
max_concurrent_transfers=max_concurrent_transfers)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
multiupload_blob_internal <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL, retries=5,
|
||||
multiupload_blob_internal <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
src_dir <- dirname(src)
|
||||
|
@ -23,13 +23,13 @@ multiupload_blob_internal <- function(container, src, dest, type="BlockBlob", bl
|
|||
dest <- if(dest == "/")
|
||||
basename(f)
|
||||
else file.path(dest, basename(f))
|
||||
AzureStor::upload_blob(container, f, dest, type=type, blocksize=blocksize, lease=lease, retries=retries)
|
||||
AzureStor::upload_blob(container, f, dest, type=type, blocksize=blocksize, lease=lease)
|
||||
})
|
||||
invisible(NULL)
|
||||
}
|
||||
|
||||
|
||||
upload_blob_internal <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL, retries=5)
|
||||
upload_blob_internal <- function(container, src, dest, type="BlockBlob", blocksize=2^24, lease=NULL)
|
||||
{
|
||||
if(type != "BlockBlob")
|
||||
stop("Only block blobs currently supported")
|
||||
|
@ -52,7 +52,7 @@ upload_blob_internal <- function(container, src, dest, type="BlockBlob", blocksi
|
|||
blocklist <- list()
|
||||
base_id <- openssl::md5(dest)
|
||||
i <- 1
|
||||
while(1)
|
||||
repeat
|
||||
{
|
||||
body <- readBin(con, "raw", blocksize)
|
||||
thisblock <- length(body)
|
||||
|
@ -64,18 +64,7 @@ upload_blob_internal <- function(container, src, dest, type="BlockBlob", blocksi
|
|||
id <- openssl::base64_encode(sprintf("%s-%010d", base_id, i))
|
||||
opts <- list(comp="block", blockid=id)
|
||||
|
||||
for(r in seq_len(retries + 1))
|
||||
{
|
||||
res <- tryCatch(
|
||||
do_container_op(container, dest, headers=headers, body=body, options=opts, http_verb="PUT"),
|
||||
error=function(e) e
|
||||
)
|
||||
if(retry_transfer(res))
|
||||
message(retry_upload_message(src))
|
||||
else break
|
||||
}
|
||||
if(inherits(res, "error"))
|
||||
stop(res)
|
||||
do_container_op(container, dest, headers=headers, body=body, options=opts, http_verb="PUT")
|
||||
|
||||
blocklist <- c(blocklist, list(Latest=list(id)))
|
||||
i <- i + 1
|
||||
|
@ -94,7 +83,7 @@ upload_blob_internal <- function(container, src, dest, type="BlockBlob", blocksi
|
|||
}
|
||||
|
||||
|
||||
multidownload_blob_internal <- function(container, src, dest, blocksize=2^24, overwrite=FALSE, lease=NULL, retries=5,
|
||||
multidownload_blob_internal <- function(container, src, dest, blocksize=2^24, overwrite=FALSE, lease=NULL,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
files <- list_blobs(container, info="name")
|
||||
|
@ -115,13 +104,13 @@ multidownload_blob_internal <- function(container, src, dest, blocksize=2^24, ov
|
|||
parallel::parLapply(.AzureStor$pool, src, function(f)
|
||||
{
|
||||
dest <- file.path(dest, basename(f))
|
||||
AzureStor::download_blob(container, f, dest, overwrite=overwrite, lease=lease, retries=retries)
|
||||
AzureStor::download_blob(container, f, dest, overwrite=overwrite, lease=lease)
|
||||
})
|
||||
invisible(NULL)
|
||||
}
|
||||
|
||||
|
||||
download_blob_internal <- function(container, src, dest, blocksize=2^24, overwrite=FALSE, lease=NULL, retries=5)
|
||||
download_blob_internal <- function(container, src, dest, blocksize=2^24, overwrite=FALSE, lease=NULL)
|
||||
{
|
||||
file_dest <- is.character(dest)
|
||||
null_dest <- is.null(dest)
|
||||
|
@ -156,19 +145,7 @@ download_blob_internal <- function(container, src, dest, blocksize=2^24, overwri
|
|||
repeat
|
||||
{
|
||||
headers$Range <- sprintf("bytes=%.0f-%.0f", offset, offset + blocksize - 1)
|
||||
for(r in seq_len(retries + 1))
|
||||
{
|
||||
# retry on curl errors, not on httr errors
|
||||
res <- tryCatch(
|
||||
do_container_op(container, src, headers=headers, progress="down", http_status_handler="pass"),
|
||||
error=function(e) e
|
||||
)
|
||||
if(retry_transfer(res))
|
||||
message(retry_download_message(src))
|
||||
else break
|
||||
}
|
||||
if(inherits(res, "error"))
|
||||
stop(res)
|
||||
res <- do_container_op(container, src, headers=headers, progress="down", http_status_handler="pass")
|
||||
|
||||
if(httr::status_code(res) == 416) # no data, overran eof
|
||||
break
|
||||
|
|
|
@ -199,7 +199,6 @@ delete_file_share.file_endpoint <- function(endpoint, name, confirm=TRUE, ...)
|
|||
#' @param confirm Whether to ask for confirmation on deleting a file or directory.
|
||||
#' @param blocksize The number of bytes to upload/download per HTTP(S) request.
|
||||
#' @param overwrite When downloading, whether to overwrite an existing destination file.
|
||||
#' @param retries The number of times the file transfer functions will retry when they encounter an error. Set this to 0 to disable retries. This is applied per block.
|
||||
#' @param use_azcopy Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.
|
||||
#' @param max_concurrent_transfers For `multiupload_azure_file` and `multidownload_azure_file`, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.
|
||||
#' @param prefix For `list_azure_files`, filters the result to return only files and directories whose name begins with this prefix.
|
||||
|
@ -289,43 +288,43 @@ list_azure_files <- function(share, dir, info=c("all", "name"),
|
|||
|
||||
#' @rdname file
|
||||
#' @export
|
||||
upload_azure_file <- function(share, src, dest, blocksize=2^22, retries=5, use_azcopy=FALSE)
|
||||
upload_azure_file <- function(share, src, dest, blocksize=2^22, use_azcopy=FALSE)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_upload(share, src, dest, blocksize=blocksize)
|
||||
else upload_azure_file_internal(share, src, dest, blocksize=blocksize, retries=retries)
|
||||
else upload_azure_file_internal(share, src, dest, blocksize=blocksize)
|
||||
}
|
||||
|
||||
#' @rdname file
|
||||
#' @export
|
||||
multiupload_azure_file <- function(share, src, dest, blocksize=2^22, retries=5,
|
||||
multiupload_azure_file <- function(share, src, dest, blocksize=2^22,
|
||||
use_azcopy=FALSE,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_upload(share, src, dest, blocksize=blocksize)
|
||||
else multiupload_azure_file_internal(share, src, dest, blocksize=blocksize, retries=retries,
|
||||
else multiupload_azure_file_internal(share, src, dest, blocksize=blocksize,
|
||||
max_concurrent_transfers=max_concurrent_transfers)
|
||||
}
|
||||
|
||||
#' @rdname file
|
||||
#' @export
|
||||
download_azure_file <- function(share, src, dest, blocksize=2^22, overwrite=FALSE, retries=5, use_azcopy=FALSE)
|
||||
download_azure_file <- function(share, src, dest, blocksize=2^22, overwrite=FALSE, use_azcopy=FALSE)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_download(share, src, dest, overwrite=overwrite)
|
||||
else download_azure_file_internal(share, src, dest, blocksize=blocksize, overwrite=overwrite, retries=retries)
|
||||
else download_azure_file_internal(share, src, dest, blocksize=blocksize, overwrite=overwrite)
|
||||
}
|
||||
|
||||
#' @rdname file
|
||||
#' @export
|
||||
multidownload_azure_file <- function(share, src, dest, blocksize=2^22, overwrite=FALSE, retries=5,
|
||||
multidownload_azure_file <- function(share, src, dest, blocksize=2^22, overwrite=FALSE,
|
||||
use_azcopy=FALSE,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
if(use_azcopy)
|
||||
azcopy_download(share, src, dest, overwrite=overwrite)
|
||||
else multidownload_azure_file_internal(share, src, dest, blocksize=blocksize, overwrite=overwrite, retries=retries,
|
||||
else multidownload_azure_file_internal(share, src, dest, blocksize=blocksize, overwrite=overwrite,
|
||||
max_concurrent_transfers=max_concurrent_transfers)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
multiupload_azure_file_internal <- function(share, src, dest, blocksize=2^22, retries=5, max_concurrent_transfers=10)
|
||||
multiupload_azure_file_internal <- function(share, src, dest, blocksize=2^22, max_concurrent_transfers=10)
|
||||
{
|
||||
src_dir <- dirname(src)
|
||||
src_files <- glob2rx(basename(src))
|
||||
|
@ -7,7 +7,7 @@ multiupload_azure_file_internal <- function(share, src, dest, blocksize=2^22, re
|
|||
if(length(src) == 0)
|
||||
stop("No files to transfer", call.=FALSE)
|
||||
if(length(src) == 1)
|
||||
return(upload_azure_file(share, src, dest, blocksize=blocksize, retries=retries))
|
||||
return(upload_azure_file(share, src, dest, blocksize=blocksize))
|
||||
|
||||
init_pool(max_concurrent_transfers)
|
||||
|
||||
|
@ -17,13 +17,13 @@ multiupload_azure_file_internal <- function(share, src, dest, blocksize=2^22, re
|
|||
parallel::parLapply(.AzureStor$pool, src, function(f)
|
||||
{
|
||||
dest <- sub("//", "/", file.path(dest, basename(f))) # API too dumb to handle //'s
|
||||
AzureStor::upload_azure_file(share, f, dest, blocksize=blocksize, retries=retries)
|
||||
AzureStor::upload_azure_file(share, f, dest, blocksize=blocksize)
|
||||
})
|
||||
invisible(NULL)
|
||||
}
|
||||
|
||||
|
||||
upload_azure_file_internal <- function(share, src, dest, blocksize=2^22, retries=5)
|
||||
upload_azure_file_internal <- function(share, src, dest, blocksize=2^22)
|
||||
{
|
||||
# set content type
|
||||
content_type <- if(inherits(src, "connection"))
|
||||
|
@ -81,18 +81,7 @@ upload_azure_file_internal <- function(share, src, dest, blocksize=2^22, retries
|
|||
headers[["content-length"]] <- sprintf("%.0f", thisblock)
|
||||
headers[["range"]] <- sprintf("bytes=%.0f-%.0f", range_begin, range_begin + thisblock - 1)
|
||||
|
||||
for(r in seq_len(retries + 1))
|
||||
{
|
||||
res <- tryCatch(
|
||||
do_container_op(share, dest, headers=headers, body=body, options=options, http_verb="PUT"),
|
||||
error=function(e) e
|
||||
)
|
||||
if(retry_transfer(res))
|
||||
retry_upload_message(src)
|
||||
else break
|
||||
}
|
||||
if(inherits(res, "error"))
|
||||
stop(res)
|
||||
do_container_op(share, dest, headers=headers, body=body, options=options, http_verb="PUT")
|
||||
|
||||
range_begin <- range_begin + thisblock
|
||||
}
|
||||
|
@ -104,7 +93,7 @@ upload_azure_file_internal <- function(share, src, dest, blocksize=2^22, retries
|
|||
}
|
||||
|
||||
|
||||
multidownload_azure_file_internal <- function(share, src, dest, blocksize=2^22, overwrite=FALSE, retries=5,
|
||||
multidownload_azure_file_internal <- function(share, src, dest, blocksize=2^22, overwrite=FALSE,
|
||||
max_concurrent_transfers=10)
|
||||
{
|
||||
src_files <- glob2rx(basename(src))
|
||||
|
@ -118,7 +107,7 @@ multidownload_azure_file_internal <- function(share, src, dest, blocksize=2^22,
|
|||
if(length(src) == 0)
|
||||
stop("No files to transfer", call.=FALSE)
|
||||
if(length(src) == 1)
|
||||
return(download_azure_file(share, src, dest, blocksize=blocksize, overwrite=overwrite, retries=retries))
|
||||
return(download_azure_file(share, src, dest, blocksize=blocksize, overwrite=overwrite))
|
||||
|
||||
init_pool(max_concurrent_transfers)
|
||||
|
||||
|
@ -128,13 +117,13 @@ multidownload_azure_file_internal <- function(share, src, dest, blocksize=2^22,
|
|||
parallel::parLapply(.AzureStor$pool, src, function(f)
|
||||
{
|
||||
dest <- file.path(dest, basename(f))
|
||||
AzureStor::download_azure_file(share, f, dest, blocksize=blocksize, overwrite=overwrite, retries=retries)
|
||||
AzureStor::download_azure_file(share, f, dest, blocksize=blocksize, overwrite=overwrite)
|
||||
})
|
||||
invisible(NULL)
|
||||
}
|
||||
|
||||
|
||||
download_azure_file_internal <- function(share, src, dest, blocksize=2^22, overwrite=FALSE, retries=5)
|
||||
download_azure_file_internal <- function(share, src, dest, blocksize=2^22, overwrite=FALSE)
|
||||
{
|
||||
file_dest <- is.character(dest)
|
||||
null_dest <- is.null(dest)
|
||||
|
@ -166,19 +155,7 @@ download_azure_file_internal <- function(share, src, dest, blocksize=2^22, overw
|
|||
repeat
|
||||
{
|
||||
headers$Range <- sprintf("bytes=%.0f-%.0f", offset, offset + blocksize - 1)
|
||||
for(r in seq_len(retries + 1))
|
||||
{
|
||||
# retry on curl errors, not on httr errors
|
||||
res <- tryCatch(
|
||||
do_container_op(share, src, headers=headers, progress="down", http_status_handler="pass"),
|
||||
error=function(e) e
|
||||
)
|
||||
if(retry_transfer(res))
|
||||
message(retry_download_message(src))
|
||||
else break
|
||||
}
|
||||
if(inherits(res, "error"))
|
||||
stop(res)
|
||||
res <- do_container_op(share, src, headers=headers, progress="down", http_status_handler="pass")
|
||||
|
||||
if(httr::status_code(res) == 416) # no data, overran eof
|
||||
break
|
||||
|
|
|
@ -35,12 +35,16 @@ do_storage_call <- function(endpoint_url, path, options=list(), headers=list(),
|
|||
url <- add_sas(sas, url)
|
||||
|
||||
headers <- do.call(httr::add_headers, headers)
|
||||
verb <- get(verb, getNamespace("httr"))
|
||||
|
||||
# do actual http[s] call
|
||||
response <- if(!is.null(progress) && isTRUE(getOption("azure_dl_progress_bar")))
|
||||
verb(url, headers, body=body, ..., httr::progress(progress))
|
||||
else verb(url, headers, body=body, ...)
|
||||
retries <- as.numeric(getOption("azure_storage_retries"))
|
||||
for(r in seq_len(retries + 1))
|
||||
{
|
||||
# retry on curl errors, not on httr errors
|
||||
response <- tryCatch(httr::VERB(verb, url, headers, body=body, ...), error=function(e) e)
|
||||
if(!retry_transfer(response))
|
||||
break
|
||||
}
|
||||
if(inherits(response, "error"))
|
||||
stop(response)
|
||||
|
||||
handler <- match.arg(http_status_handler)
|
||||
if(handler != "pass")
|
||||
|
@ -222,15 +226,3 @@ retry_transfer <- function(res)
|
|||
}
|
||||
|
||||
|
||||
retry_upload_message <- function(src)
|
||||
{
|
||||
if(is.character(src))
|
||||
sprintf("Error uploading file %s, retrying...", src)
|
||||
else "Error uploading from connection, retrying..."
|
||||
}
|
||||
|
||||
|
||||
retry_download_message <- function(src)
|
||||
{
|
||||
sprintf("Error downloading file %s, retrying...", src)
|
||||
}
|
||||
|
|
11
man/adls.Rd
11
man/adls.Rd
|
@ -15,18 +15,17 @@ list_adls_files(filesystem, dir = "/", info = c("all", "name"),
|
|||
recursive = FALSE)
|
||||
|
||||
multiupload_adls_file(filesystem, src, dest, blocksize = 2^22,
|
||||
lease = NULL, retries = 5, use_azcopy = FALSE,
|
||||
max_concurrent_transfers = 10)
|
||||
lease = NULL, use_azcopy = FALSE, max_concurrent_transfers = 10)
|
||||
|
||||
upload_adls_file(filesystem, src, dest, blocksize = 2^24, lease = NULL,
|
||||
retries = 5, use_azcopy = FALSE)
|
||||
use_azcopy = FALSE)
|
||||
|
||||
multidownload_adls_file(filesystem, src, dest, blocksize = 2^24,
|
||||
overwrite = FALSE, retries = 5, use_azcopy = FALSE,
|
||||
overwrite = FALSE, use_azcopy = FALSE,
|
||||
max_concurrent_transfers = 10)
|
||||
|
||||
download_adls_file(filesystem, src, dest, blocksize = 2^24,
|
||||
overwrite = FALSE, retries = 5, use_azcopy = FALSE)
|
||||
overwrite = FALSE, use_azcopy = FALSE)
|
||||
|
||||
delete_adls_file(filesystem, file, confirm = TRUE)
|
||||
|
||||
|
@ -49,8 +48,6 @@ delete_adls_dir(filesystem, dir, recursive = FALSE, confirm = TRUE)
|
|||
|
||||
\item{lease}{The lease for a file, if present.}
|
||||
|
||||
\item{retries}{The number of times the file transfer functions will retry when they encounter an error. Set this to 0 to disable retries. This is applied per block.}
|
||||
|
||||
\item{use_azcopy}{Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.}
|
||||
|
||||
\item{max_concurrent_transfers}{For \code{multiupload_adls_file} and \code{multidownload_adls_file}, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.}
|
||||
|
|
10
man/blob.Rd
10
man/blob.Rd
|
@ -13,17 +13,17 @@ list_blobs(container, info = c("partial", "name", "all"),
|
|||
prefix = NULL)
|
||||
|
||||
upload_blob(container, src, dest, type = "BlockBlob", blocksize = 2^24,
|
||||
lease = NULL, retries = 5, use_azcopy = FALSE)
|
||||
lease = NULL, use_azcopy = FALSE)
|
||||
|
||||
multiupload_blob(container, src, dest, type = "BlockBlob",
|
||||
blocksize = 2^24, lease = NULL, retries = 5, use_azcopy = FALSE,
|
||||
blocksize = 2^24, lease = NULL, use_azcopy = FALSE,
|
||||
max_concurrent_transfers = 10)
|
||||
|
||||
download_blob(container, src, dest, blocksize = 2^24,
|
||||
overwrite = FALSE, lease = NULL, retries = 5, use_azcopy = FALSE)
|
||||
overwrite = FALSE, lease = NULL, use_azcopy = FALSE)
|
||||
|
||||
multidownload_blob(container, src, dest, blocksize = 2^24,
|
||||
overwrite = FALSE, lease = NULL, retries = 5, use_azcopy = FALSE,
|
||||
overwrite = FALSE, lease = NULL, use_azcopy = FALSE,
|
||||
max_concurrent_transfers = 10)
|
||||
|
||||
delete_blob(container, blob, confirm = TRUE)
|
||||
|
@ -43,8 +43,6 @@ delete_blob(container, blob, confirm = TRUE)
|
|||
|
||||
\item{lease}{The lease for a blob, if present.}
|
||||
|
||||
\item{retries}{The number of times the file transfer functions will retry when they encounter an error. Set this to 0 to disable retries. This is applied per block.}
|
||||
|
||||
\item{use_azcopy}{Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.}
|
||||
|
||||
\item{max_concurrent_transfers}{For \code{multiupload_blob} and \code{multidownload_blob}, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.}
|
||||
|
|
10
man/file.Rd
10
man/file.Rd
|
@ -13,17 +13,17 @@
|
|||
\usage{
|
||||
list_azure_files(share, dir, info = c("all", "name"), prefix = NULL)
|
||||
|
||||
upload_azure_file(share, src, dest, blocksize = 2^22, retries = 5,
|
||||
upload_azure_file(share, src, dest, blocksize = 2^22,
|
||||
use_azcopy = FALSE)
|
||||
|
||||
multiupload_azure_file(share, src, dest, blocksize = 2^22, retries = 5,
|
||||
multiupload_azure_file(share, src, dest, blocksize = 2^22,
|
||||
use_azcopy = FALSE, max_concurrent_transfers = 10)
|
||||
|
||||
download_azure_file(share, src, dest, blocksize = 2^22,
|
||||
overwrite = FALSE, retries = 5, use_azcopy = FALSE)
|
||||
overwrite = FALSE, use_azcopy = FALSE)
|
||||
|
||||
multidownload_azure_file(share, src, dest, blocksize = 2^22,
|
||||
overwrite = FALSE, retries = 5, use_azcopy = FALSE,
|
||||
overwrite = FALSE, use_azcopy = FALSE,
|
||||
max_concurrent_transfers = 10)
|
||||
|
||||
delete_azure_file(share, file, confirm = TRUE)
|
||||
|
@ -45,8 +45,6 @@ delete_azure_dir(share, dir, confirm = TRUE)
|
|||
|
||||
\item{blocksize}{The number of bytes to upload/download per HTTP(S) request.}
|
||||
|
||||
\item{retries}{The number of times the file transfer functions will retry when they encounter an error. Set this to 0 to disable retries. This is applied per block.}
|
||||
|
||||
\item{use_azcopy}{Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.}
|
||||
|
||||
\item{max_concurrent_transfers}{For \code{multiupload_azure_file} and \code{multidownload_azure_file}, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.}
|
||||
|
|
Загрузка…
Ссылка в новой задаче