Implements MD5 hashing:
- store hash on upload
- check hash on download
- add Content-MD5 header on transfers for error checking
This commit is contained in:
Hong Ooi 2020-09-24 02:06:33 +10:00 коммит произвёл GitHub
Родитель 8df7b6fa61
Коммит c47df7face
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
17 изменённых файлов: 426 добавлений и 173 удалений

Просмотреть файл

@ -1,11 +1,14 @@
# AzureStor 3.2.3.9000
- ADLS, file and block blob uploads gain the option to compute and store the MD5 hash of the uploaded file, via the `put_md5` argument to `upload_adls_file`, `upload_azure_file` and `upload_blob`.
- Similarly, downloads gain the option to verify the integrity of the downloaded file using the MD5 hash, via the `check_md5` argument to `download_adls_file`, `download_azure_file` and `download_blob`. This requires that the file's `Content-MD5` property is set.
- Add support for uploading to [append blobs](https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs), which are a type of blob optimized for append operations. They are useful for data that is constantly growing, but should not be modified once written, such as server logs. See `?upload_blob` for more details.
- Add support for the [Azurite](https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite) and [Azure SDK](https://docs.microsoft.com/en-us/azure/storage/common/storage-use-emulator) storage emulators. To connect to the endpoint, use the service-specific functions `blob_endpoint` and `queue_endpoint` (the latter from the AzureQstor package), passing the full URL including the account name: `blob_endpoint("http://127.0.0.1:10000/myaccount", key="mykey")`. The warning about an unrecognised endpoint can be ignored. See the linked pages for full details on how to authenticate to the emulator.
Note that the Azure SDK emulator is no longer being actively developed; it's recommended to use Azurite.
- Add a 10-second fuzz factor to the default starting datetime for a generated SAS, to allow for differences in clocks.
- More fixes to the directory handling of `list_blobs()`.
- All uploads now include the `Content-MD5` header in the HTTP requests, as an error-checking mechanism.
# AzureStor 3.2.3

Просмотреть файл

@ -228,6 +228,8 @@ delete_adls_filesystem.adls_endpoint <- function(endpoint, name, confirm=TRUE, .
#' @param overwrite When downloading, whether to overwrite an existing destination file.
#' @param use_azcopy Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.
#' @param max_concurrent_transfers For `multiupload_adls_file` and `multidownload_adls_file`, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.
#' @param put_md5 For uploading, whether to compute the MD5 hash of the file(s). This will be stored as part of the file's properties.
#' @param check_md5 For downloading, whether to verify the MD5 hash of the downloaded file(s). This requires that the file's `Content-MD5` property is set. If this is TRUE and the `Content-MD5` property is missing, a warning is generated.
#'
#' @details
#' `upload_adls_file` and `download_adls_file` are the workhorse file transfer functions for ADLSgen2 storage. They each take as inputs a _single_ filename as the source for uploading/downloading, and a single filename as the destination. Alternatively, for uploading, `src` can be a [textConnection] or [rawConnection] object; and for downloading, `dest` can be NULL or a `rawConnection` object. If `dest` is NULL, the downloaded data is returned as a raw vector, and if a raw connection, it will be placed into the connection. See the examples below.
@ -372,48 +374,52 @@ list_adls_files <- function(filesystem, dir="/", info=c("all", "name"),
#' @rdname adls
#' @export
multiupload_adls_file <- function(filesystem, src, dest, recursive=FALSE, blocksize=2^22, lease=NULL,
use_azcopy=FALSE,
max_concurrent_transfers=10)
put_md5=FALSE, use_azcopy=FALSE,
max_concurrent_transfers=10)
{
if(use_azcopy)
return(azcopy_upload(filesystem, src, dest, blocksize=blocksize, lease=lease, recursive=recursive))
return(azcopy_upload(filesystem, src, dest, blocksize=blocksize, lease=lease, recursive=recursive,
put_md5=put_md5))
multiupload_internal(filesystem, src, dest, recursive=recursive, blocksize=blocksize, lease=lease,
max_concurrent_transfers=max_concurrent_transfers)
put_md5=put_md5, max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname adls
#' @export
upload_adls_file <- function(filesystem, src, dest=basename(src), blocksize=2^24, lease=NULL, use_azcopy=FALSE)
upload_adls_file <- function(filesystem, src, dest=basename(src), blocksize=2^24, lease=NULL,
put_md5=FALSE, use_azcopy=FALSE)
{
if(use_azcopy)
azcopy_upload(filesystem, src, dest, blocksize=blocksize, lease=lease)
else upload_adls_file_internal(filesystem, src, dest, blocksize=blocksize, lease=lease)
azcopy_upload(filesystem, src, dest, blocksize=blocksize, lease=lease, put_md5=put_md5)
else upload_adls_file_internal(filesystem, src, dest, blocksize=blocksize, lease=lease, put_md5=put_md5)
}
#' @rdname adls
#' @export
multidownload_adls_file <- function(filesystem, src, dest, recursive=FALSE, blocksize=2^24, overwrite=FALSE,
use_azcopy=FALSE,
check_md5=FALSE, use_azcopy=FALSE,
max_concurrent_transfers=10)
{
if(use_azcopy)
return(azcopy_download(filesystem, src, dest, overwrite=overwrite, recursive=recursive))
return(azcopy_download(filesystem, src, dest, overwrite=overwrite, recursive=recursive, check_md5=check_md5))
multidownload_internal(filesystem, src, dest, recursive=recursive, blocksize=blocksize, overwrite=overwrite,
max_concurrent_transfers=max_concurrent_transfers)
check_md5=check_md5, max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname adls
#' @export
download_adls_file <- function(filesystem, src, dest=basename(src), blocksize=2^24, overwrite=FALSE, use_azcopy=FALSE)
download_adls_file <- function(filesystem, src, dest=basename(src), blocksize=2^24, overwrite=FALSE,
check_md5=FALSE, use_azcopy=FALSE)
{
if(use_azcopy)
azcopy_download(filesystem, src, dest, overwrite=overwrite)
else download_adls_file_internal(filesystem, src, dest, blocksize=blocksize, overwrite=overwrite)
azcopy_download(filesystem, src, dest, overwrite=overwrite, check_md5=check_md5)
else download_adls_file_internal(filesystem, src, dest, blocksize=blocksize, overwrite=overwrite,
check_md5=check_md5)
}

Просмотреть файл

@ -1,6 +1,6 @@
upload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, lease=NULL)
upload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, lease=NULL, put_md5=FALSE)
{
src <- normalize_src(src)
src <- normalize_src(src, put_md5)
on.exit(close(src$con))
headers <- list()
@ -22,8 +22,10 @@ upload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, lea
break
opts <- list(action="append", position=sprintf("%.0f", pos))
headers <- list(`content-length`=sprintf("%.0f", thisblock))
headers <- list(
`content-length`=sprintf("%.0f", thisblock),
`content-md5`=encode_md5(body)
)
do_container_op(filesystem, dest, headers=headers, body=body, options=opts, progress=bar$update(),
http_verb="PATCH")
@ -34,45 +36,27 @@ upload_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, lea
bar$close()
# flush contents
headers <- list(`content-type`=src$content_type)
if(!is.null(src$md5))
headers$`x-ms-content-md5` <- src$md5
do_container_op(filesystem, dest,
options=list(action="flush", position=sprintf("%.0f", pos)),
headers=list(`content-type`=src$content_type),
headers=headers,
http_verb="PATCH")
invisible(NULL)
}
download_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, overwrite=FALSE)
download_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, overwrite=FALSE, check_md5=FALSE)
{
file_dest <- is.character(dest)
null_dest <- is.null(dest)
conn_dest <- inherits(dest, "rawConnection")
if(!file_dest && !null_dest && !conn_dest)
stop("Unrecognised dest argument", call.=FALSE)
headers <- list()
if(file_dest)
{
if(!overwrite && file.exists(dest))
stop("Destination file exists and overwrite is FALSE", call.=FALSE)
if(!dir.exists(dirname(dest)))
dir.create(dirname(dest), recursive=TRUE)
dest <- file(dest, "w+b")
on.exit(close(dest))
}
if(null_dest)
{
dest <- rawConnection(raw(0), "w+b")
on.exit(seek(dest, 0))
}
if(conn_dest)
on.exit(seek(dest, 0))
dest <- init_download_dest(dest, overwrite)
on.exit(dispose_download_dest(dest))
# get file size (for progress bar)
res <- do_container_op(filesystem, src, headers=headers, http_verb="HEAD", http_status_handler="pass")
httr::stop_for_status(res, storage_error_message(res))
size <- as.numeric(httr::headers(res)[["Content-Length"]])
# get file size (for progress bar) and MD5 hash
props <- get_storage_properties(filesystem, src)
size <- as.numeric(props[["content-length"]])
src_md5 <- props[["content-md5"]]
bar <- storage_progress_bar$new(size, "down")
offset <- 0
@ -89,5 +73,7 @@ download_adls_file_internal <- function(filesystem, src, dest, blocksize=2^24, o
}
bar$close()
if(null_dest) rawConnectionValue(dest) else invisible(NULL)
if(check_md5)
do_md5_check(dest, src_md5)
if(inherits(dest, "null_dest")) rawConnectionValue(dest) else invisible(NULL)
}

Просмотреть файл

@ -82,19 +82,23 @@ azcopy_upload_opts <- function(container, ...)
}
azcopy_upload_opts.blob_container <- function(container, type="BlockBlob", blocksize=2^24, recursive=FALSE,
lease=NULL, ...)
lease=NULL, put_md5=FALSE, ...)
{
c("--blob-type", type, "--block-size-mb", sprintf("%.0f", blocksize/1048576), if(recursive) "--recursive")
c("--blob-type", type, "--block-size-mb", sprintf("%.0f", blocksize/1048576), if(recursive) "--recursive",
if(put_md5) "--put-md5")
}
azcopy_upload_opts.file_share <- function(container, blocksize=2^22, recursive=FALSE, ...)
azcopy_upload_opts.file_share <- function(container, blocksize=2^22, recursive=FALSE, put_md5=FALSE, ...)
{
c("--block-size-mb", sprintf("%.0f", blocksize/1048576), if(recursive) "--recursive")
c("--block-size-mb", sprintf("%.0f", blocksize/1048576), if(recursive) "--recursive",
if(put_md5) "--put-md5")
}
azcopy_upload_opts.adls_filesystem <- function(container, blocksize=2^24, recursive=FALSE, lease=NULL, ...)
azcopy_upload_opts.adls_filesystem <- function(container, blocksize=2^24, recursive=FALSE, lease=NULL,
put_md5=FALSE, ...)
{
c("--block-size-mb", sprintf("%.0f", blocksize/1048576), if(recursive) "--recursive")
c("--block-size-mb", sprintf("%.0f", blocksize/1048576), if(recursive) "--recursive",
if(put_md5) "--put-md5")
}
@ -115,18 +119,21 @@ azcopy_download_opts <- function(container, ...)
}
# currently all azcopy_download_opts methods are the same
azcopy_download_opts.blob_container <- function(container, overwrite=FALSE, recursive=FALSE, ...)
azcopy_download_opts.blob_container <- function(container, overwrite=FALSE, recursive=FALSE, check_md5=FALSE, ...)
{
c(paste0("--overwrite=", tolower(as.character(overwrite))), if(recursive) "--recursive")
c(paste0("--overwrite=", tolower(as.character(overwrite))), if(recursive) "--recursive",
if(check_md5) c("--check-md5", "FailIfDifferent"))
}
# Build the AzCopy command-line options for a download from a file share.
#   overwrite: emitted as "--overwrite=true" / "--overwrite=false".
#   recursive: when TRUE, adds "--recursive".
#   check_md5: when TRUE, adds "--check-md5 FailIfDifferent".
# Returns a character vector of options.
# check_md5 has to be a named parameter: the body reads it directly, so leaving it
# to be swallowed by `...` fails with "object 'check_md5' not found".
azcopy_download_opts.file_share <- function(container, overwrite=FALSE, recursive=FALSE, check_md5=FALSE, ...)
{
    c(paste0("--overwrite=", tolower(as.character(overwrite))), if(recursive) "--recursive",
      if(check_md5) c("--check-md5", "FailIfDifferent"))
}
# Build the AzCopy command-line options for a download from an ADLSgen2 filesystem.
#   overwrite: emitted as "--overwrite=true" / "--overwrite=false".
#   recursive: when TRUE, adds "--recursive".
#   check_md5: when TRUE, adds "--check-md5 FailIfDifferent".
# Returns a character vector of options.
# check_md5 has to be a named parameter: the body reads it directly, so leaving it
# to be swallowed by `...` fails with "object 'check_md5' not found".
azcopy_download_opts.adls_filesystem <- function(container, overwrite=FALSE, recursive=FALSE, check_md5=FALSE, ...)
{
    c(paste0("--overwrite=", tolower(as.character(overwrite))), if(recursive) "--recursive",
      if(check_md5) c("--check-md5", "FailIfDifferent"))
}

Просмотреть файл

@ -237,6 +237,8 @@ delete_blob_container.blob_endpoint <- function(endpoint, name, confirm=TRUE, le
#' @param max_concurrent_transfers For `multiupload_blob` and `multidownload_blob`, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.
#' @param prefix For `list_blobs`, an alternative way to specify the directory.
#' @param recursive For the multiupload/download functions, whether to recursively transfer files in subdirectories. For `list_blobs`, whether to include the contents of any subdirectories in the listing. For `delete_blob_dir`, whether to recursively delete subdirectory contents as well (not yet supported).
#' @param put_md5 For uploading, whether to compute the MD5 hash of the blob(s). This will be stored as part of the blob's properties. Only used for block blobs.
#' @param check_md5 For downloading, whether to verify the MD5 hash of the downloaded blob(s). This requires that the blob's `Content-MD5` property is set. If this is TRUE and the `Content-MD5` property is missing, a warning is generated.
#'
#' @details
#' `upload_blob` and `download_blob` are the workhorse file transfer functions for blobs. They each take as inputs a _single_ filename as the source for uploading/downloading, and a single filename as the destination. Alternatively, for uploading, `src` can be a [textConnection] or [rawConnection] object; and for downloading, `dest` can be NULL or a `rawConnection` object. If `dest` is NULL, the downloaded data is returned as a raw vector, and if a raw connection, it will be placed into the connection. See the examples below.
@ -455,51 +457,54 @@ list_blobs <- function(container, dir="/", info=c("partial", "name", "all"),
#' @export
upload_blob <- function(container, src, dest=basename(src), type=c("BlockBlob", "AppendBlob"),
blocksize=if(type == "BlockBlob") 2^24 else 2^22,
lease=NULL, append=FALSE, use_azcopy=FALSE)
lease=NULL, put_md5=FALSE, append=FALSE, use_azcopy=FALSE)
{
type <- match.arg(type)
if(use_azcopy)
azcopy_upload(container, src, dest, type=type, blocksize=blocksize, lease=lease)
else upload_blob_internal(container, src, dest, type=type, blocksize=blocksize, lease=lease, append=append)
azcopy_upload(container, src, dest, type=type, blocksize=blocksize, lease=lease, put_md5=put_md5)
else upload_blob_internal(container, src, dest, type=type, blocksize=blocksize, lease=lease,
put_md5=put_md5, append=append)
}
#' @rdname blob
#' @export
multiupload_blob <- function(container, src, dest, recursive=FALSE, type=c("BlockBlob", "AppendBlob"),
blocksize=if(type == "BlockBlob") 2^24 else 2^22,
lease=NULL, append=FALSE, use_azcopy=FALSE,
lease=NULL, put_md5=FALSE, append=FALSE, use_azcopy=FALSE,
max_concurrent_transfers=10)
{
type <- match.arg(type)
if(use_azcopy)
return(azcopy_upload(container, src, dest, type=type, blocksize=blocksize, lease=lease,
return(azcopy_upload(container, src, dest, type=type, blocksize=blocksize, lease=lease, put_md5=put_md5,
recursive=recursive))
multiupload_internal(container, src, dest, recursive=recursive, type=type, blocksize=blocksize, lease=lease,
append=append, max_concurrent_transfers=max_concurrent_transfers)
put_md5=put_md5, append=append, max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname blob
#' @export
download_blob <- function(container, src, dest=basename(src), blocksize=2^24, overwrite=FALSE, lease=NULL,
use_azcopy=FALSE)
check_md5=FALSE, use_azcopy=FALSE)
{
if(use_azcopy)
azcopy_download(container, src, dest, overwrite=overwrite, lease=lease)
else download_blob_internal(container, src, dest, blocksize=blocksize, overwrite=overwrite, lease=lease)
azcopy_download(container, src, dest, overwrite=overwrite, lease=lease, check_md5=check_md5)
else download_blob_internal(container, src, dest, blocksize=blocksize, overwrite=overwrite, lease=lease,
check_md5=check_md5)
}
#' @rdname blob
#' @export
multidownload_blob <- function(container, src, dest, recursive=FALSE, blocksize=2^24, overwrite=FALSE, lease=NULL,
use_azcopy=FALSE,
check_md5=FALSE, use_azcopy=FALSE,
max_concurrent_transfers=10)
{
if(use_azcopy)
return(azcopy_download(container, src, dest, overwrite=overwrite, lease=lease, recursive=recursive))
return(azcopy_download(container, src, dest, overwrite=overwrite, lease=lease, recursive=recursive,
check_md5=check_md5))
multidownload_internal(container, src, dest, recursive=recursive, blocksize=blocksize, overwrite=overwrite,
lease=lease, max_concurrent_transfers=max_concurrent_transfers)
lease=lease, check_md5=check_md5, max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname blob

Просмотреть файл

@ -1,6 +1,6 @@
upload_blob_internal <- function(container, src, dest, type, blocksize, lease=NULL, append=TRUE)
upload_blob_internal <- function(container, src, dest, type, blocksize, lease=NULL, put_md5=FALSE, append=FALSE)
{
src <- normalize_src(src)
src <- normalize_src(src, put_md5)
on.exit(close(src$con))
switch(type,
@ -13,40 +13,19 @@ upload_blob_internal <- function(container, src, dest, type, blocksize, lease=NU
}
download_blob_internal <- function(container, src, dest, blocksize=2^24, overwrite=FALSE, lease=NULL)
download_blob_internal <- function(container, src, dest, blocksize=2^24, overwrite=FALSE, lease=NULL, check_md5=FALSE)
{
file_dest <- is.character(dest)
null_dest <- is.null(dest)
conn_dest <- inherits(dest, "rawConnection")
if(!file_dest && !null_dest && !conn_dest)
stop("Unrecognised dest argument", call.=FALSE)
headers <- list()
if(!is.null(lease))
headers[["x-ms-lease-id"]] <- as.character(lease)
if(file_dest)
{
if(!overwrite && file.exists(dest))
stop("Destination file exists and overwrite is FALSE", call.=FALSE)
if(!dir.exists(dirname(dest)))
dir.create(dirname(dest), recursive=TRUE)
dest <- file(dest, "w+b")
on.exit(close(dest))
}
if(null_dest)
{
dest <- rawConnection(raw(0), "w+b")
on.exit(seek(dest, 0))
}
if(conn_dest)
on.exit(seek(dest, 0))
dest <- init_download_dest(dest, overwrite)
on.exit(dispose_download_dest(dest))
# get file size (for progress bar)
res <- do_container_op(container, src, headers=headers, http_verb="HEAD", http_status_handler="pass")
httr::stop_for_status(res, storage_error_message(res))
size <- as.numeric(httr::headers(res)[["Content-Length"]])
# get file size (for progress bar) and MD5 hash
props <- get_storage_properties(container, src)
size <- as.numeric(props[["content-length"]])
src_md5 <- props[["content-md5"]]
bar <- storage_progress_bar$new(size, "down")
offset <- 0
@ -63,5 +42,7 @@ download_blob_internal <- function(container, src, dest, blocksize=2^24, overwri
}
bar$close()
if(null_dest) rawConnectionValue(dest) else invisible(NULL)
if(check_md5)
do_md5_check(dest, src_md5)
if(inherits(dest, "null_dest")) rawConnectionValue(dest) else invisible(NULL)
}

Просмотреть файл

@ -19,6 +19,7 @@ upload_block_blob <- function(container, src, dest, blocksize, lease)
# ensure content-length is never exponential notation
headers[["content-length"]] <- sprintf("%.0f", thisblock)
headers[["content-md5"]] <- encode_md5(body)
id <- openssl::base64_encode(sprintf("%s-%010d", base_id, i))
opts <- list(comp="block", blockid=id)
@ -34,8 +35,14 @@ upload_block_blob <- function(container, src, dest, blocksize, lease)
# update block list
body <- as.character(xml2::as_xml_document(list(BlockList=blocklist)))
headers <- list("content-length"=sprintf("%.0f", nchar(body)),
"x-ms-blob-content-type"=src$content_type)
headers <- list(
"content-length"=sprintf("%.0f", nchar(body)),
"x-ms-blob-content-type"=src$content_type,
"content-md5"=encode_md5(charToRaw(body))
)
if(!is.null(src$md5))
headers[["x-ms-blob-content-md5"]] <- src$md5
do_container_op(container, dest, headers=headers, body=body, options=list(comp="blocklist"),
http_verb="PUT")
}
@ -66,6 +73,7 @@ upload_append_blob <- function(container, src, dest, blocksize, lease, append)
# ensure content-length is never exponential notation
headers[["content-length"]] <- sprintf("%.0f", thisblock)
headers[["content-md5"]] <- encode_md5(body)
id <- openssl::base64_encode(sprintf("%s-%010d", base_id, i))
opts <- list(comp="appendblock")

68
R/download_dest.R Normal file
Просмотреть файл

@ -0,0 +1,68 @@
# Prepare the destination of a download. S3 generic dispatching on the class of `dest`.
#   dest: where the downloaded data should go.
#   overwrite: passed through to the method.
init_download_dest <- function(dest, overwrite)
{
    UseMethod("init_download_dest")
}

# Fallback for any `dest` without a dedicated method: fail with an explicit message
# instead of the generic "no applicable method" dispatch error.
init_download_dest.default <- function(dest, overwrite)
{
    stop("Unrecognised dest argument", call.=FALSE)
}
# Use a local file as the download destination.
#   dest: path of the file to write.
#   overwrite: if FALSE and `dest` already exists, stop with an error.
# Creates the parent directory if it is missing, opens the file in "w+b" mode and
# returns the connection with class "file_dest" prepended.
init_download_dest.character <- function(dest, overwrite)
{
    if(!overwrite)
    {
        if(file.exists(dest))
            stop("Destination file exists and overwrite is FALSE", call.=FALSE)
    }

    parent <- dirname(dest)
    if(!dir.exists(parent))
        dir.create(parent, recursive=TRUE)

    con <- file(dest, "w+b")
    class(con) <- c("file_dest", class(con))
    con
}
# Use a caller-supplied raw connection as the download destination.
# The connection itself is returned, with class "conn_dest" prepended;
# `overwrite` is not used by this method.
init_download_dest.rawConnection <- function(dest, overwrite)
{
    class(dest) <- c("conn_dest", class(dest))
    dest
}
# No destination given: create a fresh, empty raw connection (mode "w+b") to
# receive the data and return it with class "null_dest" prepended.
# `overwrite` is not used by this method.
init_download_dest.NULL <- function(dest, overwrite)
{
    buffer <- rawConnection(raw(0), "w+b")
    class(buffer) <- c("null_dest", class(buffer))
    buffer
}
# Clean up a download destination once the transfer is finished.
# S3 generic dispatching on the class of `dest`.
dispose_download_dest <- function(dest) UseMethod("dispose_download_dest")
# File destination: close the file connection.
dispose_download_dest.file_dest <- function(dest)
{
    close(con=dest)
}
# Caller-supplied raw connection: leave it open and rewind it to position 0.
# Returns whatever seek() returns.
dispose_download_dest.conn_dest <- function(dest)
{
    seek(dest, where=0)
}
# Raw-connection destination tagged "null_dest": close the connection.
dispose_download_dest.null_dest <- function(dest)
{
    close(con=dest)
}
# Verify downloaded data against the MD5 hash recorded for the source.
#   dest: the destination connection holding the downloaded data.
#   src_md5: the source's MD5 hash as produced by encode_md5(), or NULL if not set.
# If src_md5 is NULL, a warning is raised and no check is done.
# Stops with an error if the two hashes differ.
do_md5_check <- function(dest, src_md5)
{
    if(is.null(src_md5))
    {
        warning("Source file MD5 hash not set", call.=FALSE)
        return(invisible(NULL))
    }

    dest_md5 <- if(inherits(dest, "rawConnection"))
    {
        # Raw-connection destinations are opened for writing, so hash the bytes
        # accumulated so far rather than trying to read them back.
        # NOTE(review): write-mode raw connections appear not to be readable -- confirm.
        encode_md5(rawConnectionValue(dest))
    }
    else
    {
        # rewind so the whole file is hashed, not just what follows the write position
        seek(dest, 0)
        encode_md5(dest)
    }

    if(dest_md5 != src_md5)
        stop("Destination and source MD5 hashes do not match", call.=FALSE)
}

Просмотреть файл

@ -215,6 +215,8 @@ delete_file_share.file_endpoint <- function(endpoint, name, confirm=TRUE, ...)
#' @param use_azcopy Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.
#' @param max_concurrent_transfers For `multiupload_azure_file` and `multidownload_azure_file`, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.
#' @param prefix For `list_azure_files`, filters the result to return only files and directories whose name begins with this prefix.
#' @param put_md5 For uploading, whether to compute the MD5 hash of the file(s). This will be stored as part of the file's properties.
#' @param check_md5 For downloading, whether to verify the MD5 hash of the downloaded file(s). This requires that the file's `Content-MD5` property is set. If this is TRUE and the `Content-MD5` property is missing, a warning is generated.
#'
#' @details
#' `upload_azure_file` and `download_azure_file` are the workhorse file transfer functions for file storage. They each take as inputs a _single_ filename as the source for uploading/downloading, and a single filename as the destination. Alternatively, for uploading, `src` can be a [textConnection] or [rawConnection] object; and for downloading, `dest` can be NULL or a `rawConnection` object. If `dest` is NULL, the downloaded data is returned as a raw vector, and if a raw connection, it will be placed into the connection. See the examples below.
@ -330,46 +332,49 @@ list_azure_files <- function(share, dir="/", info=c("all", "name"),
#' @rdname file
#' @export
upload_azure_file <- function(share, src, dest=basename(src), create_dir=FALSE, blocksize=2^22, use_azcopy=FALSE)
upload_azure_file <- function(share, src, dest=basename(src), create_dir=FALSE, blocksize=2^22, put_md5=FALSE,
use_azcopy=FALSE)
{
if(use_azcopy)
azcopy_upload(share, src, dest, blocksize=blocksize)
else upload_azure_file_internal(share, src, dest, create_dir=create_dir, blocksize=blocksize)
azcopy_upload(share, src, dest, blocksize=blocksize, put_md5=put_md5)
else upload_azure_file_internal(share, src, dest, create_dir=create_dir, blocksize=blocksize, put_md5=put_md5)
}
#' @rdname file
#' @export
multiupload_azure_file <- function(share, src, dest, recursive=FALSE, create_dir=recursive, blocksize=2^22,
use_azcopy=FALSE,
put_md5=FALSE, use_azcopy=FALSE,
max_concurrent_transfers=10)
{
if(use_azcopy)
return(azcopy_upload(share, src, dest, blocksize=blocksize, recursive=recursive))
return(azcopy_upload(share, src, dest, blocksize=blocksize, recursive=recursive, put_md5=put_md5))
multiupload_internal(share, src, dest, recursive=recursive, create_dir=create_dir, blocksize=blocksize,
max_concurrent_transfers=max_concurrent_transfers)
put_md5=put_md5, max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname file
#' @export
download_azure_file <- function(share, src, dest=basename(src), blocksize=2^22, overwrite=FALSE, use_azcopy=FALSE)
download_azure_file <- function(share, src, dest=basename(src), blocksize=2^22, overwrite=FALSE,
check_md5=FALSE, use_azcopy=FALSE)
{
if(use_azcopy)
azcopy_download(share, src, dest, overwrite=overwrite)
else download_azure_file_internal(share, src, dest, blocksize=blocksize, overwrite=overwrite)
azcopy_download(share, src, dest, overwrite=overwrite, check_md5=check_md5)
else download_azure_file_internal(share, src, dest, blocksize=blocksize, overwrite=overwrite,
check_md5=check_md5)
}
#' @rdname file
#' @export
multidownload_azure_file <- function(share, src, dest, recursive=FALSE, blocksize=2^22, overwrite=FALSE,
use_azcopy=FALSE,
check_md5=FALSE, use_azcopy=FALSE,
max_concurrent_transfers=10)
{
if(use_azcopy)
return(azcopy_download(share, src, dest, overwrite=overwrite, recursive=recursive))
return(azcopy_download(share, src, dest, overwrite=overwrite, recursive=recursive, check_md5=check_md5))
multidownload_internal(share, src, dest, recursive=recursive, blocksize=blocksize, overwrite=overwrite,
max_concurrent_transfers=max_concurrent_transfers)
check_md5=check_md5, max_concurrent_transfers=max_concurrent_transfers)
}
#' @rdname file

Просмотреть файл

@ -1,6 +1,6 @@
upload_azure_file_internal <- function(share, src, dest, create_dir=FALSE, blocksize=2^22)
upload_azure_file_internal <- function(share, src, dest, create_dir=FALSE, blocksize=2^22, put_md5=FALSE)
{
src <- normalize_src(src)
src <- normalize_src(src, put_md5)
on.exit(close(src$con))
# file API needs separate call(s) to create destination dir
@ -12,6 +12,8 @@ upload_azure_file_internal <- function(share, src, dest, create_dir=FALSE, block
headers <- list("x-ms-type"="file",
"x-ms-content-type"=src$content_type,
"x-ms-content-length"=sprintf("%.0f", src$size))
if(!is.null(src$md5))
headers <- c(headers, "x-ms-content-md5"=src$md5)
headers <- c(headers, file_default_perms)
do_container_op(share, dest, headers=headers, http_verb="PUT")
@ -34,6 +36,7 @@ upload_azure_file_internal <- function(share, src, dest, create_dir=FALSE, block
# ensure content-length and range are never exponential notation
headers[["content-length"]] <- sprintf("%.0f", thisblock)
headers[["range"]] <- sprintf("bytes=%.0f-%.0f", range_begin, range_begin + thisblock - 1)
headers[["content-md5"]] <- encode_md5(body)
do_container_op(share, dest, headers=headers, body=body, options=options, progress=bar$update(),
http_verb="PUT")
@ -48,37 +51,16 @@ upload_azure_file_internal <- function(share, src, dest, create_dir=FALSE, block
}
download_azure_file_internal <- function(share, src, dest, blocksize=2^22, overwrite=FALSE)
download_azure_file_internal <- function(share, src, dest, blocksize=2^22, overwrite=FALSE, check_md5=FALSE)
{
file_dest <- is.character(dest)
null_dest <- is.null(dest)
conn_dest <- inherits(dest, "rawConnection")
if(!file_dest && !null_dest && !conn_dest)
stop("Unrecognised dest argument", call.=FALSE)
headers <- list()
if(file_dest)
{
if(!overwrite && file.exists(dest))
stop("Destination file exists and overwrite is FALSE", call.=FALSE)
if(!dir.exists(dirname(dest)))
dir.create(dirname(dest), recursive=TRUE)
dest <- file(dest, "w+b")
on.exit(close(dest))
}
if(null_dest)
{
dest <- rawConnection(raw(0), "w+b")
on.exit(seek(dest, 0))
}
if(conn_dest)
on.exit(seek(dest, 0))
dest <- init_download_dest(dest, overwrite)
on.exit(dispose_download_dest(dest))
# get file size (for progress bar)
res <- do_container_op(share, src, headers=headers, http_verb="HEAD", http_status_handler="pass")
httr::stop_for_status(res, storage_error_message(res))
size <- as.numeric(httr::headers(res)[["Content-Length"]])
# get file size (for progress bar) and MD5 hash
props <- get_storage_properties(share, src)
size <- as.numeric(props[["content-length"]])
src_md5 <- props[["content-md5"]]
bar <- storage_progress_bar$new(size, "down")
offset <- 0
@ -95,5 +77,7 @@ download_azure_file_internal <- function(share, src, dest, blocksize=2^22, overw
}
bar$close()
if(null_dest) rawConnectionValue(dest) else invisible(NULL)
if(check_md5)
do_md5_check(dest, src_md5)
if(inherits(dest, "null_dest")) rawConnectionValue(dest) else invisible(NULL)
}

Просмотреть файл

@ -211,15 +211,26 @@ xml_to_list <- function(x)
# check whether to retry a failed file transfer
# retry on curl error (not any other kind of error)
# don't retry on host not found
# retry on:
# - curl error (except host not found)
# - http 400: MD5 mismatch
retry_transfer <- function(res)
{
inherits(res, "error") &&
grepl("curl", deparse(res$call[[1]]), fixed=TRUE) &&
UseMethod("retry_transfer")
}
# Decide whether a transfer that failed with an R error should be retried.
# TRUE only if the name of the function in the error's call contains "curl"
# and the message does not say the host could not be resolved.
retry_transfer.error <- function(res)
{
    caller <- deparse(res$call[[1]])
    from_curl <- grepl("curl", caller, fixed=TRUE)
    from_curl && !grepl("Could not resolve host", res$message, fixed=TRUE)
}
# Decide whether a transfer that returned an HTTP response should be retried.
# TRUE only for status 400 with "Md5Mismatch" somewhere in the response body.
retry_transfer.response <- function(res)
{
    if(httr::status_code(res) != 400L)
        return(FALSE)

    payload <- rawToChar(httr::content(res, as="raw"))
    grepl("Md5Mismatch", payload, fixed=TRUE)
}
as_datetime <- function(x, format="%a, %d %b %Y %H:%M:%S", tz="GMT")
{
@ -267,3 +278,9 @@ url_encode <- function(string, reserved=FALSE)
{
URLencode(enc2utf8(string), reserved=reserved)
}
# Compute the MD5 hash of `x` with openssl::md5 and return it base64-encoded.
# Extra arguments in `...` are forwarded to openssl::md5.
encode_md5 <- function(x, ...)
{
    digest <- openssl::md5(x, ...)
    openssl::base64_encode(digest)
}

Просмотреть файл

@ -1,30 +1,39 @@
normalize_src <- function(src)
normalize_src <- function(src, put_md5=FALSE)
{
UseMethod("normalize_src")
}
normalize_src.character <- function(src)
normalize_src.character <- function(src, put_md5=FALSE)
{
content_type <- mime::guess_type(src)
con <- file(src, open="rb")
size <- file.size(src)
list(content_type=content_type, con=con, size=size)
if(put_md5)
{
md5 <- encode_md5(con)
seek(con, 0)
}
else md5 <- NULL
list(content_type=content_type, con=con, size=size, md5=md5)
}
normalize_src.textConnection <- function(src)
normalize_src.textConnection <- function(src, put_md5=FALSE)
{
content_type <- "application/octet-stream"
# convert to raw connection
content_type <- "application/octet-stream"
src <- charToRaw(paste0(readLines(src), collapse="\n"))
size <- length(src)
md5 <- if(put_md5)
encode_md5(src)
else NULL
con <- rawConnection(src)
list(content_type=content_type, con=con, size=size)
list(content_type=content_type, con=con, size=size, md5=md5)
}
normalize_src.rawConnection <- function(src)
normalize_src.rawConnection <- function(src, put_md5=FALSE)
{
content_type <- "application/octet-stream"
# need to read the data to get object size (!)
@ -37,7 +46,13 @@ normalize_src.rawConnection <- function(src)
size <- size + length(x)
}
seek(src, 0) # reposition connection after reading
list(content_type=content_type, con=src, size=size)
if(put_md5)
{
md5 <- encode_md5(src)
seek(src, 0)
}
else md5 <- NULL
list(content_type=content_type, con=src, size=size, md5=md5)
}

Просмотреть файл

@ -16,18 +16,18 @@ list_adls_files(filesystem, dir = "/", info = c("all", "name"),
recursive = FALSE)
multiupload_adls_file(filesystem, src, dest, recursive = FALSE,
blocksize = 2^22, lease = NULL, use_azcopy = FALSE,
blocksize = 2^22, lease = NULL, put_md5 = FALSE, use_azcopy = FALSE,
max_concurrent_transfers = 10)
upload_adls_file(filesystem, src, dest = basename(src), blocksize = 2^24,
lease = NULL, use_azcopy = FALSE)
lease = NULL, put_md5 = FALSE, use_azcopy = FALSE)
multidownload_adls_file(filesystem, src, dest, recursive = FALSE,
blocksize = 2^24, overwrite = FALSE, use_azcopy = FALSE,
max_concurrent_transfers = 10)
blocksize = 2^24, overwrite = FALSE, check_md5 = FALSE,
use_azcopy = FALSE, max_concurrent_transfers = 10)
download_adls_file(filesystem, src, dest = basename(src), blocksize = 2^24,
overwrite = FALSE, use_azcopy = FALSE)
overwrite = FALSE, check_md5 = FALSE, use_azcopy = FALSE)
delete_adls_file(filesystem, file, confirm = TRUE)
@ -52,12 +52,16 @@ adls_file_exists(filesystem, file)
\item{lease}{The lease for a file, if present.}
\item{put_md5}{For uploading, whether to compute the MD5 hash of the file(s). This will be stored as part of the file's properties.}
\item{use_azcopy}{Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.}
\item{max_concurrent_transfers}{For \code{multiupload_adls_file} and \code{multidownload_adls_file}, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.}
\item{overwrite}{When downloading, whether to overwrite an existing destination file.}
\item{check_md5}{For downloading, whether to verify the MD5 hash of the downloaded file(s). This requires that the file's \code{Content-MD5} property is set. If this is TRUE and the \code{Content-MD5} property is missing, a warning is generated.}
\item{confirm}{Whether to ask for confirmation on deleting a file or directory.}
}
\value{

Просмотреть файл

@ -19,18 +19,19 @@ list_blobs(container, dir = "/", info = c("partial", "name", "all"),
upload_blob(container, src, dest = basename(src), type = c("BlockBlob",
"AppendBlob"), blocksize = if (type == "BlockBlob") 2^24 else 2^22,
lease = NULL, append = FALSE, use_azcopy = FALSE)
lease = NULL, put_md5 = FALSE, append = FALSE, use_azcopy = FALSE)
multiupload_blob(container, src, dest, recursive = FALSE,
type = c("BlockBlob", "AppendBlob"), blocksize = if (type == "BlockBlob")
2^24 else 2^22, lease = NULL, append = FALSE, use_azcopy = FALSE,
max_concurrent_transfers = 10)
2^24 else 2^22, lease = NULL, put_md5 = FALSE, append = FALSE,
use_azcopy = FALSE, max_concurrent_transfers = 10)
download_blob(container, src, dest = basename(src), blocksize = 2^24,
overwrite = FALSE, lease = NULL, use_azcopy = FALSE)
overwrite = FALSE, lease = NULL, check_md5 = FALSE,
use_azcopy = FALSE)
multidownload_blob(container, src, dest, recursive = FALSE,
blocksize = 2^24, overwrite = FALSE, lease = NULL,
blocksize = 2^24, overwrite = FALSE, lease = NULL, check_md5 = FALSE,
use_azcopy = FALSE, max_concurrent_transfers = 10)
delete_blob(container, blob, confirm = TRUE)
@ -65,6 +66,8 @@ multicopy_url_to_blob(container, src, dest, lease = NULL, async = FALSE,
\item{lease}{The lease for a blob, if present.}
\item{put_md5}{For uploading, whether to compute the MD5 hash of the blob(s). This will be stored as part of the blob's properties. Only used for block blobs.}
\item{append}{When uploading, whether to append the uploaded data to the destination blob. Only has an effect if \code{type="AppendBlob"}. If this is FALSE (the default) and the destination append blob exists, it is overwritten. If this is TRUE and the destination does not exist or is not an append blob, an error is thrown.}
\item{use_azcopy}{Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.}
@ -73,6 +76,8 @@ multicopy_url_to_blob(container, src, dest, lease = NULL, async = FALSE,
\item{overwrite}{When downloading, whether to overwrite an existing destination file.}
\item{check_md5}{For downloading, whether to verify the MD5 hash of the downloaded blob(s). This requires that the blob's \code{Content-MD5} property is set. If this is TRUE and the \code{Content-MD5} property is missing, a warning is generated.}
\item{blob}{A string naming a blob.}
\item{confirm}{Whether to ask for confirmation on deleting a blob.}

Просмотреть файл

@ -16,18 +16,18 @@ list_azure_files(share, dir = "/", info = c("all", "name"),
prefix = NULL, recursive = FALSE)
upload_azure_file(share, src, dest = basename(src), create_dir = FALSE,
blocksize = 2^22, use_azcopy = FALSE)
blocksize = 2^22, put_md5 = FALSE, use_azcopy = FALSE)
multiupload_azure_file(share, src, dest, recursive = FALSE,
create_dir = recursive, blocksize = 2^22, use_azcopy = FALSE,
max_concurrent_transfers = 10)
create_dir = recursive, blocksize = 2^22, put_md5 = FALSE,
use_azcopy = FALSE, max_concurrent_transfers = 10)
download_azure_file(share, src, dest = basename(src), blocksize = 2^22,
overwrite = FALSE, use_azcopy = FALSE)
overwrite = FALSE, check_md5 = FALSE, use_azcopy = FALSE)
multidownload_azure_file(share, src, dest, recursive = FALSE,
blocksize = 2^22, overwrite = FALSE, use_azcopy = FALSE,
max_concurrent_transfers = 10)
blocksize = 2^22, overwrite = FALSE, check_md5 = FALSE,
use_azcopy = FALSE, max_concurrent_transfers = 10)
delete_azure_file(share, file, confirm = TRUE)
@ -54,12 +54,16 @@ azure_file_exists(share, file)
\item{blocksize}{The number of bytes to upload/download per HTTP(S) request.}
\item{put_md5}{For uploading, whether to compute the MD5 hash of the file(s). This will be stored as part of the file's properties.}
\item{use_azcopy}{Whether to use the AzCopy utility from Microsoft to do the transfer, rather than doing it in R.}
\item{max_concurrent_transfers}{For \code{multiupload_azure_file} and \code{multidownload_azure_file}, the maximum number of concurrent file transfers. Each concurrent file transfer requires a separate R process, so limit this if you are low on memory.}
\item{overwrite}{When downloading, whether to overwrite an existing destination file.}
\item{check_md5}{For downloading, whether to verify the MD5 hash of the downloaded file(s). This requires that the file's \code{Content-MD5} property is set. If this is TRUE and the \code{Content-MD5} property is missing, a warning is generated.}
\item{confirm}{Whether to ask for confirmation on deleting a file or directory.}
}
\value{

Просмотреть файл

@ -0,0 +1,97 @@
context("MD5 hashes")

# credentials for authenticating with Resource Manager, taken from the environment
tenant <- Sys.getenv("AZ_TEST_TENANT_ID")
app <- Sys.getenv("AZ_TEST_APP_ID")
password <- Sys.getenv("AZ_TEST_PASSWORD")
subscription <- Sys.getenv("AZ_TEST_SUBSCRIPTION")

if(!all(nzchar(c(tenant, app, password, subscription))))
    skip("Authentication tests skipped: ARM credentials not set")

# resource group and storage account that the tests run against
rgname <- Sys.getenv("AZ_TEST_STORAGE_RG")
storname <- Sys.getenv("AZ_TEST_STORAGE_HNS")

if(!all(nzchar(c(rgname, storname))))
    skip("MD5 tests skipped: resource names not set")

sub <- AzureRMR::az_rm$new(tenant=tenant, app=app, password=password)$
    get_subscription(subscription)
stor <- sub$
    get_resource_group(rgname)$
    get_storage_account(storname)

# one endpoint per storage service exercised below
bl <- stor$get_blob_endpoint()
ad <- stor$get_adls_endpoint()
fl <- stor$get_file_endpoint()

# turn off the progress bar, keeping the previous setting so teardown can restore it
opts <- options(azure_storage_progress_bar=FALSE)
test_that("Blob upload/download works with MD5 hash",
{
    src_file <- "../resources/iris.csv"
    container <- create_blob_container(bl, make_name())

    # a plain upload leaves Content-MD5 unset
    expect_silent(upload_blob(container, src_file))
    blobs <- list_blobs(container, info="all")
    expect_true(all(is.na(blobs[["Content-MD5"]])))

    # uploading with put_md5=TRUE stores the hash of the local file
    expect_silent(upload_blob(container, src_file, put_md5=TRUE))
    src_md5 <- encode_md5(file(src_file))
    blobs <- list_blobs(container, info="all")
    expect_identical(blobs[["Content-MD5"]], src_md5)

    # a download with check_md5=TRUE is silent and yields a file with the same hash
    dest_file <- file.path(tempdir(), make_name())
    expect_silent(download_blob(container, "iris.csv", dest_file, check_md5=TRUE))
    dest_md5 <- encode_md5(file(dest_file))
    expect_identical(src_md5, dest_md5)
})
test_that("ADLS upload/download works with MD5 hash",
{
    src_file <- "../resources/iris.csv"
    filesystem <- create_adls_filesystem(ad, make_name())

    # a plain upload leaves the content-md5 property unset
    expect_silent(upload_adls_file(filesystem, src_file))
    file_props <- get_storage_properties(filesystem, "iris.csv")
    expect_null(file_props$`content-md5`)

    # uploading with put_md5=TRUE stores the hash of the local file
    expect_silent(upload_adls_file(filesystem, src_file, put_md5=TRUE))
    src_md5 <- encode_md5(file(src_file))
    file_props <- get_storage_properties(filesystem, "iris.csv")
    expect_identical(file_props$`content-md5`, src_md5)

    # a download with check_md5=TRUE is silent and yields a file with the same hash
    dest_file <- file.path(tempdir(), make_name())
    expect_silent(download_adls_file(filesystem, "iris.csv", dest_file, check_md5=TRUE))
    dest_md5 <- encode_md5(file(dest_file))
    expect_identical(src_md5, dest_md5)
})
test_that("File upload/download works with MD5 hash",
{
    src_file <- "../resources/iris.csv"
    file_share <- create_file_share(fl, make_name())

    # a plain upload leaves the content-md5 property unset
    expect_silent(upload_azure_file(file_share, src_file))
    file_props <- get_storage_properties(file_share, "iris.csv")
    expect_null(file_props$`content-md5`)

    # uploading with put_md5=TRUE stores the hash of the local file
    expect_silent(upload_azure_file(file_share, src_file, put_md5=TRUE))
    src_md5 <- encode_md5(file(src_file))
    file_props <- get_storage_properties(file_share, "iris.csv")
    expect_identical(file_props$`content-md5`, src_md5)

    # a download with check_md5=TRUE is silent and yields a file with the same hash
    dest_file <- file.path(tempdir(), make_name())
    expect_silent(download_azure_file(file_share, "iris.csv", dest_file, check_md5=TRUE))
    dest_md5 <- encode_md5(file(dest_file))
    expect_identical(src_md5, dest_md5)
})
teardown(
{
    # delete every blob container, ADLS filesystem and file share listed on the test endpoints
    for(blob_cont in list_blob_containers(bl))
        delete_blob_container(blob_cont, confirm=FALSE)

    for(adls_fs in list_adls_filesystems(ad))
        delete_adls_filesystem(adls_fs, confirm=FALSE)

    for(file_share in list_file_shares(fl))
        delete_file_share(file_share, confirm=FALSE)

    # put back the options that were in effect before this file ran
    options(opts)
})

Просмотреть файл

@ -0,0 +1,58 @@
context("Azcopy with MD5")

# credentials for authenticating with Resource Manager, taken from the environment
tenant <- Sys.getenv("AZ_TEST_TENANT_ID")
app <- Sys.getenv("AZ_TEST_APP_ID")
cli_app <- Sys.getenv("AZ_TEST_NATIVE_APP_ID")
password <- Sys.getenv("AZ_TEST_PASSWORD")
subscription <- Sys.getenv("AZ_TEST_SUBSCRIPTION")

if(!all(nzchar(c(tenant, app, password, subscription))))
    skip("Authentication tests skipped: ARM credentials not set")

# resource group and storage account that the tests run against
rgname <- Sys.getenv("AZ_TEST_STORAGE_RG")
storname <- Sys.getenv("AZ_TEST_STORAGE_NOHNS")

if(!all(nzchar(c(rgname, storname))))
    skip("Azcopy client tests skipped: resource names not set")

# locate the azcopy binary; nothing below can run without it
set_azcopy_path()
if(is.null(.AzureStor$azcopy) || is.na(.AzureStor$azcopy))
    skip("Azcopy tests skipped: not detected")

if(nzchar(Sys.getenv("_R_CHECK_CRAN_INCOMING_")))
    skip("Azcopy tests skipped: tests being run from devtools::check")

# remember the current azcopy-silent setting so teardown can restore it
opt_sil <- getOption("azure_storage_azcopy_silent")
options(azure_storage_azcopy_silent="TRUE")

stor <- AzureRMR::az_rm$
    new(tenant=tenant, app=app, password=password)$
    get_subscription(subscription)$
    get_resource_group(rgname)$
    get_storage_account(storname)

# the blob endpoint is authenticated with an account SAS only (no key, no token)
sas <- stor$get_account_sas(permissions="rwdla")
bl <- stor$get_blob_endpoint(key=NULL, sas=sas, token=NULL)
test_that("azcopy works with put-md5 and check-md5",
{
    src_file <- "../resources/iris.csv"
    container <- create_blob_container(bl, make_name())

    # an azcopy upload with put_md5=TRUE stores the hash of the local file
    expect_silent(upload_blob(container, src_file, put_md5=TRUE, use_azcopy=TRUE))
    src_md5 <- encode_md5(file(src_file))
    blobs <- list_blobs(container, info="all")
    expect_identical(blobs[["Content-MD5"]], src_md5)

    # an azcopy download with check_md5=TRUE is silent and yields a file with the same hash
    dest_file <- file.path(tempdir(), make_name())
    expect_silent(download_blob(container, "iris.csv", dest_file, check_md5=TRUE, use_azcopy=TRUE))
    dest_md5 <- encode_md5(file(dest_file))
    expect_identical(src_md5, dest_md5)
})
teardown(
{
    # restore the azcopy-silent option saved during setup
    options(azure_storage_azcopy_silent=opt_sil)

    # delete every blob container listed on the test endpoint
    for(blob_cont in list_blob_containers(bl))
        delete_blob_container(blob_cont, confirm=FALSE)
})