Add spark session extension for Hyperspace (#504)

This commit is contained in:
paryoja 2021-11-16 03:53:10 +09:00 committed by GitHub
Parent 2f8d32b422
Commit d8c4b79ceb
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
7 changed files: 335 additions and 18 deletions

View file

@@ -0,0 +1,69 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.rules.ApplyHyperspace
/**
* An extension for Spark SQL to activate Hyperspace.
*
* Example to run a `spark-submit` with Hyperspace enabled:
* {{{
* spark-submit -c spark.sql.extensions=com.microsoft.hyperspace.HyperspaceSparkSessionExtension
* }}}
*
* Example to create a `SparkSession` with Hyperspace enabled:
* {{{
* val spark = SparkSession
* .builder()
* .appName("...")
* .master("...")
* .config("spark.sql.extensions", "com.microsoft.hyperspace.HyperspaceSparkSessionExtension")
* .getOrCreate()
* }}}
*/
class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) {
/**
* If HyperspaceRule were injected directly as an optimizer rule by HyperspaceExtension,
* the rules would be applied in a different order than without the extension
* (i.e., when enableHyperspace is called explicitly). To make the behavior consistent,
* the current implementation of HyperspaceExtension uses a trick: it calls
* enableHyperspace before the rule is applied. Since the injectOptimizerRule interface
* must return a rule builder, it returns a dummy rule that does nothing. This may add
* slight overhead because enableHyperspace is called once per evaluation of a Spark plan.
*/
private case object DummyRule extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan
}
}
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectOptimizerRule { sparkSession =>
// Enable Hyperspace to leverage indexes.
sparkSession.addOptimizationsIfNeeded()
// Return a dummy rule to fit in interface of injectOptimizerRule
DummyRule
}
}
}
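As a usage illustration (not part of this commit), a minimal sketch of how an extension-based session lines up with the explicit API; `isHyperspaceEnabled()` comes from the `com.microsoft.hyperspace` package object changed below:

import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace._

// Sketch, assuming a local master: the extension injects the rules lazily,
// when a plan is first optimized, so run a query before checking the state.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.extensions", "com.microsoft.hyperspace.HyperspaceSparkSessionExtension")
  .getOrCreate()

spark.range(10).count() // plan optimization calls addOptimizationsIfNeeded()
assert(spark.isHyperspaceEnabled())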

View file

@@ -19,6 +19,10 @@ package com.microsoft.hyperspace.index
import org.apache.spark.sql.internal.SQLConf
object IndexConstants {
// If set to false, Hyperspace will not be applied.
val HYPERSPACE_APPLY_ENABLED = "spark.hyperspace.apply.enabled"
val HYPERSPACE_APPLY_ENABLED_DEFAULT = "true"
val INDEXES_DIR = "indexes"
// Config used for setting the system path, which is considered as a "root" path for Hyperspace;
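Since the flag is a plain session conf, it can be flipped at runtime without touching the injected rules; a small sketch, assuming an existing SparkSession `spark`:

// The key and default come from IndexConstants above.
spark.conf.set("spark.hyperspace.apply.enabled", "false") // indexes ignored
spark.conf.set("spark.hyperspace.apply.enabled", "true")  // indexes applied again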

View file

@@ -24,6 +24,7 @@ import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging
import com.microsoft.hyperspace.util.HyperspaceConf
/**
* Transform the given plan to use Hyperspace indexes.
@@ -42,7 +43,7 @@ object ApplyHyperspace
private[hyperspace] val disableForIndexMaintenance = new ThreadLocal[Boolean]
override def apply(plan: LogicalPlan): LogicalPlan = {
if (disableForIndexMaintenance.get) {
if (!HyperspaceConf.hyperspaceApplyEnabled(spark) || disableForIndexMaintenance.get) {
return plan
}
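The effect of the new guard, as a sketch of assumed behavior (given an active SparkSession `spark`): with the flag off, the rule returns the plan unchanged.

import com.microsoft.hyperspace.index.IndexConstants
import com.microsoft.hyperspace.index.rules.ApplyHyperspace

// With apply disabled, ApplyHyperspace short-circuits and leaves the plan as-is.
spark.conf.set(IndexConstants.HYPERSPACE_APPLY_ENABLED, "false")
val plan = spark.range(10).queryExecution.analyzed
assert(ApplyHyperspace(plan) eq plan)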

View file

@@ -18,8 +18,10 @@ package com.microsoft
import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.HyperspaceSparkSessionExtension
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.rules.ApplyHyperspace
import com.microsoft.hyperspace.util.HyperspaceConf
package object hyperspace {
@@ -29,42 +31,65 @@ package object hyperspace {
implicit class Implicits(sparkSession: SparkSession) {
/**
* Plug in Hyperspace-specific rules.
* Enable Hyperspace indexes.
*
* Plug in Hyperspace-specific rules and set `IndexConstants.HYPERSPACE_APPLY_ENABLED` to true.
*
* @return a spark session that contains Hyperspace-specific rules.
*/
def enableHyperspace(): SparkSession = {
disableHyperspace
sparkSession.sessionState.experimentalMethods.extraOptimizations ++=
ApplyHyperspace :: Nil
sparkSession.sessionState.experimentalMethods.extraStrategies ++=
BucketUnionStrategy :: Nil
HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, true)
addOptimizationsIfNeeded()
sparkSession
}
/**
* Plug out Hyperspace-specific rules.
* Disable Hyperspace indexes.
*
* @return a spark session that does not contain Hyperspace-specific rules.
* Set `IndexConstants.HYPERSPACE_APPLY_ENABLED` to false
* to stop applying Hyperspace indexes.
*
* @return a Spark session with `IndexConstants.HYPERSPACE_APPLY_ENABLED` set to false.
*/
def disableHyperspace(): SparkSession = {
val experimentalMethods = sparkSession.sessionState.experimentalMethods
experimentalMethods.extraOptimizations =
experimentalMethods.extraOptimizations.filterNot(ApplyHyperspace.equals)
experimentalMethods.extraStrategies =
experimentalMethods.extraStrategies.filterNot(BucketUnionStrategy.equals)
HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, false)
sparkSession
}
/**
* Checks if Hyperspace is enabled or not.
*
* Note that Hyperspace is enabled when:
* 1) `ApplyHyperspace` exists in extraOptimizations,
* 2) `BucketUnionStrategy` exists in extraStrategies, and
* 3) `IndexConstants.HYPERSPACE_APPLY_ENABLED` is true.
*
* @return true if Hyperspace is enabled, false otherwise.
*/
def isHyperspaceEnabled(): Boolean = {
val experimentalMethods = sparkSession.sessionState.experimentalMethods
experimentalMethods.extraOptimizations.contains(ApplyHyperspace) &&
experimentalMethods.extraStrategies.contains(BucketUnionStrategy)
experimentalMethods.extraStrategies.contains(BucketUnionStrategy) &&
HyperspaceConf.hyperspaceApplyEnabled(sparkSession)
}
/**
* Adds ApplyHyperspace and BucketUnionStrategy to extraOptimizations
* and extraStrategies, respectively, so that Spark can use Hyperspace.
*/
private[hyperspace] def addOptimizationsIfNeeded(): Unit = {
if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains(
ApplyHyperspace)) {
sparkSession.sessionState.experimentalMethods.extraOptimizations ++=
ApplyHyperspace :: Nil
}
if (!sparkSession.sessionState.experimentalMethods.extraStrategies.contains(
BucketUnionStrategy)) {
sparkSession.sessionState.experimentalMethods.extraStrategies ++=
BucketUnionStrategy :: Nil
}
}
}
}
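Taken together, a brief sketch of the updated implicits in use, assuming an existing SparkSession `spark`:

import com.microsoft.hyperspace._

spark.enableHyperspace()            // injects rules if needed, sets the flag to true
assert(spark.isHyperspaceEnabled())

spark.disableHyperspace()           // rules remain injected, but become inert
assert(!spark.isHyperspaceEnabled())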

View file

@@ -25,6 +25,22 @@ import com.microsoft.hyperspace.index.IndexConstants
* Helper class to extract Hyperspace-related configs from SparkSession.
*/
object HyperspaceConf {
/**
* Returns whether applying Hyperspace indexes is enabled.
*/
def hyperspaceApplyEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.HYPERSPACE_APPLY_ENABLED,
IndexConstants.HYPERSPACE_APPLY_ENABLED_DEFAULT)
.toBoolean
}
def setHyperspaceApplyEnabled(spark: SparkSession, apply: Boolean): Unit = {
spark.conf.set(IndexConstants.HYPERSPACE_APPLY_ENABLED, apply.toString)
}
def hybridScanEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
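A round-trip sketch of the two new helpers, assuming an existing SparkSession `spark`:

import com.microsoft.hyperspace.util.HyperspaceConf

// The setter stores the flag as a string; the getter parses it back with
// toBoolean, falling back to the default ("true") when unset.
HyperspaceConf.setHyperspaceApplyEnabled(spark, apply = false)
assert(!HyperspaceConf.hyperspaceApplyEnabled(spark))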

View file

@@ -0,0 +1,198 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import com.microsoft.hyperspace.index.{Content, FileIdTracker, HyperspaceSuite, IndexConfig, IndexConstants}
import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY
import com.microsoft.hyperspace.util.FileUtils
class HyperspaceExtensionTest extends HyperspaceSuite {
private val sampleDeptDataLocation = inTempDir("dept")
private val sampleEmpDataLocation = inTempDir("emp")
private val departments = Seq(
(10, "Accounting", "New York"),
(20, "Research", "Dallas"),
(30, "Sales", "Chicago"),
(40, "Operations", "Boston"))
private val employees = Seq(
(7369, "SMITH", 20),
(7499, "ALLEN", 30),
(7521, "WARD", 30),
(7566, "JONES", 20),
(7698, "BLAKE", 30),
(7782, "CLARK", 10),
(7788, "SCOTT", 20),
(7839, "KING", 10),
(7844, "TURNER", 30),
(7876, "ADAMS", 20),
(7900, "JAMES", 30),
(7934, "MILLER", 10),
(7902, "FORD", 20),
(7654, "MARTIN", 30))
override protected lazy val spark: SparkSession = SparkSession
.builder()
.master(s"local[$numParallelism]")
.config(HYPERSPACE_EVENT_LOGGER_CLASS_KEY, "com.microsoft.hyperspace.MockEventLogger")
.config("delta.log.cacheSize", "3")
.config("spark.databricks.delta.snapshotPartitions", "2")
.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config(
"spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension," +
"com.microsoft.hyperspace.HyperspaceSparkSessionExtension")
.config("spark.sql.shuffle.partitions", "5")
.config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5")
.config("spark.ui.enabled", "false")
.config("spark.ui.showConsoleProgress", "false")
.appName(suiteName)
.getOrCreate()
override def beforeAll(): Unit = {
super.beforeAll()
val sparkSession = spark
import sparkSession.implicits._
FileUtils.delete(new Path(sampleDeptDataLocation))
FileUtils.delete(new Path(sampleEmpDataLocation))
departments
.toDF("deptId", "deptName", "location")
.write
.mode("overwrite")
.parquet(sampleDeptDataLocation)
employees
.toDF("empId", "empName", "deptId")
.write
.mode("overwrite")
.parquet(sampleEmpDataLocation)
}
override def afterAll(): Unit = {
FileUtils.delete(new Path(sampleDeptDataLocation))
FileUtils.delete(new Path(sampleEmpDataLocation))
super.afterAll()
}
test("Verify ApplyHyperspace is used with hyperspace extension session") {
MockEventLogger.reset()
val deptDF = spark.read.parquet(sampleDeptDataLocation)
val empDF = spark.read.parquet(sampleEmpDataLocation)
val deptIndexConfig = IndexConfig("deptIndex", Seq("deptId"), Seq("deptName"))
val empIndexConfig = IndexConfig("empIndex", Seq("deptId"), Seq("empName"))
// Create Hyperspace indexes.
val hs = new Hyperspace(spark)
hs.createIndex(deptDF, deptIndexConfig)
hs.createIndex(empDF, empIndexConfig)
// Make sure the new indexes are available.
assert(Hyperspace.getContext(spark).indexCollectionManager.indexes.count == 2)
def filterQuery(): DataFrame = deptDF.filter("deptId == '30'").select("deptId", "deptName")
verifyIndexUsage(filterQuery, getIndexFilesPath(deptIndexConfig.indexName))
def eqJoinQuery(): DataFrame =
empDF
.join(deptDF, empDF("deptId") === deptDF("deptId"))
.select(empDF("empName"), deptDF("deptName"))
verifyIndexUsage(
eqJoinQuery,
getIndexFilesPath(deptIndexConfig.indexName) ++ getIndexFilesPath(empIndexConfig.indexName))
}
/**
* Verify that the query plan has the expected rootPaths.
*
* @param optimizedPlan the optimized query plan.
* @param expectedPaths the expected paths in the query plan.
*/
private def verifyQueryPlanHasExpectedRootPaths(
optimizedPlan: LogicalPlan,
expectedPaths: Seq[Path]): Unit = {
assert(getAllRootPaths(optimizedPlan).sortBy(_.getName) === expectedPaths.sortBy(_.getName))
}
/**
* Get all rootPaths from a query plan.
*
* @param optimizedPlan the optimized query plan.
* @return a sequence of [[Path]].
*/
private def getAllRootPaths(optimizedPlan: LogicalPlan): Seq[Path] = {
optimizedPlan.collect {
case LogicalRelation(
HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _),
_,
_,
_) =>
location.rootPaths
}.flatten
}
private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = {
versions.flatMap { v =>
Content
.fromDirectory(
new Path(systemPath, s"$indexName/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$v"),
new FileIdTracker,
new Configuration)
.files
}
}
/**
* Gets the sorted rows from the given dataframe to make them easy to compare
* with those of another dataframe.
*
* @param df dataframe to collect rows from.
* @return sorted rows.
*/
private def getSortedRows(df: DataFrame): Array[Row] = {
df.orderBy(df.columns.head, df.columns.tail: _*).collect()
}
private def verifyIndexUsage(f: () => DataFrame, expectedRootPaths: Seq[Path]): Unit = {
spark.disableHyperspace()
val dfWithHyperspaceDisabled = f()
val schemaWithHyperspaceDisabled = dfWithHyperspaceDisabled.schema
val sortedRowsWithHyperspaceDisabled = getSortedRows(dfWithHyperspaceDisabled)
spark.enableHyperspace()
val dfWithHyperspaceEnabled = f()
verifyQueryPlanHasExpectedRootPaths(
dfWithHyperspaceEnabled.queryExecution.optimizedPlan,
expectedRootPaths)
assert(schemaWithHyperspaceDisabled.equals(dfWithHyperspaceEnabled.schema))
assert(sortedRowsWithHyperspaceDisabled.sameElements(getSortedRows(dfWithHyperspaceEnabled)))
}
}

View file

@@ -33,7 +33,7 @@ import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndexConfig
import com.microsoft.hyperspace.index.dataskipping.sketches.MinMaxSketch
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, CandidateIndexCollector}
import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils}
class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
private val testDir = inTempDir("e2eTests")
@@ -91,12 +91,16 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
spark.sessionState.experimentalMethods.extraStrategies
.containsSlice(expectedOptimizationStrategy))
// Since enableHyperspace was called earlier, extraOptimizations still contains ApplyHyperspace.
// This behavior has changed according to the following discussion:
// https://github.com/microsoft/hyperspace/pull/504/files#r740278070
spark.disableHyperspace()
assert(!HyperspaceConf.hyperspaceApplyEnabled(spark))
assert(
!spark.sessionState.experimentalMethods.extraOptimizations
spark.sessionState.experimentalMethods.extraOptimizations
.containsSlice(expectedOptimizationRuleBatch))
assert(
!spark.sessionState.experimentalMethods.extraStrategies
spark.sessionState.experimentalMethods.extraStrategies
.containsSlice(expectedOptimizationStrategy))
}
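For clarity, the contract the updated assertions encode, as a sketch:

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.rules.ApplyHyperspace

// disableHyperspace() no longer removes the injected rules; it only turns
// off the apply flag, so the optimizer rule stays registered but inert.
spark.enableHyperspace()
spark.disableHyperspace()
assert(spark.sessionState.experimentalMethods.extraOptimizations.contains(ApplyHyperspace))
assert(!spark.isHyperspaceEnabled())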