AzureSMR/R/AzureHive.R

153 строки
4.5 KiB
R
Исходник Обычный вид История

#' Get Status of a HDI Hive Service / version.
#'
#' Sends a GET request to the cluster's `templeton/v1/status` endpoint and
#' reports the `status` and `version` fields of the JSON reply.
#'
#' Any of `clustername`, `hdiAdmin` and `hdiPassword` that is not supplied is
#' read from `azureActiveContext`. The values that end up being used are
#' assigned back onto `azureActiveContext` (this is only visible to the caller
#' if the context has reference semantics, e.g. an environment).
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#'
#' @return A character string of the form
#'   `"Status: <status>  version: <version>"`. An error is raised when a
#'   required value is missing or the HTTP status code is neither 200 nor 201.
#'
#' @importFrom httr authenticate
#'
#' @family Hive functions
#' @export
azureHiveStatus <- function(azureActiveContext, clustername, hdiAdmin,
                            hdiPassword, verbose = FALSE) {
  # Fall back to the values stored on the context for anything not supplied.
  cluster <- if (missing(clustername)) {
    azureActiveContext$clustername
  } else {
    clustername
  }
  admin <- if (missing(hdiAdmin)) {
    azureActiveContext$hdiAdmin
  } else {
    hdiAdmin
  }
  password <- if (missing(hdiPassword)) {
    azureActiveContext$hdiPassword
  } else {
    hdiPassword
  }

  if (length(cluster) == 0) {
    stop("Error: No Valid clustername provided")
  }
  if (length(admin) == 0) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (length(password) == 0) {
    stop("Error: No Valid hdiPassword provided")
  }

  # Same helper azureHiveSQL() uses to turn `verbose` into a request option.
  verbosity <- set_verbosity(verbose)

  azureActiveContext$hdiAdmin <- admin
  azureActiveContext$hdiPassword <- password
  azureActiveContext$clustername <- cluster

  uri <- paste0("https://", cluster, ".azurehdinsight.net/templeton/v1/status")

  r <- GET(uri,
           add_headers(.headers = c(`Content-type` = "application/json")),
           authenticate(admin, password), verbosity)

  if (status_code(r) != 200 && status_code(r) != 201) {
    stop(paste0("Error: Return code(", status_code(r), ")"))
  }

  reply <- fromJSON(content(r, "text", encoding = "UTF-8"))
  return(paste("Status:", reply$status, " version:", reply$version))
}
2016-12-17 00:18:42 +03:00
#' Submit SQL command to Hive Service.
#'
#' Posts `CMD` to the cluster's `templeton/v1/hive` endpoint, then polls
#' `templeton/v1/jobs/<id>` for the returned job id until the job state is
#' neither `"PREP"` nor `"RUNNING"`. Progress is reported with `message()`:
#' `"P"` / `"R"` while waiting, then `"S"` (SUCCEEDED) or `"F"` (FAILED).
#'
#' Any of `clustername`, `hdiAdmin` and `hdiPassword` that is not supplied is
#' read from `azureActiveContext`. The values that end up being used are
#' assigned back onto `azureActiveContext` (this is only visible to the caller
#' if the context has reference semantics, e.g. an environment).
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#'
#' @param CMD SQL command string
#' @param path Value sent as the job's `statusdir` form field
#'
#' @return `TRUE` once polling has finished. The return value does not
#'   reflect the final job state; that is reported through `message()` only.
#'   An error is raised when a required value is missing, when the submission
#'   request fails or yields no job id, or when a poll reply has no job state.
#'
#' @family Hive functions
#' @export
azureHiveSQL <- function(azureActiveContext, CMD, clustername, hdiAdmin,
                         hdiPassword, path = "wasb:///tmp/", verbose = FALSE) {
  # Fall back to the values stored on the context for anything not supplied.
  cluster <- if (missing(clustername)) {
    azureActiveContext$clustername
  } else {
    clustername
  }
  admin <- if (missing(hdiAdmin)) {
    azureActiveContext$hdiAdmin
  } else {
    hdiAdmin
  }
  password <- if (missing(hdiPassword)) {
    azureActiveContext$hdiPassword
  } else {
    hdiPassword
  }

  if (missing(CMD)) {
    stop("Error: No Valid Command(CMD) provided")
  }
  if (length(cluster) == 0) {
    stop("Error: No Valid clustername provided")
  }
  if (length(admin) == 0) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (length(password) == 0) {
    stop("Error: No Valid hdiPassword provided")
  }

  verbosity <- set_verbosity(verbose)

  azureActiveContext$hdiAdmin <- admin
  azureActiveContext$hdiPassword <- password
  azureActiveContext$clustername <- cluster

  base_uri <- paste0("https://", cluster, ".azurehdinsight.net/templeton/v1/")

  # The body is a hand-built form string sent as-is.
  # NOTE(review): CMD and path are not URL-encoded here, so a statement
  # containing `&`, `=`, `+` or `%` may be misparsed by the service - confirm
  # whether callers are expected to pre-encode before changing this.
  form_body <- paste0("user.name=", admin, "&execute=", CMD,
                      "&statusdir=", path)
  submit_uri <- paste0(base_uri, "hive?user.name=", admin)

  # NOTE(review): `encode = "json"` is kept from the original call; it looks
  # like httr only applies `encode` to list bodies, so it should have no
  # effect on this character body - confirm against the httr documentation.
  r <- POST(submit_uri,
            add_headers(.headers = c(
              `Content-type` = "application/x-www-form-urlencoded"
            )),
            authenticate(admin, password), body = form_body,
            encode = "json", verbosity)

  # Fail with a clear message instead of polling a job that was never created.
  if (status_code(r) >= 400) {
    stop(paste0("Error: Return code(", status_code(r), ")"))
  }
  job <- fromJSON(content(r, "text", encoding = "UTF-8"))
  if (length(job$id) == 0) {
    stop("Error: Hive job submission did not return a job id")
  }

  job_uri <- paste0(base_uri, "jobs/", job$id)

  # Fetch the job document and return its current state string.
  poll_state <- function() {
    resp <- GET(job_uri,
                add_headers(.headers = c(`Content-type` = "application/json")),
                authenticate(admin, password))
    state <- fromJSON(content(resp, "text", encoding = "UTF-8"))$status$state
    if (length(state) != 1) {
      stop(paste0("Error: No job state returned (Return code(",
                  status_code(resp), "))"))
    }
    state
  }

  Sys.sleep(2)
  state <- poll_state()

  message(paste("CMD Running: ", Sys.time()))
  message("Prep(P), Running(R), Succeeded(S), Failed(F)")

  # Back off from 2 to 5 seconds between polls.
  delay <- 2
  while (state %in% c("PREP", "RUNNING")) {
    Sys.sleep(delay)
    if (delay < 5) {
      delay <- delay + 1
    }
    message(if (state == "PREP") "P" else "R")
    state <- poll_state()
  }

  if (state == "SUCCEEDED") {
    message("S")
  }
  if (state == "FAILED") {
    message("F")
  }

  message("Finished Running statement: ", Sys.time())
  # TRUE is returned for every terminal state, including FAILED, to keep the
  # existing contract for callers.
  return(TRUE)
}