Introduce ZOrderCoveringIndex (#518)
Parent: 64aae2f53a
Commit: 1adddf6d54
@@ -56,6 +56,23 @@ object IndexConstants {
  val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
  val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"

  // Config to determine the target source data size per partition for ZOrderCoveringIndex.
  // It is an approximate value, as it is based on the total size of the source files,
  // which can vary with file format and compression rate.
  val INDEX_ZORDER_TARGET_SOURCE_BYTES_PER_PARTITION =
    "spark.hyperspace.index.zorder.targetSourceBytesPerPartition"
  val INDEX_ZORDER_TARGET_SOURCE_BYTES_PER_PARTITION_DEFAULT = "1073741824" // 1G

  // If enabled, the Z-address is calculated from percentile numbers for numeric column types
  // instead of the actual column values. This can mitigate data skew issues. It is disabled by
  // default because collecting quantiles takes longer than collecting only min/max values.
  val INDEX_ZORDER_QUANTILE_ENABLED = "spark.hyperspace.index.zorder.quantile.enabled"
  val INDEX_ZORDER_QUANTILE_ENABLED_DEFAULT = "false"

  // relativeError value used when collecting approximate quantiles for numeric columns.
  val INDEX_ZORDER_QUANTILE_RELATIVE_ERROR = "spark.hyperspace.index.zorder.quantile.relativeError"
  val INDEX_ZORDER_QUANTILE_RELATIVE_ERROR_DEFAULT = "0.01"

  // TODO: Remove dev config when nested column is fully supported.
  val DEV_NESTED_COLUMN_ENABLED = "spark.hyperspace.dev.index.nestedColumn.enabled"
  val DEV_NESTED_COLUMN_ENABLED_DEFAULT = "false"
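All of these are ordinary Spark session confs, so they can be tuned before an index build. A small sketch (the keys come from this commit; the values are illustrative, not defaults):

  spark.conf.set("spark.hyperspace.index.zorder.targetSourceBytesPerPartition", (512L * 1024 * 1024).toString)
  spark.conf.set("spark.hyperspace.index.zorder.quantile.enabled", "true")       // opt in to percentile-based z-addresses
  spark.conf.set("spark.hyperspace.index.zorder.quantile.relativeError", "0.05") // coarser but faster approxQuantile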
@@ -32,8 +32,8 @@ import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
 */
case class CoveringIndex(
    override val indexedColumns: Seq[String],
    includedColumns: Seq[String],
    schema: StructType,
    override val includedColumns: Seq[String],
    override val schema: StructType,
    numBuckets: Int,
    override val properties: Map[String, String])
    extends CoveringIndexTrait {
@@ -46,14 +46,14 @@ case class CoveringIndex(
    copy(properties = newProperties)
  }

  protected def copyIndex(
  override protected def copyIndex(
      indexedCols: Seq[String],
      includedCols: Seq[String],
      schema: StructType): CoveringIndex = {
    copy(indexedColumns = indexedCols, includedColumns = includedCols, schema = schema)
  }

  protected def write(ctx: IndexerContext, indexData: DataFrame, mode: SaveMode): Unit = {
  override protected def write(ctx: IndexerContext, indexData: DataFrame, mode: SaveMode): Unit = {
    // Run job
    val repartitionedIndexData = {
      // We are repartitioning with normalized columns (e.g., flattened nested column).
@@ -84,21 +84,21 @@ case class CoveringIndex(
    (indexedColumns, includedColumns, numBuckets)
  }

  def bucketSpec: Option[BucketSpec] =
  override def bucketSpec: Option[BucketSpec] =
    Some(
      BucketSpec(
        numBuckets = numBuckets,
        bucketColumnNames = indexedColumns,
        sortColumnNames = indexedColumns))

  protected def simpleStatistics: Map[String, String] = {
  override protected def simpleStatistics: Map[String, String] = {
    Map(
      "includedColumns" -> includedColumns.mkString(", "),
      "numBuckets" -> numBuckets.toString,
      "schema" -> schema.json)
  }

  protected def extendedStatistics: Map[String, String] = {
  override protected def extendedStatistics: Map[String, String] = {
    Map("hasLineage" -> hasLineageColumn.toString)
  }
}
@@ -36,8 +36,8 @@ import com.microsoft.hyperspace.util.HyperspaceConf
 */
case class CoveringIndexConfig(
    override val indexName: String,
    indexedColumns: Seq[String],
    includedColumns: Seq[String] = Seq())
    override val indexedColumns: Seq[String],
    override val includedColumns: Seq[String] = Seq())
    extends CoveringIndexConfigTrait {

  override def createIndex(
@@ -21,9 +21,9 @@ import java.util.Locale
import com.microsoft.hyperspace.index._

trait CoveringIndexConfigTrait extends IndexConfigTrait {
  val indexName: String
  val indexedColumns: Seq[String]
  val includedColumns: Seq[String]
  def indexName: String
  def indexedColumns: Seq[String]
  def includedColumns: Seq[String]

  if (indexName.isEmpty || indexedColumns.isEmpty) {
    throw new IllegalArgumentException("Empty index name or indexed columns are not allowed.")
@@ -30,8 +30,8 @@ import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
 * slice of source data including indexedColumns and includedColumns.
 */
trait CoveringIndexTrait extends Index {
  val schema: StructType
  val includedColumns: Seq[String]
  def schema: StructType
  def includedColumns: Seq[String]
  def bucketSpec: Option[BucketSpec]
  protected def write(ctx: IndexerContext, indexData: DataFrame, mode: SaveMode)
  protected def copyIndex(
@@ -16,8 +16,7 @@

package com.microsoft.hyperspace.index.covering

import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

import com.microsoft.hyperspace.index.IndexLogEntryTags
@@ -66,25 +65,7 @@ object FilterColumnFilter extends QueryPlanIndexFilter {
      return Map.empty
    }

    val (filterColumnNames, projectColumnNames) = plan match {
      case project @ Project(_, _ @Filter(condition: Expression, ExtractRelation(relation)))
          if !RuleUtils.isIndexApplied(relation) =>
        val projectColumnNames = CleanupAliases(project)
          .asInstanceOf[Project]
          .projectList
          .map(_.references.map(_.asInstanceOf[AttributeReference].name))
          .flatMap(_.toSeq)
        val filterColumnNames = condition.references.map(_.name).toSeq
        (filterColumnNames, projectColumnNames)

      case Filter(condition: Expression, ExtractRelation(relation))
          if !RuleUtils.isIndexApplied(relation) =>
        val relationColumnNames = relation.plan.output.map(_.name)
        val filterColumnNames = condition.references.map(_.name).toSeq
        (filterColumnNames, relationColumnNames)
      case _ =>
        (Seq(), Seq())
    }
    val (projectColumnNames, filterColumnNames) = RuleUtils.getProjectAndFilterColumns(plan)

    // Filter candidate indexes if:
    // 1. Filter predicate's columns include the first 'indexed' column of the index.
@@ -173,7 +154,6 @@ object FilterIndexRule extends HyperspaceRule {
    }

    val candidateIndex = indexes.head._2
    // Filter index rule
    val relation = RuleUtils.getRelation(spark, plan).get
    val commonBytes = candidateIndex
      .getTagValue(relation.plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES)
@@ -17,7 +17,9 @@
package com.microsoft.hyperspace.index.rules

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConstants
@@ -52,4 +54,33 @@ object RuleUtils {
      None
    }
  }

  /**
   * Extract project and filter columns when the given plan is Project-Filter-Relation
   * or Filter-Relation. Otherwise, return empty lists.
   *
   * @param plan Logical plan to extract project and filter columns.
   * @return A pair of project column names and filter column names
   */
  def getProjectAndFilterColumns(plan: LogicalPlan): (Seq[String], Seq[String]) = {
    plan match {
      case project @ Project(_, _ @Filter(condition: Expression, ExtractRelation(relation)))
          if !isIndexApplied(relation) =>
        val projectColumnNames = CleanupAliases(project)
          .asInstanceOf[Project]
          .projectList
          .map(_.references.map(_.asInstanceOf[AttributeReference].name))
          .flatMap(_.toSeq)
        val filterColumnNames = condition.references.map(_.name).toSeq
        (projectColumnNames, filterColumnNames)

      case Filter(condition: Expression, ExtractRelation(relation))
          if !isIndexApplied(relation) =>
        val relationColumnNames = relation.plan.output.map(_.name)
        val filterColumnNames = condition.references.map(_.name).toSeq
        (relationColumnNames, filterColumnNames)
      case _ =>
        (Seq(), Seq())
    }
  }
}
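A quick sketch of what the new helper returns for the plan shapes it recognizes (assuming ExtractRelation matches the underlying relation and no index has been applied yet):

  // Project(c3, c1) <- Filter(c3 = 'facebook') <- Relation(c1..c5)  =>  (Seq("c3", "c1"), Seq("c3"))
  // Filter(c3 = 'facebook') <- Relation(c1..c5)                     =>  (Seq("c1", "c2", "c3", "c4", "c5"), Seq("c3"))
  // any other plan shape                                            =>  (Seq(), Seq())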
@@ -23,13 +23,14 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.index.covering.{FilterIndexRule, JoinIndexRule}
import com.microsoft.hyperspace.index.dataskipping.rules.ApplyDataSkippingIndex
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap
import com.microsoft.hyperspace.index.zordercovering.ZOrderFilterIndexRule

/**
 * Apply Hyperspace indexes based on the score of each index application.
 */
class ScoreBasedIndexPlanOptimizer {
  private val rules: Seq[HyperspaceRule] =
    Seq(FilterIndexRule, JoinIndexRule, ApplyDataSkippingIndex, NoOpRule)
    Seq(FilterIndexRule, JoinIndexRule, ApplyDataSkippingIndex, ZOrderFilterIndexRule, NoOpRule)

  // Map for memoization. The key is the logical plan before applying [[HyperspaceRule]]s
  // and its value is a pair of best transformed plan and its score.
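Registering ZOrderFilterIndexRule in this list is all that is needed for the new index type to compete during optimization; the highest-scoring transformed plan wins. A minimal sketch of exercising it, assuming the enableHyperspace implicits used by the tests below are in scope and an index exists over the data (path and columns are placeholders):

  spark.enableHyperspace()
  spark.read
    .parquet("/path/to/source")
    .filter("c3 = 'facebook'")
    .select("c3", "c1")
    .explain(true)  // the file scan should be replaced by the chosen index's data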
@@ -0,0 +1,189 @@
/*
 * Copyright (2021) The Hyperspace Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.microsoft.hyperspace.index.zordercovering

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.covering.CoveringIndexTrait
import com.microsoft.hyperspace.util.HyperspaceConf

/**
 * ZOrderCoveringIndex data is stored sorted by a z-address computed from the values of
 * indexedColumns, so that it can substitute for a data scan node in a filter query plan.
 */
case class ZOrderCoveringIndex(
    override val indexedColumns: Seq[String],
    override val includedColumns: Seq[String],
    override val schema: StructType,
    targetBytesPerPartition: Long,
    override val properties: Map[String, String])
    extends CoveringIndexTrait {

  override def bucketSpec: Option[BucketSpec] = None

  override def kind: String = ZOrderCoveringIndex.kind

  override def kindAbbr: String = ZOrderCoveringIndex.kindAbbr

  override def withNewProperties(newProperties: Map[String, String]): ZOrderCoveringIndex = {
    copy(properties = newProperties)
  }

  private def collectStats(df: DataFrame, zOrderByCols: Seq[String]): Map[String, Any] = {
    val isQuantileEnabled =
      HyperspaceConf.ZOrderCovering.quantileBasedZAddressEnabled(df.sparkSession)
    val (percentileBasedCols, minMaxBasedCols) = if (!isQuantileEnabled) {
      (Nil, zOrderByCols)
    } else {
      zOrderByCols.partition(name => ZOrderField.percentileApplicableType(df(name).expr.dataType))
    }

    val getMinMax: Seq[String] => Map[String, Any] = cols => {
      if (cols.nonEmpty) {
        val minMaxAgg = cols.flatMap { name =>
          min(df(name)).as(s"min($name)") :: max(df(name)).as(s"max($name)") :: Nil
        }
        val aggDf = df.agg(minMaxAgg.head, minMaxAgg.tail: _*)
        aggDf.head.getValuesMap(aggDf.schema.fieldNames)
      } else {
        Map.empty
      }
    }

    val getQuantiles: Seq[String] => Map[String, Any] = cols => {
      val relativeError =
        HyperspaceConf.ZOrderCovering.quantileBasedZAddressRelativeError(df.sparkSession)

      // The sample list always contains the min/max elements, so we can get min/max (as doubles)
      // using the 0.0 and 1.0 probabilities and a relativeError < 1.0.
      val res = df.stat
        .approxQuantile(cols.toArray, Array(0.0, 0.25, 0.50, 0.75, 1.0), relativeError)
        .toSeq
      res.zipWithIndex.flatMap {
        case (arr, idx) =>
          val name = cols(idx)
          val keys =
            Seq(s"min($name)", s"ap_25($name)", s"ap_50($name)", s"ap_75($name)", s"max($name)")
          keys.zip(arr).toMap
      }.toMap
    }

    // Using par to submit both jobs concurrently.
    val resMaps = Seq((percentileBasedCols, getQuantiles), (minMaxBasedCols, getMinMax)).par.map {
      case (cols, f) =>
        f(cols)
    }.toIndexedSeq
    resMaps.head ++ resMaps.last
  }

  override protected def write(
      ctx: IndexerContext,
      indexData: DataFrame,
      mode: SaveMode): Unit = {
    val relation = RelationUtils.getRelation(ctx.spark, indexData.queryExecution.optimizedPlan)
    val numPartitions = (relation.allFileSizeInBytes / targetBytesPerPartition).toInt.max(2)

    if (indexedColumns.size == 1) {
      val repartitionedIndexDataFrame = {
        indexData
          .repartitionByRange(numPartitions, col(indexedColumns.head))
          .sortWithinPartitions(indexedColumns.head)
      }
      repartitionedIndexDataFrame.write
        .format("parquet")
        .mode(mode)
        .save(ctx.indexDataPath.toString)
    } else {
      // Get min/max of the indexed columns and the total number of records.

      val isQuantileEnabled =
        HyperspaceConf.ZOrderCovering.quantileBasedZAddressEnabled(indexData.sparkSession)
      val stats = collectStats(indexData, indexedColumns)
      val zOrderFields = indexedColumns.map { name =>
        val min = stats(s"min($name)")
        val max = stats(s"max($name)")
        val percentile =
          if (stats.contains(s"ap_25($name)")) {
            Seq(stats(s"ap_25($name)"), stats(s"ap_50($name)"), stats(s"ap_75($name)"))
          } else {
            Nil
          }
        ZOrderField.build(
          name,
          indexData(name).expr.dataType,
          min,
          max,
          percentile,
          isQuantileEnabled)
      }

      val zOrderUdf = ZOrderUDF(zOrderFields)
      val repartitionedIndexDataFrame = {
        indexData
          .withColumn(
            "_zaddr",
            zOrderUdf.zAddressUdf(struct(indexedColumns.map(indexData(_)): _*)))
          .repartitionByRange(numPartitions, col("_zaddr"))
          .sortWithinPartitions("_zaddr")
          .drop("_zaddr")
      }

      repartitionedIndexDataFrame.write
        .format("parquet")
        .mode(mode)
        .save(ctx.indexDataPath.toString)
    }
  }

  override protected def copyIndex(
      indexedCols: Seq[String],
      includedCols: Seq[String],
      schema: StructType): ZOrderCoveringIndex = {
    copy(indexedColumns = indexedCols, includedColumns = includedCols, schema = schema)
  }

  override def equals(o: Any): Boolean =
    o match {
      case that: ZOrderCoveringIndex => comparedData == that.comparedData
      case _ => false
    }

  override def hashCode: Int = {
    comparedData.hashCode
  }

  private def comparedData: (Seq[String], Seq[String]) = {
    (indexedColumns, includedColumns)
  }

  override protected def simpleStatistics: Map[String, String] = {
    Map("includedColumns" -> includedColumns.mkString(", "), "schema" -> schema.json)
  }

  override protected def extendedStatistics: Map[String, String] = {
    Map("hasLineage" -> hasLineageColumn.toString)
  }
}

object ZOrderCoveringIndex {
  final val kind = "ZOrderCoveringIndex"
  final val kindAbbr = "ZCI"
}
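For readers new to Z-ordering, the core idea behind the _zaddr column above is bit interleaving: the z-address mixes the bits of the (normalized) per-column values so that rows that are close in every indexed column stay close in the final sort order. The commit's ZOrderField/ZOrderUDF additionally handle type normalization, min/max scaling and the optional percentile mode, none of which is shown here; the following is only a conceptual sketch.

  // Conceptual sketch only; this is not the ZOrderField/ZOrderUDF implementation.
  // Interleave the bits of two non-negative Int values into one Long z-address.
  def interleaveBits(a: Int, b: Int): Long = {
    (0 until 32).foldLeft(0L) { (acc, i) =>
      acc |
        (((a >>> i) & 1L) << (2 * i)) |     // bit i of 'a' goes to even position 2i
        (((b >>> i) & 1L) << (2 * i + 1))   // bit i of 'b' goes to odd position 2i+1
    }
  }

  // interleaveBits(3, 1) == 7 (binary 111): the low bits of both values end up adjacent,
  // which is why sorting by z-address clusters rows that are close in both columns.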
@@ -0,0 +1,60 @@
/*
 * Copyright (2021) The Hyperspace Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.microsoft.hyperspace.index.zordercovering

import org.apache.spark.sql.DataFrame

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.covering.{CoveringIndex, CoveringIndexConfigTrait}
import com.microsoft.hyperspace.util.HyperspaceConf

/**
 * ZOrderCoveringIndexConfig specifies the configuration of a covering index which is z-ordered.
 *
 * Use this class to create a Z-ordered covering index with [[Hyperspace.createIndex()]].
 *
 * @param indexName Index name.
 * @param indexedColumns Columns to be used for calculating z-address.
 * @param includedColumns Columns to be included in the index.
 */
case class ZOrderCoveringIndexConfig(
    override val indexName: String,
    override val indexedColumns: Seq[String],
    override val includedColumns: Seq[String] = Seq())
    extends CoveringIndexConfigTrait {
  override def createIndex(
      ctx: IndexerContext,
      sourceData: DataFrame,
      properties: Map[String, String]): (ZOrderCoveringIndex, DataFrame) = {
    val (indexData, resolvedIndexedColumns, resolvedIncludedColumns) =
      CoveringIndex.createIndexData(
        ctx,
        sourceData,
        indexedColumns,
        includedColumns,
        IndexUtils.hasLineageColumn(properties))

    val index = ZOrderCoveringIndex(
      resolvedIndexedColumns.map(_.normalizedName),
      resolvedIncludedColumns.map(_.normalizedName),
      indexData.schema,
      HyperspaceConf.ZOrderCovering.targetSourceBytesPerPartition(ctx.spark),
      properties)
    (index, indexData)
  }
}
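Creating one of these indexes mirrors the existing CoveringIndexConfig flow; the snippet below follows the pattern used in the tests later in this commit (the path is a placeholder):

  import com.microsoft.hyperspace.Hyperspace
  import com.microsoft.hyperspace.index.zordercovering.ZOrderCoveringIndexConfig

  val hyperspace = new Hyperspace(spark)
  val df = spark.read.parquet("/path/to/source")  // placeholder path
  hyperspace.createIndex(df, ZOrderCoveringIndexConfig("zindex", Seq("c2", "c3"), Seq("c1")))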
@@ -0,0 +1,153 @@
/*
 * Copyright (2020) The Hyperspace Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.microsoft.hyperspace.index.zordercovering

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.index.IndexLogEntryTags
import com.microsoft.hyperspace.index.covering.{CoveringIndexRuleUtils, FilterPlanNodeFilter}
import com.microsoft.hyperspace.index.plananalysis.FilterReasons
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter, RuleUtils}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}

/**
 * ZOrderFilterColumnFilter filters out indexes if
 * 1) the index doesn't have all required output columns, or
 * 2) the filter condition doesn't include any of the index's indexed columns.
 *
 * The only difference from FilterColumnFilter for CoveringIndex is that any indexed column may
 * appear in the filter condition, not just the first indexed column.
 */
object ZOrderFilterColumnFilter extends QueryPlanIndexFilter {
  override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
    if (candidateIndexes.isEmpty || candidateIndexes.size != 1) {
      return Map.empty
    }

    val (projectColumnNames, filterColumnNames) = RuleUtils.getProjectAndFilterColumns(plan)

    // Filter candidate indexes if:
    // 1. Filter predicate's columns include any of the indexed columns of the index.
    // 2. The index covers all columns from the filter predicate and output columns list.
    val (rel, indexes) = candidateIndexes.head
    val filteredIndexes =
      indexes.filter { index =>
        withFilterReasonTag(
          plan,
          index,
          FilterReasons.IneligibleFilterCondition("No indexed column in filter condition")) {
          index.derivedDataset.indexedColumns.exists(
            col =>
              ResolverUtils
                .resolve(spark, col, filterColumnNames)
                .isDefined)
        } &&
        withFilterReasonTag(
          plan,
          index,
          FilterReasons.MissingRequiredCol(
            (filterColumnNames ++ projectColumnNames).toSet.mkString(","),
            index.derivedDataset.referencedColumns.mkString(","))) {
          ResolverUtils
            .resolve(
              spark,
              filterColumnNames ++ projectColumnNames,
              index.derivedDataset.referencedColumns)
            .isDefined
        }
      }

    Map(rel -> filteredIndexes)
  }
}

/**
 * IndexRankFilter selects the best applicable index.
 */
object ZOrderFilterRankFilter extends IndexRankFilter {
  override def apply(
      plan: LogicalPlan,
      applicableIndexes: PlanToIndexesMap): PlanToSelectedIndexMap = {
    if (applicableIndexes.isEmpty || applicableIndexes.size != 1
      || applicableIndexes.head._2.isEmpty) {
      Map.empty
    } else {
      // TODO: Enhance the rank algorithm for z-order covering index. Currently, we pick the index
      // with the fewest indexed columns, which might have a better min/max distribution
      // for data skipping. However, this is not necessarily the best choice.
      // For example, a high-cardinality indexed column could be better for data skipping.
      val selected = applicableIndexes.head._2.minBy(_.indexedColumns.length)
      setFilterReasonTagForRank(plan, applicableIndexes.head._2, selected)
      Map(applicableIndexes.head._1 -> selected)
    }
  }
}

/**
 * ZOrderFilterIndexRule looks for opportunities in a logical plan to replace
 * a relation with an available z-ordered index according to the columns in the filter predicate.
 */
object ZOrderFilterIndexRule extends HyperspaceRule {
  override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
    IndexTypeFilter[ZOrderCoveringIndex]() :: FilterPlanNodeFilter ::
      ZOrderFilterColumnFilter :: Nil

  override val indexRanker: IndexRankFilter = ZOrderFilterRankFilter

  override def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan = {
    if (indexes.isEmpty || (indexes.size != 1)) {
      return plan
    }

    // As FilterIndexRule is not intended to support bucketed scan, we set
    // useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause
    // unnecessary shuffle for appended data to apply BucketUnion for merging data.
    CoveringIndexRuleUtils.transformPlanToUseIndex(
      spark,
      indexes.head._2,
      plan,
      useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
      useBucketUnionForAppended = false)
  }

  override def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int = {
    if (indexes.isEmpty || (indexes.size != 1)) {
      return 0
    }

    val candidateIndex = indexes.head._2
    // Filter index rule
    val relation = RuleUtils.getRelation(spark, plan).get
    val commonBytes = candidateIndex
      .getTagValue(relation.plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES)
      .getOrElse {
        relation.allFileInfos.foldLeft(0L) { (res, f) =>
          if (candidateIndex.sourceFileInfoSet.contains(f)) {
            res + f.size // count, total bytes
          } else {
            res
          }
        }
      }

    // TODO: Enhance scoring function.
    // See https://github.com/microsoft/hyperspace/issues/444
    // ZOrderCoveringIndex should take priority over CoveringIndex for a filter query.
    (60 * (commonBytes.toFloat / relation.allFileSizeInBytes)).round
  }
}
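A worked example of the score above: when the index covers every source file of the relation, commonBytes equals relation.allFileSizeInBytes, so the rule returns round(60 * 1.0) = 60, while FilterIndexRule returns 50 for the same fully covered filter query (asserted in the ScoreBasedIndexPlanOptimizerTest change below). That 60 versus 50 gap is what makes ScoreBasedIndexPlanOptimizer prefer the Z-order index. If only half of the source bytes were covered, the score would drop to round(60 * 0.5) = 30.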
@@ -122,6 +122,32 @@ object HyperspaceConf {
      .toBoolean
  }

  object ZOrderCovering {
    def targetSourceBytesPerPartition(spark: SparkSession): Long = {
      spark.conf
        .get(
          IndexConstants.INDEX_ZORDER_TARGET_SOURCE_BYTES_PER_PARTITION,
          IndexConstants.INDEX_ZORDER_TARGET_SOURCE_BYTES_PER_PARTITION_DEFAULT)
        .toLong
    }

    def quantileBasedZAddressEnabled(spark: SparkSession): Boolean = {
      spark.conf
        .get(
          IndexConstants.INDEX_ZORDER_QUANTILE_ENABLED,
          IndexConstants.INDEX_ZORDER_QUANTILE_ENABLED_DEFAULT)
        .toBoolean
    }

    def quantileBasedZAddressRelativeError(spark: SparkSession): Double = {
      spark.conf
        .get(
          IndexConstants.INDEX_ZORDER_QUANTILE_RELATIVE_ERROR,
          IndexConstants.INDEX_ZORDER_QUANTILE_RELATIVE_ERROR_DEFAULT)
        .toDouble
    }
  }

  object DataSkipping {
    def targetIndexDataFileSize(spark: SparkSession): Long = {
      // TODO: Consider using a systematic way to validate the config value
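These accessors only wrap spark.conf lookups with the defaults defined in IndexConstants; with nothing set on the session, the partition-size target resolves to 1073741824 bytes (1 GB). A one-line sanity check (illustrative):

  val target = HyperspaceConf.ZOrderCovering.targetSourceBytesPerPartition(spark)  // 1073741824L unless overridden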
@@ -25,7 +25,8 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil
import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.covering.{FilterIndexRule, JoinIndexRule}
import com.microsoft.hyperspace.index.covering.{CoveringIndexConfig, FilterIndexRule, JoinIndexRule}
import com.microsoft.hyperspace.index.zordercovering.{ZOrderCoveringIndexConfig, ZOrderFilterIndexRule}

class ScoreBasedIndexPlanOptimizerTest extends QueryTest with HyperspaceSuite {
  private val testDir = inTempDir("scoreBasedIndexPlanOptimizerTest")
@@ -137,6 +138,53 @@ class ScoreBasedIndexPlanOptimizerTest extends QueryTest with HyperspaceSuite {
    }
  }

  test("Verify ZOrderFilterIndexRule is prior to FilterIndexRule.") {
    withTempPathAsString { testPath =>
      withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
        import spark.implicits._
        val zOrderIndexConfig =
          ZOrderCoveringIndexConfig("zindex", Seq("c3"), Seq("c4", "c1"))
        val coveringIndexConfig = CoveringIndexConfig("cindex", Seq("c3"), Seq("c4", "c1"))
        SampleData.testData
          .toDF("c1", "c2", "c3", "c4", "c5")
          .limit(10)
          .write
          .parquet(testPath)
        val df = spark.read.load(testPath)

        // Create indexes.
        hyperspace.createIndex(df, zOrderIndexConfig)
        hyperspace.createIndex(df, coveringIndexConfig)

        def query(): DataFrame =
          spark.read.parquet(testPath).filter("c3 >= 'facebook'").select("c3", "c1")

        // For covering index, the score is 50.
        // For z-order covering index, the score is 60.
        val plan = query().queryExecution.optimizedPlan
        val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE))
        val candidateIndexes = CandidateIndexCollector.apply(plan, allIndexes)
        assert(candidateIndexes.size == 1) // 1 source relation
        assert(candidateIndexes.head._2.size == 2) // both indexes

        val (_, coveringIndexScore) = FilterIndexRule.apply(
          plan,
          candidateIndexes.map { kv =>
            (kv._1, Seq(kv._2.find(idx => idx.name.equals("cindex")).get))
          })
        val (_, zOrderIndexScore) = ZOrderFilterIndexRule.apply(
          plan,
          candidateIndexes.map { kv =>
            (kv._1, Seq(kv._2.find(idx => idx.name.equals("zindex")).get))
          })
        assert(coveringIndexScore == 50)
        assert(zOrderIndexScore == 60)

        verifyIndexUsage(query, getIndexFilesPath(zOrderIndexConfig.indexName))
      }
    }
  }

  /**
   * Verify that the query plan has the expected root paths.
   *
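The same priority check can be done outside the rule-level assertions: after both indexes are built, the optimized plan of the filter query should reference the Z-order index (kind abbreviation ZCI), which is exactly what verifyIndexUsage in the E2E suite below asserts. An illustrative check:

  val planString = query().queryExecution.optimizedPlan.toString
  assert(planString.contains("Hyperspace(Type: ZCI, Name: zindex"))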
@@ -0,0 +1,463 @@
/*
 * Copyright (2020) The Hyperspace Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.microsoft.hyperspace.index.zordercovering

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
import com.microsoft.hyperspace.index.{Content, FileIdTracker, HyperspaceSuite, IndexConstants}
import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
import com.microsoft.hyperspace.index.covering.CoveringIndexConfig

class E2EHyperspaceZOrderIndexTest extends QueryTest with HyperspaceSuite {
  private val testDir = inTempDir("e2eTests")
  private val nonPartitionedDataPath = testDir + "/sampleparquet"
  private val partitionedDataPath = testDir + "/samplepartitionedparquet"
  private val fileSystem = new Path(nonPartitionedDataPath).getFileSystem(new Configuration)
  private var hyperspace: Hyperspace = _

  override def beforeAll(): Unit = {
    super.beforeAll()

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    hyperspace = new Hyperspace(spark)
    fileSystem.delete(new Path(testDir), true)

    val dataColumns = Seq("c1", "c2", "c3", "c4", "c5")
    // save test data non-partitioned.
    SampleData.save(spark, nonPartitionedDataPath, dataColumns)

    // save test data partitioned.
    SampleData.save(spark, partitionedDataPath, dataColumns, Some(Seq("c1", "c3")))
  }

  before {
    // Clear index cache so a new test does not see stale indexes from previous ones.
    clearCache()
  }

  override def afterAll(): Unit = {
    fileSystem.delete(new Path(testDir), true)
    super.afterAll()
  }

  after {
    fileSystem.delete(systemPath, true)
    spark.disableHyperspace()
  }

  test(
    "E2E test for filter query on partitioned and non-partitioned data with and without " +
      "lineage.") {
    Seq(nonPartitionedDataPath, partitionedDataPath).foreach { loc =>
      Seq(true, false).foreach { enableLineage =>
        withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> enableLineage.toString) {
          withIndex("filterZOrderIndex") {
            val df = spark.read.parquet(loc)
            val indexConfig =
              ZOrderCoveringIndexConfig("filterZOrderIndex", Seq("c2", "c3"), Seq("c1"))
            hyperspace.createIndex(df, indexConfig)

            {
              // Check z-order index is applied for the second indexed column.
              def query(): DataFrame = df.filter("c3 == 'facebook'").select("c3", "c1")
              verifyIndexUsage(query, indexConfig.indexName)
            }
            {
              def query(): DataFrame =
                df.filter("c2 >= '810a20a2baa24ff3ad493bfbf064569a'").select("c3", "c1")
              verifyIndexUsage(query, indexConfig.indexName)
            }
          }
        }
      }
    }
  }

  test("E2E test for case insensitive filter query utilizing indexes.") {
    val df = spark.read.parquet(nonPartitionedDataPath)
    val indexConfig = ZOrderCoveringIndexConfig("filterIndex", Seq("C3"), Seq("C1"))
    hyperspace.createIndex(df, indexConfig)
    def query(): DataFrame = df.filter("C3 == 'facebook'").select("C3", "c1")
    // Verify if case-insensitive index works with case-insensitive query.
    verifyIndexUsage(query, indexConfig.indexName)
  }

  test("E2E test for case sensitive filter query where changing conf changes behavior.") {
    val df = spark.read.parquet(nonPartitionedDataPath)
    val indexConfig = ZOrderCoveringIndexConfig("filterIndex", Seq("c3"), Seq("c1"))

    hyperspace.createIndex(df, indexConfig)
    def query(): DataFrame = df.filter("C3 == 'facebook'").select("C3", "c1")

    withSQLConf("spark.sql.caseSensitive" -> "true") {
      intercept[AnalysisException] {
        query().show
      }
    }

    withSQLConf("spark.sql.caseSensitive" -> "false") {
      verifyIndexUsage(query, indexConfig.indexName)
    }
  }

  test(
    "E2E test for filter query when all columns are selected on partitioned and " +
      "non-partitioned data with and without lineage.") {
    Seq(nonPartitionedDataPath, partitionedDataPath).foreach { loc =>
      Seq(true, false).foreach { enableLineage =>
        withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> enableLineage.toString) {
          withIndex("filterIndex") {
            val df = spark.read.parquet(loc)
            val indexConfig =
              ZOrderCoveringIndexConfig("filterIndex", Seq("c4", "c3"), Seq("c1", "c2", "c5"))

            hyperspace.createIndex(df, indexConfig)
            df.createOrReplaceTempView("t")

            def query(): DataFrame = spark.sql("SELECT * from t where c4 = 1")

            // Verify no Project node is present in the query plan, as a result of using SELECT *
            assert(query().queryExecution.optimizedPlan.collect { case p: Project => p }.isEmpty)

            verifyIndexUsage(query, indexConfig.indexName)
          }
        }
      }
    }
  }

  test(
    "Verify ZOrderFilterRule utilizes indexes correctly after incremental refresh (append-only).") {
    withTempPathAsString { testPath =>
      // Setup. Create data.
      val indexConfig = ZOrderCoveringIndexConfig("index", Seq("c2", "c3", "c4"), Seq("c1"))
      import spark.implicits._
      SampleData.testData
        .toDF("c1", "c2", "c3", "c4", "c5")
        .limit(10)
        .write
        .parquet(testPath)

      val df = spark.read.load(testPath)
      hyperspace.createIndex(df, indexConfig)
      def query(): DataFrame =
        spark.read.parquet(testPath).filter("c3 == 'facebook'").select("c3", "c1")
      verifyIndexUsage(query, indexConfig.indexName)

      // Append to original data.
      SampleData.testData
        .toDF("c1", "c2", "c3", "c4", "c5")
        .limit(3)
        .write
        .mode("append")
        .parquet(testPath)

      // Check index is not applied because of appended data.
      verifyIndexNotUsed(query)

      hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL)
      verifyIndexUsage(query, indexConfig.indexName)
    }
  }

  test("Validate index usage after incremental refresh with some source data file deleted.") {
    withTempPathAsString { testPath =>
      withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
        // Save a copy of source data files.
        val dataColumns = Seq("c1", "c2", "c3", "c4", "c5")
        SampleData.save(spark, testPath, dataColumns)

        val df = spark.read.parquet(testPath)
        val indexConfig = ZOrderCoveringIndexConfig("filterIndex", Seq("c2", "c3"), Seq("c1"))
        hyperspace.createIndex(df, indexConfig)

        // Verify index usage.
        def query(): DataFrame =
          spark.read.parquet(testPath).filter("c3 == 'facebook'").select("c3", "c1")
        verifyIndexUsage(query, indexConfig.indexName)

        // Delete some source data file.
        TestUtils.deleteFiles(testPath, "*parquet", 1)

        // Verify index is not used.
        verifyIndexNotUsed(query)

        // Refresh the index to remove deleted source data file records from index.
        hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL)

        // Verify index usage on latest version of index (v=1) after refresh.
        verifyIndexUsage(query, indexConfig.indexName)
      }
    }
  }

  test(
    "Verify ZOrderFilterIndexRule utilizes indexes correctly after incremental refresh " +
      "when some file gets deleted and some appended to source data.") {
    withTempPathAsString { testPath =>
      withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
        import spark.implicits._
        val indexConfig = ZOrderCoveringIndexConfig("index", Seq("c5", "c3"), Seq("c4", "c1"))
        SampleData.testData
          .toDF("c1", "c2", "c3", "c4", "c5")
          .limit(10)
          .write
          .parquet(testPath)
        val df = spark.read.load(testPath)

        // Create index.
        hyperspace.createIndex(df, indexConfig)

        // Delete some source data file.
        TestUtils.deleteFiles(testPath, "*parquet", 1)

        // Append to original data.
        SampleData.testData
          .toDF("c1", "c2", "c3", "c4", "c5")
          .limit(3)
          .write
          .mode("append")
          .parquet(testPath)

        def query(): DataFrame =
          spark.read.parquet(testPath).filter("c3 >= 'facebook'").select("c3", "c1")
        verifyIndexNotUsed(query)

        // Refresh index.
        hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL)

        // Verify indexes are used, and all index files are picked.
        verifyIndexUsage(query, indexConfig.indexName)

        // Verify correctness of results.
        spark.disableHyperspace()
        val dfWithHyperspaceDisabled = query()
        spark.enableHyperspace()
        val dfWithHyperspaceEnabled = query()
        checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
      }
    }
  }

  test(
    "Verify ZOrderFilterIndexRule utilizes indexes correctly after quick refresh" +
      "when some file gets deleted and some appended to source data.") {
    withTempPathAsString { testPath =>
      val indexConfig = ZOrderCoveringIndexConfig("index", Seq("c3", "c4"), Seq("c1", "c2"))
      import spark.implicits._
      SampleData.testData
        .toDF("c1", "c2", "c3", "c4", "c5")
        .limit(10)
        .write
        .parquet(testPath)
      val df = spark.read.load(testPath)
      val oldFiles = df.inputFiles

      withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
        // Create index.
        hyperspace.createIndex(df, indexConfig)
      }

      // Delete some source data file.
      TestUtils.deleteFiles(testPath, "*parquet", 1)

      // Append to original data.
      SampleData.testData
        .toDF("c1", "c2", "c3", "c4", "c5")
        .limit(3)
        .write
        .mode("append")
        .parquet(testPath)

      def query(): DataFrame =
        spark.read
          .parquet(testPath)
          .filter("c3 >= 'facebook' and c2 >= '2018-09-03'")
          .select("c3", "c2")

      verifyIndexNotUsed(query)

      // Refresh index.
      hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_QUICK)
      verifyIndexUsage(query, indexConfig.indexName)

      {
        val df = spark.read.parquet(testPath)
        val appendedFiles = df.inputFiles.diff(oldFiles).map(new Path(_))

        // Verify indexes are used, and all index files are picked.
        verifyRootPaths(query, getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles)

        // Verify correctness of results.
        spark.disableHyperspace()
        val dfWithHyperspaceDisabled = query()
        spark.enableHyperspace()
        val dfWithHyperspaceEnabled = query()
        checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
      }

      // Append to original data again.
      SampleData.testData
        .toDF("c1", "c2", "c3", "c4", "c5")
        .limit(1)
        .write
        .mode("append")
        .parquet(testPath)

      // Refreshed index as quick mode won't be applied with additional appended files.
      withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "false") {
        verifyIndexNotUsed(query)
      }

      // Refreshed index as quick mode can be applied with Hybrid Scan config.
      withSQLConf(TestConfig.HybridScanEnabled: _*) {
        verifyIndexUsage(query, indexConfig.indexName)
      }
    }
  }

  test("Verify ZOrderFilterIndexRule is prior to FilterIndexRule.") {
    withTempPathAsString { testPath =>
      withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
        import spark.implicits._
        val zOrderIndexConfig =
          ZOrderCoveringIndexConfig("zindex", Seq("c3"), Seq("c4", "c1"))
        val coveringIndexConfig = CoveringIndexConfig("cindex", Seq("c3"), Seq("c4", "c1"))
        SampleData.testData
          .toDF("c1", "c2", "c3", "c4", "c5")
          .limit(10)
          .write
          .parquet(testPath)
        val df = spark.read.load(testPath)

        // Create indexes.
        hyperspace.createIndex(df, zOrderIndexConfig)
        hyperspace.createIndex(df, coveringIndexConfig)

        def query(): DataFrame =
          spark.read.parquet(testPath).filter("c3 >= 'facebook'").select("c3", "c1")
        verifyIndexUsage(query, zOrderIndexConfig.indexName)
      }
    }
  }

  /**
   * Verify that the query plan has the expected rootPaths.
   *
   * @param optimizedPlan the optimized query plan.
   * @param expectedPaths the expected paths in the query plan.
   */
  private def verifyQueryPlanHasExpectedRootPaths(
      optimizedPlan: LogicalPlan,
      expectedPaths: Seq[Path]): Unit = {
    assert(getAllRootPaths(optimizedPlan).sortBy(_.getName) === expectedPaths.sortBy(_.getName))
  }

  /**
   * Get all rootPaths from a query plan.
   *
   * @param optimizedPlan the optimized query plan.
   * @return a sequence of [[Path]].
   */
  private def getAllRootPaths(optimizedPlan: LogicalPlan): Seq[Path] = {
    optimizedPlan.collect {
      case LogicalRelation(
            HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _),
            _,
            _,
            _) =>
        location.rootPaths
    }.flatten
  }

  private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = {
    versions.flatMap { v =>
      Content
        .fromDirectory(
          new Path(systemPath, s"$indexName/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$v"),
          new FileIdTracker,
          new Configuration)
        .files
    }
  }

  /**
   * Gets the sorted rows from the given dataframe to make it easy to compare with
   * other dataframe.
   *
   * @param df dataframe to collect rows from.
   * @return sorted rows.
   */
  private def getSortedRows(df: DataFrame): Array[Row] = {
    df.orderBy(df.columns.head, df.columns.tail: _*).collect()
  }

  private def verifyIndexNotUsed(f: () => DataFrame) = {
    spark.enableHyperspace()
    val dfWithHyperspaceEnabled = f()
    val planStr = dfWithHyperspaceEnabled.queryExecution.optimizedPlan.toString
    assert(!planStr.contains("Hyperspace("))
  }

  private def verifyIndexUsage(f: () => DataFrame, expectedIndexName: String): Unit = {
    spark.disableHyperspace()
    val dfWithHyperspaceDisabled = f()
    val schemaWithHyperspaceDisabled = dfWithHyperspaceDisabled.schema
    val sortedRowsWithHyperspaceDisabled = getSortedRows(dfWithHyperspaceDisabled)

    spark.enableHyperspace()
    val dfWithHyperspaceEnabled = f()

    val planStr = dfWithHyperspaceEnabled.queryExecution.optimizedPlan.toString
    assert(planStr.contains(s"Hyperspace(Type: ZCI, Name: $expectedIndexName"))

    assert(schemaWithHyperspaceDisabled.equals(dfWithHyperspaceEnabled.schema))
    assert(sortedRowsWithHyperspaceDisabled.sameElements(getSortedRows(dfWithHyperspaceEnabled)))
  }

  private def verifyRootPaths(f: () => DataFrame, expectedRootPaths: Seq[Path]): Unit = {
    spark.enableHyperspace()
    val dfWithHyperspaceEnabled = f()

    verifyQueryPlanHasExpectedRootPaths(
      dfWithHyperspaceEnabled.queryExecution.optimizedPlan,
      expectedRootPaths)
  }

  private def getOriginalQueryPlan(query: DataFrame => DataFrame, df: DataFrame): LogicalPlan = {
    spark.disableHyperspace()
    val p = query(df).queryExecution.optimizedPlan
    spark.enableHyperspace()
    p
  }

  private def equalsRef(a: Set[FileIndex], b: Set[FileIndex]): Boolean = {
    a.size == b.size && a.zip(b).forall(f => f._1 eq f._2)
  }

  private def getFsLocation(plan: LogicalPlan): Set[FileIndex] = {
    plan.collect {
      case LogicalRelation(HadoopFsRelation(loc, _, _, _, _, _), _, _, _) =>
        loc
    }.toSet
  }
}