# Spark (Livy) helper functions for Azure HDInsight clusters.

#' Create new Spark Session.
#'
#' Posts a new session request to the Livy endpoint of the cluster
#' (`https://<clustername>.azurehdinsight.net/livy/sessions`) and records the
#' id of the created session in `azureActiveContext$sessionID`.
#'
#' `clustername`, `hdiAdmin`, `hdiPassword` and `kind` are taken from
#' `azureActiveContext` when they are not supplied; the values actually used
#' are written back to the context. If no `kind` is supplied and the context
#' holds none, the default (`"spark"`) is used.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#' @inheritParams azureCreateHDI
#'
#' @return The `id` field of the service response. An error is raised if a
#'   required value is missing or the response status is not 201.
#'
#' @family Spark functions
#' @export
azureSparkNewSession <- function(azureActiveContext, clustername, hdiAdmin,
                                 hdiPassword, kind = "spark", verbose = FALSE) {
  # Explicit arguments win; otherwise reuse what the context remembers.
  if (missing(clustername)) {
    CN <- azureActiveContext$clustername
  } else {
    CN <- clustername
  }
  if (missing(hdiAdmin)) {
    HA <- azureActiveContext$hdiAdmin
  } else {
    HA <- hdiAdmin
  }
  if (missing(hdiPassword)) {
    HP <- azureActiveContext$hdiPassword
  } else {
    HP <- hdiPassword
  }
  if (missing(kind)) {
    KI <- azureActiveContext$kind
    # `kind` has a default: use it when the context has nothing stored rather
    # than posting an empty kind.
    if (!length(KI)) {
      KI <- kind
    }
  } else {
    KI <- kind
  }
  verbosity <- set_verbosity(verbose)

  if (!length(CN)) {
    stop("Error: No Valid clustername provided")
  }
  if (!length(HA)) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (!length(HP)) {
    stop("Error: No Valid hdiPassword provided")
  }
  # Validate the value that is actually sent, not the raw argument.
  if (!length(KI)) {
    stop("Error: No Valid kind provided")
  }

  azureActiveContext$hdiAdmin <- HA
  azureActiveContext$hdiPassword <- HP
  azureActiveContext$clustername <- CN
  azureActiveContext$kind <- KI

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/sessions")
  bodyI <- list(kind = KI)

  r <- POST(URL, add_headers(.headers = c(`Content-type` = "application/json")),
            authenticate(HA, HP), body = bodyI, encode = "json", verbosity)

  if (status_code(r) != "201") {
    stop(paste("Error Return Code:", status_code(r)))
  }

  rl <- content(r, "text", encoding = "UTF-8")
  df <- fromJSON(rl)

  # Remember the session so later calls can omit sessionID.
  azureActiveContext$sessionID <- toString(df$id)
  return(df$id)
}
#' List Spark Sessions.
#'
#' Requests the session list from the cluster's Livy endpoint
#' (`https://<clustername>.azurehdinsight.net/livy/sessions`). Values not
#' passed in are read from `azureActiveContext`, and the values used are
#' written back to it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @return A data frame with one row per session and the columns `ID`,
#'   `appID`, `state`, `proxyUser` and `kind`. An error is raised when the
#'   response status is not 200 or when no sessions are returned.
#'
#' @family Spark functions
#' @export
azureSparkListSessions <- function(azureActiveContext, clustername, hdiAdmin,
                                   hdiPassword, verbose = FALSE) {
  if (missing(clustername)) {
    CN <- azureActiveContext$clustername
  } else {
    CN <- clustername
  }
  if (missing(hdiAdmin)) {
    HA <- azureActiveContext$hdiAdmin
  } else {
    HA <- hdiAdmin
  }
  if (missing(hdiPassword)) {
    HP <- azureActiveContext$hdiPassword
  } else {
    HP <- hdiPassword
  }
  verbosity <- set_verbosity(verbose)

  if (!length(CN)) {
    stop("Error: No Valid clustername provided")
  }
  if (!length(HA)) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (!length(HP)) {
    stop("Error: No Valid hdiPassword provided")
  }

  azureActiveContext$hdiAdmin <- HA
  azureActiveContext$hdiPassword <- HP
  azureActiveContext$clustername <- CN

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/sessions")

  r <- GET(URL, add_headers(.headers = c(`Content-type` = "application/json")),
           authenticate(HA, HP), verbosity)

  # Fail with the status code instead of trying to parse an error page as
  # JSON (same message format as the other Spark functions).
  if (status_code(r) != "200") {
    stop(paste("Error Return Code:", status_code(r)))
  }

  rl <- content(r, "text", encoding = "UTF-8")
  df <- fromJSON(rl)

  dfn <- as.data.frame(df$sessions$id)
  clust <- nrow(dfn)
  if (clust == 0) {
    stop("No Sessions available")
  }
  rows <- seq_len(clust)
  dfn[rows, 2] <- df$sessions$appId
  dfn[rows, 3] <- df$sessions$state
  dfn[rows, 4] <- df$sessions$proxyUser
  dfn[rows, 5] <- df$sessions$kind
  colnames(dfn) <- c("ID", "appID", "state", "proxyUser", "kind")

  return(dfn)
}
#' Stop a Spark Session.
#'
#' Issues a DELETE request for the session on the cluster's Livy endpoint
#' (`https://<clustername>.azurehdinsight.net/livy/sessions/<sessionID>`).
#' Values not passed in are read from `azureActiveContext`, and the values
#' used are saved back to it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#' @param sessionID Id of the session to stop; when omitted the id stored in
#'   `azureActiveContext` is used.
#'
#' @return `TRUE` when the service replies with status 200; otherwise an
#'   error is raised.
#'
#' @family Spark functions
#' @export
azureSparkStopSession <- function(azureActiveContext, clustername, hdiAdmin,
                                  hdiPassword, sessionID, verbose = FALSE) {
  azureCheckToken(azureActiveContext)

  # Resolve each setting: caller-supplied value first, stored value otherwise.
  cluster <- if (missing(clustername)) {
    azureActiveContext$clustername
  } else {
    clustername
  }
  admin <- if (missing(hdiAdmin)) {
    azureActiveContext$hdiAdmin
  } else {
    hdiAdmin
  }
  password <- if (missing(hdiPassword)) {
    azureActiveContext$hdiPassword
  } else {
    hdiPassword
  }
  session <- if (missing(sessionID)) {
    azureActiveContext$sessionID
  } else {
    sessionID
  }
  verbosity <- set_verbosity(verbose)

  if (length(cluster) == 0) stop("Error: No clustername provided")
  if (length(admin) == 0) stop("Error: No hdiAdmin provided")
  if (length(password) == 0) stop("Error: No hdiPassword provided")
  if (length(session) == 0) stop("Error: No sessionID provided")

  azureActiveContext$hdiAdmin <- admin
  azureActiveContext$hdiPassword <- password
  azureActiveContext$clustername <- cluster
  azureActiveContext$sessionID <- session

  endpoint <- paste0(
    "https://", cluster, ".azurehdinsight.net/livy/sessions/", session
  )
  json_header <- add_headers(
    .headers = c(`Content-type` = "application/json")
  )
  response <- DELETE(endpoint, json_header, authenticate(admin, password),
                     verbosity)

  status <- status_code(response)
  if (status == "404") {
    stop(paste("sessionID not found (", status, ")"))
  }
  if (status != "200") {
    stop(paste("Error Return Code:", status))
  }
  TRUE
}
#' Send Spark Statements/commands (REPL/Interactive mode).
#'
#' Posts `CMD` as a statement to an existing session on the cluster's Livy
#' endpoint and, if the statement is not finished straight away, polls the
#' statement until its state is no longer `"running"` or `"waiting"`.
#' Progress is reported with `message()`. Values not passed in are read from
#' `azureActiveContext`, and the values used are written back to it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @param CMD Code to execute; it is sent as the `code` field of the
#'   statement request.
#' @param sessionID Id of the session to run the statement in; when omitted
#'   the id stored in `azureActiveContext` is used.
#'
#' @return The statement output (`output$data` of the response) collapsed
#'   into a single string with `toString()`. An error is raised when `CMD` or
#'   a required setting is missing, or when the submit request does not
#'   return status 201.
#'
#' @family Spark functions
#' @export
azureSparkCMD <- function(azureActiveContext, CMD, clustername, hdiAdmin,
                          hdiPassword, sessionID, verbose = FALSE) {
  if (missing(clustername)) {
    CN <- azureActiveContext$clustername
  } else {
    CN <- clustername
  }
  if (missing(hdiAdmin)) {
    HA <- azureActiveContext$hdiAdmin
  } else {
    HA <- hdiAdmin
  }
  if (missing(hdiPassword)) {
    HP <- azureActiveContext$hdiPassword
  } else {
    HP <- hdiPassword
  }
  if (missing(sessionID)) {
    SI <- azureActiveContext$sessionID
  } else {
    SI <- sessionID
  }
  if (missing(CMD)) {
    stop("Error: No CMD provided")
  }
  verbosity <- set_verbosity(verbose)

  if (!length(CN)) {
    stop("Error: No Valid clustername provided")
  }
  if (!length(HA)) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (!length(HP)) {
    stop("Error: No Valid hdiPassword provided")
  }
  if (!length(SI)) {
    stop("Error: No sessionID provided")
  }

  azureActiveContext$hdiAdmin <- HA
  azureActiveContext$hdiPassword <- HP
  azureActiveContext$clustername <- CN
  azureActiveContext$sessionID <- SI

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/sessions/",
                SI, "/statements")
  bodyI <- list(code = CMD)

  r <- POST(URL, add_headers(.headers = c(`Content-type` = "application/json")),
            authenticate(HA, HP), body = bodyI, encode = "json", verbosity)

  rl <- content(r, "text", encoding = "UTF-8")
  rh <- headers(r)

  if (status_code(r) == "404") {
    stop(paste("sessionID not found (", status_code(r), ")"))
  }
  if (status_code(r) != "201") {
    stop(paste("Error Return Code:", status_code(r)))
  }

  df <- fromJSON(rl)
  if (df$state == "available") {
    RET <- df$output$data
    return(toString(RET))
  }

  # Not finished yet: poll the statement addressed by the Location header of
  # the submit response.
  DUR <- 2
  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/", rh$location)
  message(paste("CMD Running: ", Sys.time()))
  message("Running(R) Waiting(W) Completed(C)")

  while (df$state == "running" || df$state == "waiting") {
    Sys.sleep(DUR)
    # Back off gradually: 2, 3, 4, then 5 seconds between polls.
    if (DUR < 5) {
      DUR <- DUR + 1
    }
    if (df$state == "running") {
      message("R", appendLF = FALSE)
    }
    if (df$state == "waiting") {
      message("W", appendLF = FALSE)
    }

    # Pass verbosity here as well so verbose = TRUE covers the polling
    # requests, not only the initial POST.
    r <- GET(URL,
             add_headers(.headers = c(`Content-type` = "application/json")),
             authenticate(HA, HP), verbosity)
    rl <- content(r, "text", encoding = "UTF-8")
    df <- fromJSON(rl)
  }
  # End the R/W progress line before printing the summary.
  message("C")
  message("Finished Running statement: ", Sys.time())

  RET <- df$output$data[1]
  return(toString(RET))
}
#' Submit Spark Job (Batch mode).
#'
#' Posts `FILE` as a batch to the cluster's Livy endpoint
#' (`https://<clustername>.azurehdinsight.net/livy/batches`) and polls the
#' batch until it is no longer starting or running. Progress is reported with
#' `message()`. Values not passed in are read from `azureActiveContext`, and
#' the values used are written back to it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @param FILE File to run; it is sent as the `file` field of the batch
#'   request.
#' @param log What to store in `azureActiveContext$log` once polling ends:
#'   with `"URL"` (the default) the driver log URL reported by the service;
#'   with any other value the text downloaded from that URL.
#'
#' @return The final `state` reported for the batch (or `output$data` of the
#'   submit response when its state is already `"available"`). An error is
#'   raised when `FILE` or a required setting is missing, or when the submit
#'   request does not return status 201.
#'
#' @family Spark functions
#' @export
azureSparkJob <- function(azureActiveContext, FILE, clustername, hdiAdmin,
                          hdiPassword, log = "URL", verbose = FALSE) {
  if (missing(FILE)) {
    stop("Error: No FILE provided")
  }
  if (missing(clustername)) {
    CN <- azureActiveContext$clustername
  } else {
    CN <- clustername
  }
  if (missing(hdiAdmin)) {
    HA <- azureActiveContext$hdiAdmin
  } else {
    HA <- hdiAdmin
  }
  if (missing(hdiPassword)) {
    HP <- azureActiveContext$hdiPassword
  } else {
    HP <- hdiPassword
  }
  verbosity <- set_verbosity(verbose)

  if (!length(CN)) {
    stop("Error: No Valid clustername provided")
  }
  if (!length(HA)) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (!length(HP)) {
    stop("Error: No Valid hdiPassword provided")
  }

  azureActiveContext$hdiAdmin <- HA
  azureActiveContext$hdiPassword <- HP
  azureActiveContext$clustername <- CN

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/batches")
  bodyI <- list(file = FILE)

  r <- POST(URL, add_headers(.headers = c(`Content-type` = "application/json")),
            authenticate(HA, HP), body = bodyI, encode = "json", verbosity)
  rl <- content(r, "text", encoding = "UTF-8")

  if (status_code(r) == "404") {
    stop(paste("sessionID not found (", status_code(r), ")"))
  }
  if (status_code(r) != "201") {
    stop(paste("Error Return Code:", status_code(r)))
  }

  df <- fromJSON(rl)
  if (df$state == "available") {
    return(df$output$data)
  }

  DUR <- 2
  BI <- df$id
  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/batches/", BI)
  message(paste("CMD Running: ", Sys.time()))
  message("Running(R), Completed(C)")
  LOGURL2 <- ""

  # A freshly submitted batch may be reported as "starting" before it is
  # "running"; keep polling through both so the final state is returned.
  while (df$state %in% c("starting", "running")) {
    Sys.sleep(DUR)
    # Back off gradually: 2, 3, 4, then 5 seconds between polls.
    if (DUR < 5) {
      DUR <- DUR + 1
    }
    message("R")
    r <- GET(URL,
             add_headers(.headers = c(`Content-type` = "application/json")),
             authenticate(HA, HP), verbosity)
    rl <- content(r, "text", encoding = "UTF-8")
    df <- fromJSON(rl)
    # Keep the most recent driver log URL the service has reported.
    if (length(df$appInfo$driverlogUrl)) {
      LOGURL2 <- df$appInfo$driverlogUrl
    }
  }
  message("C")
  STATE <- df$state
  message("")
  message(paste("Finished Running statement: ", Sys.time()))

  print("LOGURL")
  print(LOGURL2)

  if (log == "URL") {
    azureActiveContext$log <- LOGURL2
  } else {
    # Download the log text itself instead of keeping just its URL.
    r <- GET(LOGURL2,
             add_headers(.headers = c(`Content-type` = "application/json")),
             authenticate(HA, HP), verbosity)
    rl <- content(r, "text", encoding = "UTF-8")
    azureActiveContext$log <- rl
  }
  return(STATE)
}
#' List Spark Jobs (Batch mode).
#'
#' Requests the batch list from the cluster's Livy endpoint
#' (`https://<clustername>.azurehdinsight.net/livy/batches`). Values not
#' passed in are read from `azureActiveContext`, and the values used are
#' written back to it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @family Spark functions
#' @return A data frame with one row per batch and the columns `ID`, `appID`
#'   and `state`. An error is raised when the response status is not 200 or
#'   when no batches are returned.
#' @export
azureSparkListJobs <- function(azureActiveContext, clustername, hdiAdmin,
                               hdiPassword, verbose = FALSE) {
  if (missing(clustername)) {
    CN <- azureActiveContext$clustername
  } else {
    CN <- clustername
  }
  if (missing(hdiAdmin)) {
    HA <- azureActiveContext$hdiAdmin
  } else {
    HA <- hdiAdmin
  }
  if (missing(hdiPassword)) {
    HP <- azureActiveContext$hdiPassword
  } else {
    HP <- hdiPassword
  }
  verbosity <- set_verbosity(verbose)

  if (!length(CN)) {
    stop("Error: No Valid clustername provided")
  }
  if (!length(HA)) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (!length(HP)) {
    stop("Error: No Valid hdiPassword provided")
  }

  azureActiveContext$hdiAdmin <- HA
  azureActiveContext$hdiPassword <- HP
  azureActiveContext$clustername <- CN

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/batches")

  r <- GET(URL, add_headers(.headers = c(`Content-type` = "application/json")),
           authenticate(HA, HP), verbosity)

  # Fail with the status code instead of trying to parse an error page as
  # JSON (same message format as the other Spark functions).
  if (status_code(r) != "200") {
    stop(paste("Error Return Code:", status_code(r)))
  }

  rl <- content(r, "text", encoding = "UTF-8")
  df <- fromJSON(rl)

  # The entries are read from the `sessions` element of the response.
  dfn <- as.data.frame(df$sessions$id)
  clust <- nrow(dfn)
  if (clust == 0) {
    stop("No Sessions available")
  }
  rows <- seq_len(clust)
  dfn[rows, 2] <- df$sessions$appId
  dfn[rows, 3] <- df$sessions$state
  colnames(dfn) <- c("ID", "appID", "state")

  return(dfn)
}
#' Show Spark log Output.
#'
#' Opens a URL with `browseURL()`: the `URL` argument when it is given,
#' otherwise the value stored in `azureActiveContext$log`.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @param URL URL to open; when omitted, `azureActiveContext$log` is opened
#'   instead.
#'
#' @return `TRUE`.
#'
#' @family Spark
#'
#' @family Spark functions
#' @export
azureSparkShowURL <- function(azureActiveContext, URL) {
  # Fall back to the log location remembered in the context.
  target <- if (missing(URL)) azureActiveContext$log else URL
  browseURL(target)
  TRUE
}