Add a notebook for ZOrderCoveringIndex

This commit is contained in:
EJ Song 2022-01-13 20:47:41 -08:00
Parent db4bef41b1
Commit bc6da283a4
1 changed file with 281 additions and 0 deletions

@@ -0,0 +1,281 @@
{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Hyperspace ZOrderCoveringIndex"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"val sessionId = scala.util.Random.nextInt(1000000)\r\n",
"val dataPath = s\"/hyperspacetest/data-$sessionId\"\r\n",
"val indexPath = s\"/hyperspacetest/index-$sessionId\"\r\n",
"spark.conf.set(\"spark.hyperspace.system.path\", indexPath)\r\n",
"\r\n",
"val numFiles = 100"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"microsoft": {},
"collapsed": true
}
},
{
"cell_type": "markdown",
"source": [
"### Data preparation"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"spark.range(50000000).map { _ =>\r\n",
" (scala.util.Random.nextInt(10000000).toLong, scala.util.Random.nextInt(1000000000), scala.util.Random.nextInt(10000000))\r\n",
"}.toDF(\"colA\", \"colB\", \"colC\").repartition(numFiles).write.mode(\"overwrite\").format(\"parquet\").save(dataPath)\r\n",
"\r\n",
"// 50M rows with random integers stored in numFiles parquet files"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "markdown",
"source": [
"### Create index"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"import com.microsoft.hyperspace.index.zordercovering._\r\n",
"import com.microsoft.hyperspace._\r\n",
"import com.microsoft.hyperspace.util.FileUtils\r\n",
"import org.apache.hadoop.fs.Path\r\n",
"\r\n",
"val totalSizeInBytes = FileUtils.getDirectorySize(new Path(dataPath))\r\n",
"val sizePerPartition = totalSizeInBytes / numFiles \r\n",
"spark.conf.set(\"spark.hyperspace.index.zorder.targetSourceBytesPerPartition\", sizePerPartition) // Default: 1G\r\n",
"// Changed per file size for z-order index for demonstration\r\n",
"\r\n",
"val df = spark.read.parquet(dataPath)\r\n",
"val hs = new Hyperspace(spark)\r\n",
"hs.createIndex(df, ZOrderCoveringIndexConfig(\"zorderTestIndex\", Seq(\"colA\", \"colB\"), Seq(\"colC\")))"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},
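{
"cell_type": "code",
"source": [
"// Optional sanity check (a hedged sketch, assuming the standard Hyperspace management API):\r\n",
"// list the registered indexes to confirm that \"zorderTestIndex\" was created and is ACTIVE.\r\n",
"hs.indexes.show(false)"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},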
{
"cell_type": "code",
"source": [
"def measureDuration(f : => Unit) {\r\n",
" val start = System.nanoTime\r\n",
" f\r\n",
" val durationInMS = (System.nanoTime - start) / 1000 / 1000\r\n",
" println(\"duration(ms): \" + durationInMS)\r\n",
"}"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"### Check performance with and without ZOrderCoveringIndex\r\n",
"\r\n",
"NOTE: performance gain will be different depending on query type, data size and computing environment. \r\n",
"As the test data is not huge, use small computing resource to see the improvement from Z-ordering."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"spark.disableHyperspace\r\n",
"val filterQuery = df.filter(\"colA > 758647 AND colA < 779999 AND colB > 10537919 AND colB < 10599715\")\r\n",
"println(filterQuery.queryExecution.sparkPlan)\r\n",
"measureDuration(filterQuery.count)\r\n",
"measureDuration(filterQuery.count)"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"spark.enableHyperspace\r\n",
"val filterQuery = df.filter(\"colA > 758647 AND colA < 779999 AND colB > 10537919 AND colB < 10599715\")\r\n",
"println(filterQuery.queryExecution.sparkPlan)\r\n",
"measureDuration(filterQuery.count)\r\n",
"measureDuration(filterQuery.count)"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
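{
"cell_type": "code",
"source": [
"// Optional (a hedged sketch using Hyperspace's explain API): show whether the filter query\r\n",
"// above is rewritten to read from \"zorderTestIndex\" instead of the original data files.\r\n",
"hs.explain(filterQuery, verbose = true)"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},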
{
"cell_type": "markdown",
"source": [
"### Utility function for min/max skipping analysis\r\n",
"\r\n",
"We provide min/max based analysis utility function for any DataFrame.\r\n",
"The analysis function only works for numeric columns. \r\n",
"It'll collect min/max for each data file and generate analysis result.\r\n"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"import com.microsoft.hyperspace.util.MinMaxAnalysisUtil\r\n",
"val df = spark.read.parquet(dataPath) \r\n",
"\r\n",
"// Since source data is randomly generated, we need to check all files to find a value.\r\n",
"displayHTML(MinMaxAnalysisUtil.analyze(df, Seq(\"colA\", \"colB\"), format = \"html\")) // format \"text\" and \"html\" are available.\r\n"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"// As the index data is Z-ordered, we can skip reading unnecessary files based on min/max statistics.\r\n",
"displayHTML(MinMaxAnalysisUtil.analyzeIndex(spark, \"zorderTestIndex\", Seq(\"colA\", \"colB\"), format = \"html\")) "
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
}
],
"metadata": {
"kernelspec": {
"name": "synapse_pyspark",
"language": "Python",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "scala"
},
"kernel_info": {
"name": "synapse_pyspark"
},
"description": null,
"save_output": true,
"synapse_widget": {
"version": "0.1",
"state": {}
}
},
"nbformat": 4,
"nbformat_minor": 2
}