зеркало из https://github.com/Azure/AzureDSVM.git
Fixed storageAccount name duplicate issues in deployment templates
This commit is contained in:
Родитель
a625ab22e4
Коммит
36f344eb66
|
@ -0,0 +1,226 @@
|
||||||
|
#' @title Create an compute interface object that handles interaction with remote instance.
|
||||||
|
#' @param remote URL of remote instance.
|
||||||
|
#' @param user User name.
|
||||||
|
#' @param script R script with full path for execution at remote instance.
|
||||||
|
#' @param config Configuration for remote execution. Settings include computing context, data reference, etc.
|
||||||
|
#' @return An S3 compute interface object.
|
||||||
|
#' @export
|
||||||
|
createComputeInterface <- function(remote,
|
||||||
|
user,
|
||||||
|
script,
|
||||||
|
config){
|
||||||
|
ci_env <- new.env(parent=globalenv())
|
||||||
|
|
||||||
|
# initialize an compute interface object.
|
||||||
|
|
||||||
|
if(!missing(remote)) {
|
||||||
|
ci_env$remote <- remote
|
||||||
|
} else {
|
||||||
|
ci_env$remote <- character(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!missing(user)) {
|
||||||
|
ci_env$user <- user
|
||||||
|
} else {
|
||||||
|
ci_env$user <- character(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!missing(script)) {
|
||||||
|
ci_env$script <- script
|
||||||
|
} else {
|
||||||
|
ci_env$script <- character(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!missing(config)) {
|
||||||
|
ci_env$config <- config
|
||||||
|
} else {
|
||||||
|
ci_env$config <- NULL
|
||||||
|
}
|
||||||
|
|
||||||
|
return(ci_env)
|
||||||
|
}
|
||||||
|
|
||||||
|
#' @title Set configuration for the compute interface object.
|
||||||
|
#' @param object An S3 compute interface object.
|
||||||
|
#' @param machine_list List of remote instances that execute R scripts.
|
||||||
|
#' @param dns_list DNS of the remote instances.
|
||||||
|
#' @param machine_user User name of the remote instances.
|
||||||
|
#' @param master Master node of the machine.
|
||||||
|
#' @param slaves Slave nodes of the machine.
|
||||||
|
#' @param data Reference to data used in the analytics.
|
||||||
|
#' @param context Computing context available in Microsoft R Server for running the analytics.
|
||||||
|
#' @export
|
||||||
|
setConfig <- function(object,
|
||||||
|
machine_list,
|
||||||
|
dns_list,
|
||||||
|
machine_user,
|
||||||
|
master,
|
||||||
|
slaves,
|
||||||
|
data,
|
||||||
|
context) {
|
||||||
|
object$config <- list(
|
||||||
|
CI_MACHINES = ifelse(!missing(machine_list), list(machine_list), ""),
|
||||||
|
CI_DNS = ifelse(!missing(dns_list), list(dns_list), ""),
|
||||||
|
CI_VMUSER = ifelse(!missing(machine_user), list(machine_user), ""),
|
||||||
|
CI_MASTER = ifelse(!missing(master), list(master), ""),
|
||||||
|
CI_SLAVES = ifelse(!missing(slaves), list(slaves), ""),
|
||||||
|
CI_DATA = ifelse(!missing(data), list(data), ""),
|
||||||
|
CI_CONTEXT = ifelse(!missing(context), list(context), "")
|
||||||
|
)
|
||||||
|
|
||||||
|
return(object)
|
||||||
|
}
|
||||||
|
|
||||||
|
#' @title Dump out the object configuration.
|
||||||
|
#' @param object The compute interface object.
|
||||||
|
#' @return No return. Print compute interface object information.
|
||||||
|
#' @export
|
||||||
|
dumpObject <- function(object) {
|
||||||
|
cat(
|
||||||
|
sprintf("---------------------------------------------------------------------------"),
|
||||||
|
sprintf("Compute interface information"),
|
||||||
|
sprintf("---------------------------------------------------------------------------"),
|
||||||
|
sprintf("The R script to be executed:\t%s.", shQuote(object$script)),
|
||||||
|
sprintf("The remote host:\t\t%s.", shQuote(object$remote)),
|
||||||
|
sprintf("The login user name:\t\t%s.", shQuote(object$user)),
|
||||||
|
sprintf("---------------------------------------------------------------------------"),
|
||||||
|
sprintf("The configuration of the interface is:"),
|
||||||
|
# sprintf("virtual machines: %s", ifelse(!is.na(object$config$CI_MACHINES), object$config$CI_MACHINES, "N/A")),
|
||||||
|
sprintf("virtual machines\t\t %s", unlist(object$config$CI_MACHINES)),
|
||||||
|
sprintf("dns list\t\t\t %s", unlist(object$config$CI_DNS)),
|
||||||
|
sprintf("user to these machines\t\t %s", unlist(object$config$CI_VMUSER)),
|
||||||
|
sprintf("the master node\t\t\t %s", unlist(object$config$CI_MASTER)),
|
||||||
|
sprintf("the slave nodes\t\t\t %s", unlist(object$config$CI_SLAVES)),
|
||||||
|
sprintf("the data source\t\t\t %s", unlist(object$config$CI_DATA)),
|
||||||
|
sprintf("the computing context\t\t %s", unlist(object$config$CI_CONTEXT)),
|
||||||
|
sprintf("---------------------------------------------------------------------------"),
|
||||||
|
sprintf("# End of information session"),
|
||||||
|
sprintf("---------------------------------------------------------------------------"),
|
||||||
|
sep = "\n"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#' @title Update a worker script with compute interface object configuration.
|
||||||
|
#' @param object compute interface object.
|
||||||
|
#' @export
|
||||||
|
updateScript <- function(object) {
|
||||||
|
if (!file.exists(object$script) || length(object$script) == 0)
|
||||||
|
{
|
||||||
|
stop(paste("The script does not exist or is not specified!",
|
||||||
|
"Consider create a new one using riNewScript."))
|
||||||
|
}
|
||||||
|
|
||||||
|
codes.body <- readLines(con=object$script)
|
||||||
|
|
||||||
|
# Remove the header.
|
||||||
|
|
||||||
|
if (codes.body[2] == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE")
|
||||||
|
{
|
||||||
|
head.start <- which(codes.body == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE")
|
||||||
|
head.end <- which(codes.body == "# END OF THE HEADER ADDED BY COMPUTE INTERFACE")
|
||||||
|
codes.body <- codes.body[-((head.start - 1):(head.end + 1))]
|
||||||
|
}
|
||||||
|
|
||||||
|
# Add context-specific info into header.
|
||||||
|
|
||||||
|
codes.head <- paste(
|
||||||
|
"# ---------------------------------------------------------------------------",
|
||||||
|
"# THIS IS A HEADER ADDED BY COMPUTE INTERFACE",
|
||||||
|
"# ---------------------------------------------------------------------------",
|
||||||
|
sep="\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
if (object$config$CI_CONTEXT == "clusterParallel")
|
||||||
|
{
|
||||||
|
codes.head <- paste(
|
||||||
|
codes.head,
|
||||||
|
paste("CI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$CI_MACHINES)), collapse=", "), ")"),
|
||||||
|
paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"),
|
||||||
|
paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"),
|
||||||
|
paste("CI_MASTER <-", "c(", paste(shQuote(unlist(object$config$CI_MASTER)), collapse=", "), ")"),
|
||||||
|
paste("CI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$CI_SLAVES)), collapse=", "), ")"),
|
||||||
|
paste("CI_DATA <-", paste(shQuote(unlist(object$config$CI_DATA)), collapse=", ")),
|
||||||
|
paste("CI_CONTEXT <-", paste(shQuote(unlist(object$config$CI_CONTEXT)), collapse=", ")),
|
||||||
|
"\nlibrary(RevoScaleR)",
|
||||||
|
"# --------- Set compute context",
|
||||||
|
"cl <- makePSOCKcluster(names=CI_SLAVES, master=CI_MASTER, user=CI_VMUSER)",
|
||||||
|
"registerDoParallel(cl)",
|
||||||
|
"rxSetComputeContext(RxForeachDoPar())",
|
||||||
|
"# --------- Load data.",
|
||||||
|
"ciData <- read_csv(CI_DATA)",
|
||||||
|
"# ---------------------------------------------------------------------------",
|
||||||
|
"# END OF THE HEADER ADDED BY COMPUTE INTERFACE",
|
||||||
|
"# ---------------------------------------------------------------------------\n",
|
||||||
|
sep="\n"
|
||||||
|
)
|
||||||
|
} else if (object$config$CI_CONTEXT == "Hadoop")
|
||||||
|
{
|
||||||
|
codes.head <- paste(
|
||||||
|
codes.head,
|
||||||
|
"This is for Hadoop.",
|
||||||
|
"\n"
|
||||||
|
)
|
||||||
|
} else if (object$config$CI_CONTEXT == "Spark")
|
||||||
|
{
|
||||||
|
codes.head <- paste(
|
||||||
|
codes.head,
|
||||||
|
paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"),
|
||||||
|
paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"),
|
||||||
|
paste("CI_MASTER <-", "c(", paste(shQuote(unlist(object$config$CI_MASTER)), collapse=", "), ")"),
|
||||||
|
paste("CI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$CI_SLAVES)), collapse=", "), ")"),
|
||||||
|
paste("CI_DATA <-", paste(shQuote(unlist(object$config$CI_DATA)), collapse=", ")),
|
||||||
|
paste("CI_CONTEXT <-", paste(shQuote(unlist(object$config$CI_CONTEXT)), collapse=", ")),
|
||||||
|
"\nlibrary(RevoScaleR)",
|
||||||
|
"# --------- Set compute context",
|
||||||
|
"myHadoopCluster <- RxSpark(persistentRun=TRUE, idleTimeOut=600)",
|
||||||
|
"rxSetComputeContext(myHadoopCluster)",
|
||||||
|
"# --------- Load data.",
|
||||||
|
"ciData <- read_csv(CI_DATA)",
|
||||||
|
"bigDataDirRoot <- '/share'",
|
||||||
|
"inputDir <- file.path(bigDataDirRoot, 'riBigData')",
|
||||||
|
"rxHadoopMakeDir(inputDir)",
|
||||||
|
"rxHadoopCopyFromLocal('./data.csv', inputDir)",
|
||||||
|
"hdfsFS <- RxHdfsFileSystem()",
|
||||||
|
"riTextData <- RxTextData(file=inputDir, fileSystem=hdfsFS)",
|
||||||
|
"# ---------------------------------------------------------------------------",
|
||||||
|
"# END OF THE HEADER ADDED BY COMPUTE INTERFACE",
|
||||||
|
"# ---------------------------------------------------------------------------\n",
|
||||||
|
sep="\n"
|
||||||
|
)
|
||||||
|
} else if (object$config$CI_CONTEXT == "Teradata")
|
||||||
|
{
|
||||||
|
codes.head <- paste(
|
||||||
|
codes.head,
|
||||||
|
"This is for Teradata.",
|
||||||
|
"\n"
|
||||||
|
)
|
||||||
|
} else if (object$config$CI_CONTEXT == "localParallel")
|
||||||
|
{
|
||||||
|
codes.head <- paste(
|
||||||
|
codes.head,
|
||||||
|
paste("CI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$CI_MACHINES)), collapse=", "), ")"),
|
||||||
|
paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"),
|
||||||
|
paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"),
|
||||||
|
paste("CI_MASTER <-", "c(", paste(shQuote(unlist(object$config$CI_MASTER)), collapse=", "), ")"),
|
||||||
|
paste("CI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$CI_SLAVES)), collapse=", "), ")"),
|
||||||
|
paste("CI_DATA <-", paste(shQuote(unlist(object$config$CI_DATA)), collapse=", ")),
|
||||||
|
paste("CI_CONTEXT <-", paste(shQuote(unlist(object$config$CI_CONTEXT)), collapse=", ")),
|
||||||
|
"\nlibrary(RevoScaleR)",
|
||||||
|
"library(doParallel)",
|
||||||
|
"# --------- Set compute context",
|
||||||
|
"rxSetComputeContext(RxLocalParallel())",
|
||||||
|
"# --------- Load data.",
|
||||||
|
"ciData <- read_csv(CI_DATA)",
|
||||||
|
"# ---------------------------------------------------------------------------",
|
||||||
|
"# END OF THE HEADER ADDED BY COMPUTE INTERFACE",
|
||||||
|
"# ---------------------------------------------------------------------------\n",
|
||||||
|
sep="\n"
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
stop(paste("Specify a context from \"localParallel\", \"clusterParallel\",",
|
||||||
|
"\"Hadoop\", \"Spark\", or \"Teradata\"."))
|
||||||
|
}
|
||||||
|
|
||||||
|
cat(codes.head, file=object$script)
|
||||||
|
cat(codes.body, file=object$script, sep="\n", append=TRUE)
|
||||||
|
}
|
|
@ -1,5 +1,4 @@
|
||||||
#' @title Remote execution of R script in an R interface object.
|
#' @title Remote execution of R script in an R interface object.
|
||||||
#' @param object R interface object.
|
|
||||||
#' @param inputs JSON encoded string of R objects that are loaded into the Remote R session's workspace prior to execution. Only R objects of type: primitives, vectors and dataframes are supported via this parameter. Alternatively the putLocalObject can be used, prior to a call to this function, to move any R object from the local workspace into the remote R session.
|
#' @param inputs JSON encoded string of R objects that are loaded into the Remote R session's workspace prior to execution. Only R objects of type: primitives, vectors and dataframes are supported via this parameter. Alternatively the putLocalObject can be used, prior to a call to this function, to move any R object from the local workspace into the remote R session.
|
||||||
#' @param outputs Character vector of the names of the objects to retreive. Only primitives, vectors and dataframes can be retrieved using this function. Use getRemoteObject to get any type of R object from the remote session.
|
#' @param outputs Character vector of the names of the objects to retreive. Only primitives, vectors and dataframes can be retrieved using this function. Use getRemoteObject to get any type of R object from the remote session.
|
||||||
#' @param checkLibraries if `TRUE`, check whether libraries used in the R script installed on the remote machine.
|
#' @param checkLibraries if `TRUE`, check whether libraries used in the R script installed on the remote machine.
|
||||||
|
@ -7,16 +6,25 @@
|
||||||
#' @param writePlots If TRUE, plots generated during execution are copied to the working directory of the local session.
|
#' @param writePlots If TRUE, plots generated during execution are copied to the working directory of the local session.
|
||||||
#' @return Status of scription execution.
|
#' @return Status of scription execution.
|
||||||
#' @export
|
#' @export
|
||||||
executeScript <- function(object,
|
executeScript <- function(remote,
|
||||||
|
user,
|
||||||
|
script,
|
||||||
|
computeContext,
|
||||||
inputs=NULL,
|
inputs=NULL,
|
||||||
outputs=NULL,
|
outputs=NULL,
|
||||||
checkLibraries=FALSE,
|
checkLibraries=FALSE,
|
||||||
displayPlots=FALSE,
|
displayPlots=FALSE,
|
||||||
writePlots=FALSE) {
|
writePlots=FALSE) {
|
||||||
|
|
||||||
# add remote spefic header to worker script.
|
# manage input strings in an interface object, set configuration, show interface contents, and update script with computing context specification.
|
||||||
|
|
||||||
editScript(object)
|
new_interface <-
|
||||||
|
createComputeInterface(remote, user, script) %>%
|
||||||
|
setConfig(dns_list=remote,
|
||||||
|
machine_user=user,
|
||||||
|
context=computeContext) %>%
|
||||||
|
dumpInterface() %>%
|
||||||
|
updateScript()
|
||||||
|
|
||||||
# authenticate the remote server.
|
# authenticate the remote server.
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
"virtualNetworks_name": {
|
"virtualNetworks_name": {
|
||||||
"value": "<DEFAULT>vnet"
|
"value": "<DEFAULT>vnet"
|
||||||
},
|
},
|
||||||
"storageAccounts_dsvmdisks490_name": {
|
"storageAccounts_name": {
|
||||||
"value": "<DEFAULT>sa"
|
"value": "<DEFAULT>sa"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
"defaultValue": "my_dsvm_rg_sea-vnet",
|
"defaultValue": "my_dsvm_rg_sea-vnet",
|
||||||
"type": "String"
|
"type": "String"
|
||||||
},
|
},
|
||||||
"storageAccounts_dsvmdisks490_name": {
|
"storageAccounts_name": {
|
||||||
"defaultValue": "dsvmdisks490",
|
"defaultValue": "dsvmdisks490",
|
||||||
"type": "String"
|
"type": "String"
|
||||||
}
|
}
|
||||||
|
@ -70,7 +70,7 @@
|
||||||
"name": "[parameters('virtualMachines_name')]",
|
"name": "[parameters('virtualMachines_name')]",
|
||||||
"createOption": "FromImage",
|
"createOption": "FromImage",
|
||||||
"vhd": {
|
"vhd": {
|
||||||
"uri": "[concat('https://projectsdisks415.blob.core.windows.net/vhds/', parameters('virtualMachines_name'),'20170215163501.vhd')]"
|
"uri": "[concat('http://',concat(uniquestring(resourceGroup().id), parameters('storageAccounts_name')),'.blob.core.windows.net/','vhds','/','osdiskforwindowssimple', parameters('virtualMachines_name'), '.vhd')]"
|
||||||
},
|
},
|
||||||
"caching": "ReadWrite"
|
"caching": "ReadWrite"
|
||||||
},
|
},
|
||||||
|
@ -231,7 +231,7 @@
|
||||||
"tier": "Standard"
|
"tier": "Standard"
|
||||||
},
|
},
|
||||||
"kind": "Storage",
|
"kind": "Storage",
|
||||||
"name": "[parameters('storageAccounts_dsvmdisks490_name')]",
|
"name": "[parameters('storageAccounts_name')]",
|
||||||
"apiVersion": "2016-01-01",
|
"apiVersion": "2016-01-01",
|
||||||
"location": "[parameters('location')]",
|
"location": "[parameters('location')]",
|
||||||
"tags": {},
|
"tags": {},
|
||||||
|
|
Загрузка…
Ссылка в новой задаче