зеркало из https://github.com/Azure/AzureDSVM.git
Updated cluster deployment. Modified vignette. Tested and deployment was succesful.
This commit is contained in:
Родитель
bb9118d552
Коммит
4a5c9c25d9
|
@ -1,14 +1,13 @@
|
|||
# Generated by roxygen2: do not edit by hand
|
||||
|
||||
export(createRInterface)
|
||||
export(createComputeInterface)
|
||||
export(deployDSVM)
|
||||
export(dumpObject)
|
||||
export(deployDSVMCluster)
|
||||
export(dumpInterface)
|
||||
export(executeScript)
|
||||
export(getVMSizes)
|
||||
export(newScript)
|
||||
export(operateDSVM)
|
||||
export(setConfig)
|
||||
export(setRInterface)
|
||||
export(updateScript)
|
||||
import(dplyr)
|
||||
import(magrittr)
|
||||
|
|
|
@ -6,9 +6,9 @@
|
|||
#' @return An S3 compute interface object.
|
||||
#' @export
|
||||
createComputeInterface <- function(remote,
|
||||
user,
|
||||
script,
|
||||
config){
|
||||
user,
|
||||
script,
|
||||
config){
|
||||
ci_env <- new.env(parent=globalenv())
|
||||
|
||||
# initialize an compute interface object.
|
||||
|
@ -71,11 +71,11 @@ setConfig <- function(object,
|
|||
return(object)
|
||||
}
|
||||
|
||||
#' @title Dump out the object configuration.
|
||||
#' @title Dump the interface configuration.
|
||||
#' @param object The compute interface object.
|
||||
#' @return No return. Print compute interface object information.
|
||||
#' @export
|
||||
dumpObject <- function(object) {
|
||||
dumpInterface <- function(object) {
|
||||
cat(
|
||||
sprintf("---------------------------------------------------------------------------"),
|
||||
sprintf("Compute interface information"),
|
||||
|
@ -106,24 +106,23 @@ dumpObject <- function(object) {
|
|||
updateScript <- function(object) {
|
||||
if (!file.exists(object$script) || length(object$script) == 0)
|
||||
{
|
||||
stop(paste("The script does not exist or is not specified!",
|
||||
"Consider create a new one using riNewScript."))
|
||||
stop("The script does not exist or is not specified!")
|
||||
}
|
||||
|
||||
codes.body <- readLines(con=object$script)
|
||||
codes_body <- readLines(con=object$script)
|
||||
|
||||
# Remove the header.
|
||||
|
||||
if (codes.body[2] == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE")
|
||||
if (!is.na(codes_body[2]) && codes_body[2] == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE")
|
||||
{
|
||||
head.start <- which(codes.body == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE")
|
||||
head.end <- which(codes.body == "# END OF THE HEADER ADDED BY COMPUTE INTERFACE")
|
||||
codes.body <- codes.body[-((head.start - 1):(head.end + 1))]
|
||||
head.start <- which(codes_body == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE")
|
||||
head.end <- which(codes_body == "# END OF THE HEADER ADDED BY COMPUTE INTERFACE")
|
||||
codes_body <- codes_body[-((head.start - 1):(head.end + 1))]
|
||||
}
|
||||
|
||||
# Add context-specific info into header.
|
||||
|
||||
codes.head <- paste(
|
||||
codes_head <- paste(
|
||||
"# ---------------------------------------------------------------------------",
|
||||
"# THIS IS A HEADER ADDED BY COMPUTE INTERFACE",
|
||||
"# ---------------------------------------------------------------------------",
|
||||
|
@ -132,8 +131,8 @@ updateScript <- function(object) {
|
|||
|
||||
if (object$config$CI_CONTEXT == "clusterParallel")
|
||||
{
|
||||
codes.head <- paste(
|
||||
codes.head,
|
||||
codes_head <- paste(
|
||||
codes_head,
|
||||
paste("CI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$CI_MACHINES)), collapse=", "), ")"),
|
||||
paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"),
|
||||
paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"),
|
||||
|
@ -155,15 +154,15 @@ updateScript <- function(object) {
|
|||
)
|
||||
} else if (object$config$CI_CONTEXT == "Hadoop")
|
||||
{
|
||||
codes.head <- paste(
|
||||
codes.head,
|
||||
codes_head <- paste(
|
||||
codes_head,
|
||||
"This is for Hadoop.",
|
||||
"\n"
|
||||
)
|
||||
} else if (object$config$CI_CONTEXT == "Spark")
|
||||
{
|
||||
codes.head <- paste(
|
||||
codes.head,
|
||||
codes_head <- paste(
|
||||
codes_head,
|
||||
paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"),
|
||||
paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"),
|
||||
paste("CI_MASTER <-", "c(", paste(shQuote(unlist(object$config$CI_MASTER)), collapse=", "), ")"),
|
||||
|
@ -189,15 +188,15 @@ updateScript <- function(object) {
|
|||
)
|
||||
} else if (object$config$CI_CONTEXT == "Teradata")
|
||||
{
|
||||
codes.head <- paste(
|
||||
codes.head,
|
||||
codes_head <- paste(
|
||||
codes_head,
|
||||
"This is for Teradata.",
|
||||
"\n"
|
||||
)
|
||||
} else if (object$config$CI_CONTEXT == "localParallel")
|
||||
{
|
||||
codes.head <- paste(
|
||||
codes.head,
|
||||
codes_head <- paste(
|
||||
codes_head,
|
||||
paste("CI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$CI_MACHINES)), collapse=", "), ")"),
|
||||
paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"),
|
||||
paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"),
|
||||
|
@ -221,6 +220,6 @@ updateScript <- function(object) {
|
|||
"\"Hadoop\", \"Spark\", or \"Teradata\"."))
|
||||
}
|
||||
|
||||
cat(codes.head, file=object$script)
|
||||
cat(codes.body, file=object$script, sep="\n", append=TRUE)
|
||||
cat(codes_head, file=object$script)
|
||||
cat(codes_body, file=object$script, sep="\n", append=TRUE)
|
||||
}
|
||||
|
|
|
@ -1,39 +0,0 @@
|
|||
#' @title Create an R interface object that handles interaction with remote instance.
|
||||
#' @param remote URL of remote instance.
|
||||
#' @param user User name.
|
||||
#' @param script R script with full path for execution at remote instance.
|
||||
#' @param config Configuration for remote execution. Settings include computing context, data reference, etc.
|
||||
#' @return An S3 R interface object.
|
||||
#' @export
|
||||
createRInterface <- function(remote,
|
||||
user,
|
||||
script,
|
||||
config){
|
||||
ri_env <- new.env(parent=globalenv())
|
||||
|
||||
# initialize an R interface object.
|
||||
|
||||
if(!missing(remote)) {
|
||||
ri_env$remote <- remote
|
||||
} else {
|
||||
ri_env$remote <- character(0)
|
||||
}
|
||||
|
||||
if(!missing(user)) {
|
||||
ri_env$user <- user
|
||||
} else {
|
||||
ri_env$user <- character(0)
|
||||
}
|
||||
|
||||
if(!missing(script)) {
|
||||
ri_env$script <- script
|
||||
} else {
|
||||
ri_env$script <- character(0)
|
||||
}
|
||||
|
||||
if(!missing(config)) {
|
||||
ri_env$config <- config
|
||||
} else {
|
||||
ri_env$config <- NULL
|
||||
}
|
||||
}
|
|
@ -165,11 +165,11 @@ deployDSVM <- function(context,
|
|||
resourceGroup=resource.group,
|
||||
mode=mode)
|
||||
|
||||
fqdn <- paste0(name, ".", location, ".cloudapp.azure.com")
|
||||
|
||||
if (tolower(mode) == "sync")
|
||||
attr(fqdn, "ip") <-
|
||||
system(paste("dig", fqdn, "+short"), intern=TRUE) # Get from the VM meta data?
|
||||
|
||||
return(fqdn)
|
||||
# fqdn <- paste0(name, ".", location, ".cloudapp.azure.com")
|
||||
#
|
||||
# if (tolower(mode) == "sync")
|
||||
# attr(fqdn, "ip") <-
|
||||
# system(paste("dig", fqdn, "+short"), intern=TRUE) # Get from the VM meta data?
|
||||
#
|
||||
# return(fqdn)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
#' @title This function creates a cluster of Data Science Virtual Machines and enable the DSVMs to communicate across the cluster via public key based credentials for high performance computation. All DSVMs in the cluster are based on Linux OS and use public key cryptogrphy for log in.
|
||||
#' @param context AzureSMR active context.
|
||||
#' @param resource.group The Azure resource group where the DSVM is allocated.
|
||||
#' @param location Location of the data centre to host the DSVM.
|
||||
#' @param count Number of DSVM instances to be created. Note deploying multiple DSVMs may consume some time.
|
||||
#' @param name Names of the DSVMs. Lowercase characters or numbers only. Special characters are not permitted.
|
||||
#' @param username User name of the DSVM. It should be different from name of the DSVM.
|
||||
#' @param pubkey Public key for the DSVM. Only applicable for
|
||||
#' public-key based authentication of Linux based DSVM.
|
||||
#' @param dns DNS label for the VM address. The URL for accessing the deployed DSVM will be "<dns_label>.<location>.cloudapp.azure.com
|
||||
#' @param cluster A logical value of TRUE or FALSE to indicate whether the deployed DSVMs form a cluster. If not, the deployment will assign the vectors of name, username, and public key as given in the input arguments to the DSVMs - this is usually used for creating multiple DSVMs for a group of data scientists. If it is TRUE, the deployment will use the first element (if it consists more than one elements) of the given DSVM names as base, and append serial number to the base to form a DSVM full name, and then use the SAME username and public key across the cluster - this can be used for creating a HPC engine on top of the deployed DSVMs in which parallel computing context which is availed in Microsoft R Server ScaleR package can be applied for embarassing parallelization.
|
||||
#' @export
|
||||
deployDSVMCluster <- function(context,
|
||||
resource.group,
|
||||
location,
|
||||
count=1,
|
||||
name,
|
||||
username,
|
||||
size="Standard_D1_v2",
|
||||
pubkey="",
|
||||
dns=name,
|
||||
cluster=FALSE) {
|
||||
|
||||
# do some checks for the input arguments.
|
||||
|
||||
if(missing(context))
|
||||
stop("Please specify a context (contains TID, CID, KEY).")
|
||||
|
||||
if(missing(resource.group))
|
||||
stop("Please specify an Azure resouce group.")
|
||||
|
||||
if(missing(location))
|
||||
stop("Please specify a data centre location.")
|
||||
|
||||
if(missing(name))
|
||||
stop("Please specify virtual machine name(s).")
|
||||
|
||||
if(missing(username))
|
||||
stop("Please specify virtual machine user name(s).")
|
||||
|
||||
# other preconditions.
|
||||
|
||||
# let's limit the number of DSVM deployments to a reasonable number.
|
||||
|
||||
if(count > 10) {
|
||||
ans <- readline("More than 10 DSVMs are going to be created and that may take a long time to finish. Continue? (y/n)")
|
||||
if(ans == "n" || ans == "N")
|
||||
return("The deployment is aborted.")
|
||||
}
|
||||
|
||||
# to deploy the cluster of DSVMs.
|
||||
|
||||
if(cluster) {
|
||||
|
||||
name <- ifelse(length(name) > 1, name[1], name)
|
||||
|
||||
# append serial no. to name base to form a full name.
|
||||
|
||||
names <- paste0(name, sprintf("%03d", 1:count))
|
||||
|
||||
for (inc in 1:count) {
|
||||
deployDSVM(context=context,
|
||||
resource.group=resource.group,
|
||||
location=location,
|
||||
name=names[inc],
|
||||
username=username,
|
||||
size=size,
|
||||
os="Linux",
|
||||
authen="Key",
|
||||
pubkey=pubkey,
|
||||
dns=names[inc],
|
||||
mode=ifelse(inc==count, "Sync", "ASync"))
|
||||
}
|
||||
|
||||
# set up public credentials for DSVM cluster, to allow DSVMs to communicate with each other. This is required if one wants to execute analytics on the cluster with parallel compute context in ScaleR.
|
||||
|
||||
# TODO: transmitting private key is not good practice! Seeking a better method...
|
||||
|
||||
HOME_DIR <- ifelse(identical(.Platform$OS.type, "windows"),
|
||||
normalizePath(paste0(Sys.getenv("HOME"), "/../"), winslash = "/"),
|
||||
Sys.getenv("HOME"))
|
||||
|
||||
# generate public key from private key.
|
||||
|
||||
shell(paste0("ssh-keygen -y -f ", HOME_DIR, ".ssh/id_rsa > ./id_rsa.pub"))
|
||||
|
||||
# copy private key into local directory. Note local machine may be either Linux or Windows based so it is treated differently.
|
||||
|
||||
ifelse(identical(.Platform$OS.type, "windows"),
|
||||
system(paste0("xcopy /f ", shQuote(paste0(HOME_DIR, ".ssh/id_rsa"), type = "cmd"),
|
||||
" ", shQuote(".", type = "cmd"))),
|
||||
system("cp ~/.ssh/id_rsa ."))
|
||||
|
||||
dns_name_list <- paste(names,
|
||||
location,
|
||||
"cloudapp.azure.com",
|
||||
sep=".")
|
||||
|
||||
# Distribute the key pair to all nodes of the cluster.
|
||||
|
||||
for (vm in dns_name_list)
|
||||
{
|
||||
# add an option to switch off host key checking - for the purpose of avoiding pop up.
|
||||
|
||||
option <- "-q -o StrictHostKeyChecking=no"
|
||||
|
||||
# copy the key pairs onto cluster node.
|
||||
|
||||
system(sprintf("scp %s ./id_rsa %s@%s:.ssh/", option, username, vm))
|
||||
system(sprintf("scp %s ./id_rsa.pub %s@%s:.ssh/", option, username, vm))
|
||||
|
||||
# create a config file to switch off strick host key checking to enable node-to-node authentication without pop up.
|
||||
|
||||
sh <- writeChar(c("cat .ssh/id_rsa.pub > .ssh/authorized_keys\n",
|
||||
paste0("echo Host *.", location, ".cloudapp.azure.com >> ~/.ssh/config\n"),
|
||||
paste0("echo StrictHostKeyChecking no >> ~/.ssh/config\n"),
|
||||
paste0("echo UserKnownHostsFile /dev/null >> ~/.ssh/config\n"),
|
||||
"chmod 600 ~/.ssh/config\n"), con = "./shell_script")
|
||||
|
||||
# upload, change mode of, and run the config script.
|
||||
|
||||
system(sprintf("scp %s shell_script %s@%s:~", option, username, vm), show.output.on.console = FALSE)
|
||||
system(sprintf("ssh %s -l %s %s 'chmod +x ~/shell_script'", option, username, vm), show.output.on.console = FALSE)
|
||||
system(sprintf("ssh %s -l %s %s '~/shell_script'", option, username, vm), show.output.on.console = FALSE)
|
||||
}
|
||||
|
||||
# Clean up - if you search "remove password" you will get 284,505 records so the following is to clean up confidential information in the working directory to prevent them from leaking anywhere out of your control.
|
||||
|
||||
file.remove("./id_rsa", "./id_rsa.pub", "./shell_script")
|
||||
} else {
|
||||
|
||||
# check whether the input arguments are valid for the multi-instance deployment-
|
||||
# 1. VM names, usernames, and public key should character vectors that have the same length as the number of DSVMs.
|
||||
# 2. The name and username vectors should not contain duplicated elements.
|
||||
|
||||
if(all(length(name) == count,
|
||||
length(username) == count,
|
||||
length(pubkey) == count) &&
|
||||
!any(c(duplicated(name), duplicated(username)))) {
|
||||
|
||||
names <- name
|
||||
dns_name_list <- paste(names,
|
||||
location,
|
||||
"cloudapp.azure.com",
|
||||
sep=".")
|
||||
|
||||
for (inc in 1:count) {
|
||||
deployDSVM(context=context,
|
||||
resource.group=resource.group,
|
||||
location=location,
|
||||
name=name[inc],
|
||||
username=username[inc],
|
||||
size=size,
|
||||
os="Linux",
|
||||
authen="Key",
|
||||
pubkey=pubkey[inc],
|
||||
dns=name[inc],
|
||||
mode=ifelse(inc==count, "Sync", "ASync"))
|
||||
}
|
||||
} else {
|
||||
stop("Please check input DSVM names, usernames, and public key. The input vectors should have the same length with count of DSVM and elements of each input vectot should be distinct from each other.")
|
||||
}
|
||||
}
|
||||
|
||||
# return results for reference.
|
||||
|
||||
df <- data.frame(Names = names,
|
||||
Usernames = rep(username, count),
|
||||
URL = dns_name_list,
|
||||
Sizes = rep(size, count))
|
||||
|
||||
writeLines("Summary of DSVMs deployed")
|
||||
print(df)
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
#' @title Dump out the object configuration.
|
||||
#' @param object The R interface object.
|
||||
#' @return No return. Print R interface object information.
|
||||
#' @export
|
||||
dumpObject <- function(object) {
|
||||
cat(
|
||||
sprintf("---------------------------------------------------------------------------"),
|
||||
sprintf("r Interface information"),
|
||||
sprintf("---------------------------------------------------------------------------"),
|
||||
sprintf("The R script to be executed:\t%s.", shQuote(object$script)),
|
||||
sprintf("The remote host:\t\t%s.", shQuote(object$remote)),
|
||||
sprintf("The login user name:\t\t%s.", shQuote(object$user)),
|
||||
sprintf("---------------------------------------------------------------------------"),
|
||||
sprintf("The configuration of the interface is:"),
|
||||
# sprintf("virtual machines: %s", ifelse(!is.na(object$config$RI_MACHINES), object$config$RI_MACHINES, "N/A")),
|
||||
sprintf("virtual machines\t\t %s", unlist(object$config$RI_MACHINES)),
|
||||
sprintf("dns list\t\t\t %s", unlist(object$config$RI_DNS)),
|
||||
sprintf("user to these machines\t\t %s", unlist(object$config$RI_VMUSER)),
|
||||
sprintf("the master node\t\t\t %s", unlist(object$config$RI_MASTER)),
|
||||
sprintf("the slave nodes\t\t\t %s", unlist(object$config$RI_SLAVES)),
|
||||
sprintf("the data source\t\t\t %s", unlist(object$config$RI_DATA)),
|
||||
sprintf("the computing context\t\t %s", unlist(object$config$RI_CONTEXT)),
|
||||
sprintf("---------------------------------------------------------------------------"),
|
||||
sprintf("# End of information session"),
|
||||
sprintf("---------------------------------------------------------------------------"),
|
||||
sep = "\n"
|
||||
)
|
||||
}
|
|
@ -1,4 +1,10 @@
|
|||
#' @title Remote execution of R script in an R interface object.
|
||||
#' @param context AzureSMR context.
|
||||
#' @param resourceGroup Resource group of Azure resources for computation.
|
||||
#' @param remote Remote URL for the computation engine. For DSVM, it is either DNS (usually in the format of <dsvm name>.<location>.cloudapp.azure.com) or IP address.
|
||||
#' @param user Username for logging into the remote resource.
|
||||
#' @param script R script to be executed on remote resource.
|
||||
#' @param computeContext Computation context of Microsoft R Server under which the mechanisms of parallelization (e.g., local parallel, cluster based parallel, or Spark) is specified. Accepted computing context include "localParallel", "clusterParallel", "Hadoop", and "Spark".
|
||||
#' @param inputs JSON encoded string of R objects that are loaded into the Remote R session's workspace prior to execution. Only R objects of type: primitives, vectors and dataframes are supported via this parameter. Alternatively the putLocalObject can be used, prior to a call to this function, to move any R object from the local workspace into the remote R session.
|
||||
#' @param outputs Character vector of the names of the objects to retreive. Only primitives, vectors and dataframes can be retrieved using this function. Use getRemoteObject to get any type of R object from the remote session.
|
||||
#' @param checkLibraries if `TRUE`, check whether libraries used in the R script installed on the remote machine.
|
||||
|
@ -6,7 +12,9 @@
|
|||
#' @param writePlots If TRUE, plots generated during execution are copied to the working directory of the local session.
|
||||
#' @return Status of scription execution.
|
||||
#' @export
|
||||
executeScript <- function(remote,
|
||||
executeScript <- function(context,
|
||||
resourceGroup,
|
||||
remote,
|
||||
user,
|
||||
script,
|
||||
computeContext,
|
||||
|
@ -16,18 +24,45 @@ executeScript <- function(remote,
|
|||
displayPlots=FALSE,
|
||||
writePlots=FALSE) {
|
||||
|
||||
# manage input strings in an interface object, set configuration, show interface contents, and update script with computing context specification.
|
||||
# manage input strings in an interface object.
|
||||
|
||||
new_interface <-
|
||||
createComputeInterface(remote, user, script) %>%
|
||||
setConfig(dns_list=remote,
|
||||
machine_user=user,
|
||||
context=computeContext) %>%
|
||||
dumpInterface() %>%
|
||||
updateScript()
|
||||
new_interface <- createComputeInterface(remote, user, script)
|
||||
|
||||
# set configuration
|
||||
|
||||
new_interface %<>% setConfig(new_interface,
|
||||
dns_list=remote,
|
||||
machine_user=user,
|
||||
context=computeContext)
|
||||
|
||||
# print interface contents.
|
||||
|
||||
dumpInterface(new_interface)
|
||||
|
||||
# update script with computing context.
|
||||
|
||||
updateScript(new_interface)
|
||||
|
||||
# authenticate the remote server.
|
||||
|
||||
status <- operateDSVM(context, )
|
||||
|
||||
# some preconditions of using Microsoft R Server.
|
||||
|
||||
# load mrsdeploy on remote machine.
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
mrsdeploy::remoteLogin(deployr_endpoint=object$remote,
|
||||
session=FALSE,
|
||||
commandline=FALSE,
|
||||
|
|
|
@ -1,26 +0,0 @@
|
|||
#' @title Generate a new worker script which is run on the remote instance with specifications in R interface object configuration.
|
||||
#' @param path Path to the script.
|
||||
#' @param title Title of the script.
|
||||
#' @export
|
||||
newScript <- function(path=".",
|
||||
title=paste0("worker_new_", Sys.time(), ".R")) {
|
||||
notes <-
|
||||
sprintf(
|
||||
paste(
|
||||
"\n# ---------------------------------------------------------------------------",
|
||||
"# Your worker script starts from here ... ",
|
||||
"# ---------------------------------------------------------------------------\n",
|
||||
sep="\n"
|
||||
)
|
||||
)
|
||||
if (missing(path) || missing(title))
|
||||
{
|
||||
stop(sprintf("A default script named %s located at %s is created.", title, path))
|
||||
}
|
||||
|
||||
cat(notes, file=file.path(path, title))
|
||||
writeLines(
|
||||
sprintf("Worker script %s is created at %s.",
|
||||
title, ifelse(path == ".", "work directory", path))
|
||||
)
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
#' @title Set configuration for the R interface object.
|
||||
#' @param object An S3 R interface object.
|
||||
#' @param machine_list List of remote instances that execute R scripts.
|
||||
#' @param dns_list DNS of the remote instances.
|
||||
#' @param machine_user User name of the remote instances.
|
||||
#' @param master Master node of the machine.
|
||||
#' @param slaves Slave nodes of the machine.
|
||||
#' @param data Reference to data used in the analytics.
|
||||
#' @param context Computing context available in Microsoft R Server for running the analytics.
|
||||
#' @export
|
||||
setConfig <- function(object,
|
||||
machine_list,
|
||||
dns_list,
|
||||
machine_user,
|
||||
master,
|
||||
slaves,
|
||||
data,
|
||||
context) {
|
||||
object$config <- list(
|
||||
RI_MACHINES = ifelse(!missing(machine_list), list(machine_list), ""),
|
||||
RI_DNS = ifelse(!missing(dns_list), list(dns_list), ""),
|
||||
RI_VMUSER = ifelse(!missing(machine_user), list(machine_user), ""),
|
||||
RI_MASTER = ifelse(!missing(master), list(master), ""),
|
||||
RI_SLAVES = ifelse(!missing(slaves), list(slaves), ""),
|
||||
RI_DATA = ifelse(!missing(data), list(data), ""),
|
||||
RI_CONTEXT = ifelse(!missing(context), list(context), "")
|
||||
)
|
||||
|
||||
return(object)
|
||||
}
|
|
@ -1,23 +0,0 @@
|
|||
#' @title Set values of the R interface object.
|
||||
#' @param object An S3 R interface object.
|
||||
#' @param remote URL of remote instance.
|
||||
#' @param user User name.
|
||||
#' @param script R script with full path for execution at remote instance.
|
||||
#' @param config Configuration for remote execution. Settings include computing context, data reference, etc.
|
||||
#' @return The updated R interface object.
|
||||
#' @export
|
||||
setRInterface <- function(object,
|
||||
remote,
|
||||
user,
|
||||
script,
|
||||
config) {
|
||||
if(!missing(remote)) object$remote <- remote
|
||||
if(!missing(user)) object$user <- user
|
||||
if(!missing(script) && file.exists(script))
|
||||
{
|
||||
object$script <- script
|
||||
}
|
||||
if(!missing(config)) object$config <- config
|
||||
|
||||
return(object)
|
||||
}
|
127
R/updateScript.R
127
R/updateScript.R
|
@ -1,127 +0,0 @@
|
|||
#' @title Update a worker script with R interface object configuration.
|
||||
#' @param object R interface object.
|
||||
#' @export
|
||||
updateScript <- function(object) {
|
||||
if (!file.exists(object$script) || length(object$script) == 0)
|
||||
{
|
||||
stop(paste("The script does not exist or is not specified!",
|
||||
"Consider create a new one using riNewScript."))
|
||||
}
|
||||
|
||||
codes.body <- readLines(con=object$script)
|
||||
|
||||
# Remove the header.
|
||||
|
||||
if (codes.body[2] == "# THIS IS A HEADER ADDED BY R INTERFACE")
|
||||
{
|
||||
head.start <- which(codes.body == "# THIS IS A HEADER ADDED BY R INTERFACE")
|
||||
head.end <- which(codes.body == "# END OF THE HEADER ADDED BY R INTERFACE")
|
||||
codes.body <- codes.body[-((head.start - 1):(head.end + 1))]
|
||||
}
|
||||
|
||||
# Add context-specific info into header.
|
||||
|
||||
codes.head <- paste(
|
||||
"# ---------------------------------------------------------------------------",
|
||||
"# THIS IS A HEADER ADDED BY R INTERFACE",
|
||||
"# ---------------------------------------------------------------------------",
|
||||
sep="\n"
|
||||
)
|
||||
|
||||
if (object$config$RI_CONTEXT == "clusterParallel")
|
||||
{
|
||||
codes.head <- paste(
|
||||
codes.head,
|
||||
paste("RI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$RI_MACHINES)), collapse=", "), ")"),
|
||||
paste("RI_DNS <-", "c(", paste(shQuote(unlist(object$config$RI_DNS)), collapse=", "), ")"),
|
||||
paste("RI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$RI_VMUSER)), collapse=", "), ")"),
|
||||
paste("RI_MASTER <-", "c(", paste(shQuote(unlist(object$config$RI_MASTER)), collapse=", "), ")"),
|
||||
paste("RI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$RI_SLAVES)), collapse=", "), ")"),
|
||||
paste("RI_DATA <-", paste(shQuote(unlist(object$config$RI_DATA)), collapse=", ")),
|
||||
paste("RI_CONTEXT <-", paste(shQuote(unlist(object$config$RI_CONTEXT)), collapse=", ")),
|
||||
"\nlibrary(RevoScaleR)",
|
||||
"# --------- Set compute context",
|
||||
"cl <- makePSOCKcluster(names=RI_SLAVES, master=RI_MASTER, user=RI_VMUSER)",
|
||||
"registerDoParallel(cl)",
|
||||
"rxSetComputeContext(RxForeachDoPar())",
|
||||
"# --------- Load data.",
|
||||
"download.file(url=RI_DATA, destfile='./data.csv')",
|
||||
"riData <- read.csv('./data.csv', header=T, sep=',', stringAsFactor=F)",
|
||||
"# ---------------------------------------------------------------------------",
|
||||
"# END OF THE HEADER ADDED BY R INTERFACE",
|
||||
"# ---------------------------------------------------------------------------\n",
|
||||
sep="\n"
|
||||
)
|
||||
} else if (object$config$RI_CONTEXT == "Hadoop")
|
||||
{
|
||||
codes.head <- paste(
|
||||
codes.head,
|
||||
"This is for Hadoop.",
|
||||
"\n"
|
||||
)
|
||||
} else if (object$config$RI_CONTEXT == "Spark")
|
||||
{
|
||||
codes.head <- paste(
|
||||
codes.head,
|
||||
paste("RI_DNS <-", "c(", paste(shQuote(unlist(object$config$RI_DNS)), collapse=", "), ")"),
|
||||
paste("RI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$RI_VMUSER)), collapse=", "), ")"),
|
||||
paste("RI_MASTER <-", "c(", paste(shQuote(unlist(object$config$RI_MASTER)), collapse=", "), ")"),
|
||||
paste("RI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$RI_SLAVES)), collapse=", "), ")"),
|
||||
paste("RI_DATA <-", paste(shQuote(unlist(object$config$RI_DATA)), collapse=", ")),
|
||||
paste("RI_CONTEXT <-", paste(shQuote(unlist(object$config$RI_CONTEXT)), collapse=", ")),
|
||||
"\nlibrary(RevoScaleR)",
|
||||
"# --------- Set compute context",
|
||||
"myHadoopCluster <- RxSpark(persistentRun=TRUE, idleTimeOut=600)",
|
||||
"rxSetComputeContext(myHadoopCluster)",
|
||||
"# --------- Load data.",
|
||||
"download.file(url=RI_DATA, destfile='./data.csv')",
|
||||
"riData <- read.csv('./data.csv', header=T, sep=',', stringAsFactor=F)",
|
||||
"bigDataDirRoot <- '/share'",
|
||||
"inputDir <- file.path(bigDataDirRoot, 'riBigData')",
|
||||
"rxHadoopMakeDir(inputDir)",
|
||||
"rxHadoopCopyFromLocal('./data.csv', inputDir)",
|
||||
"hdfsFS <- RxHdfsFileSystem()",
|
||||
"riTextData <- RxTextData(file=inputDir, fileSystem=hdfsFS)",
|
||||
"# ---------------------------------------------------------------------------",
|
||||
"# END OF THE HEADER ADDED BY R INTERFACE",
|
||||
"# ---------------------------------------------------------------------------\n",
|
||||
sep="\n"
|
||||
)
|
||||
} else if (object$config$RI_CONTEXT == "Teradata")
|
||||
{
|
||||
codes.head <- paste(
|
||||
codes.head,
|
||||
"This is for Teradata.",
|
||||
"\n"
|
||||
)
|
||||
} else if (object$config$RI_CONTEXT == "localParallel")
|
||||
{
|
||||
codes.head <- paste(
|
||||
codes.head,
|
||||
paste("RI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$RI_MACHINES)), collapse=", "), ")"),
|
||||
paste("RI_DNS <-", "c(", paste(shQuote(unlist(object$config$RI_DNS)), collapse=", "), ")"),
|
||||
paste("RI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$RI_VMUSER)), collapse=", "), ")"),
|
||||
paste("RI_MASTER <-", "c(", paste(shQuote(unlist(object$config$RI_MASTER)), collapse=", "), ")"),
|
||||
paste("RI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$RI_SLAVES)), collapse=", "), ")"),
|
||||
paste("RI_DATA <-", paste(shQuote(unlist(object$config$RI_DATA)), collapse=", ")),
|
||||
paste("RI_CONTEXT <-", paste(shQuote(unlist(object$config$RI_CONTEXT)), collapse=", ")),
|
||||
"\nlibrary(RevoScaleR)",
|
||||
"library(doParallel)",
|
||||
"# --------- Set compute context",
|
||||
"rxSetComputeContext(RxLocalParallel())",
|
||||
"# --------- Load data.",
|
||||
"download.file(url=RI_DATA, destfile='./data.csv')",
|
||||
"riData <- read.csv('./data.csv', header=T, sep=',', stringAsFactor=F)",
|
||||
"# ---------------------------------------------------------------------------",
|
||||
"# END OF THE HEADER ADDED BY R INTERFACE",
|
||||
"# ---------------------------------------------------------------------------\n",
|
||||
sep="\n"
|
||||
)
|
||||
} else {
|
||||
stop(paste("Specify a context from \"localParallel\", \"clusterParallel\",",
|
||||
"\"Hadoop\", \"Spark\", or \"Teradata\"."))
|
||||
}
|
||||
|
||||
cat(codes.head, file=object$script)
|
||||
cat(codes.body, file=object$script, sep="\n", append=TRUE)
|
||||
}
|
|
@ -1,14 +1,12 @@
|
|||
% Generated by roxygen2: do not edit by hand
|
||||
% Please edit documentation in R/setInterface.R
|
||||
\name{setRInterface}
|
||||
\alias{setRInterface}
|
||||
\title{Set values of the R interface object.}
|
||||
% Please edit documentation in R/computeInterface.R
|
||||
\name{createComputeInterface}
|
||||
\alias{createComputeInterface}
|
||||
\title{Create an compute interface object that handles interaction with remote instance.}
|
||||
\usage{
|
||||
setRInterface(object, remote, user, script, config)
|
||||
createComputeInterface(remote, user, script, config)
|
||||
}
|
||||
\arguments{
|
||||
\item{object}{An S3 R interface object.}
|
||||
|
||||
\item{remote}{URL of remote instance.}
|
||||
|
||||
\item{user}{User name.}
|
||||
|
@ -18,6 +16,6 @@ setRInterface(object, remote, user, script, config)
|
|||
\item{config}{Configuration for remote execution. Settings include computing context, data reference, etc.}
|
||||
}
|
||||
\value{
|
||||
The updated R interface object.
|
||||
An S3 compute interface object.
|
||||
}
|
||||
|
|
@ -1,21 +0,0 @@
|
|||
% Generated by roxygen2: do not edit by hand
|
||||
% Please edit documentation in R/createInterface.R
|
||||
\name{createRInterface}
|
||||
\alias{createRInterface}
|
||||
\title{Create an R interface object that handles interaction with remote instance.}
|
||||
\usage{
|
||||
createRInterface(remote, user, script, config)
|
||||
}
|
||||
\arguments{
|
||||
\item{remote}{URL of remote instance.}
|
||||
|
||||
\item{user}{User name.}
|
||||
|
||||
\item{script}{R script with full path for execution at remote instance.}
|
||||
|
||||
\item{config}{Configuration for remote execution. Settings include computing context, data reference, etc.}
|
||||
}
|
||||
\value{
|
||||
An S3 R interface object.
|
||||
}
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
% Generated by roxygen2: do not edit by hand
|
||||
% Please edit documentation in R/deployDSVMCluster.R
|
||||
\name{deployDSVMCluster}
|
||||
\alias{deployDSVMCluster}
|
||||
\title{This function creates a cluster of Data Science Virtual Machines and enable the DSVMs to communicate across the cluster via public key based credentials for high performance computation. All DSVMs in the cluster are based on Linux OS and use public key cryptogrphy for log in.}
|
||||
\usage{
|
||||
deployDSVMCluster(context, resource.group, location, count = 1, name,
|
||||
username, size = "Standard_D1_v2", pubkey = "", dns = name,
|
||||
cluster = FALSE, mode = "Sync")
|
||||
}
|
||||
\arguments{
|
||||
\item{context}{AzureSMR active context.}
|
||||
|
||||
\item{resource.group}{The Azure resource group where the DSVM is allocated.}
|
||||
|
||||
\item{location}{Location of the data centre to host the DSVM.}
|
||||
|
||||
\item{count}{Number of DSVM instances to be created. Note deploying multiple DSVMs may consume some time.}
|
||||
|
||||
\item{name}{Names of the DSVMs. Lowercase characters or numbers only. Special characters are not permitted.}
|
||||
|
||||
\item{username}{User name of the DSVM. It should be different from name of the DSVM.}
|
||||
|
||||
\item{pubkey}{Public key for the DSVM. Only applicable for
|
||||
public-key based authentication of Linux based DSVM.}
|
||||
|
||||
\item{dns}{DNS label for the VM address. The URL for accessing the deployed DSVM will be "<dns_label>.<location>.cloudapp.azure.com}
|
||||
|
||||
\item{cluster}{A logical value of TRUE or FALSE to indicate whether the deployed DSVMs form a cluster. If not, the deployment will assign the vectors of name, username, and public key as given in the input arguments to the DSVMs - this is usually used for creating multiple DSVMs for a group of data scientists. If it is TRUE, the deployment will use the first element (if it consists more than one elements) of the given DSVM names as base, and append serial number to the base to form a DSVM full name, and then use the SAME username and public key across the cluster - this can be used for creating a HPC engine on top of the deployed DSVMs in which parallel computing context which is availed in Microsoft R Server ScaleR package can be applied for embarassing parallelization.}
|
||||
|
||||
\item{mode}{Mode of virtual machine deployment. Default is "Sync".}
|
||||
}
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
% Generated by roxygen2: do not edit by hand
|
||||
% Please edit documentation in R/computeInterface.R
|
||||
\name{dumpInterface}
|
||||
\alias{dumpInterface}
|
||||
\title{Dump the interface configuration.}
|
||||
\usage{
|
||||
dumpInterface(object)
|
||||
}
|
||||
\arguments{
|
||||
\item{object}{The compute interface object.}
|
||||
}
|
||||
\value{
|
||||
No return. Print compute interface object information.
|
||||
}
|
||||
|
|
@ -1,15 +0,0 @@
|
|||
% Generated by roxygen2: do not edit by hand
|
||||
% Please edit documentation in R/dumpInterface.R
|
||||
\name{dumpObject}
|
||||
\alias{dumpObject}
|
||||
\title{Dump out the object configuration.}
|
||||
\usage{
|
||||
dumpObject(object)
|
||||
}
|
||||
\arguments{
|
||||
\item{object}{The R interface object.}
|
||||
}
|
||||
\value{
|
||||
No return. Print R interface object information.
|
||||
}
|
||||
|
|
@ -4,11 +4,22 @@
|
|||
\alias{executeScript}
|
||||
\title{Remote execution of R script in an R interface object.}
|
||||
\usage{
|
||||
executeScript(object, inputs = NULL, outputs = NULL,
|
||||
checkLibraries = FALSE, displayPlots = FALSE, writePlots = FALSE)
|
||||
executeScript(context, resourceGroup, remote, user, script, computeContext,
|
||||
inputs = NULL, outputs = NULL, checkLibraries = FALSE,
|
||||
displayPlots = FALSE, writePlots = FALSE)
|
||||
}
|
||||
\arguments{
|
||||
\item{object}{R interface object.}
|
||||
\item{context}{AzureSMR context.}
|
||||
|
||||
\item{resourceGroup}{Resource group of Azure resources for computation.}
|
||||
|
||||
\item{remote}{Remote URL for the computation engine. For DSVM, it is either DNS (usually in the format of <dsvm name>.<location>.cloudapp.azure.com) or IP address.}
|
||||
|
||||
\item{user}{Username for logging into the remote resource.}
|
||||
|
||||
\item{script}{R script to be executed on remote resource.}
|
||||
|
||||
\item{computeContext}{Computation context of Microsoft R Server under which the mechanisms of parallelization (e.g., local parallel, cluster based parallel, or Spark) is specified. Accepted computing context include "localParallel", "clusterParallel", "Hadoop", and "Spark".}
|
||||
|
||||
\item{inputs}{JSON encoded string of R objects that are loaded into the Remote R session's workspace prior to execution. Only R objects of type: primitives, vectors and dataframes are supported via this parameter. Alternatively the putLocalObject can be used, prior to a call to this function, to move any R object from the local workspace into the remote R session.}
|
||||
|
||||
|
|
|
@ -1,14 +0,0 @@
|
|||
% Generated by roxygen2: do not edit by hand
|
||||
% Please edit documentation in R/newScript.R
|
||||
\name{newScript}
|
||||
\alias{newScript}
|
||||
\title{Generate a new worker script which is run on the remote instance with specifications in R interface object configuration.}
|
||||
\usage{
|
||||
newScript(path = ".", title = paste0("worker_new_", Sys.time(), ".R"))
|
||||
}
|
||||
\arguments{
|
||||
\item{path}{Path to the script.}
|
||||
|
||||
\item{title}{Title of the script.}
|
||||
}
|
||||
|
|
@ -1,14 +1,14 @@
|
|||
% Generated by roxygen2: do not edit by hand
|
||||
% Please edit documentation in R/setConfig.R
|
||||
% Please edit documentation in R/computeInterface.R
|
||||
\name{setConfig}
|
||||
\alias{setConfig}
|
||||
\title{Set configuration for the R interface object.}
|
||||
\title{Set configuration for the compute interface object.}
|
||||
\usage{
|
||||
setConfig(object, machine_list, dns_list, machine_user, master, slaves, data,
|
||||
context)
|
||||
}
|
||||
\arguments{
|
||||
\item{object}{An S3 R interface object.}
|
||||
\item{object}{An S3 compute interface object.}
|
||||
|
||||
\item{machine_list}{List of remote instances that execute R scripts.}
|
||||
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
% Generated by roxygen2: do not edit by hand
|
||||
% Please edit documentation in R/updateScript.R
|
||||
% Please edit documentation in R/computeInterface.R
|
||||
\name{updateScript}
|
||||
\alias{updateScript}
|
||||
\title{Update a worker script with R interface object configuration.}
|
||||
\title{Update a worker script with compute interface object configuration.}
|
||||
\usage{
|
||||
updateScript(object)
|
||||
}
|
||||
\arguments{
|
||||
\item{object}{R interface object.}
|
||||
\item{object}{compute interface object.}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,13 +50,14 @@ LOC <- "southeastasia" # Where the resource group (resources) will be hosted
|
|||
|
||||
# Create names for the VMs.
|
||||
|
||||
COUNT <- 5 # Number of VMs to deploy.
|
||||
COUNT <- 2 # Number of VMs to deploy.
|
||||
BASE <-
|
||||
runif(4, 1, 26) %>%
|
||||
round() %>%
|
||||
letters[.] %>%
|
||||
paste(collapse="")
|
||||
LDSVM <- paste0("ldsvm", BASE, sprintf("%03d", 1:COUNT)) %T>% print()
|
||||
LUSER <- paste0("user", BASE, sprintf("%03d", 1:COUNT)) %T>% print()
|
||||
```
|
||||
|
||||
```{r connect}
|
||||
|
@ -102,25 +103,21 @@ Create the actual Linux DSVM cluser with public-key based
|
|||
authentication method. Name, username, and size can also be
|
||||
configured.
|
||||
|
||||
```{r deploy}
|
||||
# Deploy the required Linux DSVM - generally 4 minutes each. All but
|
||||
# the last will be deployed asynchronously whilst the last is deployed
|
||||
# synchronously so that we wait for it and hopefully the others.
|
||||
```{r deploy a set of DSVMs}
|
||||
|
||||
# Deploy multiple DSVMs using deployDSVMCluster.
|
||||
|
||||
ldsvm_set <- deployDSVMCluster(context,
|
||||
resource.group=RG,
|
||||
location=LOC,
|
||||
count=COUNT,
|
||||
name=LDSVM,
|
||||
username=LUSER,
|
||||
pubkey=rep(PUBKEY, COUNT),
|
||||
cluster=FALSE)
|
||||
|
||||
# May want to have a check on the deployed machines?
|
||||
|
||||
for (vm in LDSVM)
|
||||
{
|
||||
deployDSVM(context,
|
||||
resource.group=RG,
|
||||
location=LOC,
|
||||
name=vm,
|
||||
username=USER,
|
||||
size="Standard_DS1_v2",
|
||||
os="Linux",
|
||||
authen="Key",
|
||||
pubkey=PUBKEY,
|
||||
mode=ifelse(vm==tail(LDSVM, 1), "Sync", "ASync"))
|
||||
}
|
||||
|
||||
for (vm in LDSVM)
|
||||
{
|
||||
cat(vm, "\n")
|
||||
|
@ -143,6 +140,25 @@ for (vm in LDSVM)
|
|||
}
|
||||
```
|
||||
|
||||
Then we try deploying a cluster of DSVMs. The function will automatically form a DSVM cluster for us with which an R analytical job can be executed on with a "cluster parallel" computing context.
|
||||
|
||||
```{r deploy a cluster of DSVMs}
|
||||
|
||||
# Deploy a cluster of DSVMs.
|
||||
|
||||
ldsvm_cluster <- deployDSVMCluster(context,
|
||||
resource.group=RG,
|
||||
location=LOC,
|
||||
count=COUNT,
|
||||
name="yyy",
|
||||
username="yyyuser",
|
||||
pubkey=PUBKEY,
|
||||
cluster=TRUE)
|
||||
|
||||
# throw an data science analysis onto the cluster and run it. Still figuring out how to use mrsdeploy::remoteExecute for the purpose.
|
||||
|
||||
```
|
||||
|
||||
# Optional Delete
|
||||
|
||||
```{r optionally delete resource group}
|
||||
|
|
|
@ -1,4 +0,0 @@
|
|||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Your worker script starts from here ...
|
||||
# ---------------------------------------------------------------------------
|
Загрузка…
Ссылка в новой задаче