azureApiHeaders <- function(token) {
  headers <- c(Host = "management.azure.com",
               Authorization = token,
               `Content-type` = "application/json")
  httr::add_headers(.headers = headers)
}
# convert verbose = TRUE into an httr verbose() config (NULL when disabled)
set_verbosity <- function(verbose = FALSE) {
  if (verbose) httr::verbose(TRUE) else NULL
}
extractUrlArguments <- function(x) {
  ptn <- ".*\\?(.*?)"
  args <- grepl("\\?", x)
  z <- if (args) gsub(ptn, "\\1", x) else ""
  if (z == "") {
    ""
  } else {
    z <- strsplit(z, "&")[[1]]
    z <- sort(z)
    z <- paste(z, collapse = "\n")
    z <- gsub("=", ":", z)
    paste0("\n", z)
  }
}
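
## Illustration (runnable; the URL below is a made-up example): query arguments
## are extracted, sorted and rewritten in the "name:value" form used by the
## canonicalized resource of the shared-key string-to-sign.
# extractUrlArguments("https://myacct.blob.core.windows.net/cont?restype=container&comp=list")
# [1] "\ncomp:list\nrestype:container"
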
callAzureStorageApi <- function(url, verb = "GET", storageKey, storageAccount,
                                headers = NULL, container = NULL, CMD,
                                size = getContentSize(content), contenttype = NULL,
                                content = NULL,
                                verbose = FALSE) {
  dateStamp <- httr::http_date(Sys.time())
  verbosity <- set_verbosity(verbose)

  if (missing(CMD) || is.null(CMD)) CMD <- extractUrlArguments(url)

  sig <- createAzureStorageSignature(url = url, verb = verb,
    key = storageKey, storageAccount = storageAccount, container = container,
    headers = headers, CMD = CMD, size = size,
    contenttype = contenttype, dateStamp = dateStamp, verbose = verbose)

  azToken <- paste0("SharedKey ", storageAccount, ":", sig)

  switch(verb,
    "GET" = GET(url,
                add_headers(.headers = c(Authorization = azToken,
                                         `Content-Length` = "0",
                                         `x-ms-version` = "2017-04-17",
                                         `x-ms-date` = dateStamp)),
                verbosity),
    "PUT" = PUT(url,
                add_headers(.headers = c(Authorization = azToken,
                                         `Content-Length` = size,
                                         `x-ms-version` = "2017-04-17",
                                         `x-ms-date` = dateStamp,
                                         `x-ms-blob-type` = "Blockblob",
                                         `Content-type` = contenttype)),
                body = content,
                verbosity)
  )
}
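
## Usage sketch (not run; account, key and container are placeholders that
## need a real storage account):
# res <- callAzureStorageApi(
#   url = "https://myacct.blob.core.windows.net/cont?restype=container&comp=list",
#   verb = "GET",
#   storageKey = "<base64-encoded access key>",
#   storageAccount = "myacct")
# stopWithAzureError(res)
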
# size of content for the Content-Length header: bytes for a raw vector,
# characters for a string (nchar() counts characters, not bytes)
getContentSize <- function(obj) {
  switch(class(obj),
    "raw" = length(obj),
    "character" = nchar(obj),
    nchar(obj))
}
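
## Examples (runnable):
# getContentSize(charToRaw("hello"))  # 5 - length of the raw vector
# getContentSize("hello")             # 5 - characters in the string
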
createAzureStorageSignature <- function(url, verb,
                                        key, storageAccount, container = NULL,
                                        headers = NULL, CMD = NULL, size = NULL,
                                        contenttype = NULL, dateStamp, verbose = FALSE) {
  if (missing(dateStamp)) {
    dateStamp <- httr::http_date(Sys.time())
  }

  arg1 <- if (length(headers)) {
    paste0(headers, "\nx-ms-date:", dateStamp, "\nx-ms-version:2017-04-17")
  } else {
    paste0("x-ms-date:", dateStamp, "\nx-ms-version:2017-04-17")
  }

  arg2 <- paste0("/", storageAccount, "/", container, CMD)

  SIG <- paste0(verb, "\n\n\n", size, "\n\n", contenttype, "\n\n\n\n\n\n\n",
                arg1, "\n", arg2)
  if (verbose) message(paste0("TRACE: STRINGTOSIGN: ", SIG))
  base64encode(hmac(key = base64decode(key),
                    object = iconv(SIG, "ASCII", to = "UTF-8"),
                    algo = "sha256",
                    raw = TRUE))
}
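
## Illustration of the signing step in isolation (not run). Assumes the digest
## and base64enc packages supply hmac(), base64encode() and base64decode(), as
## elsewhere in this file; the key and account below are made up.
# dummyKey <- base64enc::base64encode(as.raw(1:32))
# createAzureStorageSignature(url = "unused", verb = "GET",
#   key = dummyKey, storageAccount = "myacct", container = "cont",
#   CMD = "\ncomp:list\nrestype:container",
#   dateStamp = httr::http_date(Sys.time()))
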
x_ms_date <- function() httr::http_date(Sys.time())

azure_storage_header <- function(shared_key, date = x_ms_date(), content_length = 0) {
  if (!is.character(shared_key)) stop("Expecting a character for `shared_key`")
  headers <- c(
    Authorization = shared_key,
    `Content-Length` = as.character(content_length),
    `x-ms-version` = "2017-04-17",
    `x-ms-date` = date
  )
  add_headers(.headers = headers)
}
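
## Example (runnable; the signature is a dummy): returns an httr request
## config with Authorization, Content-Length, x-ms-version and x-ms-date set.
# azure_storage_header("SharedKey myacct:dummySignature")
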
getSig <- function(azureActiveContext, url, verb, key, storageAccount,
                   headers = NULL, container = NULL, CMD = NULL, size = NULL,
                   contenttype = NULL, date = x_ms_date(), verbose = FALSE) {
  arg1 <- if (length(headers)) {
    paste0(headers, "\nx-ms-date:", date, "\nx-ms-version:2017-04-17")
  } else {
    paste0("x-ms-date:", date, "\nx-ms-version:2017-04-17")
  }
  arg2 <- paste0("/", storageAccount, "/", container, CMD)
  SIG <- paste0(verb, "\n\n\n", size, "\n\n", contenttype, "\n\n\n\n\n\n\n",
                arg1, "\n", arg2)
  if (verbose) message(paste0("TRACE: STRINGTOSIGN: ", SIG))
  base64encode(hmac(key = base64decode(key),
                    object = iconv(SIG, "ASCII", to = "UTF-8"),
                    algo = "sha256",
                    raw = TRUE))
}
getAzureErrorMessage <- function(r) {
  msg <- paste0(as.character(sys.call(1))[1], "()")  # name of calling function

  addToMsg <- function(x) {
    if (!is.null(x)) x <- strwrap(x)
    if (is.null(x)) msg else c(msg, x)
  }

  if (inherits(content(r), "xml_document")) {
    rr <- XML::xmlToList(XML::xmlParse(content(r)))
    msg <- addToMsg(rr$Code)
    msg <- addToMsg(rr$Message)
    msg <- addToMsg(rr$AuthenticationErrorDetail)
  } else {
    rr <- content(r)
    msg <- addToMsg(rr$code)
    msg <- addToMsg(rr$message)
    msg <- addToMsg(rr$error$message)
    msg <- addToMsg(rr$Code)
    msg <- addToMsg(rr$Message)
    msg <- addToMsg(rr$Error$Message)
  }

  msg <- addToMsg(paste0("Return code: ", status_code(r)))
  msg <- paste(msg, collapse = "\n")
  return(msg)
}
stopWithAzureError <- function(r) {
  if (status_code(r) < 300) return()
  msg <- getAzureErrorMessage(r)
  stop(msg, call. = FALSE)
}
extractResourceGroupname <- function(x) gsub(".*?/resourceGroups/(.*?)(/.*)*$", "\\1", x)
extractSubscriptionID <- function(x) gsub(".*?/subscriptions/(.*?)(/.*)*$", "\\1", x)
extractStorageAccount <- function(x) gsub(".*?/storageAccounts/(.*?)(/.*)*$", "\\1", x)
refreshStorageKey <- function(azureActiveContext, storageAccount, resourceGroup) {
  if (storageAccount != azureActiveContext$storageAccount ||
      length(azureActiveContext$storageKey) == 0) {
    message("Fetching Storage Key..")
    azureSAGetKey(azureActiveContext, resourceGroup = resourceGroup,
                  storageAccount = storageAccount)
  } else {
    azureActiveContext$storageKey
  }
}
updateAzureActiveContext <- function(x, storageAccount, storageKey, resourceGroup,
                                     container, blob, directory) {
  # updates the active azure context in place
  if (!is.null(x)) {
    assert_that(is.azureActiveContext(x))
    if (!missing(storageAccount)) x$storageAccount <- storageAccount
    if (!missing(resourceGroup)) x$resourceGroup <- resourceGroup
    if (!missing(storageKey)) x$storageKey <- storageKey
    if (!missing(container)) x$container <- container
    if (!missing(blob)) x$blob <- blob
    if (!missing(directory)) x$directory <- directory
  }
  TRUE
}
## https://gist.github.com/cbare/5979354
## Version 4 UUIDs have the form xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx
## where x is any hexadecimal digit and y is one of 8, 9, A, or B
## e.g., f47ac10b-58cc-4372-a567-0e02b2c3d479
uuid <- function(uppercase = FALSE) {
  hex_digits <- c(as.character(0:9), letters[1:6])
  hex_digits <- if (uppercase) toupper(hex_digits) else hex_digits
  y_digits <- hex_digits[9:12]
  paste(
    paste0(sample(hex_digits, 8, replace = TRUE), collapse = ""),
    paste0(sample(hex_digits, 4, replace = TRUE), collapse = ""),
    paste0("4", paste0(sample(hex_digits, 3, replace = TRUE), collapse = "")),
    paste0(sample(y_digits, 1), paste0(sample(hex_digits, 3, replace = TRUE), collapse = "")),
    paste0(sample(hex_digits, 12, replace = TRUE), collapse = ""),
    sep = "-")
}
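
## Example (runnable): generate a v4 UUID and check its shape.
# id <- uuid()
# grepl("^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$", id)  # TRUE
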
## https://stackoverflow.com/questions/40059573/r-get-current-time-in-milliseconds
# current time in nanoseconds (actual resolution is limited by Sys.time(),
# typically microseconds at best)
getCurrentTimeInNanos <- function() {
  return(as.numeric(Sys.time()) * 10^9)
}
# ADLS Global variables ----
{
  # create a syncFlagEnum object used by the Azure Data Lake Store functions
  syncFlagEnum <- list("DATA", "METADATA", "CLOSE", "PIPELINE")
  names(syncFlagEnum) <- syncFlagEnum
  # create a retryPolicyEnum object used by the Azure Data Lake Store functions
  retryPolicyEnum <- list("EXPONENTIALBACKOFF", "NONIDEMPOTENT")
  names(retryPolicyEnum) <- retryPolicyEnum
}
# ADLS Helper Functions ----
getAzureDataLakeSDKVersion <- function() {
  return("1.3.0")
}

getAzureDataLakeSDKUserAgent <- function() {
  sysInf <- as.list(strsplit(Sys.info(), "\t"))
  adlsUA <- paste0("ADLSRSDK",
                   "-", getAzureDataLakeSDKVersion(),
                   "/", sysInf$sysname, "-", sysInf$release,
                   "-", sysInf$version,
                   "-", sysInf$machine,
                   "/", R.version$version.string)
  return(adlsUA)
}

getAzureDataLakeBasePath <- function(azureDataLakeAccount) {
  basePath <- paste0("https://", azureDataLakeAccount, ".azuredatalakestore.net/webhdfs/v1/")
  return(basePath)
}
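
## Example (runnable; "myadls" is a hypothetical account name):
# getAzureDataLakeBasePath("myadls")
# [1] "https://myadls.azuredatalakestore.net/webhdfs/v1/"
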
getAzureDataLakeApiVersion <- function() {
  return("&api-version=2018-02-01")
}

getAzureDataLakeApiVersionForConcat <- function() {
  return("&api-version=2018-05-01")
}

getAzureDataLakeDefaultBufferSize <- function() {
  return(as.integer(4 * 1024 * 1024))
}

getAzureDataLakeURLEncodedString <- function(strToEncode) {
  strEncoded <- URLencode(strToEncode, reserved = TRUE, repeated = TRUE)
  return(strEncoded)
}
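
## Example (runnable): reserved characters are percent-encoded so that paths
## with spaces or special characters survive the trip into the WebHDFS URL.
# getAzureDataLakeURLEncodedString("dir one/file#2.csv")
# [1] "dir%20one%2Ffile%232.csv"
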
# log printer for Azure Data Lake Store
printADLSMessage <- function(fileName, functionName, message, error = NULL) {
  msg <- paste0(Sys.time(),
                " [", fileName, "]",
                " ", functionName,
                ": message=", message,
                ", error=", error)
  print(msg)
}
# ADLS Ingress - AdlFileOutputStream ----
#' Create an adlFileOutputStream.
#'
#' Creates a container (`adlFileOutputStream`) holding the variables used by
#' the Azure Data Lake Store data functions.
#'
#' @inheritParams setAzureContext
#' @param accountName the account name
#' @param relativePath Relative path of a file/directory
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlFileOutputStream` object
#'
#' @family Azure Data Lake Store functions
adls.fileoutputstream.create <- function(azureActiveContext, accountName, relativePath, verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlFileOutputStream(azEnv)
  list2env(
    list(azureActiveContext = "", accountName = "", relativePath = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  if (!missing(accountName)) azEnv$accountName <- accountName
  if (!missing(relativePath)) azEnv$relativePath <- relativePath
  azEnv$leaseId <- uuid()
  azEnv$blockSize <- getAzureDataLakeDefaultBufferSize()
  azEnv$buffer <- raw(0)
  # Cursors/indices/offsets in R start from 1, NOT 0, so adjustments are
  # needed throughout the code.
  azEnv$cursor <- 1L
  res <- adls.file.info(azureActiveContext, accountName, relativePath, verbose)
  azEnv$remoteCursor <- as.integer(res$FileStatus.length)  # this remote cursor starts from 0
  azEnv$streamClosed <- FALSE
  azEnv$lastFlushUpdatedMetadata <- FALSE
  # additional state required to implement bad-offset handling
  azEnv$numRetries <- 0
  return(azEnv)
}
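
## Usage sketch (not run; needs a live ADLS account, a valid token in
## azureActiveContext and an existing file at the path):
# out <- adls.fileoutputstream.create(azureActiveContext, "myadls", "tmp/log.txt")
# out$leaseId  # client-generated lease id used for the append session
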
adls.fileoutputstream.addtobuffer <- function(adlFileOutputStream, contents, off, len) {
  bufferlen <- getContentSize(adlFileOutputStream$buffer)
  cursor <- adlFileOutputStream$cursor
  if (len > bufferlen - (cursor - 1)) {  # requesting to copy more than the remaining space in the buffer
    stop("IllegalArgumentException: invalid buffer copy requested in adls.fileoutputstream.addtobuffer")
  }
  # optimized arraycopy
  adlFileOutputStream$buffer[cursor:(cursor + len - 1)] <- contents[off:(off + len - 1)]
  adlFileOutputStream$cursor <- as.integer(cursor + len)
}
adls.fileoutputstream.dozerolengthappend <- function(adlFileOutputStream, azureDataLakeAccount, relativePath, offset, verbose = FALSE) {
  resHttp <- adls.append.core(adlFileOutputStream$azureActiveContext, adlFileOutputStream,
                              azureDataLakeAccount, relativePath,
                              4194304L, contents = raw(0), contentSize = 0L,
                              leaseId = adlFileOutputStream$leaseId, sessionId = adlFileOutputStream$leaseId,
                              syncFlag = syncFlagEnum$METADATA, offsetToAppendTo = 0, verbose = verbose)
  stopWithAzureError(resHttp)
  # nothing meaningful to return (void in the Java SDK); signal completion
  return(TRUE)
}
#' The Core Append API.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param adlFileOutputStream The adlFileOutputStream object to operate with.
#' @param relativePath Relative path of a file.
#' @param bufferSize Size of the buffer to be used.
#' @param contents raw contents to be written to the file.
#' @param contentSize size of `contents` to be written to the file.
#' @param leaseId a String containing the lease ID (generated by client). Can be null.
#' @param sessionId a String containing the session ID (generated by client). Can be null.
#' @param syncFlag
#'     Use `DATA` when writing more bytes to the same file path. The most
#'     performant operation.
#'     Use `METADATA` when file metadata also needs to be updated, especially
#'     the file length retrieved from the `adls.file.info` or `adls.ls` API
#'     calls. Carries the overhead of a metadata update.
#'     Use `CLOSE` when no more data is expected to be written to this path.
#'     The ADL backend then updates the metadata, closes the stream handle and
#'     releases the lease on the path if a valid leaseId is passed. An
#'     expensive operation that should be used only when the last bytes are
#'     written.
#' @param offsetToAppendTo offset at which to append to the file.
#'     To let the server choose the offset, pass `-1`.
#' @param verbose Print tracing information (default FALSE).
#' @return response object
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#upload-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Append_to_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#append-org.apache.hadoop.fs.Path-int-org.apache.hadoop.util.Progressable-}
adls.append.core <- function(azureActiveContext, adlFileOutputStream = NULL,
                             azureDataLakeAccount, relativePath, bufferSize,
                             contents, contentSize = -1L,
                             leaseId = NULL, sessionId = NULL, syncFlag = NULL,
                             offsetToAppendTo = -1,
                             verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  assert_that(is_bufferSize(bufferSize))
  assert_that(is_content(contents))
  assert_that(is_contentSize(contentSize))
  if (contentSize == -1) {
    contentSize <- getContentSize(contents)
  }
  # allow a zero byte append
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=APPEND", "&append=true",
    getAzureDataLakeApiVersion()
  )
  if (!missing(bufferSize) && !is.null(bufferSize)) URL <- paste0(URL, "&buffersize=", bufferSize)
  if (!is.null(leaseId)) URL <- paste0(URL, "&leaseid=", leaseId)
  if (!is.null(sessionId)) URL <- paste0(URL, "&filesessionid=", sessionId)
  if (!is.null(syncFlag)) URL <- paste0(URL, "&syncFlag=", syncFlag)
  if (offsetToAppendTo >= 0) URL <- paste0(URL, "&offset=", offsetToAppendTo)
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL, verb = "POST",
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  # seq_len() keeps a zero-byte append truly empty
                                  # (1:0 would wrongly index element 1)
                                  content = contents[seq_len(contentSize)],
                                  verbose = verbose)
  # update retry count - required for bad offset handling
  if (!is.null(adlFileOutputStream)) {
    adlFileOutputStream$numRetries <- retryPolicy$retryCount
  }
  return(resHttp)
}
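
## Usage sketch (not run; "myadls" and the path are placeholders that need a
## live ADLS account, a valid token and an existing file). A typical ingress
## sequence appends chunks with syncFlag DATA and finishes with CLOSE:
# res <- adls.append.core(azureActiveContext, NULL, "myadls", "tmp/log.txt",
#                         bufferSize = 4194304L, contents = charToRaw("hello\n"),
#                         syncFlag = syncFlagEnum$DATA)
# stopWithAzureError(res)
# res <- adls.append.core(azureActiveContext, NULL, "myadls", "tmp/log.txt",
#                         bufferSize = 4194304L, contents = charToRaw("bye\n"),
#                         syncFlag = syncFlagEnum$CLOSE)
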
# ADLS Egress - AdlFileInputStream ----
#' Create an adlFileInputStream.
#'
#' Creates a container (`adlFileInputStream`) holding the variables used by
#' the Azure Data Lake Store data functions.
#'
#' @inheritParams setAzureContext
#' @param accountName the account name
#' @param relativePath Relative path of a file/directory
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlFileInputStream` object
#'
#' @family Azure Data Lake Store functions
adls.fileinputstream.create <- function(azureActiveContext, accountName, relativePath, verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlFileInputStream(azEnv)
  list2env(
    list(azureActiveContext = "", accountName = "", relativePath = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  if (!missing(accountName)) azEnv$accountName <- accountName
  if (!missing(relativePath)) azEnv$relativePath <- relativePath
  azEnv$directoryEntry <- adls.file.info(azureActiveContext, accountName, relativePath, verbose)
  if (azEnv$directoryEntry$FileStatus.type == "DIRECTORY") {
    msg <- paste0("ADLException: relativePath is not a file: ", relativePath)
    stop(msg)
  }
  azEnv$sessionId <- uuid()
  azEnv$blockSize <- getAzureDataLakeDefaultBufferSize()
  azEnv$buffer <- raw(0)
  # Cursors/indices/offsets in R start from 1, NOT 0, so adjustments are
  # needed throughout the code.
  azEnv$fCursor <- 0L  # cursor of buffer within file - offset of next byte to read from remote server
  azEnv$bCursor <- 1L  # cursor of read within buffer - offset of next byte to be returned from buffer
  azEnv$limit <- 1L    # offset of next byte to be read into buffer from service (i.e., upper marker + 1 of valid bytes in buffer)
  azEnv$streamClosed <- FALSE
  return(azEnv)
}
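
## Usage sketch (not run; placeholders as above). The stream tracks three
## positions: fCursor (file offset of the next remote read), bCursor (next
## byte to hand back from the buffer) and limit (end of valid buffered data).
# stream <- adls.fileinputstream.create(azureActiveContext, "myadls", "tmp/log.txt")
# adls.fileinputstream.readfromservice(stream)  # fills the buffer, returns bytes read
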
#' Core function to open and read a file.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file/directory.
#' @param offset Provide the offset to read from.
#' @param length Provide length of data to read.
#' @param bufferSize Size of the buffer to be used (not honoured).
#' @param verbose Print tracing information (default FALSE).
#' @return raw contents of the file.
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#read-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Open_and_Read_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Offset}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Length}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#open-org.apache.hadoop.fs.Path-int-}
adls.read.core <- function(azureActiveContext,
                           azureDataLakeAccount, relativePath,
                           offset, length, bufferSize = 4194304L,
                           verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  if (!missing(offset) && !is.null(offset)) assert_that(is_offset(offset))
  if (!missing(length) && !is.null(length)) assert_that(is_length(length))
  if (!missing(bufferSize) && !is.null(bufferSize)) assert_that(is_bufferSize(bufferSize))
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=OPEN", "&read=true",
    getAzureDataLakeApiVersion()
  )
  if (!missing(offset) && !is.null(offset)) URL <- paste0(URL, "&offset=", offset)
  if (!missing(length) && !is.null(length)) URL <- paste0(URL, "&length=", length)
  if (!missing(bufferSize) && !is.null(bufferSize)) URL <- paste0(URL, "&buffersize=", bufferSize)
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL,
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  verbose = verbose)
  return(resHttp)
}
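
## Usage sketch (not run; placeholders need a live ADLS account and token):
## read the first 100 bytes of a file.
# res <- adls.read.core(azureActiveContext, "myadls", "tmp/log.txt",
#                       offset = 0, length = 100)
# stopWithAzureError(res)
# rawToChar(content(res, "raw"))
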
#' Read from service attempts to read `blocksize` bytes from the service.
#' Returns how many bytes were actually read, which could be less than `blocksize`.
#'
#' @param adlFileInputStream the `adlFileInputStream` object to read from
#' @param verbose Print tracing information (default FALSE)
#' @return number of bytes actually read
#'
#' @family Azure Data Lake Store functions
adls.fileinputstream.readfromservice <- function(adlFileInputStream, verbose = FALSE) {
  # if there is still unread data in the buffer, don't overwrite it
  if (adlFileInputStream$bCursor < adlFileInputStream$limit) return(0)
  # at or past end of file
  if (adlFileInputStream$fCursor >= adlFileInputStream$directoryEntry$FileStatus.length) return(-1)
  if (adlFileInputStream$directoryEntry$FileStatus.length <= adlFileInputStream$blockSize)
    return(adls.fileinputstream.slurpfullfile(adlFileInputStream))
  # reset buffer to initial state - i.e., throw away existing data
  adlFileInputStream$bCursor <- 1L
  adlFileInputStream$limit <- 1L
  if (is.null(adlFileInputStream$buffer)) adlFileInputStream$buffer <- raw(getAzureDataLakeDefaultBufferSize())
  resHttp <- adls.read.core(adlFileInputStream$azureActiveContext,
                            adlFileInputStream$accountName, adlFileInputStream$relativePath,
                            adlFileInputStream$fCursor, adlFileInputStream$blockSize,
                            verbose = verbose)
  stopWithAzureError(resHttp)
  data <- content(resHttp, "raw", encoding = "UTF-8")
  bytesRead <- getContentSize(data)
  adlFileInputStream$buffer[1:bytesRead] <- data[1:bytesRead]
  adlFileInputStream$limit <- adlFileInputStream$limit + bytesRead
  adlFileInputStream$fCursor <- adlFileInputStream$fCursor + bytesRead
  return(bytesRead)
}
#' Reads the whole file into the buffer. Useful when reading small files.
#'
#' @param adlFileInputStream the adlFileInputStream object to read from
#' @param verbose Print tracing information (default FALSE)
#' @return number of bytes actually read
adls.fileinputstream.slurpfullfile <- function(adlFileInputStream, verbose = FALSE) {
  if (is.null(adlFileInputStream$buffer)) {
    adlFileInputStream$blocksize <- adlFileInputStream$directoryEntry$FileStatus.length
    adlFileInputStream$buffer <- raw(adlFileInputStream$directoryEntry$FileStatus.length)
  }
  # reset buffer to initial state - i.e., throw away existing data;
  # preserve the current file offset (may not be 0 if the app did a seek before the first read)
  adlFileInputStream$bCursor <- adls.fileinputstream.getpos(adlFileInputStream) + 1L
  adlFileInputStream$limit <- 1L
  adlFileInputStream$fCursor <- 0L  # read from the beginning
  resHttp <- adls.read.core(adlFileInputStream$azureActiveContext,
                            adlFileInputStream$accountName, adlFileInputStream$relativePath,
                            adlFileInputStream$fCursor, adlFileInputStream$directoryEntry$FileStatus.length,
                            verbose = verbose)
  stopWithAzureError(resHttp)
  data <- content(resHttp, "raw", encoding = "UTF-8")
  bytesRead <- getContentSize(data)
  adlFileInputStream$buffer[1:bytesRead] <- data[1:bytesRead]
  adlFileInputStream$limit <- adlFileInputStream$limit + bytesRead
  adlFileInputStream$fCursor <- adlFileInputStream$fCursor + bytesRead
  return(bytesRead)
}
# ADLS Retry Policies ----
#' Create an adlRetryPolicy.
#'
#' Creates an `adlRetryPolicy` object holding the variables used by the Azure
#' Data Lake Store data functions.
#'
#' NOTE the following points on ADLS retry policies:
#' 1. Speculative reads are not implemented, hence neither is `NoRetryPolicy`.
#' 2. ExponentialBackoffPolicyforMSI is not implemented as it is not used even
#'    in the Java SDK.
#'
#' @inheritParams setAzureContext
#' @param retryPolicyType the type of retryPolicy object to create.
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlRetryPolicy` object
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://github.com/Azure/azure-data-lake-store-java/blob/master/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/RetryPolicy.java}
createAdlRetryPolicy <- function(azureActiveContext, retryPolicyType = retryPolicyEnum$EXPONENTIALBACKOFF, verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlRetryPolicy(azEnv)
  list2env(
    list(azureActiveContext = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  # init the azEnv (adlRetryPolicy) with the right params
  azEnv$retryPolicyType <- retryPolicyType
  if (retryPolicyType == retryPolicyEnum$EXPONENTIALBACKOFF) {
    return(createAdlExponentialBackoffRetryPolicy(azEnv, verbose))
  } else if (retryPolicyType == retryPolicyEnum$NONIDEMPOTENT) {
    return(createAdlNonIdempotentRetryPolicy(azEnv, verbose))
  } else {
    printADLSMessage("internal.R", "createAdlRetryPolicy",
                     paste0("UndefinedRetryPolicyTypeError: ", azEnv$retryPolicyType),
                     NULL)
    return(NULL)
  }
}
#' Create an adlExponentialBackoffRetryPolicy.
#'
#' @param adlRetryPolicy the retry policy object to initialize.
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlRetryPolicy` object
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://github.com/Azure/azure-data-lake-store-java/blob/master/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicy.java}
createAdlExponentialBackoffRetryPolicy <- function(adlRetryPolicy, verbose = FALSE) {
  adlRetryPolicy$retryCount <- 0
  adlRetryPolicy$maxRetries <- 4
  adlRetryPolicy$exponentialRetryInterval <- 1000  # in milliseconds
  adlRetryPolicy$exponentialFactor <- 4
  adlRetryPolicy$lastAttemptStartTime <- getCurrentTimeInNanos()  # in nanoseconds
  return(adlRetryPolicy)
}
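
## Illustration (runnable, no Azure access needed): with the defaults above,
## the nominal wait before retries 1..4 grows as interval * factor^(n - 1),
## less any time already spent on the failed attempt.
# 1000 * 4^(0:3)  # 1000 4000 16000 64000 milliseconds
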
#' Create an adlNonIdempotentRetryPolicy.
#'
#' @param adlRetryPolicy the retry policy object to initialize.
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlRetryPolicy` object
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://github.com/Azure/azure-data-lake-store-java/blob/master/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NonIdempotentRetryPolicy.java}
createAdlNonIdempotentRetryPolicy <- function(adlRetryPolicy, verbose = FALSE) {
  adlRetryPolicy$retryCount401 <- 0
  adlRetryPolicy$waitInterval <- 100
  adlRetryPolicy$retryCount429 <- 0
  adlRetryPolicy$maxRetries <- 4
  adlRetryPolicy$exponentialRetryInterval <- 1000  # in milliseconds
  adlRetryPolicy$exponentialFactor <- 4
  return(adlRetryPolicy)
}
#' Check if a retry should be done based on the `adlRetryPolicy`.
#'
#' @param adlRetryPolicy the policy object to check for retry
#' @param httpResponseCode the HTTP response code of the failed request
#' @param lastException exception that was reported with the failure
#' @param verbose Print tracing information (default FALSE)
#' @return TRUE for retry and FALSE otherwise
#'
#' @family Azure Data Lake Store functions
shouldRetry <- function(adlRetryPolicy,
                        httpResponseCode, lastException,
                        verbose = FALSE) {
  if (adlRetryPolicy$retryPolicyType == retryPolicyEnum$EXPONENTIALBACKOFF) {
    return(
      shouldRetry.adlExponentialBackoffRetryPolicy(
        adlRetryPolicy, httpResponseCode, lastException, verbose))
  } else if (adlRetryPolicy$retryPolicyType == retryPolicyEnum$NONIDEMPOTENT) {
    return(
      shouldRetry.adlNonIdempotentRetryPolicy(
        adlRetryPolicy, httpResponseCode, lastException, verbose))
  } else {
    printADLSMessage("internal.R", "shouldRetry",
                     paste0("UndefinedRetryPolicyTypeError: ", adlRetryPolicy$retryPolicyType),
                     NULL)
    return(NULL)
  }
}
#' Check if a retry should be done based on the `adlRetryPolicy` (adlExponentialBackoffRetryPolicy).
#'
#' @param adlRetryPolicy the policy object to check for retry
#' @param httpResponseCode the HTTP response code of the failed request
#' @param lastException exception that was reported with the failure
#' @param verbose Print tracing information (default FALSE)
#' @return TRUE for retry and FALSE otherwise
#'
#' @family Azure Data Lake Store functions
shouldRetry.adlExponentialBackoffRetryPolicy <- function(adlRetryPolicy,
                                                         httpResponseCode, lastException,
                                                         verbose = FALSE) {
  if (missing(adlRetryPolicy) || missing(httpResponseCode)) {
    return(FALSE)
  }
  # Non-retryable error
  if ((httpResponseCode >= 300 && httpResponseCode < 500  # 3xx and 4xx, except specific ones below
       && httpResponseCode != 408
       && httpResponseCode != 429
       && httpResponseCode != 401)
      || (httpResponseCode == 501)  # Not Implemented
      || (httpResponseCode == 505)  # Version Not Supported
  ) {
    return(FALSE)
  }
  # Retryable error: retry with exponential backoff
  if (!is.null(lastException)
      || httpResponseCode >= 500  # exception or 5xx, plus the specific ones below
      || httpResponseCode == 408
      || httpResponseCode == 429
      || httpResponseCode == 401) {
    if (adlRetryPolicy$retryCount < adlRetryPolicy$maxRetries) {
      timeSpentInMillis <- as.integer((getCurrentTimeInNanos() - adlRetryPolicy$lastAttemptStartTime) / 1000000)
      wait(adlRetryPolicy$exponentialRetryInterval - timeSpentInMillis)
      adlRetryPolicy$exponentialRetryInterval <- (adlRetryPolicy$exponentialRetryInterval * adlRetryPolicy$exponentialFactor)
      adlRetryPolicy$retryCount <- adlRetryPolicy$retryCount + 1
      adlRetryPolicy$lastAttemptStartTime <- getCurrentTimeInNanos()
      return(TRUE)
    } else {
      return(FALSE)  # max number of retries exhausted
    }
  }
  # these are not errors - this method should never have been called with these
  if (httpResponseCode >= 100 && httpResponseCode < 300) {
    return(FALSE)
  }
  # don't know what happened - we should never get here
  return(FALSE)
}
#' Check if a retry should be done based on the `adlRetryPolicy` (adlNonIdempotentRetryPolicy).
#'
#' @param adlRetryPolicy the policy object to check for retry
#' @param httpResponseCode the HTTP response code of the failed request
#' @param lastException exception that was reported with the failure
#' @param verbose Print tracing information (default FALSE)
#' @return TRUE for retry and FALSE otherwise
#'
#' @family Azure Data Lake Store functions
shouldRetry.adlNonIdempotentRetryPolicy <- function(adlRetryPolicy,
                                                    httpResponseCode, lastException,
                                                    verbose = FALSE) {
  if (httpResponseCode == 401 && adlRetryPolicy$retryCount401 == 0) {
    # this could be because of a call delay; just retry once, in the hope of the token being renewed by now
    wait(adlRetryPolicy$waitInterval)
    adlRetryPolicy$retryCount401 <- (adlRetryPolicy$retryCount401 + 1)
    return(TRUE)
  }
  if (httpResponseCode == 429) {
    # 429 means that the backend did not change any state
    if (adlRetryPolicy$retryCount429 < adlRetryPolicy$maxRetries) {
      wait(adlRetryPolicy$exponentialRetryInterval)
      adlRetryPolicy$exponentialRetryInterval <- (adlRetryPolicy$exponentialRetryInterval * adlRetryPolicy$exponentialFactor)
      adlRetryPolicy$retryCount429 <- (adlRetryPolicy$retryCount429 + 1)
      return(TRUE)
    } else {
      return(FALSE)  # max number of retries exhausted
    }
  }
  return(FALSE)
}
wait <- function(waitTimeInMilliSeconds, verbose = FALSE) {
  if (waitTimeInMilliSeconds <= 0) {
    return(NULL)
  }
  tryCatch(
    {
      if (verbose) {
        printADLSMessage("internal.R", "wait",
                         paste0("going into wait for waitTimeInMilliSeconds=", waitTimeInMilliSeconds),
                         NULL)
      }
      Sys.sleep(waitTimeInMilliSeconds / 1000)
    }, interrupt = function(e) {
      if (verbose) {
        printADLSMessage("internal.R", "wait", "interrupted while waiting during retry", e)
      }
    }, error = function(e) {
      if (verbose) {
        printADLSMessage("internal.R", "wait", "error while waiting during retry", e)
      }
    }
  )
  return(NULL)
}
isSuccessfulResponse <- function(resHttp, op) {
  # if (http_error(resHttp)) return(FALSE)
  # if (http_status(resHttp)$category != "Success") return(FALSE)
  if (status_code(resHttp) >= 100 && status_code(resHttp) < 300) return(TRUE)  # 1xx and 2xx return codes
  return(FALSE)  # anything else
}
# ADLS Rest Calls ----
callAzureDataLakeApi <- function(url, verb = "GET", azureActiveContext, adlRetryPolicy = NULL,
                                 content = raw(0), contenttype = NULL,  # e.g. "application/octet-stream"
                                 verbose = FALSE) {
  resHttp <- NULL
  repeat {
    resHttp <- callAzureDataLakeRestEndPoint(url, verb, azureActiveContext,
                                             content, contenttype,
                                             verbose)
    if (!isSuccessfulResponse(resHttp)
        && shouldRetry(adlRetryPolicy, status_code(resHttp), NULL)) {
      if (verbose) {
        msg <- paste0("retry request: ",
                      "status=", http_status(resHttp)$message,
                      ", url=", url, ", verb=", verb,
                      ", adlsRetryPolicy=", as.character.adlRetryPolicy(adlRetryPolicy))
        printADLSMessage("internal.R", "callAzureDataLakeApi", msg, NULL)
      }
      next  # keep trying until the call succeeds or the retries are exhausted
    } else {
      break  # break on success, or when all planned retries have failed
    }
  }
  return(resHttp)
}
callAzureDataLakeRestEndPoint <- function(url, verb = "GET", azureActiveContext,
                                          content = raw(0), contenttype = NULL,  # e.g. "application/octet-stream"
                                          verbose = FALSE) {
  verbosity <- set_verbosity(verbose)
  commonHeaders <- c(Authorization = azureActiveContext$Token,
                     `User-Agent` = getAzureDataLakeSDKUserAgent(),
                     `x-ms-client-request-id` = uuid())
  resHttp <- switch(verb,
    "GET" = GET(url,
                add_headers(.headers = c(commonHeaders,
                                         `Content-Length` = "0")),
                verbosity),
    "PUT" = PUT(url,
                add_headers(.headers = c(commonHeaders,
                                         # `Transfer-Encoding` = "chunked",
                                         `Content-Length` = getContentSize(content),
                                         `Content-Type` = contenttype)),
                body = content,
                verbosity),
    "POST" = POST(url,
                  add_headers(.headers = c(commonHeaders,
                                           # `Transfer-Encoding` = "chunked",
                                           `Content-Length` = getContentSize(content),
                                           `Content-Type` = contenttype)),
                  body = content,
                  verbosity),
    "DELETE" = DELETE(url,
                      add_headers(.headers = c(commonHeaders,
                                               `Content-Length` = "0")),
                      verbosity)
  )
  # Print the response body in case verbose is enabled.
  if (verbose) {
    resJsonStr <- content(resHttp, "text", encoding = "UTF-8")
    printADLSMessage("internal.R", "callAzureDataLakeRestEndPoint", resJsonStr, NULL)
  }
  return(resHttp)
}