Initial checkin for performance tuning scripts.

This commit is contained in:
kganesan 2016-05-31 17:41:23 -07:00
Parent ff36831bbd
Commit fe8c2a1009
12 changed files with 1447 additions and 0 deletions

PerfTuning/README.md Normal file

@@ -0,0 +1,48 @@
# SQL Server R Services - Performance Test Samples
The sample runtests.R script provided in this repository demonstrates the benefits of applying various tips for improving the performance of R scripts running in the SQL compute context.
The test script helps evaluate the performance gains that can be realized by following the tips outlined in the SQL Server R Services Performance Tuning document (<a href="http://www.msdn.com">Click here</a>).
To run these tests, the user needs access to SQL Server 2016 with R Services enabled. The user is also assumed to be familiar with the basic concepts of SQL Server R Services.
The directory contents are as follows:
**Data**
> - **airline10M.xdf** <p>Contains 10M rows (compressed) of data from the airline database in the Microsoft R Server xdf format.</p> <br/>
> - **airline-cleaned-10M.7z**<p>A 7zip-compressed csv file that contains 10M rows of cleaned-up airline data. Used for creating the airline columnar table in the database.</p>
- createdb.sql <p>Creates the PerfTuning database if it does not already exist. The logged-in user should have permissions to create the database and tables.</p>
- createtables.R <p>Creates the airline and airlineWithIntCol tables. These tables should be created first.</p>
- createtablewithindex.sql <p>Creates the airlineWithIndex table with a clustered index. Run this script after running createtables.R.</p>
- createcolumnartable.sql <p>Creates the airlineColumnar table. Run this script after running createtablewithindex.sql.</p>
- createpagecompressedtable.sql <p>Creates an airline table with page compression enabled. Run this script after running createtablewithindex.sql.</p>
- createrowcompressedtable.sql <p>Creates an airline table with row compression enabled. Run this script after running createtablewithindex.sql.</p>
- createall.cmd <p>A script that creates all tables. Create the PerfTuning database before running this script. Also, ensure that RScript.exe and sqlcmd.exe are in your path.</p>
- runtests.R <p>The main test driver that uses the PerfTuning database. Runs many tests to show how certain tuning steps affect performance.</p>
- serialization.R <p>Contains helper functions to serialize/deserialize models stored in the database.</p>
- scoring.sql <p>Performs batch prediction using the open-source R predict method. This depends on the model generated by lm; run the SaveLMModel test to store the model in the database before running this script.</p>
<p>To run a .R script, use your favorite IDE, source the file, and then call the function(s) defined in that file.</p>
<p>To run a .sql script, ensure that sqlcmd.exe is in your path and then invoke sqlcmd -i <sqlscriptfilename>.</p>
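For example, the database and the first set of tables can be created as follows (a minimal sketch mirroring createall.cmd; it assumes the scripts and data are in the current directory):
> sqlcmd -i createdb.sql
> RScript --default-packages=methods createtables.R
> sqlcmd -i createtablewithindex.sql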
Six tables are needed in the PerfTuning database before runtests.R can be used to run the tests: airline, airlineWithIntCol, airlineWithIndex, airlineWithPageComp, airlineWithRowComp, and airlineColumnar. The attached scripts help to create them using data from the xdf and csv files in the Data folder.
Alternatively, the user can download the database files using the following links and skip creating the tables with the scripts mentioned above.
To create the database, download the 2 files from the links below into some folder and restore the database from the downloaded location. Assuming they are downloaded to the folder D:\sql, the following command can be used to restore them.
RESTORE DATABASE PerfTuning FROM DISK = 'D:\sql\PerfTuning1.bak', DISK = 'D:\sql\PerfTuning2.bak' WITH REPLACE;
> https://sqlrperftuning.blob.core.windows.net/perftuningdb/PerfTuning1.bak
> https://sqlrperftuning.blob.core.windows.net/perftuningdb/PerfTuning2.bak
**Steps**
> - Ensure that you have access to SQL Server 2016 with R Services enabled. You should also have permission to create or access the PerfTuning database.
> - Create the PerfTuning database and add the tables using the attached R and SQL scripts. You can either run createall.cmd or run the database- and table-creation scripts one by one. To expedite, you can skip running the scripts by downloading the database using the links above and restoring it.
> - Install the dependent RODBC package, if it is not already installed. Ensure that it is installed in the same library where the Microsoft RevoScaleR package is installed. (If you ran createall.cmd, it installs this package; update the file if a library path needs to be specified.)
> - Update the runtests.R file to match your connection string and data directories.
> - To store models, the code depends on the rdata table. Follow the instructions in the serialization.R file. You should also enable PowerShell script execution on the client machine.
> - To run the tests, open your R IDE and set the working directory to the one with the scripts. Source the file runtests.R and call runTests(testsToRun, "output", 1, 500000L), as shown below.
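For example, a typical session in an R IDE might look like the following (a sketch; the working directory and the "output" folder name are assumptions to adapt to your setup):
> setwd("e:/perftuning")
> source("runtests.R")
> runTests(testsToRun, "output", 1, 500000L)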

PerfTuning/createall.cmd Normal file

@@ -0,0 +1,21 @@
@echo off
call :ensure_exists "DATA\airline10M.xdf" || exit /b 1
call :ensure_exists "DATA\airline-cleaned-10M.csv" || exit /b 1
sqlcmd -i createdb.sql
RScript --default-packages=methods createtables.R
sqlcmd -i createtablewithindex.sql
sqlcmd -i createcolumnartable.sql
sqlcmd -i createpagecompressedtable.sql
sqlcmd -i createrowcompressedtable.sql
sqlcmd -i createrdatatable.sql
REM Use the -l option below to specify the library path where the package should be installed.
RScript -e "install.packages('RODBC', dep=TRUE)"
exit /b 0
:ensure_exists
if exist "%~1" exit /b 0
echo ERROR: %~1 does not exist
exit /b 1

PerfTuning/createcolumnartable.sql Normal file

@@ -0,0 +1,83 @@
-- Change file path of csv file below before running this script.
-- This script creates airlineColumnar table needed for running perf tuning tests.
use PerfTuning;
go
IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = N'airlineColumnar')
BEGIN
PRINT 'dropping existing table airlineColumnar'
drop table [airlineColumnar];
END
go
create table [airlineColumnar] (
[Year] smallint,
[Month] tinyint,
[DayOfMonth] tinyint,
[DayOfWeek] tinyint,
[DepTime] real,
[CRSDepTime] real,
[ArrTime] real,
[CRSArrTime] real,
[UniqueCarrier] varchar(100),
[FlightNum] smallint,
[TailNum] varchar(100),
[ActualElapsedTime] smallint,
[CRSElapsedTime] smallint,
[AirTime] smallint,
[ArrDelay] smallint,
[DepDelay] smallint,
[Origin] char(3),
[Dest] char(3),
[Distance] smallint,
[TaxiIn] smallint,
[TaxiOut] smallint,
[Cancelled] bit,
[CancellationCode] varchar(100),
[Diverted] bit,
[CarrierDelay] smallint,
[WeatherDelay] smallint,
[NASDelay] smallint,
[SecurityDelay] smallint,
[LateAircraftDelay] smallint
);
go
CREATE CLUSTERED COLUMNSTORE INDEX airline_clustered ON airlineColumnar;
GO
DELETE FROM airlineColumnar
PRINT 'bulk inserting data from airline-cleaned-10M.csv into airlineColumnar'
GO
bulk insert airlineColumnar
from 'E:\PerfTuning\Data\airline-cleaned-10M.csv'
with(
batchsize = 1000000, -- Reduce this value if there is memory issue
fieldterminator = ',',
keepnulls,
-- lastrow = 100,
firstrow = 2 -- Skip header
);
go
PRINT 'Inserting Data Done'
GO
ALTER TABLE airlineColumnar
ADD Late BIT
ALTER TABLE airlineColumnar
ADD CRSDepHour int
GO
PRINT 'computing Late and CRSDepHour column values.'
GO
update airlineColumnar
set CRSDepHour=ROUND(CRSDepTime, 0, 1), Late=case when ArrDelay>15 then 1 else 0 end
GO
PRINT 'computing done'
GO
PRINT 'adding id column and rebuilding index'
GO
ALTER TABLE airlineColumnar
ADD id INT IDENTITY(1,1)
ALTER INDEX airline_clustered ON airlineColumnar
REBUILD
GO
PRINT 'rebuilding done'
GO

PerfTuning/createdb.sql Normal file

@@ -0,0 +1,8 @@
IF NOT EXISTS (SELECT name FROM master.sys.databases WHERE name = N'PerfTuning')
BEGIN
PRINT 'Creating Database PerfTuning'
CREATE DATABASE PerfTuning;
END
ELSE
PRINT 'Database PerfTuning already exists. Skipping creation.'
GO

PerfTuning/createpagecompressedtable.sql Normal file

@@ -0,0 +1,83 @@
-- This script creates airlineWithPageComp table needed for running perf tuning tests.
use PerfTuning;
go
IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = N'airlineWithPageComp')
BEGIN
PRINT 'dropping existing table airlineWithPageComp'
drop table [airlineWithPageComp];
END
GO
CREATE TABLE [dbo].[airlineWithPageComp](
[Year] [int] NULL,
[Month] [int] NULL,
[DayofMonth] [int] NULL,
[DayOfWeek] [bigint] NULL,
[FlightDate] [float] NULL,
[UniqueCarrier] [nvarchar](255) NULL,
[TailNum] [nvarchar](255) NULL,
[FlightNum] [nvarchar](255) NULL,
[OriginAirportID] [nvarchar](255) NULL,
[Origin] [nvarchar](255) NULL,
[OriginState] [nvarchar](255) NULL,
[DestAirportID] [nvarchar](255) NULL,
[Dest] [nvarchar](255) NULL,
[DestState] [nvarchar](255) NULL,
[CRSDepTime] [float] NULL,
[DepTime] [float] NULL,
[DepDelay] [int] NULL,
[DepDelayMinutes] [int] NULL,
[DepDel15] [bit] NULL,
[DepDelayGroups] [nvarchar](255) NULL,
[TaxiOut] [int] NULL,
[WheelsOff] [float] NULL,
[WheelsOn] [float] NULL,
[TaxiIn] [int] NULL,
[CRSArrTime] [float] NULL,
[ArrTime] [float] NULL,
[ArrDelay] [int] NULL,
[ArrDelayMinutes] [int] NULL,
[ArrDel15] [bit] NULL,
[ArrDelayGroups] [nvarchar](255) NULL,
[Cancelled] [bit] NULL,
[CancellationCode] [nvarchar](255) NULL,
[Diverted] [bit] NULL,
[CRSElapsedTime] [int] NULL,
[ActualElapsedTime] [int] NULL,
[AirTime] [int] NULL,
[Flights] [int] NULL,
[Distance] [int] NULL,
[DistanceGroup] [nvarchar](255) NULL,
[CarrierDelay] [int] NULL,
[WeatherDelay] [int] NULL,
[NASDelay] [int] NULL,
[SecurityDelay] [int] NULL,
[LateAircraftDelay] [int] NULL,
[MonthsSince198710] [int] NULL,
[DaysSince19871001] [int] NULL,
[rowNum] [int] NULL
);
GO
CREATE CLUSTERED INDEX simple_index ON airlineWithPageComp (rowNum);
GO
ALTER TABLE airlineWithPageComp REBUILD PARTITION = ALL
WITH (DATA_COMPRESSION = PAGE);
GO
ALTER TABLE airlineWithPageComp
ADD Late BIT
ALTER TABLE airlineWithPageComp
ADD CRSDepHour int
GO
PRINT 'inserting data from airlineWithIndex into airlineWithPageComp'
GO
INSERT INTO airlineWithPageComp SELECT * FROM airlineWithIndex
GO
PRINT 'inserting data done'
GO
PRINT 'rebuilding index after insert operations'
GO
ALTER INDEX simple_index ON airlineWithPageComp
REBUILD
GO
PRINT 'rebuilding done'
GO

PerfTuning/createrdatatable.sql Normal file

@@ -0,0 +1,13 @@
-- This script creates rdata table needed for running perf tuning tests.
use PerfTuning;
go
IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = N'rdata')
BEGIN
PRINT 'dropping existing table rdata'
drop table [rdata];
END
go
PRINT 'creating table rdata'
CREATE TABLE [rdata] ([key] varchar(900) primary key not null, [value] varbinary(max))
go

PerfTuning/createrowcompressedtable.sql Normal file

@@ -0,0 +1,83 @@
-- This script creates airlineWithRowComp table needed for running perf tuning tests.
use PerfTuning;
go
IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = N'airlineWithRowComp')
BEGIN
PRINT 'dropping existing table airlineWithRowComp'
drop table [airlineWithRowComp];
END
GO
CREATE TABLE [dbo].[airlineWithRowComp](
[Year] [int] NULL,
[Month] [int] NULL,
[DayofMonth] [int] NULL,
[DayOfWeek] [bigint] NULL,
[FlightDate] [float] NULL,
[UniqueCarrier] [nvarchar](255) NULL,
[TailNum] [nvarchar](255) NULL,
[FlightNum] [nvarchar](255) NULL,
[OriginAirportID] [nvarchar](255) NULL,
[Origin] [nvarchar](255) NULL,
[OriginState] [nvarchar](255) NULL,
[DestAirportID] [nvarchar](255) NULL,
[Dest] [nvarchar](255) NULL,
[DestState] [nvarchar](255) NULL,
[CRSDepTime] [float] NULL,
[DepTime] [float] NULL,
[DepDelay] [int] NULL,
[DepDelayMinutes] [int] NULL,
[DepDel15] [bit] NULL,
[DepDelayGroups] [nvarchar](255) NULL,
[TaxiOut] [int] NULL,
[WheelsOff] [float] NULL,
[WheelsOn] [float] NULL,
[TaxiIn] [int] NULL,
[CRSArrTime] [float] NULL,
[ArrTime] [float] NULL,
[ArrDelay] [int] NULL,
[ArrDelayMinutes] [int] NULL,
[ArrDel15] [bit] NULL,
[ArrDelayGroups] [nvarchar](255) NULL,
[Cancelled] [bit] NULL,
[CancellationCode] [nvarchar](255) NULL,
[Diverted] [bit] NULL,
[CRSElapsedTime] [int] NULL,
[ActualElapsedTime] [int] NULL,
[AirTime] [int] NULL,
[Flights] [int] NULL,
[Distance] [int] NULL,
[DistanceGroup] [nvarchar](255) NULL,
[CarrierDelay] [int] NULL,
[WeatherDelay] [int] NULL,
[NASDelay] [int] NULL,
[SecurityDelay] [int] NULL,
[LateAircraftDelay] [int] NULL,
[MonthsSince198710] [int] NULL,
[DaysSince19871001] [int] NULL,
[rowNum] [int] NULL
);
GO
CREATE CLUSTERED INDEX simple_index ON airlineWithRowComp (rowNum);
GO
ALTER TABLE airlineWithRowComp REBUILD PARTITION = ALL
WITH (DATA_COMPRESSION = ROW);
GO
ALTER TABLE airlineWithRowComp
ADD Late BIT
ALTER TABLE airlineWithRowComp
ADD CRSDepHour int
GO
PRINT 'inserting data from airlineWithIndex into airlineWithRowComp'
GO
INSERT INTO airlineWithRowComp SELECT * FROM airlineWithIndex
GO
PRINT 'inserting data done'
GO
PRINT 'rebuilding index after insert operations'
GO
ALTER INDEX simple_index ON airlineWithRowComp
REBUILD
GO
PRINT 'rebuilding done'
GO

PerfTuning/createtables.R Normal file

@@ -0,0 +1,82 @@
####################################################################################################
# This script can be used to create airline and airlineWithIntCol tables needed to run perf tuning tests.
####################################################################################################
# Set Database information
options(sqlConnString = "Driver=SQL Server;Server=.;Database=PerfTuning;Trusted_Connection=TRUE;")
sqlConnString <- getOption("sqlConnString") # required option
# Directory where the scripts and data are. Update this before running.
options(dir = "e:/perftuning")
dir <- getOption("dir")
if (!file.exists(dir))
{
stop( "dir does not exist");
}
dataDir <- file.path(dir, "Data")
if (!file.exists(dataDir))
{
stop( "dataDir does not exist");
}
# Indicate what tables to create
airline <- "airline" # Regular
airlineWithIntCol <- "airlineWithIntCol" # DaysOfWeek as int
airlineWithIndex <- "airlineWithIndex" # clustered id index added
airlineWithPageComp <- "airlineWithPageComp" # Page compression enabled (see createpagecompressedtable.sql)
airlineWithRowComp <- "airlineWithRowComp" # Row compression enabled (see createrowcompressedtable.sql)
airlineColumnar <- "airlineColumnar" # columnar table
# This script can create some tables. See .sql files to create other tables.
tables <- c(airline, airlineWithIntCol)
# Drop Tables before creating them.
# cat ("ConnString", sqlConnString, "\n")
for(tbl in tables) {
if (rxSqlServerTableExists(tbl, connectionString = sqlConnString))
{
cat("dropping existing table ", tbl, "\n")
rxSqlServerDropTable(tbl, connectionString = sqlConnString)
}
}
# Data Files
airlineXdfFile <- file.path(dataDir, "airline10M.xdf" )
airlineCleanedCSVFile <- file.path(dataDir, "airline-cleaned-10M.csv" ) # used for bulk insert into columnar table using TSQL script.
# XDF Objects
airlineXdf <- RxXdfData(airlineXdfFile)
airlineCleanedCSV <- RxTextData(airlineCleanedCSVFile) # Not used in this script
# SQL Sources
rowsPerRead = 500000
airlineTable <- RxSqlServerData(table=airline, connectionString = sqlConnString, rowsPerRead = rowsPerRead, verbose =1)
airlineTableWithIntCol <- RxSqlServerData(table=airlineWithIntCol, connectionString = sqlConnString, rowsPerRead = rowsPerRead, verbose =1)
varsToKeep <- c("CRSDepTime", "CRSArrTime", "CRSElapsedTime", "ArrTime", "Month", "Year", "DayOfWeek", "DayofMonth", "Origin", "Dest", "FlightNum", "ArrDelay", "DepDelay", "DepTime")
# Create the tables
if (airline %in% tables)
{
cat("creating table airline\n")
rxDataStep(inData = airlineXdf,
outFile = airlineTable,
#varsToKeep = varsToKeep,
rowsPerRead = rowsPerRead,
overwrite=TRUE,
reportProgress=1)
}
if (airlineWithIntCol %in% tables) {
cat("creating table airlineWithIntCol\n")
rxDataStep(inData = airlineXdf,
outFile = airlineTableWithIntCol, overwrite=TRUE,
transforms = list( DayOfWeek = as.integer(DayOfWeek), rowNum = .rxStartRow : (.rxStartRow + .rxNumRows - 1) ),
#varsToKeep = varsToKeep,
rowsPerRead = rowsPerRead,
reportProgress=1)
}
# see sql scripts for other tables

PerfTuning/createtablewithindex.sql Normal file

@@ -0,0 +1,87 @@
-- This script creates airlineWithIndex table needed for running perf tuning tests.
use PerfTuning;
go
IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = N'airlineWithIndex')
BEGIN
PRINT 'dropping existing table airlineWithIndex'
drop table [airlineWithIndex];
END
GO
CREATE TABLE [dbo].airlineWithIndex(
[Year] [int] NULL,
[Month] [int] NULL,
[DayofMonth] [int] NULL,
[DayOfWeek] [bigint] NULL,
[FlightDate] [float] NULL,
[UniqueCarrier] [nvarchar](255) NULL,
[TailNum] [nvarchar](255) NULL,
[FlightNum] [nvarchar](255) NULL,
[OriginAirportID] [nvarchar](255) NULL,
[Origin] [nvarchar](255) NULL,
[OriginState] [nvarchar](255) NULL,
[DestAirportID] [nvarchar](255) NULL,
[Dest] [nvarchar](255) NULL,
[DestState] [nvarchar](255) NULL,
[CRSDepTime] [float] NULL,
[DepTime] [float] NULL,
[DepDelay] [int] NULL,
[DepDelayMinutes] [int] NULL,
[DepDel15] [bit] NULL,
[DepDelayGroups] [nvarchar](255) NULL,
[TaxiOut] [int] NULL,
[WheelsOff] [float] NULL,
[WheelsOn] [float] NULL,
[TaxiIn] [int] NULL,
[CRSArrTime] [float] NULL,
[ArrTime] [float] NULL,
[ArrDelay] [int] NULL,
[ArrDelayMinutes] [int] NULL,
[ArrDel15] [bit] NULL,
[ArrDelayGroups] [nvarchar](255) NULL,
[Cancelled] [bit] NULL,
[CancellationCode] [nvarchar](255) NULL,
[Diverted] [bit] NULL,
[CRSElapsedTime] [int] NULL,
[ActualElapsedTime] [int] NULL,
[AirTime] [int] NULL,
[Flights] [int] NULL,
[Distance] [int] NULL,
[DistanceGroup] [nvarchar](255) NULL,
[CarrierDelay] [int] NULL,
[WeatherDelay] [int] NULL,
[NASDelay] [int] NULL,
[SecurityDelay] [int] NULL,
[LateAircraftDelay] [int] NULL,
[MonthsSince198710] [int] NULL,
[DaysSince19871001] [int] NULL,
[rowNum] [int] NULL
);
GO
CREATE CLUSTERED INDEX simple_index ON airlineWithIndex (rowNum);
GO
PRINT 'inserting data from airlineWithIntCol into airlineWithIndex'
GO
INSERT INTO airlineWithIndex SELECT * FROM airlineWithIntCol
GO
PRINT 'inserting data done'
GO
ALTER TABLE airlineWithIndex
ADD Late BIT
ALTER TABLE airlineWithIndex
ADD CRSDepHour int
GO
PRINT 'computing Late and CRSDepHour column values.'
GO
update airlineWithIndex
set CRSDepHour=ROUND(CRSDepTime, 0, 1), Late=case when ArrDelay>15 then 1 else 0 end
GO
PRINT 'computing done'
GO
PRINT 'rebuilding index after insert operations'
GO
ALTER INDEX simple_index ON airlineWithIndex
REBUILD
GO
PRINT 'rebuilding done'
GO

PerfTuning/runtests.R Normal file

@@ -0,0 +1,716 @@
################################################################
## Title: Main test driver for performance tuning.
## Description:: Main R file to run tests to compare how various
## performance tuning tips can affect performance
## of running rx analytic functions in SQL compute
## context.
## Author: Microsoft
################################################################
library("RODBC") # Install this package if missing on the machine before running this script
source("serialization.R")
####################################################################################################
# Database information
####################################################################################################
options(sqlConnString = "Driver=SQL Server;Server=.; Database=PerfTuning;Trusted_Connection=TRUE;")
sqlConnString <- getOption("sqlConnString") # required option
####################################################################################################
# Directory where the scripts and data are. Update this before running test.
####################################################################################################
options(dir = "e:/perftuning")
dir <- getOption("dir")
if (!file.exists(dir))
{
stop( "dir does not exist");
}
dataDir <- file.path(dir, "Data")
if (!file.exists(dataDir))
{
stop( "dataDir does not exist");
}
####################################################################################################
# Helper function to return sql compute context.
####################################################################################################
getSqlComputeContext <- function(numTasks, sqlConnString) {
sqlShareDir <- paste("c:\\temp\\", Sys.getenv("USERNAME"), sep="")
dir.create(sqlShareDir, recursive = TRUE, showWarnings = FALSE)
sqlWait <- TRUE
sqlConsoleOutput <- TRUE
RxInSqlServer(connectionString = sqlConnString, shareDir = sqlShareDir,
wait = sqlWait, consoleOutput = sqlConsoleOutput, numTasks = numTasks)
}
####################################################################################################
# Helper function to process file generated by rx analytic functions when using reportProgress parameter.
####################################################################################################
processOutput = function(file.path) {
ff = file(file.path)
lines = readLines(con=ff)
close(ff)
perfStrs = c("Overall compute", "Total read", "Total transform")
res = NULL
# There could be multiple iterations - rxGlm for example. In that case, there are multiple
# lines of the same pattern.
for(ss in perfStrs) {
mm = grep(ss, lines)
if(NROW(mm) > 0) {
description = ""
total.time = 0
for(ii in mm) {
# Skip debugger lines that are not real output.
if (NROW(grep("Called from:", lines[ii])) > 0 ||
NROW(grep("debug at", lines[ii])) > 0) {
next
}
both.strs = strsplit(lines[ii], ": ")[[1]]
total.time = total.time + as.numeric(strsplit(both.strs[2], " ")[[1]][1])
description = both.strs[1]
}
res = rbind(res, data.frame(metric=description, time=total.time))
}
}
# Replace "Total read time for XX reads" with simply "Total read time"
aa = as.character(res[,1])
aa[grep("Total read", aa)] = "Total read time"
aa[grep("Overall compute", aa)] = "Overall compute time"
res[,1] = aa
return(res)
}
####################################################################################################
# Helper function to processing timings data.
####################################################################################################
processTimings = function(testResult) {
# drop the first run and average the results.
# Columns are: metric time1 time2 ...timeN. We are dropping time1 column.
origRes <- testResult$output
avgTime <- testResult$avgTime
res <- origRes[,c(1,3:NCOL(origRes))]
res <- cbind(res[,1,drop=F], rowMeans(res[,-1,drop=F]))
colnames(res) <- c("metric","time")
# add the total time row
res <- rbind(data.frame(metric="Total time",time=as.numeric(avgTime)), res)
# Add the transition time. Currently it's total time minus analytic time.
tt <- grep("Overall compute", res[,1])
if(NROW(tt) != 1) {
stop("Wrong number of matches [for Overall compute]!")
}
transition.time <- avgTime - res[tt[1],2]
res <- rbind(res, data.frame(metric="Transition time",time=transition.time))
# Add the total processing time. Currently: Analytic Time minus IO Time
yy <- grep("Total read", res[,1])
if(NROW(yy) != 1) {
stop("Wrong number of matches [for Total read]!")
}
nonio.time <- res[tt[1],2] - res[yy[1],2]
res <- rbind(res, data.frame(metric="Total non IO time",time=nonio.time))
# add pct column
pct <- round(res[,2]/res[1,2]*100,2)
res <- cbind(res, pct)
return(res)
}
####################################################################################################
# Simple helper to get current time in readable format
####################################################################################################
getNowTimeString <- function() {
format(Sys.time(), "%m/%d/%Y %I:%M:%S %p")
}
####################################################################################################
# Simple helper to get dataframe from RxSqlServerData for use with lm, glm functions
####################################################################################################
getDataFrameFromDataSource <- function(dataSource, tableName) {
data <- NULL
if (inherits(dataSource, "RxSqlServerData")) {
attr = attributes(dataSource)
if (!("sqlQuery" %in% names(attr))) {
return(NULL)
}
query = attr$sqlQuery
sqlConnString <- getOption("sqlConnString")
if (!is.null(attr$connectionString) && !is.na(attr$connectionString)) {
sqlConnString = attr$connectionString
}
ch <- odbcDriverConnect(sqlConnString)
on.exit(odbcClose(ch))
data <- sqlQuery(ch, query)
}
if (!is.null(data)) {
cols <- names(data)
if ("DayOfWeek" %in% cols) {
if (tableName == "airline") {
levels <- c("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")
}
else {
levels = as.character(1:7)
}
data$DayOfWeek <- factor(data$DayOfWeek, levels)
}
}
return(data)
}
cleanupModel = function(m) {
m$family$variance = c()
m$family$dev.resids = c()
m$family$aic = c()
m$family$validmu = c()
m$family$simulate = c()
m$y = c()
m$model = c()
m$residuals = c()
m$fitted.values = c()
m$effects = c()
m$qr$qr = c()
m$linear.predictors = c()
m$weights = c()
m$prior.weights = c()
m$data = c()
attr(m$terms,".Environment") = c()
attr(m$formula,".Environment") = c()
m
}
####################################################################################################
# Driver function to run one single test with given parameters.
# Returns output timings, average time, and the analytic function output (such as model generated).
####################################################################################################
runTest = function(resultDir, analytic="rxLinMod", formula, dataSource, transformFunc, transformVars, runs=3, cube=F, maxDepth=1, ...)
{
# The analytic has to be passed as a character string
if(!is.character(analytic)) {
stop("Analytic must be a character string ['rxLinMod' for rxLinMod]!")
}
suppressWarnings(dir.create(resultDir)) # Make Sure timestamp Folder exists
N = runs
set.seed(1)
runId = 1
total.times = rep(0, N+1)
max.time = -1
combined.output <- NULL
analyticsResult <- "undefined"
for (i in 0:N) {
invisible(gc())
# The output file
baseName <- paste(".", analytic, ".", format(Sys.time(), "%Y%m%d-%H%M%OS6"), sep = "")
output.file = file.path(resultDir, paste("output", baseName, ".txt", sep=""))
analyticsResult <- "undefined"
# Redirect the output
sink(output.file)
# Run the test
tryCatch(
{
if (analytic == "rxLinMod") {
time = system.time(
analyticsResult <- rxLinMod(
formula = formula,
transformFunc = transformFunc,
transformVars = transformVars,
data = dataSource,
cube = cube,
reportProgress = 3
)
)[3]
} else if (analytic == "rxGlm") {
time = system.time(
analyticsResult <- rxGlm(
formula = formula,
transformFunc = transformFunc,
transformVars = transformVars,
data = dataSource,
reportProgress = 3
)
)[3]
} else if (analytic == "rxLogit") {
time = system.time(
analyticsResult <- rxLogit(
formula = formula,
transformFunc = transformFunc,
transformVars = transformVars,
data = dataSource,
reportProgress = 3
)
)[3]
} else if (analytic == "rxDTree") {
time = system.time(
analyticsResult <- rxDTree(
formula = formula,
transformFunc = transformFunc,
transformVars = transformVars,
data = dataSource,
maxDepth = maxDepth,
reportProgress = 3
)
)[3]
} else if (analytic == "lm" || analytic == "glm") {
# lm and glm tests use the airlineWithIndex table only.
ioTime = system.time(dataFrame <- getDataFrameFromDataSource(dataSource, "airlineWithIndex"))[3]
if (!is.null(dataFrame)) {
if (analytic == "glm") {
computeTime = system.time(
output <- glm(formula = formula, data=dataFrame))[3]
} else {
computeTime = system.time(
output <- lm(formula = formula, data=dataFrame))[3]
}
} else {
warning(paste("dataFrame produced null for ", analytic, sep=""))
computeTime <- 0
}
ioTime <- as.numeric(ioTime)
computeTime <- as.numeric(computeTime)
time <- ioTime + computeTime
# processOutput call below is strict. We need <desc>: <time> with one space after :
cat("Overall compute time:", time, "\n")
cat("Total read:", ioTime, "\n")
cat("Total loop time:", computeTime, "\n")
cat("Total transform: 0\n")
cat("Total process data time:", time, "\n")
analyticsResult <- output
} else {
cat("Analytic function supplied is not implemented.\n")
return(NULL)
}
},
finally = sink()
);
# Process the output
oo = processOutput(output.file)
combined.output = if (is.null(combined.output))
oo
else
cbind(combined.output, oo[,2])
total.times[i + 1] = time
if (i > 0 && max.time < time[[1]]) {
max.time <- time[[1]] # keep track of max of all times except the very first one.
}
if (printEachTestRuntime) {
cat(" run ", runId, " took ", time, " seconds", "\n")
}
runId = runId + 1
}
# Average the timings, dropping the first (warm-up) run and the slowest of the remaining runs.
averageTime = (sum(tail(total.times,-1)) - max.time)/(N-1)
n.col = NCOL(combined.output)
colnames(combined.output)[2:n.col] = paste("time",1:(n.col-1),sep="")
return(list(output=combined.output,avgTime=averageTime,analyticsResult=analyticsResult))
}
####################################################################################################
# Simple helper to get compute column info from xdf file to deal with factors for sql data source.
####################################################################################################
getColInfoForSqlDataSource <- function(rxXdfData, columns, treatFactorColAsInt = FALSE)
{
oldComputeContext <- rxGetComputeContext()
on.exit(rxSetComputeContext(oldComputeContext))
rxSetComputeContext ("local") # Work around for rxGetVarInfo not returning factor level when cc is not local.
rxXdfDataInfo <- rxGetVarInfo(rxXdfData)
result <- list()
for(col in columns)
{
factorLevels <- rxXdfDataInfo[[col]]$levels
if (treatFactorColAsInt)
{
result[[col]] = list(type="factor", levels = as.character(c(1:length(factorLevels))), newLevels = factorLevels)
}
else
{
result[[col]] = list(type="factor", levels = factorLevels)
}
}
result
}
####################################################################################################
# Main driver function to run a series of tests and produce results in the given output folder.
# numTasks controls the number of processes requested for the SQL compute context.
# rowsPerRead controls the number of rows to read per batch processing by the analytic function.
####################################################################################################
runTests = function(testsToRun, outputFolder, numTasks, rowsPerRead=500000L)
{
oldComputeContext <- rxGetComputeContext()
on.exit(rxSetComputeContext(oldComputeContext))
resultDir <- file.path(resultsFolderDir, outputFolder)
sqlcc <- getSqlComputeContext(numTasks, sqlConnString)
rxSetComputeContext(sqlcc)
cat ("Running tests with SQL compute context. numTasks = ", numTasks, " rowsPerRead = ", rowsPerRead, "\n")
if ("FactorCol" %in% testsToRun) {
colInfo <- getColInfoForSqlDataSource(airlineXdf, c("DayOfWeek"), FALSE)
formula <- ArrDelay ~ CRSDepTime + DayOfWeek
airlineTable <- RxSqlServerData(table="airline", connectionString = sqlConnString, colInfo = colInfo)
cat("\nRunning FactorCol Test. Using airline table.", "\n")
testResult <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airlineTable, transformFunc = NULL, transformVars = NULL, runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
}
if ("IntCol" %in% testsToRun) {
formula <- ArrDelay ~ CRSDepTime + DayOfWeek
airlineWithIntCol <- RxSqlServerData(table="airlineWithIntCol", connectionString = sqlConnString)
cat("\nRunning IntCol Test. Using airlineWithIntCol table.", "\n")
testResult <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airlineWithIntCol, transformFunc = NULL, transformVars = NULL, runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
}
if ("NoCompression" %in% testsToRun) {
formula = Late ~ CRSDepTime + DayOfWeek + CRSDepHour:DayOfWeek
airlineWithIndex <- RxSqlServerData(sqlQuery="select [ArrDelay],[CRSDepTime],[DayOfWeek] FROM airlineWithIndex", rowsPerRead = rowsPerRead,
connectionString = sqlConnString)
cat("\nRunning NoCompression Test. Using airlineWithIndex table.", "\n")
testResult <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airlineWithIndex, transformFunc = function(data) {
data$Late = data$ArrDelay > 15
data$CRSDepHour = as.integer(trunc(data$CRSDepTime))
return(data)
}, transformVars = c("ArrDelay","CRSDepTime"), runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
}
if ("PageCompression" %in% testsToRun) {
formula = Late ~ CRSDepTime + DayOfWeek + CRSDepHour:DayOfWeek
airlineWithPageComp <- RxSqlServerData(sqlQuery="select [ArrDelay],[CRSDepTime],[DayOfWeek] FROM airlineWithPageComp", rowsPerRead = rowsPerRead, connectionString = sqlConnString)
cat("\nRunning PageCompression Test. Using airlineWithPageComp table.", "\n")
testResult <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airlineWithPageComp, transformFunc = function(data) {
data$Late = data$ArrDelay > 15
data$CRSDepHour = as.integer(trunc(data$CRSDepTime))
return(data)
}, transformVars = c("ArrDelay","CRSDepTime"), runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
}
if ("RowCompression" %in% testsToRun) {
formula = Late ~ CRSDepTime + DayOfWeek + CRSDepHour:DayOfWeek
airlineWithRowComp <- RxSqlServerData(sqlQuery="select [ArrDelay],[CRSDepTime],[DayOfWeek] FROM airlineWithRowComp", rowsPerRead = rowsPerRead, connectionString = sqlConnString)
cat("\nRunning RowCompression Test. Using airlineWithRowComp table.", "\n")
testResult <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airlineWithRowComp, transformFunc = function(data) {
data$Late = data$ArrDelay > 15
data$CRSDepHour = as.integer(trunc(data$CRSDepTime))
return(data)
}, transformVars = c("ArrDelay","CRSDepTime"), runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
}
if ("WithTransformation" %in% testsToRun) {
formula = Late ~ CRSDepTime + DayOfWeek + CRSDepHour:DayOfWeek
airlineWithIndex <- RxSqlServerData(sqlQuery="select [ArrDelay],[CRSDepTime],[DayOfWeek] FROM airlineWithIndex", connectionString = sqlConnString)
cat("\nRunning WithTransformation Test. Using airlineWithIndex table.", "\n")
testResult <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airlineWithIndex, transformFunc = function(data) {
data$Late = data$ArrDelay > 15
data$CRSDepHour = as.integer(trunc(data$CRSDepTime))
return(data)
}, transformVars = c("ArrDelay","CRSDepTime"), runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
}
if ("WithoutTransformation" %in% testsToRun) {
# The query used in this test depends on the columns Late and CRSDepHour to exist in the table.
formula = Late ~ CRSDepTime + DayOfWeek + CRSDepHour:DayOfWeek
airlineWithIndex <- RxSqlServerData(sqlQuery="select [ArrDelay],[CRSDepTime],[DayOfWeek], [Late], [CRSDepHour] FROM airlineWithIndex", connectionString = sqlConnString)
cat("\nRunning WithoutTransformation Test. Using airlineWithIndex table.", "\n")
testResult <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airlineWithIndex, transformFunc = NULL, transformVars = NULL, runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
}
if ("RowStore" %in% testsToRun) {
formula = Late ~ CRSDepTime + DayOfWeek + CRSDepHour:DayOfWeek
airlineWithIndex <- RxSqlServerData(sqlQuery="select [ArrDelay],[CRSDepTime],[DayOfWeek], [Late], [CRSDepHour] FROM airlineWithIndex", connectionString = sqlConnString)
cat("\nRunning RowStore Test. No transformation. Using airlineWithIndex table.", "\n")
testResult <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airlineWithIndex, transformFunc = NULL, transformVars = NULL, runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
}
if ("ColStore" %in% testsToRun) {
formula = Late ~ CRSDepTime + DayOfWeek + CRSDepHour:DayOfWeek
airlineColumnar <- RxSqlServerData(sqlQuery="select [ArrDelay],[CRSDepTime],[DayOfWeek], [Late], [CRSDepHour] FROM airlineColumnar", connectionString = sqlConnString)
cat("\nRunning ColStore Test. No transformation. Using airlineColumnar table.", "\n")
testResult <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airlineColumnar, transformFunc = NULL, transformVars = NULL, runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
}
if ("CubeArgEffect" %in% testsToRun) {
formula <- ArrDelay ~ Origin:DayOfWeek + Month + DayofMonth + CRSDepTime
colInfo <- getColInfoForSqlDataSource(airlineXdf, c("Origin", "DayOfWeek"))
airline <- RxSqlServerData(sqlQuery="select [ArrDelay],[Origin],[DayOfWeek], [Month],[DayofMonth],[CRSDepTime] FROM airline",
colInfo = colInfo,
connectionString = sqlConnString)
cat("\nRunning CubeArgEffect Test with Cube = F. Using airline table.", "\n")
testResult1 <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airline, transformFunc = NULL, transformVars = NULL, runs=5)
cat("Average Time: ", testResult1$avgTime, "\n")
times <- processTimings(testResult1)
if (printTimeSplits) {
print(times)
}
cat("\nRunning CubeArgEffect Test with Cube = T. Using airline table.", "\n")
testResult2 <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airline, transformFunc = NULL, transformVars = NULL, runs=5, cube=T)
cat("Average Time: ", testResult2$avgTime, "\n")
times <- processTimings(testResult2)
if (printTimeSplits) {
print(times)
}
# Predict with both models.
savedComputeContext <- rxGetComputeContext()
rxSetComputeContext ("local")
df <- data.frame(DayOfWeek=factor("Thur",colInfo$DayOfWeek$levels),
CRSDepTime=9,
Origin=factor("JFK", colInfo$Origin$levels),
Month=10,
DayofMonth=1)
pred1 <- rxPredict(testResult1$analyticsResult, df)
pred2 <- rxPredict(testResult2$analyticsResult, df)
cat("Predict using model from Cube=F\n")
print(pred1)
cat("Predict using model from Cube=T\n")
print(pred2)
rxSetComputeContext(savedComputeContext)
}
if ("SaveModel" %in% testsToRun) {
formula = ArrDelay ~ Origin:DayOfWeek + Month + DayofMonth + CRSDepTime
airline <- RxSqlServerData(sqlQuery="select [ArrDelay],[Origin],[DayOfWeek], [Month],[DayofMonth],[CRSDepTime] FROM airline",
colInfo = getColInfoForSqlDataSource(airlineXdf, c("Origin", "DayOfWeek")),
connectionString = sqlConnString)
cat("\nRunning SaveModel Test. Using airline table.", "\n")
testResult <- runTest(resultDir, "rxLinMod", formula = formula, dataSource = airline, transformFunc = NULL, transformVars = NULL, runs=5, cube=T)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
odbcDS <- RxOdbcData(table = "rdata", connectionString = sqlConnString, useFastRead=TRUE)
rxOpen(odbcDS, "w")
rxExecuteSQLDDL(odbcDS, sSQLString = paste("delete from [rdata] where [key] = 'linmod.model.1'", sep=""))
rxClose(odbcDS)
time = system.time({
dbSaveRDS(sqlConnString, "linmod.model.1", testResult$analyticsResult)
})[3]
cat ("Time to save model = ", time, "\n")
}
if ("LoadModelAndPredict" %in% testsToRun) {
savedComputeContext <- rxGetComputeContext()
rxSetComputeContext ("local")
cat("\nRunning LoadModelAndPredict Test. One row prediction. Using airline table and local compute context.", "\n")
time = system.time({
mm <- dbReadRDS(sqlConnString, "linmod.model.1")
# Prepare the data for single row scoring
colInfo <- getColInfoForSqlDataSource(airlineXdf, c("Origin", "DayOfWeek"))
df <- data.frame(DayOfWeek=factor("Thur",colInfo$DayOfWeek$levels),
CRSDepTime=9,
Origin=factor("JFK", colInfo$Origin$levels),
Month=10,
DayofMonth=1)
pred <- rxPredict(mm, df)
cat ("prediction: ArrayDelay = ", pred[1,], "\n")
rxSetComputeContext(savedComputeContext)
})[3]
cat("Time to load and predict: ", time, "\n")
}
if ("TreeDepthEffect" %in% testsToRun) {
formula = ArrDelay ~ CRSDepTime + DayOfWeek
airlineColumnar <- RxSqlServerData(sqlQuery="select [ArrDelay],[CRSDepTime],[DayOfWeek] FROM airlineColumnar", connectionString = sqlConnString)
cat("\nRunning TreeDepthEffect Test. No transformations. Using airlineColumnar table.", "\n")
for(depth in c(1,2,4,8,16)) {
cat("Tree Depth: ", depth, "\n")
testResult <- runTest(resultDir, "rxDTree", formula = formula, dataSource = airlineColumnar, transformFunc = NULL, transformVars = NULL, runs=5, maxDepth=depth)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
}
}
if ("SaveLMModel" %in% testsToRun) {
formula = ArrDelay ~ CRSDepTime + DayOfWeek
# Train using first 9M rows. Predict using the last 1M rows.
airline <- RxSqlServerData(sqlQuery="select [ArrDelay],[DayOfWeek], [CRSDepTime] FROM airlineWithIndex WHERE rowNum <= 9000000", connectionString = sqlConnString)
cat("\nRunning SaveLMModel Test. Using airlineWithIndex table.", "\n")
testResult <- runTest(resultDir, "lm", formula = formula, dataSource = airline, transformFunc = NULL, transformVars = NULL, runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
odbcDS <- RxOdbcData(table = "rdata", connectionString = sqlConnString, useFastRead=TRUE)
rxOpen(odbcDS, "w")
rxExecuteSQLDDL(odbcDS, sSQLString = paste("delete from [rdata] where [key] = 'lm.model.1'", sep=""))
rxClose(odbcDS)
lmModel <- testResult$analyticsResult
# The lm model is quite big and contains the training data, so we cannot store it as-is in the DB. We need to clean up unneeded fields.
# It is beyond the scope of this script to fully optimize lm models; we remove the minimum needed to get predict to work.
m <- cleanupModel(lmModel)
lmModel$fitted.values <- NULL
lmModel$effects <- NULL
lmModel$model <- NULL
lmModel$residuals <- NULL
time = system.time({
dbSaveRDS(sqlConnString, "lm.model.1", m)
})[3]
cat ("Time to save model = ", time, "\n")
}
if ("LoadLMModelAndPredict" %in% testsToRun) {
# This test demonstrates how we can load the saved model and do prediction. Here, we pull the data from SQL and predict.
# See the script scoring.sql to see how TSQL can be used to predict using trivial parallelism.
savedComputeContext <- rxGetComputeContext()
rxSetComputeContext ("local")
cat("\nRunning LoadLMModelAndPredict Test. 1M row prediction. Using airlineWithIndex table and local compute context.", "\n")
time = system.time({
mm <- dbReadRDS(sqlConnString, "lm.model.1")
# Prepare the data for the last 1M rows. Note that the model was trained on the first 9M rows.
airline <- RxSqlServerData(sqlQuery="select [ArrDelay],[DayOfWeek], [CRSDepTime] FROM airlineWithIndex WHERE rowNum > 9000000", connectionString = sqlConnString)
df <- getDataFrameFromDataSource(airline, "airlineWithIndex") # converts sql data into a data frame using RODBC.
#df <- data.frame(DayOfWeek=factor(1, as.character(1:7)), CRSDepTime=9)
pred <- predict(mm, df)
rxSetComputeContext(savedComputeContext)
})[3]
cat("Time to load and predict: ", time, "\n")
}
if ("SaveGlmModel" %in% testsToRun) {
formula = ArrDelay ~ CRSDepTime + DayOfWeek
# Train using first 9M rows. Predict using the last 1M rows.
airline <- RxSqlServerData(sqlQuery="select [ArrDelay],[DayOfWeek], [CRSDepTime] FROM airlineWithIndex WHERE rowNum <= 9000000", connectionString = sqlConnString)
cat("\nRunning SaveGlmModel Test. Using airlineWithIndex table.", "\n")
testResult <- runTest(resultDir, "glm", formula = formula, dataSource = airline, transformFunc = NULL, transformVars = NULL, runs=5)
cat("Average Time: ", testResult$avgTime, "\n")
times <- processTimings(testResult)
if (printTimeSplits) {
print(times)
}
odbcDS <- RxOdbcData(table = "rdata", connectionString = sqlConnString, useFastRead=TRUE)
rxOpen(odbcDS, "w")
rxExecuteSQLDDL(odbcDS, sSQLString = paste("delete from [rdata] where [key] = 'glm.model.1'", sep=""))
rxClose(odbcDS)
lmModel <- testResult$analyticsResult
# The glm model is quite big and contains the training data, so we cannot store it as-is in the DB. We need to clean up unneeded fields.
# It is beyond the scope of this script to fully optimize glm models; we remove the minimum needed to get predict to work.
m <- cleanupModel(lmModel)
lmModel$fitted.values <- NULL
lmModel$effects <- NULL
lmModel$model <- NULL
lmModel$residuals <- NULL
time = system.time({
dbSaveRDS(sqlConnString, "glm.model.1", m)
})[3]
cat ("Time to save model = ", time, "\n")
}
if ("LoadGlmModelAndPredict" %in% testsToRun) {
savedComputeContext <- rxGetComputeContext()
rxSetComputeContext ("local")
cat("\nRunning LoadGlmModelAndPredict Test. 1M row prediction. Using airlineWithIndex table and local compute context.", "\n")
time = system.time({
mm <- dbReadRDS(sqlConnString, "glm.model.1")
# Prepare the data for last 1M rows
airline <- RxSqlServerData(sqlQuery="select [ArrDelay],[DayOfWeek], [CRSDepTime] FROM airlineWithIndex WHERE rowNum > 9000000", connectionString = sqlConnString)
df <- getDataFrameFromDataSource(airline, "airlineWithIndex") # converts sql data into a data frame using RODBC.
#df <- data.frame(DayOfWeek=factor(1, as.character(1:7)), CRSDepTime=9)
pred <- predict(mm, df)
rxSetComputeContext(savedComputeContext)
})[3]
cat("Time to load and predict: ", time, "\n")
}
}
####################################################################################################
# Set some global variables to simplify running the tests.
####################################################################################################
resultsFolderDir <- file.path(dir, "Results")
suppressWarnings(dir.create(resultsFolderDir))
airlineXdfFile <- file.path(dataDir, "airline10M.xdf" )
airlineXdf <- RxXdfData(airlineXdfFile)
####################################################################################################
# Set some global flags to control what gets printed when running the tests.
####################################################################################################
printEachTestRuntime <- T # Prints run time for each test
printTimeSplits <- F # Print the time for IO/Transition/Compute.
####################################################################################################
# Set the global to contain all the tests to run.
####################################################################################################
testsToRun <- c("FactorCol", "IntCol", "NoCompression", "PageCompression", "RowCompression", "WithTransformation",
"WithoutTransformation", "RowStore", "ColStore", "CubeArgEffect", "SaveModel", "LoadModelAndPredict",
"TreeDepthEffect", "SaveLMModel", "LoadLMModelAndPredict", "SaveGlmModel", "LoadGlmModelAndPredict")
cat("call runTests(testsToRun, 'Run1', 1, 500000L) to run tests.\n")
#runTests(testsToRun, "Run1", 1, 500000L)
# Start with local compute context. The test driver should restore to this context when it returns.
rxSetComputeContext ("local")

PerfTuning/scoring.sql Normal file

@@ -0,0 +1,23 @@
-- Before running this script, the model needs to be stored in the database. Run the SaveLMModel test using runtests.R.
-- Also, the table airlineWithIndex should exist with 10M rows of data.
-- This code uses trivial parallelism to speed up prediction of 1M rows.
use PerfTuning;
declare @just_model varbinary(max);
select @just_model = [value] from [rdata] where [key] = 'lm.model.1';
declare @pred float;
exec sp_execute_external_script
@language = N'R',
@script = N'
# Prepare the data for batch scoring
InputDataSet[,"DayOfWeek"] <- factor(InputDataSet[,"DayOfWeek"], levels=as.character(1:7))
mm <- unserialize(as.raw(model_param))
# Predict
OutputDataSet <- data.frame(pred=predict(mm, InputDataSet))
',
@input_data_1 = N'SELECT [ArrDelay],[DayOfWeek], [CRSDepTime] FROM airlineWithIndex WHERE rowNum > 9000000',
@parallel = 1,
@params = N'@model_param varbinary(max)',
@model_param = @just_model
with result sets ((pred float));
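-- Usage (per the README): with sqlcmd.exe in your path, run "sqlcmd -i scoring.sql".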

PerfTuning/serialization.R Normal file

@@ -0,0 +1,200 @@
# A collection of functions useful to serialize/unserialize R objects
# into/from a database. These functions are modeled after well known
# R functions:
#
# - dbReadRDS/dbSaveRDS: modeled after readRDS/saveRDS
# - dbLoad/dbSave: modeled after load/save
#
# For these functions to work, PowerShell needs to be installed. It is
# installed by default on all Windows clients but might be missing on
# Windows servers. Also, scripts must be allowed to execute:
#
# Set-ExecutionPolicy Unrestricted
#
# The table, named "rdata" by default, used to store the R objects must
# have at least two columns:
# - one character column, named "key" by default
# - one varbinary column, named "value" by default
#
# The table name, as well as the names of the two columns can be changed
# using the additional parameters.
#
# To create the table one can use the following query:
# create table [rdata] ([key] varchar(900) primary key not null, [value] varbinary(max))
#
# To remove a row from the table:
# delete from [rdata] where [key] = 'mykey'
#
# The following functions are useful to run SQL from R:
# - rxExecuteSQLDDL - to create a table for example, or to delete some rows
# - rxSqlServerTableExists - to check whether a table exists
# - rxSqlServerDropTable - to drop a table
# - rxOpen/rxClose - these are needed when working with the above
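#
# Example usage (a minimal sketch; it assumes the rdata table exists and that
# sqlConnString holds a valid connection string):
#
#   model <- lm(dist ~ speed, data = cars)         # fit a small model on a built-in dataset
#   dbSaveRDS(sqlConnString, "cars.model", model)  # serialize and store under the key 'cars.model'
#   m2 <- dbReadRDS(sqlConnString, "cars.model")   # read back and deserialize
#   predict(m2, data.frame(speed = 10))            # use the restored model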
# Removes "Driver" from the rx connection string - powershell frawns on it.
# Author; Microsoft
convertConnectionString <- function(conStr) {
parts <- strsplit(strsplit(conStr, split=";")[[1]], split="=")
keys <- tolower(sapply(parts, "[", 1))
vals <- sapply(parts, "[", 2)
ff <- grepl("driver", keys)
res <- paste(paste(keys[!ff], vals[!ff], sep="="), collapse=";")
res
}
# A lower level API shared by dbSaveRDS and dbSave.
#
# Builds and executes a powershell script to save a binary
# (given as a string file path), into the database.
dbSaveRaw <- function(connectionString, key, filePath, table="rdata", keyColumn="key", objectColumn="value") {
queryLine <- paste("$query = 'insert into [", table, "]([", keyColumn, "],[", objectColumn, "]) values (@key,@value)'", sep="")
psScript <- paste(
"[CmdletBinding()]",
"Param(",
"[Parameter()]",
"[string]$ConnectionString,",
"[string]$Key,",
"[string]$File)",
"[byte[]]$dd = [System.IO.File]::ReadAllBytes($File)",
"$con = new-object System.Data.SqlClient.SqlConnection($ConnectionString)",
"$res = $con.Open()",
queryLine,
"$cmd = new-object System.Data.SqlClient.SqlCommand($query, $con)",
"$res = $cmd.Parameters.Add('@key', [System.Data.SqlDbType]'VarChar')",
"$res = $cmd.Parameters.Add('@value', [System.Data.SqlDbType]'VarBinary')",
"$res = $cmd.Parameters['@key'].Value = $Key",
"$res = $cmd.Parameters['@value'].Value = $dd",
"$res = $cmd.ExecuteNonQuery()",
"$res = $con.Close()",
sep="\n")
scriptFile <- tempfile(fileext=".ps1")
on.exit(unlink(scriptFile))
cat(psScript, file=scriptFile)
psCmd <- paste(
scriptFile,
" -Key ",
key,
" -File ",
filePath,
" -ConnectionString ",
"'", convertConnectionString(connectionString), "'",
sep="")
shell(psCmd, shell="powershell.exe")
}
# A lower level API shared by dbReadRDS and dbLoad.
#
# Builds and executes a powershell script to read a binary
# from the database and store it into a local file.
dbReadRaw <- function(connectionString, key, filePath, table="rdata", keyColumn="key", objectColumn="value") {
queryLine <- paste("$query = 'select [", objectColumn, "] from [", table, "] where [", keyColumn, "] = @key'", sep="")
psScript <- paste(
"[CmdletBinding()]",
"Param(",
"[Parameter()]",
"[string]$ConnectionString,",
"[string]$Key,",
"[string]$File)",
"$con = new-object System.Data.SqlClient.SqlConnection($ConnectionString)",
"$res = $con.Open()",
queryLine,
"$cmd = new-object System.Data.SqlClient.SqlCommand($query, $con)",
"$res = $cmd.Parameters.Add('@key', [System.Data.SqlDbType]'VarChar')",
"$res = $cmd.Parameters['@key'].Value = $Key",
"[byte[]]$dd = $cmd.ExecuteScalar()",
"$res = $con.Close()",
"$res = [IO.File]::WriteAllBytes($File, $dd)",
sep="\n")
scriptFile <- tempfile(fileext=".ps1")
on.exit(unlink(scriptFile))
cat(psScript, file=scriptFile)
psCmd <- paste(
scriptFile,
" -Key ",
key,
" -File ",
filePath,
" -ConnectionString ",
"'", convertConnectionString(connectionString), "'",
sep="")
tt <- try(shell(psCmd, shell="powershell.exe"), silent=T)
}
# The dbSaveRDS and dbReadRDS functions are the database equivalents of R's
# readRDS/saveRDS APIs.
# The database equivalent of saveRDS. Serializes an R object and saves it into
# a varbinary column in the database. The row is identified by a character
# string (key).
# The table name, the name of the key column and the name of the object column can
# be controlled by the table, keyColumn and objectColumn parameters.
dbSaveRDS <- function(connectionString, key, object, table="rdata", keyColumn="key", objectColumn="value", compress=F) {
rawCon <- tempfile()
on.exit(unlink(rawCon))
saveRDS(object, rawCon, compress=compress)
dbSaveRaw(connectionString, key, filePath=rawCon, table=table, keyColumn=keyColumn, objectColumn=objectColumn)
}
dbReadRDS <- function(connectionString, key, table="rdata", keyColumn="key", objectColumn="value") {
rawCon <- tempfile()
on.exit(unlink(rawCon))
tt <- try(dbReadRaw(connectionString, key, rawCon, table=table, keyColumn=keyColumn, objectColumn=objectColumn), silent=T)
res <- if(inherits(tt, "try-error")) NULL else readRDS(rawCon)
return(res)
}
# The dbSave and dbLoad functions are the database equivalents of R's
# save/load APIs.
# The database equivalent of "save". Serializes a group of R objects and saves them
# into a varbinary column in the database. The row is identified by a character
# string (key).
#
# The table name, the name of the key column and the name of the object column can
# be controlled by the table, keyColumn and objectColumn parameters.
dbSave <- function(connectionString, key, ..., table="rdata", keyColumn="key", objectColumn="value", compress=F) {
objectNames <- as.character(substitute(list(...)))[-1]
rawCon <- tempfile()
on.exit(unlink(rawCon))
save(list=objectNames, file=rawCon, compress=compress)
dbSaveRaw(connectionString, key, filePath=rawCon, table=table, keyColumn=keyColumn, objectColumn=objectColumn)
}
dbLoad <- function(connectionString, key, table="rdata", envir=parent.frame(), keyColumn="key", objectColumn="value") {
rawCon <- tempfile()
on.exit(unlink(rawCon))
tt <- try(dbReadRaw(connectionString, key, rawCon, table=table, keyColumn=keyColumn, objectColumn=objectColumn), silent=T)
if(!inherits(tt, "try-error")) {
load(file=rawCon, envir=envir)
}
}
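# Example usage of dbSave/dbLoad (a sketch; assumes a valid sqlConnString and an
# existing rdata table):
#
#   a <- 1:10; b <- letters
#   dbSave(sqlConnString, "misc.objects", a, b)    # serialize a and b together under one key
#   dbLoad(sqlConnString, "misc.objects")          # restores a and b into the caller's environment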