From 467a891ec239d5a5dd83f0bd45e649f07e46f4a2 Mon Sep 17 00:00:00 2001
From: Chungmin Lee
Date: Thu, 29 Jul 2021 14:55:55 +0900
Subject: [PATCH] Data Skipping Index Part 1: Refactoring (#481)

---
 .../index/covering/CoveringIndexFilter.scala  | 36 ------
 ...ils.scala => CoveringIndexRuleUtils.scala} | 34 +-----
 .../index/covering/FilterIndexRule.scala      | 21 +---
 .../index/covering/JoinIndexRule.scala        | 12 +-
 .../index/rules/ExtractRelation.scala         | 33 ++++++
 .../index/rules/IndexTypeFilter.scala         | 49 ++++++++
 .../hyperspace/index/rules/RuleUtils.scala    | 55 +++++++++
 ...scala => CoveringIndexRuleUtilsTest.scala} | 30 +----
 .../rules/CandidateIndexCollectorTest.scala   |  1 -
 .../index/rules/RuleUtilsTest.scala           | 106 ++++++++++++++++++
 10 files changed, 263 insertions(+), 114 deletions(-)
 delete mode 100644 src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexFilter.scala
 rename src/main/scala/com/microsoft/hyperspace/index/covering/{RuleUtils.scala => CoveringIndexRuleUtils.scala} (94%)
 create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/ExtractRelation.scala
 create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/IndexTypeFilter.scala
 create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
 rename src/test/scala/com/microsoft/hyperspace/index/covering/{RuleUtilsTest.scala => CoveringIndexRuleUtilsTest.scala} (84%)
 create mode 100644 src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala

diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexFilter.scala
deleted file mode 100644
index c1a9e65e..00000000
--- a/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexFilter.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (2021) The Hyperspace Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.microsoft.hyperspace.index.covering
-
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap
-import com.microsoft.hyperspace.index.rules.QueryPlanIndexFilter
-
-/**
- * Filters out indexes which are not [[CoveringIndex]].
- */
-object CoveringIndexFilter extends QueryPlanIndexFilter {
-  override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
-    candidateIndexes
-      .map {
-        case (plan, indexes) =>
-          plan -> indexes.filter(_.derivedDataset.isInstanceOf[CoveringIndex])
-      }
-      .filter { case (_, indexes) => indexes.nonEmpty }
-  }
-}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexRuleUtils.scala
similarity index 94%
rename from src/main/scala/com/microsoft/hyperspace/index/covering/RuleUtils.scala
rename to src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexRuleUtils.scala
index c9ffaf26..5e66df82 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/covering/RuleUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexRuleUtils.scala
@@ -29,21 +29,10 @@ import org.apache.spark.sql.types.{LongType, StructType}
 import com.microsoft.hyperspace.Hyperspace
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
-import com.microsoft.hyperspace.index.sources.FileBasedRelation
+import com.microsoft.hyperspace.index.rules.RuleUtils
 import com.microsoft.hyperspace.util.HyperspaceConf
 
-object RuleUtils {
-
-  /**
-   * Check if an index was applied the given relation or not.
-   * This can be determined by an identifier in [[FileBasedRelation]]'s options.
-   *
-   * @param relation FileBasedRelation to check if an index is already applied.
-   * @return true if an index is applied to the given relation. Otherwise false.
-   */
-  def isIndexApplied(relation: FileBasedRelation): Boolean = {
-    relation.options.exists(_.equals(IndexConstants.INDEX_RELATION_IDENTIFIER))
-  }
+object CoveringIndexRuleUtils {
 
   /**
    * Transform the current plan to utilize the given index.
@@ -70,7 +59,7 @@
       useBucketSpec: Boolean,
       useBucketUnionForAppended: Boolean): LogicalPlan = {
     // Check pre-requisite.
-    val relation = getRelation(spark, plan)
+    val relation = RuleUtils.getRelation(spark, plan)
     assert(relation.isDefined)
 
     // If there is no change in source data files, the index can be applied by
@@ -93,23 +82,6 @@
     transformed
   }
 
-  /**
-   * Extract the relation node if the given logical plan is linear.
-   *
-   * @param plan Logical plan to extract a relation node from.
-   * @return If the plan is linear and the relation node is supported, the [[FileBasedRelation]]
-   *         object that wraps the relation node. Otherwise None.
-   */
-  def getRelation(spark: SparkSession, plan: LogicalPlan): Option[FileBasedRelation] = {
-    val provider = Hyperspace.getContext(spark).sourceProviderManager
-    val leaves = plan.collectLeaves()
-    if (leaves.size == 1 && provider.isSupportedRelation(leaves.head)) {
-      Some(provider.getRelation(leaves.head))
-    } else {
-      None
-    }
-  }
-
   /**
    * Transform the current plan to utilize index.
   * The transformed plan reads data from indexes instead of the source relations.
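
Note: with isIndexApplied and getRelation now hosted in the shared rules
package, rules outside the covering package can reuse them without touching
CoveringIndexRuleUtils. A minimal sketch of the intended call pattern, using
only the signatures introduced by this patch (the object and method names
below are illustrative, not part of the patch):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    import com.microsoft.hyperspace.index.rules.RuleUtils

    object RelationCheckSketch {
      // True when the plan is linear over a supported relation and no
      // Hyperspace index has been applied to that relation yet.
      def canConsiderIndex(spark: SparkSession, plan: LogicalPlan): Boolean =
        RuleUtils.getRelation(spark, plan).exists(r => !RuleUtils.isIndexApplied(r))
    }
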
diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala
index 8dea6c00..4fc56f8e 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala
@@ -18,13 +18,11 @@ package com.microsoft.hyperspace.index.covering
 
 import org.apache.spark.sql.catalyst.analysis.CleanupAliases
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 
-import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
 import com.microsoft.hyperspace.index.IndexLogEntryTags
-import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
+import com.microsoft.hyperspace.index.rules._
 import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
-import com.microsoft.hyperspace.index.sources.FileBasedRelation
 import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
 
 /**
@@ -139,17 +137,6 @@ object FilterRankFilter extends IndexRankFilter {
   }
 }
 
-object ExtractRelation extends ActiveSparkSession {
-  def unapply(plan: LeafNode): Option[FileBasedRelation] = {
-    val provider = Hyperspace.getContext(spark).sourceProviderManager
-    if (provider.isSupportedRelation(plan)) {
-      Some(provider.getRelation(plan))
-    } else {
-      None
-    }
-  }
-}
-
 /**
  * FilterIndexRule looks for opportunities in a logical plan to replace
  * a relation with an available hash partitioned index according to columns in
@@ -157,7 +144,7 @@
 object FilterIndexRule extends HyperspaceRule {
   override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
-    CoveringIndexFilter :: FilterPlanNodeFilter :: FilterColumnFilter :: Nil
+    IndexTypeFilter[CoveringIndex]() :: FilterPlanNodeFilter :: FilterColumnFilter :: Nil
 
   override val indexRanker: IndexRankFilter = FilterRankFilter
 
@@ -169,7 +156,7 @@
     // As FilterIndexRule is not intended to support bucketed scan, we set
     // useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause
     // unnecessary shuffle for appended data to apply BucketUnion for merging data.
-    RuleUtils.transformPlanToUseIndex(
+    CoveringIndexRuleUtils.transformPlanToUseIndex(
       spark,
       indexes.head._2,
       plan,
diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala
index 058afa23..6004a8b6 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import com.microsoft.hyperspace.Hyperspace
 import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
 import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions
-import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
+import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter, RuleUtils}
 import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
 import com.microsoft.hyperspace.index.sources.FileBasedRelation
 import com.microsoft.hyperspace.shim.JoinWithoutHint
@@ -618,7 +618,11 @@ object JoinRankFilter extends IndexRankFilter {
 }
 
 object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging {
   override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
-    CoveringIndexFilter :: JoinPlanNodeFilter :: JoinAttributeFilter :: JoinColumnFilter :: Nil
+    IndexTypeFilter[CoveringIndex]() ::
+      JoinPlanNodeFilter ::
+      JoinAttributeFilter ::
+      JoinColumnFilter ::
+      Nil
 
   override val indexRanker: IndexRankFilter = JoinRankFilter
@@ -641,13 +645,13 @@
     val updatedPlan =
       join
         .copy(
-          left = RuleUtils.transformPlanToUseIndex(
+          left = CoveringIndexRuleUtils.transformPlanToUseIndex(
            spark,
            lIndex,
            l,
            useBucketSpec = true,
            useBucketUnionForAppended = true),
-          right = RuleUtils.transformPlanToUseIndex(
+          right = CoveringIndexRuleUtils.transformPlanToUseIndex(
            spark,
            rIndex,
            r,
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ExtractRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ExtractRelation.scala
new file mode 100644
index 00000000..00d5aade
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ExtractRelation.scala
@@ -0,0 +1,33 @@
+/*
+ * Copyright (2021) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.rules
+
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
+import com.microsoft.hyperspace.index.sources.FileBasedRelation
+
+object ExtractRelation extends ActiveSparkSession {
+  def unapply(plan: LeafNode): Option[FileBasedRelation] = {
+    val provider = Hyperspace.getContext(spark).sourceProviderManager
+    if (provider.isSupportedRelation(plan)) {
+      Some(provider.getRelation(plan))
+    } else {
+      None
+    }
+  }
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexTypeFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexTypeFilter.scala
new file mode 100644
index 00000000..a16875cf
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexTypeFilter.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright (2021) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.rules
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import com.microsoft.hyperspace.index.Index
+import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap
+
+object IndexTypeFilter {
+
+  /**
+   * Returns a [[QueryPlanIndexFilter]] that filters out indexes which are not T.
+   */
+  def apply[T <: Index: ClassTag](): QueryPlanIndexFilter =
+    new QueryPlanIndexFilter {
+      override def apply(
+          plan: LogicalPlan,
+          candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
+        candidateIndexes
+          .map {
+            case (plan, indexes) =>
+              plan -> indexes.filter {
+                _.derivedDataset match {
+                  case _: T => true
+                  case _ => false
+                }
+              }
+          }
+          .filter { case (_, indexes) => indexes.nonEmpty }
+      }
+    }
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
new file mode 100644
index 00000000..26a60994
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright (2021) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.rules
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import com.microsoft.hyperspace.Hyperspace
+import com.microsoft.hyperspace.index.IndexConstants
+import com.microsoft.hyperspace.index.sources.FileBasedRelation
+
+object RuleUtils {
+
+  /**
+   * Check if an index was applied to the given relation or not.
+   * This can be determined by an identifier in [[FileBasedRelation]]'s options.
+   *
+   * @param relation FileBasedRelation to check if an index is already applied.
+   * @return true if an index is applied to the given relation. Otherwise false.
+   */
+  def isIndexApplied(relation: FileBasedRelation): Boolean = {
+    relation.options.exists(_.equals(IndexConstants.INDEX_RELATION_IDENTIFIER))
+  }
+
+  /**
+   * Extract the relation node if the given logical plan is linear.
+   *
+   * @param plan Logical plan to extract a relation node from.
+   * @return If the plan is linear and the relation node is supported, the [[FileBasedRelation]]
+   *         object that wraps the relation node. Otherwise None.
+   */
+  def getRelation(spark: SparkSession, plan: LogicalPlan): Option[FileBasedRelation] = {
+    val provider = Hyperspace.getContext(spark).sourceProviderManager
+    val leaves = plan.collectLeaves()
+    if (leaves.size == 1 && provider.isSupportedRelation(leaves.head)) {
+      Some(provider.getRelation(leaves.head))
+    } else {
+      None
+    }
+  }
+}
diff --git a/src/test/scala/com/microsoft/hyperspace/index/covering/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/covering/CoveringIndexRuleUtilsTest.scala
similarity index 84%
rename from src/test/scala/com/microsoft/hyperspace/index/covering/RuleUtilsTest.scala
rename to src/test/scala/com/microsoft/hyperspace/index/covering/CoveringIndexRuleUtilsTest.scala
index 9c87b612..ddab675a 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/covering/RuleUtilsTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/covering/CoveringIndexRuleUtilsTest.scala
@@ -19,15 +19,15 @@ package com.microsoft.hyperspace.index.covering
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, IsNotNull}
-import org.apache.spark.sql.catalyst.plans.{JoinType, SQLHelper}
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache}
 import org.apache.spark.sql.types.{IntegerType, StringType}
 
 import com.microsoft.hyperspace.index.HyperspaceRuleSuite
-import com.microsoft.hyperspace.shim.{JoinWithoutHint, RepartitionByExpressionWithOptionalNumPartitions}
+import com.microsoft.hyperspace.shim.RepartitionByExpressionWithOptionalNumPartitions
 
-class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
+class CoveringIndexRuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
   override val indexLocationDirName = "ruleUtilsTest"
 
   val t1c1 = AttributeReference("t1c1", IntegerType)()
@@ -85,20 +85,6 @@
     createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode)
   }
 
-  test("Verify get logical relation for single logical relation node plan.") {
-    validateLogicalRelation(t1ScanNode, t1ScanNode)
-  }
-
-  test("Verify get logical relation for multi-node linear plan.") {
-    validateLogicalRelation(t1ProjectNode, t1ScanNode)
-  }
-
-  test("Verify get logical relation for non-linear plan.") {
test("Verify get logical relation for non-linear plan.") { - val joinNode = JoinWithoutHint(t1ProjectNode, t2ProjectNode, JoinType("inner"), None) - val r = RuleUtils.getRelation(spark, Project(Seq(t1c3, t2c3), joinNode)) - assert(r.isEmpty) - } - test("Verify the location of injected shuffle for Hybrid Scan.") { withTempPath { tempPath => val dataPath = tempPath.getAbsolutePath @@ -112,7 +98,7 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper { val df = spark.read.parquet(dataPath) val query = df.filter(df("id") >= 3).select("id", "name") val bucketSpec = BucketSpec(100, Seq("id"), Seq()) - val shuffled = RuleUtils.transformPlanToShuffleUsingBucketSpec( + val shuffled = CoveringIndexRuleUtils.transformPlanToShuffleUsingBucketSpec( bucketSpec, query.queryExecution.optimizedPlan) @@ -146,7 +132,7 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper { val bucketSpec2 = BucketSpec(100, Seq("age"), Seq()) val query2 = df.filter(df("id") <= 3).select("id", "name") val shuffled2 = - RuleUtils.transformPlanToShuffleUsingBucketSpec( + CoveringIndexRuleUtils.transformPlanToShuffleUsingBucketSpec( bucketSpec2, query2.queryExecution.optimizedPlan) assert(shuffled2.collect { @@ -163,10 +149,4 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper { }.length == 1) } } - - private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = { - val r = RuleUtils.getRelation(spark, plan) - assert(r.isDefined) - assert(r.get.plan.equals(expected)) - } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala index 8f276274..07c7e583 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.TestUtils.latestIndexLogEntry import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.{HyperspaceRuleSuite, IndexCollectionManager, IndexConfig, IndexConstants, IndexLogEntryTags} -import com.microsoft.hyperspace.index.covering.RuleUtils import com.microsoft.hyperspace.util.FileUtils class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala new file mode 100644 index 00000000..80376123 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -0,0 +1,106 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.microsoft.hyperspace.index.rules
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, IsNotNull}
+import org.apache.spark.sql.catalyst.plans.{JoinType, SQLHelper}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+
+import com.microsoft.hyperspace.index.HyperspaceRuleSuite
+import com.microsoft.hyperspace.shim.JoinWithoutHint
+
+class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
+  override val indexLocationDirName = "ruleUtilsTest"
+
+  val t1c1 = AttributeReference("t1c1", IntegerType)()
+  val t1c2 = AttributeReference("t1c2", StringType)()
+  val t1c3 = AttributeReference("t1c3", IntegerType)()
+  val t1c4 = AttributeReference("t1c4", StringType)()
+  val t2c1 = AttributeReference("t2c1", IntegerType)()
+  val t2c2 = AttributeReference("t2c2", StringType)()
+  val t2c3 = AttributeReference("t2c3", IntegerType)()
+  val t2c4 = AttributeReference("t2c4", StringType)()
+
+  val t1Schema = schemaFromAttributes(t1c1, t1c2, t1c3, t1c4)
+  val t2Schema = schemaFromAttributes(t2c1, t2c2, t2c3, t2c4)
+
+  var t1Relation: HadoopFsRelation = _
+  var t2Relation: HadoopFsRelation = _
+  var t1ScanNode: LogicalRelation = _
+  var t2ScanNode: LogicalRelation = _
+  var t1FilterNode: Filter = _
+  var t2FilterNode: Filter = _
+  var t1ProjectNode: Project = _
+  var t2ProjectNode: Project = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val t1Location =
+      new InMemoryFileIndex(spark, Seq(new Path("t1")), Map.empty, Some(t1Schema), NoopCache)
+    val t2Location =
+      new InMemoryFileIndex(spark, Seq(new Path("t2")), Map.empty, Some(t2Schema), NoopCache)
+
+    t1Relation = baseRelation(t1Location, t1Schema)
+    t2Relation = baseRelation(t2Location, t2Schema)
+
+    t1ScanNode = LogicalRelation(t1Relation, Seq(t1c1, t1c2, t1c3, t1c4), None, false)
+    t2ScanNode = LogicalRelation(t2Relation, Seq(t2c1, t2c2, t2c3, t2c4), None, false)
+
+    t1FilterNode = Filter(IsNotNull(t1c1), t1ScanNode)
+    t2FilterNode = Filter(IsNotNull(t2c1), t2ScanNode)
+
+    t1ProjectNode = Project(Seq(t1c1, t1c3), t1FilterNode)
+    // Project [t1c1#0, t1c3#2]
+    // +- Filter isnotnull(t1c1#0)
+    //    +- Relation[t1c1#0,t1c2#1,t1c3#2,t1c4#3] parquet
+
+    t2ProjectNode = Project(Seq(t2c1, t2c3), t2FilterNode)
+    // Project [t2c1#4, t2c3#6]
+    // +- Filter isnotnull(t2c1#4)
+    //    +- Relation[t2c1#4,t2c2#5,t2c3#6,t2c4#7] parquet
+
+    createIndexLogEntry("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode)
+    createIndexLogEntry("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode)
+    createIndexLogEntry("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode)
+    createIndexLogEntry("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode)
+    createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode)
+  }
+
+  test("Verify get logical relation for single logical relation node plan.") {
+    validateLogicalRelation(t1ScanNode, t1ScanNode)
+  }
+
+  test("Verify get logical relation for multi-node linear plan.") {
+    validateLogicalRelation(t1ProjectNode, t1ScanNode)
+  }
+
+  test("Verify get logical relation for non-linear plan.") {
+    val joinNode = JoinWithoutHint(t1ProjectNode, t2ProjectNode, JoinType("inner"), None)
+    val r = RuleUtils.getRelation(spark, Project(Seq(t1c3, t2c3), joinNode))
+    assert(r.isEmpty)
+  }
+
+  private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = {
+    val r = RuleUtils.getRelation(spark, plan)
+    assert(r.isDefined)
+    assert(r.get.plan.equals(expected))
+  }
+}
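
Note: IndexTypeFilter generalizes the deleted CoveringIndexFilter, so a rule
for any index type can reuse the same candidate pruning. A minimal sketch of
standalone usage, built only from signatures introduced or confirmed by this
patch (CandidatePruningSketch and pruneCandidates are illustrative names, not
part of the patch; CoveringIndex is assumed to live in the covering package,
as the deleted filter suggests):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    import com.microsoft.hyperspace.index.covering.CoveringIndex
    import com.microsoft.hyperspace.index.rules.{IndexTypeFilter, QueryPlanIndexFilter}
    import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap

    object CandidatePruningSketch {
      // Keeps only CoveringIndex candidates, exactly as FilterIndexRule and
      // JoinIndexRule now do at the head of their filtersOnQueryPlan chains.
      val coveringOnly: QueryPlanIndexFilter = IndexTypeFilter[CoveringIndex]()

      def pruneCandidates(plan: LogicalPlan, candidates: PlanToIndexesMap): PlanToIndexesMap =
        coveringOnly(plan, candidates)
    }

A data skipping index rule (the subject of the follow-up parts in this series)
would swap in its own index type parameter instead of CoveringIndex.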