506 строки
18 KiB
R
506 строки
18 KiB
R
#' Get all HDInsight Clusters in default Subscription or details for a specified cluster name.
|
|
#'
|
|
#' @inheritParams setAzureContext
|
|
#' @inheritParams azureAuthenticate
|
|
#' @inheritParams azureListAllResources
|
|
#'
|
|
#' @family HDInsight functions
|
|
#' @references https://docs.microsoft.com/en-us/rest/api/hdinsight/hdinsight-cluster#list-by-subscription
|
|
#'
|
|
#' @return data frame with summary information of HDI clusters
|
|
#' @export
|
|
azureListHDI <- function(azureActiveContext, resourceGroup, clustername = "*",
|
|
subscriptionID, name, type, location, verbose = FALSE) {
|
|
|
|
assert_that(is.azureActiveContext(azureActiveContext))
|
|
if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
|
|
if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
|
|
|
|
assert_that(is_subscription_id(subscriptionID))
|
|
if (clustername != "*") {
|
|
assert_that(is_clustername(clustername))
|
|
assert_that(is_resource_group(resourceGroup))
|
|
}
|
|
|
|
rg <- if (clustername == "*") "" else paste0("/resourceGroups/", resourceGroup)
|
|
cn <- if (clustername == "*") "" else clustername
|
|
|
|
uri <- paste0("https://management.azure.com/subscriptions/", subscriptionID, rg,
|
|
"/providers/Microsoft.HDInsight/clusters/", cn,
|
|
"?api-version=2015-03-01-preview")
|
|
|
|
r <- call_azure_sm(azureActiveContext, uri = uri, verbose = verbose)
|
|
stopWithAzureError(r)
|
|
rc <- content(r)
|
|
extract_one <- function(x) {
|
|
as.data.frame(
|
|
c(
|
|
x[c("name", "id", "location", "type")],
|
|
x$properties[c("tier", "osType", "provisioningState", "clusterState", "createdDate")],
|
|
x$properties$clusterDefinition[c("kind")]
|
|
))
|
|
}
|
|
|
|
z <- if (is.null(rc$value)) {
|
|
extract_one(rc)
|
|
} else {
|
|
do.call(rbind, lapply(rc$value, extract_one))
|
|
}
|
|
|
|
azureActiveContext$resourceGroup <- resourceGroup
|
|
return(z)
|
|
}
|
|
|
|
|
|
|
|
#' Get configuration information for a specified cluster name.
|
|
#'
|
|
#' @inheritParams setAzureContext
|
|
#' @inheritParams azureAuthenticate
|
|
#' @inheritParams azureListHDI
|
|
#'
|
|
#'
|
|
#' @return Returns Dataframe of HDInsight Clusters information
|
|
#' @family HDInsight functions
|
|
#' @export
|
|
azureHDIConf <- function(azureActiveContext, clustername, resourceGroup,
|
|
subscriptionID, name, type, location, verbose = FALSE) {
|
|
assert_that(is.azureActiveContext(azureActiveContext))
|
|
if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
|
|
if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
|
|
if (missing(clustername)) clustername <- azureActiveContext$clustername
|
|
|
|
assert_that(is_subscription_id(subscriptionID))
|
|
assert_that(is_resource_group(resourceGroup))
|
|
assert_that(is_clustername(clustername))
|
|
|
|
uri <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
|
|
"/resourceGroups/", resourceGroup,
|
|
"/providers/Microsoft.HDInsight/clusters/", clustername,
|
|
"?api-version=2015-03-01-preview")
|
|
|
|
r <- call_azure_sm(azureActiveContext, uri = uri, verbose = verbose)
|
|
rc <- content(r)
|
|
|
|
if (length(rc) == 0) {
|
|
warning("No HDInsight clusters found", immediate. = TRUE)
|
|
}
|
|
|
|
info <- paste(vapply(rc$properties$computeProfile$roles, function(x) {
|
|
sprintf("%s: %s * %s",
|
|
x$name,
|
|
x$targetInstanceCount,
|
|
x$hardwareProfile$vmSize)}, FUN.VALUE = character(1)
|
|
), collapse = ", "
|
|
)
|
|
return(rc)
|
|
|
|
|
|
dfn <- with(rc, data.frame(
|
|
name = name,
|
|
id = id,
|
|
location = location,
|
|
type = type,
|
|
tier = rc$properties$tier,
|
|
kind = rc$properties$clusterDefinition$kind,
|
|
osType = rc$properties$osType,
|
|
provisioningState = rc$properties$provisioningState,
|
|
status = rc$properties$clusterState,
|
|
created = rc$properties$createdDate,
|
|
numCores = rc$properties$quotaInfo$coresUsed,
|
|
information = info,
|
|
stringsAsFactors = FALSE
|
|
))
|
|
|
|
return(dfn)
|
|
}
|
|
|
|
#' Create HDInsight cluster.
|
|
#'
|
|
#' @inheritParams setAzureContext
|
|
#' @inheritParams azureAuthenticate
|
|
#' @inheritParams azureListHDI
|
|
#'
|
|
#'
|
|
#' @param version HDinsight version
|
|
#' @param kind HDinsight kind: "hadoop","spark" or "rserver"
|
|
#' @param adminUser Admin user name
|
|
#' @param adminPassword Admin user password
|
|
#' @param workers Define the number of worker nodes
|
|
#' @param sshUser SSH user name
|
|
#' @param sshPassword SSH user password
|
|
#' @param hiveServer URI address of the Hive server
|
|
#' @param hiveDB Hive DB name
|
|
#' @param hiveUser Hive user name
|
|
#' @param hivePassword Hive user password
|
|
#' @param componentVersion Spark componentVersion. Default : 1.6.2
|
|
#' @param vmSize Size of nodes: "Large", "Small", "Standard_D14_V2", etc.
|
|
#' @param mode Provisioning mode, "Sync" or "Async". Use "Async" to immediately return to R session after submission of request
|
|
#' @param debug Used for debugging purposes. If TRUE, returns json without attempting to connect to Azure
|
|
#'
|
|
#' @return Success message
|
|
#' @family HDInsight functions
|
|
#' @note See \url{https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-component-versioning} to learn about HDInsight Versions
|
|
#' @references https://docs.microsoft.com/en-us/rest/api/hdinsight/hdinsight-cluster#create
|
|
#' @export
|
|
azureCreateHDI <- function(azureActiveContext, resourceGroup, location,
|
|
clustername, kind = c("rserver", "spark", "hadoop"),
|
|
storageAccount, storageKey,
|
|
version = "3.5", componentVersion = "1.6.2",
|
|
workers = 2,
|
|
adminUser, adminPassword, sshUser, sshPassword,
|
|
hiveServer, hiveDB, hiveUser, hivePassword,
|
|
vmSize = "Large",
|
|
subscriptionID, mode = c("Sync", "Async"),
|
|
verbose = FALSE, debug = FALSE) {
|
|
assert_that(is.azureActiveContext(azureActiveContext))
|
|
|
|
kind <- match.arg(kind)
|
|
mode <- match.arg(mode)
|
|
|
|
if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
|
|
if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
|
|
if (missing(storageAccount)) storageAccount <- azureActiveContext$storageAccount
|
|
|
|
assert_that(is_resource_group(resourceGroup))
|
|
if (missing(location)) {
|
|
location <- getResourceGroupLocation(azureActiveContext, resourceGroup = resourceGroup)
|
|
}
|
|
assert_that(is_location(location))
|
|
assert_that(is_subscription_id(subscriptionID))
|
|
|
|
assert_that(is_clustername(clustername))
|
|
assert_that(is_storage_account(storageAccount))
|
|
|
|
assert_that(is_admin_user(adminUser))
|
|
assert_that(is_admin_password(adminPassword))
|
|
assert_that(is_ssh_user(sshUser))
|
|
assert_that(is_ssh_password(sshPassword))
|
|
|
|
storage_accounts <- azureListSA(azureActiveContext)
|
|
if (!storageAccount %in% storage_accounts$name) {
|
|
# create storage account
|
|
message("creating storage account: ", storageAccount)
|
|
azureCreateStorageAccount(azureActiveContext, storageAccount = storageAccount, resourceGroup = resourceGroup, location = location)
|
|
storageResGroup <- resourceGroup
|
|
} else {
|
|
# retrieve resource group of storage account
|
|
idx <- storage_accounts$name == storageAccount
|
|
storageResGroup <- storage_accounts$resourceGroup[idx]
|
|
}
|
|
|
|
storageKey <- azureSAGetKey(azureActiveContext, storageAccount = storageAccount, resourceGroup = storageResGroup)
|
|
|
|
HIVE <- FALSE
|
|
hivejson <- ""
|
|
if (!missing(hiveServer)) {
|
|
|
|
HIVE <- TRUE
|
|
if (!length(hiveDB)) {
|
|
stop("Error: hiveServer: No Valid hiveDB provided")
|
|
}
|
|
if (!length(hiveUser)) {
|
|
stop("Error: hiveServer: No Valid hiveUser provided")
|
|
}
|
|
if (!length(hivePassword)) {
|
|
stop("Error: hiveServer: No Valid hivePassword provided")
|
|
}
|
|
hivejson <- hive_json(hiveServer = hiveServer, hiveDB = hiveDB,
|
|
hiveUser = hiveUser, hivePassword = hivePassword)
|
|
}
|
|
|
|
body <- hdi_json(subscriptionID = subscriptionID, clustername = clustername,
|
|
location = location, storageAccount = storageAccount, storageKey = storageKey,
|
|
version = version,
|
|
kind = kind, vmSize = vmSize,
|
|
hivejson = hivejson,
|
|
componentVersion = componentVersion,
|
|
sshUser = sshUser, sshPassword = sshPassword,
|
|
adminUser = adminUser, adminPassword = adminPassword,
|
|
workers = workers)
|
|
|
|
if (debug) {
|
|
z <- fromJSON(body)
|
|
return(z)
|
|
}
|
|
|
|
uri <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
|
|
"/resourceGroups/", resourceGroup,
|
|
"/providers/Microsoft.HDInsight/clusters/", clustername,
|
|
"?api-version=2015-03-01-preview")
|
|
|
|
r <- call_azure_sm(azureActiveContext, uri = uri, body = body,
|
|
verb = "PUT", verbose = verbose)
|
|
stopWithAzureError(r)
|
|
|
|
azureActiveContext$resourceGroup <- resourceGroup
|
|
rl <- content(r, "text", encoding = "UTF-8")
|
|
if (mode == "Sync") {
|
|
z <- pollStatusHDI(azureActiveContext, clustername = clustername)
|
|
if (!z) return(FALSE)
|
|
}
|
|
azureActiveContext$hdiAdmin <- adminUser
|
|
azureActiveContext$hdiPassword <- adminPassword
|
|
azureActiveContext$clustername <- clustername
|
|
message(paste("Finished: ", Sys.time()))
|
|
return(TRUE)
|
|
}
|
|
|
|
|
|
#' Resize a HDInsight cluster role.
|
|
#'
|
|
#' @inheritParams setAzureContext
|
|
#' @inheritParams azureAuthenticate
|
|
#' @inheritParams azureListHDI
|
|
#' @inheritParams azureCreateHDI
|
|
#'
|
|
#' @param role role type: 'worker', 'head' or 'edge'
|
|
#' @param size Numeric: the number of nodes for this type of role
|
|
#'
|
|
#' @family HDInsight functions
|
|
#' @export
|
|
azureResizeHDI <- function(azureActiveContext, clustername,
|
|
role = c("workernode", "headnode", "edgenode"),
|
|
size = 2, mode = c("Sync", "Async"), subscriptionID,
|
|
resourceGroup, verbose = FALSE) {
|
|
|
|
if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
|
|
if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
|
|
|
|
assert_that(is_resource_group(resourceGroup))
|
|
assert_that(is_clustername(clustername))
|
|
assert_that(is.integer(as.integer(size)))
|
|
|
|
role <- match.arg(role)
|
|
mode <- match.arg(mode)
|
|
|
|
uri <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
|
|
"/resourceGroups/", resourceGroup,
|
|
"/providers/Microsoft.HDInsight/clusters/", clustername,
|
|
"/roles/", role, "/resize?api-version=2015-03-01-preview")
|
|
|
|
body <- list(targetInstanceCount = size)
|
|
r <- call_azure_sm(azureActiveContext, uri = uri, body = body,
|
|
verb = "POST", verbose = verbose)
|
|
stopWithAzureError(r)
|
|
|
|
rl <- content(r, "text", encoding = "UTF-8")
|
|
if (status_code(r) != 202) {
|
|
stop(paste("Error: Return code", status_code(r)))
|
|
}
|
|
RT <- "Request accepted"
|
|
if (mode == "Sync") {
|
|
azureActiveContext$resourceGroup <- resourceGroup
|
|
message(paste("azureResizeHDI: request submitted: ", Sys.time()))
|
|
message("Key: A - accepted, (.) - in progress, S - succeeded")
|
|
a <- 1
|
|
while (a > 0) {
|
|
rc <- azureListHDI(azureActiveContext, clustername = clustername)
|
|
rc1 <- rc[9, 1]
|
|
if (rc1 == "Running") {
|
|
message("R")
|
|
message("")
|
|
message(paste("Finished Resizing Sucessfully: ", Sys.time()))
|
|
(break)()
|
|
}
|
|
|
|
if (rc1 == "Error") {
|
|
message("")
|
|
message(paste("Error Resizing: ", Sys.time()))
|
|
(break)()
|
|
}
|
|
|
|
a <- a + 1
|
|
if (rc1 == "Accepted") {
|
|
rc1 <- "A"
|
|
}
|
|
if (rc1 == "InProgress") {
|
|
rc1 <- "R"
|
|
}
|
|
if (rc1 == "AzureVMConfiguration") {
|
|
rc1 <- "R"
|
|
}
|
|
if (rc1 == "HdInsightConfiguration") {
|
|
rc1 <- "R"
|
|
}
|
|
message(rc1)
|
|
|
|
if (a > 500)
|
|
(break)()
|
|
Sys.sleep(5)
|
|
}
|
|
# RT <- clusters[12,1]
|
|
}
|
|
message(paste("Finished: ", Sys.time()))
|
|
|
|
return(TRUE)
|
|
}
|
|
|
|
|
|
#' Delete HDInsight cluster.
|
|
#'
|
|
#' @inheritParams setAzureContext
|
|
#' @inheritParams azureAuthenticate
|
|
#' @inheritParams azureListHDI
|
|
#'
|
|
#' @return Data frame with HDInsight clusters information
|
|
#' @family HDInsight functions
|
|
#' @references https://docs.microsoft.com/en-us/rest/api/hdinsight/hdinsight-cluster#delete
|
|
#' @export
|
|
azureDeleteHDI <- function(azureActiveContext, clustername, subscriptionID,
|
|
resourceGroup, verbose = FALSE) {
|
|
|
|
assert_that(is.azureActiveContext(azureActiveContext))
|
|
|
|
assert_that(is_clustername(clustername))
|
|
if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
|
|
if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
|
|
|
|
assert_that(is_resource_group(resourceGroup))
|
|
assert_that(is_clustername(clustername))
|
|
|
|
uri <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
|
|
"/resourceGroups/", resourceGroup,
|
|
"/providers/Microsoft.HDInsight/clusters/", clustername,
|
|
"?api-version=2015-03-01-preview")
|
|
|
|
r <- call_azure_sm(azureActiveContext, uri = uri,
|
|
verb = "DELETE", verbose = verbose)
|
|
stopWithAzureError(r)
|
|
|
|
message("Delete request accepted")
|
|
return(TRUE)
|
|
}
|
|
|
|
#' Run script action on HDI cluster.
|
|
#'
|
|
#' @inheritParams setAzureContext
|
|
#' @inheritParams azureAuthenticate
|
|
#' @inheritParams azureListHDI
|
|
#' @inheritParams azureListVM
|
|
#'
|
|
#' @param scriptname Identifier for Custom action script operation
|
|
#' @param scriptURL URL to custom action script
|
|
#' @param headNode install on head nodes
|
|
#' @param workerNode install on worker nodes
|
|
#' @param edgeNode install on worker nodes
|
|
#' @param parameters parameters
|
|
#' @param wait If TRUE, runs script action synchronously, i.e. waits for successfull completion. If FALSE, submits the action asynchronously
|
|
#'
|
|
#' @return Returns Success Message
|
|
#' @family HDInsight functions
|
|
#' @references https://docs.microsoft.com/en-us/rest/api/hdinsight/hdinsight-cluster#run-script-actions-on-a-running-cluster-linux-cluster-only
|
|
#' @export
|
|
azureRunScriptAction <- function(azureActiveContext, scriptname, scriptURL,
|
|
headNode = TRUE, workerNode = FALSE, edgeNode = FALSE,
|
|
clustername, resourceGroup,
|
|
parameters = "", subscriptionID,
|
|
wait = TRUE, verbose = FALSE) {
|
|
assert_that(is.azureActiveContext(azureActiveContext))
|
|
|
|
if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
|
|
if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
|
|
if (missing(clustername)) clustername <- azureActiveContext$clustername
|
|
|
|
assert_that(is_resource_group(resourceGroup))
|
|
assert_that(is_subscription_id(subscriptionID))
|
|
assert_that(is_clustername(clustername))
|
|
|
|
if (!length(scriptname)) stop("Error: No Valid scriptname provided")
|
|
if (!length(scriptURL)) stop("Error: No Valid scriptURL provided")
|
|
if (!any(headNode, workerNode, edgeNode)) {
|
|
stop("Error: No role(headNode,workerNode,edgeNode) flag set to TRUE")
|
|
}
|
|
|
|
roles <- c(headNode = '"headnode"',
|
|
workerNode = '"workernode"',
|
|
edgeNode = '"edgenode"')
|
|
RL <- paste(roles[c(headNode, workerNode, edgeNode)], sep = ", ")
|
|
|
|
body <- paste0('
|
|
{
|
|
"scriptActions": [{
|
|
"name":"', scriptname, '",
|
|
"uri":"', scriptURL, '",
|
|
"parameters":"', parameters, '",
|
|
"roles":[', RL, ']
|
|
}],
|
|
"persistOnSuccess": true
|
|
}')
|
|
|
|
uri <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
|
|
"/resourceGroups/", resourceGroup,
|
|
"/providers/Microsoft.HDInsight/clusters/", clustername,
|
|
"/executeScriptActions?api-version=2015-03-01-preview")
|
|
|
|
r <- call_azure_sm(azureActiveContext, uri = uri, body = body,
|
|
verb = "POST", verbose = verbose)
|
|
stopWithAzureError(r)
|
|
|
|
azureActiveContext$clustername <- clustername
|
|
azureActiveContext$resourceGroup <- resourceGroup
|
|
|
|
|
|
message("Accepted")
|
|
if (wait) pollStatusScriptAction(azureActiveContext, scriptname = scriptname, resourceGroup = resourceGroup)
|
|
return(TRUE)
|
|
}
|
|
|
|
|
|
#' Get all HDInsight script action history for a specified cluster name.
|
|
#'
|
|
#' @inheritParams setAzureContext
|
|
#' @inheritParams azureListHDI
|
|
#' @inheritParams azureRunScriptAction
|
|
#'
|
|
#' @return Dataframe of HDInsight Clusters
|
|
#' @family HDInsight functions
|
|
#' @references https://docs.microsoft.com/en-us/rest/api/hdinsight/hdinsight-cluster#list-all-persisted-script-actions-for-a-cluster-linux-cluster-only
|
|
#' @export
|
|
azureScriptActionHistory <- function(azureActiveContext, resourceGroup,
|
|
clustername = "*", subscriptionID,
|
|
name, type, verbose = FALSE) {
|
|
assert_that(is.azureActiveContext(azureActiveContext))
|
|
|
|
if (missing(resourceGroup)) resourceGroup <- azureActiveContext$resourceGroup
|
|
if (missing(subscriptionID)) subscriptionID <- azureActiveContext$subscriptionID
|
|
if (missing(clustername)) clustername <- azureActiveContext$clustername
|
|
|
|
assert_that(is_resource_group(resourceGroup))
|
|
assert_that(is_subscription_id(subscriptionID))
|
|
assert_that(is_clustername(clustername))
|
|
|
|
uri <- paste0("https://management.azure.com/subscriptions/", subscriptionID,
|
|
"/resourceGroups/", resourceGroup,
|
|
"/providers/Microsoft.HDInsight/clusters/", clustername,
|
|
"/scriptExecutionHistory/?api-version=2015-03-01-preview")
|
|
|
|
r <- call_azure_sm(azureActiveContext, uri = uri,
|
|
verb = "GET", verbose = verbose)
|
|
stopWithAzureError(r)
|
|
|
|
rc <- content(r, bigint_as_char = TRUE)$value
|
|
if (length(rc) == 0) {
|
|
message("No script action history found")
|
|
}
|
|
class(rc) <- "azureScriptActionHistory"
|
|
|
|
|
|
azureActiveContext$clustername <- clustername
|
|
return(rc)
|
|
}
|
|
|
|
#' @export
|
|
#' @param object azureScriptActionHistory object, created by [azureScriptActionHistory()]
|
|
#' @param ... not used
|
|
#' @rdname azureScriptActionHistory
|
|
summary.azureScriptActionHistory <- function(object, ...) {
|
|
do.call(rbind, lapply(object, function(x) {
|
|
data.frame(
|
|
x[c("name", "scriptExecutionId", "startTime")],
|
|
if (is.null(x$endTime)) list(endTime = NA) else x["endTime"],
|
|
x[c("status", "uri", "parameters")]
|
|
)
|
|
}))
|
|
}
|