Below are some examples of how to run distributed aggregations and analytics with Apache Spark and Azure Cosmos DB together. Azure Cosmos DB already supports aggregations (link to blog goes here); Apache Spark lets you take them to the next level.
These aggregations refer to the Spark to Cosmos DB Connector notebook.
Connecting to Flights Sample Data
For these aggregation examples, we access flight performance data stored in our DoctorWho Azure Cosmos DB account. To connect to it, use the following code snippet:
```scala
// Import the Spark to Cosmos DB connector
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Connection settings for the DepartureDelays database
val readConfig2 = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "<YourMasterKey>",
  "Database" -> "DepartureDelays",
  "preferredRegions" -> "Central US;East US 2;",
  "Collection" -> "flights_pcoll",
  "SamplingRatio" -> "1.0"))

// Read the collection into a DataFrame and register it as the temp view "c"
val coll = spark.sqlContext.read.cosmosDB(readConfig2)
coll.createOrReplaceTempView("c")
```
Next, we run a base query that transfers the filtered set of data we want from Cosmos DB to Spark, where Spark can then perform distributed aggregations. In this case, we ask for flights departing from Seattle (SEA).
```scala
// Select flights departing Seattle and register the result as the temp view "originSEA"
val originSEA = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'")
originSEA.createOrReplaceTempView("originSEA")
```
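For comparison, the same base query can be written with the DataFrame API (`filter` plus `select`). This is a minimal sketch under stated assumptions: Spark runs in local mode, and the Cosmos DB collection `coll` is replaced by a few hypothetical, made-up rows (seven flights, six of them from SEA) so the snippet runs without a Cosmos DB account.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: spark-sql is on the classpath; local mode stands in for a cluster.
val spark = SparkSession.builder().master("local[*]").appName("originSEA-base").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the Cosmos DB collection `coll`.
val coll = Seq(
  ("01011245", 6, 679, "SEA", "SFO"),
  ("01021245", 12, 679, "SEA", "SFO"),
  ("01020600", -8, 129, "SEA", "PDX"),
  ("01030600", 25, 129, "SEA", "PDX"),
  ("01050600", -10, 129, "SEA", "PDX"),
  ("01040900", -3, 1024, "SEA", "DEN"),
  ("01010600", 0, 692, "ABE", "ATL")
).toDF("date", "delay", "distance", "origin", "destination")

// DataFrame API version of:
// SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'
val originSEA = coll
  .filter($"origin" === "SEA")
  .select("date", "delay", "distance", "origin", "destination")
originSEA.createOrReplaceTempView("originSEA")

val seaCount = originSEA.count()
println(s"Flights departing SEA: $seaCount")
```

Either form produces the same `originSEA` view, so the SQL queries that follow work unchanged.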
The queries below were run in a Jupyter notebook service, but all of the code snippets are generic and not specific to any service.
Running LIMIT and COUNT queries
Just as you're used to in SQL or Spark SQL, let's start with a LIMIT query.
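An illustrative LIMIT query (not necessarily the exact one from the original notebook) looks like this. The sketch assumes a local SparkSession and hypothetical sample rows registered as `originSEA` in place of the data read from Cosmos DB.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: spark-sql is on the classpath; local mode stands in for a cluster.
val spark = SparkSession.builder().master("local[*]").appName("originSEA-limit").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the originSEA view built from Cosmos DB.
val originSEA = Seq(
  ("01011245", 6, 679, "SEA", "SFO"),
  ("01021245", 12, 679, "SEA", "SFO"),
  ("01020600", -8, 129, "SEA", "PDX"),
  ("01030600", 25, 129, "SEA", "PDX"),
  ("01050600", -10, 129, "SEA", "PDX"),
  ("01040900", -3, 1024, "SEA", "DEN")
).toDF("date", "delay", "distance", "origin", "destination")
originSEA.createOrReplaceTempView("originSEA")

// LIMIT returns at most the requested number of rows from the view.
val firstRows = spark.sql("SELECT * FROM originSEA LIMIT 2").collect()
firstRows.foreach(println)
```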
The next query is a simple and fast COUNT query.
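A sketch of a COUNT query over the same view, again assuming a local SparkSession and hypothetical sample rows instead of the Cosmos DB data. `COUNT(*)` comes back as a Spark `bigint`, which is why the value is read with `getLong`.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: spark-sql is on the classpath; local mode stands in for a cluster.
val spark = SparkSession.builder().master("local[*]").appName("originSEA-count").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the originSEA view built from Cosmos DB.
val originSEA = Seq(
  ("01011245", 6, 679, "SEA", "SFO"),
  ("01021245", 12, 679, "SEA", "SFO"),
  ("01020600", -8, 129, "SEA", "PDX"),
  ("01030600", 25, 129, "SEA", "PDX"),
  ("01050600", -10, 129, "SEA", "PDX"),
  ("01040900", -3, 1024, "SEA", "DEN")
).toDF("date", "delay", "distance", "origin", "destination")
originSEA.createOrReplaceTempView("originSEA")

// COUNT(*) is a bigint in Spark SQL, so read it as a Long.
val flights = spark.sql("SELECT COUNT(*) AS flights FROM originSEA").first().getLong(0)
println(s"Flights departing SEA: $flights")
```

`originSEA.count()` is the DataFrame API shortcut for the same number.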
GROUP BY query
Next, we can easily run GROUP BY queries against the Seattle flights we pulled from our Cosmos DB (formerly DocumentDB) database:
```sql
select destination, sum(delay) as TotalDelays
from originSEA
group by destination
order by sum(delay) desc limit 10
```
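The same GROUP BY can be expressed with the DataFrame API using `groupBy`, `agg`, `sum`, and `desc`. This is a sketch assuming a local SparkSession and hypothetical sample rows standing in for `originSEA`; summing an integer column yields a `Long` in Spark, hence `getLong`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{desc, sum}

// Assumption: spark-sql is on the classpath; local mode stands in for a cluster.
val spark = SparkSession.builder().master("local[*]").appName("originSEA-groupby").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the originSEA view built from Cosmos DB.
val originSEA = Seq(
  ("01011245", 6, 679, "SEA", "SFO"),
  ("01021245", 12, 679, "SEA", "SFO"),
  ("01020600", -8, 129, "SEA", "PDX"),
  ("01030600", 25, 129, "SEA", "PDX"),
  ("01050600", -10, 129, "SEA", "PDX"),
  ("01040900", -3, 1024, "SEA", "DEN")
).toDF("date", "delay", "distance", "origin", "destination")

// Total delay per destination, largest first, top 10.
val totalDelays = originSEA
  .groupBy("destination")
  .agg(sum("delay").as("TotalDelays"))
  .orderBy(desc("TotalDelays"))
  .limit(10)
  .collect()
  .map(row => (row.getString(0), row.getLong(1)))

totalDelays.foreach(println)
```

With these sample rows the result is SFO (18), PDX (7), DEN (-3): the negative values pull PDX's total down.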
DISTINCT, ORDER BY query
And here is a DISTINCT, ORDER BY query.
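An illustrative DISTINCT, ORDER BY query (not necessarily the one from the original notebook) lists each destination once, alphabetically. The sketch assumes a local SparkSession and hypothetical sample rows, and shows both the SQL form and the DataFrame API form (`distinct` plus `orderBy`).

```scala
import org.apache.spark.sql.SparkSession

// Assumption: spark-sql is on the classpath; local mode stands in for a cluster.
val spark = SparkSession.builder().master("local[*]").appName("originSEA-distinct").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the originSEA view built from Cosmos DB.
val originSEA = Seq(
  ("01011245", 6, 679, "SEA", "SFO"),
  ("01021245", 12, 679, "SEA", "SFO"),
  ("01020600", -8, 129, "SEA", "PDX"),
  ("01030600", 25, 129, "SEA", "PDX"),
  ("01050600", -10, 129, "SEA", "PDX"),
  ("01040900", -3, 1024, "SEA", "DEN")
).toDF("date", "delay", "distance", "origin", "destination")
originSEA.createOrReplaceTempView("originSEA")

// SQL form: each destination once, in alphabetical order.
val destinations = spark.sql("SELECT DISTINCT destination FROM originSEA ORDER BY destination")
  .as[String]
  .collect()

// DataFrame API form of the same query.
val viaApi = originSEA.select("destination").distinct().orderBy("destination").as[String].collect()

println(destinations.mkString(", "))
```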
Continuing Flights Data Analysis
Below are some example queries to continue the analysis of our flights data:
Top 5 Delayed Destinations (Cities) departing from Seattle
```sql
select destination, sum(delay)
from originSEA
where delay > 0
group by destination
order by sum(delay) desc limit 5
```
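Here is a DataFrame API sketch of a top-5 ranking of delayed destinations. It keeps only flights with `delay > 0` (assuming, as in the departure-delays sample data, that negative values mark early departures) and sorts the totals in descending order so the most-delayed destinations come first. It assumes a local SparkSession and hypothetical sample rows in place of the Cosmos DB data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{desc, sum}

// Assumption: spark-sql is on the classpath; local mode stands in for a cluster.
val spark = SparkSession.builder().master("local[*]").appName("originSEA-top5").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the originSEA view built from Cosmos DB.
val originSEA = Seq(
  ("01011245", 6, 679, "SEA", "SFO"),
  ("01021245", 12, 679, "SEA", "SFO"),
  ("01020600", -8, 129, "SEA", "PDX"),
  ("01030600", 25, 129, "SEA", "PDX"),
  ("01050600", -10, 129, "SEA", "PDX"),
  ("01040900", -3, 1024, "SEA", "DEN")
).toDF("date", "delay", "distance", "origin", "destination")

// Only late departures, summed per destination, worst five first.
val top5 = originSEA
  .filter($"delay" > 0)
  .groupBy("destination")
  .agg(sum("delay").as("TotalDelay"))
  .orderBy(desc("TotalDelay"))
  .limit(5)
  .collect()
  .map(row => (row.getString(0), row.getLong(1)))

top5.foreach(println)
```

DEN drops out entirely because its only sample flight left early.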
Calculate median delays by destination cities departing from Seattle
```sql
select destination, percentile_approx(delay, 0.5) as median_delay
from originSEA
where delay < 0
group by destination
order by percentile_approx(delay, 0.5)
```
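The median query can also be run through the DataFrame API. This sketch keeps the `delay < 0` filter from the SQL and calls the SQL function through `expr("percentile_approx(delay, 0.5)")`, which works across Spark versions (a typed `functions.percentile_approx` only exists in newer releases). It assumes a local SparkSession and hypothetical sample rows in place of the Cosmos DB data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Assumption: spark-sql is on the classpath; local mode stands in for a cluster.
val spark = SparkSession.builder().master("local[*]").appName("originSEA-median").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the originSEA view built from Cosmos DB.
val originSEA = Seq(
  ("01011245", 6, 679, "SEA", "SFO"),
  ("01021245", 12, 679, "SEA", "SFO"),
  ("01020600", -8, 129, "SEA", "PDX"),
  ("01030600", 25, 129, "SEA", "PDX"),
  ("01050600", -10, 129, "SEA", "PDX"),
  ("01040900", -3, 1024, "SEA", "DEN")
).toDF("date", "delay", "distance", "origin", "destination")

// Approximate median of the negative delays per destination, smallest first.
val mediansByDestination = originSEA
  .filter($"delay" < 0)
  .groupBy("destination")
  .agg(expr("percentile_approx(delay, 0.5)").as("median_delay"))
  .orderBy("median_delay")
  .collect()

val medians = mediansByDestination.map(_.getString(0))
mediansByDestination.foreach(println)
```

`percentile_approx` trades exactness for scalability: it returns a value drawn from each group's data rather than interpolating, so for groups with an even number of rows it can differ from a textbook median.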