azureApiHeaders <- function(token) {
  headers <- c(Host = "management.azure.com",
               Authorization = token,
               `Content-type` = "application/json")
  httr::add_headers(.headers = headers)
}

# convert verbose=TRUE to httr verbose
set_verbosity <- function(verbose = FALSE) {
  if (verbose) httr::verbose(TRUE) else NULL
}

# extract the query-string arguments of a URL and canonicalize them
# (sorted, one "name:value" per line) for use in the string-to-sign
extractUrlArguments <- function(x) {
  ptn <- ".*\\?(.*?)"
  args <- grepl("\\?", x)
  z <- if (args) gsub(ptn, "\\1", x) else ""
  if (z == "") {
    ""
  } else {
    z <- strsplit(z, "&")[[1]]
    z <- sort(z)
    z <- paste(z, collapse = "\n")
    z <- gsub("=", ":", z)
    paste0("\n", z)
  }
}
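
# Illustrative example (not run; hypothetical URL). The canonicalized
# query string produced here feeds into the Shared Key string-to-sign:
#   extractUrlArguments("https://acct.blob.core.windows.net/x?comp=list&restype=container")
#   ## [1] "\ncomp:list\nrestype:container"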

callAzureStorageApi <- function(url, verb = "GET", storageKey, storageAccount,
                                headers = NULL, container = NULL, CMD,
                                size = getContentSize(content), contenttype = NULL,
                                content = NULL,
                                verbose = FALSE) {
  dateStamp <- httr::http_date(Sys.time())

  verbosity <- set_verbosity(verbose)

  if (missing(CMD) || is.null(CMD)) CMD <- extractUrlArguments(url)

  sig <- createAzureStorageSignature(url = url, verb = verb,
    key = storageKey, storageAccount = storageAccount, container = container,
    headers = headers, CMD = CMD, size = size,
    contenttype = contenttype, dateStamp = dateStamp, verbose = verbose)

  azToken <- paste0("SharedKey ", storageAccount, ":", sig)

  switch(verb,
    "GET" = GET(url, add_headers(.headers = c(Authorization = azToken,
                                              `Content-Length` = "0",
                                              `x-ms-version` = "2017-04-17",
                                              `x-ms-date` = dateStamp)),
                verbosity),
    "PUT" = PUT(url, add_headers(.headers = c(Authorization = azToken,
                                              `Content-Length` = size,
                                              `x-ms-version` = "2017-04-17",
                                              `x-ms-date` = dateStamp,
                                              `x-ms-blob-type` = "Blockblob",
                                              `Content-type` = contenttype)),
                body = content,
                verbosity)
  )
}

getContentSize <- function(obj) {
  # use only the first class, since class() can return a vector
  switch(class(obj)[1],
         "raw" = length(obj),
         "character" = nchar(obj),
         nchar(obj))
}
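
# Illustrative examples: size in bytes for raw vectors, characters for strings.
#   getContentSize(charToRaw("abc"))  # 3
#   getContentSize("abc")             # 3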

createAzureStorageSignature <- function(url, verb,
  key, storageAccount, container = NULL,
  headers = NULL, CMD = NULL, size = NULL, contenttype = NULL, dateStamp, verbose = FALSE) {

  if (missing(dateStamp)) {
    dateStamp <- httr::http_date(Sys.time())
  }

  arg1 <- if (length(headers)) {
    paste0(headers, "\nx-ms-date:", dateStamp, "\nx-ms-version:2017-04-17")
  } else {
    paste0("x-ms-date:", dateStamp, "\nx-ms-version:2017-04-17")
  }

  arg2 <- paste0("/", storageAccount, "/", container, CMD)

  SIG <- paste0(verb, "\n\n\n", size, "\n\n", contenttype, "\n\n\n\n\n\n\n",
                arg1, "\n", arg2)
  if (verbose) message(paste0("TRACE: STRINGTOSIGN: ", SIG))
  base64encode(hmac(key = base64decode(key),
                    object = iconv(SIG, "ASCII", to = "UTF-8"),
                    algo = "sha256",
                    raw = TRUE))
}
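
# Illustrative sketch (not run; account, container and key are placeholders).
# The result is the base64-encoded HMAC-SHA256 of the string-to-sign:
#   sig <- createAzureStorageSignature(url = "https://myacct.blob.core.windows.net/mycont?comp=list&restype=container",
#                                      verb = "GET", key = "<base64 storage key>",
#                                      storageAccount = "myacct", container = "mycont",
#                                      CMD = "\ncomp:list\nrestype:container")
#   # the Authorization header is then paste0("SharedKey myacct:", sig)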

x_ms_date <- function() httr::http_date(Sys.time())

azure_storage_header <- function(shared_key, date = x_ms_date(), content_length = 0) {
  if (!is.character(shared_key)) stop("Expecting a character for `shared_key`")
  headers <- c(
    Authorization = shared_key,
    `Content-Length` = as.character(content_length),
    `x-ms-version` = "2017-04-17",
    `x-ms-date` = date
  )
  add_headers(.headers = headers)
}
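
# Illustrative usage (not run; `sig` as computed by createAzureStorageSignature):
#   h <- azure_storage_header(paste0("SharedKey myacct:", sig), content_length = 0)
#   # httr::GET(url, h)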

# NOTE: duplicates the signing logic of createAzureStorageSignature(),
# but takes an azureActiveContext and a pre-computed date
getSig <- function(azureActiveContext, url, verb, key, storageAccount,
  headers = NULL, container = NULL, CMD = NULL, size = NULL, contenttype = NULL,
  date = x_ms_date(), verbose = FALSE) {

  arg1 <- if (length(headers)) {
    paste0(headers, "\nx-ms-date:", date, "\nx-ms-version:2017-04-17")
  } else {
    paste0("x-ms-date:", date, "\nx-ms-version:2017-04-17")
  }

  arg2 <- paste0("/", storageAccount, "/", container, CMD)

  SIG <- paste0(verb, "\n\n\n", size, "\n\n", contenttype, "\n\n\n\n\n\n\n",
                arg1, "\n", arg2)
  if (verbose) message(paste0("TRACE: STRINGTOSIGN: ", SIG))
  base64encode(hmac(key = base64decode(key),
                    object = iconv(SIG, "ASCII", to = "UTF-8"),
                    algo = "sha256",
                    raw = TRUE))
}

getAzureErrorMessage <- function(r) {
  msg <- paste0(as.character(sys.call(1))[1], "()") # name of calling function
  addToMsg <- function(x) {
    if (!is.null(x)) x <- strwrap(x)
    if (is.null(x)) msg else c(msg, x)
  }
  if (inherits(content(r), "xml_document")) {
    rr <- XML::xmlToList(XML::xmlParse(content(r)))
    msg <- addToMsg(rr$Code)
    msg <- addToMsg(rr$Message)
    msg <- addToMsg(rr$AuthenticationErrorDetail)
  } else {
    rr <- content(r)
    msg <- addToMsg(rr$code)
    msg <- addToMsg(rr$message)
    msg <- addToMsg(rr$error$message)

    msg <- addToMsg(rr$Code)
    msg <- addToMsg(rr$Message)
    msg <- addToMsg(rr$Error$Message)
  }
  msg <- addToMsg(paste0("Return code: ", status_code(r)))
  msg <- paste(msg, collapse = "\n")
  return(msg)
}

stopWithAzureError <- function(r) {
  if (status_code(r) < 300) return()
  msg <- getAzureErrorMessage(r)
  stop(msg, call. = FALSE)
}

extractResourceGroupname <- function(x) gsub(".*?/resourceGroups/(.*?)(/.*)*$", "\\1", x)
extractSubscriptionID <- function(x) gsub(".*?/subscriptions/(.*?)(/.*)*$", "\\1", x)
extractStorageAccount <- function(x) gsub(".*?/storageAccounts/(.*?)(/.*)*$", "\\1", x)
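
# Illustrative example (hypothetical ARM resource ID):
#   id <- "/subscriptions/0000-1111/resourceGroups/myrg/providers/Microsoft.Storage/storageAccounts/myacct"
#   extractSubscriptionID(id)     # "0000-1111"
#   extractResourceGroupname(id)  # "myrg"
#   extractStorageAccount(id)     # "myacct"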

refreshStorageKey <- function(azureActiveContext, storageAccount, resourceGroup) {
  if (storageAccount != azureActiveContext$storageAccount ||
      length(azureActiveContext$storageKey) == 0
  ) {
    message("Fetching Storage Key..")
    azureSAGetKey(azureActiveContext, resourceGroup = resourceGroup, storageAccount = storageAccount)
  } else {
    azureActiveContext$storageKey
  }
}

updateAzureActiveContext <- function(x, storageAccount, storageKey, resourceGroup, container, blob, directory) {
  # updates the active azure context in place
  if (!is.null(x)) {
    assert_that(is.azureActiveContext(x))
    if (!missing(storageAccount)) x$storageAccount <- storageAccount
    if (!missing(resourceGroup)) x$resourceGroup <- resourceGroup
    if (!missing(storageKey)) x$storageKey <- storageKey
    if (!missing(container)) x$container <- container
    if (!missing(blob)) x$blob <- blob
    if (!missing(directory)) x$directory <- directory
  }
  TRUE
}

## https://gist.github.com/cbare/5979354
## Version 4 UUIDs have the form xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx
## where x is any hexadecimal digit and y is one of 8, 9, A, or B
## e.g., f47ac10b-58cc-4372-a567-0e02b2c3d479
uuid <- function(uppercase = FALSE) {

  hex_digits <- c(as.character(0:9), letters[1:6])
  hex_digits <- if (uppercase) toupper(hex_digits) else hex_digits

  y_digits <- hex_digits[9:12]

  paste(
    paste0(sample(hex_digits, 8, replace = TRUE), collapse = ''),
    paste0(sample(hex_digits, 4, replace = TRUE), collapse = ''),
    paste0('4', paste0(sample(hex_digits, 3, replace = TRUE), collapse = ''), collapse = ''),
    paste0(sample(y_digits, 1), paste0(sample(hex_digits, 3, replace = TRUE), collapse = ''), collapse = ''),
    paste0(sample(hex_digits, 12, replace = TRUE), collapse = ''),
    sep = '-')
}
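
# Illustrative check (not run): generated IDs match the v4 layout described above.
#   grepl("^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$", uuid())
#   ## [1] TRUE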

## https://stackoverflow.com/questions/40059573/r-get-current-time-in-milliseconds
## R function to get the current time in nanoseconds
getCurrentTimeInNanos <- function() {
  return(as.numeric(Sys.time()) * 10^9)
}

# ADLS Global variables ----

{
  # create a syncFlagEnum object used by the Azure Data Lake Store functions.
  syncFlagEnum <- list("DATA", "METADATA", "CLOSE", "PIPELINE")
  names(syncFlagEnum) <- syncFlagEnum
  # create a retryPolicyEnum object used by the Azure Data Lake Store functions.
  retryPolicyEnum <- list("EXPONENTIALBACKOFF", "NONIDEMPOTENT")
  names(retryPolicyEnum) <- retryPolicyEnum
}

# ADLS Helper Functions ----

getAzureDataLakeSDKVersion <- function() {
  return("1.3.0")
}

getAzureDataLakeSDKUserAgent <- function() {
  sysInf <- as.list(strsplit(Sys.info(), "\t"))
  adlsUA <- paste0("ADLSRSDK"
                   , "-", getAzureDataLakeSDKVersion()
                   , "/", sysInf$sysname, "-", sysInf$release
                   , "-", sysInf$version
                   , "-", sysInf$machine
                   , "/", R.version$version.string
  )
  return(adlsUA)
}

getAzureDataLakeBasePath <- function(azureDataLakeAccount) {
  basePath <- paste0("https://", azureDataLakeAccount, ".azuredatalakestore.net/webhdfs/v1/")
  return(basePath)
}

getAzureDataLakeApiVersion <- function() {
  return("&api-version=2018-02-01")
}

getAzureDataLakeApiVersionForConcat <- function() {
  return("&api-version=2018-05-01")
}

getAzureDataLakeDefaultBufferSize <- function() {
  return(as.integer(4 * 1024 * 1024))
}

getAzureDataLakeURLEncodedString <- function(strToEncode) {
  strEncoded <- URLencode(strToEncode, reserved = TRUE, repeated = TRUE)
  return(strEncoded)
}
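
# Illustrative example (hypothetical account and path): these helpers compose
# the WebHDFS-style URLs used by the REST calls below, e.g. for OPEN:
#   paste0(getAzureDataLakeBasePath("myadls"),
#          getAzureDataLakeURLEncodedString("my file.csv"),
#          "?op=OPEN", "&read=true", getAzureDataLakeApiVersion())
#   ## "https://myadls.azuredatalakestore.net/webhdfs/v1/my%20file.csv?op=OPEN&read=true&api-version=2018-02-01"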

# log printer for Azure Data Lake Store
printADLSMessage <- function(fileName, functionName, message, error = NULL) {
  msg <- paste0(Sys.time()
                , " [", fileName, "]"
                , " ", functionName
                , ": message=", message
                , ", error=", error
  )
  print(msg)
}

# ADLS Ingress - AdlFileOutputStream ----

#' Create an adlFileOutputStream.
#' Create a container (`adlFileOutputStream`) for holding variables used by the Azure Data Lake Store data functions.
#'
#' @inheritParams setAzureContext
#' @param accountName the account name
#' @param relativePath Relative path of a file/directory
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlFileOutputStream` object
#'
#' @family Azure Data Lake Store functions
adls.fileoutputstream.create <- function(azureActiveContext, accountName, relativePath, verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlFileOutputStream(azEnv)
  list2env(
    list(azureActiveContext = "", accountName = "", relativePath = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  if (!missing(accountName)) azEnv$accountName <- accountName
  if (!missing(relativePath)) azEnv$relativePath <- relativePath
  azEnv$leaseId <- uuid()
  azEnv$blockSize <- getAzureDataLakeDefaultBufferSize()
  azEnv$buffer <- raw(0)
  # cursors/indices/offsets in R should start from 1 and NOT 0.
  # Because of this there are many adjustments that need to be done throughout the code!
  azEnv$cursor <- 1L
  res <- adls.file.info(azureActiveContext, accountName, relativePath, verbose)
  azEnv$remoteCursor <- as.integer(res$FileStatus.length) # this remote cursor starts from 0
  azEnv$streamClosed <- FALSE
  azEnv$lastFlushUpdatedMetadata <- FALSE

  # additional param required to implement bad offset handling
  azEnv$numRetries <- 0

  return(azEnv)
}

adls.fileoutputstream.addtobuffer <- function(adlFileOutputStream, contents, off, len) {
  bufferlen <- getContentSize(adlFileOutputStream$buffer)
  cursor <- adlFileOutputStream$cursor
  if (len > bufferlen - (cursor - 1)) { # if requesting to copy more than the remaining space in the buffer
    stop("IllegalArgumentException: invalid buffer copy requested in adls.fileoutputstream.addtobuffer")
  }
  # optimized arraycopy
  adlFileOutputStream$buffer[cursor : (cursor + len - 1)] <- contents[off : (off + len - 1)]
  adlFileOutputStream$cursor <- as.integer(cursor + len)
}

adls.fileoutputstream.dozerolengthappend <- function(adlFileOutputStream, azureDataLakeAccount, relativePath, offset, verbose = FALSE) {
  resHttp <- adls.append.core(adlFileOutputStream$azureActiveContext, adlFileOutputStream,
                              azureDataLakeAccount, relativePath,
                              4194304L, contents = raw(0), contentSize = 0L,
                              leaseId = adlFileOutputStream$leaseId, sessionId = adlFileOutputStream$leaseId,
                              syncFlag = syncFlagEnum$METADATA, offsetToAppendTo = 0, verbose = verbose)
  stopWithAzureError(resHttp)
  # the Java SDK equivalent returns void; signal success with TRUE
  return(TRUE)
}

#' The Core Append API.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param adlFileOutputStream The adlFileOutputStream object to operate with.
#' @param relativePath Relative path of a file.
#' @param bufferSize Size of the buffer to be used.
#' @param contents raw contents to be written to the file.
#' @param contentSize size of `contents` to be written to the file.
#' @param leaseId a String containing the lease ID (generated by client). Can be null.
#' @param sessionId a String containing the session ID (generated by client). Can be null.
#' @param syncFlag
#'     Use `DATA` when writing more bytes to the same file path. Most performant operation.
#'     Use `METADATA` when the metadata for the file (especially the file length
#'     retrieved by the `adls.file.info` or `adls.ls` API calls) also needs to be updated.
#'     Carries the overhead of a metadata update.
#'     Use `CLOSE` when no more data is expected to be written to this path.
#'     The ADL backend then updates the metadata, closes the stream handle and
#'     releases the lease on the path if a valid leaseId is passed.
#'     Expensive operation; use only when the last bytes are written.
#' @param offsetToAppendTo offset at which to append to the file.
#'     To let the server choose an offset, pass `-1`.
#' @param verbose Print tracing information (default FALSE).
#' @return response object
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#upload-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Append_to_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#append-org.apache.hadoop.fs.Path-int-org.apache.hadoop.util.Progressable-}
adls.append.core <- function(azureActiveContext, adlFileOutputStream = NULL, azureDataLakeAccount, relativePath, bufferSize,
                             contents, contentSize = -1L,
                             leaseId = NULL, sessionId = NULL, syncFlag = NULL,
                             offsetToAppendTo = -1,
                             verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  assert_that(is_bufferSize(bufferSize))
  assert_that(is_content(contents))
  assert_that(is_contentSize(contentSize))
  if (contentSize == -1) {
    contentSize <- getContentSize(contents)
  }
  # allow a zero byte append
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=APPEND", "&append=true",
    getAzureDataLakeApiVersion()
  )
  if (!missing(bufferSize) && !is.null(bufferSize)) URL <- paste0(URL, "&buffersize=", bufferSize)
  if (!is.null(leaseId)) URL <- paste0(URL, "&leaseid=", leaseId)
  if (!is.null(sessionId)) URL <- paste0(URL, "&filesessionid=", sessionId)
  if (!is.null(syncFlag)) URL <- paste0(URL, "&syncFlag=", syncFlag)
  if (offsetToAppendTo >= 0) URL <- paste0(URL, "&offset=", offsetToAppendTo)
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL, verb = "POST",
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  # seq_len() keeps a zero-byte append truly empty;
                                  # 1:contentSize would index out of bounds when contentSize == 0
                                  content = contents[seq_len(contentSize)],
                                  verbose = verbose)
  # update retry count - required for bad offset handling
  if (!is.null(adlFileOutputStream)) {
    adlFileOutputStream$numRetries <- retryPolicy$retryCount
  }
  return(resHttp)
}
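
# Illustrative usage (not run; context, account and path are placeholders):
#   res <- adls.append.core(azureActiveContext, NULL, "myadls", "tmp/file.txt",
#                           4194304L, contents = charToRaw("hello"),
#                           syncFlag = syncFlagEnum$CLOSE)
#   stopWithAzureError(res)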

# ADLS Egress - AdlFileInputStream ----

#' Create an adlFileInputStream.
#' Create a container (`adlFileInputStream`) for holding variables used by the Azure Data Lake Store data functions.
#'
#' @inheritParams setAzureContext
#' @param accountName the account name
#' @param relativePath Relative path of a file/directory
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlFileInputStream` object
#'
#' @family Azure Data Lake Store functions
adls.fileinputstream.create <- function(azureActiveContext, accountName, relativePath, verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlFileInputStream(azEnv)
  list2env(
    list(azureActiveContext = "", accountName = "", relativePath = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  if (!missing(accountName)) azEnv$accountName <- accountName
  if (!missing(relativePath)) azEnv$relativePath <- relativePath
  azEnv$directoryEntry <- adls.file.info(azureActiveContext, accountName, relativePath, verbose)
  if (azEnv$directoryEntry$FileStatus.type == "DIRECTORY") {
    msg <- paste0("ADLException: relativePath is not a file: ", relativePath)
    stop(msg)
  }
  azEnv$sessionId <- uuid()
  azEnv$blockSize <- getAzureDataLakeDefaultBufferSize()
  azEnv$buffer <- raw(0)
  # cursors/indices/offsets in R should start from 1 and NOT 0.
  # Because of this there are many adjustments that need to be done throughout the code!
  azEnv$fCursor <- 0L # cursor of buffer within file - offset of next byte to read from remote server
  azEnv$bCursor <- 1L # cursor of read within buffer - offset of next byte to be returned from buffer
  azEnv$limit <- 1L   # offset of next byte to be read into buffer from service (i.e., upper marker+1 of valid bytes in buffer)
  azEnv$streamClosed <- FALSE

  return(azEnv)
}

#' Core function to open and read a file.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file/directory.
#' @param offset Provide the offset to read from.
#' @param length Provide length of data to read.
#' @param bufferSize Size of the buffer to be used (not honoured).
#' @param verbose Print tracing information (default FALSE).
#' @return response object whose body holds the raw contents of the file.
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#read-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Open_and_Read_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Offset}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Length}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#open-org.apache.hadoop.fs.Path-int-}
adls.read.core <- function(azureActiveContext,
                           azureDataLakeAccount, relativePath,
                           offset, length, bufferSize = 4194304L,
                           verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  if (!missing(offset) && !is.null(offset)) assert_that(is_offset(offset))
  if (!missing(length) && !is.null(length)) assert_that(is_length(length))
  if (!missing(bufferSize) && !is.null(bufferSize)) assert_that(is_bufferSize(bufferSize))
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=OPEN", "&read=true",
    getAzureDataLakeApiVersion()
  )
  if (!missing(offset) && !is.null(offset)) URL <- paste0(URL, "&offset=", offset)
  if (!missing(length) && !is.null(length)) URL <- paste0(URL, "&length=", length)
  if (!missing(bufferSize) && !is.null(bufferSize)) URL <- paste0(URL, "&buffersize=", bufferSize)
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL,
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  verbose = verbose)
  return(resHttp)
}
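
# Illustrative usage (not run; context, account and path are placeholders):
#   res <- adls.read.core(azureActiveContext, "myadls", "tmp/file.txt",
#                         offset = 0, length = 1024)
#   stopWithAzureError(res)
#   bytes <- content(res, "raw", encoding = "UTF-8")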

#' Read from service attempts to read `blocksize` bytes from service.
#' Returns how many bytes are actually read, could be less than blocksize.
#'
#' @param adlFileInputStream the `adlFileInputStream` object to read from
#' @param verbose Print tracing information (default FALSE)
#' @return number of bytes actually read
#'
#' @family Azure Data Lake Store functions
adls.fileinputstream.readfromservice <- function(adlFileInputStream, verbose = FALSE) {
  # if there's still unread data in the buffer then don't overwrite it
  if (adlFileInputStream$bCursor < adlFileInputStream$limit) return(0)
  # at or past end of file
  if (adlFileInputStream$fCursor >= adlFileInputStream$directoryEntry$FileStatus.length) return(-1)
  if (adlFileInputStream$directoryEntry$FileStatus.length <= adlFileInputStream$blockSize)
    return(adls.fileinputstream.slurpfullfile(adlFileInputStream))

  # reset buffer to initial state - i.e., throw away existing data
  adlFileInputStream$bCursor <- 1L
  adlFileInputStream$limit <- 1L
  if (is.null(adlFileInputStream$buffer)) adlFileInputStream$buffer <- raw(getAzureDataLakeDefaultBufferSize())

  resHttp <- adls.read.core(adlFileInputStream$azureActiveContext,
                            adlFileInputStream$accountName, adlFileInputStream$relativePath,
                            adlFileInputStream$fCursor, adlFileInputStream$blockSize,
                            verbose = verbose)
  stopWithAzureError(resHttp)
  data <- content(resHttp, "raw", encoding = "UTF-8")
  bytesRead <- getContentSize(data)
  adlFileInputStream$buffer[1:bytesRead] <- data[1:bytesRead]
  adlFileInputStream$limit <- adlFileInputStream$limit + bytesRead
  adlFileInputStream$fCursor <- adlFileInputStream$fCursor + bytesRead
  return(bytesRead)
}

#' Reads the whole file into buffer. Useful when reading small files.
#'
#' @param adlFileInputStream the adlFileInputStream object to read from
#' @param verbose Print tracing information (default FALSE)
#' @return number of bytes actually read
adls.fileinputstream.slurpfullfile <- function(adlFileInputStream, verbose = FALSE) {
  if (is.null(adlFileInputStream$buffer)) {
    # note: was `blocksize` in an earlier revision; `blockSize` is the field used elsewhere
    adlFileInputStream$blockSize <- adlFileInputStream$directoryEntry$FileStatus.length
    adlFileInputStream$buffer <- raw(adlFileInputStream$directoryEntry$FileStatus.length)
  }

  # reset buffer to initial state - i.e., throw away existing data
  adlFileInputStream$bCursor <- adls.fileinputstream.getpos(adlFileInputStream) + 1L # preserve current file offset (may not be 0 if app did a seek before first read)
  adlFileInputStream$limit <- 1L
  adlFileInputStream$fCursor <- 0L # read from beginning

  resHttp <- adls.read.core(adlFileInputStream$azureActiveContext,
                            adlFileInputStream$accountName, adlFileInputStream$relativePath,
                            adlFileInputStream$fCursor, adlFileInputStream$directoryEntry$FileStatus.length,
                            verbose = verbose)
  stopWithAzureError(resHttp)
  data <- content(resHttp, "raw", encoding = "UTF-8")
  bytesRead <- getContentSize(data)
  adlFileInputStream$buffer[1:bytesRead] <- data[1:bytesRead]
  adlFileInputStream$limit <- adlFileInputStream$limit + bytesRead
  adlFileInputStream$fCursor <- adlFileInputStream$fCursor + bytesRead
  return(bytesRead)
}

# ADLS Retry Policies ----

# NOTE: The following points apply to the ADLS retry policies:
# 1. Speculative reads are not implemented, hence `NoRetryPolicy` is not implemented either.
# 2. ExponentialBackoffPolicyforMSI is not implemented as it's not used even in the JDK.

#' Create an adlRetryPolicy.
#' Create an adlRetryPolicy (`adlRetryPolicy`) for holding variables used by the Azure Data Lake Store data functions.
#'
#' @inheritParams setAzureContext
#' @param retryPolicyType the type of retryPolicy object to create.
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlRetryPolicy` object
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://github.com/Azure/azure-data-lake-store-java/blob/master/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/RetryPolicy.java}
createAdlRetryPolicy <- function(azureActiveContext, retryPolicyType = retryPolicyEnum$EXPONENTIALBACKOFF, verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlRetryPolicy(azEnv)
  list2env(
    list(azureActiveContext = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  # init the azEnv (adlRetryPolicy) with the right params
  azEnv$retryPolicyType <- retryPolicyType
  if (retryPolicyType == retryPolicyEnum$EXPONENTIALBACKOFF) {
    return(createAdlExponentialBackoffRetryPolicy(azEnv, verbose))
  } else if (retryPolicyType == retryPolicyEnum$NONIDEMPOTENT) {
    return(createAdlNonIdempotentRetryPolicy(azEnv, verbose))
  } else {
    printADLSMessage("internal.R", "createAdlRetryPolicy",
                     paste0("UndefinedRetryPolicyTypeError: ", azEnv$retryPolicyType),
                     NULL)
    return(NULL)
  }
}

#' Create an adlExponentialBackoffRetryPolicy.
#'
#' @param adlRetryPolicy the retry policy object to initialize.
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlRetryPolicy` object
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://github.com/Azure/azure-data-lake-store-java/blob/master/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicy.java}
createAdlExponentialBackoffRetryPolicy <- function(adlRetryPolicy, verbose = FALSE) {
  adlRetryPolicy$retryCount <- 0
  adlRetryPolicy$maxRetries <- 4
  adlRetryPolicy$exponentialRetryInterval <- 1000 # in milliseconds
  adlRetryPolicy$exponentialFactor <- 4
  adlRetryPolicy$lastAttemptStartTime <- getCurrentTimeInNanos() # in nanoseconds
  return(adlRetryPolicy)
}

#' Create an adlNonIdempotentRetryPolicy.
#'
#' @param adlRetryPolicy the retry policy object to initialize.
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlRetryPolicy` object
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://github.com/Azure/azure-data-lake-store-java/blob/master/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NonIdempotentRetryPolicy.java}
createAdlNonIdempotentRetryPolicy <- function(adlRetryPolicy, verbose = FALSE) {
  adlRetryPolicy$retryCount401 <- 0
  adlRetryPolicy$waitInterval <- 100
  adlRetryPolicy$retryCount429 <- 0
  adlRetryPolicy$maxRetries <- 4
  adlRetryPolicy$exponentialRetryInterval <- 1000 # in milliseconds
  adlRetryPolicy$exponentialFactor <- 4
  return(adlRetryPolicy)
}

#' Check if retry should be done based on `adlRetryPolicy`.
#'
#' @param adlRetryPolicy the policy object to check for retry
#' @param httpResponseCode the HTTP response code of the failed request
#' @param lastException exception that was reported with failure
#' @param verbose Print tracing information (default FALSE)
#' @return TRUE for retry and FALSE otherwise
#'
#' @family Azure Data Lake Store functions
shouldRetry <- function(adlRetryPolicy,
                        httpResponseCode, lastException,
                        verbose = FALSE) {
  if (adlRetryPolicy$retryPolicyType == retryPolicyEnum$EXPONENTIALBACKOFF) {
    return(
      shouldRetry.adlExponentialBackoffRetryPolicy(
        adlRetryPolicy, httpResponseCode, lastException, verbose))
  } else if (adlRetryPolicy$retryPolicyType == retryPolicyEnum$NONIDEMPOTENT) {
    return(
      shouldRetry.adlNonIdempotentRetryPolicy(
        adlRetryPolicy, httpResponseCode, lastException, verbose))
  } else {
    printADLSMessage("internal.R", "shouldRetry",
                     paste0("UndefinedRetryPolicyTypeError: ", adlRetryPolicy$retryPolicyType),
                     NULL)
    return(NULL)
  }
}

#' Check if retry should be done based on `adlRetryPolicy` (adlExponentialBackoffRetryPolicy).
#'
#' @param adlRetryPolicy the policy object to check for retry
#' @param httpResponseCode the HTTP response code of the failed request
#' @param lastException exception that was reported with failure
#' @param verbose Print tracing information (default FALSE)
#' @return TRUE for retry and FALSE otherwise
#'
#' @family Azure Data Lake Store functions
shouldRetry.adlExponentialBackoffRetryPolicy <- function(adlRetryPolicy,
                                                         httpResponseCode, lastException,
                                                         verbose = FALSE) {
  if (missing(adlRetryPolicy) || missing(httpResponseCode)) {
    return(FALSE)
  }
  # Non-retryable error
  if ((
    httpResponseCode >= 300 && httpResponseCode < 500 # 3xx and 4xx, except specific ones below
    && httpResponseCode != 408
    && httpResponseCode != 429
    && httpResponseCode != 401
  )
  || (httpResponseCode == 501) # Not Implemented
  || (httpResponseCode == 505) # Version Not Supported
  ) {
    return(FALSE)
  }
  # Retryable error, retry with exponential backoff
  if (!is.null(lastException)
      || httpResponseCode >= 500 # exception or 5xx, + specific ones below
      || httpResponseCode == 408
      || httpResponseCode == 429
      || httpResponseCode == 401) {
    if (adlRetryPolicy$retryCount < adlRetryPolicy$maxRetries) {
      timeSpentInMillis <- as.integer((getCurrentTimeInNanos() - adlRetryPolicy$lastAttemptStartTime) / 1000000)
      wait(adlRetryPolicy$exponentialRetryInterval - timeSpentInMillis)
      adlRetryPolicy$exponentialRetryInterval <- (adlRetryPolicy$exponentialRetryInterval * adlRetryPolicy$exponentialFactor)
      adlRetryPolicy$retryCount <- adlRetryPolicy$retryCount + 1
      adlRetryPolicy$lastAttemptStartTime <- getCurrentTimeInNanos()
      return(TRUE)
    } else {
      return(FALSE) # max # of retries exhausted
    }
  }
  # these are not errors - this method should never have been called with these
  if (httpResponseCode >= 100 && httpResponseCode < 300) {
    return(FALSE)
  }
  # Don't know what happened - we should never get here
  return(FALSE)
}
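
# Backoff schedule sketch: with exponentialRetryInterval = 1000 ms and
# exponentialFactor = 4, the waits before the four allowed retries are roughly
# (minus the time the failed attempt itself took):
#   1000 * 4^(0:3)
#   ## [1]  1000  4000 16000 64000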

#' Check if retry should be done based on `adlRetryPolicy` (adlNonIdempotentRetryPolicy).
#'
#' @param adlRetryPolicy the policy object to check for retry
#' @param httpResponseCode the HTTP response code of the failed request
#' @param lastException exception that was reported with failure
#' @param verbose Print tracing information (default FALSE)
#' @return TRUE for retry and FALSE otherwise
#'
#' @family Azure Data Lake Store functions
shouldRetry.adlNonIdempotentRetryPolicy <- function(adlRetryPolicy,
                                                    httpResponseCode, lastException,
                                                    verbose = FALSE) {
  if (httpResponseCode == 401 && adlRetryPolicy$retryCount401 == 0) {
    # this could be because of call delay; retry once, in the hope that the token has been renewed by now
    wait(adlRetryPolicy$waitInterval)
    adlRetryPolicy$retryCount401 <- (adlRetryPolicy$retryCount401 + 1)
    return(TRUE)
  }

  if (httpResponseCode == 429) {
    # 429 means that the backend did not change any state.
    if (adlRetryPolicy$retryCount429 < adlRetryPolicy$maxRetries) {
      wait(adlRetryPolicy$exponentialRetryInterval)
      adlRetryPolicy$exponentialRetryInterval <- (adlRetryPolicy$exponentialRetryInterval * adlRetryPolicy$exponentialFactor)
      adlRetryPolicy$retryCount429 <- (adlRetryPolicy$retryCount429 + 1)
      return(TRUE)
    } else {
      return(FALSE) # max # of retries exhausted
    }
  }

  return(FALSE)
}

wait <- function(waitTimeInMilliSeconds, verbose = FALSE) {
  if (waitTimeInMilliSeconds <= 0) {
    return(NULL)
  }
  tryCatch(
    {
      if (verbose) {
        printADLSMessage("internal.R", "wait",
                         paste0("going into wait for waitTimeInMilliSeconds=", waitTimeInMilliSeconds),
                         NULL)
      }
      Sys.sleep(waitTimeInMilliSeconds / 1000)
    }, interrupt = function(e) {
      if (verbose) {
        printADLSMessage("internal.R", "wait", "interrupted while waiting during retry", e)
      }
    }, error = function(e) {
      if (verbose) {
        printADLSMessage("internal.R", "wait", "error while waiting during retry", e)
      }
    }
  )
  return(NULL)
}

isSuccessfulResponse <- function(resHttp, op) {
  # note: `op` is currently unused; callers may omit it
  if (status_code(resHttp) >= 100 && status_code(resHttp) < 300) return(TRUE) # 1xx and 2xx return codes
  return(FALSE) # anything else
}

# ADLS Rest Calls ----

callAzureDataLakeApi <- function(url, verb = "GET", azureActiveContext, adlRetryPolicy = NULL,
                                 content = raw(0), contenttype = NULL, # "application/octet-stream"
                                 verbose = FALSE) {
  resHttp <- NULL
  repeat {
    resHttp <- callAzureDataLakeRestEndPoint(url, verb, azureActiveContext,
                                             content, contenttype,
                                             verbose)
    if (!isSuccessfulResponse(resHttp)
        && shouldRetry(adlRetryPolicy, status_code(resHttp), NULL)) {
      if (verbose) {
        msg <- paste0("retry request: "
                      , " status=", http_status(resHttp)$message
                      , ", url=", url, ", verb=", verb
                      , ", adlsRetryPolicy=", as.character.adlRetryPolicy(adlRetryPolicy))
        printADLSMessage("internal.R", "callAzureDataLakeApi", msg, NULL)
      }
      next # keep trying until the call succeeds or the retries are exhausted
    } else {
      break # break on success, or when all planned retries have failed
    }
  }
  return(resHttp)
}

callAzureDataLakeRestEndPoint <- function(url, verb = "GET", azureActiveContext,
                                          content = raw(0), contenttype = NULL, # "application/octet-stream"
                                          verbose = FALSE) {
  verbosity <- set_verbosity(verbose)
  commonHeaders <- c(Authorization = azureActiveContext$Token
                     , `User-Agent` = getAzureDataLakeSDKUserAgent()
                     , `x-ms-client-request-id` = uuid()
  )
  resHttp <- switch(verb,
    "GET" = GET(url,
                add_headers(.headers = c(commonHeaders
                                         , `Content-Length` = "0"
                )),
                verbosity
    ),
    "PUT" = PUT(url,
                add_headers(.headers = c(commonHeaders
                                         #, `Transfer-Encoding` = "chunked"
                                         , `Content-Length` = getContentSize(content)
                                         , `Content-Type` = contenttype
                )),
                body = content,
                verbosity
    ),
    "POST" = POST(url,
                  add_headers(.headers = c(commonHeaders
                                           #, `Transfer-Encoding` = "chunked"
                                           , `Content-Length` = getContentSize(content)
                                           , `Content-Type` = contenttype
                  )),
                  body = content,
                  verbosity
    ),
    "DELETE" = DELETE(url,
                      add_headers(.headers = c(commonHeaders
                                               , `Content-Length` = "0"
                      )),
                      verbosity
    )
  )
  # Print the response body in case verbose is enabled.
  if (verbose) {
    resJsonStr <- content(resHttp, "text", encoding = "UTF-8")
    printADLSMessage("internal.R", "callAzureDataLakeRestEndPoint", resJsonStr, NULL)
  }
  return(resHttp)
}