Apply JoinIndexRule only for SortMergeJoin (#502)
This commit is contained in:
Родитель
e3b9213793
Коммит
661df177a5
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright (2021) The Hyperspace Project Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.hyperspace.shim
|
||||
|
||||
import org.apache.spark.sql.{ExperimentalMethods, SparkSession}
|
||||
import org.apache.spark.sql.execution.SparkPlanner
|
||||
|
||||
/**
 * Version shim that builds a [[SparkPlanner]] from a [[SparkSession]].
 *
 * This variant targets Spark releases whose `SparkPlanner` constructor takes
 * `(SparkContext, SQLConf, ExperimentalMethods)` — presumably the Spark 2.x
 * line; confirm against the build profile that compiles this source set.
 */
class SparkPlannerShim(spark: SparkSession)
    extends SparkPlanner(spark.sparkContext, spark.sessionState.conf, new ExperimentalMethods)
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright (2021) The Hyperspace Project Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.hyperspace.shim
|
||||
|
||||
import org.apache.spark.sql.{ExperimentalMethods, SparkSession}
|
||||
import org.apache.spark.sql.execution.SparkPlanner
|
||||
|
||||
/**
 * Version shim that builds a [[SparkPlanner]] from a [[SparkSession]].
 *
 * This variant targets Spark releases whose `SparkPlanner` constructor takes
 * `(SparkSession, SQLConf, ExperimentalMethods)` — presumably the Spark 3.0
 * line; confirm against the build profile that compiles this source set.
 */
class SparkPlannerShim(spark: SparkSession)
    extends SparkPlanner(spark, spark.sessionState.conf, new ExperimentalMethods)
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright (2021) The Hyperspace Project Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.hyperspace.shim
|
||||
|
||||
import org.apache.spark.sql.{ExperimentalMethods, SparkSession}
|
||||
import org.apache.spark.sql.execution.SparkPlanner
|
||||
|
||||
/**
 * Version shim that builds a [[SparkPlanner]] from a [[SparkSession]].
 *
 * This variant targets Spark releases whose `SparkPlanner` constructor takes
 * `(SparkSession, ExperimentalMethods)` — presumably Spark 3.1 or later, where
 * the SQLConf parameter was dropped; confirm against the build profile that
 * compiles this source set.
 */
class SparkPlannerShim(spark: SparkSession)
    extends SparkPlanner(spark, new ExperimentalMethods)
|
|
@ -22,6 +22,8 @@ import scala.util.Try
|
|||
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
|
||||
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
|
||||
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
|
||||
import org.apache.spark.sql.hyperspace.shim.SparkPlannerShim
|
||||
|
||||
import com.microsoft.hyperspace.Hyperspace
|
||||
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
|
||||
|
@ -70,6 +72,13 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
|
|||
isJoinConditionSupported(condition)
|
||||
}
|
||||
|
||||
val sortMergeJoinCond = withFilterReasonTag(
|
||||
plan,
|
||||
leftAndRightIndexes,
|
||||
FilterReasons.NotEligibleJoin("Not SortMergeJoin")) {
|
||||
isSortMergeJoin(plan)
|
||||
}
|
||||
|
||||
val leftPlanLinearCond =
|
||||
withFilterReasonTag(
|
||||
plan,
|
||||
|
@ -77,6 +86,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
|
|||
FilterReasons.NotEligibleJoin("Non linear left child plan")) {
|
||||
isPlanLinear(l)
|
||||
}
|
||||
|
||||
val rightPlanLinearCond =
|
||||
withFilterReasonTag(
|
||||
plan,
|
||||
|
@ -85,7 +95,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
|
|||
isPlanLinear(r)
|
||||
}
|
||||
|
||||
if (joinConditionCond && leftPlanLinearCond && rightPlanLinearCond) {
|
||||
if (sortMergeJoinCond && joinConditionCond && leftPlanLinearCond && rightPlanLinearCond) {
|
||||
// Set join query context.
|
||||
JoinIndexRule.leftRelation.set(left.get)
|
||||
JoinIndexRule.rightRelation.set(right.get)
|
||||
|
@ -109,6 +119,11 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Returns true if Spark's own join planning would turn the given logical join
 * into a sort-merge join.
 *
 * Delegates to the planner's `JoinSelection` strategy (obtained through the
 * version-specific [[SparkPlannerShim]]) so the answer tracks what Spark would
 * actually choose — e.g. when the broadcast threshold makes a broadcast join
 * eligible, the first candidate is not a [[SortMergeJoinExec]] and this
 * returns false.
 *
 * @param join the logical `Join` node under consideration
 * @return true iff the first candidate physical plan is a `SortMergeJoinExec`
 */
private def isSortMergeJoin(join: LogicalPlan): Boolean = {
  // JoinSelection returns a Seq of candidate physical plans and may return
  // Nil when the strategy does not apply; use headOption instead of .head so
  // an empty result means "not a sort-merge join" rather than an exception.
  val candidates = new SparkPlannerShim(spark).JoinSelection(join)
  candidates.headOption.exists(_.isInstanceOf[SortMergeJoinExec])
}
|
||||
|
||||
/**
|
||||
* Checks whether a logical plan is linear. Linear means starting at the top, each node in the
|
||||
* plan has at most one child.
|
||||
|
|
|
@ -68,6 +68,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper {
|
|||
*/
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
|
||||
|
||||
val t1Location =
|
||||
new InMemoryFileIndex(spark, Seq(new Path("t1")), Map.empty, Some(t1Schema), NoopCache)
|
||||
|
@ -127,6 +128,23 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper {
|
|||
verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths)
|
||||
}
|
||||
|
||||
test("Join rule doesn't update plan if it's broadcast join.") {
  // Raise the broadcast threshold so Spark's planner would select a broadcast
  // join for this query; JoinIndexRule must then leave the plan untouched and
  // record the reason on every candidate index.
  withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "10241024") {
    val condition = EqualTo(t1c1, t2c1)
    val plan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(condition))
    val indexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE))

    val (transformed, _) = applyJoinIndexRuleHelper(plan, indexes)
    assert(transformed.equals(plan))

    // Each index should carry a filter-reason tag explaining the rejection.
    indexes.foreach { idx =>
      val tagged = idx.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS)
      assert(tagged.isDefined)
      val verbose = tagged.get.map(_.verboseStr)
      assert(
        verbose.exists(
          _.contains("Join condition is not eligible. Reason: Not SortMergeJoin")))
    }
  }
}
|
||||
|
||||
test("Join rule works if indexes exist for case insensitive index and query.") {
|
||||
val t1c1Caps = t1c1.withName("T1C1")
|
||||
|
||||
|
@ -142,8 +160,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper {
|
|||
test("Join rule does not update plan if index location is not set.") {
|
||||
withSQLConf(IndexConstants.INDEX_SYSTEM_PATH -> "") {
|
||||
val joinCondition = EqualTo(t1c1, t2c1)
|
||||
val originalPlan =
|
||||
Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition))
|
||||
val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition))
|
||||
|
||||
try {
|
||||
applyJoinIndexRuleHelper(originalPlan)
|
||||
|
|
Загрузка…
Ссылка в новой задаче