Committed the SparkR source files

Rajdeep Biswas 2020-06-10 14:16:30 -05:00 committed by GitHub
Parent 3a834dcb95
Commit 6af6dbf50e
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
5 changed files: 1,493 additions and 0 deletions

74
code/Step01a_Setup.r Normal file
View file

@@ -0,0 +1,74 @@
# Databricks notebook source
# MAGIC %md
# MAGIC ##### Copyright (c) Microsoft Corporation.
# MAGIC ##### Licensed under the MIT license.
# MAGIC
# MAGIC ###### File: Step01a_Setup
# MAGIC ###### Date: 06/09/2020
# COMMAND ----------
# MAGIC %md
# MAGIC ####Step01a_Setup R notebook
# MAGIC
# MAGIC This notebook has the following process flow:
# MAGIC 1. Source and sink configurations. We are using Databricks dbutils widgets here.
# MAGIC 2. For the sink account name and SAS token we are using Azure Key Vault to secure the credentials.
# MAGIC
# MAGIC Ref: https://docs.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes#--create-an-azure-key-vault-backed-secret-scope
# MAGIC and https://docs.microsoft.com/en-us/azure/databricks/security/secrets/example-secret-workflow
# MAGIC 3. Load the SparkR library
# MAGIC 4. Initialize the Spark session
# COMMAND ----------
#Source configuration
#dbutils.widgets.removeAll()
dbutils.widgets.text("source_blob_account_name","azureopendatastorage","Source Blob Account Name")
dbutils.widgets.text("source_blob_container_name","citydatacontainer","Source Blob Container Name")
dbutils.widgets.text("source_blob_sas_token","source_blob_sas_token_value","Source Blob SAS Token")
#source_blob_sas_token value : ?st=2019-02-26T02%3A34%3A32Z&se=2119-02-27T02%3A34%3A00Z&sp=rl&sv=2018-03-28&sr=c&sig=XlJVWA7fMXCSxCKqJm8psMOh0W4h7cSYO28coRqF2fs%3D
#Sink configuration
dbutils.widgets.text("sink_blob_account_name","analyticsdatalakeraj","Sink Blob Account Name")
dbutils.widgets.text("sink_blob_container_name","safetydata","Sink Blob Container Name")
#dbutils.widgets.text("sink_blob_sas_token","sink_blob_sas_token_value","Sink Blob SAS Token") #Use the secure credetial here rather than widget or clear text values
sink_blob_sas_token =dbutils.secrets.get(scope="tale_of_3_cities_secrets", key="sink-blob-sas-token")
# COMMAND ----------
#Libraries
library(SparkR)
#library(tidyverse)
#library(anomalize)
#library(ggplot2)
# COMMAND ----------
#Get the value for source_storage_conf
(source_storage_conf=paste('fs.azure.sas.',dbutils.widgets.get("source_blob_container_name"),'.',dbutils.widgets.get("source_blob_account_name"),'.blob.core.windows.net',sep=""))
# COMMAND ----------
#Get the value for sink_storage_conf
(sink_storage_conf=paste('fs.azure.sas.',dbutils.widgets.get("sink_blob_container_name"),'.',dbutils.widgets.get("sink_blob_account_name"),'.blob.core.windows.net',sep=""))
# COMMAND ----------
# initialize Spark session
sparkR.session(
sparkConfig = list(
'fs.azure.sas.citydatacontainer.azureopendatastorage.blob.core.windows.net' = dbutils.widgets.get("source_blob_sas_token"),
'fs.azure.sas.safetydata.analyticsdatalakeraj.blob.core.windows.net' = sink_blob_sas_token
)
)
# COMMAND ----------
#Quick test. Notice how the SAS token value is secured from being displayed as open text
sparkR.conf('fs.azure.sas.safetydata.analyticsdatalakeraj.blob.core.windows.net')
# COMMAND ----------
# MAGIC %md
# MAGIC Once an account access key or a SAS is set up in the Spark conf in the notebook, we can use standard Spark and Databricks APIs to read from the storage account.
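# COMMAND ----------
# MAGIC %md
# MAGIC As a minimal sketch (not part of the original notebook), the cell below shows what such a read could look like with SparkR's `read.df`, reusing the source widget values defined above; the `/Safety/Release/city=Chicago` sub-path is an assumption based on the Azure Open Datasets layout used in the next notebook.
# COMMAND ----------
#Illustrative only: read a sample of the source safety data now that the SAS token is in the Spark conf
sample_source_path = paste('wasbs://',dbutils.widgets.get("source_blob_container_name"),'@',dbutils.widgets.get("source_blob_account_name"),'.blob.core.windows.net/Safety/Release/city=Chicago',sep="")
sample_source_df <- read.df(sample_source_path, source = "parquet")
head(sample_source_df)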

code/Step02a_Data_Wrangling.r Normal file
View file

@@ -0,0 +1,262 @@
# Databricks notebook source
# MAGIC %md
# MAGIC ##### Copyright (c) Microsoft Corporation.
# MAGIC ##### Licensed under the MIT license.
# MAGIC
# MAGIC ###### File: Step02a_Data_Wrangling
# MAGIC ###### Date: 06/09/2020
# COMMAND ----------
# MAGIC %md
# MAGIC ####Step02a_Data_Wrangling R notebook
# MAGIC
# MAGIC This notebook has the following process flow:
# MAGIC 1. Run Step01a_Setup for the source and sink configurations and initialize the Spark session.
# MAGIC 2. Use Spark SQL to enrich and curate the data from the 3 cities and load it into the sink Blob Storage as Parquet-formatted files.
# COMMAND ----------
# MAGIC %run "./Step01a_Setup"
# COMMAND ----------
# MAGIC %md
# MAGIC Azure Blob storage is a service for storing large amounts of unstructured object data, such as text or binary data. You can use Blob storage to expose data publicly to the world, or to store application data privately. Common uses of Blob storage include:
# MAGIC • Serving images or documents directly to a browser
# MAGIC • Storing files for distributed access
# MAGIC • Streaming video and audio
# MAGIC • Storing data for backup and restore, disaster recovery, and archiving
# MAGIC • Storing data for analysis by an on-premises or Azure-hosted service
# MAGIC
# MAGIC We can read data from public storage accounts without any additional settings. To read data from a private storage account, we need to configure a Shared Key or a Shared Access Signature (SAS).
# MAGIC
# MAGIC Input widgets allow us to add parameters to notebooks and dashboards. The widget API consists of calls to create various types of input widgets, remove them, and get bound values (see the sketch after this list).
# MAGIC Widgets are best for:
# MAGIC
# MAGIC • Building a notebook or dashboard that is re-executed with different parameters
# MAGIC
# MAGIC • Quickly exploring results of a single query with different parameters
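# COMMAND ----------
# MAGIC %md
# MAGIC A minimal sketch of the three kinds of widget API calls mentioned above (create, get the bound value, remove); the `demo_param` widget is purely illustrative and is not used elsewhere in this pipeline.
# COMMAND ----------
#Illustrative only: create a text widget, read its bound value, then remove it
dbutils.widgets.text("demo_param", "default_value", "Demo Parameter")
cat("demo_param is set to:", dbutils.widgets.get("demo_param"))
dbutils.widgets.remove("demo_param")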
# COMMAND ----------
sparkR.conf('fs.azure.sas.safetydata.analyticsdatalakeraj.blob.core.windows.net')
# COMMAND ----------
# MAGIC %md
# MAGIC In this notebook we are going to wrangle and explore the data from the 3 cities: Chicago, Boston, and New York City.
# COMMAND ----------
#Constructing the source absolute paths
blob_base_path = paste('wasbs://',dbutils.widgets.get("source_blob_container_name"),'@',dbutils.widgets.get("source_blob_account_name"),'.blob.core.windows.net',sep="")
blob_absolute_path_chicago = paste(blob_base_path,"/Safety/Release/city=Chicago",sep="")
blob_absolute_path_boston = paste(blob_base_path,"/Safety/Release/city=Boston",sep="")
blob_absolute_path_newyorkcity = paste(blob_base_path,"/Safety/Release/city=NewYorkCity",sep="")
#print the absolute paths
cat("",blob_absolute_path_chicago,"\n",blob_absolute_path_boston,"\n",blob_absolute_path_newyorkcity)
# COMMAND ----------
# MAGIC %md
# MAGIC The 3-1-1 data in these 3 cities are organized by Azure Open Datasets in parquet format.
# MAGIC
# MAGIC Apache Parquet is a free and open-source column-oriented data storage format of the Apache Hadoop ecosystem. It is similar to the other columnar-storage file formats available in Hadoop namely RCFile and ORC. It is compatible with most of the data processing frameworks in the Hadoop environment. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.
# COMMAND ----------
#Read the source data in a dataframe for 3 cities
raw_chicago_safety_df <- read.df(blob_absolute_path_chicago, source = "parquet")
raw_boston_safety_df <- read.df(blob_absolute_path_boston, source = "parquet")
raw_newyorkcity_safety_df <- read.df(blob_absolute_path_newyorkcity, source = "parquet")
# COMMAND ----------
# MAGIC %md
# MAGIC Let us explore the 3 datasets
# COMMAND ----------
printSchema(raw_chicago_safety_df)
# COMMAND ----------
printSchema(raw_boston_safety_df)
# COMMAND ----------
printSchema(raw_newyorkcity_safety_df)
# COMMAND ----------
#Let's gauge the scale of the source dataset we are analyzing
cat("","Number of rows for Chicago dataset: ",format(count(raw_chicago_safety_df),big.mark = ","),"\n", "Number of rows for Boston dataset: ",format(count(raw_boston_safety_df),big.mark = ",") ,"\n", "Number of rows for New York City dataset: ",format(count(raw_newyorkcity_safety_df),big.mark = ","))
# COMMAND ----------
#Display the structure of the Chicago DataFrame, including column names, column types, as well as a small sample of rows.
str(raw_chicago_safety_df)
# COMMAND ----------
#Display the structure of the Boston DataFrame, including column names, column types, as well as a small sample of rows.
str(raw_boston_safety_df)
# COMMAND ----------
#Display the structure of the New York City DataFrame, including column names, column types, as well as a small sample of rows.
str(raw_newyorkcity_safety_df)
# COMMAND ----------
# MAGIC %md
# MAGIC We need the different date components for time series analysis.
# COMMAND ----------
raw_chicago_safety_df = mutate(raw_chicago_safety_df,date=date_format(raw_chicago_safety_df$dateTime, "MM-dd-yyyy"),
year=year(raw_chicago_safety_df$dateTime),
month=date_format(raw_chicago_safety_df$dateTime, "MMM"),
weekOfMonth=date_format(raw_chicago_safety_df$dateTime, "W"),
dayOfWeek= date_format(raw_chicago_safety_df$dateTime, "E"),
hour= date_format(raw_chicago_safety_df$dateTime, "H")
)
# COMMAND ----------
#Register the DataFrame as a SQL temporary view: raw_chicago_safety_view
#Registers a DataFrame as a Temporary Table in the SQLContext
registerTempTable(raw_chicago_safety_df, "raw_chicago_safety_view")
# COMMAND ----------
#Display top 10 rows
display(sql('SELECT * FROM raw_chicago_safety_view LIMIT 10'))
# COMMAND ----------
# MAGIC %md
# MAGIC Steps to filter out columns which we do not need
# COMMAND ----------
# Display distinct dataType
#Since it only has one value "Safety" we can drop it from the enriched dataset
display(sql('SELECT distinct(dataType) FROM raw_chicago_safety_view'))
# COMMAND ----------
# Display distinct dataSubtype
#Since it only has one value "311_All" we can drop it from the enriched dataset
display(sql('SELECT distinct(dataSubtype) FROM raw_chicago_safety_view'))
# COMMAND ----------
# Display distinct source
#Since it only has one value "null" we can drop it from the enriched dataset
display(sql('SELECT distinct(source) FROM raw_chicago_safety_view'))
# COMMAND ----------
# Display distinct extendedProperties
#Since it only has one value null we can drop it from the enriched dataset
display(sql('SELECT distinct(extendedProperties) FROM raw_chicago_safety_view'))
# COMMAND ----------
display(sql('describe raw_chicago_safety_view'))
# COMMAND ----------
enriched_chicago_safety_df = sql('SELECT dateTime,category,subcategory,status,address,latitude,longitude,date,year,month,weekOfMonth,dayOfWeek,hour
FROM raw_chicago_safety_view
')
# COMMAND ----------
#Constructing the sink absolute paths
sink_blob_base_path = paste('wasbs://',dbutils.widgets.get("sink_blob_container_name"),'@',dbutils.widgets.get("sink_blob_account_name"),'.blob.core.windows.net',sep="")
sink_blob_absolute_path_chicago = paste(sink_blob_base_path,"/city311/city=Chicago",sep="")
sink_blob_absolute_path_boston = paste(sink_blob_base_path,"/city311/city=Boston",sep="")
sink_blob_absolute_path_newyorkcity = paste(sink_blob_base_path,"/city311/city=NewYorkCity",sep="")
#print the absolute paths
cat("",sink_blob_absolute_path_chicago,"\n",sink_blob_absolute_path_boston,"\n",sink_blob_absolute_path_newyorkcity)
# COMMAND ----------
write.parquet(enriched_chicago_safety_df, sink_blob_absolute_path_chicago)
# COMMAND ----------
raw_boston_safety_df = mutate(raw_boston_safety_df,date=date_format(raw_boston_safety_df$dateTime, "MM-dd-yyyy"),
year=year(raw_boston_safety_df$dateTime),
month=date_format(raw_boston_safety_df$dateTime, "MMM"),
weekOfMonth=date_format(raw_boston_safety_df$dateTime, "W"),
dayOfWeek= date_format(raw_boston_safety_df$dateTime, "E"),
hour= date_format(raw_boston_safety_df$dateTime, "H")
)
# COMMAND ----------
#Register the DataFrame as a SQL temporary view: raw_boston_safety_view
#Registers a DataFrame as a Temporary Table in the SQLContext
registerTempTable(raw_boston_safety_df, "raw_boston_safety_view")
# COMMAND ----------
#Display top 10 rows
display(sql('SELECT * FROM raw_boston_safety_view LIMIT 10'))
# COMMAND ----------
enriched_boston_safety_df = sql('SELECT dateTime,category,subcategory,status,address,latitude,longitude,date,year,month,weekOfMonth,dayOfWeek,hour
FROM raw_boston_safety_view
')
# COMMAND ----------
write.parquet(enriched_boston_safety_df, sink_blob_absolute_path_boston)
# COMMAND ----------
raw_newyorkcity_safety_df = mutate(raw_newyorkcity_safety_df,date=date_format(raw_newyorkcity_safety_df$dateTime, "MM-dd-yyyy"),
year=year(raw_newyorkcity_safety_df$dateTime),
month=date_format(raw_newyorkcity_safety_df$dateTime, "MMM"),
weekOfMonth=date_format(raw_newyorkcity_safety_df$dateTime, "W"),
dayOfWeek= date_format(raw_newyorkcity_safety_df$dateTime, "E"),
hour= date_format(raw_newyorkcity_safety_df$dateTime, "H")
)
# COMMAND ----------
#Register the DataFrame as a SQL temporary view: raw_newyorkcity_safety_view
#Registers a DataFrame as a Temporary Table in the SQLContext
registerTempTable(raw_newyorkcity_safety_df, "raw_newyorkcity_safety_view")
# COMMAND ----------
#Display top 10 rows
display(sql('SELECT * FROM raw_newyorkcity_safety_view LIMIT 10'))
# COMMAND ----------
enriched_newyorkcity_safety_df = sql('SELECT dateTime,category,subcategory,status,address,latitude,longitude,date,year,month,weekOfMonth,dayOfWeek,hour
FROM raw_newyorkcity_safety_view
')
# COMMAND ----------
write.parquet(enriched_newyorkcity_safety_df, sink_blob_absolute_path_newyorkcity)
# COMMAND ----------
# MAGIC %md
# MAGIC We wrote the enriched datasets from the three cities in the following place:
# MAGIC wasbs://safetydata@analyticsdatalakeraj.blob.core.windows.net/city311/city=Chicago
# MAGIC wasbs://safetydata@analyticsdatalakeraj.blob.core.windows.net/city311/city=Boston
# MAGIC wasbs://safetydata@analyticsdatalakeraj.blob.core.windows.net/city311/city=NewYorkCity
# MAGIC
# MAGIC Notice this naturally partitions the data based on city so that we can reason over the 3 cities in a distributed fashion and make use of partition pruning.
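# COMMAND ----------
# MAGIC %md
# MAGIC As a minimal sketch (not part of the original notebook), filtering the re-read dataset on the partition column lets Spark prune to the matching `city=Chicago` folder instead of scanning all three cities.
# COMMAND ----------
#Illustrative only: read the partitioned sink data back and filter on the partition column
enriched_3cities_check_df <- read.df(paste(sink_blob_base_path,"/city311",sep=""), source = "parquet")
head(filter(enriched_3cities_check_df, enriched_3cities_check_df$city == "Chicago"))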

code/Step02b_Data_Exploration_Visualization.r Normal file
View file

@@ -0,0 +1,335 @@
# Databricks notebook source
# MAGIC %md
# MAGIC ##### Copyright (c) Microsoft Corporation.
# MAGIC ##### Licensed under the MIT license.
# MAGIC
# MAGIC ###### File: Step02b_Data_Exploration_Visualization
# MAGIC ###### Date: 06/09/2020
# COMMAND ----------
# MAGIC %md
# MAGIC ####Step02b_Data_Exploration_Visualization R notebook
# MAGIC
# MAGIC This notebook has the following process flow:
# MAGIC 1. Run Step01a_Setup for the source and sink configurations and initialize the Spark session.
# MAGIC 2. Libraries and R HTML widgets setup.
# MAGIC 3. Explore and visualize the data from the 3 cities: Chicago, Boston, and New York City.
# COMMAND ----------
# MAGIC %run "./Step01a_Setup"
# COMMAND ----------
# MAGIC %md
# MAGIC Libraries
# COMMAND ----------
# library(SparkR) #Spark R is already loaded from the Step01a_Setup notebook
library(ggplot2)
library(magrittr)
library(leaflet)
library(htmltools)
library(htmlwidgets)
# COMMAND ----------
# MAGIC %md
# MAGIC Refer: https://docs.microsoft.com/en-us/azure/databricks/_static/notebooks/azure/htmlwidgets-azure.html
# MAGIC
# MAGIC The steps below show how you can get R HTML widgets working in Azure Databricks notebooks. The setup has two steps:
# MAGIC
# MAGIC 1. Install pandoc, a Linux package that HTML widgets use to generate HTML.
# MAGIC 2. Change one function within the htmlwidgets package so that it works seamlessly in Azure Databricks. Make sure you use the correct URL of your Azure Databricks environment.
# MAGIC
# MAGIC Both steps can be automated using init scripts so that when your cluster launches it installs pandoc and updates the R htmlwidgets package automatically.
# COMMAND ----------
# MAGIC %md
# MAGIC Installing the HTML widgets Linux dependency (pandoc) on the driver
# COMMAND ----------
# MAGIC %sh
# MAGIC apt-get --yes install pandoc
# COMMAND ----------
## Replace <region> with the region of your Azure Databricks Account URL
## Make sure you use HTTPS "?o=<workspace-id>"
databricksURL <- "https://<region>.azuredatabricks.net/files/rwidgets/"
# COMMAND ----------
#Fix HTML Widgets package to work in Azure Databricks Notebooks
### Replace <workspace-id> with the workspace ID of your Azure Databricks Account URL
db_html_print <- function(x, ..., view = interactive()) {
fileName <- paste(tempfile(), ".html", sep="")
htmlwidgets::saveWidget(x, file = fileName)
randomFileName = paste0(floor(runif(1, 0, 10^12)), ".html")
baseDir <- "/dbfs/FileStore/rwidgets/"
dir.create(baseDir)
internalFile = paste0(baseDir, randomFileName)
externalFile = paste0(databricksURL, randomFileName, "?o=<workspace-id>")
system(paste("cp", fileName, internalFile))
displayHTML(externalFile)
}
R.utils::reassignInPackage("print.htmlwidget", pkgName = "htmlwidgets", value = db_html_print)
# COMMAND ----------
# MAGIC %md
# MAGIC The 3-1-1 data in these 3 cities are organized by Azure Open Datasets in parquet format.
# MAGIC
# MAGIC Apache Parquet is a free and open-source column-oriented data storage format of the Apache Hadoop ecosystem. It is similar to the other columnar-storage file formats available in Hadoop namely RCFile and ORC. It is compatible with most of the data processing frameworks in the Hadoop environment. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.
# COMMAND ----------
# MAGIC %md
# MAGIC Let us explore the 3 datasets
# COMMAND ----------
#Constructing the enriched absolute paths
#Notice here we are using all 3 cities
sink_blob_base_path = paste('wasbs://',dbutils.widgets.get("sink_blob_container_name"),'@',dbutils.widgets.get("sink_blob_account_name"),'.blob.core.windows.net',sep="")
#Optionally read individual cities
#sink_blob_absolute_path_chicago = paste(sink_blob_base_path,"/city311/city=Chicago",sep="")
#sink_blob_absolute_path_boston = paste(sink_blob_base_path,"/city311/city=Boston",sep="")
#sink_blob_absolute_path_newyorkcity = paste(sink_blob_base_path,"/city311/city=NewYorkCity",sep="")
#Read all 3 cities
enriched_blob_absolute_path_3cities = paste(sink_blob_base_path,"/city311",sep="")
#print the absolute paths
cat(enriched_blob_absolute_path_3cities)
# COMMAND ----------
#Read the enriched data in a dataframe for 3 cities
enriched_3cities_safety_df <- read.df(enriched_blob_absolute_path_3cities, source = "parquet")
# COMMAND ----------
#Filter the enriched data into per-city dataframes
enriched_chicago_safety_df <- filter(enriched_3cities_safety_df, enriched_3cities_safety_df$city == "Chicago")
enriched_boston_safety_df <- filter(enriched_3cities_safety_df, enriched_3cities_safety_df$city == "Boston")
enriched_newyorkcity_safety_df <- filter(enriched_3cities_safety_df, enriched_3cities_safety_df$city == "NewYorkCity")
# COMMAND ----------
#Let's gauge the scale of the enriched dataset we are analyzing
cat("","Number of rows for Chicago dataset: ",format(count(enriched_chicago_safety_df),big.mark = ","),"\n", "Number of rows for Boston dataset: ",format(count(enriched_boston_safety_df),big.mark = ",") ,"\n", "Number of rows for New York City dataset: ",format(count(enriched_newyorkcity_safety_df),big.mark = ","))
# COMMAND ----------
printSchema(enriched_3cities_safety_df)
# COMMAND ----------
str(enriched_3cities_safety_df)
# COMMAND ----------
#Register the DataFrame as a SQL temporary view: enriched_3cities_safety_view
#Registers a DataFrame as a Temporary Table in the SQLContext
registerTempTable(enriched_3cities_safety_df, "enriched_3cities_safety_view")
# COMMAND ----------
#Display top 10 rows
display(sql('SELECT * FROM enriched_3cities_safety_view LIMIT 10'))
# COMMAND ----------
# MAGIC %md
# MAGIC Let us visualize the categories in the city of Chicago.
# COMMAND ----------
chicago_category_grouped_df = sql('SELECT category, count, ROW_NUMBER() OVER (ORDER BY count DESC) as rank
FROM (
SELECT category, COUNT(*) as count
FROM enriched_3cities_safety_view
WHERE city = "Chicago"
GROUP BY category)
')
# COMMAND ----------
str(chicago_category_grouped_df)
# COMMAND ----------
#Get the top 30 categories for plotting
chicago_category_grouped_df_top30 = collect(filter(chicago_category_grouped_df,"rank <= 30"))
# COMMAND ----------
display(chicago_category_grouped_df_top30)
# COMMAND ----------
# MAGIC %md
# MAGIC Plot the top 30 incidents reported in Chicago
# COMMAND ----------
ggplot(chicago_category_grouped_df_top30, aes(x = reorder(category,-count,sum), y = count, fill = count)) + geom_col() +
labs(y = "Count of Incidents",x="Category of Incidents", title = paste0("Safety Incidents, ","Chicago")) +
theme(axis.title = element_text(size = 16), title = element_text(size = 16), axis.text.y = element_text(size = 6), legend.position = "none") +
scale_fill_gradient(low = "blue", high = "red") +
coord_flip()
# COMMAND ----------
# MAGIC %md
# MAGIC We might have to remove a few chart-toppers, as they are dominant; for example, the information-only calls.
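# COMMAND ----------
# MAGIC %md
# MAGIC A minimal sketch (not part of the original notebook) of how those chart-toppers could be excluded: filter on the `rank` column before collecting; the cutoff of the top 2 categories is only an illustrative assumption.
# COMMAND ----------
#Illustrative only: exclude the two most dominant categories before re-plotting
chicago_category_grouped_df_rank3to30 = collect(filter(chicago_category_grouped_df, "rank > 2 and rank <= 30"))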
# COMMAND ----------
# MAGIC %md
# MAGIC Now we will plot the 30 least frequently occurring incident categories.
# COMMAND ----------
take(arrange(chicago_category_grouped_df, "rank", decreasing = TRUE),30)
# COMMAND ----------
#Get the bottom 30 categories for plotting
chicago_category_grouped_df_bottom30 = take(arrange(chicago_category_grouped_df, "rank", decreasing = TRUE),30)
# COMMAND ----------
ggplot(chicago_category_grouped_df_bottom30, aes(x = reorder(category,-count,sum), y = count, fill = count)) + geom_col() +
labs(y = "Count of Incidents",x="Category of Incidents", title = paste0("Safety Incidents, ","Chicago Bottom 30")) +
theme(axis.title = element_text(size = 16), title = element_text(size = 16), axis.text.y = element_text(size = 6), legend.position = "none") +
scale_fill_gradient(low = "green", high = "pink") +
coord_flip()
# COMMAND ----------
# MAGIC %md
# MAGIC There is a stark contrast in the number of cases between the top 30 and the bottom 30.
# COMMAND ----------
# MAGIC %md
# MAGIC Let us plot the year-wise cases in the city of Chicago.
# COMMAND ----------
chicago_year_grouped_df = collect(sql('
SELECT year as yr, COUNT(*) as count
FROM enriched_3cities_safety_view
WHERE city = "Chicago" and year >= 2011
GROUP BY year'))
# COMMAND ----------
head(chicago_year_grouped_df)
# COMMAND ----------
ggplot(chicago_year_grouped_df, aes(x = yr, y = count)) +
geom_line(color = "RED", size = 0.5) +
scale_x_continuous(breaks=seq(2011, 2020, 1)) +
labs(x = "Year of Safety Incident", y = "Number of Safety Incidents", title = "Yearly Safety Incidents in Chicago from 2011 – 2020")
# COMMAND ----------
# MAGIC %md
# MAGIC We will display safety incident locations across the 3 cities using leaflet.
# MAGIC For the purpose of this visualization we will choose 10k rows from each of the 3 cities.
# COMMAND ----------
year_grouped_df_3cities_10k = collect(sql('select * from
(select * from (SELECT dateTime,category,subcategory,latitude,longitude,address
FROM enriched_3cities_safety_view
WHERE city = "Chicago" and year=2019 ) limit 10000) union
(select * from (SELECT dateTime,category,subcategory,latitude,longitude,address
FROM enriched_3cities_safety_view
WHERE city = "Boston" and year=2019 ) limit 10000) union
(select * from (SELECT dateTime,category,subcategory,latitude,longitude,address
FROM enriched_3cities_safety_view
WHERE city = "NewYorkCity" and year=2019 ) limit 10000) '))
# COMMAND ----------
year_grouped_df_3cities_10k$popup <- paste("<b>Category: </b>", year_grouped_df_3cities_10k$category,
"<br>", "<b>SubCategory: </b>", year_grouped_df_3cities_10k$subcategory,
"<br>", "<b>DateTime: </b>", year_grouped_df_3cities_10k$dateTime,
"<br>", "<b>Address: </b>", year_grouped_df_3cities_10k$address,
"<br>", "<b>Longitude: </b>", year_grouped_df_3cities_10k$longitude,
"<br>", "<b>Latitude: </b>", year_grouped_df_3cities_10k$latitude)
# COMMAND ----------
str(year_grouped_df_3cities_10k)
# COMMAND ----------
# MAGIC %md
# MAGIC The dynamic leaflet HTML is attached separately.
# COMMAND ----------
category_maps = leaflet(year_grouped_df_3cities_10k, width = "100%") %>% addTiles() %>%
addTiles(group = "OSM (default)") %>%
addProviderTiles(provider = "Esri.WorldStreetMap",group = "World StreetMap") %>%
addProviderTiles(provider = "Esri.WorldImagery",group = "World Imagery") %>%
addMarkers(lng = ~longitude, lat = ~latitude, popup = year_grouped_df_3cities_10k$popup, clusterOptions = markerClusterOptions()) %>%
addLayersControl(
baseGroups = c("OSM (default)","World StreetMap", "World Imagery"),
options = layersControlOptions(collapsed = FALSE)
)
#category_maps
# COMMAND ----------
yearly_count_3cities = collect(sql('SELECT year,city,count(*) as cnt
FROM enriched_3cities_safety_view
GROUP BY city,year'))
# COMMAND ----------
str(yearly_count_3cities)
# COMMAND ----------
# MAGIC %md
# MAGIC Changes Over Time - Volume of All Safety Calls
# COMMAND ----------
ggplot(yearly_count_3cities, aes(x = year, y = cnt, color = city)) + geom_line() + geom_point() +
labs(y = "Yearly Count of All Safety Calls")
# COMMAND ----------
# MAGIC %md
# MAGIC Changes Over Time - Volume of Specific Safety Calls
# COMMAND ----------
yearly_count_3cities_graffiti = collect(sql('SELECT year,city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE lower(category) like "%graffiti%" OR lower(subcategory) like "%graffiti%"
GROUP BY city,year'))
# COMMAND ----------
str(yearly_count_3cities_graffiti)
# COMMAND ----------
ggplot(yearly_count_3cities_graffiti, aes(x = year, y = cnt, color = city)) + geom_line() + geom_point() +
scale_x_continuous(breaks=seq(2010, 2020, 1)) +
labs(title = paste0("Yearly count of graffiti calls"))

code/Step03a_Model_Training_Testing.r Normal file
View file

@@ -0,0 +1,573 @@
# Databricks notebook source
# MAGIC %md
# MAGIC ##### Copyright (c) Microsoft Corporation.
# MAGIC ##### Licensed under the MIT license.
# MAGIC
# MAGIC ###### File: Step03a_Model_Training_Testing
# MAGIC ###### Date: 06/09/2020
# COMMAND ----------
# MAGIC %md
# MAGIC ####Step03a_Model_Training_Testing R notebook
# MAGIC
# MAGIC This notebook has the following process flow:
# MAGIC 1. Run Step01a_Setup for the source and sink configurations and initialize the Spark session.
# MAGIC 2. Load libraries.
# MAGIC 3. Time series analysis and forecasting for the 311 safety data from the 3 cities: Chicago, Boston, and New York City.
# COMMAND ----------
# MAGIC %run "./Step01a_Setup"
# COMMAND ----------
# MAGIC %md
# MAGIC Libraries
# COMMAND ----------
# library(SparkR) #Spark R is already loaded from the Step01a_Setup notebook
library(ggplot2)
library(forecast)
library(ggfortify)
library(fpp2)
# COMMAND ----------
# MAGIC %md
# MAGIC The data from the 3 cities (Chicago, Boston, and New York City) was enriched in our previous step.
# COMMAND ----------
# MAGIC %md
# MAGIC The 3-1-1 data in these 3 cities are organized in parquet format.
# MAGIC
# MAGIC Apache Parquet is a free and open-source column-oriented data storage format of the Apache Hadoop ecosystem. It is similar to the other columnar-storage file formats available in Hadoop namely RCFile and ORC. It is compatible with most of the data processing frameworks in the Hadoop environment. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.
# COMMAND ----------
# MAGIC %md
# MAGIC Let us explore the 3 datasets
# COMMAND ----------
#Constructing the enriched absolute paths
#Notice here we are using all 3 cities
sink_blob_base_path = paste('wasbs://',dbutils.widgets.get("sink_blob_container_name"),'@',dbutils.widgets.get("sink_blob_account_name"),'.blob.core.windows.net',sep="")
#sink_blob_absolute_path_chicago = paste(sink_blob_base_path,"/city311/city=Chicago",sep="")
#sink_blob_absolute_path_boston = paste(sink_blob_base_path,"/city311/city=Boston",sep="")
#sink_blob_absolute_path_newyorkcity = paste(sink_blob_base_path,"/city311/city=NewYorkCity",sep="")
enriched_blob_absolute_path_3cities = paste(sink_blob_base_path,"/city311",sep="")
#print the absolute paths
cat(enriched_blob_absolute_path_3cities)
# COMMAND ----------
#Read the enriched data in a dataframe for 3 cities
enriched_3cities_safety_df <- read.df(enriched_blob_absolute_path_3cities, source = "parquet")
# COMMAND ----------
#Filter the enriched data into per-city dataframes
enriched_chicago_safety_df <- filter(enriched_3cities_safety_df, enriched_3cities_safety_df$city == "Chicago")
enriched_boston_safety_df <- filter(enriched_3cities_safety_df, enriched_3cities_safety_df$city == "Boston")
enriched_newyorkcity_safety_df <- filter(enriched_3cities_safety_df, enriched_3cities_safety_df$city == "NewYorkCity")
# COMMAND ----------
#Let's gauge the scale of the enriched dataset we are analyzing
cat("","Number of rows for Chicago dataset: ",format(count(enriched_chicago_safety_df),big.mark = ","),"\n", "Number of rows for Boston dataset: ",format(count(enriched_boston_safety_df),big.mark = ",") ,"\n", "Number of rows for New York City dataset: ",format(count(enriched_newyorkcity_safety_df),big.mark = ","))
# COMMAND ----------
printSchema(enriched_3cities_safety_df)
# COMMAND ----------
str(enriched_3cities_safety_df)
# COMMAND ----------
#Register the DataFrame as a SQL temporary view: enriched_3cities_safety_view
#Registers a DataFrame as a Temporary Table in the SQLContext
registerTempTable(enriched_3cities_safety_df, "enriched_3cities_safety_view")
# COMMAND ----------
#Display top 10 rows
display(sql('SELECT * FROM enriched_3cities_safety_view LIMIT 10'))
# COMMAND ----------
# MAGIC %md
# MAGIC Let us get the top 30 categories in each city.
# COMMAND ----------
allcities_category_grouped_df_top30 = sql('select * from (SELECT city, category, count, ROW_NUMBER() OVER (PARTITION BY city ORDER BY count DESC) as rank
FROM (
SELECT city, category, COUNT(*) as count
FROM enriched_3cities_safety_view
GROUP BY city, category)) where rank <=30
')
# COMMAND ----------
display(allcities_category_grouped_df_top30)
# COMMAND ----------
allcities_subcategory_grouped_df_top30 = sql('select * from (SELECT city, subcategory, count, ROW_NUMBER() OVER (PARTITION BY city ORDER BY count DESC) as rank
FROM (
SELECT city, subcategory, COUNT(*) as count
FROM enriched_3cities_safety_view
GROUP BY city, subcategory)) where rank <=30
')
# COMMAND ----------
display(allcities_subcategory_grouped_df_top30)
# COMMAND ----------
# MAGIC %md
# MAGIC A point to note here is that potholes are called out in the category column for Chicago and in the subcategory column for New York City and Boston.
# MAGIC
# MAGIC Pothole facts from wiki (https://en.wikipedia.org/wiki/Pothole#Costs_to_the_public): The American Automobile Association estimated in the five years prior to 2016 that 16 million drivers in the United States have suffered damage from potholes to their vehicle including tire punctures, bent wheels, and damaged suspensions with a cost of $3 billion a year. In India, 3,000 people per year are killed in accidents involving potholes. Britain has estimated that the cost of fixing all roads with potholes in the country would cost £12 billion.
# MAGIC
# MAGIC We will focus on the pothole data from the 3 cities; however, the techniques can be applied across other categories as well.
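# COMMAND ----------
# MAGIC %md
# MAGIC A minimal sketch (not part of the original notebook) of how the same yearly aggregation could be reused for another category: the keyword below is an illustrative parameter, so swapping "pothole" for, say, "graffiti" reuses the query unchanged.
# COMMAND ----------
#Illustrative only: parameterize the category keyword used in the yearly aggregation
yearly_count_3cities_by_keyword <- function(keyword) {
  collect(sql(paste0('SELECT year, city, count(*) as cnt
            FROM enriched_3cities_safety_view
            WHERE lower(category) like "%', keyword, '%" OR lower(subcategory) like "%', keyword, '%"
            GROUP BY city, year')))
}
#head(yearly_count_3cities_by_keyword("pothole"))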
# COMMAND ----------
# MAGIC %md
# MAGIC Get all the pothole complaints by city and date
# COMMAND ----------
yearly_count_3cities_pothole = collect(sql('SELECT year,city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%"
GROUP BY city,year'))
# COMMAND ----------
ggplot(yearly_count_3cities_pothole, aes(x = year, y = cnt, color = city)) + geom_line() + geom_point() +
scale_x_continuous(breaks=seq(2010, 2020, 1)) +
labs(title = paste0("Yearly count of pothole repair calls"))
# COMMAND ----------
weekly_count_3cities_pothole = collect(sql('select * from (SELECT year,month, weekOfMonth, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%"
GROUP BY year,month, weekOfMonth, city) order by year,month,weekOfMonth') )
# COMMAND ----------
head(weekly_count_3cities_pothole)
# COMMAND ----------
ggplot(weekly_count_3cities_pothole, aes(x = year, y = cnt, color = city)) + geom_line() + geom_point() +
scale_x_continuous(breaks=seq(2010, 2020, 1)) +
labs(title = paste0("Weekly count of pothole repair calls"))
# COMMAND ----------
# MAGIC %md
# MAGIC A time series can be thought of as a vector or matrix of numbers along with some information about what times those numbers were recorded. This information is stored in a ts object in R.
# MAGIC ts(data, start, frequency, ...)
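# COMMAND ----------
# MAGIC %md
# MAGIC A minimal sketch (not part of the original notebook) of the `ts()` constructor with toy values: 24 monthly observations starting in January 2014, so `frequency = 12`.
# COMMAND ----------
#Illustrative only: a toy monthly ts object
toy_monthly_ts <- ts(seq_len(24), start = c(2014, 1), frequency = 12)
toy_monthly_ts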
# COMMAND ----------
monthly_count_3cities_pothole = collect(sql('select to_date(concat(year,"-",month,"-","01"),"yyyy-MMM-dd") as startDayOfMonth,year,month, city,cnt from (SELECT year,month, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%"
GROUP BY year,month, city) order by startDayOfMonth') )
# COMMAND ----------
head(monthly_count_3cities_pothole)
# COMMAND ----------
monthly_count_chicago_pothole_2014_2019 = collect(sql('select to_date(concat(year,"-",month,"-","01"),"yyyy-MMM-dd") as startDayOfMonth,cnt from (SELECT year,month, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE city="Chicago" and (lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%") and year > 2013 and year < 2020
GROUP BY year,month, city) order by startDayOfMonth') )
# COMMAND ----------
head(monthly_count_chicago_pothole_2014_2019)
# COMMAND ----------
# MAGIC %md
# MAGIC Convert into ts object
# COMMAND ----------
monthly_count_chicago_pothole_2014_2019_ts = ts(monthly_count_chicago_pothole_2014_2019[, 2], start = c(2014, 1), frequency = 12)
# COMMAND ----------
head(monthly_count_chicago_pothole_2014_2019_ts)
# COMMAND ----------
class(monthly_count_chicago_pothole_2014_2019_ts)
# COMMAND ----------
frequency(monthly_count_chicago_pothole_2014_2019_ts)
# COMMAND ----------
str(monthly_count_chicago_pothole_2014_2019_ts)
# COMMAND ----------
# MAGIC %md
# MAGIC Graphs enable you to visualize many features of the data, including patterns, unusual observations, changes over time, and relationships between variables. Just as the type of data determines which forecasting method to use, it also determines which graphs are appropriate.
# MAGIC You can use the autoplot() function to produce a time plot of the data with or without facets, or panels that display different subsets of data:
# COMMAND ----------
ggplot2::autoplot(monthly_count_chicago_pothole_2014_2019_ts) +
ggtitle("Monthly Chicago Pothole Incidents") +
xlab("Year") +
ylab("Count")
# COMMAND ----------
monthly_count_allcities_pothole_2014_2019 = collect(sql('select to_date(concat(year,"-",month,"-","01"),"yyyy-MMM-dd") as startDayOfMonth,cnt,city from (SELECT year,month, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE (lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%") and year > 2013 and year < 2020
GROUP BY year,month, city) order by startDayOfMonth') )
# COMMAND ----------
class(monthly_count_allcities_pothole_2014_2019)
# COMMAND ----------
monthly_count_allcities_pothole_2014_2019_df = sql('select to_date(concat(year,"-",month,"-","01"),"yyyy-MMM-dd") as startDayOfMonth,cnt,city from (SELECT year,month, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE (lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%") and year > 2013 and year < 2020
GROUP BY year,month, city) order by startDayOfMonth')
# COMMAND ----------
class(monthly_count_allcities_pothole_2014_2019_df)
# COMMAND ----------
monthly_count_allcities_pothole_2014_2019_pivot <- collect(sum(pivot(groupBy(monthly_count_allcities_pothole_2014_2019_df, "startDayOfMonth"), "city"), "cnt"))
# COMMAND ----------
class(monthly_count_allcities_pothole_2014_2019_pivot)
# COMMAND ----------
head(monthly_count_allcities_pothole_2014_2019_pivot)
# COMMAND ----------
monthly_count_allcities_pothole_2014_2019_pivot_ts = ts(monthly_count_allcities_pothole_2014_2019_pivot[, 2:4], start = c(2014, 1), frequency = 12)
# COMMAND ----------
ggplot2::autoplot(monthly_count_allcities_pothole_2014_2019_pivot_ts, facets = TRUE) +
ggtitle("Monthly 3 cities Pothole Incidents") +
xlab("Year") +
ylab("Count") +
scale_x_continuous(breaks=seq(2014, 2019, 1))
# COMMAND ----------
ggplot2::autoplot(monthly_count_allcities_pothole_2014_2019_pivot_ts, facets = FALSE) +
ggtitle("Monthly 3 cities Pothole Incidents") +
xlab("Year") +
ylab("Count") +
scale_x_continuous(breaks=seq(2014, 2019, 1))
# COMMAND ----------
frequency(monthly_count_allcities_pothole_2014_2019_pivot_ts)
# COMMAND ----------
# MAGIC %md
# MAGIC Interesting observations:
# MAGIC Can the uptick of pothole repairs in the 3 cities during the first half of the year be attributed to harsh winters?
# MAGIC Can the budget and contracting of workers for pothole repair be allotted and spent following this trend?
# COMMAND ----------
# MAGIC %md
# MAGIC Let us look at Chicago in more detail.
# COMMAND ----------
monthly_count_Chicago_pothole_2014_2019_df = sql('SELECT year, cnt, month from (SELECT year,month, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE city="Chicago" and (lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%") and year > 2013 and year < 2020
GROUP BY year,month, city) order by year')
# COMMAND ----------
monthly_count_Chicago_2014_2019_pivot <- collect(sum(pivot(groupBy(monthly_count_Chicago_pothole_2014_2019_df, "year"), "month"), "cnt"))
# COMMAND ----------
monthly_count_Chicago_2014_2019_pivot
# COMMAND ----------
monthly_count_Chicago_2014_2019_pivot_ts = ts(monthly_count_Chicago_2014_2019_pivot[, 2:13], start = c(2014, 1), frequency = 1)
# COMMAND ----------
monthly_count_Chicago_2014_2019_pivot_ts
# COMMAND ----------
monthly_count_Chicago_pothole_2014_2019_df_1 = collect(sql('SELECT to_date(concat(year,"-",month,"-","01"),"yyyy-MMM-dd") as startDayOfMonth, cnt from (SELECT year,month, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE city="Chicago" and (lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%") and year > 2013 and year < 2020
GROUP BY year,month, city) order by startDayOfMonth'))
# COMMAND ----------
monthly_count_Chicago_pothole_2014_2019_df_1_ts = ts(monthly_count_Chicago_pothole_2014_2019_df_1[, 2], start = c(2014, 1), frequency = 12)
# COMMAND ----------
ggseasonplot(monthly_count_Chicago_pothole_2014_2019_df_1_ts, main="Seasonal plot broken by year for pothole repairs in Chicago", ylab = "Count")
# COMMAND ----------
# Produce a polar coordinate season plot for the Chicago pothole data
ggseasonplot(monthly_count_Chicago_pothole_2014_2019_df_1_ts, main="Polar coordinate season plot broken by year for pothole repairs in Chicago", ylab = "Count", polar = TRUE)
# COMMAND ----------
ggsubseriesplot(monthly_count_Chicago_pothole_2014_2019_df_1_ts, main="Subseries plot broken by month for pothole repairs in Chicago", ylab = "Count")
# COMMAND ----------
# MAGIC %md
# MAGIC It clearly shows that Feb, Mar, and Apr have an uptick in pothole repair cases.
# COMMAND ----------
# MAGIC %md
# MAGIC The correlations associated with the lag plots form what is called the autocorrelation function (ACF). The ggAcf() function produces ACF plots.
# COMMAND ----------
# Create an ACF plot of the Chicago data
ggAcf(monthly_count_Chicago_pothole_2014_2019_df_1_ts)
# COMMAND ----------
# MAGIC %md
# MAGIC White noise is a term that describes purely random data. We can conduct a Ljung-Box test using the function below to confirm the randomness of a series; a p-value greater than 0.05 suggests that the data are not significantly different from white noise.
# COMMAND ----------
# Plot the original series
autoplot(monthly_count_Chicago_pothole_2014_2019_df_1_ts)
# Plot the differenced series
autoplot(diff(monthly_count_Chicago_pothole_2014_2019_df_1_ts))
# ACF of the differenced series
ggAcf(diff(monthly_count_Chicago_pothole_2014_2019_df_1_ts))
# Ljung-Box test of the differenced series
Box.test(diff(monthly_count_Chicago_pothole_2014_2019_df_1_ts), lag = 10, type = "Ljung")
# COMMAND ----------
monthly_count_Chicago_pothole_2014_2019_df_1_ts
# COMMAND ----------
# MAGIC %md
# MAGIC A forecast is the mean or median of simulated futures of a time series.
# MAGIC
# MAGIC The very simplest forecasting method is to use the most recent observation; this is called a naive forecast and can be implemented in a namesake function. This is the best that can be done for many time series including most stock price data, and even if it is not a good forecasting method, it provides a useful benchmark for other forecasting methods.
# MAGIC
# MAGIC For seasonal data, a related idea is to use the corresponding season from the last year of data. For example, if you want to forecast the sales volume for next March, you would use the sales volume from the previous March. This is implemented in the snaive() function, meaning, seasonal naive.
# MAGIC
# MAGIC For both forecasting methods, you can set the second argument h, which specifies the number of values you want to forecast; as shown in the code below, they have different default values. The resulting output is an object of class forecast. This is the core class of objects in the forecast package, and there are many functions for dealing with them including summary() and autoplot().
# COMMAND ----------
# Use naive() to forecast the Chicago pothole series
fc_monthly_count_Chicago_pothole_2014_2019 <- naive(monthly_count_Chicago_pothole_2014_2019_df_1_ts, h = 12)
# COMMAND ----------
#Plot forecasts
autoplot(fc_monthly_count_Chicago_pothole_2014_2019) +
ggtitle("Forecast of 2020 Pothole Incidents") +
xlab("Year") +
ylab("Count")
# COMMAND ----------
#summarize the forecasts
summary(fc_monthly_count_Chicago_pothole_2014_2019)
# COMMAND ----------
# Use snaive() to forecast the Chicago pothole series
fcs_monthly_count_Chicago_pothole_2014_2019 <- snaive(monthly_count_Chicago_pothole_2014_2019_df_1_ts, h = 12)
# COMMAND ----------
#Plot forecasts
autoplot(fcs_monthly_count_Chicago_pothole_2014_2019) +
ggtitle("Seasonal Naive Forecast of 2020 Pothole Incidents in Chicago") +
xlab("Year") +
ylab("Count")
# COMMAND ----------
#summarize the forecasts
summary(fcs_monthly_count_Chicago_pothole_2014_2019)
# COMMAND ----------
# MAGIC %md
# MAGIC When applying a forecasting method, it is important to always check that the residuals are well-behaved (i.e., no outliers or patterns) and resemble white noise. The prediction intervals are computed assuming that the residuals are also normally distributed.
# COMMAND ----------
checkresiduals(fcs_monthly_count_Chicago_pothole_2014_2019)
# COMMAND ----------
monthly_count_Chicago_pothole_2014_2019_df_1_ts
# COMMAND ----------
# Create the training data as train
train <- subset(monthly_count_Chicago_pothole_2014_2019_df_1_ts, end = 48)
# COMMAND ----------
train
# COMMAND ----------
# Compute seasonal naive forecasts and save to naive_fc
naive_fc <- snaive(train, h = 108)
# Compute mean forecasts and save to mean_fc
mean_fc <- meanf(train, h = 108)
# Use accuracy() to compute RMSE statistics
print(accuracy(naive_fc, monthly_count_Chicago_pothole_2014_2019_df_1_ts))
print(accuracy(mean_fc, monthly_count_Chicago_pothole_2014_2019_df_1_ts))
# COMMAND ----------
# Create three training series keeping only the first 1, 2, and 3 years of data
train1 <- window(monthly_count_Chicago_pothole_2014_2019_df_1_ts, end = c(2014, 12))
train2 <- window(monthly_count_Chicago_pothole_2014_2019_df_1_ts, end = c(2015, 12))
train3 <- window(monthly_count_Chicago_pothole_2014_2019_df_1_ts, end = c(2016, 12))
# Produce forecasts using snaive()
fc1 <- snaive(train1, h = 4)
fc2 <- snaive(train2, h = 4)
fc3 <- snaive(train3, h = 4)
# Use accuracy() to compare the MAPE of each series
print(accuracy(fc1, monthly_count_Chicago_pothole_2014_2019_df_1_ts)["Test set", "MAPE"])
print(accuracy(fc2, monthly_count_Chicago_pothole_2014_2019_df_1_ts)["Test set", "MAPE"])
print(accuracy(fc3, monthly_count_Chicago_pothole_2014_2019_df_1_ts)["Test set", "MAPE"])
# COMMAND ----------
fcses <- ses(train3, h = 12)
fcnaive <- snaive(train3, h = 12)
fcholt <- holt(train3, h = 12)
fchw <- hw(train3, seasonal = "multiplicative", h = 12)
print(accuracy(fcses, monthly_count_Chicago_pothole_2014_2019_df_1_ts))
print(accuracy(fcnaive, monthly_count_Chicago_pothole_2014_2019_df_1_ts))
print(accuracy(fcholt, monthly_count_Chicago_pothole_2014_2019_df_1_ts))
print(accuracy(fchw, monthly_count_Chicago_pothole_2014_2019_df_1_ts))
# COMMAND ----------
# Plot forecasts
autoplot(fchw)
# COMMAND ----------
fchwa <- hw(train3, seasonal = "additive", h = 12)
autoplot(fchwa)
# COMMAND ----------
# MAGIC %md
# MAGIC Automatic forecasting with exponential smoothing
# MAGIC The ets() function, which models errors, trend, and seasonality (ETS), provides a completely automatic way of producing forecasts for a wide range of time series.
# COMMAND ----------
fiths <- ets(monthly_count_Chicago_pothole_2014_2019_df_1_ts)
autoplot(forecast(fiths))
# COMMAND ----------
checkresiduals(fiths)
# COMMAND ----------
# Fit a seasonal ARIMA model to monthly_count_Chicago_pothole_2014_2019_df_1_ts with lambda = 0
fit <- auto.arima(monthly_count_Chicago_pothole_2014_2019_df_1_ts, lambda = 0)
# COMMAND ----------
# Summarize the fitted model
summary(fit)
# COMMAND ----------
# Plot 2-year forecasts
fit %>% forecast(h = 24) %>% autoplot()
# COMMAND ----------
# Fit a TBATS model to the Chicago pothole data
fit_tbats <- tbats(monthly_count_Chicago_pothole_2014_2019_df_1_ts)
# COMMAND ----------
# Forecast the series for the next 2 years
fc_tbats <- forecast(fit_tbats, h = 12 * 2)
# COMMAND ----------
# Plot the forecasts
autoplot(fc_tbats)
# COMMAND ----------

code/Step03b_Anomaly_Detection.r Normal file
View file

@@ -0,0 +1,249 @@
# Databricks notebook source
# MAGIC %md
# MAGIC ##### Copyright (c) Microsoft Corporation.
# MAGIC ##### Licensed under the MIT license.
# MAGIC
# MAGIC ###### File: Step03b_Anomaly_Detection
# MAGIC ###### Date: 06/09/2020
# COMMAND ----------
# MAGIC %md
# MAGIC ####Step03b_Anomaly_Detection R notebook
# MAGIC
# MAGIC This notebook has the following process flow:
# MAGIC 1. Run Step01a_Setup for the source and sink configurations and initialize the Spark session.
# MAGIC 2. Load libraries.
# MAGIC 3. Time series anomaly detection for the 311 safety data from the 3 cities: Chicago, Boston, and New York City.
# COMMAND ----------
# MAGIC %run "./Step01a_Setup"
# COMMAND ----------
# MAGIC %md
# MAGIC Libraries
# COMMAND ----------
# library(SparkR) #Spark R is already loaded from the Step01a_Setup notebook
library(anomalize)
library(tidyverse)
# COMMAND ----------
# MAGIC %md
# MAGIC Get the enriched data and register the temporary table
# COMMAND ----------
#Constructing the enriched absolute paths
#Notice here we are using all 3 cities
sink_blob_base_path = paste('wasbs://',dbutils.widgets.get("sink_blob_container_name"),'@',dbutils.widgets.get("sink_blob_account_name"),'.blob.core.windows.net',sep="")
enriched_blob_absolute_path_3cities = paste(sink_blob_base_path,"/city311",sep="")
#print the absolute paths
cat(enriched_blob_absolute_path_3cities)
# COMMAND ----------
#Read the enriched data in a dataframe for 3 cities
enriched_3cities_safety_df <- read.df(enriched_blob_absolute_path_3cities, source = "parquet")
# COMMAND ----------
#Register the DataFrame as a SQL temporary view: enriched_3cities_safety_view
#Registers a DataFrame as a Temporary Table in the SQLContext
registerTempTable(enriched_3cities_safety_df, "enriched_3cities_safety_view")
# COMMAND ----------
monthly_count_chicago_pothole_2014_2019 = SparkR::collect(SparkR::sql('select to_date(concat(year,"-",month,"-","01"),"yyyy-MMM-dd") as startDayOfMonth,cnt from (SELECT year,month, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE city="Chicago" and (lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%") and year > 2013 and year < 2020
GROUP BY year,month, city) order by startDayOfMonth') )
# COMMAND ----------
head(monthly_count_chicago_pothole_2014_2019)
# COMMAND ----------
monthly_count_chicago_pothole_2014_2019_tib = as.tibble(monthly_count_chicago_pothole_2014_2019)
# COMMAND ----------
head(monthly_count_chicago_pothole_2014_2019_tib)
# COMMAND ----------
monthly_count_chicago_pothole_2014_2019_tib %>%
time_decompose(cnt, method = "stl", frequency = "auto", trend = "auto") %>%
anomalize(remainder, method = "gesd", alpha = 0.05, max_anoms = 0.2) %>%
plot_anomaly_decomposition()
# COMMAND ----------
# MAGIC %md
# MAGIC Anomaly detection and plotting the detected anomalies are very similar to what we saw above with time series decomposition. It's just that the decomposed components after anomaly detection are recomposed back with time_recompose() and plotted with plot_anomalies(). The package itself automatically takes care of a lot of parameter setting, like index, frequency, and trend, making it easier to run anomaly detection out of the box with less prior expertise in the domain.
# COMMAND ----------
monthly_count_chicago_pothole_2014_2019_tib %>%
time_decompose(cnt) %>%
anomalize(remainder) %>%
time_recompose() %>%
plot_anomalies(time_recomposed = TRUE, ncol = 3, alpha_dots = 0.5)
# COMMAND ----------
# MAGIC %md
# MAGIC If you are interested in extracting the actual datapoints which are anomalies, the following code could be used:
# COMMAND ----------
monthly_count_chicago_pothole_2014_2019_tib %>%
time_decompose(cnt) %>%
anomalize(remainder) %>%
time_recompose() %>%
filter(anomaly == 'Yes')
# COMMAND ----------
# MAGIC %md
# MAGIC If we get a bit more granular by breaking the count down weekly on the start day of the week:
# COMMAND ----------
str(enriched_3cities_safety_df)
# COMMAND ----------
weekly_count_chicago_pothole_2014_2019 = SparkR::collect(SparkR::sql('SELECT startDayOfWeek,cnt FROM (SELECT year,month, weekOfMonth, min(to_date(date,"MM-dd-yyyy")) as startDayOfWeek, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE city="Chicago" and (lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%") and year > 2013 and year < 2020 and date is not null
GROUP BY year,month, weekOfMonth, city) order by startDayOfWeek'))
# COMMAND ----------
head(weekly_count_chicago_pothole_2014_2019)
# COMMAND ----------
weekly_count_chicago_pothole_2014_2019_tib = as.tibble(weekly_count_chicago_pothole_2014_2019)
# COMMAND ----------
head(weekly_count_chicago_pothole_2014_2019_tib)
# COMMAND ----------
# MAGIC %md
# MAGIC anomalize has three main functions:
# MAGIC
# MAGIC time_decompose(): Separates the time series into seasonal, trend, and remainder components
# MAGIC
# MAGIC anomalize(): Applies anomaly detection methods to the remainder component.
# MAGIC
# MAGIC time_recompose(): Calculates limits that separate the “normal” data from the anomalies!
# COMMAND ----------
weekly_count_chicago_pothole_2014_2019_tib %>%
time_decompose(cnt, method = "stl", frequency = "auto", trend = "auto") %>%
anomalize(remainder, method = "gesd", alpha = 0.05, max_anoms = 0.2) %>%
plot_anomaly_decomposition() +
labs(title = "Decomposition of Anomalized Chicago Weekly Pothole Repair Complaints") +
xlab("Year (Data collected as weekly aggregate)")
# COMMAND ----------
weekly_count_chicago_pothole_2014_2019_tib %>%
time_decompose(cnt) %>%
anomalize(remainder) %>%
time_recompose() %>%
plot_anomalies(time_recomposed = TRUE, ncol = 3, alpha_dots = 0.5)+
labs(title = "Chicago Weekly Pothole Repair Complaints Anomalies")
# COMMAND ----------
weekly_count_chicago_pothole_2014_2019_tib %>%
# Data Manipulation / Anomaly Detection
time_decompose(cnt, method = "stl") %>%
anomalize(remainder, method = "iqr") %>%
time_recompose() %>%
# Anomaly Visualization
plot_anomalies(time_recomposed = TRUE, ncol = 3, alpha_dots = 0.5) +
labs(title = "Tidyverse Chicago Weekly Pothole Repair Complaints Anomalies", subtitle = "STL + IQR Methods") +
xlab("Year (Data collected as weekly aggregate)")
# COMMAND ----------
#plot_anomaly_decomposition() for visualizing the inner workings of how the algorithm detects anomalies in the “remainder”.
weekly_count_chicago_pothole_2014_2019_tib %>%
ungroup() %>%
time_decompose(cnt) %>%
anomalize(remainder) %>%
plot_anomaly_decomposition() +
labs(title = "Decomposition of Anomalized Chicago Weekly Pothole Repair Complaints") +
xlab("Year (Data collected as weekly aggregate)")
# COMMAND ----------
weekly_count_chicago_pothole_2014_2019_tib %>%
time_decompose(cnt) %>%
anomalize(remainder) %>%
time_recompose() %>%
filter(anomaly == 'Yes')
# COMMAND ----------
# MAGIC %md
# MAGIC Observation from the data: why was there a bump in pothole repair complaints in February 2018?
# MAGIC From the records, January-February 2018 had a harsh winter and flooding. Snow, ice, and moisture all contribute to potholes, but a cycle of freezing temperatures followed by higher temperatures helps the formation of potholes, and that explains the anomaly:
# MAGIC
# MAGIC https://abc7chicago.com/chicago-weather-potholes-heavy-rain-flood-watch/3112763/
# MAGIC https://digitaledition.chicagotribune.com/tribune/article_popover.aspx?guid=0815ff4c-6db6-4166-848c-eed12b08a702
# COMMAND ----------
# MAGIC %md
# MAGIC Going by the theme of our research, i.e., whether the 3 cities are related, let us find the anomalies in New York City and Boston as well.
# MAGIC We observe that both cities had a rise in pothole complaints during early 2018. We also see from the data that the trends and anomalies in pothole complaints in Boston and New York City are very similar, which can be attributed to their geographic proximity and climate similarities.
# COMMAND ----------
weekly_count_boston_pothole_2014_2019 = SparkR::collect(SparkR::sql('SELECT startDayOfWeek,cnt FROM (SELECT year,month, weekOfMonth, min(to_date(date,"MM-dd-yyyy")) as startDayOfWeek, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE city="Boston" and (lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%") and year > 2013 and year < 2020 and date is not null
GROUP BY year,month, weekOfMonth, city) order by startDayOfWeek'))
# COMMAND ----------
weekly_count_boston_pothole_2014_2019_tib = as.tibble(weekly_count_boston_pothole_2014_2019)
# COMMAND ----------
weekly_count_boston_pothole_2014_2019_tib %>%
time_decompose(cnt) %>%
anomalize(remainder) %>%
time_recompose() %>%
filter(anomaly == 'Yes')
# COMMAND ----------
weekly_count_newyorkcity_pothole_2014_2019 = SparkR::collect(SparkR::sql('SELECT startDayOfWeek,cnt FROM (SELECT year,month, weekOfMonth, min(to_date(date,"MM-dd-yyyy")) as startDayOfWeek, city,count(*) as cnt
FROM enriched_3cities_safety_view
WHERE city="NewYorkCity" and (lower(category) like "%pothole%" OR lower(subcategory) like "%pothole%") and year > 2013 and year < 2020 and date is not null
GROUP BY year,month, weekOfMonth, city) order by startDayOfWeek'))
# COMMAND ----------
weekly_count_newyorkcity_pothole_2014_2019_tib = as.tibble(weekly_count_newyorkcity_pothole_2014_2019)
# COMMAND ----------
weekly_count_newyorkcity_pothole_2014_2019_tib %>%
time_decompose(cnt) %>%
anomalize(remainder) %>%
time_recompose() %>%
filter(anomaly == 'Yes')