diff --git a/Data/about data.txt b/Data/about data.txt new file mode 100644 index 0000000..ba52d90 --- /dev/null +++ b/Data/about data.txt @@ -0,0 +1,4 @@ +Data used in this analysis are Criteo's day 14 - day 23 data, which is about 420 GB. More information about the data can be found at: +https://blogs.technet.microsoft.com/machinelearning/2015/04/01/now-available-on-azure-ml-criteos-1tb-click-prediction-dataset/ + +The unique values for categorical variables and means for integer variables are used in the feature engineering step and they are saved in separate datasets. These summaries are based on day 0 - day 23 data, which uses about 1 TB of disk space. The summaries can be downloaded from https://mypublicstorage.blob.core.windows.net/mycontainer/CriteoSummaries.zip \ No newline at end of file diff --git a/README.md b/README.md index 2f6742e..f60d4ee 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,33 @@ # RServer-for-HDInsight-example-CriteoDataSet This repo contains a walkthrough of how to use RServer for HDInsight with large data sets like Criteo. + +## Running Instructions +It took about 10 hours to run the analysis on my cluster using the Criteo data for day 14 - day 23 (420 GB). You can test your cluster and the program by using a subset of the data, e.g., data for day 14 (46 GB). + +### Deploy an HDInsight cluster +More information about how to deploy [R Server for HDInsight](https://azure.microsoft.com/en-us/services/hdinsight/r-server/) can be found at the [documentation site](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hadoop-r-server-overview/). It is recommended that you install RStudio on the cluster by following the [instructions](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hadoop-r-server-install-r-studio/) as well. 
Here's the information on the cluster I deployed: + +| Type | Cores | RAM (GB) | Nodes | Pricing Tier | +|--------------|------:|---------:|------:|--------------| +| Head Nodes | 32 | 224 | 2 | D14 | +| Worker Nodes | 960 | 6,720 | 60 | D14 | + +### Get the Criteo data +Information on the data can be found at [Now Available on Azure ML – Criteo's 1TB Click Prediction Dataset](https://blogs.technet.microsoft.com/machinelearning/2015/04/01/now-available-on-azure-ml-criteos-1tb-click-prediction-dataset/). After downloading and extracting data for day 14 - day 23, upload them to a folder on your HDInsight cluster using tools like [AzCopy](https://azure.microsoft.com/en-us/documentation/articles/storage-use-azcopy/). + +### Get the summary data +The summary data can be downloaded from [an Azure blob](https://mypublicstorage.blob.core.windows.net/mycontainer/CriteoSummaries.zip). The summary is for the 1 TB data and includes frequency counts for categorical variables and means for integer variables. After downloading and extracting data, upload them to your HDInsight cluster using tools like [AzCopy](https://azure.microsoft.com/en-us/documentation/articles/storage-use-azcopy/). + +### Update the programs +SetComputeContext.R +* Enter the nodename of your cluster and update the WASB address. +* Replace the value of *dataDir* with the correct path to where the data is saved. For example, I saved all data for my project in the folder "/lixun/CriteoAzure" so I assigned this path to *dataDir*. + +CriteoMain.R +* Update the paths to the raw Criteo data as well as summaries of categorical and integer variables. + +CriteoMainCall.R +* Change the working directory to point to your folder where the programs are saved. + +### Run CriteoMainCall.R +For example, you can run the program from RStudio installed on the HDInsight cluster. 
\ No newline at end of file diff --git a/RServerCode/CriteoMain.R b/RServerCode/CriteoMain.R new file mode 100644 index 0000000..bc04b29 --- /dev/null +++ b/RServerCode/CriteoMain.R @@ -0,0 +1,467 @@ +# criteo analysis with SparkR and MRS + +# ---------------------------------------------------------------------------- +# get compute context parameters +# ---------------------------------------------------------------------------- +source("SetComputeContext.R") + +# ---------------------------------------------------------------------------- +# general parameters +# ---------------------------------------------------------------------------- +# save the current time +time_start <- proc.time() + +# to make results replicable +set.seed(1) + +# report rows processed and timings +rxOptions(reportProgress = 2) + +# frequency threshold for the categorical variables +t_percent <- 0.05 + +# variable lists +myvars_I <- c("I1", "I2", "I3", "I4", "I5", "I6", "I7", "I8", + "I9", "I10", "I11", "I12", "I13") + +myvars_C <- c("C1", "C2", "C3", "C4", + "C5", "C6", "C7", "C8", "C9", + "C10", "C11", "C12", "C13", + "C14", "C15", "C16", "C17", + "C18", "C19", "C20", "C21", + "C22", "C23", "C24", "C25", + "C26") + +myvars <- c(myvars_I, myvars_C) + +# save the current time +time_read_data <- proc.time() + +# ---------------------------------------------------------------------------- +# extract frequent values for each categorical variable +# ---------------------------------------------------------------------------- + +colInfo <- list(V1 = list(newName = "group", type="character"), + V2 = list(newName = "count", type="numeric"), + V3 = list(newName = "varId", type="integer")) + +# read the summary info for whole dataset into memory +summaryCatTXT <- RxTextData(file.path(dataDir, "summaryCategoricalCleaned0"), colInfo = colInfo) +summaryCatXDF <- RxXdfData(file.path(dataDir, "summaryCategoricalXdf")) +rxImport(inData = summaryCatTXT, + outFile = summaryCatXDF, + overwrite = TRUE) + 
+summary_temp <- rxDataStep(inData = summaryCatXDF) +total_obs <- summary_temp$count + +# define the frequency threshold +Cvar_Thr <- total_obs * t_percent + +# read the summary info for individual variables into memory +summaryCatTXT <- RxTextData(file.path(dataDir, "summaryCategoricalCleaned"), colInfo = colInfo) +summaryCatXDF <- RxXdfData(file.path(dataDir, "summaryCategoricalXdf")) + +rxImport(inData = summaryCatTXT, + outFile = summaryCatXDF, + rowSelection = (count > CvarThr), + transformObjects = list(CvarThr = Cvar_Thr), + overwrite = TRUE) + +summary_temp <- rxDataStep(inData = summaryCatXDF) + +summary_temp$group <- sapply(summary_temp$group, function(x) gsub("\"", "", x)) + +i <- 1 # column id of the first categorical variable +for (var_name in myvars_C) { + assign(paste0("myset_", var_name), c(as.character(summary_temp[summary_temp$varId == i,1]), 'ffffffff')) + i <- i + 1 +} + +# save the current time +time_get_summary_cat <- proc.time() + +# ---------------------------------------------------------------------------- +# import means for integers +# ---------------------------------------------------------------------------- +colInfoInt <- list(V1 = list(newName = "mean", type="numeric")) +summaryIntTXT <- RxTextData(file.path(dataDir, "summaryInteger"), colInfo = colInfoInt) +summaryIntXDF <- RxXdfData(file.path(dataDir, "summaryIntegerXdf")) + +rxImport(inData = summaryIntTXT, + outFile = summaryIntXDF, + overwrite = TRUE) + +summary_temp_Int <- rxDataStep(inData = summaryIntXDF) + +# ---------------------------------------------------------------------------- +# assign means for integers +# ---------------------------------------------------------------------------- +i <- 1 +for (var in myvars_I){ + mean_v <- summary_temp_Int[i,1] + assign(paste0(var, "_mean"), mean_v) + i <- i + 1 +} + +# ---------------------------------------------------------------------------- +# assign a value in preparation for the log operation +# 
---------------------------------------------------------------------------- +add_value <- 100 + +# save the current time +time_get_summary_int <- proc.time() + +# ---------------------------------------------------------------------------- +# import data to XDF and feature engineering +# ---------------------------------------------------------------------------- +newVarInfo <- list(V1 = list(newName = "clicked"), + V2 = list(newName = "I1"), + V3 = list(newName = "I2"), + V4 = list(newName = "I3"), + V5 = list(newName = "I4"), + V6 = list(newName = "I5"), + V7 = list(newName = "I6"), + V8 = list(newName = "I7"), + V9 = list(newName = "I8"), + V10 = list(newName = "I9"), + V11 = list(newName = "I10"), + V12 = list(newName = "I11"), + V13 = list(newName = "I12"), + V14 = list(newName = "I13"), + V15 = list(newName = "C1"), + V16 = list(newName = "C2"), + V17 = list(newName = "C3"), + V18 = list(newName = "C4"), + V19 = list(newName = "C5"), + V20 = list(newName = "C6"), + V21 = list(newName = "C7"), + V22 = list(newName = "C8"), + V23 = list(newName = "C9"), + V24 = list(newName = "C10"), + V25 = list(newName = "C11"), + V26 = list(newName = "C12"), + V27 = list(newName = "C13"), + V28 = list(newName = "C14"), + V29 = list(newName = "C15"), + V30 = list(newName = "C16"), + V31 = list(newName = "C17"), + V32 = list(newName = "C18"), + V33 = list(newName = "C19"), + V34 = list(newName = "C20"), + V35 = list(newName = "C21"), + V36 = list(newName = "C22"), + V37 = list(newName = "C23"), + V38 = list(newName = "C24"), + V39 = list(newName = "C25"), + V40 = list(newName = "C26")) + +mydataCSV <- RxTextData(file.path(dataDir, "CSV"), + colInfo = newVarInfo) +mydataXdf <- RxXdfData(file.path(dataDir, "XDF")) + +rxImport(inData = mydataCSV, outFile = mydataXdf, + transforms = list( + # fill in missing values + I1=(ifelse(is.na(I1), I1_Mean, I1)), + I2=(ifelse(is.na(I2), I2_Mean, I2)), + I3=(ifelse(is.na(I3), I3_Mean, I3)), + I4=(ifelse(is.na(I4), I4_Mean, I4)), + 
I5=(ifelse(is.na(I5), I5_Mean, I5)), + I6=(ifelse(is.na(I6), I6_Mean, I6)), + I7=(ifelse(is.na(I7), I7_Mean, I7)), + I8=(ifelse(is.na(I8), I8_Mean, I8)), + I9=(ifelse(is.na(I9), I9_Mean, I9)), + I10=(ifelse(is.na(I10), I10_Mean, I10)), + I11=(ifelse(is.na(I11), I11_Mean, I11)), + I12=(ifelse(is.na(I12), I12_Mean, I12)), + I13=(ifelse(is.na(I13), I13_Mean, I13)), + + # get floor to reduce unique values + I1_new = as.integer(floor(log(I1+addvalue)^2)), + I2_new = as.integer(floor(log(I2+addvalue)^2)), + I3_new = as.integer(floor(log(I3+addvalue)^2)), + I4_new = as.integer(floor(log(I4+addvalue)^2)), + I5_new = as.integer(floor(log(I5+addvalue)^2)), + I6_new = as.integer(floor(log(I6+addvalue)^2)), + I7_new = as.integer(floor(log(I7+addvalue)^2)), + I8_new = as.integer(floor(log(I8+addvalue)^2)), + I9_new = as.integer(floor(log(I9+addvalue)^2)), + I10_new = as.integer(floor(log(I10+addvalue)^2)), + I11_new = as.integer(floor(log(I11+addvalue)^2)), + I12_new = as.integer(floor(log(I12+addvalue)^2)), + I13_new = as.integer(floor(log(I13+addvalue)^2)), + + # character variables - set missing value to "ffffffff" + C1_new=(ifelse(is.na(C1), "ffffffff", C1)), + C2_new=(ifelse(is.na(C2), "ffffffff", C2)), + C3_new=(ifelse(is.na(C3), "ffffffff", C3)), + C4_new=(ifelse(is.na(C4), "ffffffff", C4)), + C5_new=(ifelse(is.na(C5), "ffffffff", C5)), + C6_new=(ifelse(is.na(C6), "ffffffff", C6)), + C7_new=(ifelse(is.na(C7), "ffffffff", C7)), + C8_new=(ifelse(is.na(C8), "ffffffff", C8)), + C9_new=(ifelse(is.na(C9), "ffffffff", C9)), + C10_new=(ifelse(is.na(C10), "ffffffff", C10)), + C11_new=(ifelse(is.na(C11), "ffffffff", C11)), + C12_new=(ifelse(is.na(C12), "ffffffff", C12)), + C13_new=(ifelse(is.na(C13), "ffffffff", C13)), + C14_new=(ifelse(is.na(C14), "ffffffff", C14)), + C15_new=(ifelse(is.na(C15), "ffffffff", C15)), + C16_new=(ifelse(is.na(C16), "ffffffff", C16)), + C17_new=(ifelse(is.na(C17), "ffffffff", C17)), + C18_new=(ifelse(is.na(C18), "ffffffff", C18)), + 
C19_new=(ifelse(is.na(C19), "ffffffff", C19)), + C20_new=(ifelse(is.na(C20), "ffffffff", C20)), + C21_new=(ifelse(is.na(C21), "ffffffff", C21)), + C22_new=(ifelse(is.na(C22), "ffffffff", C22)), + C23_new=(ifelse(is.na(C23), "ffffffff", C23)), + C24_new=(ifelse(is.na(C24), "ffffffff", C24)), + C25_new=(ifelse(is.na(C25), "ffffffff", C25)), + C26_new=(ifelse(is.na(C26), "ffffffff", C26)), + + # replace rare values with "eeeeeeee" + C1_new_norm = (ifelse(C1_new %in% mySetC1, C1_new, "eeeeeeee")), + C2_new_norm = (ifelse(C2_new %in% mySetC2, C2_new, "eeeeeeee")), + C3_new_norm = (ifelse(C3_new %in% mySetC3, C3_new, "eeeeeeee")), + C4_new_norm = (ifelse(C4_new %in% mySetC4, C4_new, "eeeeeeee")), + C5_new_norm = (ifelse(C5_new %in% mySetC5, C5_new, "eeeeeeee")), + C6_new_norm = (ifelse(C6_new %in% mySetC6, C6_new, "eeeeeeee")), + C7_new_norm = (ifelse(C7_new %in% mySetC7, C7_new, "eeeeeeee")), + C8_new_norm = (ifelse(C8_new %in% mySetC8, C8_new, "eeeeeeee")), + C9_new_norm = (ifelse(C9_new %in% mySetC9, C9_new, "eeeeeeee")), + C10_new_norm=(ifelse(C10_new %in% mySetC10, C10_new, "eeeeeeee")), + C11_new_norm=(ifelse(C11_new %in% mySetC11, C11_new, "eeeeeeee")), + C12_new_norm=(ifelse(C12_new %in% mySetC12, C12_new, "eeeeeeee")), + C13_new_norm=(ifelse(C13_new %in% mySetC13, C13_new, "eeeeeeee")), + C14_new_norm=(ifelse(C14_new %in% mySetC14, C14_new, "eeeeeeee")), + C15_new_norm=(ifelse(C15_new %in% mySetC15, C15_new, "eeeeeeee")), + C16_new_norm=(ifelse(C16_new %in% mySetC16, C16_new, "eeeeeeee")), + C17_new_norm=(ifelse(C17_new %in% mySetC17, C17_new, "eeeeeeee")), + C18_new_norm=(ifelse(C18_new %in% mySetC18, C18_new, "eeeeeeee")), + C19_new_norm=(ifelse(C19_new %in% mySetC19, C19_new, "eeeeeeee")), + C20_new_norm=(ifelse(C20_new %in% mySetC20, C20_new, "eeeeeeee")), + C21_new_norm=(ifelse(C21_new %in% mySetC21, C21_new, "eeeeeeee")), + C22_new_norm=(ifelse(C22_new %in% mySetC22, C22_new, "eeeeeeee")), + C23_new_norm=(ifelse(C23_new %in% mySetC23, C23_new, 
"eeeeeeee")), + C24_new_norm=(ifelse(C24_new %in% mySetC24, C24_new, "eeeeeeee")), + C25_new_norm=(ifelse(C25_new %in% mySetC25, C25_new, "eeeeeeee")), + C26_new_norm=(ifelse(C26_new %in% mySetC26, C26_new, "eeeeeeee")), + + # train validate data + TrainValidate = (ifelse(rbinom(.rxNumRows, + size = 1, + prob = 0.8), + "train", + "validate")) + ), + transformObjects = list(I1_Mean = I1_mean, + I2_Mean = I2_mean, + I3_Mean = I3_mean, + I4_Mean = I4_mean, + I5_Mean = I5_mean, + I6_Mean = I6_mean, + I7_Mean = I7_mean, + I8_Mean = I8_mean, + I9_Mean = I9_mean, + I10_Mean = I10_mean, + I11_Mean = I11_mean, + I12_Mean = I12_mean, + I13_Mean = I13_mean, + + addvalue = add_value, + + mySetC1 = myset_C1, + mySetC2 = myset_C2, + mySetC3 = myset_C3, + mySetC4 = myset_C4, + mySetC5 = myset_C5, + mySetC6 = myset_C6, + mySetC7 = myset_C7, + mySetC8 = myset_C8, + mySetC9 = myset_C9, + mySetC10 = myset_C10, + mySetC11 = myset_C11, + mySetC12 = myset_C12, + mySetC13 = myset_C13, + mySetC14 = myset_C14, + mySetC15 = myset_C15, + mySetC16 = myset_C16, + mySetC17 = myset_C17, + mySetC18 = myset_C18, + mySetC19 = myset_C19, + mySetC20 = myset_C20, + mySetC21 = myset_C21, + mySetC22 = myset_C22, + mySetC23 = myset_C23, + mySetC24 = myset_C24, + mySetC25 = myset_C25, + mySetC26 = myset_C26 + ), + overwrite = TRUE) + +rxGetInfo(mydataXdf, getVarInfo = TRUE, numRows = 5) + +# save the current time +time_feature_engineering <- proc.time() + +# ---------------------------------------------------------------------------- +# create factor variables +# ---------------------------------------------------------------------------- +# write to txt so that columns can be imported as factors +tempCsv <- RxTextData(file.path(dataDir, "CSV_temp")) +rxDataStep(inData = mydataXdf, + outFile = tempCsv, + overwrite = TRUE) + +# import select columns +vars_to_keep <- c("clicked", "I1_new", "I2_new", "I3_new", "I4_new", "I5_new", "I6_new", "I7_new", + "I8_new", "I9_new", "I10_new", "I11_new", "I12_new", 
"I13_new", + "C1_new_norm", "C2_new_norm", "C3_new_norm", "C4_new_norm", "C5_new_norm", + "C6_new_norm", "C7_new_norm", "C8_new_norm", "C9_new_norm", "C10_new_norm", + "C11_new_norm", "C12_new_norm", "C13_new_norm", "C14_new_norm", "C15_new_norm", + "C16_new_norm", "C17_new_norm", "C18_new_norm", "C19_new_norm", "C20_new_norm", + "C21_new_norm", "C22_new_norm", "C23_new_norm", "C24_new_norm", "C25_new_norm", + "C26_new_norm", "TrainValidate") +tempCsv <- RxTextData(file.path(dataDir, "CSV_temp"), varsToKeep = vars_to_keep) +trainXdf_factor <- RxXdfData(file.path(dataDir, "XdfFactor")) +colClasses <- c("integer", rep("factor", (length(vars_to_keep)-1))) +names(colClasses) <- vars_to_keep +rxImport(inData = tempCsv, + outFile = trainXdf_factor, + colClasses = colClasses, + varsToKeep = vars_to_keep, + overwrite = TRUE) +rxGetInfo(trainXdf_factor, getVarInfo = TRUE, numRows = 5) + +# save the current time +time_create_factors_All <- proc.time() + +# ---------------------------------------------------------------------------- +# separate out the datasets +# ---------------------------------------------------------------------------- +# split out the training data +train_cleaned <- RxXdfData(file.path(dataDir, "finalDataTrain" )) +rxDataStep(inData = trainXdf_factor, + outFile = train_cleaned, + rowSelection = (TrainValidate == 'train'), + overwrite = TRUE) +rxGetInfo(train_cleaned, getVarInfo = TRUE, numRows = 5) + +# split out the validation data +validate_cleaned <- RxXdfData( file.path(dataDir, "finalDataValidate" )) +rxDataStep(inData = trainXdf_factor, + outFile = validate_cleaned, + rowSelection = (TrainValidate =='validate'), + overwrite = TRUE) +rxGetInfo(validate_cleaned, getVarInfo = TRUE, numRows = 5) + +# get count info +total_rows_train <- rxGetInfo(train_cleaned)$numRows +total_rows_validate <- rxGetInfo(validate_cleaned)$numRows + +# save the current time +time_split_data <- proc.time() + +# 
---------------------------------------------------------------------------- +# formula for model +# ---------------------------------------------------------------------------- +# for full dataset +myformula_model <- formula(train_cleaned, + depVars = 'clicked', + varsToDrop=c("TrainValidate")) + +# for testing data +# pred_vars <- c("I1_new", "C1_new_norm") +# myformula_model <- as.formula(paste("clicked ~ ", +# paste(pred_vars, collapse= "+"))) + +# ---------------------------------------------------------------------------- +# decision tree +# ---------------------------------------------------------------------------- +# train model +Tree <- rxDTree(myformula_model, + data = train_cleaned, + cp = 0.001, + reportProgress = 2) + +# save the current time +time_train_model_tree <- proc.time() + +# make predictions +evaluateData <- train_cleaned +evaluateData <- validate_cleaned +mypred <- RxXdfData(file.path(dataDir, "treePredict")) + +rxPredict(Tree, evaluateData, mypred, writeModelVars = TRUE, + overwrite = TRUE, predVarNames = c("predicted_clicked")) +rxGetInfo(mypred, getVarInfo = TRUE, numRows = 5) + +# plot ROC +rxRocCurve(actualVarName = "clicked", + predVarNames = "predicted_clicked", + data = mypred) + +# save the plot +dev.copy(png,'../roc_plot_tree.png') +dev.off() + +# calculate AUC +roc_df <- rxRoc(actualVarName = "clicked", + predVarNames = "predicted_clicked", + data = mypred) + +head(roc_df) +rxAuc(roc_df) + +# save the current time +time_prediction_validate_tree <- proc.time() + +# save the model +save(Tree, file = "../dTreeModel.RData") + +# ---------------------------------------------------------------------------- +# calculate time and save +# ---------------------------------------------------------------------------- +t_total <- time_prediction_validate_tree - time_start +t_read_data <- (time_read_data - time_start)[3] +t_get_summary_cat <- (time_get_summary_cat - time_read_data)[3] +t_get_summary_int <- (time_get_summary_int - 
time_get_summary_cat)[3] +t_feature_engineering <- (time_feature_engineering - time_get_summary_int)[3] +t_create_factors_All <- (time_create_factors_All - time_feature_engineering)[3] +t_split_data <- (time_split_data - time_create_factors_All)[3] +t_train_model_tree <- (time_train_model_tree - time_split_data)[3] +t_prediction_validate_tree <- (time_prediction_validate_tree - time_train_model_tree)[3] + +Item <- c("total_rows_train", + "total_rows_validate", + "time_user", + "time_system", + "time_elapsed", + "time_read_data", + "time_get_summary_cat", + "time_get_summary_int", + "time_feature_engineering", + "time_create_factors_All", + "time_split_data", + "time_train_model_tree", + "time_prediction_validate_tree" +) + +Number <- c(total_rows_train, + total_rows_validate, + t_total[1], + t_total[2], + t_total[3], + t_read_data, + t_get_summary_cat, + t_get_summary_int, + t_feature_engineering, + t_create_factors_All, + t_split_data, + t_train_model_tree, + t_prediction_validate_tree +) + +total_time_df <- as.data.frame(cbind(Item = Item, Number = Number)) +write.csv(total_time_df, "time_log_mrs_only_no_rxSummary_rxFactors.csv", row.names = FALSE) diff --git a/RServerCode/CriteoMainCall.R b/RServerCode/CriteoMainCall.R new file mode 100644 index 0000000..1790527 --- /dev/null +++ b/RServerCode/CriteoMainCall.R @@ -0,0 +1,24 @@ +# ---------------------------------------------------------------------------- +# this is the script for calling the main code +# ---------------------------------------------------------------------------- +# set the working directory +# setwd("F:/CriteoSummaries") # local +setwd("/home/lixzhang/lixun/Test") # on Spark + +wd <- getwd() +console_output <- "console_output.log" +setwd(wd) +con <- file(console_output) +sink(con, append=TRUE) # this line is required for initiating the file +sink(con, append=TRUE, type="message") + +# echo all input and not truncate 150+ character lines... 
+source("CriteoMain.R", echo=TRUE, max.deparse.length=10000) + +# restore output to console +sink() # this line is required to avoid duplicating output +sink(type="message") + +# look at the log +setwd(wd) # return to the correct directory for test.log +cat(readLines(console_output), sep="\n") diff --git a/RServerCode/SetComputeContext.R b/RServerCode/SetComputeContext.R new file mode 100644 index 0000000..c081a86 --- /dev/null +++ b/RServerCode/SetComputeContext.R @@ -0,0 +1,58 @@ +# ---------------------------------------------------------------------------- +# set compute context +# ---------------------------------------------------------------------------- +# check the system +isLinux <- Sys.info()["sysname"] == "Linux" + +useHDFS <- isLinux +useRxSpark <- isLinux + +# change this depending on your cluster info +if (Sys.info()["nodename"] == "ed00-lixun2") { + rxOptions(hdfsHost = "wasb://lixun2sparkcontainer@lixunsparkstorage.blob.core.windows.net") +} + +# change the value of dataDir to point to the correct folder +if (useHDFS) { + # use Hadoop-compatible Distributed File System + rxOptions(fileSystem = RxHdfsFileSystem()) + + dataDir <- "/lixun/CriteoAzure" +} else { + # use Native, Local File System + rxOptions(fileSystem = RxNativeFileSystem()) + dataDir <- file.path(getwd(), "") +} + +if (useRxSpark) { + # distributed computing using Spark + computeContext <- RxSpark(executorCores = 1, + executorMem = "10g", + executorOverheadMem = "10g", + consoleOutput = TRUE) +} else { + computeContext <- RxLocalSeq() +} + +rxSetComputeContext(computeContext) + +# ---------------------------------------------------------------------------- +# define some utility functions because they work only under the local compute context +# ---------------------------------------------------------------------------- +rxRocCurve <- function(...){ + rxSetComputeContext(RxLocalSeq()) + + rxRocCurve <- RevoScaleR::rxRocCurve(...) 
+ + rxSetComputeContext(computeContext) +} + +rxRoc <- function(...){ + rxSetComputeContext(RxLocalSeq()) + + roc <- RevoScaleR::rxRoc(...) + + rxSetComputeContext(computeContext) + + return(roc) +}