This commit is contained in:
hong-revo 2018-05-15 06:40:26 +10:00
Родитель 17dbadeaa1
Коммит 51011c5b7d
10 изменённых файлов: 267 добавлений и 309 удалений

Просмотреть файл

@ -13,7 +13,6 @@ Depends:
Imports:
R6,
httr,
curl,
openssl,
xml2,
AzureRMR

Просмотреть файл

@ -1,26 +1,32 @@
# Generated by roxygen2: do not edit by hand
export(az_blob_container)
export(az_blob_endpoint)
export(az_create_blob_container)
export(az_create_file_share)
export(az_delete_blob)
export(az_delete_blob_container)
export(az_delete_file)
export(az_delete_file_share)
export(az_download_blob)
export(az_download_file)
export(az_file_endpoint)
export(az_file_share)
export(az_list_blob_containers)
export(az_list_blobs)
export(az_list_file_shares)
export(az_list_files)
S3method(get_storage_properties,blob_share)
S3method(get_storage_properties,file_share)
S3method(get_storage_properties,storage_endpoint)
export(az_storage)
export(az_upload_blob)
export(az_upload_file)
export(azure_download)
export(azure_upload)
export(blob_container)
export(create_azure_dir)
export(create_blob_container)
export(create_file_share)
export(delete_azure_blob)
export(delete_azure_dir)
export(delete_azure_file)
export(delete_blob_container)
export(delete_file_share)
export(download_azure_blob)
export(download_azure_file)
export(download_from_url)
export(file_share)
export(get_azure_blob_properties)
export(get_azure_dir_properties)
export(get_azure_file_properties)
export(get_storage_properties)
export(list_azure_files)
export(list_blob_containers)
export(list_blobs)
export(list_file_shares)
export(storage_endpoint)
export(upload_azure_blob)
export(upload_azure_file)
export(upload_to_url)
import(AzureRMR)

Просмотреть файл

@ -1,153 +0,0 @@
az_blob_client <- R6::R6Class("az_blob_client",
public=list(
endpoint=NULL,
key=NULL,
sas=NULL,
api_version=NULL,
initialize=function(endpoint, key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"))
{
self$endpoint <- endpoint
self$key <- key
self$sas <- sas
self$api_version <- api_version
NULL
},
list_containers=function()
{
lst <- do_storage_call(self$endpoint, "/", options=list(comp="list"),
key=self$key, sas=self$sas, api_version=self$api_version)
named_list(lapply(lst$Containers, function(cont)
az_blob_container$new(cont$Name[[1]], self$endpoint, self$key, self$sas, self$api_version)))
},
get_container=function(container)
{
az_blob_container$new(container, self$endpoint, self$key, self$sas, self$api_version)
},
create_container=function(container, public_access=NULL)
{
az_blob_container$new(container, self$endpoint, self$key, self$sas, self$api_version,
public_access=public_access, create=TRUE)
},
delete_container=function(container, confirm=TRUE)
{
self$get_container(container)$delete(confirm=confirm)
}
))
az_blob_container <- R6::R6Class("az_blob_container",
public=list(
endpoint=NULL,
name=NULL,
key=NULL,
sas=NULL,
api_version=NULL,
initialize=function(name, endpoint, key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"),
public_access=NULL, create=FALSE)
{
# allow passing full URL to constructor
if(missing(endpoint))
{
url <- parse_url(name)
if(url$path == "")
stop("Must supply container name", call.=FALSE)
self$endpoint <- get_hostroot(url)
self$name <- sub("/$", "", url$path) # strip trailing /
}
else
{
self$endpoint <- endpoint
self$name <- name
}
self$key <- key
self$sas <- sas
self$api_version <- api_version
if(create)
{
headers <- if(!is_empty(public_access))
list("x-ms-blob-public-access"=public_access)
else list()
private$do_container_op(options=list(restype="container"), headers=headers, http_verb="PUT")
}
NULL
},
delete=function(confirm=TRUE)
{
if(confirm && interactive())
{
path <- paste0(self$endpoint, self$name, "/")
yn <- readline(paste0("Are you sure you really want to delete blob container '", path, "'? (y/N) "))
if(tolower(substr(yn, 1, 1)) != "y")
return(invisible(NULL))
}
private$do_container_op(options=list(restype="container"), http_verb="DELETE")
invisible(NULL)
},
list_blobs=function()
{
lst <- private$do_container_op(options=list(comp="list", restype="container"))
unname(vapply(lst$Blobs, function(b) b$Name[[1]], FUN.VALUE=character(1)))
},
download_blob=function(blob, dest, overwrite=FALSE)
{
private$do_container_op(blob, config=httr::write_disk(dest, overwrite))
},
upload_blob=function(...) { },
delete_blob=function(...) { }
),
private=list(
do_container_op=function(blob="", options=list(), headers=list(), http_verb="GET", ...)
{
path <- paste0(self$name, "/", blob)
do_storage_call(self$endpoint, path, options=options, headers=headers,
key=self$key, sas=self$sas, api_version=self$api_version,
http_verb=http_verb, ...)
}
))
#' @export
download_azure_blob <- function(src, dest, overwrite=FALSE,
key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"))
{
if(is.null(key) && is.null(sas)) # use curl if downloading public file
{
if(!overwrite && file.exists(dest))
stop("Path exists and overwrite is FALSE", call.=FALSE)
return(curl::curl_download(src, dest))
}
az_blob_container$
new(dirname(src), key=key, sas=sas, api_version=api_version)$
download_blob(basename(src), dest, overwrite)
}
#' @export
upload_azure_blob <- function(src, dest, overwrite=FALSE,
key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"))
{
az_blob_container$
new(dirname(dest), key=key, sas=sas, api_version=api_version)$
upload_blob(src, basename(dest), overwrite)
}

Просмотреть файл

@ -1,34 +1,23 @@
#' @export
az_blob_endpoint <- function(endpoint, key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"))
{
if(!grepl(".blob.", endpoint, fixed=TRUE))
stop("Not a blob storage endpoint", call.=FALSE)
obj <- list(url=endpoint, key=key, sas=sas, api_version=api_version)
class(obj) <- "blob_endpoint"
obj
}
#' @export
az_list_blob_containers <- function(endpoint)
list_blob_containers <- function(endpoint)
{
stopifnot(inherits(endpoint, "blob_endpoint"))
lst <- do_storage_call(endpoint$url, "/", options=list(comp="list"),
key=endpoint$key, sas=endpoint$sas, api_version=endpoint$api_version)
lst <- lapply(lst$Containers, function(cont) az_blob_container(endpoint, cont$Name[[1]]))
lst <- lapply(lst$Containers, function(cont) blob_container(endpoint, cont$Name[[1]]))
named_list(lst)
}
#' @export
az_blob_container <- function(endpoint, name, key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"))
blob_container <- function(endpoint, name, key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"))
{
if(missing(name) && is_url(endpoint))
{
stor_path <- parse_storage_url(endpoint)
endpoint <- storage_endpoint(stor_path[1], key, sas, api_version)
name <- stor_path[2]
endpoint <- az_blob_endpoint(stor_path[1], key, sas, api_version)
}
obj <- list(name=name, endpoint=endpoint)
@ -38,30 +27,31 @@ az_blob_container <- function(endpoint, name, key=NULL, sas=NULL, api_version=ge
#' @export
az_create_blob_container <- function(endpoint, name, key=NULL, sas=NULL,
create_blob_container <- function(endpoint, name, key=NULL, sas=NULL,
api_version=getOption("azure_storage_api_version"),
public_access=c("none", "blob", "container"))
public_access=c("none", "blob", "container"),
...)
{
if(missing(name) && is_url(endpoint))
{
stor_path <- parse_storage_url(endpoint)
name <- stor_path[2]
endpoint <- az_blob_endpoint(stor_path[1], key, sas, api_version)
endpoint <- storage_endpoint(stor_path[1], key, sas, api_version)
}
public_access <- match.arg(public_access)
headers <- if(public_access != "none")
list("x-ms-blob-public-access"=public_access)
else list()
modifyList(list(...), list("x-ms-blob-public-access"=public_access))
else list(...)
obj <- az_blob_container(endpoint, name)
obj <- blob_container(endpoint, name)
do_container_op(obj, options=list(restype="container"), headers=headers, http_verb="PUT")
obj
}
#' @export
az_delete_blob_container <- function(container, confirm=TRUE)
delete_blob_container <- function(container, confirm=TRUE, lease=NULL)
{
if(confirm && interactive())
{
@ -71,13 +61,15 @@ az_delete_blob_container <- function(container, confirm=TRUE)
if(tolower(substr(yn, 1, 1)) != "y")
return(invisible(NULL))
}
do_container_op(container, options=list(restype="container"), http_verb="DELETE")
headers <- if(!is_empty(lease))
list("x-ms-lease-id"=lease)
else list()
do_container_op(container, options=list(restype="container"), headers=headers, http_verb="DELETE")
}
#' @export
az_list_blobs <- function(container)
list_blobs <- function(container)
{
lst <- do_container_op(container, options=list(comp="list", restype="container"))
if(is_empty(lst$Blobs))
@ -87,8 +79,9 @@ az_list_blobs <- function(container)
#' @export
az_upload_blob <- function(container, src, dest, type="BlockBlob")
upload_azure_blob <- function(container, src, dest, type="BlockBlob")
{
# TODO: upload in chunks
body <- readBin(src, "raw", file.info(src)$size)
hash <- openssl::base64_encode(openssl::md5(body))
@ -103,14 +96,14 @@ az_upload_blob <- function(container, src, dest, type="BlockBlob")
#' @export
az_download_blob <- function(container, src, dest, overwrite=FALSE)
download_azure_blob <- function(container, src, dest, overwrite=FALSE)
{
do_container_op(container, src, config=httr::write_disk(dest, overwrite))
}
#' @export
az_delete_blob <- function(container, blob, confirm=TRUE)
delete_azure_blob <- function(container, blob, confirm=TRUE)
{
if(confirm && interactive())
{
@ -120,7 +113,53 @@ az_delete_blob <- function(container, blob, confirm=TRUE)
if(tolower(substr(yn, 1, 1)) != "y")
return(invisible(NULL))
}
do_container_op(container, blob, http_verb="DELETE")
}
acquire_blob_lease <- function(container, duration=60, lease=NULL)
{
headers <- list("x-ms-lease-action"="acquire", "x-ms-lease-duration"=duration)
if(!is_empty(lease))
headers <- c(headers, list("x-ms-proposed-lease-id"=lease))
res <- do_container_op(container, options=list(comp="lease", restype="container"), headers=headers,
http_verb="PUT", http_status_handler="pass")
httr::stop_for_status(res)
headers(res)[["x-ms-lease-id"]]
}
break_blob_lease <- function(container, period=NULL)
{
headers <- list("x-ms-lease-action"="break")
if(!is_empty(period))
headers=c(headers, list("x-ms-lease-break-period"=period))
do_container_op(container, options=list(comp="lease", restype="container"), headers=headers,
http_verb="PUT")
}
release_blob_lease <- function(container, lease)
{
headers <- list("x-ms-lease-id"=lease, "x-ms-lease-action"="release")
do_container_op(container, options=list(comp="lease", restype="container"), headers=headers,
http_verb="PUT")
}
renew_blob_lease <- function(container, lease)
{
headers <- list("x-ms-lease-id"=lease, "x-ms-lease-action"="renew")
do_container_op(container, options=list(comp="lease", restype="container"), headers=headers,
http_verb="PUT")
}
change_blob_lease <- function(container, lease, new_lease)
{
headers <- list("x-ms-lease-id"=lease, "x-ms-lease-action"="change", "x-ms-proposed-lease-id"=new_lease)
res <- do_container_op(container, options=list(comp="lease", restype="container"), headers=headers,
http_verb="PUT", http_status_handler="pass")
httr::stop_for_status(res)
headers(res)[["x-ms-lease-id"]]
}

64
R/client.R Normal file
Просмотреть файл

@ -0,0 +1,64 @@
#' @export
storage_endpoint <- function(endpoint,
key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"),
type=c("blob", "file", "queue", "table"))
{
if(missing(type)) # determine type of endpoint from url
{
type <- sapply(type, function(x) is_endpoint_url(endpoint, x))
if(!any(type))
stop("Unknown endpoint type", call.=FALSE)
type <- names(type)[type]
}
else
{
type <- match.arg(type)
if(!is_endpoint_url(endpoint, type))
stop("Unknown endpoint type", call.=FALSE)
}
obj <- list(url=endpoint, key=key, sas=sas, api_version=api_version)
class(obj) <- c(paste0(type, "_endpoint"), "storage_endpoint")
obj
}
#' @export
download_from_url <- function(src, dest, ..., overwrite=FALSE)
{
az_path <- parse_storage_url(src)
endpoint <- storage_endpoint(az_path[1], ...)
if(inherits(endpoint, "blob_endpoint"))
{
cont <- blob_container(endpoint, az_path[2])
download_azure_blob(cont, az_path[3], dest, overwrite=overwrite)
}
else if(inherits(endpoint, "file_endpoint"))
{
share <- file_share(endpoint, az_path[2])
download_azure_file(share, az_path[3], dest, overwrite=overwrite)
}
else stop("Unknown storage endpoint", call.=FALSE)
}
#' @export
upload_to_url <- function(src, dest, ...)
{
az_path <- parse_storage_url(dest)
endpoint <- storage_endpoint(az_path[1], ...)
if(inherits(endpoint, "blob_endpoint"))
{
cont <- blob_container(endpoint, az_path[2])
upload_azure_blob(cont, src, az_path[3])
}
else if(inherits(endpoint, "file_endpoint"))
{
share <- file_share(endpoint, az_path[2])
upload_azure_file(share, src, az_path[3])
}
else stop("Unknown storage endpoint", call.=FALSE)
}

Просмотреть файл

@ -1,28 +0,0 @@
az_file_client <- R6::R6Class("az_file_client",
public=list(
initialize=function(...) { },
list_shares=function(...) { },
create_share=function(...) { },
get_share=function(...) { },
delete_share=function(...) { }
))
az_file_share <- R6::R6Class("az_file_share",
public=list(
initialize=function(...) { },
delete=function(...) { },
list_files=function(...) { },
upload_file=function(...) { },
download_file=function(...) { },
delete_file=function(...) { }
))

Просмотреть файл

@ -1,34 +1,23 @@
#' @export
az_file_endpoint <- function(endpoint, key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"))
{
if(!grepl(".file.", endpoint, fixed=TRUE))
stop("Not a file storage endpoint", call.=FALSE)
obj <- list(url=endpoint, key=key, sas=sas, api_version=api_version)
class(obj) <- "file_endpoint"
obj
}
#' @export
az_list_file_shares <- function(endpoint)
list_file_shares <- function(endpoint)
{
stopifnot(inherits(endpoint, "file_endpoint"))
lst <- do_storage_call(endpoint$url, "/", options=list(comp="list"),
key=endpoint$key, sas=endpoint$sas, api_version=endpoint$api_version)
lst <- lapply(lst$Shares, function(cont) az_file_share(endpoint, cont$Name[[1]]))
lst <- lapply(lst$Shares, function(cont) file_share(endpoint, cont$Name[[1]]))
named_list(lst)
}
#' @export
az_file_share <- function(endpoint, name, key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"))
file_share <- function(endpoint, name, key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"))
{
if(missing(name) && is_url(endpoint))
{
stor_path <- parse_storage_url(endpoint)
endpoint <- storage_endpoint(stor_path[1], key, sas, api_version)
name <- stor_path[2]
endpoint <- az_file_endpoint(stor_path[1], key, sas, api_version)
}
obj <- list(name=name, endpoint=endpoint)
@ -38,23 +27,25 @@ az_file_share <- function(endpoint, name, key=NULL, sas=NULL, api_version=getOpt
#' @export
az_create_file_share <- function(endpoint, name, key=NULL, sas=NULL, api_version=getOption("azure_storage_api_version"))
create_file_share <- function(endpoint, name, key=NULL, sas=NULL,
api_version=getOption("azure_storage_api_version"),
...)
{
if(missing(name) && is_url(endpoint))
{
stor_path <- parse_storage_url(endpoint)
name <- stor_path[2]
endpoint <- az_file_endpoint(stor_path[1], key, sas, api_version)
endpoint <- storage_endpoint(stor_path[1], key, sas, api_version)
}
obj <- az_file_share(endpoint, name)
do_container_op(obj, options=list(restype="share"), http_verb="PUT")
obj <- file_share(endpoint, name)
do_container_op(obj, options=list(restype="share"), headers=list(...), http_verb="PUT")
obj
}
#' @export
az_delete_file_share <- function(share, confirm=TRUE)
delete_file_share <- function(share, confirm=TRUE)
{
if(confirm && interactive())
{
@ -64,13 +55,12 @@ az_delete_file_share <- function(share, confirm=TRUE)
if(tolower(substr(yn, 1, 1)) != "y")
return(invisible(NULL))
}
do_container_op(share, options=list(restype="share"), http_verb="DELETE")
}
#' @export
az_list_files <- function(share, dir)
list_azure_files <- function(share, dir)
{
lst <- do_container_op(share, dir, options=list(comp="list", restype="directory"))
if(is_empty(lst$Entries))
@ -80,8 +70,9 @@ az_list_files <- function(share, dir)
#' @export
az_upload_file <- function(share, src, dest)
upload_azure_file <- function(share, src, dest)
{
# TODO: upload in chunks
body <- readBin(src, "raw", file.info(src)$size)
# first, create the file
@ -103,14 +94,14 @@ az_upload_file <- function(share, src, dest)
#' @export
az_download_file <- function(share, src, dest, overwrite=FALSE)
download_azure_file <- function(share, src, dest, overwrite=FALSE)
{
do_container_op(share, src, config=httr::write_disk(dest, overwrite))
}
#' @export
az_delete_file <- function(share, file, confirm=TRUE)
delete_azure_file <- function(share, file, confirm=TRUE)
{
if(confirm && interactive())
{
@ -120,7 +111,35 @@ az_delete_file <- function(share, file, confirm=TRUE)
if(tolower(substr(yn, 1, 1)) != "y")
return(invisible(NULL))
}
do_container_op(share, file, http_verb="DELETE")
}
#' @export
create_azure_dir <- function(share, dir)
{
do_container_op(share, dir, options=list(restype="directory"), http_verb="PUT")
}
#' @export
delete_azure_dir <- function(share, dir, confirm=TRUE)
{
if(confirm && interactive())
{
endp <- share$endpoint
path <- paste0(endp$url, endp$name, dir, "/")
yn <- readline(paste0("Are you sure you really want to delete directory '", path, "'? (y/N) "))
if(tolower(substr(yn, 1, 1)) != "y")
return(invisible(NULL))
}
do_container_op(share, file, options=list(restype="directory"), http_verb="DELETE")
}
azure_file_info <- function(share, file)
{
}

Просмотреть файл

@ -52,12 +52,12 @@ public=list(
get_blob_endpoint=function(key=self$list_keys()[1])
{
az_blob_endpoint(self$properties$primaryEndpoints$blob, key=key)
storage_endpoint(self$properties$primaryEndpoints$blob, key=key)
},
get_file_endpoint=function(key=self$list_keys()[1])
{
az_file_endpoint(self$properties$primaryEndpoints$file, key=key)
storage_endpoint(self$properties$primaryEndpoints$file, key=key)
}
),

48
R/storage_properties.R Normal file
Просмотреть файл

@ -0,0 +1,48 @@
#' @export
get_storage_properties <- function(object, ...)
{
UseMethod("get_storage_properties")
}
#' @export
get_storage_properties.storage_endpoint <- function(object)
{
do_storage_op(object$url, "", options=list(restype="service", comp="properties"))
}
#' @export
get_storage_properties.blob_share <- function(object)
{
do_container_op(object, options=list(restype="container"))
}
#' @export
get_storage_properties.file_share <- function(object)
{
do_container_op(object, options=list(restype="share"))
}
#' @export
get_azure_blob_properties <- function(container, blob)
{
do_container_op(container, blob, http_verb="HEAD")
}
#' @export
get_azure_file_properties <- function(share, file)
{
do_container_op(share, file, http_verb="HEAD")
}
#' @export
get_azure_dir_properties <- function(share, dir)
{
do_container_op(share, dir, options=list(restype="directory"), http_verb="HEAD")
}

Просмотреть файл

@ -1,45 +1,3 @@
#' @export
azure_download <- function(src, dest, ..., overwrite=FALSE)
{
az_path <- parse_storage_url(src)
if(grepl(".blob.", az_path[1], fixed=TRUE))
{
endpoint <- az_blob_endpoint(az_path[1], ...)
cont <- az_blob_container(endpoint, az_path[2])
az_download_blob(cont, az_path[3], dest)
}
else if(grepl(".file.", az_path[1], fixed=TRUE))
{
endpoint <- az_file_endpoint(az_path[1], ...)
share <- az_file_share(endpoint, az_path[2])
az_download_file(share, az_path[3], dest)
}
else stop("Unknown storage endpoint", call.=FALSE)
}
#' @export
azure_upload <- function(src, dest, ..., overwrite=FALSE)
{
az_path <- parse_storage_url(dest)
if(grepl(".blob.", az_path[1], fixed=TRUE))
{
endpoint <- az_blob_endpoint(az_path[1], ...)
cont <- az_blob_container(endpoint, az_path[2])
az_upload_blob(cont, az_path[3], dest)
}
else if(grepl(".file.", az_path[1], fixed=TRUE))
{
endpoint <- az_file_endpoint(az_path[1], ...)
share <- az_file_share(endpoint, az_path[2])
az_upload_file(share, az_path[3], dest)
}
else stop("Unknown storage endpoint", call.=FALSE)
}
do_container_op <- function(container, path="", options=list(), headers=list(), http_verb="GET", ...)
{
endp <- container$endpoint
@ -50,7 +8,7 @@ do_container_op <- function(container, path="", options=list(), headers=list(),
}
do_storage_call <- function(endpoint, path, options=list(), headers=list(), body=NULL, ...,
do_storage_call <- function(endpoint_url, path, options=list(), headers=list(), body=NULL, ...,
key=NULL, sas=NULL,
api_version=getOption("azure_storage_api_version"),
http_verb=c("GET", "DELETE", "PUT", "POST", "HEAD", "PATCH"),
@ -61,14 +19,14 @@ do_storage_call <- function(endpoint, path, options=list(), headers=list(), body
# use shared access signature if provided, otherwise key if provided, otherwise anonymous access
if(!is.null(sas))
{
url <- paste0(endpoint, path, sep="/") # don't use file.path because it strips trailing / on Windows
url <- paste0(endpoint_url, path, "/") # don't use file.path because it strips trailing / on Windows
url <- paste0(url, "?", sas)
url <- httr::parse_url(url)
headers <- httr::add_headers(.headers=unlist(headers))
}
else
{
url <- httr::parse_url(endpoint)
url <- httr::parse_url(endpoint_url)
url$path <- path
if(!is_empty(options))
url$query <- options[order(names(options))]
@ -155,18 +113,24 @@ make_signature <- function(key, verb, acct_name, resource, options, headers)
# keep only the scheme and host parts of a URL
get_hostroot <- function(url)
{
if(!inherits(url, "url"))
url <- httr::parse_url(url)
url$port <- url$path <- url$params <- url$fragment <- url$query <- url$username <- url$password <- NULL
httr::build_url(url)
parse_storage_url(url)[1]
}
parse_storage_url <- function(url)
{
url <- httr::parse_url(url)
endpoint <- get_hostroot(url)
endpoint <- paste0(url$scheme, "://", url$host, "/")
store <- sub("/.*$", "", url$path)
path <- sub("^[^/]+/", "", url$path)
c(endpoint, store, path)
}
is_endpoint_url <- function(url, type)
{
type <- sprintf("://[a-z0-9]+\\.%s\\.", type)
grepl(type, url)
}