Add a notebook for Delta Lake support (#447)

EJ Song 2021-05-26 13:17:57 -07:00 committed by GitHub
Parent ebbf5b3c93
Commit 17cd87ff17
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
1 changed file with 584 additions and 0 deletions


@@ -0,0 +1,584 @@
{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Hyperspace for Delta Lake\r\n",
"\r\n",
"[Hyperspace](https://github.com/microsoft/hyperspace) now supports Delta Lake as its data source. This notebook covers how Hyperspace works with Delta Lake tables and updates on the tables.\r\n"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"### Setup configurations"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"val sessionId = scala.util.Random.nextInt(1000000)\n",
"val dataPath = s\"/hyperspace/data-$sessionId\";\n",
"val indexLocation = s\"/hyperspace/indexes-$sessionId\"\n",
"\n",
"// Use a random index location to avoid conflicts while using the notebook.\n",
"spark.conf.set(\"spark.hyperspace.system.path\", indexLocation)\n",
"// Use HTML as a display mode.\n",
"spark.conf.set(\"spark.hyperspace.explain.displayMode\", \"html\")\n",
"// Enable Hybrid scan regardless of the amount of data being appended/deleted.\n",
"spark.conf.set(\"spark.hyperspace.index.hybridscan.maxAppendedRatio\", \"0.99\") // default: 0.3\n",
"spark.conf.set(\"spark.hyperspace.index.hybridscan.maxDeletedRatio\", \"0.99\") // default: 0.2"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"### Data preparation"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"import spark.implicits._\n",
"import org.apache.spark.sql.DataFrame\n",
"\n",
"// Sample department records\n",
"val departments = Seq(\n",
" (10, \"Accounting\", \"New York\"),\n",
" (20, \"Research\", \"Dallas\"),\n",
" (30, \"Sales\", \"Chicago\"),\n",
" (40, \"Operations\", \"Boston\"))\n",
"\n",
"// Sample employee records\n",
"val employees = Seq(\n",
" (7369, \"SMITH\", 20),\n",
" (7499, \"ALLEN\", 30),\n",
" (7521, \"WARD\", 30),\n",
" (7566, \"JONES\", 20),\n",
" (7698, \"BLAKE\", 30),\n",
" (7782, \"CLARK\", 10),\n",
" (7788, \"SCOTT\", 20),\n",
" (7839, \"KING\", 10),\n",
" (7844, \"TURNER\", 30),\n",
" (7876, \"ADAMS\", 20),\n",
" (7900, \"JAMES\", 30),\n",
" (7934, \"MILLER\", 10),\n",
" (7902, \"FORD\", 20),\n",
" (7654, \"MARTIN\", 30))\n",
"\n",
"val empData = employees.toDF(\"empId\", \"empName\", \"deptId\")\n",
"val deptData = departments.toDF(\"deptId\", \"deptName\", \"location\")\n",
"val empLocation = s\"$dataPath/employees\"\n",
"val deptLocation = s\"$dataPath/departments\"\n",
"empData.write.format(\"delta\").mode(\"overwrite\").save(empLocation)\n",
"deptData.write.format(\"delta\").mode(\"overwrite\").save(deptLocation)"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"val empDF = spark.read.format(\"delta\").load(empLocation)\n",
"val deptDF = spark.read.format(\"delta\").load(deptLocation)\n",
"\n",
"// Disable BroadcastHashJoin so that Spark™ will use SortMergeJoin that Hyperspace indexes can optimize.\n",
"spark.conf.set(\"spark.sql.autoBroadcastJoinThreshold\", -1)\n",
"\n",
"val eqJoin =\n",
" empDF.\n",
" join(deptDF, empDF(\"deptId\") === deptDF(\"deptId\")).\n",
" select(empDF(\"empName\"), deptDF(\"deptName\"))\n",
"\n",
"eqJoin.show"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "markdown",
"source": [
"### Create Hyperspace indexes over Delta Lake tables\r\n",
"\r\n",
"Hyperspace supports Delta Lake through an extensible data source builder framework.\r\n",
"In order to create and apply Hyperspace indexes on Delta Lake tables, you need to register Delta Lake source builder.\r\n",
"\r\n",
"spark.conf.set(\"spark.hyperspace.index.sources.fileBasedBuilders\", \r\n",
" \"**com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder**,com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder\")\r\n"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"import com.microsoft.hyperspace._\n",
"import com.microsoft.hyperspace.index._\n",
"\n",
"// Register delta table source builder.\n",
"spark.conf.set(\n",
" \"spark.hyperspace.index.sources.fileBasedBuilders\",\n",
" \"com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder,\" +\n",
" \"com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder\")\n",
"\n",
"// Enable source lineage to support the scenario with deleted files.\n",
"spark.conf.set(\"spark.hyperspace.index.lineage.enabled\", \"true\")\n",
"\n",
"val hyperspace = Hyperspace()\n",
"\n",
"val empIndexConfig = IndexConfig(\"empIndex\", Seq(\"deptId\"), Seq(\"empName\"))\n",
"val deptIndexConfig = IndexConfig(\"deptIndex\", Seq(\"deptId\"), Seq(\"deptName\"))\n",
"\n",
"hyperspace.createIndex(empDF, empIndexConfig)\n",
"hyperspace.createIndex(deptDF, deptIndexConfig)"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
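{
"cell_type": "markdown",
"source": [
"As a quick sanity check, you can list the indexes Hyperspace manages: `hyperspace.indexes` returns a DataFrame with the metadata of each index, including its state."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"// Sanity check: both empIndex and deptIndex should be listed as ACTIVE.\n",
"hyperspace.indexes.show(truncate = false)"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},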
{
"cell_type": "code",
"source": [
"// Enable Hypperspace to apply indexes.\n",
"// For simplicity, FilterIndexRule is disabled in this demo.\n",
"spark.enableHyperspace()"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"// Scenario: check if newly created indexes are applied.\n",
"val eqJoin =\n",
" empDF.\n",
" join(deptDF, empDF(\"deptId\") === deptDF(\"deptId\")).\n",
" select(empDF(\"empName\"), deptDF(\"deptName\"))\n",
"\n",
"eqJoin.show\n",
"\n",
"hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"### Append data\r\n",
"\r\n",
"With Hybrid Scan, you can still utilize Hyperspace indexes after appending data to the table.\r\n"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"// Add new employees.\n",
"val newEmployees = Seq(\n",
" (8000, \"NEW-EMPLOYEE-1\", 30),\n",
" (8001, \"NEW-EMPLOYEE-2\", 10),\n",
" (8002, \"NEW-EMPLOYEE-3\", 20),\n",
" (8003, \"NEW-EMPLOYEE-4\", 30))\n",
"\n",
"newEmployees.toDF(\"empId\", \"empName\", \"deptId\").write.format(\"delta\").mode(\"append\").save(empLocation)\n",
"\n",
"val latestEmpDF = spark.read.format(\"delta\").load(empLocation)\n",
"latestEmpDF.show"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"// Scneario: Hybrid scan is off.\n",
"spark.conf.set(\"spark.hyperspace.index.hybridscan.enabled\", \"false\")\n",
"\n",
"val eqJoin =\n",
" latestEmpDF.\n",
" join(deptDF, latestEmpDF(\"deptId\") === deptDF(\"deptId\")).\n",
" select(latestEmpDF(\"empName\"), deptDF(\"deptName\"))\n",
"\n",
"eqJoin.show\n",
"\n",
"hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"// Scenario: Hybrid Scan is on.\n",
"spark.conf.set(\"spark.hyperspace.index.hybridscan.enabled\", \"true\")\n",
"\n",
"val eqJoin =\n",
" latestEmpDF.\n",
" join(deptDF, latestEmpDF(\"deptId\") === deptDF(\"deptId\")).\n",
" select(latestEmpDF(\"empName\"), deptDF(\"deptName\"))\n",
"\n",
"hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }\n",
"\n",
"eqJoin.show"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "markdown",
"source": [
"### Incremental refresh\r\n",
"\r\n",
"Other than using Hybrid Scan, you can also incrementally build Hyperspace indexes only for appended and deleted data."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"// Incrementally build index on new employees only.\n",
"hyperspace.refreshIndex(\"empIndex\", \"incremental\")"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"// Show refreshed index only contains new data.\n",
"spark.read.parquet(s\"$indexLocation/empIndex/v__=1\").show"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"// Scenario: Check if refreshed index is applied.\n",
"val eqJoin =\n",
" latestEmpDF.\n",
" join(deptDF, latestEmpDF(\"deptId\") === deptDF(\"deptId\")).\n",
" select(latestEmpDF(\"empName\"), deptDF(\"deptName\"))\n",
"\n",
"hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }\n",
"\n",
"eqJoin.show"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "markdown",
"source": [
"### Update data\r\n",
"\r\n",
Updates to the table can be handled as a combination of deleted and appended data, using either Hybrid Scan or incremental refresh."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"import io.delta.tables._\n",
"import org.apache.spark.sql.functions._\n",
"\n",
"val empDeltaTable = DeltaTable.forPath(spark, empLocation)\n",
"\n",
"// Append \"SPEICAL\" to the \"NEW-EMPLOYEE-2\"'s name.\n",
"empDeltaTable.update(\n",
" col(\"empName\") === (\"NEW-EMPLOYEE-2\"),\n",
" Map(\"empName\" -> (concat(col(\"empName\"), lit(\"-SPECIAL\")))))\n",
"\n",
"empDeltaTable.history.show(truncate = false)\n",
"empDeltaTable.toDF.show(truncate = false)"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"// Scneario: handle updated data.\n",
"val updatedEmpDF = empDeltaTable.toDF\n",
"val eqJoin =\n",
" updatedEmpDF.\n",
" join(deptDF, updatedEmpDF(\"deptId\") === deptDF(\"deptId\")).\n",
" select(updatedEmpDF(\"empName\"), deptDF(\"deptName\"))\n",
"\n",
"eqJoin.show(truncate = false)\n",
"\n",
"hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "markdown",
"source": [
"### Enhancement of Delta Lake time travel query\r\n",
"\r\n",
"For a time travel query with an old table version, the latest version of the index can be used with Hybrid Scan, but usually there could be many appended and/or deleted files which reduce the benefit of indexes.\r\n",
"To optimize it, Hyperspace tracks the history of the index version and table version for each refresh time and selects the closest index version based on the history.\r\n",
"\r\n",
"\r\n",
"Note that this feature is not available in the current Hyperspace version and will be delivered in the next release."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"// Scenario: Time travel to initial version of employees.\n",
"val oldEmpOnlyDF = spark.read.format(\"delta\").option(\"versionAsOf\", 0).load(empLocation)\n",
"\n",
"val eqJoin =\n",
" oldEmpOnlyDF.\n",
" join(deptDF, oldEmpOnlyDF(\"deptId\") === deptDF(\"deptId\")).\n",
" select(oldEmpOnlyDF(\"empName\"), deptDF(\"deptName\"))\n",
"\n",
"hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }\n",
"\n",
"eqJoin.show"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
}
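,
{
"cell_type": "markdown",
"source": [
"### Cleanup\r\n",
"\r\n",
"When you are done with the demo, you can remove the indexes created above: `deleteIndex` soft-deletes an index, and `vacuumIndex` then removes its files permanently."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"// Soft-delete and then permanently remove the demo indexes.\n",
"hyperspace.deleteIndex(\"empIndex\")\n",
"hyperspace.vacuumIndex(\"empIndex\")\n",
"hyperspace.deleteIndex(\"deptIndex\")\n",
"hyperspace.vacuumIndex(\"deptIndex\")"
],
"outputs": [],
"execution_count": null,
"metadata": {}
}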
],
"metadata": {
"kernelspec": {
"name": "synapse_spark",
"display_name": "Synapse Spark"
},
"language_info": {
"name": "scala"
},
"save_output": true,
"synapse_widget": {
"version": "0.1",
"state": {}
}
},
"nbformat": 4,
"nbformat_minor": 2
}