updates for HDI Spark solution (#11)

This commit is contained in:
Sheri Gilley 2017-06-30 08:54:14 -05:00 committed by GitHub
Parent 6ae0e4833b
Commit ca58a88628
13 changed files with 2027 additions and 0 deletions

Binary data
LoanCreditRisk HDI.pbix Normal file

Binary file not shown.

View File

@@ -0,0 +1,33 @@
##########################################################################################################################################
## This R script will define a function, copy_dev_to_prod, that:
## 1. Cleans up an already existing directory on the edge node, ProdModelDir, or creates it.
## 2. Copies to that directory: Summary statistics, Bins, Logistic Regression model, etc. from the Development directory.
## Input : DevModelDir: Path to the directory on the edge node storing the summary statistics, bins, model, etc.
## ProdModelDir: Path to the directory where the contents of DevModelDir should be copied.
## Output: ProdModelDir with data transferred from DevModelDir.
## It should be applied:
## a) If running the Production stage for the first time.
## b) If you want to run the Production stage with a newly trained model; the older one will be overwritten.
##########################################################################################################################################
copy_dev_to_prod <- function(DevModelDir, ProdModelDir){
# Clean up or create the Production model directory.
if(dir.exists(ProdModelDir)){
system(paste("rm -rf ", ProdModelDir, sep="")) # remove the directory if it already exists
}
system(paste("mkdir -p -m 777 ", ProdModelDir, sep="")) # create a new, empty directory
# Copy the model, statistics and other data from the Development directory to the Production directory.
system(paste("cp ", DevModelDir, "/*.rds ", ProdModelDir, sep = ""))
}
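## Example (a sketch with hypothetical paths, matching the default directory layout used elsewhere in this solution):
## copy_dev_to_prod("/var/RevoShare/user/LoanCreditRisk/dev/model",
##                  "/var/RevoShare/user/LoanCreditRisk/prod/model")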

View File

@@ -0,0 +1,272 @@
##########################################################################################################################################
## Declare the number of unique loans and members.
##########################################################################################################################################
# Let's assume each member applied to only one loan.
no_of_unique_id <- 1000000
##########################################################################################################################################
## Generate loan_id and member_id.
##########################################################################################################################################
loanId <- sample(x = seq(no_of_unique_id, (no_of_unique_id + 9*no_of_unique_id )), size = no_of_unique_id , replace = FALSE, prob = NULL)
memberId <- sample(x = seq(2*no_of_unique_id, (2*no_of_unique_id + 9*no_of_unique_id )), size = no_of_unique_id , replace = FALSE, prob = NULL)
LB <- data.frame(loanId = loanId, memberId = memberId)
##########################################################################################################################################
## Generate the label isBad randomly, with prior probability 0.9 for isBad = 0.
##########################################################################################################################################
LB$isBad <- sample(c("0","1"), no_of_unique_id, replace = T, prob = c(0.9, 0.1))
##########################################################################################################################################
## Generate some variables independently of the label.
##########################################################################################################################################
# Term and application type.
LB$term <- sample(c("36 months","48 months","60 months"), no_of_unique_id, replace = T, prob = c(0.33, 0.33, 0.34))
LB$isJointApplication <- sample(c("0", "1"), no_of_unique_id, replace = T, prob = c(0.95, 0.05))
# Total accounts and open accounts: open should be <= total.
LB$numTotalCreditLines <- round(rnorm(no_of_unique_id , mean = 15, sd = 4), digits = 0)
LB$numTotalCreditLines <- ifelse(LB$numTotalCreditLines < 1, 1,
ifelse(LB$numTotalCreditLines > 35, 35, LB$numTotalCreditLines))
LB$numOpenCreditLines <- pmax(1, LB$numTotalCreditLines - round(runif(no_of_unique_id, min = 0, max = LB$numTotalCreditLines/2)))
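## For example (illustrative values): with numTotalCreditLines = 20, numOpenCreditLines is drawn from [10, 20],
## and the pmax guarantees it never falls below 1.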
# Address (without Hawaii HI, Alaska AK and Puerto Rico PR).
LB$residentialState <- sample(c("AL", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FL", "GA", "ID", "IL", "IN", "IA",
"KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
"NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT",
"VA", "WA", "WV", "WI", "WY"),
no_of_unique_id, replace = T,
prob = c(rep(0.4/49, 3), 3.4/49, rep(0.4/49, 3), 2.5/49, 3.2/49, 0.4/49, rep(0.955/49, 20),
3/49, rep(0.5/49, 4), rep(1/49, 6), 2/49, rep(0.5/49, 3), 2/49, rep(0.5/49, 3))
)
## We will not provide a zip_code variable.
# Issue Date.
LB$date <- format(sample(seq(ISOdate(2014, 5, 13), ISOdate(2016, 6, 30), by = "day"), no_of_unique_id, replace = T),"%Y-%m-%d")
# Variables left to generate based on the label:
continuous <- c("loanAmount", "interestRate", "annualIncome", "dtiRatio", "revolvingBalance", "revolvingUtilizationRate")
integer <- c("numDelinquency2Years", "numDerogatoryRec", "numChargeoff1year", "numInquiries6Mon", "lengthCreditHistory",
"numOpenCreditLines1Year")
character <- c("grade", "purpose", "yearsEmployment", "homeOwnership", "incomeVerified")
## monthlyPayment and loanStatus will be created at the end:
### monthlyPayment: using a formula based on interest rate, loan amount and term.
### loanStatus: based on isBad; isBad itself is dropped when the final tables are written.
# Split in 2 to sample conditionally on isBad.
LB0 <- LB[LB$isBad == "0", ]
n0 <- nrow(LB0)
LB1 <- LB[LB$isBad == "1", ]
n1 <- nrow(LB1)
##########################################################################################################################################
## CHARACTER: Conditional probabilities for variables based on isBad.
##########################################################################################################################################
character <- c("grade", "purpose", "yearsEmployment", "homeOwnership", "incomeVerified")
# Probabilities conditional on labels 0 and 1.
grade_list <- c("A1", "A2", "A3", "B1", "B2", "B3", "C1", "C2", "C3", "D1", "D2", "D3", "E1", "E2", "E3")
grade_p0 <- c(1.8, 1.7, 1.7, 1.4, 1.3, 1.3, 1.1, 1, 0.8, 0.6, 0.6, 0.5, 0.4, 0.4, 0.4)/15
grade_p1 <- rev(grade_p0)
purpose_list <- c("debtconsolidation", "healthcare", "education", "business", "auto", "homeimprovement", "other")
purpose_p0 <- c(0.82, 0.01, 0.01, 0.03, 0.01, 0.08, 0.04)
purpose_p1 <- c(0.78, 0.03, 0.01, 0.04, 0.01, 0.09, 0.04)
yearsEmployment_list <- c("< 1 year", "1 year", "2-5 years", "6-9 years", "10+ years")
yearsEmployment_p0 <- c(0.19, 0.19, 0.19, 0.20, 0.23)
yearsEmployment_p1 <- rev(yearsEmployment_p0)
homeOwnership_list <- c("own", "rent", "mortgage")
homeOwnership_p0 <- c(0.31, 0.33, 0.36)
homeOwnership_p1 <- c(0.30, 0.32, 0.38)
incomeVerified_list <- c("0", "1")
incomeVerified_p0 <- c(0.32, 0.68)
incomeVerified_p1 <- c(0.23, 0.67)
# Generate the variables.
for (name in character){
LB0[,name] <- sample(get(paste(name, "_list", sep = "")), n0, replace = T, prob = get(paste(name, "_p0", sep = "")))
LB1[,name] <- sample(get(paste(name, "_list", sep = "")), n1, replace = T, prob = get(paste(name, "_p1", sep = "")))
}
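## For example, for name = "grade", get() resolves grade_list, grade_p0 and grade_p1 defined above,
## so LB0$grade is sampled from grade_list with probabilities grade_p0 (and LB1$grade with grade_p1).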
##########################################################################################################################################
## INTEGER: Conditional probabilities for variables based on isBad.
##########################################################################################################################################
integer <- c("numDelinquency2Years", "numDerogatoryRec", "numChargeoff1year", "numInquiries6Mon", "lengthCreditHistory",
"numOpenCreditLines1Year")
# Some variables are directly related:
## Assuming that a chargeoff is a delinquency, we should have "numChargeoff1year" <= "numDelinquency2Years"
## "numOpenCreditLines1Year" should be <= "numOpenCreditLines".
integer0 <- c("numDelinquency2Years", "numDerogatoryRec", "numInquiries6Mon", "lengthCreditHistory")
numDelinquency2Years_list <- seq(0, 20)
numDelinquency2Years_p0 <- c(0.75, 0.08, 0.02, 0.02, rep(0.01, 7), rep(0.006, 10))
numDelinquency2Years_p1 <- c(0.71, 0.09, 0.03, 0.026, rep(0.012, 7), rep(0.006, 10))
numDerogatoryRec_list <- seq(0, 15)
numDerogatoryRec_p0 <- c(0.82, 0.07, 0.018, rep(0.009, 8), rep(0.004, 5))
numDerogatoryRec_p1 <- c(0.79, 0.09, 0.02, rep(0.01, 8), rep(0.004, 5))
numInquiries6Mon_list <- seq(0, 19)
numInquiries6Mon_p0 <- c(0.55, 0.25, 0.09, 0.031, rep(0.005, 15), 0.004)
numInquiries6Mon_p1 <- c(0.45, 0.30, 0.12, 0.037, rep(0.006, 15), 0.003)
lengthCreditHistory_list <- seq(1, 40)
lengthCreditHistory_p0 <- c(0.08, 0.08, 0.08, 0.08, 0.09, 0.09, 0.05, 0.05, 0.05, 0.05, rep(0.01, 30))
lengthCreditHistory_p1 <- c(0.09, 0.09, 0.10, 0.09, 0.10, 0.10, 0.06, 0.05, 0.04, 0.04, rep(0.008, 30))
## Generate the first set of variables.
for (name in integer0){
LB0[,name] <- sample(get(paste(name, "_list", sep = "")), n0, replace = T, prob = get(paste(name, "_p0", sep = "")))
LB1[,name] <- sample(get(paste(name, "_list", sep = "")), n1, replace = T, prob = get(paste(name, "_p1", sep = "")))
}
## Generate the dependent variables.
LB0$numChargeoff1year <- pmax(0, LB0$numDelinquency2Years - round(runif(n0, min = 0, max = LB0$numDelinquency2Years/2)))
LB1$numChargeoff1year <- pmax(0, LB1$numDelinquency2Years - round(runif(n1, min = 0, max = LB1$numDelinquency2Years/2)))
LB0$numOpenCreditLines1Year <- pmax(1, LB0$numOpenCreditLines - round(runif(n0, min = 0, max = LB0$numOpenCreditLines/1.5)))
LB1$numOpenCreditLines1Year <- pmax(1, LB1$numOpenCreditLines - round(runif(n1, min = 0, max = LB1$numOpenCreditLines/2)))
##########################################################################################################################################
## CONTINUOUS: Distributions with means dependent on the label isBad.
##########################################################################################################################################
continuous <- c("loanAmount", "interestRate", "annualIncome", "dtiRatio", "revolvingBalance", "revolvingUtilizationRate")
# loanAmount. We first generate it from a normal distribution based on isBad, then increase it for some states.
LB0$loanAmount <- round(rnorm(n0, mean = 20000, sd = 4500), digits = 0)
LB1$loanAmount <- round(rnorm(n1, mean = 22000, sd = 4500), digits = 0)
LB0$loanAmount <- ifelse(LB0$loanAmount <= 1000, 1000, LB0$loanAmount)
LB1$loanAmount <- ifelse(LB1$loanAmount <= 1000, 1000, LB1$loanAmount)
LB0$loanAmount <- ifelse(LB0$residentialState == "CA", LB0$loanAmount + round(runif(n0, min = 0, max = 8000)),
ifelse (LB0$residentialState == "NY", LB0$loanAmount + round(runif(n0, min = 0, max = 5000)),
ifelse(LB0$residentialState == "FL", LB0$loanAmount + round(runif(n0, min = 0, max = 2000)),
ifelse(LB0$residentialState == "TX", LB0$loanAmount + round(runif(n0, min = 0, max = 1000)),
LB0$loanAmount))))
LB1$loanAmount <- ifelse(LB1$residentialState == "CA", LB1$loanAmount + round(runif(n1, min = 0, max = 10000)),
ifelse (LB1$residentialState == "NY", LB1$loanAmount + round(runif(n1, min = 0, max = 6000)),
ifelse(LB1$residentialState == "FL", LB1$loanAmount + round(runif(n1, min = 0, max = 3000)),
ifelse(LB1$residentialState == "TX", LB1$loanAmount + round(runif(n1, min = 0, max = 2000)),
LB1$loanAmount))))
# interestRate.
LB0$interestRate <- round(4 + 20*rbeta(n0, 2, 4), digits = 2)
LB1$interestRate <- round(4 + 30*rbeta(n1, 2, 4), digits = 2)
# annualIncome.
LB0$annualIncome <- round(rnorm(n0, mean = 55000, sd = 3000), digits = 0)
LB1$annualIncome <- round(rnorm(n1, mean = 52000, sd = 3500), digits = 0)
LB0$annualIncome <- ifelse(LB0$annualIncome < 13000, 13000, LB0$annualIncome)
LB1$annualIncome <- ifelse(LB1$annualIncome < 13000, 13000, LB1$annualIncome)
## dtiRatio cannot be computed directly from the data, so we generate it based only on isBad.
LB0$dtiRatio <- round(rnorm(n0, mean = 17, sd = 5), digits = 2)
LB1$dtiRatio <- round(rnorm(n1, mean = 20, sd = 5), digits = 2)
LB0$dtiRatio <- ifelse(LB0$dtiRatio < 0, 0, LB0$dtiRatio)
LB1$dtiRatio <- ifelse(LB1$dtiRatio < 0, 0, LB1$dtiRatio)
## revolvingBalance and revolvingUtilizationRate are correlated (0.21 on real data): the higher the balance, the higher the utilization rate.
LB0$revolvingBalance <- round(rnorm(n0, mean = 15000, sd = 2500), digits = 0)
LB1$revolvingBalance <- round(rnorm(n1, mean = 13500, sd = 2250), digits = 0)
LB0$revolvingBalance <- ifelse(LB0$revolvingBalance < 0, 0, LB0$revolvingBalance)
LB1$revolvingBalance <- ifelse(LB1$revolvingBalance < 0, 0, LB1$revolvingBalance)
LB0$revolvingUtilizationRate <- ifelse(LB0$revolvingBalance == 0, 0,
ifelse(LB0$revolvingBalance <= 10000, round(rnorm(n0, mean = 45, sd = 5), digits = 2),
ifelse(LB0$revolvingBalance <= 20000, round(rnorm(n0, mean = 65, sd = 15), digits = 2),
round(rnorm(n0, mean = 75, sd = 25), digits = 2))))
LB1$revolvingUtilizationRate <- ifelse(LB1$revolvingBalance == 0, 0,
ifelse(LB1$revolvingBalance <= 8000, round(rnorm(n1, mean = 48, sd = 5), digits = 2),
ifelse(LB1$revolvingBalance <= 18000, round(rnorm(n1, mean = 69, sd = 15), digits = 2),
round(rnorm(n1, mean = 77, sd = 25), digits = 2))))
LB0$revolvingUtilizationRate <- ifelse(LB0$revolvingUtilizationRate < 0, 0,
ifelse(LB0$revolvingUtilizationRate > 100, 100,
LB0$revolvingUtilizationRate))
LB1$revolvingUtilizationRate <- ifelse(LB1$revolvingUtilizationRate < 0, 0,
ifelse(LB1$revolvingUtilizationRate > 100, 100,
LB1$revolvingUtilizationRate))
##########################################################################################################################################
## Binding LB0 and LB1 + Additional Perturbations and Modifications.
##########################################################################################################################################
# Bind and then shuffle rows.
LB <- rbind(LB0, LB1)
LB <- LB[sample(nrow(LB), nrow(LB), replace = F), ]
# Generate loanStatus based on isBad.
LB$loanStatus <- ifelse(LB$isBad == "0", "Current", "Default")
# Generate monthlyPayment based on interest rate, loan amount and term.
LB$term2 <- as.character(LB$term)
LB$term2 <- as.numeric(gsub("months", "", LB$term2))
LB$interestRate2 <- LB$interestRate/(12*100)
LB$monthlyPayment <- round(LB$loanAmount*(LB$interestRate2)/(1 - (1 + LB$interestRate2)**(-LB$term2)), digits = 0)
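## Worked example of the annuity formula above (illustrative values):
## loanAmount = 20000, interestRate = 12 (so interestRate2 = 0.01 per month), term2 = 36 months:
## 20000*0.01/(1 - 1.01^(-36)) = 200/0.30108 ~ 664 per month.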
# Add missing values (written as empty strings, which turns the affected columns into character vectors).
LB$loanAmount <- ifelse(sample(c(1, 2), no_of_unique_id, replace = T, prob = c(0.99, 0.01)) == 1, LB$loanAmount, "")
LB$term <- ifelse(sample(c(1, 2), no_of_unique_id, replace = T, prob = c(0.99, 0.01)) == 1, LB$term, "")
LB$isJointApplication <- ifelse(sample(c(1, 2), no_of_unique_id, replace = T, prob = c(0.99, 0.01)) == 1, LB$isJointApplication, "")
LB$numOpenCreditLines <- ifelse(sample(c(1, 2), no_of_unique_id, replace = T, prob = c(0.99, 0.01)) == 1, LB$numOpenCreditLines, "")
##########################################################################################################################################
## Separate in Loan and Borrower and write to disk on the edge node.
##########################################################################################################################################
colnames_loan <- c("loanId", "memberId", "date", "purpose", "isJointApplication",
"loanAmount", "term", "interestRate", "monthlyPayment", "grade", "loanStatus" )
colnames_borrower <- c("memberId", "residentialState", "yearsEmployment", "homeOwnership", "annualIncome", "incomeVerified",
"dtiRatio", "lengthCreditHistory", "numTotalCreditLines", "numOpenCreditLines", "numOpenCreditLines1Year",
"revolvingBalance", "revolvingUtilizationRate", "numDerogatoryRec", "numDelinquency2Years",
"numChargeoff1year", "numInquiries6Mon")
Loan <- LB[, colnames_loan]
Borrower <- LB[, colnames_borrower]
write.csv(Loan, file = "Loan.csv", row.names = FALSE , quote = FALSE, na = "")
write.csv(Borrower, file = "Borrower.csv", row.names = FALSE , quote = FALSE, na = "")
##########################################################################################################################################
## Copy the data to HDFS.
##########################################################################################################################################
source <- paste0(getwd(), "/*.csv")
DataDir <- "/Loans/Data"
# Copy the data from the edge node to HDFS.
rxHadoopCopyFromLocal(source, DataDir)
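# Optional sanity check (assumes RevoScaleR is loaded): list the files that landed on HDFS.
# rxHadoopListFiles(DataDir)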
# Remove local files.
file.remove("Loan.csv")
file.remove("Borrower.csv")
# Clean up the environment.
rm(list = ls())

View File

@@ -0,0 +1,223 @@
##########################################################################################################################################
## This R script will do the following:
## 1. Log in remotely to the edge node for authentication purposes.
## 2. Load model-related files as a list, to be used when publishing the web service.
## 3. Define the main web scoring function.
## 4. Publish the web service.
## 5. Verify the web service locally.
## Input : 1. Full path of the two input tables on HDFS (for processing with Spark)
## OR the two tables as data frames (for in-memory processing).
## 2. Working directories on local edge node and HDFS.
## 3. Stage: "Web" for scoring remotely with web service.
## Output: The directory on HDFS which contains the Scores (Spark version) or the Scores table (in-memory version).
##########################################################################################################################################
##############################################################################################################################
## Setup
##############################################################################################################################
# Load mrsdeploy package.
library(mrsdeploy)
# Remote login for authentication purposes.
## This only works if the edge node has been configured to host web services.
remoteLogin(
"http://localhost:12800",
username = "admin",
password = "XXYOURSQLPW",
session = FALSE
)
##########################################################################################################################################
## Directories
##########################################################################################################################################
# Local (edge node) working directory. We assume it already exists.
LocalWorkDir <- paste("/var/RevoShare/", Sys.info()[["user"]], "/LoanCreditRisk/prod", sep="")
#dir.create(LocalWorkDir, recursive = TRUE)
# HDFS directory for user calculation. We assume it already exists.
HDFSWorkDir <- paste("/",Sys.info()[["user"]],"/LoanCreditRisk/prod", sep="")
#rxHadoopMakeDir(HDFSWorkDir)
# Local directory holding data and model from the Development Stage.
ProdModelDir <- paste(LocalWorkDir, "/model", sep ="")
##########################################################################################################################################
## Load data from the Development stage.
##########################################################################################################################################
# Load the .rds files saved in the Development stage that will be used for web scoring.
## Numeric_Means and Categorical_Modes: global means and modes of the dev data, for missing values replacement.
## bins: list of cutoffs to bucketize numeric variables.
## column_factor_info: factor variables and their levels in the dev data set.
## logistic_model: logistic regression model trained in the dev stage.
## dev_test_avg_score: average score on the dev testing set; used for score transformation.
## Operational_Metrics: scores mapping (percentiles, cutoffs and expected bad rates).
Numeric_Means <- readRDS(file.path(ProdModelDir, "Numeric_Means.rds"))
Categorical_Modes <- readRDS(file.path(ProdModelDir, "Categorical_Modes.rds"))
bins <- readRDS(file.path(ProdModelDir, "bins.rds"))
column_factor_info <- readRDS(file.path(ProdModelDir, "column_factor_info.rds"))
logistic_model <- readRDS(file.path(ProdModelDir, "logistic_model.rds"))
dev_test_avg_score <- readRDS(file.path(ProdModelDir, "dev_test_avg_score.rds"))
Operational_Metrics <- readRDS(file.path(ProdModelDir, "Operational_Metrics.rds"))
# They are packed in a list to be published along with the scoring function.
model_objects <- list(Numeric_Means = Numeric_Means,
Categorical_Modes = Categorical_Modes,
bins = bins,
column_factor_info = column_factor_info,
logistic_model = logistic_model,
dev_test_avg_score = dev_test_avg_score,
Operational_Metrics = Operational_Metrics)
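# Note: when publishService is called below with model = model_objects, this list is serialized with the
# service and made available to the scoring function at call time under the same name; this is how the
# Stage == "Web" branch of the scoring code retrieves these objects.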
##############################################################################################################################
## Define main function
##############################################################################################################################
## If Loan and Borrower are data frames, the web scoring is done in memory.
## Use paths to csv files on HDFS for large data sets that do not fit in memory.
loan_web_scoring <- function(Loan,
Borrower,
LocalWorkDir,
HDFSWorkDir,
Stage = "Web",
Username = Sys.info()[["user"]])
{
if((class(Loan) == "data.frame") & (class(Borrower) == "data.frame")){ # In-memory scoring.
source(paste("/home/", Username,"/in_memory_scoring.R", sep=""))
print("Scoring in-memory...")
return(in_memory_scoring(Loan, Borrower, Stage = Stage))
} else{ # Using Spark for scoring.
library(RevoScaleR)
rxSparkConnect(consoleOutput = TRUE, reset = TRUE)
# step0: intermediate directories creation.
print("Creating Intermediate Directories on Local and HDFS...")
source(paste("/home/", Username,"/step0_directories_creation.R", sep=""))
# step1: data processing
source(paste("/home/", Username,"/step1_preprocessing.R", sep=""))
print("Step 1: Data Processing.")
data_preprocess(Loan,
Borrower,
LocalWorkDir,
HDFSWorkDir,
Stage = Stage)
# step2: feature engineering
source(paste("/home/", Username,"/step2_feature_engineering.R", sep=""))
print("Step 2: Feature Engineering.")
## splitting_ratio is not used in this stage.
feature_engineer(LocalWorkDir,
HDFSWorkDir,
splitting_ratio = 0.7,
Stage = Stage)
# step3: making predictions.
source(paste("/home/", Username,"/step3_train_score_evaluate.R", sep=""))
print("Step 3: Making Predictions.")
## splitting_ratio is not used in this stage.
training_evaluation (LocalWorkDir,
HDFSWorkDir,
splitting_ratio = 0.7,
Stage = Stage)
# Step 4: scores transformation.
source(paste("/home/", Username,"/step4_operational_metrics.R", sep=""))
print("Step 4: Scores Transformation.")
## Transform the scores using the computed thresholds.
apply_score_transformation (LocalWorkDir,
HDFSWorkDir,
Stage = Stage)
# Return the directory storing the final scores.
return(file.path(HDFSWorkDir,"temp", "Scores"))
}
}
##############################################################################################################################
## Publish as a Web Service
##############################################################################################################################
# Specify the version of the web service
version <- "v1.2.287"
# Publish the API for the character input case (i.e. Loan and Borrower are paths to the data).
api_string <- publishService(
"loan_scoring_string_input",
code = loan_web_scoring,
model = model_objects,
inputs = list(Loan = "character",
Borrower = "character",
LocalWorkDir = "character",
HDFSWorkDir = "character",
Stage = "character",
Username = "character"),
outputs = list(answer = "character"),
v = version
)
# Publish the API for the data frame input case (i.e. web scoring is done in-memory).
api_frame <- publishService(
"loan_scoring_dframe_input",
code = loan_web_scoring,
model = model_objects,
inputs = list(Loan = "data.frame",
Borrower = "data.frame",
LocalWorkDir = "character",
HDFSWorkDir = "character",
Stage = "character",
Username = "character"),
outputs = list(answer = "data.frame"),
v = version
)
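# If these services were already published in a previous session, they can be retrieved by name and
# version instead (a sketch; the names and version must match what was published above):
# api_string <- getService("loan_scoring_string_input", v = version)
# api_frame <- getService("loan_scoring_dframe_input", v = version)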
##############################################################################################################################
## Verify The Published API
##############################################################################################################################
# Specify the full path of input .csv files on HDFS
Loan_str <- "/Loans/Data/Loan_Prod.csv"
Borrower_str <- "/Loans/Data/Borrower_Prod.csv"
# Import the .csv files as data frames.
Loan_df <- rxImport(RxTextData(file = Loan_str, fileSystem = RxHdfsFileSystem()), stringsAsFactors = T)
Borrower_df <- rxImport(RxTextData(file = Borrower_str, fileSystem = RxHdfsFileSystem()), stringsAsFactors = T)
# Verify the string input case.
result_string <- api_string$loan_web_scoring(
Loan = Loan_str,
Borrower = Borrower_str,
LocalWorkDir = LocalWorkDir,
HDFSWorkDir = HDFSWorkDir,
Stage = "Web",
Username = Sys.info()[["user"]]
)
# Verify the data frame input case.
result_frame <- api_frame$loan_web_scoring(
Loan = Loan_df,
Borrower = Borrower_df,
LocalWorkDir = LocalWorkDir,
HDFSWorkDir = HDFSWorkDir,
Stage = "Web",
Username = Sys.info()[["user"]]
)
## To get the data frame result in a readable format:
rows_number <- length(result_frame$outputParameters$answer$badRate)
Scores <- data.frame(matrix(unlist(result_frame$outputParameters$answer), nrow = rows_number), stringsAsFactors = F)
colnames(Scores) <- names(result_frame$outputParameters$answer)

View File

@@ -0,0 +1,143 @@
##########################################################################################################################################
## This R script will do the following:
## 1. Specify parameters for main function.
## 2. Define the main function for development.
## 3. Invoke the main function.
## Input : 1. Full path of the two input tables on HDFS.
## 2. Working directories on local edge node and HDFS.
## 3. Stage: "Dev" for development.
## Output: The evaluation metrics of the model.
## Tables and model to be used for Production or Web Scoring are copied to the Production directory.
##########################################################################################################################################
# Current working directory should be set with setwd() to the location of the .R files.
##########################################################################################################################################
## Load the RevoScaleR library and open the Spark connection.
##########################################################################################################################################
library(RevoScaleR)
rxSparkConnect(consoleOutput = TRUE, reset = TRUE)
##########################################################################################################################################
## Data sets full path
##########################################################################################################################################
# Generate data with 1 million rows and copy it to the data directory "/Loans/Data" on HDFS.
source(paste(getwd(),"/data_generation.R", sep =""))
# Write the full path to the 2 data sets.
Loan <- "/Loans/Data/Loan.csv"
Borrower <- "/Loans/Data/Borrower.csv"
##########################################################################################################################################
## Directories
##########################################################################################################################################
# Local (edge node) working directory. We assume it already exists.
LocalWorkDir <- paste("/var/RevoShare/", Sys.info()[["user"]], "/LoanCreditRisk/dev", sep="")
#dir.create(LocalWorkDir, recursive = TRUE)
# HDFS directory for user calculation. We assume it already exists.
HDFSWorkDir <- paste("/",Sys.info()[["user"]],"/LoanCreditRisk/dev", sep="")
#rxHadoopMakeDir(HDFSWorkDir)
##############################################################################################################################
## Define main function
##############################################################################################################################
## The user should replace the directory in the "source" calls with their own.
## The directory should be the full path containing the source scripts.
loan_dev <- function(Loan,
Borrower,
LocalWorkDir,
HDFSWorkDir,
Stage = "Dev"){
# step0: intermediate directories creation.
print("Creating Intermediate Directories on Local and HDFS...")
source(paste(getwd(),"/step0_directories_creation.R", sep =""))
## Define and create the directory where summary statistics, models etc. will be saved in the Development stage.
LocalModelsDir <- file.path(LocalWorkDir, "model")
if(dir.exists(LocalModelsDir)){
system(paste("rm -rf ",LocalModelsDir,"/*", sep="")) # clean up the directory if exists
} else {
dir.create(LocalModelsDir, recursive = TRUE) # make new directory if doesn't exist
}
# step1: data processing
source(paste(getwd(),"/step1_preprocessing.R", sep =""))
print("Step 1: Data Processing.")
data_preprocess(Loan,
Borrower,
LocalWorkDir,
HDFSWorkDir,
Stage = Stage)
# step2: feature engineering
source(paste(getwd(),"/step2_feature_engineering.R", sep =""))
print("Step 2: Feature Engineering.")
feature_engineer(LocalWorkDir,
HDFSWorkDir,
splitting_ratio = 0.7,
Stage = Stage)
# step3: training, scoring and evaluation of Logistic Regression.
source(paste(getwd(),"/step3_train_score_evaluate.R", sep =""))
print("Step 3: Training, Scoring and Evaluating.")
metrics <- training_evaluation (LocalWorkDir,
HDFSWorkDir,
splitting_ratio = 0.7,
Stage = Stage)
# Step 4: operational metrics computation and scores transformation.
source(paste(getwd(),"/step4_operational_metrics.R", sep =""))
print("Step 4: Operational Metrics Computation and Scores Transformation.")
## Compute operational metrics and plot the rates of bad loans for various thresholds obtained through binning.
Operational_Metrics <- compute_operational_metrics(LocalWorkDir,
HDFSWorkDir)
plot(Operational_Metrics$badRate, main = c("Rate of Bad Loans Among those with Scores Higher than Decision Thresholds"), xlab = "Default Score Percentiles", ylab = "Expected Rate of Bad Loans")
## EXAMPLE:
## Suppose the score cutoff of the 91st score percentile is 0.9834, with a corresponding bad rate of 0.6449.
## This means that if 0.9834 is used as a threshold to classify loans as bad, we expect a bad rate of 64.49%.
## This bad rate is equal to the number of observed bad loans divided by the total number of loans with a score greater than the threshold.
## Transform the scores using the computed thresholds.
apply_score_transformation (LocalWorkDir,
HDFSWorkDir,
Stage = Stage)
# Finally, we copy the global means, modes and quantiles and the trained model for use in Production and Web Scoring.
# You can change the value of update_prod_flag to 0 or comment out the code below to avoid overwriting those currently in use for Production.
update_prod_flag = 1
if (update_prod_flag == 1){
# Production directory that will hold the development data.
ProdModelDir <- paste("/var/RevoShare/", Sys.info()[["user"]], "/LoanCreditRisk/prod/model/", sep="")
# Development directory that holds data to be used in Production.
DevModelDir <- LocalModelsDir
source(paste(getwd(),"/copy_dev_to_prod.R", sep =""))
copy_dev_to_prod(DevModelDir, ProdModelDir)
}
return(metrics)
}
##############################################################################################################################
## Apply the main function
##############################################################################################################################
metrics <- loan_dev (Loan, Borrower, LocalWorkDir, HDFSWorkDir, Stage = "Dev")

View File

@@ -0,0 +1,188 @@
##########################################################################################################################################
## This R script will perform in-memory scoring for batch scoring or for scoring remotely with a web service.
##########################################################################################################################################
# Inputs of the function:
## Loan_df: data frame with the Loan data.
## Borrower_df: data frame with the Borrower data.
## Stage: "Prod" for batch scoring, or "Web" for scoring remotely with web service.
in_memory_scoring <- function(Loan_df,
Borrower_df,
Stage)
{
# Load libraries.
library(RevoScaleR)
library(MicrosoftML)
# Set the compute context to local.
rxSetComputeContext('local')
# Convert the binary variables to factors.
Loan_df$isJointApplication <- factor(Loan_df$isJointApplication)
Borrower_df$incomeVerified <- factor(Borrower_df$incomeVerified)
# Load variables from Development Stage.
if(Stage == "Web"){
Numeric_Means <- model_objects$Numeric_Means
Categorical_Modes <- model_objects$Categorical_Modes
bins <- model_objects$bins
logistic_model <- model_objects$logistic_model
dev_test_avg_score <- model_objects$dev_test_avg_score
Operational_Metrics <- model_objects$Operational_Metrics
}
if(Stage == "Prod"){
# Directory that holds the tables and model from the Development stage.
LocalModelsDir <- file.path(LocalWorkDir, "model")
Numeric_Means <- readRDS(file.path(LocalModelsDir, "Numeric_Means.rds"))
Categorical_Modes <- readRDS(file.path(LocalModelsDir, "Categorical_Modes.rds"))
bins <- readRDS(file.path(LocalModelsDir, "bins.rds"))
logistic_model <- readRDS(file.path(LocalModelsDir, "logistic_model.rds"))
dev_test_avg_score <- readRDS(file.path(LocalModelsDir, "dev_test_avg_score.rds"))
Operational_Metrics <- readRDS(file.path(LocalModelsDir, "Operational_Metrics.rds"))
}
############################################################################################################################################
## The block below will do the following:
## 1. Merge the input tables.
## 2. Determine if there are missing values.
## 3. If applicable, clean the merged data set: replace NAs with the global mean or global mode.
############################################################################################################################################
# Merge the input tables on memberId.
Merged <- rxMerge(Loan_df, Borrower_df, type = "inner", matchVars = "memberId")
# Convert characters to factors.
for (name in colnames(Merged)){
if(class(Merged[[name]]) == "character"){
Merged[[name]] <- factor(Merged[[name]])
}
}
# Get the variables types.
var_all <- colnames(Merged)[!colnames(Merged) %in% c("loanId", "memberId", "loanStatus", "date")]
types <- sapply(Merged[, var_all], function(x) class(x))
categorical_all <- names(types[types %in% c("factor")])
numeric_all <- setdiff(var_all, categorical_all)
# Look for variables missing values, per type.
no_of_NA <- sapply(Merged, function(x) sum(is.na(x)))
var_with_NA <- names(no_of_NA[no_of_NA > 0])
num_with_NA <- intersect(numeric_all, var_with_NA)
cat_with_NA <- intersect(categorical_all, var_with_NA)
# If there are no missing values, we go to the next step.
if(length(var_with_NA) == 0){
MergedCleaned <- Merged
# If there are missing values, we replace them with the mode or mean.
}else{
# Global means and modes from the development stage.
num_NA_mean <- round(Numeric_Means[Numeric_Means$Name %in% num_with_NA,]$Mean)
cat_NA_mode <- as.character(Categorical_Modes[Categorical_Modes$Name %in% cat_with_NA,]$Mode)
# Function to replace missing values with mean or mode. It will be wrapped into rxDataStep.
Mean_Mode_Replace <- function(data) {
# Replace numeric variables with the mean.
if(length(num_with_NA) > 0){
for(i in 1:length(num_with_NA)){
row_na <- which(is.na(data[, num_with_NA[i]]) == TRUE)
data[row_na, num_with_NA[i]] <- num_NA_mean[i]
}
}
# Replace categorical variables with the mode.
if(length(cat_with_NA) > 0){
for(i in 1:length(cat_with_NA)){
data[, cat_with_NA[i]] <- as.character(data[, cat_with_NA[i]])
row_na <- which(is.na(data[, cat_with_NA[i]]) == TRUE)
data[row_na, cat_with_NA[i]] <- cat_NA_mode[i]
data[, cat_with_NA[i]] <- factor(data[, cat_with_NA[i]])
}
}
return(data)
}
MergedCleaned <- Mean_Mode_Replace(Merged)
}
############################################################################################################################################
## The block below will perform feature engineering on the cleaned data set.
############################################################################################################################################
# Create an artificial target variable, isBad. This is needed for rxPredict to work on this data.
MergedCleaned$isBad <- sample(c("0", "1"), size = nrow(MergedCleaned), replace = T)
# Bucketize variables.
buckets_names <- c("loanAmount", "interestRate", "monthlyPayment", "annualIncome", "dtiRatio", "lengthCreditHistory",
"numTotalCreditLines", "numOpenCreditLines", "numOpenCreditLines1Year", "revolvingBalance",
"revolvingUtilizationRate", "numDerogatoryRec", "numDelinquency2Years", "numChargeoff1year",
"numInquiries6Mon")
bucketize <- function(data) {
for(name in buckets_names){
# Deal with the last bin.
name2 <- paste(name, "Bucket", sep = "")
data[, name2] <- as.character(length(bins[[name]]) + 1)
# Deal with the first bin.
rows <- which(data[, name] <= bins[[name]][[1]])
data[rows, name2] <- "1"
# Deal with the rest.
if(length(bins[[name]]) > 1){
for(i in seq(1, (length(bins[[name]]) - 1))){
rows <- which(data[, name] <= bins[[name]][[i + 1]] & data[, name] > bins[[name]][[i]])
data[rows, name2] <- as.character(i + 1)
}
}
# Factorize the new variable.
data[, name2] <- factor(data[, name2], levels = as.character(seq(1, (length(bins[[name]]) + 1))))
}
return(data)
}
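## For example, with hypothetical cutoffs bins[["loanAmount"]] = c(10000, 20000):
## loanAmount 8000 -> bucket "1", 15000 -> bucket "2", 25000 -> bucket "3" (the last bin, length(bins) + 1).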
MergedFeaturesFactors <- bucketize(MergedCleaned)
############################################################################################################################################
## The block below will score the featurized data set.
############################################################################################################################################
Predictions <- rxPredict(logistic_model,
data = MergedFeaturesFactors,
extraVarsToWrite = c("loanId"))
# Keep loanId and the predicted probability, and rename them for clarity.
Predictions <- Predictions[, c(1, 4)]
colnames(Predictions) <- c("loanId", "isBad_Pred")
############################################################################################################################################
## The block below will transform the scores based on Operational Metrics computed in the Development stage.
############################################################################################################################################
# Space out the scores (predicted probability of default) for interpretability with a sigmoid.
sigmoid <- function(x){
return(1/(1 + exp(-20*(x-1.2*dev_test_avg_score))))
}
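# For instance, with dev_test_avg_score = 0.10 (hypothetical value), a raw score equal to the centering point
# 1.2*0.10 = 0.12 maps to exactly 0.5, and scores away from that point are pushed toward 0 or 1.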
Predictions$transformedScore <- sigmoid(Predictions$isBad_Pred)
# Deal with scores falling in percentiles 1 through 99.
for (i in seq(1, (nrow(Operational_Metrics) - 1))){
rows <- which(Predictions$transformedScore <= Operational_Metrics$scoreCutoff[i + 1] &
Predictions$transformedScore > Operational_Metrics$scoreCutoff[i])
Predictions[rows, c("scorePercentile")] <- as.character(Operational_Metrics$scorePercentile[i + 1])
Predictions[rows, c("badRate")] <- Operational_Metrics$badRate[i]
Predictions[rows, c("scoreCutoff")] <- Operational_Metrics$scoreCutoff[i]
}
# Deal with the top 1% of scores (last bucket).
rows <- which(Predictions$transformedScore > Operational_Metrics$scoreCutoff[100])
Predictions[rows, c("scorePercentile")] <- "Top 1%"
Predictions[rows, c("scoreCutoff")] <- Operational_Metrics$scoreCutoff[100]
Predictions[rows, c("badRate")] <- Operational_Metrics$badRate[100]
# Output the final scores.
Scores <- Predictions[, c("loanId", "transformedScore", "scorePercentile", "scoreCutoff", "badRate")]
return(Scores)
}
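## Example usage (a sketch; in the "Prod" stage, LocalWorkDir must already be defined in the calling
## environment, since the model directory is derived from it):
## Scores <- in_memory_scoring(Loan_df, Borrower_df, Stage = "Prod")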

View File

@@ -0,0 +1,132 @@
##########################################################################################################################################
## This R script will do the following:
## 1. Specify parameters for main function.
## 2. Define the main function for production batch scoring.
## 3. Invoke the main function.
## Input : 1. Full path of the two input tables on HDFS (for scoring with Spark)
## OR the two tables as data frames (for in-memory scoring).
## 2. Working directories on local edge node and HDFS.
## 3. Stage: "Prod" for batch scoring.
## Output: The directory on HDFS which contains the Scores (Spark version) or the Scores table (in-memory version).
##########################################################################################################################################
##########################################################################################################################################
## Load the RevoScaleR library and Open Spark Connection
##########################################################################################################################################
library(RevoScaleR)
rxSparkConnect(consoleOutput = TRUE, reset = TRUE)
##########################################################################################################################################
## Directories
##########################################################################################################################################
# Local (edge node) working directory. We assume it already exists.
LocalWorkDir <- paste("/var/RevoShare/", Sys.info()[["user"]], "/LoanCreditRisk/prod", sep="")
#dir.create(LocalWorkDir, recursive = TRUE)
# HDFS directory for user calculation. We assume it already exists.
HDFSWorkDir <- paste("/",Sys.info()[["user"]],"/LoanCreditRisk/prod", sep="")
#rxHadoopMakeDir(HDFSWorkDir)
# Current working directory should be set with setwd() to the location of the .R files.
##########################################################################################################################################
## Data sets full path
##########################################################################################################################################
# We assume the data already exists on HDFS, and write the full path to the 2 data sets.
Loan_str <- "/Loans/Data/Loan_Prod.csv"
Borrower_str <- "/Loans/Data/Borrower_Prod.csv"
# Import the .csv files as data frames.
Loan_df <- rxImport(RxTextData(file = Loan_str, fileSystem = RxHdfsFileSystem()), stringsAsFactors = T)
Borrower_df <- rxImport(RxTextData(file = Borrower_str, fileSystem = RxHdfsFileSystem()), stringsAsFactors = T)
##############################################################################################################################
## Define main function
##############################################################################################################################
## If Loan and Borrower are data frames, scoring is done in memory.
## Use paths to csv files on HDFS for large data sets that do not fit in memory.
loan_prod <- function(Loan,
Borrower,
LocalWorkDir,
HDFSWorkDir,
Stage = "Prod"){
# Directory that holds the tables and model from the Development stage.
LocalModelsDir <- file.path(LocalWorkDir, "model")
if((class(Loan) == "data.frame") & (class(Borrower) == "data.frame")){ # In-memory scoring.
source(paste(getwd(),"/in_memory_scoring.R", sep =""))
print("Scoring in-memory...")
return(in_memory_scoring(Loan, Borrower, Stage = Stage))
} else{ # Using Spark for scoring.
# step0: intermediate directories creation.
print("Creating Intermediate Directories on Local and HDFS...")
source(paste(getwd(),"/step0_directories_creation.R", sep =""))
# step1: data processing
source(paste(getwd(),"/step1_preprocessing.R", sep =""))
print("Step 1: Data Processing.")
data_preprocess(Loan,
Borrower,
LocalWorkDir,
HDFSWorkDir,
Stage = Stage)
# step2: feature engineering
source(paste(getwd(),"/step2_feature_engineering.R", sep =""))
print("Step 2: Feature Engineering.")
## splitting_ratio is not used in Production stage.
feature_engineer(LocalWorkDir,
HDFSWorkDir,
splitting_ratio = 0.7,
Stage = Stage)
# step3: making predictions.
source(paste(getwd(),"/step3_train_score_evaluate.R", sep =""))
print("Step 3: Making Predictions.")
## splitting_ratio is not used in Production stage.
training_evaluation (LocalWorkDir,
HDFSWorkDir,
splitting_ratio = 0.7,
Stage = Stage)
# Step 4: scores transformation.
source(paste(getwd(),"/step4_operational_metrics.R", sep =""))
print("Step 4: Scores Transformation.")
## Transform the scores using the computed thresholds.
apply_score_transformation (LocalWorkDir,
HDFSWorkDir,
Stage = Stage)
# Return the directory storing the final scores.
return(file.path(HDFSWorkDir,"temp", "Scores"))
}
}
##############################################################################################################################
## Apply the main function
##############################################################################################################################
# Case 1: Input are paths to csv files. Scoring using Spark.
scores_directory <- loan_prod (Loan_str, Borrower_str, LocalWorkDir, HDFSWorkDir, Stage = "Prod")
# Warning: in case you get the following error: "Error: file.exists(inData1) is not TRUE",
# you should reset your R session with Ctrl + Shift + F10 (or Session -> Restart R) and try running it again.
# Case 2: Input are data frames. Scoring is performed in-memory.
Scores <- loan_prod (Loan_df, Borrower_df, LocalWorkDir, HDFSWorkDir, Stage = "Prod")

View File

@@ -0,0 +1,29 @@
##########################################################################################################################################
## This R script will do the following:
## 1. Create or clean up an intermediate directory, LocalIntermediateDir, on the edge node.
## 2. Create or clean up an intermediate directory, HDFSIntermediateDir, on HDFS.
##########################################################################################################################################
# Intermediate folder paths: one on the edge node and one on HDFS.
LocalIntermediateDir <- file.path(LocalWorkDir, "temp")
HDFSIntermediateDir <- file.path(HDFSWorkDir,"temp")
# Clean up the folders if they already exist and create them otherwise.
if(dir.exists(LocalIntermediateDir)){
system(paste("rm -rf ",LocalIntermediateDir,"/*", sep="")) # clean up the directory if exists
} else {
dir.create(LocalIntermediateDir, recursive = TRUE) # make new directory if doesn't exist
}
if(rxHadoopFileExists(HDFSIntermediateDir)){
rxHadoopRemoveDir(HDFSIntermediateDir, skipTrash = TRUE) # clean up the directory if it already exists
}
rxHadoopMakeDir(HDFSIntermediateDir) # make a new directory
# Grant access permissions on the edge node intermediate folder (setgid bit plus default ACLs, so files created later inherit group and other rwx).
system(paste("chmod g+s ", LocalIntermediateDir, sep=""))
system(paste("setfacl -d -m g::rwx ", LocalIntermediateDir, sep=""))
system(paste("setfacl -d -m o::rwx ", LocalIntermediateDir, sep=""))

View File

@@ -0,0 +1,218 @@
##########################################################################################################################################
## This R script will do the following:
## 1. Convert the 2 raw data sets Loan and Borrower to xdf.
## 2. Merge the 2 tables into one.
## 3. Clean the merged data set: replace NAs with the global mean (numeric variables) or global mode (categorical variables).
## Input : 2 Data Tables: Loan and Borrower.
## Output: Cleaned data set MergedCleaned.
##########################################################################################################################################
## Function of data processing:
# Loan: full name of the Loan table in .csv format.
# Borrower: full name of the Borrower table in .csv format.
# LocalWorkDir: the working directory on the edge node.
# HDFSWorkDir: the working directory on HDFS.
# Stage: "Dev" for development, "Prod" for batch scoring, or "Web" for scoring remotely with web service.
data_preprocess <- function(Loan,
Borrower,
LocalWorkDir,
HDFSWorkDir,
Stage)
{
# Define the intermediate directory holding the input data.
HDFSIntermediateDir <- file.path(HDFSWorkDir,"temp")
# Define the directory where summary statistics will be saved in the Development stage or loaded from in Production.
LocalModelsDir <- file.path(LocalWorkDir, "model")
##############################################################################################################################
## The block below will convert the data format to xdf in order to increase the efficiency of rx functions.
##############################################################################################################################
print("Converting the input data to xdf on HDFS...")
# Create XDF pointers for the 2 data sets on HDFS.
Loan_xdf <- RxXdfData(paste(HDFSIntermediateDir,"/Loan",sep=""), fileSystem = RxHdfsFileSystem())
Borrower_xdf <- RxXdfData(paste(HDFSIntermediateDir,"/Borrower",sep=""), fileSystem = RxHdfsFileSystem())
# Check the input format. Return an error if it is not a path.
if((class(Loan) == "character") & (class(Borrower) == "character")){
# Text pointers to the inputs.
Loan_txt <- RxTextData(Loan, firstRowIsColNames = T, fileSystem = RxHdfsFileSystem(), stringsAsFactors = T)
Borrower_txt <- RxTextData(Borrower, firstRowIsColNames = T, fileSystem = RxHdfsFileSystem(), stringsAsFactors = T)
# Conversion to xdf.
rxDataStep(inData = Loan_txt, outFile = Loan_xdf, overwrite = T)
rxDataStep(inData = Borrower_txt, outFile = Borrower_xdf, overwrite = T)
} else {
stop("invalid input format")
}
##############################################################################################################################
## The block below will merge the two xdf files.
##############################################################################################################################
print("Merging Loan and Borrower...")
# Create an XDF pointer for the output merged table.
# Merged_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "Merged"), fileSystem = RxHdfsFileSystem())
# Create a Hive table pointer for the output merged table. We use a Hive table to preserve column information for factors.
colInfo1 <- rxCreateColInfo(Loan_xdf)
colInfo2 <- rxCreateColInfo(Borrower_xdf)
colInfo <- list()
for(name in names(colInfo1)){
colInfo[[name]] <- colInfo1[[name]]
}
for(name in names(colInfo2)){
colInfo[[name]] <- colInfo2[[name]]
}
# Convert the two binary variables from integer to factor.
colInfo$isJointApplication$type <- "factor"
colInfo$isJointApplication$levels <- c("0", "1")
colInfo$incomeVerified$type <- "factor"
colInfo$incomeVerified$levels <- c("0", "1")
Merged_hive <- RxHiveData(table = "Merged", colInfo = colInfo)
# Merge Loan and Borrower on memberId.
rxMerge(inData1 = Loan_xdf,
inData2 = Borrower_xdf,
outFile = Merged_hive,
matchVars = "memberId",
type = "inner",
overwrite = TRUE)
# Convert back to xdf.
Merged_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "Merged"), fileSystem = RxHdfsFileSystem())
rxDataStep(inData = Merged_hive, outFile = Merged_xdf, overwrite = T)
############################################################################################################################################
## The block below will do the following:
## 1. Use rxSummary to get the names of the variables with missing values.
## Then, only if there are missing values:
## 2. Compute the global mean (numeric variables) or global mode (categorical variables) of variables with missing values.
## 3. Clean the merged data set: replace NAs with the global mean or global mode.
############################################################################################################################################
print("Looking for variables with missing values...")
# Use rxSummary function to get the names of the variables with missing values.
## Assumption: no NAs in the id variables (loanId and memberId), loanStatus, or date.
colnames <- names(Merged_xdf)
var <- colnames[!colnames %in% c("loanId", "memberId", "loanStatus", "date")]
formula <- as.formula(paste("~", paste(var, collapse = "+")))
summary <- rxSummary(formula, Merged_xdf , byTerm = TRUE)
## Get the variables types.
categorical_all <- unlist(lapply(summary$categorical, FUN = function(x){colnames(x)[1]}))
numeric_all <- setdiff(var, categorical_all)
## Get the variables names with NA.
var_with_NA <- summary$sDataFrame[summary$sDataFrame$MissingObs > 0, 1]
categorical_NA <- intersect(categorical_all, var_with_NA)
numeric_NA <- intersect(numeric_all, var_with_NA)
## For the Development stage, we get and store the summary statistics for Production and Web Scoring use.
if(Stage == "Dev"){
# Compute the global means.
Summary_DF <- summary$sDataFrame
Numeric_Means <- Summary_DF[Summary_DF$Name %in% numeric_all, c("Name", "Mean")]
# Compute the global modes.
## Get the counts tables.
Summary_Counts <- summary$categorical
names(Summary_Counts) <- lapply(Summary_Counts, FUN = function(x){colnames(x)[1]})
## Compute for each count table the value with the highest count.
modes <- unlist(lapply(Summary_Counts, FUN = function(x){as.character(x[which.max(x[,2]),1])}), use.names = F)
Categorical_Modes <- data.frame(Name = categorical_all, Mode = modes)
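## For example, if the counts table for homeOwnership were own = 310, rent = 330, mortgage = 360
## (hypothetical counts), which.max would pick "mortgage" as the mode.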
# Save the statistics for Production or Web Scoring use.
saveRDS(Numeric_Means, file.path(LocalModelsDir, "Numeric_Means.rds"))
saveRDS(Categorical_Modes, file.path(LocalModelsDir, "Categorical_Modes.rds"))
}
## For the Production stage, we load the summary statistics computed in the Development stage.
if(Stage == "Prod"){
Numeric_Means <- readRDS(file.path(LocalModelsDir, "Numeric_Means.rds"))
Categorical_Modes <- readRDS(file.path(LocalModelsDir, "Categorical_Modes.rds"))
}
## For the Web Scoring, we directly read the summary statistics computed in the Development stage.
## They are included in the list model_objects, defined in "deployment.R". It can be used when calling the published web service.
if(Stage == "Web"){
Numeric_Means <- model_objects$Numeric_Means
Categorical_Modes <- model_objects$Categorical_Modes
}
# If there are no missing values, we copy the merged data, renaming it MergedCleaned.
if(length(var_with_NA) == 0){
print("No missing values: no treatment will be applied.")
rxHadoopCopy(source = file.path(HDFSIntermediateDir, "Merged"),
dest = file.path(HDFSIntermediateDir, "MergedCleaned"))
# If there are missing values, we replace them with the mode or mean.
}else{
print("Variables containing missing values are:")
print(var_with_NA)
print("Replacing missing values with the global mean or mode...")
# Get the global means of the numeric variables with missing values.
numeric_NA_mean <- round(Numeric_Means[Numeric_Means$Name %in% numeric_NA,]$Mean)
# Get the global modes of the categorical variables with missing values.
categorical_NA_mode <- as.character(Categorical_Modes[Categorical_Modes$Name %in% categorical_NA,]$Mode)
# Function to replace missing values with mean or mode. It will be wrapped into rxDataStep.
Mean_Mode_Replace <- function(data) {
data <- data.frame(data, stringsAsFactors = F)
# Replace numeric variables with the mean.
if(length(num_with_NA) > 0){
for(i in 1:length(num_with_NA)){
row_na <- which(is.na(data[, num_with_NA[i]]) == TRUE)
data[row_na, num_with_NA[i]] <- num_NA_mean[i]
}
}
# Replace categorical variables with the mode.
if(length(cat_with_NA) > 0){
for(i in 1:length(cat_with_NA)){
row_na <- which(is.na(data[, cat_with_NA[i]]) == TRUE)
data[row_na, cat_with_NA[i]] <- cat_NA_mode[i]
}
}
return(data)
}
# Point to the output cleaned data.
MergedCleaned_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "MergedCleaned"), fileSystem = RxHdfsFileSystem())
# Perform the data cleaning with rxDataStep.
rxDataStep(inData = Merged_xdf,
outFile = MergedCleaned_xdf,
overwrite = T,
transformFunc = Mean_Mode_Replace,
transformObjects = list(num_with_NA = numeric_NA , num_NA_mean = numeric_NA_mean,
cat_with_NA = categorical_NA, cat_NA_mode = categorical_NA_mode))
## Check that the data was cleaned:
## summary_cleaned <- rxSummary(formula, MergedCleaned_xdf , byTerm = TRUE)
## Summary_Cleaned_DF <- summary_cleaned$sDataFrame
## length(Summary_Cleaned_DF[Summary_Cleaned_DF$MissingObs > 0,2]) == 0
print("Step 1 Completed.")
}
}# end of step 1 function.

View File

@@ -0,0 +1,299 @@
##########################################################################################################################################
## This R script will do the following :
## 1. Create the label isBad based on the status of the loan.
## 2. Split the data set into a Training and a Testing set.
## 3. Bucketize all the numeric variables, based on Conditional Inference Trees, using the smbinning package on the Training set.
## 4. Correctly specify the variable types and drop the variables used to compute the new features.
## Input : Cleaned data set MergedCleaned.
## Output: Data set with new features MergedFeaturesFactors.
##########################################################################################################################################
## Function for feature engineering:
# LocalWorkDir: the working directory on the edge node.
# HDFSWorkDir: the working directory on HDFS.
# splitting_ratio: the proportion (in ]0,1]) of observations that will end up in the training set.
# Stage: "Dev" for development, "Prod" for batch scoring, or "Web" for scoring remotely with web service.
feature_engineer <- function(LocalWorkDir,
HDFSWorkDir,
splitting_ratio = 0.7,
Stage)
{
# Define the intermediate directory holding the input data.
HDFSIntermediateDir <- file.path(HDFSWorkDir,"temp")
# Define the directory where bins and variable information will be saved in the Development stage or loaded from in Production.
LocalModelsDir <- file.path(LocalWorkDir, "model")
# Point to the input data.
MergedCleaned_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "MergedCleaned"), fileSystem = RxHdfsFileSystem())
#############################################################################################################################################
## For the development stage, the block below will:
## 1. Create the label, isBad, based on the loanStatus variable.
## 2. Create a variable, hashCode, which corresponds to the mapping of loanId to integers with the murmur3.32 hash function.
############################################################################################################################################
if(Stage == "Dev"){
print("Creating the label isBad based on loanStatus...")
# Point to the output XDF file:
MergedLabeled_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "MergedLabeled"), fileSystem = RxHdfsFileSystem())
# Create the target variable, isBad, based on loanStatus.
# We also map the loanId to integers with the murmur3.32 hash function.
rxDataStep(inData = MergedCleaned_xdf ,
outFile = MergedLabeled_xdf,
overwrite = TRUE,
transforms = list(
isBad = ifelse(loanStatus %in% c("Current"), "0", "1"),
hashCode = sapply(as.character(loanId), murmur3.32)
),
transformPackages = "hashFunction"
)
}
if(Stage == "Prod" | Stage == "Web" ){
# Since there is no loanStatus variable in the data to score, we create a placeholder isBad variable that is not used later.
# Point to the output XDF file:
MergedLabeled_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "MergedLabeled"), fileSystem = RxHdfsFileSystem())
# Create the fake target variable isBad.
rxDataStep(inData = MergedCleaned_xdf ,
outFile = MergedLabeled_xdf,
overwrite = TRUE,
transforms = list(
isBad = sample(c("0", "1"), size = .rxNumRows, replace = T))
)
}
#############################################################################################################################################
## Development: The block below will create the Training set used to compute bins.
############################################################################################################################################
if(Stage == "Dev"){
print("Creating a training set to be used to compute bins...")
# Create the training set.
# It will be used to compute bins for numeric variables with smbinning.
Train_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "Train"), fileSystem = RxHdfsFileSystem())
rxDataStep(inData = MergedLabeled_xdf,
outFile = Train_xdf,
overwrite = TRUE,
rowSelection = (hashCode %% 100 < 100*splitting_ratio),
transformObjects = list(splitting_ratio = splitting_ratio))
}
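## Optional check (illustration only, not part of the pipeline): murmur3.32 from the
## hashFunction package maps each loanId string to a deterministic integer, so the same
## loan always lands in the same bucket of hashCode %% 100, and the rowSelection above
## keeps roughly 70% of the rows for the default splitting_ratio of 0.7.
## library(hashFunction)
## murmur3.32("1234567")         # Same integer on every run and every node.
## murmur3.32("1234567") %% 100  # Stable bucket in 0-99 (R's %% is non-negative here).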
#############################################################################################################################################
## The block below will compute (or load, for Production or Web Scoring) the bins for various numeric variables.
## smbinning is applied in-memory to the training set loaded as a data frame.
############################################################################################################################################
# Names of the variables that will be bucketed.
smb_buckets_names <- c("loanAmount", "interestRate", "monthlyPayment", "annualIncome", "dtiRatio", "lengthCreditHistory",
"numTotalCreditLines", "numOpenCreditLines", "numOpenCreditLines1Year", "revolvingBalance",
"revolvingUtilizationRate", "numDerogatoryRec", "numDelinquency2Years", "numChargeoff1year",
"numInquiries6Mon")
# Development: We compute the bins for various numeric variables.
if(Stage == "Dev"){
print("Computing the bins to be used to create buckets...")
# Using smbinning has some limitations, such as:
# - The variable should have more than 10 unique values.
# - If no significant splits are found, it does not output bins.
# For this reason, we manually specify default bins based on an analysis of the variables' distributions or on smbinning applied to a larger data set.
# We then overwrite the defaults with the smbinning results whenever it outputs bins.
bins <- list()
# Default cutoffs for bins:
## EXAMPLE: If the cutoffs are (c1, c2, c3),
## Bin 1 = ]-inf, c1], Bin 2 = ]c1, c2], Bin 3 = ]c2, c3], Bin 4 = ]c3, +inf[
## c1 and c3 are NOT the minimum and maximum found in the training set.
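## Illustration only: with cutoffs c(10, 20, 30), the values 5, 15, 25 and 35 fall in
## Bins 1, 2, 3 and 4 respectively. Base R's findInterval reproduces this numbering:
## findInterval(c(5, 15, 25, 35), vec = c(10, 20, 30), left.open = TRUE) + 1  # 1 2 3 4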
bins$loanAmount <- c(13343, 15365, 16648, 17769, 19230, 20545, 22101, 22886, 24127, 24998, 27416)
bins$interestRate <- c(6.99, 8.78, 10.34, 11.13, 11.91, 12.72, 13.75, 14.59, 15.85, 18.01)
bins$monthlyPayment <- c(318, 360, 393, 440, 485, 520, 554, 595, 635, 681, 741, 855)
bins$annualIncome <- c(50022, 51632, 52261, 53075, 53854, 54430, 55055, 55499, 56171, 56913, 57735, 58626, 59715)
bins$dtiRatio <- c(9.08, 11.95, 13.77, 14.54, 15.43, 16.48, 17.54, 18.27, 19.66, 20.46, 21.29, 22.87, 25.03)
bins$lengthCreditHistory <- c(6, 7, 9)
bins$numTotalCreditLines <- c(1, 2)
bins$numOpenCreditLines <- c(3, 5)
bins$numOpenCreditLines1Year <- c(3, 4, 5, 6, 7, 8)
bins$revolvingBalance <- c(10722, 11630, 12298, 12916, 13317, 13797, 14256, 14633, 15174, 15680, 16394, 16796, 17257, 18180)
bins$revolvingUtilizationRate <- c(43.89, 49.36, 53.72, 57.61, 61.41, 64.45, 69.81, 74.05, 76.98, 82.14, 90.54)
bins$numDerogatoryRec <- c(0, 1)
bins$numDelinquency2Years <- c(0, 10)
bins$numChargeoff1year <- c(0, 8)
bins$numInquiries6Mon <- c(0, 1, 2)
# Function to compute smbinning on every variable.
## For large data sets, we take a random subset to speed up computations with smbinning.
compute_bins <- function(name, data){
# Import the training set to be able to apply smbinning and set the type of the label to numeric.
Train_df <- rxImport(data, varsToKeep = c("isBad", name))
## We take a subset of the training set to speed up computations for very large data sets.
Train_df <- Train_df[sample(seq(1, nrow(Train_df)), replace = F, size = min(300000, nrow(Train_df))), ]
Train_df$isBad <- as.numeric(as.character(Train_df$isBad))
# Compute the cutoffs with smbinning.
library(smbinning)
output <- smbinning(Train_df, y = "isBad", x = name, p = 0.05)
if (is.list(output)){ # Case where the binning was performed and returned bins.
cuts <- output$cuts
return (cuts)
}
}
# We apply compute_bins to the variables in parallel across the nodes of the cluster with the rxExec function.
rxOptions(numCoresToUse = -1) # Use the maximum number of cores.
bins_smb <- rxExec(compute_bins, name = rxElemArg(smb_buckets_names), data = Train_xdf)
names(bins_smb) <- smb_buckets_names
# Fill bins with the cutoffs obtained with smbinning (stored in bins_smb).
## We replace the default values in bins if and only if smbinning returned a non-NULL result.
for(name in smb_buckets_names){
if (!is.null(bins_smb[[name]])){
bins[[name]] <- bins_smb[[name]]
}
}
## Saving for Production use.
saveRDS(bins, file.path(LocalModelsDir, "bins.rds"))
}
# Production: We load the bins computed during Development.
if(Stage == "Prod"){
print("Loading the bins to be used to create buckets...")
bins <- readRDS(file.path(LocalModelsDir, "bins.rds"))
}
# Web Scoring: we directly read the bins computed in the Development stage.
# They are included in the list model_objects, defined in "deployment.R", which can be passed in when calling the published web service.
if(Stage == "Web"){
print("Loading the bins to be used to create buckets...")
bins <- model_objects$bins
}
#############################################################################################################################################
## The block below will bucketize the numeric variables based on the computed or defined bins.
############################################################################################################################################
print("Bucketizing numeric variables...")
# Function to bucketize numeric variables. It will be wrapped into rxDataStep.
bucketize <- function(data) {
data <- data.frame(data)
for(name in buckets_names){
# Deal with the last bin.
name2 <- paste(name, "Bucket", sep = "")
data[, name2] <- as.character(length(b2[[name]]) + 1)
# Deal with the first bin.
rows <- which(data[, name] <= b2[[name]][[1]])
data[rows, name2] <- "1"
# Deal with the rest.
if(length(b2[[name]]) > 1){
for(i in seq(1, (length(b2[[name]]) - 1))){
rows <- which(data[, name] <= b2[[name]][[i + 1]] & data[, name] > b2[[name]][[i]])
data[rows, name2] <- as.character(i + 1)
}
}
}
return(data)
}
# Perform feature engineering on the cleaned data set.
## Create an XDF pointer for the output.
MergedFeatures_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "MergedFeatures"), fileSystem = RxHdfsFileSystem())
## Apply the function Bucketize to MergedLabeled_xdf.
rxDataStep(inData = MergedLabeled_xdf,
outFile = MergedFeatures_xdf,
overwrite = TRUE,
transformFunc = bucketize,
transformObjects = list(
b2 = bins, buckets_names = smb_buckets_names)
)
#############################################################################################################################################
## The block below will:
## Development: set the type of the newly created variables to factor and save the variable information for Production/Web Scoring use.
## Production/ Web Scoring: factor the variables and specify their levels according to the Development data.
############################################################################################################################################
# Create an XDF pointer to the output data.
MergedFeaturesFactors_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "MergedFeaturesFactors"), fileSystem = RxHdfsFileSystem())
# Development stage.
if(Stage == "Dev"){
print("Transforming newly created variables to factors...")
# Add the newly created variables to a column_factor_info list to be used in rxFactors.
new_names <- paste(names(bins), "Bucket", sep = "")
new_levels <- unlist(lapply(bins, function(x) length(x) + 1))
column_factor_info <- mapply(function(i, new_levels){list(levels = as.character(seq(1, new_levels)))}, 1:length(new_levels), new_levels, SIMPLIFY = F)
names(column_factor_info) <- new_names
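## Illustration only: with the default bins$lengthCreditHistory <- c(6, 7, 9) (3 cutoffs,
## hence 4 buckets), the entry built above is:
## column_factor_info$lengthCreditHistoryBucket == list(levels = c("1", "2", "3", "4"))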
# Convert the new features from character to factors.
## We drop the numeric variables and loanStatus, used to compute the new features.
rxFactors(inData = MergedFeatures_xdf , outFile = MergedFeaturesFactors_xdf, factorInfo = column_factor_info, varsToDrop = c("loanStatus", smb_buckets_names))
print("Saving the variable information for Production and Web Scoring use...")
# Add to column_factor_info the levels of the other factor variables, for use in Production and Web Scoring.
## Get the names of the other factor variables.
colnames <- names(MergedFeaturesFactors_xdf)
colnames_other <- colnames[!(colnames %in% c("isBad", new_names))]
## Add them to the column_factor_info list.
var_info <- rxGetVarInfo(MergedFeaturesFactors_xdf)
for(name in colnames_other){
if(var_info[[name]]$varType == "factor"){
column_factor_info[[name]]$newLevels <- var_info[[name]]$levels
}
}
# Remove the date factor info (its levels will rarely match those of a Production data set)
column_factor_info$date <- NULL
## Save column_factor_info for Production or Web Scoring.
saveRDS(column_factor_info, file.path(LocalModelsDir, "column_factor_info.rds"))
}
# Production stage.
if(Stage == "Prod"){
# Load the variable information from the Development stage.
column_factor_info <- readRDS(file = file.path(LocalModelsDir, "column_factor_info.rds"))
# Convert the new features to factor and specify the levels of the other factors in the order of the Development data.
rxFactors(inData = MergedFeatures_xdf , outFile = MergedFeaturesFactors_xdf, factorInfo = column_factor_info, varsToDrop = c(smb_buckets_names))
}
# Web Scoring stage.
if(Stage == "Web"){
## For the Web Scoring, we directly read the factorInfo computed in the Development stage.
## It is included in the list model_objects, defined in "deployment.R", which can be passed in when calling the published web service.
column_factor_info <- model_objects$column_factor_info
# Convert the new features to factor and specify the levels of the other factors in the order of the Development data.
rxFactors(inData = MergedFeatures_xdf , outFile = MergedFeaturesFactors_xdf, factorInfo = column_factor_info, varsToDrop = c(smb_buckets_names))
}
print("Step 2 Completed.")
} # end of step 2 function.

View file

@@ -0,0 +1,250 @@
##########################################################################################################################################
## This R script will do the following:
## 1. Development: Split the featurized data set into a training and a testing set.
## 2. Development: Train a logistic regression classification model on the training set.
## 3. Production/ Web Scoring: Load the logistic regression model and variable information from Development.
## 4. Score the logistic regression model on the test set or the Production/ Web Scoring data.
## 5. Development: Evaluate the model.
## Input : Data set MergedFeaturesFactors.
## Output: Logistic Regression Model (Development) and Predictions.
##########################################################################################################################################
## Function for splitting, training, scoring and evaluating:
# LocalWorkDir: the working directory on the edge node.
# HDFSWorkDir: the working directory on HDFS.
# splitting_ratio: the proportion (in ]0,1]) of observations that will end up in the training set. Should be the same as in step 2.
# Stage: "Dev" for development, "Prod" for batch scoring, or "Web" for scoring remotely with web service.
training_evaluation <- function(LocalWorkDir,
HDFSWorkDir,
splitting_ratio = 0.7,
Stage)
{
# Load the MicrosoftML library to use rxLogisticRegression and rxPredict.
library(MicrosoftML)
# Define the intermediate directory holding the input data.
HDFSIntermediateDir <- file.path(HDFSWorkDir,"temp")
# Define the directory where the model will be saved in the Development stage or loaded from in Production.
LocalModelsDir <- file.path(LocalWorkDir, "model")
# Point to the input data (both Development and Production/ Web Scoring).
MergedFeaturesFactors_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "MergedFeaturesFactors"), fileSystem = RxHdfsFileSystem())
if(Stage == "Dev"){ # Splitting and Training are only performed for the Development stage.
##########################################################################################################################################
## The block below will split the data into training and testing set
##########################################################################################################################################
print("Randomly Splitting into a training and a testing set using the hashCode created in step 2...")
# Split the analytical data set into a training and a testing set.
## Note that the training set in step 2 was used only to compute the bins.
## The training set here is augmented with the new features compared to the one in step 2.
Train_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "Train"), fileSystem = RxHdfsFileSystem())
Test_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "Test"), fileSystem = RxHdfsFileSystem())
rxDataStep(inData = MergedFeaturesFactors_xdf,
outFile = Train_xdf,
overwrite = TRUE,
rowSelection = (hashCode %% 100 < 100*splitting_ratio),
transformObjects = list(splitting_ratio = splitting_ratio),
varsToDrop = c("hashCode")
)
rxDataStep(inData = MergedFeaturesFactors_xdf,
outFile = Test_xdf,
overwrite = TRUE,
rowSelection = (hashCode %% 100 >= 100*splitting_ratio),
transformObjects = list(splitting_ratio = splitting_ratio),
varsToDrop = c("hashCode")
)
##########################################################################################################################################
## The block below will write the formula used for the training
##########################################################################################################################################
print("Writing the formula for training...")
# Write the formula after removing the variables not used for training.
variables_all <- rxGetVarNames(Train_xdf)
variables_to_remove <- c("loanId", "memberId", "date", "residentialState", "term")
training_variables <- variables_all[!(variables_all %in% c("isBad", variables_to_remove))]
formula <- as.formula(paste("isBad ~", paste(training_variables, collapse = "+")))
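## Illustration only: the resulting formula has the form below; the exact right-hand
## side depends on the columns actually present in Train_xdf.
## isBad ~ loanAmountBucket + interestRateBucket + monthlyPaymentBucket + ...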
##########################################################################################################################################
## The block below will do the following:
## 1. Train a logistic regression model.
## 2. Save the trained logistic regression model on the local edge node.
##########################################################################################################################################
print("Training the logistic regression model...")
# Train the logistic regression model.
## The regularization weights (l1Weight and l2Weight) can be modified for further optimization.
## The included selectFeatures function can select a certain number of optimal features based on a specified method.
## The number of features to select and the selection method can be further optimized.
logistic_model <- rxLogisticRegression(formula = formula,
data = Train_xdf,
type = "binary",
l1Weight = 0.7,
l2Weight = 0.7,
mlTransforms = list(selectFeatures(formula, mode = mutualInformation(numFeaturesToKeep = 10))))
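## A possible variation (sketch only, not the settings tuned for this template): weaker
## regularization, keeping features by minimum count of non-default values via the
## minCount mode of selectFeatures, assuming your MicrosoftML version provides it.
## logistic_model <- rxLogisticRegression(formula = formula, data = Train_xdf, type = "binary",
##                                        l1Weight = 0.1, l2Weight = 0.1,
##                                        mlTransforms = list(selectFeatures(formula, mode = minCount(count = 1))))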
# Save the fitted model to the local edge node for use in Production.
saveRDS(logistic_model, file = paste(LocalModelsDir, "/logistic_model.rds", sep = ""))
# Get the coefficients of the logistic regression formula.
## NA means the variable has been dropped while building the model.
coeff <- logistic_model$coefficients
Logistic_Coeff <- data.frame(variable = names(coeff), coefficient = coeff, row.names = NULL)
## Order in decreasing order of absolute value of coefficients.
Logistic_Coeff <- Logistic_Coeff[order(abs(Logistic_Coeff$coefficient), decreasing = T),]
# Save the coefficients table to the local edge node.
saveRDS(Logistic_Coeff, file = paste(LocalModelsDir, "/Logistic_Coeff.rds", sep = ""))
} # end of Stage == "Dev"
##########################################################################################################################################
## The block below will load the logistic regression model created during Development.
##########################################################################################################################################
if(Stage == "Prod"){
print("Importing the logistic regression model and variable information...")
logistic_model <- readRDS(file = file.path(LocalModelsDir, "logistic_model.rds"))
# Rename the pointer to the Production data to be scored.
Test_xdf <- MergedFeaturesFactors_xdf
}
if(Stage == "Web"){
print("Importing the logistic regression model and variable information...")
logistic_model <- model_objects$logistic_model
# Rename the pointer to the Production data to be scored.
Test_xdf <- MergedFeaturesFactors_xdf
}
##########################################################################################################################################
## The block below will score the logistic regression model on the test set or the Production/ Web Scoring data and output the prediction table.
##########################################################################################################################################
print("Scoring the logistic regression model...")
# Make Predictions.
PredictionsLogistic_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "PredictionsLogistic"), fileSystem = RxHdfsFileSystem())
rxPredict(logistic_model,
data = Test_xdf,
outData = PredictionsLogistic_xdf,
overwrite = T,
extraVarsToWrite = c("isBad", "loanId"))
# Development: Perform model evaluation.
if(Stage == "Dev"){
##########################################################################################################################################
## The block below will do the following:
## 1. Compute the confusion matrix and some classification metrics.
## 2. Compute the AUC and plot the ROC curve.
## 3. Compute the KS statistic and draw the KS plot.
##########################################################################################################################################
print("Evaluating the logistic regression model...")
# Evaluation function.
evaluate_model <- function(predictions_table = PredictionsLogistic_xdf) {
# Import the prediction table and convert isBad to numeric for correct evaluation.
Predictions <- rxImport(predictions_table, varsToDrop = c("loanId"))
Predictions$isBad <- as.numeric(as.character(Predictions$isBad))
# Change the names of the variables in the predictions table for clarity.
Predictions <- Predictions[, c(1, 4)] # Keep the observed label and the predicted probability.
colnames(Predictions) <- c("isBad", "isBad_Pred")
## KS PLOT AND STATISTIC.
# Split the data according to the observed value.
Predictions0 <- Predictions[Predictions$isBad == 0,]$isBad_Pred
Predictions1 <- Predictions[Predictions$isBad == 1,]$isBad_Pred
# Get the cumulative distribution of predicted probabilities (on a subset for faster computations).
cdf0 <- ecdf(Predictions0[base::sample(seq(1, length(Predictions0)), replace = F, size = min(300000, length(Predictions0)))])
cdf1 <- ecdf(Predictions1)
# Compute the KS statistic and the corresponding points on the KS plot.
## Create a sequence of predicted probabilities in its range of values.
minMax <- seq(min(Predictions0, Predictions1), max(Predictions0, Predictions1), length.out=length(Predictions0))
## Compute KS, i.e. the largest distance between the two cumulative distributions.
KS <- max(abs(cdf0(minMax) - cdf1(minMax)))
## Find one predicted probability where the cumulative distributions have the biggest difference.
x0 <- minMax[which(abs(cdf0(minMax) - cdf1(minMax)) == KS )][1]
## Get the corresponding points on the plot.
y0 <- cdf0(x0)
y1 <- cdf1(x0)
# Plot the two cumulative distributions with the line between points of greatest distance.
plot(cdf0, verticals = T, do.points = F, col = "blue", main = sprintf("KS Plot; KS = %s", round(KS, digits = 3)), ylab = "Cumulative Distribution Functions", xlab = "Predicted Probabilities")
plot(cdf1, verticals = T, do.points = F, col = "green", add = T)
legend(0.3, 0.8, c("isBad == 0", "isBad == 1"), lty = c(1, 1),lwd = c(2.5, 2.5), col = c("blue", "green"))
points(c(x0, x0), c(y0, y1), pch = 16, col = "red")
segments(x0, y0, x0, y1, col = "red", lty = "dotted")
## CONFUSION MATRIX AND VARIOUS METRICS.
# The cumulative distributions of predicted probabilities given observed values are the farthest apart for a score equal to x0.
# We can then use x0 as a decision threshold, for example.
# Note that the choice of a decision threshold can be further optimized.
# Using the x0 point as a threshold, we compute the binary predictions to get the confusion matrix.
Predictions$isBad_Pred_Binary <- ifelse(Predictions$isBad_Pred < x0, 0, 1)
confusion <- table(Predictions$isBad, Predictions$isBad_Pred_Binary, dnn = c("Observed", "Predicted"))[c("0", "1"), c("0", "1")]
print(confusion)
# Note: the metrics below treat the class "0" (good loans) as the positive class.
tp <- confusion[1, 1]
fn <- confusion[1, 2]
fp <- confusion[2, 1]
tn <- confusion[2, 2]
accuracy <- (tp + tn) / (tp + fn + fp + tn)
precision <- tp / (tp + fp)
recall <- tp / (tp + fn)
fscore <- 2 * (precision * recall) / (precision + recall)
## ROC PLOT AND AUC.
ROC <- rxRoc(actualVarName = "isBad", predVarNames = "isBad_Pred", data = Predictions, numBreaks = 100)
AUC <- rxAuc(ROC)
plot(ROC, title = "ROC Curve for Logistic Regression")
# Return the computed metrics.
metrics <- c("Accuracy" = accuracy,
"Precision" = precision,
"Recall" = recall,
"F-Score" = fscore,
"AUC" = AUC,
"KS" = KS,
"Score Threshold" = x0)
return(metrics)
}
# Apply model evaluation in local compute context.
rxSetComputeContext('local')
metrics <- evaluate_model()
print("Step 3 Completed.")
print("Evaluation Metrics:")
print(metrics)
return(metrics)
} # end of model evaluation when Stage == "Dev".
print("Step 3 Completed.")
} # end of step 3 function.

View file

@@ -0,0 +1,229 @@
##########################################################################################################################################
## This R script will do the following:
## I. Development stage: Compute Operational Metrics, i.e. the expected bad rate for various classification decision thresholds.
## II. Apply a score transformation based on operational metrics for Development and Production.
## Input : Predictions table.
## Output: Operational Metrics and Transformed Scores.
#########################################################################################################################################
##########################################################################################################################################
## The function below computes operational metrics in the Development stage, in the following way:
## 1. Apply a sigmoid function to the output scores of the logistic regression, in order to spread them in [0,1].
## 2. Compute bins for the scores, based on quantiles.
## 3. Take the lower bound of each bin as a decision threshold for classifying a loan as bad, and compute the rate of bad loans
##    among the loans with a score higher than the threshold.
##########################################################################################################################################
# LocalWorkDir: the working directory on the edge node.
# HDFSWorkDir: the working directory on HDFS.
compute_operational_metrics <- function(LocalWorkDir,
HDFSWorkDir)
{
print("Computing operational metrics...")
# Set the compute context to Local.
rxSetComputeContext('local')
# Define the intermediate directory holding the input data.
HDFSIntermediateDir <- file.path(HDFSWorkDir,"temp")
# Define the directory where the average of the scores and the Operational Metrics will be saved.
LocalModelsDir <- file.path(LocalWorkDir, "model")
# Point to the input data (Predictions table):
PredictionsLogistic_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "PredictionsLogistic"), fileSystem = RxHdfsFileSystem())
##########################################################################################################################################
## Import the predictions table and convert isBad to numeric for correct computations.
##########################################################################################################################################
Predictions <- rxImport(PredictionsLogistic_xdf, varsToDrop = c("loanId"))
Predictions$isBad <- as.numeric(as.character(Predictions$isBad))
# Change the names of the variables in the predictions table for clarity.
Predictions <- Predictions[, c(1, 4)] # Keep the observed label and the predicted probability.
colnames(Predictions) <- c("isBad", "isBad_Pred")
##########################################################################################################################################
## Space out the scores (predicted probability of default) for interpretability with a sigmoid.
##########################################################################################################################################
# Define the sigmoid: it is centered at 1.2*mean score to ensure a good spread of scores.
## The sigmoid parameters can be changed for a new data set.
dev_test_avg_score <- mean(Predictions$isBad_Pred)
sigmoid <- function(x){
return(1/(1 + exp(-20*(x-1.2*dev_test_avg_score))))
}
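## Illustration only (hypothetical value): if dev_test_avg_score were 0.10, the sigmoid
## would be centered at 0.12, so sigmoid(0.12) = 0.5, sigmoid(0.10) ~ 0.40 and
## sigmoid(0.20) ~ 0.83: raw scores below the center are pushed towards 0, those above towards 1.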
# Apply the function.
Predictions$transformedScore <- sigmoid(Predictions$isBad_Pred)
# Save the average score to the local edge node for use in Production.
saveRDS(dev_test_avg_score, file = paste(LocalModelsDir, "/dev_test_avg_score.rds", sep = ""))
##########################################################################################################################################
## Get the rates of bad loans for every bin taken as a threshold.
##########################################################################################################################################
# Bin the scores based on quantiles.
bins <- rxQuantile("transformedScore", Predictions, probs = c(seq(0, 0.99, 0.01)))
bins[["0%"]] <- 0
# We consider 100 decision thresholds: the lower bound of each bin.
# Compute the expected rates of bad loans for loans with scores higher than each decision threshold.
badrate <- rep(0, length(bins))
for(i in 1:length(bins))
{
selected <- Predictions$isBad[Predictions$transformedScore >= bins[i]]
badrate[i] <- sum(selected)/length(selected)
}
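## Illustration only (hypothetical numbers): if bins[["90%"]] were 0.85 and the
## corresponding badrate entry were 0.60, then 60% of the test loans scoring at least
## 0.85 were observed bad, so a lender could reject applications above that cutoff.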
# Save the operational metrics to the local edge node and to HDFS.
Operational_Metrics <- data.frame(scorePercentile = names(bins), scoreCutoff = bins, badRate = badrate, row.names = NULL)
saveRDS(Operational_Metrics, file = paste(LocalModelsDir, "/Operational_Metrics.rds", sep = ""))
Operational_Metrics_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "OperationalMetrics"), fileSystem = RxHdfsFileSystem(), createCompositeSet = T)
rxDataStep(inData = Operational_Metrics, outFile = Operational_Metrics_xdf, overwrite = T)
# Save the operational metrics to a Hive table for display in PowerBI.
rxSparkConnect(consoleOutput = TRUE, reset = FALSE)
Operational_Metrics_hive <- RxHiveData(table = "Operational_Metrics")
rxDataStep(inData = Operational_Metrics_xdf, outFile = Operational_Metrics_hive, overwrite = T)
print("The hive table Operational_Metrics is stored under the folder: /hive/warehouse")
return(Operational_Metrics)
}
##########################################################################################################################################
## The function below transforms the scores given by the logistic regression on the testing or Production data in the following way:
## 1. Apply a sigmoid function to the output scores of the logistic regression, in order to spread them in [0,1].
## 2. Assign each score to a quantile bin, with the bad rates given by the Operational Metrics table.
##########################################################################################################################################
# LocalWorkDir: the working directory on the edge node.
# HDFSWorkDir: the working directory on HDFS.
# Stage: "Dev" for development, "Prod" for batch scoring, or "Web" for scoring remotely with web service.
apply_score_transformation <- function(LocalWorkDir,
HDFSWorkDir,
Stage)
{
print("Transforming scores based on operational metrics...")
# Set the compute context to Local.
rxSetComputeContext('local')
# Define the intermediate directory holding the input data.
HDFSIntermediateDir <- file.path(HDFSWorkDir,"temp")
# Define the directory where the average of the Scores and the Operational_Metrics were saved.
LocalModelsDir <- file.path(LocalWorkDir, "model")
# Point to the input data (Predictions table):
PredictionsLogistic_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "PredictionsLogistic"), fileSystem = RxHdfsFileSystem())
# Import the average of the scores and Operational Metrics computed during Development.
if(Stage == "Dev" | Stage == "Prod"){
dev_test_avg_score <- readRDS(file = file.path(LocalModelsDir, "dev_test_avg_score.rds"))
Operational_Metrics <- readRDS(file = file.path(LocalModelsDir, "Operational_Metrics.rds"))
}
if(Stage == "Web"){
dev_test_avg_score <- model_objects$dev_test_avg_score
Operational_Metrics <- model_objects$Operational_Metrics
}
##########################################################################################################################################
## Import the predictions table and convert isBad to numeric for correct computations.
##########################################################################################################################################
Predictions <- rxImport(PredictionsLogistic_xdf)
Predictions$isBad <- as.numeric(as.character(Predictions$isBad))
# Change the names of the variables in the predictions table for clarity.
Predictions <- Predictions[, c(1, 2, 5)] # Keep the observed label, the loanId, and the predicted probability.
colnames(Predictions) <- c("isBad", "loanId", "isBad_Pred")
##########################################################################################################################################
## Space out the scores (predicted probability of default) for interpretability with a sigmoid.
##########################################################################################################################################
# Define the sigmoid.
sigmoid <- function(x){
return(1/(1 + exp(-20*(x-1.2*dev_test_avg_score))))
}
# Apply the function.
Predictions$transformedScore <- sigmoid(Predictions$isBad_Pred)
##########################################################################################################################################
## Apply the score transformation.
##########################################################################################################################################
# Deal with all but the top 1% of scores (percentile bins 1-99).
for (i in seq(1, (nrow(Operational_Metrics) - 1))){
rows <- which(Predictions$transformedScore <= Operational_Metrics$scoreCutoff[i + 1] &
Predictions$transformedScore > Operational_Metrics$scoreCutoff[i])
Predictions[rows, c("scorePercentile")] <- as.character(Operational_Metrics$scorePercentile[i + 1])
Predictions[rows, c("badRate")] <- Operational_Metrics$badRate[i]
Predictions[rows, c("scoreCutoff")] <- Operational_Metrics$scoreCutoff[i]
}
# Deal with the top 1% of scores (last bucket).
rows <- which(Predictions$transformedScore > Operational_Metrics$scoreCutoff[100])
Predictions[rows, c("scorePercentile")] <- "Top 1%"
Predictions[rows, c("scoreCutoff")] <- Operational_Metrics$scoreCutoff[100]
Predictions[rows, c("badRate")] <- Operational_Metrics$badRate[100]
##########################################################################################################################################
## Save the transformed scores to HDFS.
##########################################################################################################################################
Scores_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "Scores"), fileSystem = RxHdfsFileSystem(), createCompositeSet = T)
rxDataStep(inData = Predictions[, c("loanId", "transformedScore", "scorePercentile", "scoreCutoff", "badRate", "isBad")],
outFile = Scores_xdf,
overwrite = TRUE)
##########################################################################################################################################
## Save data in hive tables for display in PowerBI.
##########################################################################################################################################
rxSparkConnect(consoleOutput = TRUE, reset = FALSE)
if(Stage == "Dev"){
ScoresData_hive <- RxHiveData(table = "ScoresData")
} else{
ScoresData_hive <- RxHiveData(table = "ScoresData_Prod")
}
MergedCleaned_xdf <- RxXdfData(file.path(HDFSIntermediateDir, "MergedCleaned"), fileSystem = RxHdfsFileSystem())
rxMerge(inData1 = MergedCleaned_xdf,
inData2 = Scores_xdf,
outFile = ScoresData_hive,
matchVars = "loanId",
type = "inner",
overwrite = TRUE)
print("The hive table with the scores and data is stored under the folder /hive/warehouse")
}

View file

@@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Put the R code in the user's home directory.
git clone -b spark --single-branch https://github.com/Microsoft/r-server-loan-credit-risk.git loans
cp loans/RSparkCluster/* /home/$1
chmod 777 /home/$1/*.R
rm -rf loans
# Replace the password placeholder in the R scripts.
sed -i "s/XXYOURSQLPW/$2/g" /home/$1/*.R
# Configure edge node as one-box setup for R Server Operationalization
/usr/local/bin/dotnet /usr/lib64/microsoft-r/rserver/o16n/9.1.0/Microsoft.RServer.Utils.AdminUtil/Microsoft.RServer.Utils.AdminUtil.dll -silentoneboxinstall "$2"
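# Hypothetical invocation (illustration only; in practice the cluster setup passes
# the arguments when running this file as a script action):
#   sudo bash setup_loans.sh remoteuser 'Passw0rd!'
# $1 = edge-node user whose home directory receives the R scripts;
# $2 = password substituted for the XXYOURSQLPW placeholder and used as the one-box admin password.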