speed up production, deployment

This commit is contained in:
sgilley 2017-07-20 17:06:22 -05:00
Родитель c752d6c819
Коммит 58de0cd17c
3 изменённых файлов: 65 добавлений и 22 удалений

Просмотреть файл

@ -30,6 +30,12 @@ remoteLogin(
session = FALSE
)
# Grant additional permissions on HDFS and the edge node.
# `-p` makes the HDFS mkdir succeed when the directory already exists, so this setup can be re-run.
system("hadoop fs -mkdir -p /user/RevoShare/rserve2")
system("hadoop fs -chmod 777 /user/RevoShare/rserve2")
# Create the local directory only when it is missing; dir.create() warns on an existing path.
if (!dir.exists("/var/RevoShare/rserve2")) {
  dir.create("/var/RevoShare/rserve2", recursive = TRUE)
}
system("sudo chmod 777 /var/RevoShare/rserve2")
##########################################################################################################################################
## Directories
##########################################################################################################################################
@ -197,16 +203,6 @@ Borrower_str <- "/Loans/Data/Borrower_Prod.csv"
# Import the Loan and Borrower text files from HDFS (paths held in Loan_str and Borrower_str),
# converting string columns to factors. No outFile is given, so the imported data is returned
# rather than written to disk.
Loan_df <- rxImport(RxTextData(file = Loan_str, fileSystem = RxHdfsFileSystem()), stringsAsFactors = TRUE)
Borrower_df <- rxImport(RxTextData(file = Borrower_str, fileSystem = RxHdfsFileSystem()), stringsAsFactors = TRUE)
# Verify the string input case.
result_string <- api_string$loan_web_scoring(
Loan = Loan_str,
Borrower = Borrower_str,
LocalWorkDir = LocalWorkDir,
HDFSWorkDir = HDFSWorkDir,
Stage = "Web",
Username = Sys.info()[["user"]]
)
# Verify the data frame input case.
result_frame <- api_frame$loan_web_scoring(
Loan = Loan_df,
@ -220,4 +216,21 @@ result_frame <- api_frame$loan_web_scoring(
## To get the data frame result in a readable format:
# The length of the badRate field is used as the number of scored rows.
rows_number <- length(result_frame$outputParameters$answer$badRate)
# Flatten the answer list and reshape it into a matrix with rows_number rows (filled column by column),
# then convert it to a data frame without creating factors.
# NOTE(review): assumes every field of `answer` has the same length as badRate - confirm.
Scores <- data.frame(matrix(unlist(result_frame$outputParameters$answer), nrow = rows_number), stringsAsFactors = FALSE)
# Label the columns with the field names of the answer.
colnames(Scores) <- names(result_frame$outputParameters$answer)
# Verify the string input case.
## This alternative is slow and should only be used if the data set to score is too large to fit in memory.
#result_string <- api_string$loan_web_scoring(
# Loan = Loan_str,
# Borrower = Borrower_str,
# LocalWorkDir = LocalWorkDir,
# HDFSWorkDir = HDFSWorkDir,
# Stage = "Web",
# Username = Sys.info()[["user"]]
#)
# NOTE: If the api_string takes a very long time to run (> 15 minutes), you can try to kill all the YARN applications first.
# To do so, look for all the currently running YARN applications by running:
## system("yarn application -list")
# Then kill each one of the applications. For example, if you see application_1498842980780_0027, run:
## system("yarn application -kill application_1498842980780_0027")

Просмотреть файл

@ -62,6 +62,10 @@ loan_prod <- function(Loan,
# Directory that holds the tables and model from the Development stage.
LocalModelsDir <- file.path(LocalWorkDir, "model")
# Run the step0 script located in the current working directory before any scoring branch.
# NOTE(review): the script is expected to create the intermediate local and HDFS directories - confirm.
print("Creating Intermediate Directories on Local and HDFS...")
source(paste0(getwd(), "/step0_directories_creation.R"))
if((class(Loan) == "data.frame") & (class(Borrower) == "data.frame")){ # In-memory scoring.
source(paste(getwd(),"/in_memory_scoring.R", sep =""))
print("Scoring in-memory...")
@ -69,10 +73,6 @@ loan_prod <- function(Loan,
} else{ # Using Spark for scoring.
# step0: intermediate directories creation.
print("Creating Intermediate Directories on Local and HDFS...")
source(paste(getwd(),"/step0_directories_creation.R", sep =""))
# step1: data processing
source(paste(getwd(),"/step1_preprocessing.R", sep =""))
print("Step 1: Data Processing.")
@ -121,12 +121,32 @@ loan_prod <- function(Loan,
## Apply the main function
##############################################################################################################################
# Case 1: Inputs are paths to csv files. Scoring uses Spark.
scores_directory <- loan_prod (Loan_str, Borrower_str, LocalWorkDir, HDFSWorkDir, Stage = "Prod")
# Case 1: Inputs are data frames. Scoring is performed in-memory.
Scores <- loan_prod (Loan_df, Borrower_df, LocalWorkDir, HDFSWorkDir, Stage = "Prod")
# Write the Merged and Scores to a Hive table for visualizations in PowerBI.
## The 2 data frames should be converted to xdf first.
# Switch to the local compute context for the in-memory merge and the xdf writes.
rxSetComputeContext("local")
# Inner join of the Loan and Borrower data frames on memberId.
Merged <- rxMerge(Loan_df, Borrower_df, type = "inner", matchVars = "memberId")
# Composite xdf targets under the "temp" folder of the HDFS working directory.
Scores_xdf <- RxXdfData(file.path(HDFSWorkDir, "temp", "ScoresPBI"), fileSystem = RxHdfsFileSystem(), createCompositeSet = TRUE)
Merged_xdf <- RxXdfData(file.path(HDFSWorkDir, "temp", "MergedPBI"), fileSystem = RxHdfsFileSystem(), createCompositeSet = TRUE)
rxDataStep(inData = Scores, outFile = Scores_xdf, overwrite = TRUE)
rxDataStep(inData = Merged, outFile = Merged_xdf, overwrite = TRUE)
## The xdf files are then converted to Hive tables.
# Connect to Spark before the Hive writes; reset = FALSE is passed unchanged from the original call.
rxSparkConnect(consoleOutput = TRUE, reset = FALSE)
ScoresData_hive <- RxHiveData(table = "ScoresData_Prod")
Merged_hive <- RxHiveData(table = "Merged_Prod")
# overwrite = TRUE replaces any existing ScoresData_Prod / Merged_Prod tables.
rxDataStep(inData = Scores_xdf, outFile = ScoresData_hive, overwrite = TRUE)
rxDataStep(inData = Merged_xdf, outFile = Merged_hive, overwrite = TRUE)
# Case 2: Inputs are paths to csv files. Scoring uses Spark.
## This alternative is slow and should only be used if the data set to score is too large to fit in memory.
# scores_directory <- loan_prod (Loan_str, Borrower_str, LocalWorkDir, HDFSWorkDir, Stage = "Prod")
# Warning: in case you get the following error: "Error: file.exists(inData1) is not TRUE",
# you should reset your R session with Ctrl + Shift + F10 (or Session -> Restart R) and try running it again.
# Case 2: Inputs are data frames. Scoring is performed in-memory.
Scores <- loan_prod (Loan_df, Borrower_df, LocalWorkDir, HDFSWorkDir, Stage = "Prod")

Просмотреть файл

@ -31,6 +31,16 @@ data_preprocess <- function(Loan,
# Define the directory where summary statistics will be saved in the Development stage or loaded from in Production.
LocalModelsDir <- file.path(LocalWorkDir, "model")
# Hive table names get a stage-dependent suffix so that the Production and Web-Scoring stages
# do not overwrite the Hive tables written during the Development stage.
# hive_name holds that suffix:
## "" (empty string) when Stage is "Dev",
## "_Prod" for any other Stage (Prod or Web).
hive_name <- if (Stage == "Dev") "" else "_Prod"
##############################################################################################################################
## The block below will convert the data format to xdf in order to increase the efficiency of rx functions.
##############################################################################################################################
@ -82,7 +92,7 @@ data_preprocess <- function(Loan,
colInfo$incomeVerified$type <- "factor"
colInfo$incomeVerified$levels <- c("0", "1")
Merged_hive <- RxHiveData(table = "Merged", colInfo = colInfo)
Merged_hive <- RxHiveData(table = sprintf("Merged%s", hive_name), colInfo = colInfo)
# Merge Loan and Borrower on memberId.
rxMerge(inData1 = Loan_xdf,