Add ApplyHyperspace rule definitions (#427)

This commit is contained in:
EJ Song 2021-05-07 17:41:42 -07:00 коммит произвёл GitHub
Родитель 5aee0c8fb6
Коммит a0f3dd3e5c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 305 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,130 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.rules
import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap
import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging
/**
 * Collects candidate indexes for each supported source relation found in a query plan.
 *
 * Every supported leaf relation starts out paired with all given indexes; the configured
 * [[SourcePlanIndexFilter]]s then narrow that list per relation, and relations left without
 * any index are dropped from the result.
 */
object CandidateIndexCollector extends ActiveSparkSession {
  // TODO: ColumnSchemaFilter :: FileSignatureFilter :: Nil
  private val sourceFilters: Seq[SourcePlanIndexFilter] = Nil

  /**
   * Pair each supported leaf relation of the plan with the complete list of indexes.
   *
   * @param plan Query plan whose leaf nodes are inspected.
   * @param indexes Indexes initially associated with every supported relation.
   * @return Map of supported leaf relation to the given indexes.
   */
  private def initializePlanToIndexes(
      plan: LogicalPlan,
      indexes: Seq[IndexLogEntry]): PlanToIndexesMap = {
    val provider = Hyperspace.getContext(spark).sourceProviderManager
    plan.collect {
      case leaf: LeafNode if provider.isSupportedRelation(leaf) =>
        // Widen through a typed val (not a cast) so the map key type is LogicalPlan.
        val relation: LogicalPlan = leaf
        relation -> indexes
    }.toMap
  }

  /**
   * Extract candidate indexes for each source plan in the given query plan.
   *
   * @param plan Original query plan
   * @param allIndexes All indexes
   * @return Map of source plan to candidate indexes; source plans whose candidate list is
   *         empty after filtering are omitted.
   */
  def apply(plan: LogicalPlan, allIndexes: Seq[IndexLogEntry]): PlanToIndexesMap = {
    initializePlanToIndexes(plan, allIndexes)
      .map {
        case (node, indexes) =>
          // Filters run in order; each one sees only what the previous filters kept.
          node -> sourceFilters.foldLeft(indexes) { (remaining, filter) =>
            filter(node, remaining)
          }
      }
      .filter { case (_, remaining) => remaining.nonEmpty }
  }
}
/**
 * Apply Hyperspace indexes based on the score of each index application.
 *
 * The recursive application of rules is not implemented yet (see the TODOs below): at the
 * moment every plan is returned unchanged with a score of 0, and the result is memoized.
 */
class ScoreBasedIndexPlanOptimizer {
  // TODO: FilterIndexRule :: JoinIndexRule :: Nil
  private val rules: Seq[HyperspaceRule] = NoOpRule :: Nil

  // Map for memoization. The key is the logical plan before applying [[HyperspaceRule]]s
  // and its value is a pair of best transformed plan and its score.
  private val scoreMap: mutable.HashMap[LogicalPlan, (LogicalPlan, Int)] = mutable.HashMap()

  /**
   * Return the best transformed plan and its score for the given plan, reusing a memoized
   * result when one exists.
   *
   * @param plan Plan to optimize.
   * @param indexes Map of source plan to candidate indexes (not used until the recursive
   *                application is implemented).
   * @return Pair of transformed plan and its score.
   */
  private def recApply(plan: LogicalPlan, indexes: PlanToIndexesMap): (LogicalPlan, Int) = {
    // A plain match replaces the former `foreach(res => return res)`, which relied on an
    // exception-based non-local return from inside a lambda.
    scoreMap.get(plan) match {
      case Some(memoized) =>
        memoized
      case None =>
        val optResult = (plan, 0)
        // TODO apply indexes recursively.
        scoreMap.put(plan, optResult)
        optResult
    }
  }

  /**
   * Transform the given query plan to use selected indexes based on score.
   *
   * @param plan Original query plan
   * @param candidateIndexes Map of source plan to candidate indexes
   * @return Transformed plan using selected indexes based on score
   */
  def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): LogicalPlan = {
    recApply(plan, candidateIndexes)._1
  }
}
/**
 * Transform the given plan to use Hyperspace indexes.
 *
 * Index application is best effort: if collecting candidates or optimizing the plan fails
 * with a non-fatal error, a warning is logged and the original plan is returned.
 */
object ApplyHyperspace
    extends Rule[LogicalPlan]
    with Logging
    with HyperspaceEventLogging
    with ActiveSparkSession {

  /** Map of source plan to the indexes still considered for it. */
  type PlanToIndexesMap = Map[LogicalPlan, Seq[IndexLogEntry]]

  /** Map of source plan to the single index selected for it. */
  type PlanToSelectedIndexMap = Map[LogicalPlan, IndexLogEntry]

  /**
   * Rewrite the plan to use indexes in the ACTIVE state, if any exist.
   *
   * @param plan Original query plan.
   * @return Plan produced by [[ScoreBasedIndexPlanOptimizer]], or the original plan when there
   *         are no active indexes or index application fails with a non-fatal error.
   */
  override def apply(plan: LogicalPlan): LogicalPlan = {
    import scala.util.control.NonFatal

    val indexManager = Hyperspace
      .getContext(spark)
      .indexCollectionManager
    val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE))
    if (allIndexes.isEmpty) {
      plan
    } else {
      try {
        val candidateIndexes = CandidateIndexCollector(plan, allIndexes)
        new ScoreBasedIndexPlanOptimizer().apply(plan, candidateIndexes)
      } catch {
        // NonFatal lets InterruptedException, control-flow throwables and VM/linkage errors
        // propagate instead of being swallowed by the fallback.
        case NonFatal(e) =>
          // Pass the throwable as well so the stack trace is not lost.
          logWarning("Cannot apply Hyperspace indexes: " + e.getMessage, e)
          plan
      }
    }
  }
}

Просмотреть файл

@ -0,0 +1,99 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.rules
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.ActiveSparkSession
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
/**
 * Contract for a rule that rewrites a query plan to use Hyperspace indexes.
 *
 * An implementation provides the filters, the ranker, the plan transformation and the scoring
 * function; [[apply]] wires them together in a fixed order.
 */
trait HyperspaceRule extends ActiveSparkSession {

  /**
   * Ordered conditions that candidate indexes must satisfy for this rule. The filters run in
   * sequence, each one receiving only the indexes that the previous filters kept, so a filter
   * may assume that the conditions of the filters before it were met.
   */
  val filtersOnQueryPlan: Seq[QueryPlanIndexFilter]

  /**
   * Ranker that picks the best index among the applicable indexes left
   * after running [[filtersOnQueryPlan]].
   */
  val indexRanker: RankerIndexFilter

  /**
   * Rewrite the plan so that it uses the selected indexes.
   * Every selected index is expected to be applicable to the plan.
   *
   * @param plan Original query plan.
   * @param indexes Selected indexes.
   * @return Transformed plan to use the selected indexes.
   */
  def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan

  /**
   * Compute the score of applying the selected indexes.
   *
   * @param plan Original query plan.
   * @param indexes Selected indexes.
   * @return Score of selected indexes.
   */
  def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int

  /**
   * Run the rule: filter the candidates, rank the survivors, then transform and score the plan.
   *
   * @param plan Original query plan.
   * @param candidateIndexes Map of source plan to candidate indexes.
   * @return Pair of transformed plan and its score, or the original plan with score 0 when
   *         there are no candidates or no index survives the filters.
   */
  final def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): (LogicalPlan, Int) = {
    val unchanged = (plan, 0)
    if (candidateIndexes.isEmpty) {
      unchanged
    } else {
      val applicable = filtersOnQueryPlan.foldLeft(candidateIndexes) { (remaining, filter) =>
        filter(plan, remaining)
      }
      if (applicable.isEmpty) {
        unchanged
      } else {
        val chosen = indexRanker(plan, applicable)
        (applyIndex(plan, chosen), score(plan, chosen))
      }
    }
  }
}
/**
 * No-op rule for traversal: it rejects every candidate index, leaves the plan untouched and
 * always scores 0.
 */
object NoOpRule extends HyperspaceRule {

  /** Filter that discards all candidate indexes. */
  object FilterAll extends QueryPlanIndexFilter {
    override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap =
      Map.empty

    override def reason: String = "NoOpRule"
  }

  /**
   * Ranker that never selects an index. [[FilterAll]] leaves nothing to rank, so this is not
   * expected to be invoked; it exists so that `indexRanker` is a real object instead of null.
   */
  private object NoOpRanker extends RankerIndexFilter {
    override def apply(
        plan: LogicalPlan,
        applicableIndexes: PlanToIndexesMap): PlanToSelectedIndexMap =
      Map.empty

    override def reason: String = "NoOpRule"
  }

  override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] = FilterAll :: Nil

  override val indexRanker: RankerIndexFilter = NoOpRanker

  /** Returns the plan unchanged. */
  override def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan = plan

  /** Always scores 0. */
  override def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int = 0
}

Просмотреть файл

@ -0,0 +1,76 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.rules
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.ActiveSparkSession
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
/**
* Base trait of the index filters declared in this file. It only exposes the reason
* reported for indexes that a filter rejects.
*/
trait IndexFilter {
/**
* @return Failure reason for filtered out indexes.
*/
def reason: String
}
/**
* IndexFilter used in CandidateIndexCollector.
*
* CandidateIndexCollector folds its sequence of these filters over the indexes of each
* source plan, so every filter receives only the indexes kept by the filters before it.
*/
trait SourcePlanIndexFilter extends IndexFilter with ActiveSparkSession {
/**
* Filter out indexes for the given source plan.
*
* @param plan Source plan
* @param indexes Indexes to examine for the source plan
* @return Indexes which meet conditions of Filter
*/
def apply(plan: LogicalPlan, indexes: Seq[IndexLogEntry]): Seq[IndexLogEntry]
}
/**
* IndexFilter used in HyperspaceRule.
*
* HyperspaceRule.apply folds its filtersOnQueryPlan over the candidate map in order, so
* each filter receives the map produced by the filters before it.
*/
trait QueryPlanIndexFilter extends IndexFilter with ActiveSparkSession {
/**
* Filter out candidate indexes for the given query plan.
*
* @param plan Query plan
* @param candidateIndexes Map of source plan to candidate indexes
* @return Map of source plan to applicable indexes which meet conditions of Filter
*/
def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap
}
/**
* IndexFilter used in ranking applicable indexes.
*
* HyperspaceRule.apply calls the ranker only when the map of applicable indexes is
* non-empty, and passes its result to applyIndex and score.
*/
trait RankerIndexFilter extends IndexFilter with ActiveSparkSession {
/**
* Rank best index for the given query plan.
*
* @param plan Query plan
* @param applicableIndexes Map of source plan to applicable indexes
* @return Map of source plan to selected index
*/
def apply(plan: LogicalPlan, applicableIndexes: PlanToIndexesMap): PlanToSelectedIndexMap
}