зеркало из https://github.com/Azure/AzureDSVM.git
Rewrote executeScript.R
This commit is contained in:
Родитель
e8d0ab0452
Коммит
6bed033bd8
|
@ -141,12 +141,14 @@ updateScript <- function(object) {
|
|||
paste("CI_DATA <-", paste(shQuote(unlist(object$config$CI_DATA)), collapse=", ")),
|
||||
paste("CI_CONTEXT <-", paste(shQuote(unlist(object$config$CI_CONTEXT)), collapse=", ")),
|
||||
"\nlibrary(RevoScaleR)",
|
||||
"library(readr)",
|
||||
"library(doParallel)",
|
||||
"# --------- Set compute context",
|
||||
"cl <- makePSOCKcluster(names=CI_SLAVES, master=CI_MASTER, user=CI_VMUSER)",
|
||||
"registerDoParallel(cl)",
|
||||
"rxSetComputeContext(RxForeachDoPar())",
|
||||
"# --------- Load data.",
|
||||
"ciData <- read_csv(CI_DATA)",
|
||||
"ciData <- ifelse(CI_DATA != '', read_csv(CI_DATA), data.frame(0))",
|
||||
"# ---------------------------------------------------------------------------",
|
||||
"# END OF THE HEADER ADDED BY COMPUTE INTERFACE",
|
||||
"# ---------------------------------------------------------------------------\n",
|
||||
|
@ -170,11 +172,12 @@ updateScript <- function(object) {
|
|||
paste("CI_DATA <-", paste(shQuote(unlist(object$config$CI_DATA)), collapse=", ")),
|
||||
paste("CI_CONTEXT <-", paste(shQuote(unlist(object$config$CI_CONTEXT)), collapse=", ")),
|
||||
"\nlibrary(RevoScaleR)",
|
||||
"library(readr)",
|
||||
"# --------- Set compute context",
|
||||
"myHadoopCluster <- RxSpark(persistentRun=TRUE, idleTimeOut=600)",
|
||||
"rxSetComputeContext(myHadoopCluster)",
|
||||
"# --------- Load data.",
|
||||
"ciData <- read_csv(CI_DATA)",
|
||||
"ciData <- ifelse(CI_DATA != '', read_csv(CI_DATA), data.frame(0))",
|
||||
"bigDataDirRoot <- '/share'",
|
||||
"inputDir <- file.path(bigDataDirRoot, 'riBigData')",
|
||||
"rxHadoopMakeDir(inputDir)",
|
||||
|
@ -206,10 +209,11 @@ updateScript <- function(object) {
|
|||
paste("CI_CONTEXT <-", paste(shQuote(unlist(object$config$CI_CONTEXT)), collapse=", ")),
|
||||
"\nlibrary(RevoScaleR)",
|
||||
"library(doParallel)",
|
||||
"library(readr)",
|
||||
"# --------- Set compute context",
|
||||
"rxSetComputeContext(RxLocalParallel())",
|
||||
"# --------- Load data.",
|
||||
"ciData <- read_csv(CI_DATA)",
|
||||
"ciData <- ifelse(CI_DATA != '', read_csv(CI_DATA), data.frame(0))",
|
||||
"# ---------------------------------------------------------------------------",
|
||||
"# END OF THE HEADER ADDED BY COMPUTE INTERFACE",
|
||||
"# ---------------------------------------------------------------------------\n",
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
#' @title Remote execution of R script in an R interface object.
|
||||
#' @title Remote execution of R script in an R interface new_interface.
|
||||
#' @param context AzureSMR context.
|
||||
#' @param resourceGroup Resource group of Azure resources for computation.
|
||||
#' @param machines Remote DSVMs that will be used for computation.
|
||||
#' @param remote Remote URL for the computation engine. For DSVM, it is either DNS (usually in the format of <dsvm name>.<location>.cloudapp.azure.com) or IP address.
|
||||
#' @param remote IP address or URL for a computation engine. For DSVM, it is either DNS (usually in the format of <dsvm name>.<location>.cloudapp.azure.com) or its public IP address. Note that if more than one machine is used for execution, the remote is used as the master node by default.
|
||||
#' @param user Username for logging into the remote resource.
|
||||
#' @param script R script to be executed on remote resource.
|
||||
#' @param script R script to be executed on remote resource(s).
|
||||
#' @param master IP address or URL of a DSVM which will be used as the master. By default is remote.
|
||||
#' @param slaves IP addresses or URLs of slave DSVMs.
|
||||
#' @param computeContext Computation context of Microsoft R Server under which the mechanism of parallelization (e.g., local parallel, cluster based parallel, or Spark) is specified. Accepted computing contexts include "localParallel", "clusterParallel", "Hadoop", and "Spark".
|
||||
#' @return Status of script execution.
|
||||
#' @export
|
||||
|
@ -14,6 +16,8 @@ executeScript <- function(context,
|
|||
remote,
|
||||
user,
|
||||
script,
|
||||
master,
|
||||
slaves,
|
||||
computeContext) {
|
||||
|
||||
# switch on the machines.
|
||||
|
@ -27,14 +31,16 @@ executeScript <- function(context,
|
|||
operation="Start")
|
||||
}
|
||||
|
||||
# manage input strings in an interface object.
|
||||
# manage input strings in an interface new_interface.
|
||||
|
||||
new_interface <- createComputeInterface(remote, user, script)
|
||||
|
||||
# set configuration
|
||||
|
||||
new_interface %<>% setConfig(new_interface,
|
||||
dns_list=remote,
|
||||
new_interface %<>% setConfig(machine_list=machines,
|
||||
master=master,
|
||||
slaves=slaves,
|
||||
dns_list=c(master, slaves),
|
||||
machine_user=user,
|
||||
context=computeContext)
|
||||
|
||||
|
@ -49,45 +55,49 @@ executeScript <- function(context,
|
|||
# execute script on remote machine(s).
|
||||
|
||||
option <- "-q -o StrictHostKeyChecking=no"
|
||||
remote_script <- paste0("script", as.character(Sys.time()), ".R")
|
||||
remote_script <- paste0("script_", paste0(sample(letters, 5), collapse=""), ".R")
|
||||
|
||||
exe <- system(paste0("scp %s -l %s %s %s:%s",
|
||||
option,
|
||||
object$user,
|
||||
object$script,
|
||||
object$remote,
|
||||
remote_script),
|
||||
exe <- system(sprintf("scp %s %s %s@%s:~/%s",
|
||||
option,
|
||||
new_interface$script,
|
||||
new_interface$user,
|
||||
new_interface$remote,
|
||||
remote_script),
|
||||
show.output.on.console=FALSE)
|
||||
if (is.null(attributes(exe)))
|
||||
{
|
||||
writeLines(sprintf("File %s is successfully uploaded on %s$%s.",
|
||||
object$script, object$user, object$remote))
|
||||
new_interface$script, new_interface$user, new_interface$remote))
|
||||
} else {
|
||||
writeLines("Something must be wrong....... See warning message.")
|
||||
}
|
||||
|
||||
# Execute the script.
|
||||
|
||||
exe <- system(paste("ssh %s -l %s %s Rscript %s %s",
|
||||
option,
|
||||
object$user,
|
||||
object$remote,
|
||||
roptions,
|
||||
remote_script),
|
||||
exe <- system(sprintf("ssh %s -l %s %s Rscript %s",
|
||||
option,
|
||||
new_interface$user,
|
||||
new_interface$remote,
|
||||
remote_script),
|
||||
intern=TRUE,
|
||||
show.output.on.console=TRUE)
|
||||
if (is.null(attributes(exe)))
|
||||
{
|
||||
writeLines(sprintf("File %s is successfully executed on %s$%s.",
|
||||
object$script, object$user, object$remote))
|
||||
new_interface$script, new_interface$user, new_interface$remote))
|
||||
} else {
|
||||
writeLines("Something must be wrong....... See warning message.")
|
||||
}
|
||||
|
||||
if (!missing(verbose))
|
||||
{
|
||||
if (verbose == TRUE) writeLines(exe)
|
||||
}
|
||||
writeLines(exe)
|
||||
|
||||
# need post-execution message...
|
||||
|
||||
# clean up - remove the script.
|
||||
|
||||
system(sprintf("ssh %s -l %s %s rm %s",
|
||||
option,
|
||||
new_interface$user,
|
||||
new_interface$remote,
|
||||
remote_script))
|
||||
}
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
#' @title Operations on a data science virtual machine. Available operations are "Check", "Start", "Stop", and "Delete".
|
||||
#' @param context AzureSMR context.
|
||||
#' @param resource.group Resource group.
|
||||
#' @param name Name of the DSVM.
|
||||
#' @param name Name(s) of the DSVM(s).
|
||||
#' @param operation Operations on the DSVM. Available operations are "Check", "Start", "Stop", "Delete", which check the status of, start running, stop running, and delete a DSVM, respectively.
|
||||
#' @export
|
||||
operateDSVM <- function(context,
|
||||
resource.group,
|
||||
name,
|
||||
names,
|
||||
operation="Check") {
|
||||
# check if token is valid.
|
||||
|
||||
|
@ -16,7 +16,7 @@ operateDSVM <- function(context,
|
|||
|
||||
if (missing(context)) stop("Please specify AzureSMR context.")
|
||||
if (missing(resource.group)) stop("Please specify resource group.")
|
||||
if (missing(name)) stop("Please specify DSVM name.")
|
||||
if (missing(names)) stop("Please specify DSVM name.")
|
||||
if (missing(operation)) stop("Please specify an operation on the DSVM")
|
||||
|
||||
# check if input operations are available.
|
||||
|
@ -29,36 +29,38 @@ operateDSVM <- function(context,
|
|||
resourceGroup=resource.group,
|
||||
verbose=FALSE)
|
||||
|
||||
if(!(name %in% unlist(vm_names$name)))
|
||||
stop("DSVM does not exist.")
|
||||
if(!all(names %in% unlist(vm_names$name)))
|
||||
stop("Given DSVM(s) does not exist in resource group.")
|
||||
|
||||
status <- AzureSMR::azureVMStatus(azureActiveContext=context,
|
||||
resourceGroup=resource.group,
|
||||
vmName=name,
|
||||
verbose=FALSE)
|
||||
for (name in names) {
|
||||
status <- AzureSMR::azureVMStatus(azureActiveContext=context,
|
||||
resourceGroup=resource.group,
|
||||
vmName=name,
|
||||
verbose=FALSE)
|
||||
|
||||
if (operation == "Check") {
|
||||
print(status)
|
||||
} else if (operation == "Start") {
|
||||
if(status == "Provisioning succeeded, VM running")
|
||||
return("The DSVM has already been started.")
|
||||
if (operation == "Check") {
|
||||
print(status)
|
||||
} else if (operation == "Start") {
|
||||
if(status == "Provisioning succeeded, VM running")
|
||||
return("The DSVM has already been started.")
|
||||
|
||||
AzureSMR::azureStartVM(azureActiveContext=context,
|
||||
resourceGroup=resource.group,
|
||||
vmName=name,
|
||||
verbose=FALSE)
|
||||
} else if (operation == "Stop") {
|
||||
if(status == "Provisioning succeeded, VM deallocated")
|
||||
return("The DSVM has already been stopped.")
|
||||
AzureSMR::azureStartVM(azureActiveContext=context,
|
||||
resourceGroup=resource.group,
|
||||
vmName=name,
|
||||
verbose=FALSE)
|
||||
} else if (operation == "Stop") {
|
||||
if(status == "Provisioning succeeded, VM deallocated")
|
||||
return("The DSVM has already been stopped.")
|
||||
|
||||
AzureSMR::azureStopVM(azureActiveContext=context,
|
||||
resourceGroup=resource.group,
|
||||
vmName=name,
|
||||
verbose=FALSE)
|
||||
} else {
|
||||
AzureSMR::azureDeleteVM(azureActiveContext=context,
|
||||
AzureSMR::azureStopVM(azureActiveContext=context,
|
||||
resourceGroup=resource.group,
|
||||
vmName=name,
|
||||
verbose=FALSE)
|
||||
} else {
|
||||
AzureSMR::azureDeleteVM(azureActiveContext=context,
|
||||
resourceGroup=resource.group,
|
||||
vmName=name,
|
||||
verbose=FALSE)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,16 @@ high-performance computing context.
|
|||
We assume that the first step of [ConnectToLinuxDSVM](https://github.com/Azure/AzureDSR/vignettes/ConnectToLinuxDSVM.Rmd) has been done, and there is at least one Linux DSVM existing in the created resource group.
|
||||
|
||||
To begin with, let's check the status of the DSVM and start it if it is deallocated. This is achieved with AzureSMR, and again the credentials for authenticating the app in Active Directory must be provided.
|
||||
|
||||
```{r setup}
|
||||
# Load the required subscription resources: TID, CID, and KEY.
|
||||
# Also includes the ssh PUBKEY for the user.
|
||||
|
||||
USER <- Sys.getenv("USER")
|
||||
|
||||
source(paste0(USER, "_credentials.R"))
|
||||
```
|
||||
|
||||
```{r credentials, eval=FALSE}
|
||||
# Credentials come from app creation in Active Directory within Azure.
|
||||
|
||||
|
@ -52,8 +62,9 @@ library(rattle) # Use weatherAUS as a "large" dataset.
|
|||
|
||||
RG <- "my_dsvm_rg_sea" # Create if not already exist then kill.
|
||||
LOC <- "southeastasia" # Where the resource group (resources) will be hosted.
|
||||
LDSVM <- "mydsvm001"
|
||||
VM_URL <- paste(VM, LOC, "cloudapp.azure.com", sep=".")
|
||||
# LDSVM <- "mydsvm001"
|
||||
LDSVM <- c("zzz001", "zzz002")
|
||||
VM_URL <- paste(LDSVM, LOC, "cloudapp.azure.com", sep=".")
|
||||
```
|
||||
|
||||
# DSVM Operation
|
||||
|
@ -71,17 +82,14 @@ vm_names <-
|
|||
AzureSMR::azureListVM(context, RG, LOC) %T>%
|
||||
print()
|
||||
|
||||
# check status of a DSVM.
|
||||
|
||||
operateDSVM(context, RG, LDSVM, operation="Check")
|
||||
|
||||
# start the DSVM if it is not running.
|
||||
|
||||
operateDSVM(context, RG, LDSVM, operation="Start")
|
||||
|
||||
# stop the DSVM
|
||||
# check status of a DSVM.
|
||||
|
||||
operateDSVM(context, RG, LDSVM, operation="Check")
|
||||
|
||||
operateDSVM(context, RG, LDSVM, operation="Stop")
|
||||
```
|
||||
|
||||
# Run analytics.
|
||||
|
@ -90,17 +98,82 @@ Next step is to use the DSVM for data analytics.
|
|||
|
||||
There are many ways of interacting with a DSVM. For both Linux and Windows based DSVMs, it is convenient to log in remotely to the machines with a GUI (more detailed information can be found [here](https://docs.microsoft.com/en-us/azure/machine-learning/machine-learning-data-science-provision-vm)). Often, remote execution from within an R session is preferred by data scientists, as it can be efficiently automated with R scripts. The following code chunks demonstrate how to use an R interface for remote execution of R scripts under a desired computing context.
|
||||
|
||||
A very simple experiment on random number generation.
|
||||
```{r set R interface}
|
||||
|
||||
# Create an R interface for handling the remote execution.
|
||||
|
||||
interface <- createRInterface(remote=VM_URL, user=USER)
|
||||
|
||||
# Create a script for remote execution.
|
||||
|
||||
newScript(path=".", title="experiment1.R")
|
||||
codes <- "x <- seq(1, 500); y <- x * rnorm(length(x), 0, 0.1); sprintf('The results is %d', y)"
|
||||
|
||||
# Put analytics into the script.
|
||||
writeLines(codes, "./experiment1.R")
|
||||
|
||||
# local parallelism on node cores.
|
||||
|
||||
t1 <- Sys.time()
|
||||
|
||||
executeScript(context,
|
||||
resourceGroup=RG,
|
||||
machines=LDSVM,
|
||||
remote=VM_URL[1],
|
||||
user="zzzuser",
|
||||
script="./experiment1.R",
|
||||
master=VM_URL[1],
|
||||
slaves=VM_URL[1],
|
||||
computeContext="localParallel")
|
||||
|
||||
t2 <- Sys.time()
|
||||
|
||||
# cluster parallelism across nodes.
|
||||
|
||||
executeScript(context,
|
||||
resourceGroup=RG,
|
||||
machines=LDSVM,
|
||||
remote=VM_URL[1],
|
||||
user="zzzuser",
|
||||
script="./experiment1.R",
|
||||
master=VM_URL[1],
|
||||
slaves=VM_URL[-1],
|
||||
computeContext="clusterParallel")
|
||||
|
||||
t3 <- Sys.time()
|
||||
|
||||
performance1 <- t2 - t1
|
||||
performance2 <- t3 - t2
|
||||
|
||||
performance1
|
||||
performance2
|
||||
|
||||
```
|
||||
|
||||
Yet another example with parallel execution, using the `rxExec` function.
|
||||
|
||||
```{r}
|
||||
|
||||
# parallelizing k-means clustering on iris data.
|
||||
|
||||
codes <- paste("library(scales)",
|
||||
"df <- scale(iris[, -5])",
|
||||
"rxExec(kmeans, x=df, centers=2)",
|
||||
sep=";")
|
||||
|
||||
writeLines(codes, "./experiment2.R")
|
||||
|
||||
executeScript(context,
|
||||
resourceGroup=RG,
|
||||
machines=LDSVM,
|
||||
remote=VM_URL[1],
|
||||
user="zzzuser",
|
||||
script="./experiment2.R",
|
||||
master=VM_URL[1],
|
||||
slaves=VM_URL[-1],
|
||||
computeContext="clusterParallel")
|
||||
|
||||
```
|
||||
|
||||
Clean up.
|
||||
|
||||
```{r}
|
||||
|
||||
file.remove("./experiment1.R", "./experiment2.R")
|
||||
|
||||
updateScript(interface)
|
||||
```
|
||||
|
|
Загрузка…
Ссылка в новой задаче