azureApiHeaders <- function(token) {
  headers <- c(Host = "management.azure.com",
               Authorization = token,
               `Content-type` = "application/json")
  httr::add_headers(.headers = headers)
}

# convert verbose = TRUE to an httr verbose() config
set_verbosity <- function(verbose = FALSE) {
  if (verbose) httr::verbose(TRUE) else NULL
}

# extract the query-string arguments of a URL and canonicalise them for
# signing: one "name:value" pair per line, sorted by name
extractUrlArguments <- function(x) {
  ptn <- ".*\\?(.*?)"
  args <- grepl("\\?", x)
  z <- if (args) gsub(ptn, "\\1", x) else ""
  if (z == "") {
    ""
  } else {
    z <- strsplit(z, "&")[[1]]
    z <- sort(z)
    z <- paste(z, collapse = "\n")
    z <- gsub("=", ":", z)
    paste0("\n", z)
  }
}

callAzureStorageApi <- function(url, verb = "GET", storageKey, storageAccount,
                                headers = NULL, container = NULL, CMD,
                                size = getContentSize(content),
                                contenttype = NULL, content = NULL,
                                verbose = FALSE) {
  dateStamp <- httr::http_date(Sys.time())
  verbosity <- set_verbosity(verbose)

  if (missing(CMD) || is.null(CMD)) CMD <- extractUrlArguments(url)

  sig <- createAzureStorageSignature(url = url, verb = verb,
                                     key = storageKey,
                                     storageAccount = storageAccount,
                                     container = container,
                                     headers = headers, CMD = CMD, size = size,
                                     contenttype = contenttype,
                                     dateStamp = dateStamp, verbose = verbose)

  azToken <- paste0("SharedKey ", storageAccount, ":", sig)

  switch(verb,
    "GET" = GET(url,
                add_headers(.headers = c(Authorization = azToken,
                                         `Content-Length` = "0",
                                         `x-ms-version` = "2017-04-17",
                                         `x-ms-date` = dateStamp)),
                verbosity),
    "PUT" = PUT(url,
                add_headers(.headers = c(Authorization = azToken,
                                         `Content-Length` = size,
                                         `x-ms-version` = "2017-04-17",
                                         `x-ms-date` = dateStamp,
                                         `x-ms-blob-type` = "Blockblob",
                                         `Content-type` = contenttype)),
                body = content, verbosity)
  )
}

getContentSize <- function(obj) {
  switch(class(obj),
         "raw" = length(obj),
         "character" = nchar(obj),
         nchar(obj))
}

createAzureStorageSignature <- function(url, verb, key, storageAccount,
                                        container = NULL, headers = NULL,
                                        CMD = NULL, size = NULL,
                                        contenttype = NULL, dateStamp,
                                        verbose = FALSE) {
  if (missing(dateStamp)) {
    dateStamp <- httr::http_date(Sys.time())
  }

  arg1 <- if (length(headers)) {
    paste0(headers, "\nx-ms-date:", dateStamp, "\nx-ms-version:2017-04-17")
  } else {
    paste0("x-ms-date:", dateStamp, "\nx-ms-version:2017-04-17")
  }

  arg2 <- paste0("/", storageAccount, "/", container, CMD)

  SIG <- paste0(verb, "\n\n\n", size, "\n\n", contenttype,
                "\n\n\n\n\n\n\n", arg1, "\n", arg2)
  if (verbose) message(paste0("TRACE: STRINGTOSIGN: ", SIG))
  base64encode(hmac(key = base64decode(key),
                    object = iconv(SIG, "ASCII", to = "UTF-8"),
                    algo = "sha256", raw = TRUE))
}

x_ms_date <- function() httr::http_date(Sys.time())

azure_storage_header <- function(shared_key, date = x_ms_date(),
                                 content_length = 0) {
  if (!is.character(shared_key)) stop("Expecting a character for `shared_key`")

  headers <- c(
    Authorization = shared_key,
    `Content-Length` = as.character(content_length),
    `x-ms-version` = "2017-04-17",
    `x-ms-date` = date
  )
  add_headers(.headers = headers)
}
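# Hedged sketch (not part of the package API): a self-contained check of the
# shared-key signing scheme implemented above. The account, container and key
# are made-up values; the point is that the same dateStamp must go into both
# the signature and the x-ms-date header, and that CMD carries the
# canonicalised query string produced by extractUrlArguments().
demoSharedKeySignature <- function() {
  dateStamp <- httr::http_date(Sys.time())
  fakeKey <- base64enc::base64encode(as.raw(1:32)) # stand-in 256-bit key
  url <- "https://myacct.blob.core.windows.net/mycontainer?comp=list&restype=container"
  sig <- createAzureStorageSignature(url = url, verb = "GET", key = fakeKey,
                                     storageAccount = "myacct",
                                     container = "mycontainer",
                                     CMD = extractUrlArguments(url),
                                     dateStamp = dateStamp)
  paste0("SharedKey myacct:", sig) # value for the Authorization header
}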
algo = "sha256", raw = TRUE) ) } getAzureErrorMessage <- function(r) { msg <- paste0(as.character(sys.call(1))[1], "()") # Name of calling fucntion addToMsg <- function(x) { if (!is.null(x)) x <- strwrap(x) if(is.null(x)) msg else c(msg, x) } if(inherits(content(r), "xml_document")){ rr <- XML::xmlToList(XML::xmlParse(content(r))) msg <- addToMsg(rr$Code) msg <- addToMsg(rr$Message) msg <- addToMsg(rr$AuthenticationErrorDetail) } else { rr <- content(r) msg <- addToMsg(rr$code) msg <- addToMsg(rr$message) msg <- addToMsg(rr$error$message) msg <- addToMsg(rr$Code) msg <- addToMsg(rr$Message) msg <- addToMsg(rr$Error$Message) } msg <- addToMsg(paste0("Return code: ", status_code(r))) msg <- paste(msg, collapse = "\n") return(msg) } stopWithAzureError <- function(r) { if (status_code(r) < 300) return() msg <- getAzureErrorMessage(r) stop(msg, call. = FALSE) } extractResourceGroupname <- function(x) gsub(".*?/resourceGroups/(.*?)(/.*)*$", "\\1", x) extractSubscriptionID <- function(x) gsub(".*?/subscriptions/(.*?)(/.*)*$", "\\1", x) extractStorageAccount <- function(x) gsub(".*?/storageAccounts/(.*?)(/.*)*$", "\\1", x) refreshStorageKey <- function(azureActiveContext, storageAccount, resourceGroup){ if (storageAccount != azureActiveContext$storageAccount || length(azureActiveContext$storageKey) == 0 ) { message("Fetching Storage Key..") azureSAGetKey(azureActiveContext, resourceGroup = resourceGroup, storageAccount = storageAccount) } else { azureActiveContext$storageKey } } updateAzureActiveContext <- function(x, storageAccount, storageKey, resourceGroup, container, blob, directory) { # updates the active azure context in place if (!is.null(x)) { assert_that(is.azureActiveContext(x)) if (!missing(storageAccount)) x$storageAccount <- storageAccount if (!missing(resourceGroup)) x$resourceGroup <- resourceGroup if (!missing(storageKey)) x$storageKey <- storageKey if (!missing(container)) x$container <- container if (!missing(blob)) x$blob <- blob if (!missing(directory)) x$directory <- directory } TRUE } ## https://gist.github.com/cbare/5979354 ## Version 4 UUIDs have the form xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx ## where x is any hexadecimal digit and y is one of 8, 9, A, or B ## e.g., f47ac10b-58cc-4372-a567-0e02b2c3d479 uuid <- function(uppercase=FALSE) { hex_digits <- c(as.character(0:9), letters[1:6]) hex_digits <- if (uppercase) toupper(hex_digits) else hex_digits y_digits <- hex_digits[9:12] paste( paste0(sample(hex_digits, 8, replace=TRUE), collapse=''), paste0(sample(hex_digits, 4, replace=TRUE), collapse=''), paste0('4', paste0(sample(hex_digits, 3, replace=TRUE), collapse=''), collapse=''), paste0(sample(y_digits,1), paste0(sample(hex_digits, 3, replace=TRUE), collapse=''), collapse=''), paste0(sample(hex_digits, 12, replace=TRUE), collapse=''), sep='-') } ## https://stackoverflow.com/questions/40059573/r-get-current-time-in-milliseconds ## R function to get current time in nanoseconds getCurrentTimeInNanos <- function() { return(as.numeric(Sys.time())*10^9) } # ADLS Global variables ---- { # create a syncFlagEnum object used by the Azure Data Lake Store functions. syncFlagEnum <- list("DATA", "METADATA", "CLOSE", "PIPELINE") names(syncFlagEnum) <- syncFlagEnum # create a retryPolicyEnum object used by the Azure Data Lake Store functions. 
# ADLS Global variables ----
{
  # create a syncFlagEnum object used by the Azure Data Lake Store functions.
  syncFlagEnum <- list("DATA", "METADATA", "CLOSE", "PIPELINE")
  names(syncFlagEnum) <- syncFlagEnum

  # create a retryPolicyEnum object used by the Azure Data Lake Store functions.
  retryPolicyEnum <- list("EXPONENTIALBACKOFF", "NONIDEMPOTENT")
  names(retryPolicyEnum) <- retryPolicyEnum
}

# ADLS Helper Functions ----

getAzureDataLakeSDKVersion <- function() {
  return("1.3.0")
}

getAzureDataLakeSDKUserAgent <- function() {
  sysInf <- as.list(strsplit(Sys.info(), "\t"))
  adlsUA <- paste0("ADLSRSDK", "-", getAzureDataLakeSDKVersion(),
                   "/", sysInf$sysname, "-", sysInf$release,
                   "-", sysInf$version, "-", sysInf$machine,
                   "/", R.version$version.string)
  return(adlsUA)
}

getAzureDataLakeBasePath <- function(azureDataLakeAccount) {
  basePath <- paste0("https://", azureDataLakeAccount,
                     ".azuredatalakestore.net/webhdfs/v1/")
  return(basePath)
}

getAzureDataLakeApiVersion <- function() {
  return("&api-version=2018-02-01")
}

getAzureDataLakeApiVersionForConcat <- function() {
  return("&api-version=2018-05-01")
}

getAzureDataLakeDefaultBufferSize <- function() {
  return(as.integer(4 * 1024 * 1024))
}

getAzureDataLakeURLEncodedString <- function(strToEncode) {
  strEncoded <- URLencode(strToEncode, reserved = TRUE, repeated = TRUE)
  return(strEncoded)
}

# log printer for Azure Data Lake Store
printADLSMessage <- function(fileName, functionName, message, error = NULL) {
  msg <- paste0(Sys.time(),
                " [", fileName, "]",
                " ", functionName,
                ": message=", message,
                ", error=", error)
  print(msg)
}
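# Hedged sketch: how the helpers above compose into a WebHDFS-style request
# URL. "myadls" and the path are made-up values; note that the api-version
# helper already carries its leading "&".
demoAdlsUrl <- function() {
  paste0(getAzureDataLakeBasePath("myadls"),
         getAzureDataLakeURLEncodedString("tmp/dir with space/file.csv"),
         "?op=GETFILESTATUS",
         getAzureDataLakeApiVersion())
  # "https://myadls.azuredatalakestore.net/webhdfs/v1/tmp/dir%20with%20space/file.csv?op=GETFILESTATUS&api-version=2018-02-01"
}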
# ADLS Ingress - AdlFileOutputStream ----

#' Create an adlFileOutputStream.
#' Create a container (`adlFileOutputStream`) for holding variables used by the Azure Data Lake Store data functions.
#'
#' @inheritParams setAzureContext
#' @param accountName the account name
#' @param relativePath Relative path of a file/directory
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlFileOutputStream` object
#'
#' @family Azure Data Lake Store functions
adls.fileoutputstream.create <- function(azureActiveContext, accountName,
                                         relativePath, verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlFileOutputStream(azEnv)
  list2env(
    list(azureActiveContext = "", accountName = "", relativePath = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  if (!missing(accountName)) azEnv$accountName <- accountName
  if (!missing(relativePath)) azEnv$relativePath <- relativePath
  azEnv$leaseId <- uuid()
  azEnv$blockSize <- getAzureDataLakeDefaultBufferSize()
  azEnv$buffer <- raw(0)
  # cursors/indices/offsets in R should start from 1 and NOT 0.
  # Because of this there are many adjustments that need to be done
  # throughout the code!
  azEnv$cursor <- 1L
  res <- adls.file.info(azureActiveContext, accountName, relativePath, verbose)
  azEnv$remoteCursor <- as.integer(res$FileStatus.length) # this remote cursor starts from 0
  azEnv$streamClosed <- FALSE
  azEnv$lastFlushUpdatedMetadata <- FALSE
  # additional param required to implement bad offset handling
  azEnv$numRetries <- 0
  return(azEnv)
}

adls.fileoutputstream.addtobuffer <- function(adlFileOutputStream, contents,
                                              off, len) {
  bufferlen <- getContentSize(adlFileOutputStream$buffer)
  cursor <- adlFileOutputStream$cursor
  if (len > bufferlen - (cursor - 1)) {
    # if requesting to copy more than remaining space in buffer
    stop("IllegalArgumentException: invalid buffer copy requested in adls.fileoutputstream.addtobuffer")
  }
  # optimized arraycopy
  adlFileOutputStream$buffer[cursor:(cursor + len - 1)] <- contents[off:(off + len - 1)]
  adlFileOutputStream$cursor <- as.integer(cursor + len)
}

adls.fileoutputstream.dozerolengthappend <- function(adlFileOutputStream,
                                                     azureDataLakeAccount,
                                                     relativePath, offset,
                                                     verbose = FALSE) {
  resHttp <- adls.append.core(adlFileOutputStream$azureActiveContext,
                              adlFileOutputStream,
                              azureDataLakeAccount, relativePath,
                              4194304L, contents = raw(0), contentSize = 0L,
                              leaseId = adlFileOutputStream$leaseId,
                              sessionId = adlFileOutputStream$leaseId,
                              syncFlag = syncFlagEnum$METADATA,
                              offsetToAppendTo = 0,
                              verbose = verbose)
  stopWithAzureError(resHttp)
  # return TRUE (treated as a void return)
  return(TRUE)
}
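# Hedged sketch of the 1-based "arraycopy" used in
# adls.fileoutputstream.addtobuffer: copy len bytes from a source raw vector
# starting at offset off into a buffer at the current cursor. Pure R, no
# stream object needed.
demoBufferCopy <- function() {
  buffer <- raw(8)
  contents <- as.raw(1:6)
  cursor <- 1L; off <- 2L; len <- 3L
  buffer[cursor:(cursor + len - 1)] <- contents[off:(off + len - 1)]
  buffer # 02 03 04 00 00 00 00 00
}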
#' The Core Append API.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param adlFileOutputStream The adlFileOutputStream object to operate with.
#' @param relativePath Relative path of a file.
#' @param bufferSize Size of the buffer to be used.
#' @param contents raw contents to be written to the file.
#' @param contentSize size of `contents` to be written to the file.
#' @param leaseId a String containing the lease ID (generated by client). Can be null.
#' @param sessionId a String containing the session ID (generated by client). Can be null.
#' @param syncFlag
#'     Use `DATA` when writing more bytes to the same file path. Most performant operation.
#'     Use `METADATA` when metadata for the file also needs to be updated, especially the
#'     file length retrieved from an `adls.file.info` or `adls.ls` API call.
#'     Has the overhead of a metadata update operation.
#'     Use `CLOSE` when no more data is expected to be written to this path. The ADL backend
#'     will update the metadata, close the stream handle and release the lease on the
#'     path if a valid leaseId is passed.
#'     Expensive operation and should be used only when the last bytes are written.
#' @param offsetToAppendTo offset at which to append to the file.
#'     To let the server choose an offset, pass `-1`.
#' @param verbose Print tracing information (default FALSE).
#' @return response object
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#upload-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Append_to_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#append-org.apache.hadoop.fs.Path-int-org.apache.hadoop.util.Progressable-}
adls.append.core <- function(azureActiveContext, adlFileOutputStream = NULL,
                             azureDataLakeAccount, relativePath, bufferSize,
                             contents, contentSize = -1L,
                             leaseId = NULL, sessionId = NULL, syncFlag = NULL,
                             offsetToAppendTo = -1,
                             verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  assert_that(is_bufferSize(bufferSize))
  assert_that(is_content(contents))
  assert_that(is_contentSize(contentSize))
  if (contentSize == -1) {
    contentSize <- getContentSize(contents)
  } # allow a zero byte append
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=APPEND",
    "&append=true",
    getAzureDataLakeApiVersion()
  )
  if (!missing(bufferSize) && !is.null(bufferSize)) URL <- paste0(URL, "&buffersize=", bufferSize)
  if (!is.null(leaseId)) URL <- paste0(URL, "&leaseid=", leaseId)
  if (!is.null(sessionId)) URL <- paste0(URL, "&filesessionid=", sessionId)
  if (!is.null(syncFlag)) URL <- paste0(URL, "&syncFlag=", syncFlag)
  if (offsetToAppendTo >= 0) URL <- paste0(URL, "&offset=", offsetToAppendTo)
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  # seq_len() keeps a zero-byte append truly empty; contents[1:0] would
  # index past the end of an empty raw vector and send a spurious 00 byte
  resHttp <- callAzureDataLakeApi(URL, verb = "POST",
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  content = contents[seq_len(contentSize)],
                                  verbose = verbose)
  # update retry count - required for bad offset handling
  if (!is.null(adlFileOutputStream)) {
    adlFileOutputStream$numRetries <- retryPolicy$retryCount
  }
  return(resHttp)
}
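# Hedged usage sketch for adls.append.core (requires a live ADLS account and a
# valid token in ctx; the account and path are made-up). syncFlag = METADATA
# asks the backend to refresh the file length so that a subsequent
# adls.file.info() sees the appended bytes.
demoAppendCore <- function(ctx) {
  res <- adls.append.core(ctx,
                          azureDataLakeAccount = "myadls",
                          relativePath = "tmp/demo.txt",
                          bufferSize = 4194304L,
                          contents = charToRaw("hello\n"),
                          syncFlag = syncFlagEnum$METADATA)
  stopWithAzureError(res)
  res
}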
# ADLS Egress - AdlFileInputStream ----

#' Create an adlFileInputStream.
#' Create a container (`adlFileInputStream`) for holding variables used by the Azure Data Lake Store data functions.
#'
#' @inheritParams setAzureContext
#' @param accountName the account name
#' @param relativePath Relative path of a file/directory
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlFileInputStream` object
#'
#' @family Azure Data Lake Store functions
adls.fileinputstream.create <- function(azureActiveContext, accountName,
                                        relativePath, verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlFileInputStream(azEnv)
  list2env(
    list(azureActiveContext = "", accountName = "", relativePath = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  if (!missing(accountName)) azEnv$accountName <- accountName
  if (!missing(relativePath)) azEnv$relativePath <- relativePath
  azEnv$directoryEntry <- adls.file.info(azureActiveContext, accountName,
                                         relativePath, verbose)
  if (azEnv$directoryEntry$FileStatus.type == "DIRECTORY") {
    msg <- paste0("ADLException: relativePath is not a file: ", relativePath)
    stop(msg)
  }
  azEnv$sessionId <- uuid()
  azEnv$blockSize <- getAzureDataLakeDefaultBufferSize()
  azEnv$buffer <- raw(0)
  # cursors/indices/offsets in R should start from 1 and NOT 0.
  # Because of this there are many adjustments that need to be done
  # throughout the code!
  azEnv$fCursor <- 0L # cursor of buffer within file - offset of next byte to read from remote server
  azEnv$bCursor <- 1L # cursor of read within buffer - offset of next byte to be returned from buffer
  azEnv$limit <- 1L   # offset of next byte to be read into buffer from service (i.e., upper marker+1 of valid bytes in buffer)
  azEnv$streamClosed <- FALSE
  return(azEnv)
}
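# Hedged usage sketch (live account required; names are made-up): create an
# input stream and inspect its initial cursors. fCursor is a 0-based offset
# into the remote file, while bCursor and limit are 1-based offsets into the
# local buffer.
demoInputStream <- function(ctx) {
  stream <- adls.fileinputstream.create(ctx, "myadls", "tmp/demo.txt")
  c(fCursor = stream$fCursor, bCursor = stream$bCursor, limit = stream$limit)
}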
#' Core function to open and read a file.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file/directory.
#' @param offset Provide the offset to read from.
#' @param length Provide length of data to read.
#' @param bufferSize Size of the buffer to be used. (not honoured).
#' @param verbose Print tracing information (default FALSE).
#' @return response object; the raw contents of the file are in the response body.
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#read-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Open_and_Read_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Offset}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Length}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#open-org.apache.hadoop.fs.Path-int-}
adls.read.core <- function(azureActiveContext, azureDataLakeAccount,
                           relativePath, offset, length,
                           bufferSize = 4194304L, verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  if (!missing(offset) && !is.null(offset)) assert_that(is_offset(offset))
  if (!missing(length) && !is.null(length)) assert_that(is_length(length))
  if (!missing(bufferSize) && !is.null(bufferSize)) assert_that(is_bufferSize(bufferSize))
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=OPEN",
    "&read=true",
    getAzureDataLakeApiVersion()
  )
  if (!missing(offset) && !is.null(offset)) URL <- paste0(URL, "&offset=", offset)
  if (!missing(length) && !is.null(length)) URL <- paste0(URL, "&length=", length)
  if (!missing(bufferSize) && !is.null(bufferSize)) URL <- paste0(URL, "&buffersize=", bufferSize)
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL,
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  verbose = verbose)
  return(resHttp)
}
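# Hedged usage sketch (live account required; names are made-up): read the
# first 16 bytes of a file with the core OPEN call and extract the raw
# payload from the response.
demoReadCore <- function(ctx) {
  res <- adls.read.core(ctx, "myadls", "tmp/demo.txt",
                        offset = 0, length = 16L)
  stopWithAzureError(res)
  httr::content(res, "raw")
}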
#' Read from service attempts to read `blocksize` bytes from the service.
#' Returns how many bytes were actually read, which can be less than blocksize.
#'
#' @param adlFileInputStream the `adlFileInputStream` object to read from
#' @param verbose Print tracing information (default FALSE)
#' @return number of bytes actually read
#'
#' @family Azure Data Lake Store functions
adls.fileinputstream.readfromservice <- function(adlFileInputStream, verbose = FALSE) {
  # if there's still unread data in the buffer then don't overwrite it
  if (adlFileInputStream$bCursor < adlFileInputStream$limit) return(0)
  # at or past end of file
  if (adlFileInputStream$fCursor >= adlFileInputStream$directoryEntry$FileStatus.length) return(-1)
  if (adlFileInputStream$directoryEntry$FileStatus.length <= adlFileInputStream$blockSize)
    return(adls.fileinputstream.slurpfullfile(adlFileInputStream))

  # reset buffer to initial state - i.e., throw away existing data
  adlFileInputStream$bCursor <- 1L
  adlFileInputStream$limit <- 1L
  if (is.null(adlFileInputStream$buffer))
    adlFileInputStream$buffer <- raw(getAzureDataLakeDefaultBufferSize())

  resHttp <- adls.read.core(adlFileInputStream$azureActiveContext,
                            adlFileInputStream$accountName,
                            adlFileInputStream$relativePath,
                            adlFileInputStream$fCursor,
                            adlFileInputStream$blockSize,
                            verbose = verbose)
  stopWithAzureError(resHttp)
  data <- content(resHttp, "raw", encoding = "UTF-8")
  bytesRead <- getContentSize(data)
  adlFileInputStream$buffer[1:bytesRead] <- data[1:bytesRead]
  adlFileInputStream$limit <- adlFileInputStream$limit + bytesRead
  adlFileInputStream$fCursor <- adlFileInputStream$fCursor + bytesRead
  return(bytesRead)
}

#' Reads the whole file into the buffer. Useful when reading small files.
#'
#' @param adlFileInputStream the adlFileInputStream object to read from
#' @param verbose Print tracing information (default FALSE)
#' @return number of bytes actually read
adls.fileinputstream.slurpfullfile <- function(adlFileInputStream, verbose = FALSE) {
  if (is.null(adlFileInputStream$buffer)) {
    # note: the field is blockSize (capital S), as set in
    # adls.fileinputstream.create and tested in readfromservice above
    adlFileInputStream$blockSize <- adlFileInputStream$directoryEntry$FileStatus.length
    adlFileInputStream$buffer <- raw(adlFileInputStream$directoryEntry$FileStatus.length)
  }
  # reset buffer to initial state - i.e., throw away existing data;
  # preserve current file offset (may not be 0 if app did a seek before first read)
  adlFileInputStream$bCursor <- adls.fileinputstream.getpos(adlFileInputStream) + 1L
  adlFileInputStream$limit <- 1L
  adlFileInputStream$fCursor <- 0L # read from beginning
  resHttp <- adls.read.core(adlFileInputStream$azureActiveContext,
                            adlFileInputStream$accountName,
                            adlFileInputStream$relativePath,
                            adlFileInputStream$fCursor,
                            adlFileInputStream$directoryEntry$FileStatus.length,
                            verbose = verbose)
  stopWithAzureError(resHttp)
  data <- content(resHttp, "raw", encoding = "UTF-8")
  bytesRead <- getContentSize(data)
  adlFileInputStream$buffer[1:bytesRead] <- data[1:bytesRead]
  adlFileInputStream$limit <- adlFileInputStream$limit + bytesRead
  adlFileInputStream$fCursor <- adlFileInputStream$fCursor + bytesRead
  return(bytesRead)
}
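# Hedged sketch (illustrative only, not part of the package): how a
# caller-level read loop might drain the buffer refilled by
# adls.fileinputstream.readfromservice. `stream` is an adlFileInputStream;
# the helper names are real, the loop itself is an assumption.
demoDrainStream <- function(stream) {
  out <- raw(0)
  repeat {
    n <- adls.fileinputstream.readfromservice(stream)
    if (n == -1) break                     # end of file
    avail <- stream$limit - stream$bCursor # unread bytes left in the buffer
    if (avail > 0) {
      out <- c(out, stream$buffer[stream$bCursor:(stream$bCursor + avail - 1)])
      stream$bCursor <- stream$bCursor + avail
    }
  }
  out
}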
# ADLS Retry Policies ----

#' NOTE: Following points on the ADLS AdlRetryPolicy:
#' 1. Speculative reads are not implemented, hence no `NoRetryPolicy`.
#' 2. ExponentialBackoffPolicyforMSI is not implemented as it is not used even in the JDK.
#'
#' Create an adlRetryPolicy.
#' Create an adlRetryPolicy (`adlRetryPolicy`) for holding variables used by the Azure Data Lake Store data functions.
#'
#' @inheritParams setAzureContext
#' @param retryPolicyType the type of retryPolicy object to create.
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlRetryPolicy` object
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://github.com/Azure/azure-data-lake-store-java/blob/master/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/RetryPolicy.java}
createAdlRetryPolicy <- function(azureActiveContext,
                                 retryPolicyType = retryPolicyEnum$EXPONENTIALBACKOFF,
                                 verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlRetryPolicy(azEnv)
  list2env(
    list(azureActiveContext = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  # init the azEnv (adlRetryPolicy) with the right params
  azEnv$retryPolicyType <- retryPolicyType
  if (retryPolicyType == retryPolicyEnum$EXPONENTIALBACKOFF) {
    return(createAdlExponentialBackoffRetryPolicy(azEnv, verbose))
  } else if (retryPolicyType == retryPolicyEnum$NONIDEMPOTENT) {
    return(createAdlNonIdempotentRetryPolicy(azEnv, verbose))
  } else {
    printADLSMessage("internal.R", "createAdlRetryPolicy",
                     paste0("UndefinedRetryPolicyTypeError: ", azEnv$retryPolicyType),
                     NULL)
    return(NULL)
  }
}

#' Create an adlExponentialBackoffRetryPolicy.
#'
#' @param adlRetryPolicy the retry policy object to initialize.
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlRetryPolicy` object
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://github.com/Azure/azure-data-lake-store-java/blob/master/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicy.java}
createAdlExponentialBackoffRetryPolicy <- function(adlRetryPolicy, verbose = FALSE) {
  adlRetryPolicy$retryCount <- 0
  adlRetryPolicy$maxRetries <- 4
  adlRetryPolicy$exponentialRetryInterval <- 1000 # in milliseconds
  adlRetryPolicy$exponentialFactor <- 4
  adlRetryPolicy$lastAttemptStartTime <- getCurrentTimeInNanos() # in nanoseconds
  return(adlRetryPolicy)
}

#' Create an adlNonIdempotentRetryPolicy.
#'
#' @param adlRetryPolicy the retry policy object to initialize.
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlRetryPolicy` object
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://github.com/Azure/azure-data-lake-store-java/blob/master/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NonIdempotentRetryPolicy.java}
createAdlNonIdempotentRetryPolicy <- function(adlRetryPolicy, verbose = FALSE) {
  adlRetryPolicy$retryCount401 <- 0
  adlRetryPolicy$waitInterval <- 100
  adlRetryPolicy$retryCount429 <- 0
  adlRetryPolicy$maxRetries <- 4
  adlRetryPolicy$exponentialRetryInterval <- 1000 # in milliseconds
  adlRetryPolicy$exponentialFactor <- 4
  return(adlRetryPolicy)
}
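# Hedged sketch: the wait schedule implied by the exponential-backoff defaults
# above (1000 ms initial interval, factor 4, at most 4 retries), ignoring the
# time already spent on the failed attempt itself.
demoBackoffSchedule <- function() {
  interval <- 1000; factor <- 4; maxRetries <- 4
  waits <- numeric(maxRetries)
  for (i in seq_len(maxRetries)) {
    waits[i] <- interval
    interval <- interval * factor
  }
  waits # 1000 4000 16000 64000 milliseconds
}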
#' Check if a retry should be done based on the `adlRetryPolicy`.
#'
#' @param adlRetryPolicy the policy object to check for retry
#' @param httpResponseCode the HTTP response code of the last request
#' @param lastException exception that was reported with the failure
#' @param verbose Print tracing information (default FALSE)
#' @return TRUE for retry and FALSE otherwise
#'
#' @family Azure Data Lake Store functions
shouldRetry <- function(adlRetryPolicy, httpResponseCode, lastException, verbose = FALSE) {
  if (adlRetryPolicy$retryPolicyType == retryPolicyEnum$EXPONENTIALBACKOFF) {
    return(shouldRetry.adlExponentialBackoffRetryPolicy(adlRetryPolicy,
                                                        httpResponseCode,
                                                        lastException, verbose))
  } else if (adlRetryPolicy$retryPolicyType == retryPolicyEnum$NONIDEMPOTENT) {
    return(shouldRetry.adlNonIdempotentRetryPolicy(adlRetryPolicy,
                                                   httpResponseCode,
                                                   lastException, verbose))
  } else {
    printADLSMessage("internal.R", "shouldRetry",
                     paste0("UndefinedRetryPolicyTypeError: ", adlRetryPolicy$retryPolicyType),
                     NULL)
    return(NULL)
  }
}

#' Check if a retry should be done based on the `adlRetryPolicy` (adlExponentialBackoffRetryPolicy).
#'
#' @param adlRetryPolicy the policy object to check for retry
#' @param httpResponseCode the HTTP response code of the last request
#' @param lastException exception that was reported with the failure
#' @param verbose Print tracing information (default FALSE)
#' @return TRUE for retry and FALSE otherwise
#'
#' @family Azure Data Lake Store functions
shouldRetry.adlExponentialBackoffRetryPolicy <- function(adlRetryPolicy,
                                                         httpResponseCode,
                                                         lastException,
                                                         verbose = FALSE) {
  if (missing(adlRetryPolicy) || missing(httpResponseCode)) {
    return(FALSE)
  }
  # Non-retryable error
  if ((httpResponseCode >= 300 && httpResponseCode < 500 # 3xx and 4xx, except specific ones below
       && httpResponseCode != 408
       && httpResponseCode != 429
       && httpResponseCode != 401)
      || (httpResponseCode == 501) # Not Implemented
      || (httpResponseCode == 505) # Version Not Supported
  ) {
    return(FALSE)
  }
  # Retryable error, retry with exponential backoff
  if (!is.null(lastException)
      || httpResponseCode >= 500 # exception or 5xx, + specific ones below
      || httpResponseCode == 408
      || httpResponseCode == 429
      || httpResponseCode == 401) {
    if (adlRetryPolicy$retryCount < adlRetryPolicy$maxRetries) {
      timeSpentInMillis <- as.integer((getCurrentTimeInNanos() - adlRetryPolicy$lastAttemptStartTime) / 1000000)
      wait(adlRetryPolicy$exponentialRetryInterval - timeSpentInMillis)
      adlRetryPolicy$exponentialRetryInterval <- (adlRetryPolicy$exponentialRetryInterval * adlRetryPolicy$exponentialFactor)
      adlRetryPolicy$retryCount <- adlRetryPolicy$retryCount + 1
      adlRetryPolicy$lastAttemptStartTime <- getCurrentTimeInNanos()
      return(TRUE)
    } else {
      return(FALSE) # max # of retries exhausted
    }
  }
  # these are not errors - this method should never have been called with this
  if (httpResponseCode >= 100 && httpResponseCode < 300) {
    return(FALSE)
  }
  # Don't know what happened - we should never get here
  return(FALSE)
}
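# Hedged sketch: the status classification applied by
# shouldRetry.adlExponentialBackoffRetryPolicy, without the waiting and
# counting - 5xx (except 501/505) plus 401, 408 and 429 are retryable.
demoIsRetryableStatus <- function(code) {
  (code >= 500 && code != 501 && code != 505) || code %in% c(401, 408, 429)
}
# demoIsRetryableStatus(503) # TRUE
# demoIsRetryableStatus(404) # FALSE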
#' Check if a retry should be done based on the `adlRetryPolicy` (adlNonIdempotentRetryPolicy).
#'
#' @param adlRetryPolicy the policy object to check for retry
#' @param httpResponseCode the HTTP response code of the last request
#' @param lastException exception that was reported with the failure
#' @param verbose Print tracing information (default FALSE)
#' @return TRUE for retry and FALSE otherwise
#'
#' @family Azure Data Lake Store functions
shouldRetry.adlNonIdempotentRetryPolicy <- function(adlRetryPolicy,
                                                    httpResponseCode,
                                                    lastException,
                                                    verbose = FALSE) {
  if (httpResponseCode == 401 && adlRetryPolicy$retryCount401 == 0) {
    # this could be because of call delay. Just retry once, in the hope of
    # the token being renewed by now.
    wait(adlRetryPolicy$waitInterval)
    adlRetryPolicy$retryCount401 <- (adlRetryPolicy$retryCount401 + 1)
    return(TRUE)
  }
  if (httpResponseCode == 429) {
    # 429 means that the backend did not change any state.
    if (adlRetryPolicy$retryCount429 < adlRetryPolicy$maxRetries) {
      wait(adlRetryPolicy$exponentialRetryInterval)
      adlRetryPolicy$exponentialRetryInterval <- (adlRetryPolicy$exponentialRetryInterval * adlRetryPolicy$exponentialFactor)
      adlRetryPolicy$retryCount429 <- (adlRetryPolicy$retryCount429 + 1)
      return(TRUE)
    } else {
      return(FALSE) # max # of retries exhausted
    }
  }
  return(FALSE)
}

wait <- function(waitTimeInMilliSeconds, verbose = FALSE) {
  if (waitTimeInMilliSeconds <= 0) {
    return(NULL)
  }
  tryCatch({
    if (verbose) {
      printADLSMessage("internal.R", "wait",
                       paste0("going into wait for waitTimeInMilliSeconds=",
                              waitTimeInMilliSeconds),
                       NULL)
    }
    Sys.sleep(waitTimeInMilliSeconds / 1000)
  },
  interrupt = function(e) {
    if (verbose) {
      printADLSMessage("internal.R", "wait", "interrupted while wait during retry", e)
    }
  },
  error = function(e) {
    if (verbose) {
      printADLSMessage("internal.R", "wait", "error while wait during retry", e)
    }
  })
  return(NULL)
}

isSuccessfulResponse <- function(resHttp, op) {
  #if (http_error(resHttp)) return(FALSE)
  #if (http_status(resHttp)$category != "Success") return(FALSE)
  if (status_code(resHttp) >= 100 && status_code(resHttp) < 300) return(TRUE) # 1xx and 2xx return codes
  return(FALSE) # anything else
}

# ADLS Rest Calls ----

callAzureDataLakeApi <- function(url, verb = "GET", azureActiveContext,
                                 adlRetryPolicy = NULL, content = raw(0),
                                 contenttype = NULL, # "application/octet-stream"
                                 verbose = FALSE) {
  resHttp <- NULL
  repeat {
    resHttp <- callAzureDataLakeRestEndPoint(url, verb, azureActiveContext,
                                             content, contenttype, verbose)
    if (!isSuccessfulResponse(resHttp)
        && shouldRetry(adlRetryPolicy, status_code(resHttp), NULL)) {
      if (verbose) {
        msg <- paste0("retry request: ",
                      " status=", http_status(resHttp)$message,
                      ", url=", url, ", verb=", verb,
                      ", adlsRetryPolicy=", as.character.adlRetryPolicy(adlRetryPolicy))
        printADLSMessage("internal.R", "callAzureDataLakeApi", msg, NULL)
      }
      next # continue trying till succeeded or retries exceeded
    } else {
      break # break on success or all planned retries failed
    }
  }
  return(resHttp)
}

callAzureDataLakeRestEndPoint <- function(url, verb = "GET", azureActiveContext,
                                          content = raw(0),
                                          contenttype = NULL, # "application/octet-stream"
                                          verbose = FALSE) {
  verbosity <- set_verbosity(verbose)
  commonHeaders <- c(Authorization = azureActiveContext$Token,
                     `User-Agent` = getAzureDataLakeSDKUserAgent(),
                     `x-ms-client-request-id` = uuid())
  resHttp <- switch(verb,
    "GET" = GET(url,
                add_headers(.headers = c(commonHeaders,
                                         `Content-Length` = "0")),
                verbosity),
    "PUT" = PUT(url,
                add_headers(.headers = c(commonHeaders,
                                         #`Transfer-Encoding` = "chunked",
                                         `Content-Length` = getContentSize(content),
                                         `Content-Type` = contenttype)),
                body = content, verbosity),
    "POST" = POST(url,
                  add_headers(.headers = c(commonHeaders,
                                           #`Transfer-Encoding` = "chunked",
                                           `Content-Length` = getContentSize(content),
                                           `Content-Type` = contenttype)),
                  body = content, verbosity),
    "DELETE" = DELETE(url,
                      add_headers(.headers = c(commonHeaders,
                                               `Content-Length` = "0")),
                      verbosity)
  )
  # Print the response body in case verbose is enabled.
  if (verbose) {
    resJsonStr <- content(resHttp, "text", encoding = "UTF-8")
    printADLSMessage("internal.R", "callAzureDataLakeRestEndPoint", resJsonStr, NULL)
  }
  return(resHttp)
}
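# Hedged end-to-end sketch (live account and ctx$Token required; names are
# made-up): list a directory through the retrying REST wrapper defined above.
demoListStatus <- function(ctx) {
  url <- paste0(getAzureDataLakeBasePath("myadls"),
                getAzureDataLakeURLEncodedString("tmp"),
                "?op=LISTSTATUS",
                getAzureDataLakeApiVersion())
  res <- callAzureDataLakeApi(url, verb = "GET",
                              azureActiveContext = ctx,
                              adlRetryPolicy = createAdlRetryPolicy(ctx))
  stopWithAzureError(res)
  httr::content(res)
}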