Merge branch 'master' of github.com:Azure/AzureDSR

This commit is contained in:
Graham Williams 2017-02-17 14:47:16 +08:00
Родитель 67f5d24135 36f344eb66
Коммит ed1e80f8ea
4 изменённых файлов: 242 добавлений и 8 удалений

226
R/computeInterface.R Normal file
Просмотреть файл

@ -0,0 +1,226 @@
#' @title Create an compute interface object that handles interaction with remote instance.
#' @param remote URL of remote instance.
#' @param user User name.
#' @param script R script with full path for execution at remote instance.
#' @param config Configuration for remote execution. Settings include computing context, data reference, etc.
#' @return An S3 compute interface object.
#' @export
createComputeInterface <- function(remote,
user,
script,
config){
ci_env <- new.env(parent=globalenv())
# initialize an compute interface object.
if(!missing(remote)) {
ci_env$remote <- remote
} else {
ci_env$remote <- character(0)
}
if(!missing(user)) {
ci_env$user <- user
} else {
ci_env$user <- character(0)
}
if(!missing(script)) {
ci_env$script <- script
} else {
ci_env$script <- character(0)
}
if(!missing(config)) {
ci_env$config <- config
} else {
ci_env$config <- NULL
}
return(ci_env)
}
#' @title Set configuration for the compute interface object.
#' @param object An S3 compute interface object.
#' @param machine_list List of remote instances that execute R scripts.
#' @param dns_list DNS of the remote instances.
#' @param machine_user User name of the remote instances.
#' @param master Master node of the machine.
#' @param slaves Slave nodes of the machine.
#' @param data Reference to data used in the analytics.
#' @param context Computing context available in Microsoft R Server for running the analytics.
#' @export
setConfig <- function(object,
machine_list,
dns_list,
machine_user,
master,
slaves,
data,
context) {
object$config <- list(
CI_MACHINES = ifelse(!missing(machine_list), list(machine_list), ""),
CI_DNS = ifelse(!missing(dns_list), list(dns_list), ""),
CI_VMUSER = ifelse(!missing(machine_user), list(machine_user), ""),
CI_MASTER = ifelse(!missing(master), list(master), ""),
CI_SLAVES = ifelse(!missing(slaves), list(slaves), ""),
CI_DATA = ifelse(!missing(data), list(data), ""),
CI_CONTEXT = ifelse(!missing(context), list(context), "")
)
return(object)
}
#' @title Dump out the object configuration.
#' @param object The compute interface object.
#' @return No return. Print compute interface object information.
#' @export
dumpObject <- function(object) {
cat(
sprintf("---------------------------------------------------------------------------"),
sprintf("Compute interface information"),
sprintf("---------------------------------------------------------------------------"),
sprintf("The R script to be executed:\t%s.", shQuote(object$script)),
sprintf("The remote host:\t\t%s.", shQuote(object$remote)),
sprintf("The login user name:\t\t%s.", shQuote(object$user)),
sprintf("---------------------------------------------------------------------------"),
sprintf("The configuration of the interface is:"),
# sprintf("virtual machines: %s", ifelse(!is.na(object$config$CI_MACHINES), object$config$CI_MACHINES, "N/A")),
sprintf("virtual machines\t\t %s", unlist(object$config$CI_MACHINES)),
sprintf("dns list\t\t\t %s", unlist(object$config$CI_DNS)),
sprintf("user to these machines\t\t %s", unlist(object$config$CI_VMUSER)),
sprintf("the master node\t\t\t %s", unlist(object$config$CI_MASTER)),
sprintf("the slave nodes\t\t\t %s", unlist(object$config$CI_SLAVES)),
sprintf("the data source\t\t\t %s", unlist(object$config$CI_DATA)),
sprintf("the computing context\t\t %s", unlist(object$config$CI_CONTEXT)),
sprintf("---------------------------------------------------------------------------"),
sprintf("# End of information session"),
sprintf("---------------------------------------------------------------------------"),
sep = "\n"
)
}
#' @title Update a worker script with compute interface object configuration.
#' @param object compute interface object.
#' @export
updateScript <- function(object) {
if (!file.exists(object$script) || length(object$script) == 0)
{
stop(paste("The script does not exist or is not specified!",
"Consider create a new one using riNewScript."))
}
codes.body <- readLines(con=object$script)
# Remove the header.
if (codes.body[2] == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE")
{
head.start <- which(codes.body == "# THIS IS A HEADER ADDED BY COMPUTE INTERFACE")
head.end <- which(codes.body == "# END OF THE HEADER ADDED BY COMPUTE INTERFACE")
codes.body <- codes.body[-((head.start - 1):(head.end + 1))]
}
# Add context-specific info into header.
codes.head <- paste(
"# ---------------------------------------------------------------------------",
"# THIS IS A HEADER ADDED BY COMPUTE INTERFACE",
"# ---------------------------------------------------------------------------",
sep="\n"
)
if (object$config$CI_CONTEXT == "clusterParallel")
{
codes.head <- paste(
codes.head,
paste("CI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$CI_MACHINES)), collapse=", "), ")"),
paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"),
paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"),
paste("CI_MASTER <-", "c(", paste(shQuote(unlist(object$config$CI_MASTER)), collapse=", "), ")"),
paste("CI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$CI_SLAVES)), collapse=", "), ")"),
paste("CI_DATA <-", paste(shQuote(unlist(object$config$CI_DATA)), collapse=", ")),
paste("CI_CONTEXT <-", paste(shQuote(unlist(object$config$CI_CONTEXT)), collapse=", ")),
"\nlibrary(RevoScaleR)",
"# --------- Set compute context",
"cl <- makePSOCKcluster(names=CI_SLAVES, master=CI_MASTER, user=CI_VMUSER)",
"registerDoParallel(cl)",
"rxSetComputeContext(RxForeachDoPar())",
"# --------- Load data.",
"ciData <- read_csv(CI_DATA)",
"# ---------------------------------------------------------------------------",
"# END OF THE HEADER ADDED BY COMPUTE INTERFACE",
"# ---------------------------------------------------------------------------\n",
sep="\n"
)
} else if (object$config$CI_CONTEXT == "Hadoop")
{
codes.head <- paste(
codes.head,
"This is for Hadoop.",
"\n"
)
} else if (object$config$CI_CONTEXT == "Spark")
{
codes.head <- paste(
codes.head,
paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"),
paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"),
paste("CI_MASTER <-", "c(", paste(shQuote(unlist(object$config$CI_MASTER)), collapse=", "), ")"),
paste("CI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$CI_SLAVES)), collapse=", "), ")"),
paste("CI_DATA <-", paste(shQuote(unlist(object$config$CI_DATA)), collapse=", ")),
paste("CI_CONTEXT <-", paste(shQuote(unlist(object$config$CI_CONTEXT)), collapse=", ")),
"\nlibrary(RevoScaleR)",
"# --------- Set compute context",
"myHadoopCluster <- RxSpark(persistentRun=TRUE, idleTimeOut=600)",
"rxSetComputeContext(myHadoopCluster)",
"# --------- Load data.",
"ciData <- read_csv(CI_DATA)",
"bigDataDirRoot <- '/share'",
"inputDir <- file.path(bigDataDirRoot, 'riBigData')",
"rxHadoopMakeDir(inputDir)",
"rxHadoopCopyFromLocal('./data.csv', inputDir)",
"hdfsFS <- RxHdfsFileSystem()",
"riTextData <- RxTextData(file=inputDir, fileSystem=hdfsFS)",
"# ---------------------------------------------------------------------------",
"# END OF THE HEADER ADDED BY COMPUTE INTERFACE",
"# ---------------------------------------------------------------------------\n",
sep="\n"
)
} else if (object$config$CI_CONTEXT == "Teradata")
{
codes.head <- paste(
codes.head,
"This is for Teradata.",
"\n"
)
} else if (object$config$CI_CONTEXT == "localParallel")
{
codes.head <- paste(
codes.head,
paste("CI_MACHINES <-", "c(", paste(shQuote(unlist(object$config$CI_MACHINES)), collapse=", "), ")"),
paste("CI_DNS <-", "c(", paste(shQuote(unlist(object$config$CI_DNS)), collapse=", "), ")"),
paste("CI_VMUSER <-", "c(", paste(shQuote(unlist(object$config$CI_VMUSER)), collapse=", "), ")"),
paste("CI_MASTER <-", "c(", paste(shQuote(unlist(object$config$CI_MASTER)), collapse=", "), ")"),
paste("CI_SLAVES <-", "c(", paste(shQuote(unlist(object$config$CI_SLAVES)), collapse=", "), ")"),
paste("CI_DATA <-", paste(shQuote(unlist(object$config$CI_DATA)), collapse=", ")),
paste("CI_CONTEXT <-", paste(shQuote(unlist(object$config$CI_CONTEXT)), collapse=", ")),
"\nlibrary(RevoScaleR)",
"library(doParallel)",
"# --------- Set compute context",
"rxSetComputeContext(RxLocalParallel())",
"# --------- Load data.",
"ciData <- read_csv(CI_DATA)",
"# ---------------------------------------------------------------------------",
"# END OF THE HEADER ADDED BY COMPUTE INTERFACE",
"# ---------------------------------------------------------------------------\n",
sep="\n"
)
} else {
stop(paste("Specify a context from \"localParallel\", \"clusterParallel\",",
"\"Hadoop\", \"Spark\", or \"Teradata\"."))
}
cat(codes.head, file=object$script)
cat(codes.body, file=object$script, sep="\n", append=TRUE)
}

Просмотреть файл

@ -1,5 +1,4 @@
#' @title Remote execution of R script in an R interface object.
#' @param object R interface object.
#' @param inputs JSON encoded string of R objects that are loaded into the Remote R session's workspace prior to execution. Only R objects of type: primitives, vectors and dataframes are supported via this parameter. Alternatively the putLocalObject can be used, prior to a call to this function, to move any R object from the local workspace into the remote R session.
#' @param outputs Character vector of the names of the objects to retreive. Only primitives, vectors and dataframes can be retrieved using this function. Use getRemoteObject to get any type of R object from the remote session.
#' @param checkLibraries if `TRUE`, check whether libraries used in the R script installed on the remote machine.
@ -7,16 +6,25 @@
#' @param writePlots If TRUE, plots generated during execution are copied to the working directory of the local session.
#' @return Status of scription execution.
#' @export
executeScript <- function(object,
executeScript <- function(remote,
user,
script,
computeContext,
inputs=NULL,
outputs=NULL,
checkLibraries=FALSE,
displayPlots=FALSE,
writePlots=FALSE) {
# add remote spefic header to worker script.
# manage input strings in an interface object, set configuration, show interface contents, and update script with computing context specification.
editScript(object)
new_interface <-
createComputeInterface(remote, user, script) %>%
setConfig(dns_list=remote,
machine_user=user,
context=computeContext) %>%
dumpInterface() %>%
updateScript()
# authenticate the remote server.

Просмотреть файл

@ -26,7 +26,7 @@
"virtualNetworks_name": {
"value": "<DEFAULT>vnet"
},
"storageAccounts_dsvmdisks490_name": {
"storageAccounts_name": {
"value": "<DEFAULT>sa"
}
}

Просмотреть файл

@ -38,7 +38,7 @@
"defaultValue": "my_dsvm_rg_sea-vnet",
"type": "String"
},
"storageAccounts_dsvmdisks490_name": {
"storageAccounts_name": {
"defaultValue": "dsvmdisks490",
"type": "String"
}
@ -70,7 +70,7 @@
"name": "[parameters('virtualMachines_name')]",
"createOption": "FromImage",
"vhd": {
"uri": "[concat('https://projectsdisks415.blob.core.windows.net/vhds/', parameters('virtualMachines_name'),'20170215163501.vhd')]"
"uri": "[concat('http://',concat(uniquestring(resourceGroup().id), parameters('storageAccounts_name')),'.blob.core.windows.net/','vhds','/','osdiskforwindowssimple', parameters('virtualMachines_name'), '.vhd')]"
},
"caching": "ReadWrite"
},
@ -231,7 +231,7 @@
"tier": "Standard"
},
"kind": "Storage",
"name": "[parameters('storageAccounts_dsvmdisks490_name')]",
"name": "[parameters('storageAccounts_name')]",
"apiVersion": "2016-01-01",
"location": "[parameters('location')]",
"tags": {},