Rewrote SparkR data manipulation

This commit is contained in:
yueguoguo 2017-05-19 22:22:19 +09:00
Родитель fb68490522
Коммит 79bc4a23c3
1 изменённых файлов: 40 добавлений и 23 удалений

Просмотреть файл

@ -6,12 +6,15 @@
# libraries to use.
if(!require("devtools")) install.packages("devtools")
devtools::install_github("Microsoft/AzureSMR")
library(AzureSMR)
library(SparkR)
library(RevoScaleR)
library(dplyr)
library(magrittr)
library(readr)
library(AzureSMR)
library(jsonlite)
# data sources.
@ -37,6 +40,7 @@ RevoScaleR::rxDataStep(inData=data_xdf_path,
# initialize spark session.
sc <- SparkR::sparkR.session(
master="local[*]",
sparkPackages="com.databricks:spark-csv_2.10:1.3.0"
)
@ -45,47 +49,60 @@ SparkR::setLogLevel("OFF")
# data preparation - convert csv to parquet.
sdf_air <- SparkR::read.df(data_csv_path,
source="com.databricks.spark.csv",
header="true",
inferSchema="true")
source="com.databricks.spark.csv",
header="true",
inferSchema="true")
SparkR::cache(sdf_air) # cache into memory.
SparkR::count(sdf_air) # count number of rows.
SparkR::write.df(df=sdf_air,
path=data_parquet_path,
"parquet",
"overwrite")
path=data_parquet_path,
"parquet",
"overwrite")
sdf_air <- SparkR::read.df(data_parquet_path, source="parquet") # read the data back from parquet format.
SparkR::printSchema(sdf_air)
# register temporary views so the Spark data frames can be queried with SQL.
# filter flights that are not cancelled.
SparkR::createOrReplaceTempView(sdf_air, "air")
stable_air <- SparkR::sql("SELECT a.DayofMOnth, a.DayOfWeek, a.Origin, a.Dest, a.DepTime, a.ArrDel15 FROM air a")
SparkR::createOrReplaceTempView(stable_air, "stable_air")
sdf_air <- SparkR::filter(sdf_air, sdf_air$Cancelled == FALSE)
head(SparkR::sql("show tables"))
# for demonstration purposes, only a few indicators are selected.
# sample 30% of all the data.
sdf_air <- SparkR::select(sdf_air,
"DayofMonth",
"DayOfWeek",
"Dest",
"Origin",
"DepTime",
"ArrDel15")
sdf_air_sampled <- SparkR::sample(stable_air,
withReplacement=FALSE,
fraction=0.3,
seed=123)
# sample 30% from the data set.
# convert the data into a local R data frame for visualization.
sdf_air <- SparkR::sample(sdf_air,
FALSE,
0.3)
df_air_sampled <- SparkR::as.data.frame(sdf_air_sampled)
# summary of the finalized data.
SparkR::collect(describe(sdf_air,
"DayofMonth",
"DayOfWeek",
"Dest",
"Origin",
"DepTime",
"ArrDel15"))
# convert the data into local R data frame.
df_air_sampled <- SparkR::as.data.frame(sdf_air)
# take a glimpse of the processed data.
dplyr::glimpse(df_air_sampled)
df_air_sampled %<>%
dplyr::filter(as.character(ArrDel15) != "NA") %>%
dplyr::mutate(DepTime=as.numeric(DepTime), ArrDel15=as.factor(ArrDel15))
# close Spark session.
SparkR::sparkR.session.stop()