Relocate scripts used in vignettes.

This commit is contained in:
yueguoguo 2017-06-28 14:47:56 +08:00
Parent 694c378ad4
Commit bc39753c3e
11 changed files with 662 additions and 9 deletions

View file

@@ -174,7 +174,7 @@ data are labelled and so in our clustering analysis the label is
removed.
The R code for clustering is available from github as
-[workerCluster.R]{...test/workerCluster.R}. The analysis basically
+[workerCluster.R](./test/workerCluster.R). The analysis basically
normalises the credit transaction data and then performs 10 repeated
clustering analyses (each targeting 2 clusters) using the k-means
algorithm. The repetition is completed in parallel with the specified
@@ -184,7 +184,7 @@ computing context information will be automatically added by the
The script can then be saved, and the path to the script is later used
as a reference. For example, in this demo, the script is saved with the name
-"workerCluster.R" which is located in the "/test" directory.
+"workerCluster.R", which is located in the "vignettes/test" directory.
The following code runs the clustering analysis on a specified
computing environment. This is achieved by setting computing
@@ -219,7 +219,7 @@ executeScript(context=context,
hostname=machines[1],
remote=master,
username=USER,
script="../test/workerCluster.R",
script="test/workerCluster.R",
compute.context="localParallel")
time_2 <- Sys.time()
@@ -249,7 +249,7 @@ AzureDSVM::executeScript(context=context,
hostname=machines,
remote=master,
username=USER,
script="../test/workerCluster.R",
script="test/workerCluster.R",
master=master,
slaves=slaves,
compute.context="clusterParallel")

View file

@@ -194,7 +194,7 @@ worker_scripts <- c("workerHotspotsFuncs.R",
sapply(worker_scripts,
fileTransfer,
from="../test",
from="test",
to=paste0(master, ":~"),
user=USER)
```
@@ -213,7 +213,7 @@ AzureDSVM::executeScript(context=context,
hostname=machines,
remote=master,
username=USER,
script="../test/workerHotspots.R",
script="test/workerHotspots.R",
compute.context="localParallel")
time_2 <- Sys.time()

View file

@@ -249,7 +249,7 @@ mlProcess <- function(formula, data, modelName, modelPara) {
The worker script can be executed on a remote Ubuntu DSVM or a DSVM cluster with the AzureDSVM function `executeScript`, as in the previous tutorials.
-The worker script for binary classification is located in "/test" directory, with name "worker_classficiation.R".
+The worker script for binary classification is located in the "vignettes/test" directory, with the name "workerClassification.R".
```{r}
VM_URL <- paste(HOST, LOC, "cloudapp.azure.com", sep=".")
@@ -271,7 +271,7 @@ executeScript(context,
hostname=HOST[1],
remote=FQDN[1],
username=USER,
script="../test/workerClassification.R",
script="test/workerClassification.R",
compute.context="localParallel")
# remote execution on a cluster of DSVMs.
@@ -283,7 +283,7 @@ executeScript(context,
hostname=HOST,
remote=FQDN[1],
username=USER,
script="../test/workerClassification.R",
script="test/workerClassification.R",
master=FQDN[1],
slaves=FQDN[-1],
compute.context="clusterParallel")

View file

@@ -0,0 +1,220 @@
# ---------------------------------------------------------------------------
# THIS IS A HEADER ADDED BY COMPUTE INTERFACE
# ---------------------------------------------------------------------------
CI_MACHINES <- c( "myqjqs", "myqjqs001", "myqjqs002", "myqjqs003", "myqjqs004" )
CI_DNS <- c( "myqjqs.southeastasia.cloudapp.azure.com", "myqjqs001.southeastasia.cloudapp.azure.com", "myqjqs002.southeastasia.cloudapp.azure.com", "myqjqs003.southeastasia.cloudapp.azure.com", "myqjqs004.southeastasia.cloudapp.azure.com" )
CI_VMUSER <- c( "zhle" )
CI_MASTER <- c( "myqjqs.southeastasia.cloudapp.azure.com" )
CI_SLAVES <- c( "myqjqs001.southeastasia.cloudapp.azure.com", "myqjqs002.southeastasia.cloudapp.azure.com", "myqjqs003.southeastasia.cloudapp.azure.com", "myqjqs004.southeastasia.cloudapp.azure.com" )
CI_DATA <- ""
CI_CONTEXT <- "clusterParallel"
library(RevoScaleR)
# library(readr)
library(doParallel)
# --------- Set compute context
cl <- makePSOCKcluster(names=CI_SLAVES, master=CI_MASTER, user=CI_VMUSER)
registerDoParallel(cl)
rxSetComputeContext(RxForeachDoPar())
# --------- Load data.
# ciData <- ifelse(CI_DATA != '', read_csv(CI_DATA), data.frame(0))
# ---------------------------------------------------------------------------
# END OF THE HEADER ADDED BY COMPUTE INTERFACE
# ---------------------------------------------------------------------------
# This script presents a learning process that searches for an optimal model for a classification problem, illustrating the convenience of using the cloud to parallelize such a learning process with AzureDSVM.
# data to use for the ML process.
data_config <- data.frame(name=c("Employee Attrition Prediction",
"Adult Income",
"Credit Card Transaction",
"Australia Weather",
"Mushroom",
"Hep Mass",
"Higgs"),
url=c("https://zhledata.blob.core.windows.net/mldata/employee.xdf",
"https://zhledata.blob.core.windows.net/mldata/adult.xdf",
"https://zhledata.blob.core.windows.net/mldata/credit.xdf",
"https://zhledata.blob.core.windows.net/mldata/weather.xdf",
"https://zhledata.blob.core.windows.net/mldata/mushroom.xdf",
"https://zhledata.blob.core.windows.net/mldata/hepmass.xdf",
"https://zhledata.blob.core.windows.net/mldata/higgs.xdf"),
label=c("Attrition",
"X15",
"Class",
"RainTomorrow",
"class",
"class",
"X1"),
colOptions=c(TRUE,
FALSE,
TRUE,
TRUE,
TRUE,
TRUE,
FALSE),
stringsAsFactors=FALSE)
# algorithms for use.
model_config <- list(name=c("rxLogit", "rxBTrees", "rxDForest"),
para=list(list(list(maxIterations=10,
coeffTolerance=1e-6),
list(maxIterations=15,
coeffTolerance=2e-6),
list(maxIterations=20,
coeffTolerance=3e-6)),
list(list(nTree=10,
learningRate=0.05),
list(nTree=15,
learningRate=0.1),
list(nTree=20,
learningRate=0.15)),
list(list(cp=0.01,
nTree=10,
mTry=3),
list(cp=0.01,
nTree=15,
mTry=3),
list(cp=0.01,
nTree=20,
mTry=3))))
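# Illustration (not executed): each algorithm name above is paired with three
# candidate parameter sets; for instance, the second rxBTrees candidate can be
# retrieved as below.
if (FALSE) {
  model_config$para[[2]][[2]]  # list(nTree=15, learningRate=0.1)
}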
# define a function for binary classification problem.
mlProcess <- function(formula, data, modelName, modelPara) {
xdf <- RxXdfData(file=data)
# split data into training set (70%) and testing set (30%).
data_part <- c(train=0.7, test=0.3)
data_split <-
rxSplit(xdf,
outFilesBase=tempfile(),
splitByFactor="splitVar",
transforms=list(splitVar=
sample(data_factor,
size=.rxNumRows,
replace=TRUE,
prob=data_part)),
transformObjects=
list(data_part=data_part,
data_factor=factor(names(data_part), levels=names(data_part))))
data_train <- data_split[[1]]
data_test <- data_split[[2]]
# train model.
if(missing(modelPara) ||
is.null(modelPara) ||
length(modelPara) == 0) {
model <- do.call(modelName, list(data=data_train, formula=formula))
} else {
model <- do.call(modelName, c(list(data=data_train,
formula=formula),
modelPara))
}
# validate model
scores <- rxPredict(model,
data_test,
extraVarsToWrite=names(data_test),
predVarNames="Pred",
outData=tempfile(fileext=".xdf"),
overwrite=TRUE)
label <- as.character(formula[[2]])
roc <- rxRoc(actualVarName=label,
predVarNames=c("Pred"),
data=scores)
auc <- rxAuc(roc)
# clean up.
file.remove(c(data_train@file, data_test@file))
return(list(model=model, metric=auc))
}
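# A minimal usage sketch (illustration only, not executed): run a single
# candidate locally. The formula and parameters here are assumptions for the
# credit card data set.
if (FALSE) {
  res <- mlProcess(formula=Class ~ V1 + V2 + Amount,
                   data="./data.xdf",
                   modelName="rxLogit",
                   modelPara=list(maxIterations=10, coeffTolerance=1e-6))
  res$metric  # AUC on the held-out 30% of the data
}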
# -----------------------------------------------------------------------
# Step 0 - set up the experiment.
# -----------------------------------------------------------------------
# read data. Note the URL below overrides the CI_DATA value set in the header.
data_index <- 3
CI_DATA <- "https://zhledata.blob.core.windows.net/mldata/creditcard.xdf"
download.file(CI_DATA,
destfile="./data.xdf",
mode="wb")
# download data to all nodes if it is cluster parallel.
if (rxGetComputeContext()@description == "dopar") {
clusterCall(cl,
download.file,
url=CI_DATA,
destfile="./data.xdf",
mode="wb")
}
label <- data_config$label[data_index]
label <- as.character(label)
# create a formula.
names <- rxGetVarNames(data="./data.xdf")
names <- names[names != label]
formula <- as.formula(paste(label, "~", paste(names, collapse="+")))
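# Illustration (not executed): for the credit card data the formula built above
# takes the form "Class ~ Time + V1 + ... + Amount"; these column names are an
# assumption about the data set.
if (FALSE) {
  print(formula)
}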
# -----------------------------------------------------------------------
# Step 1 - algorithm selection.
# -----------------------------------------------------------------------
# sweep candidate algorithms to select the best one - a performance metric such as area under the ROC curve (AUC) is used.
results1 <- rxExec(mlProcess,
formula=formula,
data="data.xdf",
modelName=rxElemArg(model_config$name))
metric1 <- lapply(results1, `[[`, "metric")
# pick the algorithm with the highest AUC (which.max returns a single index even on ties).
algo <- model_config$name[which.max(unlist(metric1))]
para <- model_config$para[[which(model_config$name == algo)]]
# -----------------------------------------------------------------------
# Step 2 - parameter tuning.
# -----------------------------------------------------------------------
# After an algorithm is selected based on some criterion (here AUC, a balanced metric that considers both sensitivity and specificity), another parallel execution is run over different sets of parameters - parameter tuning.
# sweep parameters of the selected algorithm to find the optimal model.
results2 <- rxExec(mlProcess,
formula=formula,
data="data.xdf",
modelName=algo,
modelPara=rxElemArg(para))
# select the optimal model with the best performance.
metric2 <- lapply(results2, `[[`, "metric")
best <- which.max(unlist(metric2))
model_opt <- results2[[best]][["model"]]
metric_opt <- results2[[best]][["metric"]]
# save results for reference.
results <- list(model=model_opt, metric=metric_opt)
save(results, file="./results.RData")

View file

@@ -0,0 +1,75 @@
# ---------------------------------------------------------------------------
# THIS IS A HEADER ADDED BY COMPUTE INTERFACE
# ---------------------------------------------------------------------------
CI_MACHINES <- c( "jxss001", "jxss002", "jxss003", "jxss004" )
CI_DNS <- c( "jxss001.southeastasia.cloudapp.azure.com", "jxss002.southeastasia.cloudapp.azure.com", "jxss003.southeastasia.cloudapp.azure.com", "jxss004.southeastasia.cloudapp.azure.com" )
CI_VMUSER <- c( "zhle" )
CI_MASTER <- c( "jxss001.southeastasia.cloudapp.azure.com" )
CI_SLAVES <- c( "jxss002.southeastasia.cloudapp.azure.com", "jxss003.southeastasia.cloudapp.azure.com", "jxss004.southeastasia.cloudapp.azure.com" )
CI_DATA <- ""
CI_CONTEXT <- "clusterParallel"
library(RevoScaleR)
# library(readr)
library(doParallel)
# --------- Set compute context
cl <- makePSOCKcluster(names=CI_SLAVES, master=CI_MASTER, user=CI_VMUSER)
registerDoParallel(cl)
rxSetComputeContext(RxForeachDoPar())
# --------- Load data.
# ciData <- ifelse(CI_DATA != '', read_csv(CI_DATA), data.frame(0))
# ---------------------------------------------------------------------------
# END OF THE HEADER ADDED BY COMPUTE INTERFACE
# ---------------------------------------------------------------------------
# This script runs the clustering analysis in parallel across nodes.
# get data from remote blob.
DATA_URL <- "https://zhledata.blob.core.windows.net/mldata/creditcard.xdf"
download.file(DATA_URL,
destfile="./data.xdf",
mode="wb")
# download data to all nodes if it is cluster parallel.
if (rxGetComputeContext()@description == "dopar") {
clusterCall(cl,
download.file,
url=DATA_URL,
destfile="./data.xdf",
mode="wb")
}
# define a function that clusters a given data set.
clusterAnalysis <- function(data, numClusters) {
xdf <- RxXdfData(data)
# create formula.
names <- rxGetVarNames(data=xdf)
names <- names[!(names %in% c("Class", "Time"))] # the original data set is labelled so remove the label.
formula <- as.formula(paste0("~", paste(names, collapse="+")))
# to scale data.
df <- rxImport(xdf,
varsToDrop=c("Time", "Class"))
df <- as.data.frame(scale(df))
clusters <- rxKmeans(formula,
df,
numClusters=numClusters)
clusters$cluster
}
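# Usage sketch (illustration only, not executed): cluster the downloaded data
# into 3 segments and tabulate the per-row assignments.
if (FALSE) {
  assignments <- clusterAnalysis(data="data.xdf", numClusters=3)
  table(assignments)
}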
# do kmeans clustering with rxExec parallelization.
results <- rxExec(FUN=clusterAnalysis,
data="data.xdf",
numClusters=rxElemArg(c(2:5)))
save(results, file="./results.RData")

View file

@@ -0,0 +1,58 @@
# ---------------------------------------------------------------------------
# THIS IS A HEADER ADDED BY COMPUTE INTERFACE
# ---------------------------------------------------------------------------
CI_MACHINES <- c( "mytbmm" )
CI_DNS <- c( "", "" )
CI_VMUSER <- c( "zhle" )
CI_MASTER <- c( "" )
CI_SLAVES <- c( "" )
CI_DATA <- ""
CI_CONTEXT <- "localParallel"
library(RevoScaleR)
library(doParallel)
# library(readr)
# --------- Set compute context
rxSetComputeContext(RxLocalParallel())
# --------- Load data.
# ciData <- ifelse(CI_DATA != '', read_csv(CI_DATA), data.frame(0))
# ---------------------------------------------------------------------------
# END OF THE HEADER ADDED BY COMPUTE INTERFACE
# ---------------------------------------------------------------------------
# source the scripts to load functions used for the analysis.
source("workerHotspotsSetup.R")
source("workerHotspotsFuncs.R")
source("workerHotspotsTrain.R")
source("workerHotspotsTest.R")
source("workerHotspotsProcess.R")
# initial parameter definition.
number_of_clust <- 2:10
train_ratio <- 0.7
lib <- "~/lib" # install packages on a personal lib. Note this merely works for Linux machine.
pkgs <- c("dplyr", "stringr", "stringi", "magrittr", "readr", "rattle", "ggplot2", "DMwR")
data_url <- "https://zhledata.blob.core.windows.net/mldata/creditcard.xdf"
download.file(data_url,
destfile="./data.xdf",
mode="wb")
# install and load packages.
installPkgs(list_of_pkgs=pkgs, lib=lib)
sapply(pkgs, require, character.only=TRUE)
# Hotspots analysis.
eval <- hotSpotsProcess(data=RxXdfData("./data.xdf"),
number.of.clust=number_of_clust,
train.ratio=train_ratio)
# save results.
save(eval, file="./results.RData")

View file

@@ -0,0 +1,26 @@
# do preparation.
# -------------------------------------------------------------------------
# Split data into training and testing sets.
# -------------------------------------------------------------------------
dataSplit <- function(data, ratio) {
data_part <- c(train=ratio, test=1 - ratio)
data_split <-
rxSplit(data,
outFilesBase=tempfile(),
splitByFactor="splitVar",
transforms=list(splitVar=
sample(data_factor,
size=.rxNumRows,
replace=TRUE,
prob=data_part)),
transformObjects=
list(data_part=data_part,
data_factor=factor(names(data_part), levels=names(data_part))))
data_split
}
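# Usage sketch (illustration only, not executed): a 70/30 train/test split of an
# XDF data source; the file name is an assumption.
if (FALSE) {
  parts <- dataSplit(RxXdfData("./data.xdf"), ratio=0.7)
  data_train <- parts[[1]]  # the "train" partition
  data_test <- parts[[2]]   # the "test" partition
}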

View file

@@ -0,0 +1,84 @@
# whole process of Hotspots analysis (including model creation and evaluation)
hotSpotsProcess <- function(data,
number.of.clust=2:20,
train.ratio=0.7,
lib=.libPaths()) {
# ------------------------------------------------------------------------
# data preparation
# ------------------------------------------------------------------------
xdf <- RxXdfData(data)
# split into training and testing.
data_split <- dataSplit(xdf, train.ratio)
data_train <- RxXdfData(data_split[[1]], varsToDrop="splitVar")
data_test <- RxXdfData(data_split[[2]], varsToDrop="splitVar")
# ------------------------------------------------------------------------
# model training
# ------------------------------------------------------------------------
# clustering.
segments <- rxExec(FUN=clusterAnalysis,
data=data_train,
centers=rxElemArg(number.of.clust))
# determine the optimal k for kmeans with elbow.
elbow <- distPointLine(unlist(lapply(segments, `[[`, "metric")))
# train models for differentiating clusters and classifying data within a cluster.
cluster_model <- segments[[elbow]]$model
cl <- 1:max(cluster_model$cluster)
models <- rxExec(FUN=modelCreation,
data=data_train,
cluster=cluster_model,
cl=rxElemArg(cl),
lib=lib)
# ------------------------------------------------------------------------
# model testing
# ------------------------------------------------------------------------
# determine the cluster each test observation belongs to, then use the classifier within that cluster to predict the label.
df_test <- rxImport(data_test)
df_scores <- NULL
model_index <- seq_along(models)
# predict cluster.
df_scores <- rxExec(FUN=predictCluster,
df_test=df_test,
models=models,
index=rxElemArg(model_index))
df_scores <- data.frame(do.call(cbind, df_scores))
# predict label.
pred <- predictLabel(df_test, df_scores, models)
# just an illustration - get the confusion matrix as evaluation.
eval <- table(True=df_test$Class, Pred=pred$Class)
# or compute AUC? pred carries only the predicted labels, so supply the true
# labels from the test set; pred is arranged in the original row order of
# df_test, so the two align.
roc <- rxRoc(actualVarName="Class",
predVarNames=c("Class_prob"),
data=data.frame(Class=df_test$Class, Class_prob=pred$Class_prob))
# return the results.
results <- list(numberOfClusters=elbow,
confusionMatrix=eval,
roc=roc)
results
}

View file

@@ -0,0 +1,14 @@
# install packages
installPkgs <- function(list_of_pkgs, lib=.libPaths()) {
if (!all(lib %in% .libPaths())) {
dir.create(lib, showWarnings=FALSE)
.libPaths(c(lib, .libPaths()))
}
new_packages <- list_of_pkgs[!(list_of_pkgs %in% installed.packages()[,"Package"])]
if(length(new_packages)) {
sapply(new_packages, install.packages, lib=lib)
}
}
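# Usage sketch (illustration only, not executed): install any missing packages
# into a personal library, then load one of them from there.
if (FALSE) {
  installPkgs(c("dplyr", "DMwR"), lib="~/lib")
  library(DMwR, lib.loc="~/lib")
}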

View file

@@ -0,0 +1,43 @@
predictCluster <- function(df_test, models, index) {
model_s <- models[[index]]$model_s
# return the predicted scores from the segment model.
rxPredict(model_s, df_test)
}
predictLabel <- function(df_test, df_scores, models) {
# get the highest score to determine the predictive model to use.
id <- max.col(df_scores)
df_test <-
cbind(df_test, id) %>%
mutate(key = row_number())
model_index <- unique(id)
pred <- NULL
for (j in model_index) {
df <- filter(df_test, id == j)
model_c <- models[[j]]$model_c
if (is.numeric(model_c)) {
# since there is only one class within the cluster.
pred <- rbind(pred, data.frame(Class_prob=rep(model_c, nrow(df)), key=df$key))
} else {
result <- rxPredict(model_c, df)
pred <- rbind(pred, mutate(result, key=df$key))
}
}
pred <-
mutate(pred, Class=ifelse(Class_prob > 0.5, 1, 0)) %>%
arrange(key)
pred
}
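# Usage sketch (illustration only, not executed): score the test set against
# each segment model, then predict the labels; df_test and models are assumed
# to be built as in the Hotspots process script.
if (FALSE) {
  scores_list <- lapply(seq_along(models),
                        function(i) predictCluster(df_test, models, i))
  df_scores <- data.frame(do.call(cbind, scores_list))
  pred <- predictLabel(df_test, df_scores, models)
  table(pred$Class)
}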

View file

@@ -0,0 +1,133 @@
# functions for hotspot analysis.
# -------------------------------------------------------------------------
# cluster analysis using kmeans algorithm.
# -------------------------------------------------------------------------
clusterAnalysis <- function(data, centers) {
# to scale data.
df <- rxImport(data)
df <- subset(df, select=c(-Time, -Class))
df <- as.data.frame(scale(df))
for (i in 1:10) {
clusters <- kmeans(df,
centers=centers)
# calculate the explained variance (between-cluster over total sum of squares)
# as a reference for determining the optimal number of clusters.
exp_var <- clusters$betweenss / clusters$totss
# keep the run with the highest explained variance.
if (i == 1 || exp_var > exp_var_opt) {
exp_var_opt <- exp_var
model_opt <- clusters
}
}
list(model=model_opt, metric=exp_var_opt)
}
# -------------------------------------------------------------------------
# find "elbow" in a 1-d curve using point-to-line distance.
# -------------------------------------------------------------------------
# Input y gives the y values of the points; x is the index of each value. The function first defines a line passing through (x_1, y_1) and (x_n, y_n), where n is the length of the curve. The distance from each interior point of the curve to that line is then calculated; the maximum distance corresponds to the elbow.
distPointLine <- function(y) {
if (length(y) < 3)
stop("Please sweep the initial number of clusters over a larger range (>= 3).")
x <- 1:length(y)
x_1 <- x[1]
x_2 <- x[length(x)]
y_1 <- y[1]
y_2 <- y[length(y)]
dist <- NULL
# calculate the distance
for (i in 2:(length(y) - 1)) {
dist[length(dist) + 1] <- abs((y_2 - y_1) * x[i] - (x_2 - x_1) * y[i] + x_2 * y_1 - y_2 * x_1) / (sqrt((y_2 - y_1) ^ 2 + (x_2 - x_1) ^ 2))
}
# dist[j] is the distance for the (j + 1)-th point of the curve, so shift the index by one.
elbow <- which.max(dist) + 1
elbow
}
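# Worked toy example (illustration only, not executed): the explained-variance
# curve below flattens after its third point, so the elbow is at index 3.
if (FALSE) {
  y <- c(0.20, 0.55, 0.70, 0.75, 0.78)
  distPointLine(y)  # returns 3
}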
# -------------------------------------------------------------------------
# create tree models to differentiate data segments and classify data within a cluster.
# -------------------------------------------------------------------------
modelCreation <- function(data, cluster, cl, lib) {
library(dplyr)
library(DMwR, lib.loc=lib)
# TODO: do this natively on XDF with dplyrXdf.
data <- rxImport(data)
data <- subset(data, select=-Time)
df_clust <- filter(data, row_number() %in% which(cluster$cluster == cl))
df_not_clust <- filter(data, !(row_number() %in% which(cluster$cluster == cl)))
df_one <-
select(df_clust, -Class) %>%
mutate(label=factor(1, levels=c("0", "1")))
df_another <-
select(df_not_clust, -Class) %>%
mutate(label=factor(0, levels=c("0", "1")))
df_train_s <- rbind(df_one, df_another)
# rebalance with SMOTE when the segment is badly under-represented (below ~10% of the rest).
if (nrow(df_one) < round(.1 * nrow(df_another))) {
df_train_s <- as.data.frame(df_train_s)
df_train_s <- SMOTE(label ~ .,
data=df_train_s,
perc.over=300,
perc.under=150)
}
# train a tree model to differentiate one segment from the rest.
names <- rxGetVarNames(df_train_s)
vars <- names[-length(names)]
form <- formula(paste0("label ~ ", paste(vars, collapse="+")))
model_s <- rxBTrees(formula=form, data=df_train_s)
# then train another model to do classification within the cluster.
df_train_c <-
df_clust %>%
mutate(Class=factor(Class))
names <- rxGetVarNames(df_train_c)
vars <- names[-length(names)]
form <- formula(paste0("Class ~ ", paste(vars, collapse="+")))
# the segment may contain only one class of data - in that case no model is
# trained, and the class label is returned as a number instead.
if (length(unique(df_train_c$Class)) == 1) {
model_c <- as.numeric(df_train_c$Class[1])
} else {
model_c <- rxBTrees(formula=form, data=df_train_c)
}
# return the two models.
models <- list(model_s=model_s, model_c=model_c)
models
}
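# Usage sketch (illustration only, not executed): fit a 3-cluster kmeans on the
# data, then build the model pair for cluster 1. The file path and library
# location are assumptions.
if (FALSE) {
  km <- clusterAnalysis(RxXdfData("./data.xdf"), centers=3)$model
  pair <- modelCreation(data=RxXdfData("./data.xdf"), cluster=km, cl=1, lib="~/lib")
  pair$model_s  # segment model; pair$model_c is the within-cluster classifier
}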