From a0f3dd3e5cf8d8a4806837bf15a3eb27893c0357 Mon Sep 17 00:00:00 2001 From: EJ Song <51077614+sezruby@users.noreply.github.com> Date: Fri, 7 May 2021 17:41:42 -0700 Subject: [PATCH] Add ApplyHyperspace rule definitions (#427) --- .../index/rules/ApplyHyperspace.scala | 130 ++++++++++++++++++ .../index/rules/HyperspaceRule.scala | 99 +++++++++++++ .../hyperspace/index/rules/IndexFilter.scala | 76 ++++++++++ 3 files changed, 305 insertions(+) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala new file mode 100644 index 00000000..0f35de10 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala @@ -0,0 +1,130 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.microsoft.hyperspace.index.rules
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
+import com.microsoft.hyperspace.actions.Constants
+import com.microsoft.hyperspace.index.IndexLogEntry
+import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap
+import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging
+
+/**
+ * Collect candidate indexes for each source plan.
+ */
+object CandidateIndexCollector extends ActiveSparkSession {
+  // TODO: ColumnSchemaFilter :: FileSignatureFilter :: Nil
+  private val sourceFilters: Seq[SourcePlanIndexFilter] = Nil
+
+  /**
+   * Pair every supported leaf relation found in the plan with the full list of indexes.
+   *
+   * @param plan Original query plan
+   * @param indexes All indexes
+   * @return Map of each supported source plan to all of the given indexes
+   */
+  private def initializePlanToIndexes(
+      plan: LogicalPlan,
+      indexes: Seq[IndexLogEntry]): PlanToIndexesMap = {
+    val provider = Hyperspace.getContext(spark).sourceProviderManager
+    plan.collect {
+      // A type ascription (not a cast) widens the key to LogicalPlan for the resulting map.
+      case l: LeafNode if provider.isSupportedRelation(l) => (l: LogicalPlan) -> indexes
+    }.toMap
+  }
+
+  /**
+   * Extract candidate indexes for each source plan in the given query plan.
+   * Source plans left without any candidate index are dropped from the result.
+   *
+   * @param plan Original query plan
+   * @param allIndexes All indexes
+   * @return Map of source plan to candidate indexes
+   */
+  def apply(plan: LogicalPlan, allIndexes: Seq[IndexLogEntry]): PlanToIndexesMap = {
+    initializePlanToIndexes(plan, allIndexes)
+      .map {
+        case (node, indexes) =>
+          // Filters run in order; each one only sees what the previous filters kept.
+          node -> sourceFilters.foldLeft(indexes) { (remaining, filter) =>
+            filter(node, remaining)
+          }
+      }
+      .filter { case (_, candidates) => candidates.nonEmpty }
+  }
+}
+
+/**
+ * Apply Hyperspace indexes based on the score of each index application.
+ */
+class ScoreBasedIndexPlanOptimizer {
+  // TODO: FilterIndexRule :: JoinIndexRule :: Nil
+  private val rules: Seq[HyperspaceRule] = NoOpRule :: Nil
+
+  // Map for memoization. The key is the logical plan before applying [[HyperspaceRule]]s
+  // and its value is a pair of best transformed plan and its score.
+  private val scoreMap: mutable.HashMap[LogicalPlan, (LogicalPlan, Int)] = mutable.HashMap()
+
+  /**
+   * Return the memoized result for the plan, computing and storing it on first use.
+   * Until recursive index application is implemented, the computed result is the
+   * unchanged plan with score 0.
+   *
+   * @param plan Query plan to optimize
+   * @param indexes Map of source plan to candidate indexes
+   * @return Pair of the transformed plan and its score
+   */
+  private def recApply(plan: LogicalPlan, indexes: PlanToIndexesMap): (LogicalPlan, Int) = {
+    // A pattern match replaces the former `return` inside a lambda, which was a
+    // non-local return implemented by throwing a control exception.
+    scoreMap.get(plan) match {
+      // If pre-calculated value exists, return it.
+      case Some(memoized) => memoized
+      case None =>
+        val optResult = (plan, 0)
+        // TODO apply indexes recursively.
+
+        scoreMap.put(plan, optResult)
+        optResult
+    }
+  }
+
+  /**
+   * Transform the given query plan to use selected indexes based on score.
+   *
+   * @param plan Original query plan
+   * @param candidateIndexes Map of source plan to candidate indexes
+   * @return Transformed plan using selected indexes based on score
+   */
+  def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): LogicalPlan = {
+    recApply(plan, candidateIndexes)._1
+  }
+}
+
+/**
+ * Transform the given plan to use Hyperspace indexes.
+ */
+object ApplyHyperspace
+    extends Rule[LogicalPlan]
+    with Logging
+    with HyperspaceEventLogging
+    with ActiveSparkSession {
+
+  // Source plan -> candidate (or applicable) indexes for that source plan.
+  type PlanToIndexesMap = Map[LogicalPlan, Seq[IndexLogEntry]]
+  // Source plan -> the index selected for that source plan.
+  type PlanToSelectedIndexMap = Map[LogicalPlan, IndexLogEntry]
+
+  /**
+   * Rewrite the plan using the ACTIVE indexes. The original plan is returned when there
+   * is no ACTIVE index or when applying indexes fails with a non-fatal error.
+   *
+   * @param plan Original query plan
+   * @return Transformed plan, or the original plan as a fallback
+   */
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    val indexManager = Hyperspace
+      .getContext(spark)
+      .indexCollectionManager
+    val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE))
+    if (allIndexes.isEmpty) {
+      plan
+    } else {
+      try {
+        val candidateIndexes = CandidateIndexCollector(plan, allIndexes)
+        new ScoreBasedIndexPlanOptimizer().apply(plan, candidateIndexes)
+      } catch {
+        // NonFatal lets thread interrupts and fatal JVM errors propagate instead of being
+        // swallowed; the throwable is passed along so the stack trace is logged too.
+        case NonFatal(e) =>
+          logWarning("Cannot apply Hyperspace indexes: " + e.getMessage, e)
+          plan
+      }
+    }
+  }
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala
new file mode 100644
index 00000000..c2cd1575
--- /dev/null
+++ 
b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala
@@ -0,0 +1,118 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.rules
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import com.microsoft.hyperspace.ActiveSparkSession
+import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
+
+/**
+ * Interface of exclusive type of indexes.
+ */
+trait HyperspaceRule extends ActiveSparkSession {
+
+  /**
+   * Sequence of conditions to apply indexes to the plan. Each filter contains conditions and
+   * filters out candidate indexes based on the conditions. The order of the sequence does matter
+   * because they are applied in order with the assumption that previous filter conditions were met.
+   */
+  val filtersOnQueryPlan: Seq[QueryPlanIndexFilter]
+
+  /**
+   * Index ranker to select the best index among applicable indexes
+   * after applying [[filtersOnQueryPlan]]s.
+   */
+  val indexRanker: RankerIndexFilter
+
+  /**
+   * Transform the plan to use the selected indexes.
+   * All selected indexes should be able to be applied to the plan.
+   *
+   * @param plan Original query plan.
+   * @param indexes Selected indexes.
+   * @return Transformed plan to use the selected indexes.
+   */
+  def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan
+
+  /**
+   * Calculate the score of the selected indexes.
+   *
+   * @param plan Original query plan.
+   * @param indexes Selected indexes.
+   * @return Score of selected indexes.
+   */
+  def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int
+
+  /**
+   * Apply this rule to the plan: narrow the candidates with [[filtersOnQueryPlan]], select
+   * indexes with [[indexRanker]], then transform the plan and compute its score.
+   *
+   * @param plan Original query plan.
+   * @param candidateIndexes Map of source plan to candidate indexes.
+   * @return Pair of the transformed plan and its score, or the original plan with score 0
+   *         if there is no applicable index.
+   */
+  final def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): (LogicalPlan, Int) = {
+    val applicableIndexes =
+      if (candidateIndexes.isEmpty) {
+        // No candidates at all, so the filters are not evaluated.
+        candidateIndexes
+      } else {
+        filtersOnQueryPlan.foldLeft(candidateIndexes) { (remaining, filter) =>
+          filter(plan, remaining)
+        }
+      }
+
+    if (applicableIndexes.nonEmpty) {
+      val selectedIndexes = indexRanker(plan, applicableIndexes)
+      (applyIndex(plan, selectedIndexes), score(plan, selectedIndexes))
+    } else {
+      (plan, 0)
+    }
+  }
+}
+
+/**
+ * No-op rule for traversal.
+ */
+object NoOpRule extends HyperspaceRule {
+
+  /** Filter that drops every candidate index, so this rule never applies any index. */
+  object FilterAll extends QueryPlanIndexFilter {
+    override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap =
+      Map.empty
+    override def reason: String = "NoOpRule"
+  }
+
+  /** Ranker that selects no index; a safe stand-in for a `null` ranker. */
+  private object SelectNone extends RankerIndexFilter {
+    override def apply(
+        plan: LogicalPlan,
+        applicableIndexes: PlanToIndexesMap): PlanToSelectedIndexMap = Map.empty
+    override def reason: String = "NoOpRule"
+  }
+
+  override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] = FilterAll :: Nil
+
+  // As there's no applicable index after [[FilterAll]], indexRanker is not reachable.
+  override val indexRanker: RankerIndexFilter = SelectNone
+
+  override def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan = plan
+
+  override def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int = 0
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala
new file mode 100644
index 00000000..dd462488
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala
@@ -0,0 +1,76 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.rules
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import com.microsoft.hyperspace.ActiveSparkSession
+import com.microsoft.hyperspace.index.IndexLogEntry
+import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
+
+/**
+ * Common parent of the index filter traits declared in this file.
+ * It only requires a textual reason; each sub-trait declares its own `apply` signature.
+ */
+trait IndexFilter {
+
+  /**
+   * @return Failure reason for filtered out indexes.
+   */
+  def reason: String
+}
+
+/**
+ * IndexFilter used in CandidateIndexCollector.
+ * It works on a single source plan and the list of indexes considered for it.
+ */
+trait SourcePlanIndexFilter extends IndexFilter with ActiveSparkSession {
+
+  /**
+   * Filter out indexes for the given source plan.
+   *
+   * @param plan Source plan
+   * @param indexes Indexes
+   * @return Indexes which meet conditions of Filter
+   */
+  def apply(plan: LogicalPlan, indexes: Seq[IndexLogEntry]): Seq[IndexLogEntry]
+}
+
+/**
+ * IndexFilter used in HyperspaceRule.
+ * It works on a query plan and the map of source plan to candidate indexes.
+ */
+trait QueryPlanIndexFilter extends IndexFilter with ActiveSparkSession {
+
+  /**
+   * Filter out candidate indexes for the given query plan.
+   *
+   * @param plan Query plan
+   * @param candidateIndexes Map of source plan to candidate indexes
+   * @return Map of source plan to applicable indexes which meet conditions of Filter
+   */
+  def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap
+}
+
+/**
+ * IndexFilter used in ranking applicable indexes.
+ * Unlike the other filters, it returns a single selected index per source plan.
+ */
+trait RankerIndexFilter extends IndexFilter with ActiveSparkSession {
+
+  /**
+   * Rank best index for the given query plan.
+   *
+   * @param plan Query plan
+   * @param applicableIndexes Map of source plan to applicable indexes
+   * @return Map of source plan to selected index
+   */
+  def apply(plan: LogicalPlan, applicableIndexes: PlanToIndexesMap): PlanToSelectedIndexMap
+}