From 4a5c9c25d9074d29c109b0f8217bd4f221d0994f Mon Sep 17 00:00:00 2001 From: yueguoguo Date: Mon, 20 Feb 2017 19:11:01 +0800 Subject: [PATCH] Updated cluster deployment. Modified vignette. Tested and deployment was succesful. --- NAMESPACE | 7 +- R/computeInterface.R | 49 +++-- R/createInterface.R | 39 ---- R/deployDSVM.R | 14 +- R/deployDSVMCluster.R | 174 ++++++++++++++++++ R/dumpInterface.R | 28 --- R/executeScript.R | 53 +++++- R/newScript.R | 26 --- R/setConfig.R | 30 --- R/setInterface.R | 23 --- R/updateScript.R | 127 ------------- ...Interface.Rd => createComputeInterface.Rd} | 14 +- man/createRInterface.Rd | 21 --- man/deployDSVMCluster.Rd | 33 ++++ man/dumpInterface.Rd | 15 ++ man/dumpObject.Rd | 15 -- man/executeScript.Rd | 17 +- man/newScript.Rd | 14 -- man/setConfig.Rd | 6 +- man/updateScript.Rd | 6 +- vignettes/ClusterDSVM.Rmd | 54 ++++-- vignettes/experiment1.R | 4 - 22 files changed, 361 insertions(+), 408 deletions(-) delete mode 100644 R/createInterface.R create mode 100644 R/deployDSVMCluster.R delete mode 100644 R/dumpInterface.R delete mode 100644 R/newScript.R delete mode 100644 R/setConfig.R delete mode 100644 R/setInterface.R delete mode 100644 R/updateScript.R rename man/{setRInterface.Rd => createComputeInterface.Rd} (53%) delete mode 100644 man/createRInterface.Rd create mode 100644 man/deployDSVMCluster.Rd create mode 100644 man/dumpInterface.Rd delete mode 100644 man/dumpObject.Rd delete mode 100644 man/newScript.Rd delete mode 100644 vignettes/experiment1.R diff --git a/NAMESPACE b/NAMESPACE index 7d46d50..de4c529 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,14 +1,13 @@ # Generated by roxygen2: do not edit by hand -export(createRInterface) +export(createComputeInterface) export(deployDSVM) -export(dumpObject) +export(deployDSVMCluster) +export(dumpInterface) export(executeScript) export(getVMSizes) -export(newScript) export(operateDSVM) export(setConfig) -export(setRInterface) export(updateScript) import(dplyr) import(magrittr) diff --git a/R/computeInterface.R b/R/computeInterface.R index 00d8f4a..2fbbcfe 100644 --- a/R/computeInterface.R +++ b/R/computeInterface.R @@ -6,9 +6,9 @@ #' @return An S3 compute interface object. #' @export createComputeInterface <- function(remote, - user, - script, - config){ + user, + script, + config){ ci_env <- new.env(parent=globalenv()) # initialize an compute interface object. @@ -71,11 +71,11 @@ setConfig <- function(object, return(object) } -#' @title Dump out the object configuration. +#' @title Dump the interface configuration. #' @param object The compute interface object. #' @return No return. Print compute interface object information. #' @export -dumpObject <- function(object) { +dumpInterface <- function(object) { cat( sprintf("---------------------------------------------------------------------------"), sprintf("Compute interface information"), @@ -106,24 +106,23 @@ dumpObject <- function(object) { updateScript <- function(object) { if (!file.exists(object$script) || length(object$script) == 0) { - stop(paste("The script does not exist or is not specified!", - "Consider create a new one using riNewScript.")) + stop("The script does not exist or is not specified!") } - codes.body <- readLines(con=object$script) + codes_body <- readLines(con=object$script) # Remove the header. 
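+  # The injected header is delimited by marker comments; any header left by a
+  # previous call is stripped first so that repeated calls to updateScript()
+  # do not accumulate headers. codes_body[2] is NA when the script has fewer
+  # than two lines, hence the is.na() guard below.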
- if (codes.body[2] == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE") + if (!is.na(codes_body[2]) && codes_body[2] == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE") { - head.start <- which(codes.body == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE") - head.end <- which(codes.body == "# END OF THE HEADER ADDED BY COMPUTE INTERFACE") - codes.body <- codes.body[-((head.start - 1):(head.end + 1))] + head.start <- which(codes_body == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE") + head.end <- which(codes_body == "# END OF THE HEADER ADDED BY COMPUTE INTERFACE") + codes_body <- codes_body[-((head.start - 1):(head.end + 1))] } # Add context-specific info into header. - codes.head <- paste( + codes_head <- paste( "# ---------------------------------------------------------------------------", "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE", "# ---------------------------------------------------------------------------", @@ -132,8 +131,8 @@ updateScript <- function(object) { if (object$config$CI_CONTEXT == "clusterParallel") { - codes.head <- paste( - codes.head, + codes_head <- paste( + codes_head, paste("CI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$CI_MACHINES)), collapse=", "), ")"), paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"), paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"), @@ -155,15 +154,15 @@ updateScript <- function(object) { ) } else if (object$config$CI_CONTEXT == "Hadoop") { - codes.head <- paste( - codes.head, + codes_head <- paste( + codes_head, "This is for Hadoop.", "\n" ) } else if (object$config$CI_CONTEXT == "Spark") { - codes.head <- paste( - codes.head, + codes_head <- paste( + codes_head, paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"), paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"), paste("CI_MASTER <-", "c(", paste(shQuote(unlist(object$config$CI_MASTER)), collapse=", "), ")"), @@ -189,15 +188,15 @@ updateScript <- function(object) { ) } else if (object$config$CI_CONTEXT == "Teradata") { - codes.head <- paste( - codes.head, + codes_head <- paste( + codes_head, "This is for Teradata.", "\n" ) } else if (object$config$CI_CONTEXT == "localParallel") { - codes.head <- paste( - codes.head, + codes_head <- paste( + codes_head, paste("CI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$CI_MACHINES)), collapse=", "), ")"), paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"), paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"), @@ -221,6 +220,6 @@ updateScript <- function(object) { "\"Hadoop\", \"Spark\", or \"Teradata\".")) } - cat(codes.head, file=object$script) - cat(codes.body, file=object$script, sep="\n", append=TRUE) + cat(codes_head, file=object$script) + cat(codes_body, file=object$script, sep="\n", append=TRUE) } diff --git a/R/createInterface.R b/R/createInterface.R deleted file mode 100644 index 865202b..0000000 --- a/R/createInterface.R +++ /dev/null @@ -1,39 +0,0 @@ -#' @title Create an R interface object that handles interaction with remote instance. -#' @param remote URL of remote instance. -#' @param user User name. -#' @param script R script with full path for execution at remote instance. -#' @param config Configuration for remote execution. Settings include computing context, data reference, etc. -#' @return An S3 R interface object. 
-#' @export
-createRInterface <- function(remote,
-                             user,
-                             script,
-                             config){
-  ri_env <- new.env(parent=globalenv())
-
-  # initialize an R interface object.
-
-  if(!missing(remote)) {
-    ri_env$remote <- remote
-  } else {
-    ri_env$remote <- character(0)
-  }
-
-  if(!missing(user)) {
-    ri_env$user <- user
-  } else {
-    ri_env$user <- character(0)
-  }
-
-  if(!missing(script)) {
-    ri_env$script <- script
-  } else {
-    ri_env$script <- character(0)
-  }
-
-  if(!missing(config)) {
-    ri_env$config <- config
-  } else {
-    ri_env$config <- NULL
-  }
-}
diff --git a/R/deployDSVM.R b/R/deployDSVM.R
index c341ba1..db680b7 100644
--- a/R/deployDSVM.R
+++ b/R/deployDSVM.R
@@ -165,11 +165,11 @@ deployDSVM <- function(context,
                         resourceGroup=resource.group,
                         mode=mode)
 
-  fqdn <- paste0(name, ".", location, ".cloudapp.azure.com")
-
-  if (tolower(mode) == "sync")
-    attr(fqdn, "ip") <-
-      system(paste("dig", fqdn, "+short"), intern=TRUE) # Get from the VM meta data?
-
-  return(fqdn)
+  # fqdn <- paste0(name, ".", location, ".cloudapp.azure.com")
+  #
+  # if (tolower(mode) == "sync")
+  #   attr(fqdn, "ip") <-
+  #     system(paste("dig", fqdn, "+short"), intern=TRUE) # Get from the VM meta data?
+  #
+  # return(fqdn)
 }
diff --git a/R/deployDSVMCluster.R b/R/deployDSVMCluster.R
new file mode 100644
index 0000000..ed4f5d1
--- /dev/null
+++ b/R/deployDSVMCluster.R
@@ -0,0 +1,174 @@
+#' @title Create a cluster of Data Science Virtual Machines and enable the DSVMs to communicate with each other via public-key based credentials for high performance computation. All DSVMs in the cluster are Linux based and use public-key cryptography for login.
+#' @param context AzureSMR active context.
+#' @param resource.group The Azure resource group where the DSVMs are allocated.
+#' @param location Location of the data centre to host the DSVMs.
+#' @param count Number of DSVM instances to be created. Note that deploying multiple DSVMs may take some time.
+#' @param name Names of the DSVMs. Lowercase characters or numbers only. Special characters are not permitted.
+#' @param username User name of the DSVM. It should be different from the name of the DSVM.
+#' @param size Size of the DSVM instances. The default is "Standard_D1_v2".
+#' @param pubkey Public key for the DSVM. Only applicable for
+#' public-key based authentication of Linux based DSVM.
+#' @param dns DNS label for the VM address. The URL for accessing the deployed DSVM will be "<dns>.<location>.cloudapp.azure.com".
+#' @param cluster A logical value indicating whether the deployed DSVMs form a cluster. If FALSE, the deployment assigns the vectors of names, usernames, and public keys given in the input arguments to the DSVMs one by one - this is usually used to create multiple DSVMs for a group of data scientists. If TRUE, the deployment uses the first element of the given DSVM names as a base (if more than one is supplied), appends a serial number to the base to form each DSVM's full name, and uses the SAME username and public key across the cluster - this can be used to create an HPC engine on top of the deployed DSVMs, on which the parallel computing contexts available in the Microsoft R Server ScaleR package can be applied for embarrassingly parallel workloads.
+#' @export
+deployDSVMCluster <- function(context,
+                              resource.group,
+                              location,
+                              count=1,
+                              name,
+                              username,
+                              size="Standard_D1_v2",
+                              pubkey="",
+                              dns=name,
+                              cluster=FALSE) {
+
+  # Do some checks on the input arguments.
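+  # The checks below fail fast on a misconfigured call before any Azure
+  # resources are requested; deployments of more than 10 DSVMs also ask for
+  # interactive confirmation since they can take a long time.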
+ + if(missing(context)) + stop("Please specify a context (contains TID, CID, KEY).") + + if(missing(resource.group)) + stop("Please specify an Azure resouce group.") + + if(missing(location)) + stop("Please specify a data centre location.") + + if(missing(name)) + stop("Please specify virtual machine name(s).") + + if(missing(username)) + stop("Please specify virtual machine user name(s).") + + # other preconditions. + + # let's limit the number of DSVM deployments to a reasonable number. + + if(count > 10) { + ans <- readline("More than 10 DSVMs are going to be created and that may take a long time to finish. Continue? (y/n)") + if(ans == "n" || ans == "N") + return("The deployment is aborted.") + } + + # to deploy the cluster of DSVMs. + + if(cluster) { + + name <- ifelse(length(name) > 1, name[1], name) + + # append serial no. to name base to form a full name. + + names <- paste0(name, sprintf("%03d", 1:count)) + + for (inc in 1:count) { + deployDSVM(context=context, + resource.group=resource.group, + location=location, + name=names[inc], + username=username, + size=size, + os="Linux", + authen="Key", + pubkey=pubkey, + dns=names[inc], + mode=ifelse(inc==count, "Sync", "ASync")) + } + + # set up public credentials for DSVM cluster, to allow DSVMs to communicate with each other. This is required if one wants to execute analytics on the cluster with parallel compute context in ScaleR. + + # TODO: transmitting private key is not good practice! Seeking a better method... + + HOME_DIR <- ifelse(identical(.Platform$OS.type, "windows"), + normalizePath(paste0(Sys.getenv("HOME"), "/../"), winslash = "/"), + Sys.getenv("HOME")) + + # generate public key from private key. + + shell(paste0("ssh-keygen -y -f ", HOME_DIR, ".ssh/id_rsa > ./id_rsa.pub")) + + # copy private key into local directory. Note local machine may be either Linux or Windows based so it is treated differently. + + ifelse(identical(.Platform$OS.type, "windows"), + system(paste0("xcopy /f ", shQuote(paste0(HOME_DIR, ".ssh/id_rsa"), type = "cmd"), + " ", shQuote(".", type = "cmd"))), + system("cp ~/.ssh/id_rsa .")) + + dns_name_list <- paste(names, + location, + "cloudapp.azure.com", + sep=".") + + # Distribute the key pair to all nodes of the cluster. + + for (vm in dns_name_list) + { + # add an option to switch off host key checking - for the purpose of avoiding pop up. + + option <- "-q -o StrictHostKeyChecking=no" + + # copy the key pairs onto cluster node. + + system(sprintf("scp %s ./id_rsa %s@%s:.ssh/", option, username, vm)) + system(sprintf("scp %s ./id_rsa.pub %s@%s:.ssh/", option, username, vm)) + + # create a config file to switch off strick host key checking to enable node-to-node authentication without pop up. + + sh <- writeChar(c("cat .ssh/id_rsa.pub > .ssh/authorized_keys\n", + paste0("echo Host *.", location, ".cloudapp.azure.com >> ~/.ssh/config\n"), + paste0("echo StrictHostKeyChecking no >> ~/.ssh/config\n"), + paste0("echo UserKnownHostsFile /dev/null >> ~/.ssh/config\n"), + "chmod 600 ~/.ssh/config\n"), con = "./shell_script") + + # upload, change mode of, and run the config script. 
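+      # Note that show.output.on.console is honoured by system() on Windows
+      # only; on other platforms R warns that the argument is Windows-specific
+      # and ignores it.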
+ + system(sprintf("scp %s shell_script %s@%s:~", option, username, vm), show.output.on.console = FALSE) + system(sprintf("ssh %s -l %s %s 'chmod +x ~/shell_script'", option, username, vm), show.output.on.console = FALSE) + system(sprintf("ssh %s -l %s %s '~/shell_script'", option, username, vm), show.output.on.console = FALSE) + } + + # Clean up - if you search "remove password" you will get 284,505 records so the following is to clean up confidential information in the working directory to prevent them from leaking anywhere out of your control. + + file.remove("./id_rsa", "./id_rsa.pub", "./shell_script") + } else { + + # check whether the input arguments are valid for the multi-instance deployment- + # 1. VM names, usernames, and public key should character vectors that have the same length as the number of DSVMs. + # 2. The name and username vectors should not contain duplicated elements. + + if(all(length(name) == count, + length(username) == count, + length(pubkey) == count) && + !any(c(duplicated(name), duplicated(username)))) { + + names <- name + dns_name_list <- paste(names, + location, + "cloudapp.azure.com", + sep=".") + + for (inc in 1:count) { + deployDSVM(context=context, + resource.group=resource.group, + location=location, + name=name[inc], + username=username[inc], + size=size, + os="Linux", + authen="Key", + pubkey=pubkey[inc], + dns=name[inc], + mode=ifelse(inc==count, "Sync", "ASync")) + } + } else { + stop("Please check input DSVM names, usernames, and public key. The input vectors should have the same length with count of DSVM and elements of each input vectot should be distinct from each other.") + } + } + + # return results for reference. + + df <- data.frame(Names = names, + Usernames = rep(username, count), + URL = dns_name_list, + Sizes = rep(size, count)) + + writeLines("Summary of DSVMs deployed") + print(df) +} diff --git a/R/dumpInterface.R b/R/dumpInterface.R deleted file mode 100644 index 085714f..0000000 --- a/R/dumpInterface.R +++ /dev/null @@ -1,28 +0,0 @@ -#' @title Dump out the object configuration. -#' @param object The R interface object. -#' @return No return. Print R interface object information. 
-#' @export
-dumpObject <- function(object) {
-  cat(
-    sprintf("---------------------------------------------------------------------------"),
-    sprintf("r Interface information"),
-    sprintf("---------------------------------------------------------------------------"),
-    sprintf("The R script to be executed:\t%s.", shQuote(object$script)),
-    sprintf("The remote host:\t\t%s.", shQuote(object$remote)),
-    sprintf("The login user name:\t\t%s.", shQuote(object$user)),
-    sprintf("---------------------------------------------------------------------------"),
-    sprintf("The configuration of the interface is:"),
-    # sprintf("virtual machines: %s", ifelse(!is.na(object$config$RI_MACHINES), object$config$RI_MACHINES, "N/A")),
-    sprintf("virtual machines\t\t %s", unlist(object$config$RI_MACHINES)),
-    sprintf("dns list\t\t\t %s", unlist(object$config$RI_DNS)),
-    sprintf("user to these machines\t\t %s", unlist(object$config$RI_VMUSER)),
-    sprintf("the master node\t\t\t %s", unlist(object$config$RI_MASTER)),
-    sprintf("the slave nodes\t\t\t %s", unlist(object$config$RI_SLAVES)),
-    sprintf("the data source\t\t\t %s", unlist(object$config$RI_DATA)),
-    sprintf("the computing context\t\t %s", unlist(object$config$RI_CONTEXT)),
-    sprintf("---------------------------------------------------------------------------"),
-    sprintf("# End of information session"),
-    sprintf("---------------------------------------------------------------------------"),
-    sep = "\n"
-  )
-}
diff --git a/R/executeScript.R b/R/executeScript.R
index 3ae2592..32da130 100644
--- a/R/executeScript.R
+++ b/R/executeScript.R
@@ -1,4 +1,10 @@
 #' @title Remote execution of R script in an R interface object.
+#' @param context AzureSMR context.
+#' @param resourceGroup Resource group of the Azure resources used for the computation.
+#' @param remote Remote URL of the computation engine. For a DSVM, it is either the DNS name (usually of the form "<name>.<location>.cloudapp.azure.com") or the IP address.
+#' @param user Username for logging into the remote resource.
+#' @param script R script to be executed on the remote resource.
+#' @param computeContext Computing context of Microsoft R Server under which the mechanism of parallelization (e.g., local parallel, cluster parallel, or Spark) is specified. Accepted values are "localParallel", "clusterParallel", "Hadoop", and "Spark".
 #' @param inputs JSON encoded string of R objects that are loaded into the Remote R session's workspace prior to execution. Only R objects of type: primitives, vectors and dataframes are supported via this parameter. Alternatively the putLocalObject can be used, prior to a call to this function, to move any R object from the local workspace into the remote R session.
 #' @param outputs Character vector of the names of the objects to retreive. Only primitives, vectors and dataframes can be retrieved using this function. Use getRemoteObject to get any type of R object from the remote session.
 #' @param checkLibraries if `TRUE`, check whether libraries used in the R script installed on the remote machine.
@@ -6,7 +12,9 @@
 #' @param writePlots If TRUE, plots generated during execution are copied to the working directory of the local session.
 #' @return Status of scription execution.
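+#' @examples
+#' \dontrun{
+#' # A minimal sketch, assuming an AzureSMR `context`, a deployed Linux DSVM
+#' # reachable at `dns`, and a local script "worker.R"; the resource group and
+#' # user names are illustrative only.
+#' executeScript(context=context,
+#'               resourceGroup="my_dsvm_rg",
+#'               remote=dns,
+#'               user="myuser",
+#'               script="worker.R",
+#'               computeContext="localParallel")
+#' }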
#' @export -executeScript <- function(remote, +executeScript <- function(context, + resourceGroup, + remote, user, script, computeContext, @@ -16,18 +24,45 @@ executeScript <- function(remote, displayPlots=FALSE, writePlots=FALSE) { - # manage input strings in an interface object, set configuration, show interface contents, and update script with computing context specification. + # manage input strings in an interface object. - new_interface <- - createComputeInterface(remote, user, script) %>% - setConfig(dns_list=remote, - machine_user=user, - context=computeContext) %>% - dumpInterface() %>% - updateScript() + new_interface <- createComputeInterface(remote, user, script) + + # set configuration + + new_interface %<>% setConfig(new_interface, + dns_list=remote, + machine_user=user, + context=computeContext) + + # print interface contents. + + dumpInterface(new_interface) + + # update script with computing context. + + updateScript(new_interface) # authenticate the remote server. + status <- operateDSVM(context, ) + + # some preconditions of using Microsoft R Server. + + # load mrsdeploy on remote machine. + + + + + + + + + + + + + mrsdeploy::remoteLogin(deployr_endpoint=object$remote, session=FALSE, commandline=FALSE, diff --git a/R/newScript.R b/R/newScript.R deleted file mode 100644 index b2a91f6..0000000 --- a/R/newScript.R +++ /dev/null @@ -1,26 +0,0 @@ -#' @title Generate a new worker script which is run on the remote instance with specifications in R interface object configuration. -#' @param path Path to the script. -#' @param title Title of the script. -#' @export -newScript <- function(path=".", - title=paste0("worker_new_", Sys.time(), ".R")) { - notes <- - sprintf( - paste( - "\n# ---------------------------------------------------------------------------", - "# Your worker script starts from here ... ", - "# ---------------------------------------------------------------------------\n", - sep="\n" - ) - ) - if (missing(path) || missing(title)) - { - stop(sprintf("A default script named %s located at %s is created.", title, path)) - } - - cat(notes, file=file.path(path, title)) - writeLines( - sprintf("Worker script %s is created at %s.", - title, ifelse(path == ".", "work directory", path)) - ) -} diff --git a/R/setConfig.R b/R/setConfig.R deleted file mode 100644 index a7a7279..0000000 --- a/R/setConfig.R +++ /dev/null @@ -1,30 +0,0 @@ -#' @title Set configuration for the R interface object. -#' @param object An S3 R interface object. -#' @param machine_list List of remote instances that execute R scripts. -#' @param dns_list DNS of the remote instances. -#' @param machine_user User name of the remote instances. -#' @param master Master node of the machine. -#' @param slaves Slave nodes of the machine. -#' @param data Reference to data used in the analytics. -#' @param context Computing context available in Microsoft R Server for running the analytics. 
-#' @export -setConfig <- function(object, - machine_list, - dns_list, - machine_user, - master, - slaves, - data, - context) { - object$config <- list( - RI_MACHINES = ifelse(!missing(machine_list), list(machine_list), ""), - RI_DNS = ifelse(!missing(dns_list), list(dns_list), ""), - RI_VMUSER = ifelse(!missing(machine_user), list(machine_user), ""), - RI_MASTER = ifelse(!missing(master), list(master), ""), - RI_SLAVES = ifelse(!missing(slaves), list(slaves), ""), - RI_DATA = ifelse(!missing(data), list(data), ""), - RI_CONTEXT = ifelse(!missing(context), list(context), "") - ) - - return(object) -} diff --git a/R/setInterface.R b/R/setInterface.R deleted file mode 100644 index 33adac4..0000000 --- a/R/setInterface.R +++ /dev/null @@ -1,23 +0,0 @@ -#' @title Set values of the R interface object. -#' @param object An S3 R interface object. -#' @param remote URL of remote instance. -#' @param user User name. -#' @param script R script with full path for execution at remote instance. -#' @param config Configuration for remote execution. Settings include computing context, data reference, etc. -#' @return The updated R interface object. -#' @export -setRInterface <- function(object, - remote, - user, - script, - config) { - if(!missing(remote)) object$remote <- remote - if(!missing(user)) object$user <- user - if(!missing(script) && file.exists(script)) - { - object$script <- script - } - if(!missing(config)) object$config <- config - - return(object) -} diff --git a/R/updateScript.R b/R/updateScript.R deleted file mode 100644 index c2a5b7a..0000000 --- a/R/updateScript.R +++ /dev/null @@ -1,127 +0,0 @@ -#' @title Update a worker script with R interface object configuration. -#' @param object R interface object. -#' @export -updateScript <- function(object) { - if (!file.exists(object$script) || length(object$script) == 0) - { - stop(paste("The script does not exist or is not specified!", - "Consider create a new one using riNewScript.")) - } - - codes.body <- readLines(con=object$script) - - # Remove the header. - - if (codes.body[2] == "# THIS IS A HEADER ADDED BY R INTERFACE") - { - head.start <- which(codes.body == "# THIS IS A HEADER ADDED BY R INTERFACE") - head.end <- which(codes.body == "# END OF THE HEADER ADDED BY R INTERFACE") - codes.body <- codes.body[-((head.start - 1):(head.end + 1))] - } - - # Add context-specific info into header. 
- - codes.head <- paste( - "# ---------------------------------------------------------------------------", - "# THIS IS A HEADER ADDED BY R INTERFACE", - "# ---------------------------------------------------------------------------", - sep="\n" - ) - - if (object$config$RI_CONTEXT == "clusterParallel") - { - codes.head <- paste( - codes.head, - paste("RI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$RI_MACHINES)), collapse=", "), ")"), - paste("RI_DNS <-", "c(", paste(shQuote(unlist(object$config$RI_DNS)), collapse=", "), ")"), - paste("RI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$RI_VMUSER)), collapse=", "), ")"), - paste("RI_MASTER <-", "c(", paste(shQuote(unlist(object$config$RI_MASTER)), collapse=", "), ")"), - paste("RI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$RI_SLAVES)), collapse=", "), ")"), - paste("RI_DATA <-", paste(shQuote(unlist(object$config$RI_DATA)), collapse=", ")), - paste("RI_CONTEXT <-", paste(shQuote(unlist(object$config$RI_CONTEXT)), collapse=", ")), - "\nlibrary(RevoScaleR)", - "# --------- Set compute context", - "cl <- makePSOCKcluster(names=RI_SLAVES, master=RI_MASTER, user=RI_VMUSER)", - "registerDoParallel(cl)", - "rxSetComputeContext(RxForeachDoPar())", - "# --------- Load data.", - "download.file(url=RI_DATA, destfile='./data.csv')", - "riData <- read.csv('./data.csv', header=T, sep=',', stringAsFactor=F)", - "# ---------------------------------------------------------------------------", - "# END OF THE HEADER ADDED BY R INTERFACE", - "# ---------------------------------------------------------------------------\n", - sep="\n" - ) - } else if (object$config$RI_CONTEXT == "Hadoop") - { - codes.head <- paste( - codes.head, - "This is for Hadoop.", - "\n" - ) - } else if (object$config$RI_CONTEXT == "Spark") - { - codes.head <- paste( - codes.head, - paste("RI_DNS <-", "c(", paste(shQuote(unlist(object$config$RI_DNS)), collapse=", "), ")"), - paste("RI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$RI_VMUSER)), collapse=", "), ")"), - paste("RI_MASTER <-", "c(", paste(shQuote(unlist(object$config$RI_MASTER)), collapse=", "), ")"), - paste("RI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$RI_SLAVES)), collapse=", "), ")"), - paste("RI_DATA <-", paste(shQuote(unlist(object$config$RI_DATA)), collapse=", ")), - paste("RI_CONTEXT <-", paste(shQuote(unlist(object$config$RI_CONTEXT)), collapse=", ")), - "\nlibrary(RevoScaleR)", - "# --------- Set compute context", - "myHadoopCluster <- RxSpark(persistentRun=TRUE, idleTimeOut=600)", - "rxSetComputeContext(myHadoopCluster)", - "# --------- Load data.", - "download.file(url=RI_DATA, destfile='./data.csv')", - "riData <- read.csv('./data.csv', header=T, sep=',', stringAsFactor=F)", - "bigDataDirRoot <- '/share'", - "inputDir <- file.path(bigDataDirRoot, 'riBigData')", - "rxHadoopMakeDir(inputDir)", - "rxHadoopCopyFromLocal('./data.csv', inputDir)", - "hdfsFS <- RxHdfsFileSystem()", - "riTextData <- RxTextData(file=inputDir, fileSystem=hdfsFS)", - "# ---------------------------------------------------------------------------", - "# END OF THE HEADER ADDED BY R INTERFACE", - "# ---------------------------------------------------------------------------\n", - sep="\n" - ) - } else if (object$config$RI_CONTEXT == "Teradata") - { - codes.head <- paste( - codes.head, - "This is for Teradata.", - "\n" - ) - } else if (object$config$RI_CONTEXT == "localParallel") - { - codes.head <- paste( - codes.head, - paste("RI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$RI_MACHINES)), 
collapse=", "), ")"), - paste("RI_DNS <-", "c(", paste(shQuote(unlist(object$config$RI_DNS)), collapse=", "), ")"), - paste("RI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$RI_VMUSER)), collapse=", "), ")"), - paste("RI_MASTER <-", "c(", paste(shQuote(unlist(object$config$RI_MASTER)), collapse=", "), ")"), - paste("RI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$RI_SLAVES)), collapse=", "), ")"), - paste("RI_DATA <-", paste(shQuote(unlist(object$config$RI_DATA)), collapse=", ")), - paste("RI_CONTEXT <-", paste(shQuote(unlist(object$config$RI_CONTEXT)), collapse=", ")), - "\nlibrary(RevoScaleR)", - "library(doParallel)", - "# --------- Set compute context", - "rxSetComputeContext(RxLocalParallel())", - "# --------- Load data.", - "download.file(url=RI_DATA, destfile='./data.csv')", - "riData <- read.csv('./data.csv', header=T, sep=',', stringAsFactor=F)", - "# ---------------------------------------------------------------------------", - "# END OF THE HEADER ADDED BY R INTERFACE", - "# ---------------------------------------------------------------------------\n", - sep="\n" - ) - } else { - stop(paste("Specify a context from \"localParallel\", \"clusterParallel\",", - "\"Hadoop\", \"Spark\", or \"Teradata\".")) - } - - cat(codes.head, file=object$script) - cat(codes.body, file=object$script, sep="\n", append=TRUE) -} diff --git a/man/setRInterface.Rd b/man/createComputeInterface.Rd similarity index 53% rename from man/setRInterface.Rd rename to man/createComputeInterface.Rd index 7eda422..b550d50 100644 --- a/man/setRInterface.Rd +++ b/man/createComputeInterface.Rd @@ -1,14 +1,12 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/setInterface.R -\name{setRInterface} -\alias{setRInterface} -\title{Set values of the R interface object.} +% Please edit documentation in R/computeInterface.R +\name{createComputeInterface} +\alias{createComputeInterface} +\title{Create an compute interface object that handles interaction with remote instance.} \usage{ -setRInterface(object, remote, user, script, config) +createComputeInterface(remote, user, script, config) } \arguments{ -\item{object}{An S3 R interface object.} - \item{remote}{URL of remote instance.} \item{user}{User name.} @@ -18,6 +16,6 @@ setRInterface(object, remote, user, script, config) \item{config}{Configuration for remote execution. Settings include computing context, data reference, etc.} } \value{ -The updated R interface object. +An S3 compute interface object. } diff --git a/man/createRInterface.Rd b/man/createRInterface.Rd deleted file mode 100644 index b9e8583..0000000 --- a/man/createRInterface.Rd +++ /dev/null @@ -1,21 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/createInterface.R -\name{createRInterface} -\alias{createRInterface} -\title{Create an R interface object that handles interaction with remote instance.} -\usage{ -createRInterface(remote, user, script, config) -} -\arguments{ -\item{remote}{URL of remote instance.} - -\item{user}{User name.} - -\item{script}{R script with full path for execution at remote instance.} - -\item{config}{Configuration for remote execution. Settings include computing context, data reference, etc.} -} -\value{ -An S3 R interface object. 
-} - diff --git a/man/deployDSVMCluster.Rd b/man/deployDSVMCluster.Rd new file mode 100644 index 0000000..68ddd3c --- /dev/null +++ b/man/deployDSVMCluster.Rd @@ -0,0 +1,33 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/deployDSVMCluster.R +\name{deployDSVMCluster} +\alias{deployDSVMCluster} +\title{This function creates a cluster of Data Science Virtual Machines and enable the DSVMs to communicate across the cluster via public key based credentials for high performance computation. All DSVMs in the cluster are based on Linux OS and use public key cryptogrphy for log in.} +\usage{ +deployDSVMCluster(context, resource.group, location, count = 1, name, + username, size = "Standard_D1_v2", pubkey = "", dns = name, + cluster = FALSE, mode = "Sync") +} +\arguments{ +\item{context}{AzureSMR active context.} + +\item{resource.group}{The Azure resource group where the DSVM is allocated.} + +\item{location}{Location of the data centre to host the DSVM.} + +\item{count}{Number of DSVM instances to be created. Note deploying multiple DSVMs may consume some time.} + +\item{name}{Names of the DSVMs. Lowercase characters or numbers only. Special characters are not permitted.} + +\item{username}{User name of the DSVM. It should be different from name of the DSVM.} + +\item{pubkey}{Public key for the DSVM. Only applicable for +public-key based authentication of Linux based DSVM.} + +\item{dns}{DNS label for the VM address. The URL for accessing the deployed DSVM will be "..cloudapp.azure.com} + +\item{cluster}{A logical value of TRUE or FALSE to indicate whether the deployed DSVMs form a cluster. If not, the deployment will assign the vectors of name, username, and public key as given in the input arguments to the DSVMs - this is usually used for creating multiple DSVMs for a group of data scientists. If it is TRUE, the deployment will use the first element (if it consists more than one elements) of the given DSVM names as base, and append serial number to the base to form a DSVM full name, and then use the SAME username and public key across the cluster - this can be used for creating a HPC engine on top of the deployed DSVMs in which parallel computing context which is availed in Microsoft R Server ScaleR package can be applied for embarassing parallelization.} + +\item{mode}{Mode of virtual machine deployment. Default is "Sync".} +} + diff --git a/man/dumpInterface.Rd b/man/dumpInterface.Rd new file mode 100644 index 0000000..562b68a --- /dev/null +++ b/man/dumpInterface.Rd @@ -0,0 +1,15 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/computeInterface.R +\name{dumpInterface} +\alias{dumpInterface} +\title{Dump the interface configuration.} +\usage{ +dumpInterface(object) +} +\arguments{ +\item{object}{The compute interface object.} +} +\value{ +No return. Print compute interface object information. +} + diff --git a/man/dumpObject.Rd b/man/dumpObject.Rd deleted file mode 100644 index 0879300..0000000 --- a/man/dumpObject.Rd +++ /dev/null @@ -1,15 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/dumpInterface.R -\name{dumpObject} -\alias{dumpObject} -\title{Dump out the object configuration.} -\usage{ -dumpObject(object) -} -\arguments{ -\item{object}{The R interface object.} -} -\value{ -No return. Print R interface object information. 
-} - diff --git a/man/executeScript.Rd b/man/executeScript.Rd index 11c44f5..8771566 100644 --- a/man/executeScript.Rd +++ b/man/executeScript.Rd @@ -4,11 +4,22 @@ \alias{executeScript} \title{Remote execution of R script in an R interface object.} \usage{ -executeScript(object, inputs = NULL, outputs = NULL, - checkLibraries = FALSE, displayPlots = FALSE, writePlots = FALSE) +executeScript(context, resourceGroup, remote, user, script, computeContext, + inputs = NULL, outputs = NULL, checkLibraries = FALSE, + displayPlots = FALSE, writePlots = FALSE) } \arguments{ -\item{object}{R interface object.} +\item{context}{AzureSMR context.} + +\item{resourceGroup}{Resource group of Azure resources for computation.} + +\item{remote}{Remote URL for the computation engine. For DSVM, it is either DNS (usually in the format of ..cloudapp.azure.com) or IP address.} + +\item{user}{Username for logging into the remote resource.} + +\item{script}{R script to be executed on remote resource.} + +\item{computeContext}{Computation context of Microsoft R Server under which the mechanisms of parallelization (e.g., local parallel, cluster based parallel, or Spark) is specified. Accepted computing context include "localParallel", "clusterParallel", "Hadoop", and "Spark".} \item{inputs}{JSON encoded string of R objects that are loaded into the Remote R session's workspace prior to execution. Only R objects of type: primitives, vectors and dataframes are supported via this parameter. Alternatively the putLocalObject can be used, prior to a call to this function, to move any R object from the local workspace into the remote R session.} diff --git a/man/newScript.Rd b/man/newScript.Rd deleted file mode 100644 index 181fbcc..0000000 --- a/man/newScript.Rd +++ /dev/null @@ -1,14 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/newScript.R -\name{newScript} -\alias{newScript} -\title{Generate a new worker script which is run on the remote instance with specifications in R interface object configuration.} -\usage{ -newScript(path = ".", title = paste0("worker_new_", Sys.time(), ".R")) -} -\arguments{ -\item{path}{Path to the script.} - -\item{title}{Title of the script.} -} - diff --git a/man/setConfig.Rd b/man/setConfig.Rd index 280c42f..9f0f652 100644 --- a/man/setConfig.Rd +++ b/man/setConfig.Rd @@ -1,14 +1,14 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/setConfig.R +% Please edit documentation in R/computeInterface.R \name{setConfig} \alias{setConfig} -\title{Set configuration for the R interface object.} +\title{Set configuration for the compute interface object.} \usage{ setConfig(object, machine_list, dns_list, machine_user, master, slaves, data, context) } \arguments{ -\item{object}{An S3 R interface object.} +\item{object}{An S3 compute interface object.} \item{machine_list}{List of remote instances that execute R scripts.} diff --git a/man/updateScript.Rd b/man/updateScript.Rd index 8509390..8d6e613 100644 --- a/man/updateScript.Rd +++ b/man/updateScript.Rd @@ -1,12 +1,12 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/updateScript.R +% Please edit documentation in R/computeInterface.R \name{updateScript} \alias{updateScript} -\title{Update a worker script with R interface object configuration.} +\title{Update a worker script with compute interface object configuration.} \usage{ updateScript(object) } \arguments{ -\item{object}{R interface object.} +\item{object}{compute interface object.} } diff --git 
a/vignettes/ClusterDSVM.Rmd b/vignettes/ClusterDSVM.Rmd
index 4ff4e95..8308fe2 100644
--- a/vignettes/ClusterDSVM.Rmd
+++ b/vignettes/ClusterDSVM.Rmd
@@ -50,13 +50,14 @@ LOC <- "southeastasia" # Where the resource group (resources) will be hosted
 
 # Create names for the VMs.
 
-COUNT <- 5 # Number of VMs to deploy.
+COUNT <- 2 # Number of VMs to deploy.
 
 BASE <- runif(4, 1, 26) %>% round() %>% letters[.] %>% paste(collapse="")
 
 LDSVM <- paste0("ldsvm", BASE, sprintf("%03d", 1:COUNT)) %T>% print()
+LUSER <- paste0("user", BASE, sprintf("%03d", 1:COUNT)) %T>% print()
 ```
 
 ```{r connect}
@@ -102,25 +103,21 @@
 Create the actual Linux DSVM cluser with public-key based authentication
 method. Name, username, and size can also be configured.
 
-```{r deploy}
-# Deploy the required Linux DSVM - generally 4 minutes each. All but
-# the last will be deployed asynchronously whilst the last is deployed
-# synchronously so that we wait for it and hopefully the others.
+```{r deploy a set of DSVMs}
+
+# Deploy multiple DSVMs using deployDSVMCluster.
+
+ldsvm_set <- deployDSVMCluster(context,
+                               resource.group=RG,
+                               location=LOC,
+                               count=COUNT,
+                               name=LDSVM,
+                               username=LUSER,
+                               pubkey=rep(PUBKEY, COUNT),
+                               cluster=FALSE)
+
+# Below we check on each of the deployed machines.
 
-for (vm in LDSVM)
-{
-  deployDSVM(context,
-             resource.group=RG,
-             location=LOC,
-             name=vm,
-             username=USER,
-             size="Standard_DS1_v2",
-             os="Linux",
-             authen="Key",
-             pubkey=PUBKEY,
-             mode=ifelse(vm==tail(LDSVM, 1), "Sync", "ASync"))
-}
-
 for (vm in LDSVM)
 {
   cat(vm, "\n")
@@ -143,6 +140,25 @@ for (vm in LDSVM)
 }
 ```
 
+Next we deploy a cluster of DSVMs. In this case the function automatically forms the DSVM cluster for us, so that an R analytics job can later be executed on it with a "cluster parallel" computing context.
+
+```{r deploy a cluster of DSVMs}
+
+# Deploy a cluster of DSVMs.
+
+ldsvm_cluster <- deployDSVMCluster(context,
+                                   resource.group=RG,
+                                   location=LOC,
+                                   count=COUNT,
+                                   name="yyy",
+                                   username="yyyuser",
+                                   pubkey=PUBKEY,
+                                   cluster=TRUE)
+
+# TODO: submit a data science analysis to the cluster and run it. Still figuring out how to use mrsdeploy::remoteExecute for this purpose.
+
+```
+
 # Optional Delete
 
 ```{r optionally delete resource group}
diff --git a/vignettes/experiment1.R b/vignettes/experiment1.R
deleted file mode 100644
index ae29d40..0000000
--- a/vignettes/experiment1.R
+++ /dev/null
@@ -1,4 +0,0 @@
-
-# ---------------------------------------------------------------------------
-# Your worker script starts from here ... 
-# ---------------------------------------------------------------------------