# AzureSMR/R/AzureHDI.R
# (web source view: 522 lines, 18 KiB, R)

#' Get all HDInsight clusters in the default subscription, or details for a specified cluster name.
#'
#' @details
#' When \code{clustername} is \code{"*"} (the default) the request is made at
#' subscription level and \code{resourceGroup} is not validated. For any other
#' value both the cluster name and the resource group are validated and the
#' request targets that single cluster.
#'
#' The arguments \code{name}, \code{type} and \code{location} are accepted but
#' not used by this function.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#' @inheritParams azureListAllResources
#'
#' @family HDInsight functions
#' @references https://docs.microsoft.com/en-us/rest/api/hdinsight/hdinsight-cluster#list-by-subscription
#'
#' @return Data frame with one row per cluster and the columns \code{name},
#'   \code{id}, \code{location}, \code{type}, \code{tier}, \code{osType},
#'   \code{provisioningState}, \code{clusterState}, \code{createdDate} and
#'   \code{kind}. Fields absent from the response are reported as \code{NA}.
#' @export
azureListHDI <- function(azureActiveContext, resourceGroup, clustername = "*",
                         subscriptionID, name, type, location, verbose = FALSE) {
  assert_that(is.azureActiveContext(azureActiveContext))
  azureCheckToken(azureActiveContext)
  azToken <- azureActiveContext$Token
  if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
  if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
  verbosity <- set_verbosity(verbose)

  assert_that(is_subscription_id(subscriptionID))
  list_all <- clustername == "*"
  if (!list_all) {
    assert_that(is_clustername(clustername))
    assert_that(is_resource_group(resourceGroup))
  }

  # A subscription-wide listing has no resource group segment and no
  # cluster name at the end of the path.
  rg <- if (list_all) "" else paste0("/resourceGroups/", resourceGroup)
  cn <- if (list_all) "" else clustername

  URL <- paste0("https://management.azure.com/subscriptions/", subscriptionID, rg,
                "/providers/Microsoft.HDInsight/clusters/", cn,
                "?api-version=2015-03-01-preview")
  r <- GET(URL, azureApiHeaders(azToken), verbosity)
  stopWithAzureError(r)

  rc <- content(r)

  # Pull the named fields out of `src`, substituting NA for anything absent so
  # that every cluster yields the same set of columns and rbind() lines up.
  pick <- function(src, fields) {
    vals <- lapply(fields, function(f) {
      v <- src[[f]]
      if (is.null(v) || length(v) == 0) NA else v
    })
    names(vals) <- fields
    vals
  }

  extract_one <- function(x) {
    as.data.frame(
      c(
        pick(x, c("name", "id", "location", "type")),
        pick(x$properties,
             c("tier", "osType", "provisioningState", "clusterState", "createdDate")),
        pick(x$properties$clusterDefinition, "kind")
      ),
      stringsAsFactors = FALSE
    )
  }

  # A single-cluster response is the cluster object itself; a listing wraps
  # the cluster objects in a `value` element.
  clusters <- if (is.null(rc$value)) {
    extract_one(rc)
  } else {
    do.call(rbind, lapply(rc$value, extract_one))
  }

  azureActiveContext$resourceGroup <- resourceGroup
  return(clusters)
}
#' Get configuration information for a specified cluster name.
#'
#' @details
#' \code{subscriptionID}, \code{resourceGroup} and \code{clustername} default
#' to the values stored in \code{azureActiveContext} when not supplied. The
#' arguments \code{name}, \code{type} and \code{location} are accepted but not
#' used by this function.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#' @inheritParams azureListHDI
#'
#' @return One-row data frame with the columns \code{name}, \code{id},
#'   \code{location}, \code{type}, \code{tier}, \code{kind}, \code{osType},
#'   \code{provisioningState}, \code{status}, \code{created}, \code{numCores}
#'   and \code{information} (a comma-separated summary of the compute roles).
#'   Fields absent from the response are reported as \code{NA}. If the response
#'   is empty a warning is issued and an empty data frame is returned.
#' @family HDInsight functions
#' @export
azureHDIConf <- function(azureActiveContext, clustername, resourceGroup,
                         subscriptionID, name, type, location, verbose = FALSE) {
  assert_that(is.azureActiveContext(azureActiveContext))
  azureCheckToken(azureActiveContext)
  azToken <- azureActiveContext$Token
  if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
  if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
  if (missing(clustername)) clustername <- azureActiveContext$clustername
  verbosity <- set_verbosity(verbose)

  assert_that(is_subscription_id(subscriptionID))
  assert_that(is_resource_group(resourceGroup))
  assert_that(is_clustername(clustername))

  URL <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
                "/resourceGroups/", resourceGroup,
                "/providers/Microsoft.HDInsight/clusters/", clustername,
                "?api-version=2015-03-01-preview")

  r <- GET(URL, azureApiHeaders(azToken), verbosity)
  # Same error handling as the other HDInsight functions in this file (this
  # spot previously held a leftover browser() debugging call).
  stopWithAzureError(r)
  rc <- content(r)

  if (length(rc) == 0) {
    warning("No HDInsight clusters found", immediate. = TRUE)
    # Nothing to summarise; return instead of failing further down.
    return(data.frame())
  }

  # Replace an absent field by NA so that every column has exactly one value.
  or_na <- function(v) if (is.null(v) || length(v) == 0) NA else v

  # One "<role>: <count> * <vm size>" entry per compute role.
  role_summaries <- vapply(
    rc$properties$computeProfile$roles,
    function(role) {
      sprintf("%s: %s * %s",
              or_na(role$name),
              or_na(role$targetInstanceCount),
              or_na(role$hardwareProfile$vmSize))
    },
    FUN.VALUE = character(1)
  )
  info <- paste(role_summaries, collapse = ", ")

  # Fields are read from `rc` explicitly (with exact matching) so that a field
  # missing from the response can never resolve to one of this function's own
  # `name` / `type` / `location` arguments.
  props <- rc[["properties"]]
  dfn <- data.frame(
    name = or_na(rc[["name"]]),
    id = or_na(rc[["id"]]),
    location = or_na(rc[["location"]]),
    type = or_na(rc[["type"]]),
    tier = or_na(props[["tier"]]),
    kind = or_na(props[["clusterDefinition"]][["kind"]]),
    osType = or_na(props[["osType"]]),
    provisioningState = or_na(props[["provisioningState"]]),
    status = or_na(props[["clusterState"]]),
    created = or_na(props[["createdDate"]]),
    numCores = or_na(props[["quotaInfo"]][["coresUsed"]]),
    information = info,
    stringsAsFactors = FALSE
  )
  return(dfn)
}
#' Create HDInsight cluster.
#'
#' @details
#' If \code{storageAccount} is not among the accounts returned by
#' \code{azureListSA()}, it is created in \code{resourceGroup} before the
#' cluster is requested. On success the admin credentials and the cluster name
#' are stored in \code{azureActiveContext}.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#' @inheritParams azureListHDI
#'
#' @param version HDinsight version
#' @param kind HDinsight kind: "hadoop","spark" or "rserver"
#' @param adminUser Admin user name
#' @param adminPassword Admin user password
#' @param workers Define the number of worker nodes
#' @param sshUser SSH user name
#' @param sshPassword SSH user password
#' @param hiveServer URI address of the Hive server. When supplied, \code{hiveDB}, \code{hiveUser} and \code{hivePassword} are required as well
#' @param hiveDB Hive DB name
#' @param hiveUser Hive user name
#' @param hivePassword Hive user password
#' @param componentVersion Spark componentVersion. Default : 1.6.2
#' @param vmSize Size of nodes: "Large", "Small", "Standard_D14_V2", etc.
#' @param mode Provisioning mode, "Sync" or "Async". Use "Async" to immediately return to R session after submission of request
#' @param debug Used for debugging purposes. If TRUE, returns the request body (parsed with \code{fromJSON()}) instead of submitting the cluster creation request. The storage account lookup, creation and key retrieval still run before this point
#'
#' @return \code{TRUE} once the request has been submitted (and, in "Sync"
#'   mode, polling has succeeded); \code{FALSE} if polling in "Sync" mode
#'   reports failure. With \code{debug = TRUE}, the parsed request body.
#' @family HDInsight functions
#' @note See \url{https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-component-versioning} to learn about HDInsight Versions
#' @references https://docs.microsoft.com/en-us/rest/api/hdinsight/hdinsight-cluster#create
#' @export
azureCreateHDI <- function(azureActiveContext, resourceGroup, location,
                           clustername, kind = c("rserver", "spark", "hadoop"),
                           storageAccount, storageKey,
                           version = "3.5", componentVersion = "1.6.2",
                           workers = 2,
                           adminUser, adminPassword, sshUser, sshPassword,
                           hiveServer, hiveDB, hiveUser, hivePassword,
                           vmSize = "Large",
                           subscriptionID, mode = c("Sync", "Async"),
                           verbose = FALSE, debug = FALSE) {
  assert_that(is.azureActiveContext(azureActiveContext))
  azureCheckToken(azureActiveContext)
  azToken <- azureActiveContext$Token
  kind <- match.arg(kind)
  mode <- match.arg(mode)

  if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
  if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
  if (missing(storageAccount)) storageAccount <- azureActiveContext$storageAccount
  verbosity <- set_verbosity(verbose)

  assert_that(is_resource_group(resourceGroup))
  if (missing(location)) {
    location <- getResourceGroupLocation(azureActiveContext, resourceGroup = resourceGroup)
  }
  assert_that(is_location(location))
  assert_that(is_subscription_id(subscriptionID))
  assert_that(is_clustername(clustername))
  assert_that(is_storage_account(storageAccount))
  assert_that(is_admin_user(adminUser))
  assert_that(is_admin_password(adminPassword))
  assert_that(is_ssh_user(sshUser))
  assert_that(is_ssh_password(sshPassword))

  storage_accounts <- azureListSA(azureActiveContext)
  if (!storageAccount %in% storage_accounts$name) {
    # create storage account
    message("creating storage account: ", storageAccount)
    azureCreateStorageAccount(azureActiveContext, storageAccount = storageAccount,
                              resourceGroup = resourceGroup, location = location)
    storageResGroup <- resourceGroup
  } else {
    # retrieve resource group of storage account
    idx <- storage_accounts$name == storageAccount
    storageResGroup <- storage_accounts$resourceGroup[idx]
  }

  # NOTE(review): a caller-supplied `storageKey` is overwritten here; the key
  # is always retrieved from the storage account. Confirm this is intended.
  storageKey <- azureSAGetKey(azureActiveContext, storageAccount = storageAccount,
                              resourceGroup = storageResGroup)

  hivejson <- ""
  if (!missing(hiveServer)) {
    # missing() is tested first: evaluating an omitted argument would raise
    # R's generic "argument is missing" error instead of the messages below.
    if (missing(hiveDB) || !length(hiveDB)) {
      stop("Error: hiveServer: No Valid hiveDB provided")
    }
    if (missing(hiveUser) || !length(hiveUser)) {
      stop("Error: hiveServer: No Valid hiveUser provided")
    }
    if (missing(hivePassword) || !length(hivePassword)) {
      stop("Error: hiveServer: No Valid hivePassword provided")
    }
    hivejson <- hive_json(hiveServer = hiveServer, hiveDB = hiveDB,
                          hiveUser = hiveUser, hivePassword = hivePassword)
  }

  bodyI <- hdi_json(subscriptionID = subscriptionID, clustername = clustername,
                    location = location, storageAccount = storageAccount, storageKey = storageKey,
                    version = version,
                    kind = kind, vmSize = vmSize,
                    hivejson = hivejson,
                    componentVersion = componentVersion,
                    sshUser = sshUser, sshPassword = sshPassword,
                    adminUser = adminUser, adminPassword = adminPassword,
                    workers = workers)

  if (debug) {
    z <- fromJSON(bodyI)
    return(z)
  }

  URL <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
                "/resourceGroups/", resourceGroup,
                "/providers/Microsoft.HDInsight/clusters/", clustername,
                "?api-version=2015-03-01-preview")
  r <- PUT(URL, azureApiHeaders(azToken), body = bodyI, encode = "json", verbosity)
  stopWithAzureError(r)
  azureActiveContext$resourceGroup <- resourceGroup

  if (mode == "Sync") {
    z <- pollStatusHDI(azureActiveContext, clustername = clustername)
    if (!z) return(FALSE)
  }

  # Remember the new cluster and its admin credentials in the context.
  azureActiveContext$hdiAdmin <- adminUser
  azureActiveContext$hdiPassword <- adminPassword
  azureActiveContext$clustername <- clustername
  message(paste("Finished: ", Sys.time()))
  return(TRUE)
}
#' Resize a HDInsight cluster role.
#'
#' @details
#' In "Sync" mode the cluster state reported by \code{azureListHDI()} is polled
#' every 5 seconds, up to 500 times, until it is "Running" or "Error".
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#' @inheritParams azureListHDI
#' @inheritParams azureCreateHDI
#'
#' @param role role type: 'worker', 'head' or 'edge'
#' @param size Numeric: the number of nodes for this type of role. Must be a single whole number
#'
#' @return \code{TRUE} if the request was accepted ("Async" mode) or the
#'   cluster reached the "Running" state ("Sync" mode); \code{FALSE} if, in
#'   "Sync" mode, the cluster reported the "Error" state or polling gave up.
#' @family HDInsight functions
#' @export
azureResizeHDI <- function(azureActiveContext, clustername,
                           role = c("worker", "head", "edge"),
                           size = 2, mode = c("Sync", "Async"), subscriptionID,
                           resourceGroup, verbose = FALSE) {
  assert_that(is.azureActiveContext(azureActiveContext))
  azureCheckToken(azureActiveContext)
  azToken <- azureActiveContext$Token
  if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
  if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
  verbosity <- set_verbosity(verbose)

  assert_that(is_resource_group(resourceGroup))
  assert_that(is_subscription_id(subscriptionID))
  assert_that(is_clustername(clustername))
  # The default `size = 2` is a double, so is.integer() would reject it.
  # Accept any single whole number and convert it to integer.
  assert_that(is.numeric(size), length(size) == 1, !is.na(size), size == round(size))
  size <- as.integer(size)
  role <- match.arg(role)
  mode <- match.arg(mode)

  URL <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
                "/resourceGroups/", resourceGroup,
                "/providers/Microsoft.HDInsight/clusters/", clustername,
                "/roles/", role, "/resize?api-version=2015-03-01-preview")
  bodyI <- list(targetInstanceCount = size)
  r <- POST(URL, azureApiHeaders(azToken),
            body = bodyI,
            encode = "json", verbosity)
  stopWithAzureError(r)
  if (status_code(r) != 202) {
    stop(paste("Error: Return code", status_code(r)))
  }

  if (mode == "Sync") {
    azureActiveContext$resourceGroup <- resourceGroup
    message(paste("azureResizeHDI: request submitted: ", Sys.time()))
    message("Key: A - accepted, R - in progress or running")

    max_polls <- 500
    finished <- FALSE
    for (attempt in seq_len(max_polls)) {
      rc <- azureListHDI(azureActiveContext, clustername = clustername)
      # azureListHDI() returns one row per cluster with a `clusterState` column.
      state <- as.character(rc$clusterState[1])
      if (length(state) == 0 || is.na(state)) state <- "Unknown"

      if (state == "Running") {
        message("R")
        message("")
        message(paste("Finished Resizing Sucessfully: ", Sys.time()))
        finished <- TRUE
        break
      }
      if (state == "Error") {
        message("")
        message(paste("Error Resizing: ", Sys.time()))
        return(FALSE)
      }

      # Progress indicator: one letter per poll, or the raw state if unknown.
      message(switch(state,
                     Accepted = "A",
                     InProgress = ,
                     AzureVMConfiguration = ,
                     HdInsightConfiguration = "R",
                     state))
      if (attempt < max_polls) Sys.sleep(5)
    }

    if (!finished) {
      warning("azureResizeHDI: gave up waiting for the cluster to reach the 'Running' state")
      return(FALSE)
    }
  }

  message(paste("Finished: ", Sys.time()))
  return(TRUE)
}
#' Delete HDInsight cluster.
#'
#' @details
#' \code{subscriptionID} and \code{resourceGroup} default to the values stored
#' in \code{azureActiveContext} when not supplied. The function returns as soon
#' as the delete request has been accepted.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#' @inheritParams azureListHDI
#'
#' @return \code{TRUE} if the delete request was accepted; otherwise an error
#'   is raised.
#' @family HDInsight functions
#' @references https://docs.microsoft.com/en-us/rest/api/hdinsight/hdinsight-cluster#delete
#' @export
azureDeleteHDI <- function(azureActiveContext, clustername, subscriptionID,
                           resourceGroup, verbose = FALSE) {
  assert_that(is.azureActiveContext(azureActiveContext))
  azureCheckToken(azureActiveContext)
  azToken <- azureActiveContext$Token
  if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
  if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
  verbosity <- set_verbosity(verbose)

  # Validate everything that is pasted into the request URL.
  assert_that(is_subscription_id(subscriptionID))
  assert_that(is_resource_group(resourceGroup))
  assert_that(is_clustername(clustername))

  URL <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
                "/resourceGroups/", resourceGroup,
                "/providers/Microsoft.HDInsight/clusters/", clustername,
                "?api-version=2015-03-01-preview")
  r <- DELETE(URL, azureApiHeaders(azToken), verbosity)
  stopWithAzureError(r)
  message("Delete request accepted")
  return(TRUE)
}
#' Run script action on HDI cluster.
#'
#' @details
#' \code{subscriptionID}, \code{resourceGroup} and \code{clustername} default
#' to the values stored in \code{azureActiveContext} when not supplied. At
#' least one of \code{headNode}, \code{workerNode} and \code{edgeNode} must be
#' \code{TRUE}. The values of \code{scriptname}, \code{scriptURL} and
#' \code{parameters} are inserted into the JSON request body as given, without
#' escaping.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#' @inheritParams azureListHDI
#' @inheritParams azureListVM
#'
#' @param scriptname Identifier for Custom action script operation
#' @param scriptURL URL to custom action script
#' @param headNode install on head nodes
#' @param workerNode install on worker nodes
#' @param edgeNode install on edge nodes
#' @param parameters parameters
#' @param wait If TRUE, runs script action synchronously, i.e. waits for successful completion. If FALSE, submits the action asynchronously
#'
#' @return \code{TRUE} once the request has been accepted (and, if
#'   \code{wait = TRUE}, polling has returned)
#' @family HDInsight functions
#' @references https://docs.microsoft.com/en-us/rest/api/hdinsight/hdinsight-cluster#run-script-actions-on-a-running-cluster-linux-cluster-only
#' @export
azureRunScriptAction <- function(azureActiveContext, scriptname, scriptURL,
                                 headNode = TRUE, workerNode = FALSE, edgeNode = FALSE,
                                 clustername, resourceGroup,
                                 parameters = "", subscriptionID,
                                 wait = TRUE, verbose = FALSE) {
  assert_that(is.azureActiveContext(azureActiveContext))
  azureCheckToken(azureActiveContext)
  azToken <- azureActiveContext$Token
  if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
  if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
  if (missing(clustername)) clustername <- azureActiveContext$clustername
  verbosity <- set_verbosity(verbose)

  assert_that(is_resource_group(resourceGroup))
  assert_that(is_subscription_id(subscriptionID))
  assert_that(is_clustername(clustername))

  # missing() is tested first: evaluating an omitted argument would raise R's
  # generic "argument is missing" error instead of the messages below.
  if (missing(scriptname) || !length(scriptname)) {
    stop("Error: No Valid scriptname provided")
  }
  if (missing(scriptURL) || !length(scriptURL)) {
    stop("Error: No Valid scriptURL provided")
  }
  if (!any(headNode, workerNode, edgeNode)) {
    stop("Error: No role(headNode,workerNode,edgeNode) flag set to TRUE")
  }

  roles <- c(headNode = '"headnode"',
             workerNode = '"workernode"',
             edgeNode = '"edgenode"')
  # `collapse` (not `sep`) joins the selected roles into ONE string; with
  # `sep` a multi-role selection stays a vector and paste0() below would
  # produce several request bodies, each naming a single role.
  RL <- paste(roles[c(headNode, workerNode, edgeNode)], collapse = ", ")

  bodyI <- paste0('
{
"scriptActions": [{
"name":"', scriptname, '",
"uri":"', scriptURL, '",
"parameters":"', parameters, '",
"roles":[', RL, ']
}],
"persistOnSuccess": true
}')

  URL <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
                "/resourceGroups/", resourceGroup,
                "/providers/Microsoft.HDInsight/clusters/", clustername,
                "/executeScriptActions?api-version=2015-03-01-preview")

  r <- POST(URL, azureApiHeaders(azToken),
            body = bodyI,
            encode = "json", verbosity)
  stopWithAzureError(r)

  azureActiveContext$clustername <- clustername
  message("Accepted")
  if (wait) pollStatusScriptAction(azureActiveContext, scriptname = scriptname)
  return(TRUE)
}
#' Get all HDInsight script action history for a specified cluster name.
#'
#' @details
#' \code{subscriptionID}, \code{resourceGroup} and \code{clustername} default
#' to the values stored in \code{azureActiveContext} when not supplied. The
#' arguments \code{name} and \code{type} are accepted but not used by this
#' function. On success the cluster name is stored in
#' \code{azureActiveContext}.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureListHDI
#' @inheritParams azureRunScriptAction
#'
#' @return List of script action records with class
#'   \code{azureScriptActionHistory} (empty if there is no history). Use
#'   \code{summary()} to obtain a data frame.
#' @family HDInsight functions
#' @references https://docs.microsoft.com/en-us/rest/api/hdinsight/hdinsight-cluster#list-all-persisted-script-actions-for-a-cluster-linux-cluster-only
#' @export
azureScriptActionHistory <- function(azureActiveContext, resourceGroup,
                                     clustername = "*", subscriptionID,
                                     name, type, verbose = FALSE) {
  assert_that(is.azureActiveContext(azureActiveContext))
  azureCheckToken(azureActiveContext)
  azToken <- azureActiveContext$Token
  if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
  if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
  if (missing(clustername)) clustername <- azureActiveContext$clustername
  verbosity <- set_verbosity(verbose)

  assert_that(is_resource_group(resourceGroup))
  assert_that(is_subscription_id(subscriptionID))
  assert_that(is_clustername(clustername))

  URL <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
                "/resourceGroups/", resourceGroup,
                "/providers/Microsoft.HDInsight/clusters/", clustername,
                "/scriptExecutionHistory/?api-version=2015-03-01-preview")

  r <- GET(URL, azureApiHeaders(azToken), verbosity)
  stopWithAzureError(r)

  rc <- content(r)$value
  if (length(rc) == 0) {
    message("No script action history found")
    # A response without a `value` element leaves `rc` as NULL, and a class
    # attribute cannot be set on NULL; use an empty list instead.
    if (is.null(rc)) rc <- list()
  }
  class(rc) <- "azureScriptActionHistory"

  azureActiveContext$clustername <- clustername
  return(rc)
}
#' @param x Object of class \code{azureScriptActionHistory}, as returned by \code{azureScriptActionHistory()}
#' @param ... Further arguments passed by the \code{summary} generic; not used
#' @export
#' @rdname azureScriptActionHistory
summary.azureScriptActionHistory <- function(x, ...) {
  # Summarise a script action history as a data frame with one row per record.
  # Any field absent from a record (not only `endTime`) is reported as NA, so
  # every row has the same columns; an empty history gives a zero-row frame.
  fields <- c("name", "scriptExecutionId", "startTime", "endTime",
              "status", "uri", "parameters")

  one_row <- function(entry) {
    vals <- lapply(fields, function(f) {
      v <- entry[[f]]
      if (is.null(v) || length(v) == 0) NA else v
    })
    names(vals) <- fields
    as.data.frame(vals, stringsAsFactors = FALSE)
  }

  rows <- lapply(unclass(x), one_row)
  if (length(rows) == 0) {
    empty <- replicate(length(fields), character(0), simplify = FALSE)
    names(empty) <- fields
    return(as.data.frame(empty, stringsAsFactors = FALSE))
  }
  do.call(rbind, rows)
}