# AzureSMR/R/AzureSpark.R
# (502 lines, 14 KiB, R)

#' Create new Spark Session.
#'
#' Sends a POST request to the Livy `sessions` endpoint of an HDInsight
#' cluster (`https://<clustername>.azurehdinsight.net/livy/sessions`) using
#' basic authentication. Arguments that are not supplied are read from
#' `azureActiveContext`; the values actually used, together with the id of
#' the new session, are written back to the context.
#'
#' If `kind` is not supplied, the kind stored in `azureActiveContext` is
#' used; when the context holds none, the default `"spark"` applies.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#' @inheritParams azureCreateHDI
#'
#' @return The `id` field of the session description returned by the
#'   service. It is also stored, as a string, in
#'   `azureActiveContext$sessionID`. An error is raised when the response
#'   status is not 201.
#'
#' @family Spark functions
#' @export
azureSparkNewSession <- function(azureActiveContext, clustername, hdiAdmin,
                                 hdiPassword, kind = "spark", verbose = FALSE) {
  ctx <- azureActiveContext

  # Explicit arguments take precedence over values stored in the context.
  CN <- if (missing(clustername)) ctx$clustername else clustername
  HA <- if (missing(hdiAdmin)) ctx$hdiAdmin else hdiAdmin
  HP <- if (missing(hdiPassword)) ctx$hdiPassword else hdiPassword
  KI <- if (missing(kind)) ctx$kind else kind
  # The context may not hold a kind yet: fall back to the declared default
  # rather than sending an empty kind in the request body.
  if (length(KI) == 0) {
    KI <- kind
  }

  verbosity <- set_verbosity(verbose)

  if (length(CN) == 0) {
    stop("Error: No Valid clustername provided")
  }
  if (length(HA) == 0) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (length(HP) == 0) {
    stop("Error: No Valid hdiPassword provided")
  }
  # Validate the resolved value, not just the raw argument.
  if (length(KI) == 0) {
    stop("Error: No Valid kind provided")
  }

  ctx$hdiAdmin <- HA
  ctx$hdiPassword <- HP
  ctx$clustername <- CN
  ctx$kind <- KI

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/sessions")
  r <- POST(
    URL,
    add_headers(.headers = c(`Content-type` = "application/json")),
    authenticate(HA, HP),
    body = list(kind = KI),
    encode = "json",
    verbosity
  )
  if (status_code(r) != 201) {
    stop(paste("Error Return Code:", status_code(r)))
  }

  df <- fromJSON(content(r, "text", encoding = "UTF-8"))
  ctx$sessionID <- toString(df$id)
  return(df$id)
}
#' List Spark Sessions.
#'
#' Sends a GET request to
#' `https://<clustername>.azurehdinsight.net/livy/sessions` with basic
#' authentication and tabulates the `sessions` element of the JSON reply.
#' Arguments that are not supplied are read from `azureActiveContext`; the
#' values actually used are written back to the context.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @return A data frame with one row per session and the columns `ID`,
#'   `appID`, `state`, `proxyUser` and `kind`. A field that is absent from
#'   the reply is reported as `NA`. An error is raised when the response
#'   status is not 200 or when no sessions are returned.
#'
#' @family Spark functions
#' @export
azureSparkListSessions <- function(azureActiveContext, clustername, hdiAdmin,
                                   hdiPassword, verbose = FALSE) {
  ctx <- azureActiveContext

  # Explicit arguments take precedence over values stored in the context.
  CN <- if (missing(clustername)) ctx$clustername else clustername
  HA <- if (missing(hdiAdmin)) ctx$hdiAdmin else hdiAdmin
  HP <- if (missing(hdiPassword)) ctx$hdiPassword else hdiPassword

  verbosity <- set_verbosity(verbose)

  if (length(CN) == 0) {
    stop("Error: No Valid clustername provided")
  }
  if (length(HA) == 0) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (length(HP) == 0) {
    stop("Error: No Valid hdiPassword provided")
  }

  ctx$hdiAdmin <- HA
  ctx$hdiPassword <- HP
  ctx$clustername <- CN

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/sessions")
  r <- GET(
    URL,
    add_headers(.headers = c(`Content-type` = "application/json")),
    authenticate(HA, HP),
    verbosity
  )
  # Fail with the status code instead of trying to parse an error payload.
  if (status_code(r) != 200) {
    stop(paste("Error Return Code:", status_code(r)))
  }

  df <- fromJSON(content(r, "text", encoding = "UTF-8"))
  sessions <- df$sessions
  ids <- sessions$id
  count <- length(ids)
  if (count == 0) {
    stop("No Sessions available")
  }

  # A field missing from the reply becomes a column of NA rather than
  # aborting the whole listing.
  field <- function(name) {
    values <- sessions[[name]]
    if (length(values) == count) values else rep(NA, count)
  }

  data.frame(
    ID = ids,
    appID = field("appId"),
    state = field("state"),
    proxyUser = field("proxyUser"),
    kind = field("kind"),
    stringsAsFactors = FALSE
  )
}
#' Stop a Spark Session.
#'
#' Issues a DELETE request against
#' `https://<clustername>.azurehdinsight.net/livy/sessions/<sessionID>` with
#' basic authentication. Arguments that are not supplied are read from
#' `azureActiveContext`; the values actually used are written back to the
#' context before the request is sent.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @return `TRUE` when the response status is 200. A 404 response raises a
#'   "sessionID not found" error; any other status raises an error that
#'   reports the status code.
#'
#' @family Spark functions
#' @export
azureSparkStopSession <- function(azureActiveContext, clustername, hdiAdmin,
                                  hdiPassword, sessionID, verbose = FALSE) {
  azureCheckToken(azureActiveContext)

  # Resolve each setting: explicit argument first, context value otherwise.
  cluster <- if (missing(clustername)) {
    azureActiveContext$clustername
  } else {
    clustername
  }
  admin <- if (missing(hdiAdmin)) {
    azureActiveContext$hdiAdmin
  } else {
    hdiAdmin
  }
  password <- if (missing(hdiPassword)) {
    azureActiveContext$hdiPassword
  } else {
    hdiPassword
  }
  session <- if (missing(sessionID)) {
    azureActiveContext$sessionID
  } else {
    sessionID
  }

  verbosity <- set_verbosity(verbose)

  if (length(cluster) == 0) stop("Error: No clustername provided")
  if (length(admin) == 0) stop("Error: No hdiAdmin provided")
  if (length(password) == 0) stop("Error: No hdiPassword provided")
  if (length(session) == 0) stop("Error: No sessionID provided")

  # Remember the resolved values for subsequent calls.
  azureActiveContext$hdiAdmin <- admin
  azureActiveContext$hdiPassword <- password
  azureActiveContext$clustername <- cluster
  azureActiveContext$sessionID <- session

  endpoint <- paste0(
    "https://", cluster, ".azurehdinsight.net/livy/sessions/", session
  )
  jsonHeader <- add_headers(.headers = c(`Content-type` = "application/json"))
  response <- DELETE(endpoint, jsonHeader, authenticate(admin, password),
                     verbosity)

  code <- status_code(response)
  if (code == "404") {
    stop(paste("sessionID not found (", code, ")"))
  }
  if (code != "200") {
    stop(paste("Error Return Code:", code))
  }
  TRUE
}
#' Send Spark Statements/commands (REPL/Interactive mode).
#'
#' Posts `CMD` as a statement to
#' `https://<clustername>.azurehdinsight.net/livy/sessions/<sessionID>/statements`
#' with basic authentication. If the statement is not immediately
#' `"available"`, the statement resource is polled (every 2 to 5 seconds)
#' while its state is `"running"` or `"waiting"`, printing a progress
#' character per poll. Arguments that are not supplied are read from
#' `azureActiveContext`; the values actually used are written back to it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @param CMD Code to run; sent as the `code` field of the request body.
#'
#' @return The statement's `output$data`, collapsed to a single string with
#'   `toString()`. Errors are raised when `CMD` is missing, when the
#'   session is not found (404), or when a request returns an unexpected
#'   status code.
#'
#' @family Spark functions
#' @export
azureSparkCMD <- function(azureActiveContext, CMD, clustername, hdiAdmin,
                          hdiPassword, sessionID, verbose = FALSE) {
  ctx <- azureActiveContext

  # Explicit arguments take precedence over values stored in the context.
  CN <- if (missing(clustername)) ctx$clustername else clustername
  HA <- if (missing(hdiAdmin)) ctx$hdiAdmin else hdiAdmin
  HP <- if (missing(hdiPassword)) ctx$hdiPassword else hdiPassword
  SI <- if (missing(sessionID)) ctx$sessionID else sessionID
  if (missing(CMD)) {
    stop("Error: No CMD provided")
  }

  verbosity <- set_verbosity(verbose)

  if (length(CN) == 0) {
    stop("Error: No Valid clustername provided")
  }
  if (length(HA) == 0) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (length(HP) == 0) {
    stop("Error: No Valid hdiPassword provided")
  }
  if (length(SI) == 0) {
    stop("Error: No sessionID provided")
  }

  ctx$hdiAdmin <- HA
  ctx$hdiPassword <- HP
  ctx$clustername <- CN
  ctx$sessionID <- SI

  jsonHeader <- add_headers(.headers = c(`Content-type` = "application/json"))
  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/sessions/", SI,
                "/statements")
  r <- POST(URL, jsonHeader, authenticate(HA, HP), body = list(code = CMD),
            encode = "json", verbosity)
  rl <- content(r, "text", encoding = "UTF-8")
  rh <- headers(r)
  if (status_code(r) == 404) {
    stop(paste("sessionID not found (", status_code(r), ")"))
  }
  if (status_code(r) != 201) {
    stop(paste("Error Return Code:", status_code(r)))
  }

  df <- fromJSON(rl)
  if (isTRUE(df$state == "available")) {
    return(toString(df$output$data))
  }

  # Poll the resource named by the Location header; if the header is absent,
  # address the statement by its id under the statements endpoint.
  location <- rh$location
  pollURL <- if (length(location) > 0) {
    paste0("https://", CN, ".azurehdinsight.net/livy/", location)
  } else {
    paste0(URL, "/", df$id)
  }

  message(paste("CMD Running: ", Sys.time()))
  message("Running(R) Waiting(W) Completed(C)")
  DUR <- 2
  # isTRUE() ends the loop cleanly if a reply carries no state at all.
  while (isTRUE(df$state %in% c("running", "waiting"))) {
    Sys.sleep(DUR)
    # Back off from 2 to at most 5 seconds between polls.
    if (DUR < 5) {
      DUR <- DUR + 1
    }
    message(if (df$state == "running") "R" else "W", appendLF = FALSE)
    # Pass verbosity here too, so `verbose` covers the polling requests.
    r <- GET(pollURL, jsonHeader, authenticate(HA, HP), verbosity)
    if (status_code(r) != 200) {
      stop(paste("Error Return Code:", status_code(r)))
    }
    df <- fromJSON(content(r, "text", encoding = "UTF-8"))
  }
  # Terminate the progress line before the final status message.
  message("C")
  message("Finished Running statement: ", Sys.time())

  RET <- df$output$data[1]
  return(toString(RET))
}
#' Submit Spark Job (Batch mode).
#'
#' Posts a batch request to
#' `https://<clustername>.azurehdinsight.net/livy/batches` with basic
#' authentication and polls the created batch (every 2 to 5 seconds) while
#' it has not finished. Arguments that are not supplied are read from
#' `azureActiveContext`; the values actually used are written back to it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @param FILE Application file to run; sent as the `file` field of the
#'   request body.
#' @param log If `"URL"` (the default), the driver log URL reported for the
#'   batch is stored in `azureActiveContext$log`. For any other value the
#'   content behind that URL is downloaded and stored there instead. When no
#'   log URL was reported, an empty string is stored.
#'
#' @return The final `state` reported for the batch. Errors are raised when
#'   `FILE` is missing or a request returns an unexpected status code.
#'
#' @family Spark functions
#' @export
azureSparkJob <- function(azureActiveContext, FILE, clustername, hdiAdmin,
                          hdiPassword, log = "URL", verbose = FALSE) {
  ctx <- azureActiveContext

  # Explicit arguments take precedence over values stored in the context.
  CN <- if (missing(clustername)) ctx$clustername else clustername
  HA <- if (missing(hdiAdmin)) ctx$hdiAdmin else hdiAdmin
  HP <- if (missing(hdiPassword)) ctx$hdiPassword else hdiPassword
  if (missing(FILE)) {
    stop("Error: No FILE provided")
  }

  verbosity <- set_verbosity(verbose)

  if (length(CN) == 0) {
    stop("Error: No Valid clustername provided")
  }
  if (length(HA) == 0) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (length(HP) == 0) {
    stop("Error: No Valid hdiPassword provided")
  }

  ctx$hdiAdmin <- HA
  ctx$hdiPassword <- HP
  ctx$clustername <- CN

  jsonHeader <- add_headers(.headers = c(`Content-type` = "application/json"))
  batchesURL <- paste0("https://", CN, ".azurehdinsight.net/livy/batches")
  r <- POST(batchesURL, jsonHeader, authenticate(HA, HP),
            body = list(file = FILE), encode = "json", verbosity)
  rl <- content(r, "text", encoding = "UTF-8")
  if (status_code(r) == 404) {
    stop(paste("sessionID not found (", status_code(r), ")"))
  }
  if (status_code(r) != 201) {
    stop(paste("Error Return Code:", status_code(r)))
  }

  df <- fromJSON(rl)
  if (isTRUE(df$state == "available")) {
    return(df$output$data)
  }

  pollURL <- paste0(batchesURL, "/", df$id)
  message(paste("CMD Running: ", Sys.time()))
  message("Running(R), Completed(C)")

  # A freshly submitted batch is normally reported as "starting" before it
  # becomes "running"; keep polling through every not-yet-finished state.
  pendingStates <- c("not_started", "starting", "running")
  LOGURL2 <- ""
  DUR <- 2
  while (isTRUE(df$state %in% pendingStates)) {
    Sys.sleep(DUR)
    # Back off from 2 to at most 5 seconds between polls.
    if (DUR < 5) {
      DUR <- DUR + 1
    }
    message("R")
    r <- GET(pollURL, jsonHeader, authenticate(HA, HP), verbosity)
    if (status_code(r) != 200) {
      stop(paste("Error Return Code:", status_code(r)))
    }
    df <- fromJSON(content(r, "text", encoding = "UTF-8"))
    # The field is spelled "driverLogUrl"; the all-lowercase "log" spelling
    # used previously is kept as a fallback.
    found <- df$appInfo[["driverLogUrl"]]
    if (length(found) == 0) {
      found <- df$appInfo[["driverlogUrl"]]
    }
    if (length(found) > 0) {
      LOGURL2 <- found
    }
  }
  message("C")
  STATE <- df$state
  message("")
  message(paste("Finished Running statement: ", Sys.time()))
  message("LOGURL: ", LOGURL2)

  if (log == "URL" || !nzchar(LOGURL2)) {
    # Nothing to download without a URL; store what we have.
    ctx$log <- LOGURL2
  } else {
    r <- GET(LOGURL2, jsonHeader, authenticate(HA, HP), verbosity)
    ctx$log <- content(r, "text", encoding = "UTF-8")
  }
  return(STATE)
}
#' List Spark Jobs (Batch mode).
#'
#' Sends a GET request to
#' `https://<clustername>.azurehdinsight.net/livy/batches` with basic
#' authentication and tabulates the `sessions` element of the JSON reply.
#' Arguments that are not supplied are read from `azureActiveContext`; the
#' values actually used are written back to the context.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @family Spark functions
#' @return A data frame with one row per batch and the columns `ID`,
#'   `appID` and `state`. A field that is absent from the reply is reported
#'   as `NA`. An error is raised when the response status is not 200 or when
#'   no batches are returned.
#' @export
azureSparkListJobs <- function(azureActiveContext, clustername, hdiAdmin,
                               hdiPassword, verbose = FALSE) {
  ctx <- azureActiveContext

  # Explicit arguments take precedence over values stored in the context.
  CN <- if (missing(clustername)) ctx$clustername else clustername
  HA <- if (missing(hdiAdmin)) ctx$hdiAdmin else hdiAdmin
  HP <- if (missing(hdiPassword)) ctx$hdiPassword else hdiPassword

  verbosity <- set_verbosity(verbose)

  if (length(CN) == 0) {
    stop("Error: No Valid clustername provided")
  }
  if (length(HA) == 0) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (length(HP) == 0) {
    stop("Error: No Valid hdiPassword provided")
  }

  ctx$hdiAdmin <- HA
  ctx$hdiPassword <- HP
  ctx$clustername <- CN

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/batches")
  r <- GET(
    URL,
    add_headers(.headers = c(`Content-type` = "application/json")),
    authenticate(HA, HP),
    verbosity
  )
  # Fail with the status code instead of trying to parse an error payload.
  if (status_code(r) != 200) {
    stop(paste("Error Return Code:", status_code(r)))
  }

  df <- fromJSON(content(r, "text", encoding = "UTF-8"))
  # The batch listing is read from the reply's `sessions` element.
  batches <- df$sessions
  ids <- batches$id
  count <- length(ids)
  if (count == 0) {
    stop("No Sessions available")
  }

  # A field missing from the reply becomes a column of NA rather than
  # aborting the whole listing.
  field <- function(name) {
    values <- batches[[name]]
    if (length(values) == count) values else rep(NA, count)
  }

  data.frame(
    ID = ids,
    appID = field("appId"),
    state = field("state"),
    stringsAsFactors = FALSE
  )
}
#' Show Spark log Output.
#'
#' Opens a URL with `browseURL()`: the `URL` argument when it is supplied,
#' otherwise the value stored in `azureActiveContext$log`.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @param URL URL to open. When omitted, `azureActiveContext$log` is used.
#'
#' @return `TRUE`.
#'
#' @family Spark functions
#' @export
azureSparkShowURL <- function(azureActiveContext, URL) {
  # Only consult the context when no explicit URL was given.
  target <- if (missing(URL)) azureActiveContext$log else URL
  browseURL(target)
  TRUE
}