Data Skipping Index Part 1: Refactoring (#481)

Chungmin Lee 2021-07-29 14:55:55 +09:00 committed by GitHub
Parent 12e61c68ec
Commit 467a891ec2
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 263 additions and 114 deletions

View file

@@ -1,36 +0,0 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.covering
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap
import com.microsoft.hyperspace.index.rules.QueryPlanIndexFilter
/**
* Filters out indexes which are not [[CoveringIndex]].
*/
object CoveringIndexFilter extends QueryPlanIndexFilter {
override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
candidateIndexes
.map {
case (plan, indexes) =>
plan -> indexes.filter(_.derivedDataset.isInstanceOf[CoveringIndex])
}
.filter { case (_, indexes) => indexes.nonEmpty }
}
}

View file

@@ -29,21 +29,10 @@ import org.apache.spark.sql.types.{LongType, StructType}
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.index.rules.RuleUtils
import com.microsoft.hyperspace.util.HyperspaceConf
object RuleUtils {
/**
* Check if an index was applied to the given relation or not.
* This can be determined by an identifier in [[FileBasedRelation]]'s options.
*
* @param relation FileBasedRelation to check if an index is already applied.
* @return true if an index is applied to the given relation. Otherwise false.
*/
def isIndexApplied(relation: FileBasedRelation): Boolean = {
relation.options.exists(_.equals(IndexConstants.INDEX_RELATION_IDENTIFIER))
}
object CoveringIndexRuleUtils {
/**
* Transform the current plan to utilize the given index.
@@ -70,7 +59,7 @@ object RuleUtils {
useBucketSpec: Boolean,
useBucketUnionForAppended: Boolean): LogicalPlan = {
// Check pre-requisite.
val relation = getRelation(spark, plan)
val relation = RuleUtils.getRelation(spark, plan)
assert(relation.isDefined)
// If there is no change in source data files, the index can be applied by
@@ -93,23 +82,6 @@ object RuleUtils {
transformed
}
/**
* Extract the relation node if the given logical plan is linear.
*
* @param plan Logical plan to extract a relation node from.
* @return If the plan is linear and the relation node is supported, the [[FileBasedRelation]]
* object that wraps the relation node. Otherwise None.
*/
def getRelation(spark: SparkSession, plan: LogicalPlan): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
val leaves = plan.collectLeaves()
if (leaves.size == 1 && provider.isSupportedRelation(leaves.head)) {
Some(provider.getRelation(leaves.head))
} else {
None
}
}
/**
* Transform the current plan to utilize index.
* The transformed plan reads data from indexes instead of the source relations.

View file

@@ -18,13 +18,11 @@ package com.microsoft.hyperspace.index.covering
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.IndexLogEntryTags
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules._
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
/**
@@ -139,17 +137,6 @@ object FilterRankFilter extends IndexRankFilter {
}
}
object ExtractRelation extends ActiveSparkSession {
def unapply(plan: LeafNode): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
if (provider.isSupportedRelation(plan)) {
Some(provider.getRelation(plan))
} else {
None
}
}
}
/**
* FilterIndexRule looks for opportunities in a logical plan to replace
* a relation with an available hash partitioned index according to columns in
@@ -157,7 +144,7 @@ object ExtractRelation extends ActiveSparkSession {
*/
object FilterIndexRule extends HyperspaceRule {
override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
CoveringIndexFilter :: FilterPlanNodeFilter :: FilterColumnFilter :: Nil
IndexTypeFilter[CoveringIndex]() :: FilterPlanNodeFilter :: FilterColumnFilter :: Nil
override val indexRanker: IndexRankFilter = FilterRankFilter
@@ -169,7 +156,7 @@ object FilterIndexRule extends HyperspaceRule {
// As FilterIndexRule is not intended to support bucketed scan, we set
// useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause
// unnecessary shuffle for appended data to apply BucketUnion for merging data.
RuleUtils.transformPlanToUseIndex(
CoveringIndexRuleUtils.transformPlanToUseIndex(
spark,
indexes.head._2,
plan,

View file

@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter, RuleUtils}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.shim.JoinWithoutHint
@@ -618,7 +618,11 @@ object JoinRankFilter extends IndexRankFilter {
object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging {
override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
CoveringIndexFilter :: JoinPlanNodeFilter :: JoinAttributeFilter :: JoinColumnFilter :: Nil
IndexTypeFilter[CoveringIndex]() ::
JoinPlanNodeFilter ::
JoinAttributeFilter ::
JoinColumnFilter ::
Nil
override val indexRanker: IndexRankFilter = JoinRankFilter
@@ -641,13 +645,13 @@ object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging {
val updatedPlan =
join
.copy(
left = RuleUtils.transformPlanToUseIndex(
left = CoveringIndexRuleUtils.transformPlanToUseIndex(
spark,
lIndex,
l,
useBucketSpec = true,
useBucketUnionForAppended = true),
right = RuleUtils.transformPlanToUseIndex(
right = CoveringIndexRuleUtils.transformPlanToUseIndex(
spark,
rIndex,
r,

View file

@@ -0,0 +1,33 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.rules
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
object ExtractRelation extends ActiveSparkSession {
def unapply(plan: LeafNode): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
if (provider.isSupportedRelation(plan)) {
Some(provider.getRelation(plan))
} else {
None
}
}
}
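
For orientation, a minimal usage sketch of the extractor above, assuming a caller that walks a plan's leaves; the helper name supportedLeafRelations is hypothetical and not part of this commit.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.index.rules.ExtractRelation
import com.microsoft.hyperspace.index.sources.FileBasedRelation

// Hypothetical helper: collect the FileBasedRelation wrapper for every leaf
// the source provider supports. ExtractRelation.unapply performs both the
// isSupportedRelation check and the getRelation lookup in one pattern match.
def supportedLeafRelations(plan: LogicalPlan): Seq[FileBasedRelation] =
  plan.collectLeaves().collect {
    case ExtractRelation(relation) => relation
  }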

View file

@@ -0,0 +1,49 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.rules
import scala.reflect.ClassTag
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.index.Index
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap
object IndexTypeFilter {
/**
* Returns a [[QueryPlanIndexFilter]] that filters out indexes which are not of type T.
*/
def apply[T <: Index: ClassTag](): QueryPlanIndexFilter =
new QueryPlanIndexFilter {
override def apply(
plan: LogicalPlan,
candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
candidateIndexes
.map {
case (plan, indexes) =>
plan -> indexes.filter {
_.derivedDataset match {
case _: T => true
case _ => false
}
}
}
.filter { case (_, indexes) => indexes.nonEmpty }
}
}
}
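
A minimal sketch of how this factory is applied, mirroring the FilterIndexRule and JoinIndexRule changes above, which swap the deleted CoveringIndexFilter for IndexTypeFilter[CoveringIndex](). The keepCoveringOnly wrapper is illustrative, and the CoveringIndex import path is assumed from this commit's package layout.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.index.covering.CoveringIndex
import com.microsoft.hyperspace.index.rules.IndexTypeFilter
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap

// Illustrative only: keep candidates whose derivedDataset is a CoveringIndex,
// dropping plans left with no indexes. The ClassTag context bound is what lets
// the `case _: T` pattern be checked at runtime despite type erasure.
def keepCoveringOnly(
    plan: LogicalPlan,
    candidateIndexes: PlanToIndexesMap): PlanToIndexesMap =
  IndexTypeFilter[CoveringIndex]().apply(plan, candidateIndexes)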

View file

@@ -0,0 +1,55 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.rules
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConstants
import com.microsoft.hyperspace.index.sources.FileBasedRelation
object RuleUtils {
/**
* Check if an index was applied to the given relation or not.
* This can be determined by an identifier in [[FileBasedRelation]]'s options.
*
* @param relation FileBasedRelation to check if an index is already applied.
* @return true if an index is applied to the given relation. Otherwise false.
*/
def isIndexApplied(relation: FileBasedRelation): Boolean = {
relation.options.exists(_.equals(IndexConstants.INDEX_RELATION_IDENTIFIER))
}
/**
* Extract the relation node if the given logical plan is linear.
*
* @param plan Logical plan to extract a relation node from.
* @return If the plan is linear and the relation node is supported, the [[FileBasedRelation]]
* object that wraps the relation node. Otherwise None.
*/
def getRelation(spark: SparkSession, plan: LogicalPlan): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
val leaves = plan.collectLeaves()
if (leaves.size == 1 && provider.isSupportedRelation(leaves.head)) {
Some(provider.getRelation(leaves.head))
} else {
None
}
}
}
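
A minimal sketch combining the two helpers above, assuming a caller that wants to skip plans whose relation already reads from an index; the name alreadyIndexed is hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.index.rules.RuleUtils

// Hypothetical: true only when the plan is linear, its leaf is a supported
// file-based relation, and that relation carries the index relation identifier
// in its options.
def alreadyIndexed(spark: SparkSession, plan: LogicalPlan): Boolean =
  RuleUtils.getRelation(spark, plan).exists(RuleUtils.isIndexApplied)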

View file

@@ -19,15 +19,15 @@ package com.microsoft.hyperspace.index.covering
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, IsNotNull}
import org.apache.spark.sql.catalyst.plans.{JoinType, SQLHelper}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache}
import org.apache.spark.sql.types.{IntegerType, StringType}
import com.microsoft.hyperspace.index.HyperspaceRuleSuite
import com.microsoft.hyperspace.shim.{JoinWithoutHint, RepartitionByExpressionWithOptionalNumPartitions}
import com.microsoft.hyperspace.shim.RepartitionByExpressionWithOptionalNumPartitions
class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
class CoveringIndexRuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
override val indexLocationDirName = "ruleUtilsTest"
val t1c1 = AttributeReference("t1c1", IntegerType)()
@@ -85,20 +85,6 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode)
}
test("Verify get logical relation for single logical relation node plan.") {
validateLogicalRelation(t1ScanNode, t1ScanNode)
}
test("Verify get logical relation for multi-node linear plan.") {
validateLogicalRelation(t1ProjectNode, t1ScanNode)
}
test("Verify get logical relation for non-linear plan.") {
val joinNode = JoinWithoutHint(t1ProjectNode, t2ProjectNode, JoinType("inner"), None)
val r = RuleUtils.getRelation(spark, Project(Seq(t1c3, t2c3), joinNode))
assert(r.isEmpty)
}
test("Verify the location of injected shuffle for Hybrid Scan.") {
withTempPath { tempPath =>
val dataPath = tempPath.getAbsolutePath
@@ -112,7 +98,7 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
val df = spark.read.parquet(dataPath)
val query = df.filter(df("id") >= 3).select("id", "name")
val bucketSpec = BucketSpec(100, Seq("id"), Seq())
val shuffled = RuleUtils.transformPlanToShuffleUsingBucketSpec(
val shuffled = CoveringIndexRuleUtils.transformPlanToShuffleUsingBucketSpec(
bucketSpec,
query.queryExecution.optimizedPlan)
@@ -146,7 +132,7 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
val bucketSpec2 = BucketSpec(100, Seq("age"), Seq())
val query2 = df.filter(df("id") <= 3).select("id", "name")
val shuffled2 =
RuleUtils.transformPlanToShuffleUsingBucketSpec(
CoveringIndexRuleUtils.transformPlanToShuffleUsingBucketSpec(
bucketSpec2,
query2.queryExecution.optimizedPlan)
assert(shuffled2.collect {
@@ -163,10 +149,4 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
}.length == 1)
}
}
private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = {
val r = RuleUtils.getRelation(spark, plan)
assert(r.isDefined)
assert(r.get.plan.equals(expected))
}
}

View file

@@ -26,7 +26,6 @@ import org.apache.spark.sql.types.{IntegerType, StringType}
import com.microsoft.hyperspace.TestUtils.latestIndexLogEntry
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.{HyperspaceRuleSuite, IndexCollectionManager, IndexConfig, IndexConstants, IndexLogEntryTags}
import com.microsoft.hyperspace.index.covering.RuleUtils
import com.microsoft.hyperspace.util.FileUtils
class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper {

View file

@@ -0,0 +1,106 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.rules
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, IsNotNull}
import org.apache.spark.sql.catalyst.plans.{JoinType, SQLHelper}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache}
import org.apache.spark.sql.types.{IntegerType, StringType}
import com.microsoft.hyperspace.index.HyperspaceRuleSuite
import com.microsoft.hyperspace.shim.JoinWithoutHint
class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
override val indexLocationDirName = "ruleUtilsTest"
val t1c1 = AttributeReference("t1c1", IntegerType)()
val t1c2 = AttributeReference("t1c2", StringType)()
val t1c3 = AttributeReference("t1c3", IntegerType)()
val t1c4 = AttributeReference("t1c4", StringType)()
val t2c1 = AttributeReference("t2c1", IntegerType)()
val t2c2 = AttributeReference("t2c2", StringType)()
val t2c3 = AttributeReference("t2c3", IntegerType)()
val t2c4 = AttributeReference("t2c4", StringType)()
val t1Schema = schemaFromAttributes(t1c1, t1c2, t1c3, t1c4)
val t2Schema = schemaFromAttributes(t2c1, t2c2, t2c3, t2c4)
var t1Relation: HadoopFsRelation = _
var t2Relation: HadoopFsRelation = _
var t1ScanNode: LogicalRelation = _
var t2ScanNode: LogicalRelation = _
var t1FilterNode: Filter = _
var t2FilterNode: Filter = _
var t1ProjectNode: Project = _
var t2ProjectNode: Project = _
override def beforeAll(): Unit = {
super.beforeAll()
val t1Location =
new InMemoryFileIndex(spark, Seq(new Path("t1")), Map.empty, Some(t1Schema), NoopCache)
val t2Location =
new InMemoryFileIndex(spark, Seq(new Path("t2")), Map.empty, Some(t2Schema), NoopCache)
t1Relation = baseRelation(t1Location, t1Schema)
t2Relation = baseRelation(t2Location, t2Schema)
t1ScanNode = LogicalRelation(t1Relation, Seq(t1c1, t1c2, t1c3, t1c4), None, false)
t2ScanNode = LogicalRelation(t2Relation, Seq(t2c1, t2c2, t2c3, t2c4), None, false)
t1FilterNode = Filter(IsNotNull(t1c1), t1ScanNode)
t2FilterNode = Filter(IsNotNull(t2c1), t2ScanNode)
t1ProjectNode = Project(Seq(t1c1, t1c3), t1FilterNode)
// Project [t1c1#0, t1c3#2]
// +- Filter isnotnull(t1c1#0)
// +- Relation[t1c1#0,t1c2#1,t1c3#2,t1c4#3] parquet
t2ProjectNode = Project(Seq(t2c1, t2c3), t2FilterNode)
// Project [t2c1#4, t2c3#6]
// +- Filter isnotnull(t2c1#4)
// +- Relation[t2c1#4,t2c2#5,t2c3#6,t2c4#7] parquet
createIndexLogEntry("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode)
createIndexLogEntry("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode)
createIndexLogEntry("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode)
createIndexLogEntry("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode)
createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode)
}
test("Verify get logical relation for single logical relation node plan.") {
validateLogicalRelation(t1ScanNode, t1ScanNode)
}
test("Verify get logical relation for multi-node linear plan.") {
validateLogicalRelation(t1ProjectNode, t1ScanNode)
}
test("Verify get logical relation for non-linear plan.") {
val joinNode = JoinWithoutHint(t1ProjectNode, t2ProjectNode, JoinType("inner"), None)
val r = RuleUtils.getRelation(spark, Project(Seq(t1c3, t2c3), joinNode))
assert(r.isEmpty)
}
private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = {
val r = RuleUtils.getRelation(spark, plan)
assert(r.isDefined)
assert(r.get.plan.equals(expected))
}
}