Data Skipping Index Part 2: Basics (#461)

Chungmin Lee 2021-08-10 08:44:10 +09:00 committed by GitHub
Parent b60393a9cd
Commit f94fda8671
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
22 changed files: 1441 additions and 0 deletions

View file

@@ -127,6 +127,8 @@ ThisBuild / Test / fork := true
ThisBuild / Test / javaOptions += "-Xmx1024m"
ThisBuild / coverageExcludedPackages := "com\\.fasterxml.*;com\\.microsoft\\.hyperspace\\.shim"
/**
* Release configurations
*/

View file

@@ -21,3 +21,4 @@ addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13")
addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.1.2")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.8.2")

View file

@@ -92,6 +92,8 @@ trait Index {
/**
* Writes the index data to the specified path.
*
* Any data that was already at the specified path is overwritten.
*
* @param ctx Helper object for indexing operations
* @param indexData Index data to write
*/

View file

@@ -117,4 +117,15 @@ object IndexConstants {
// To provide multiple paths in the globbing pattern, separate them with commas, e.g.
// "/temp/1/*, /temp/2/*"
val GLOBBING_PATTERN_KEY = "spark.hyperspace.source.globbingPattern"
/**
* Target size of an index data file for data skipping indexes.
*
* Applying a data skipping index starts with filtering the index data with a
* translated predicate. This step is usually I/O bound, so it is important
* that the index data is distributed uniformly over files in terms of data size.
*/
val DATASKIPPING_TARGET_INDEX_DATA_FILE_SIZE =
"spark.hyperspace.index.dataskipping.targetIndexDataFileSize"
val DATASKIPPING_TARGET_INDEX_DATA_FILE_SIZE_DEFAULT = "268435456" // 256 MiB
}
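For illustration, a minimal sketch of how this setting could be overridden on a Spark session before building a data skipping index (the 128 MiB value is an arbitrary example, not a recommendation):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// Override the default 256 MiB target size for index data files.
spark.conf.set(
  "spark.hyperspace.index.dataskipping.targetIndexDataFileSize",
  (128L * 1024 * 1024).toString)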

View file

@@ -16,7 +16,10 @@
package com.microsoft.hyperspace.index
import java.net.URLDecoder
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.util.ResolverUtils
@@ -55,4 +58,10 @@ object IndexUtils {
s"from available source columns:\n${df.schema.treeString}")
}
}
/**
* Spark UDF to decode the URL-encoded string returned by input_file_name().
*/
lazy val decodeInputFileName = udf(
(p: String) => URLDecoder.decode(p.replace("+", "%2B"), "UTF-8"))
}
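A brief usage sketch, assuming a DataFrame df read from files, showing how the UDF pairs with input_file_name() (the column name source_file is illustrative):

import org.apache.spark.sql.functions.input_file_name

// Decode the URL-encoded path reported by input_file_name() back to a plain path.
val withPaths = df.withColumn(
  "source_file",
  IndexUtils.decodeInputFileName(input_file_name()))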

View file

@@ -0,0 +1,179 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping
import org.apache.spark.sql.{Column, DataFrame, SaveMode}
import org.apache.spark.sql.functions.{input_file_name, min, spark_partition_id}
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.dataskipping.sketch.Sketch
import com.microsoft.hyperspace.index.dataskipping.util.{DataFrameUtils, ExpressionUtils}
import com.microsoft.hyperspace.util.HyperspaceConf
/**
* DataSkippingIndex is an index that can accelerate queries by filtering out
* files in relations using sketches.
*
* @param sketches List of sketches for this index
* @param properties Properties for this index; see [[Index.properties]] for details.
*/
case class DataSkippingIndex(
sketches: Seq[Sketch],
override val properties: Map[String, String] = Map.empty)
extends Index {
assert(sketches.nonEmpty, "At least one sketch is required.")
override def kind: String = DataSkippingIndex.kind
override def kindAbbr: String = DataSkippingIndex.kindAbbr
override def indexedColumns: Seq[String] = sketches.flatMap(_.indexedColumns).distinct
override def referencedColumns: Seq[String] = sketches.flatMap(_.referencedColumns).distinct
override def withNewProperties(newProperties: Map[String, String]): DataSkippingIndex = {
copy(properties = newProperties)
}
override def statistics(extended: Boolean = false): Map[String, String] = {
Map("sketches" -> sketches.mkString(", "))
}
override def canHandleDeletedFiles: Boolean = true
override def write(ctx: IndexerContext, indexData: DataFrame): Unit = {
writeImpl(ctx, indexData, SaveMode.Overwrite)
}
override def optimize(ctx: IndexerContext, indexDataFilesToOptimize: Seq[FileInfo]): Unit = {
val indexData = ctx.spark.read.parquet(indexDataFilesToOptimize.map(_.name): _*)
writeImpl(ctx, indexData, SaveMode.Overwrite)
}
override def refreshIncremental(
ctx: IndexerContext,
appendedSourceData: Option[DataFrame],
deletedSourceDataFiles: Seq[FileInfo],
indexContent: Content): (Index, Index.UpdateMode) = {
if (appendedSourceData.nonEmpty) {
writeImpl(ctx, index(ctx, appendedSourceData.get), SaveMode.Overwrite)
}
if (deletedSourceDataFiles.nonEmpty) {
val spark = ctx.spark
import spark.implicits._
val deletedFileIds = deletedSourceDataFiles.map(_.id).toDF(IndexConstants.DATA_FILE_NAME_ID)
val updatedIndexData = spark.read
.parquet(indexContent.files.map(_.toString): _*)
.join(deletedFileIds, Seq(IndexConstants.DATA_FILE_NAME_ID), "left_anti")
val writeMode = if (appendedSourceData.nonEmpty) {
SaveMode.Append
} else {
SaveMode.Overwrite
}
writeImpl(ctx, updatedIndexData, writeMode)
}
val updateMode = if (deletedSourceDataFiles.isEmpty) {
Index.UpdateMode.Merge
} else {
Index.UpdateMode.Overwrite
}
(this, updateMode)
}
override def refreshFull(
ctx: IndexerContext,
sourceData: DataFrame): (DataSkippingIndex, DataFrame) = {
val updatedIndex = copy(sketches = ExpressionUtils.resolve(ctx.spark, sketches, sourceData))
(updatedIndex, updatedIndex.index(ctx, sourceData))
}
override def equals(that: Any): Boolean =
that match {
case DataSkippingIndex(thatSketches, _) => sketches.toSet == thatSketches.toSet
case _ => false
}
override def hashCode: Int = sketches.map(_.hashCode).sum
/**
* Creates index data for the given source data.
*/
def index(ctx: IndexerContext, sourceData: DataFrame): DataFrame = {
val fileNameCol = "input_file_name"
val indexDataWithFileName = sourceData
.groupBy(input_file_name().as(fileNameCol))
.agg(aggregateFunctions.head, aggregateFunctions.tail: _*)
// Construct a dataframe to convert file names to file ids.
val spark = ctx.spark
val relation = RelationUtils.getRelation(spark, sourceData.queryExecution.optimizedPlan)
import spark.implicits._
val fileIdDf = ctx.fileIdTracker
.getIdToFileMapping(relation.pathNormalizer)
.toDF(IndexConstants.DATA_FILE_NAME_ID, fileNameCol)
indexDataWithFileName
.join(
fileIdDf.hint("broadcast"),
IndexUtils.decodeInputFileName(indexDataWithFileName(fileNameCol)) ===
fileIdDf(fileNameCol))
.select(
IndexConstants.DATA_FILE_NAME_ID,
indexDataWithFileName.columns.filterNot(_ == fileNameCol).map(c => s"`$c`"): _*)
}
private def writeImpl(ctx: IndexerContext, indexData: DataFrame, writeMode: SaveMode): Unit = {
indexData.cache()
indexData.count() // force cache
val indexDataSize = DataFrameUtils.getSizeInBytes(indexData)
val targetIndexDataFileSize = HyperspaceConf.DataSkipping.targetIndexDataFileSize(ctx.spark)
val numFiles = indexDataSize / targetIndexDataFileSize
if (!numFiles.isValidInt) {
throw HyperspaceException(
"Could not create index data files due to too many files: " +
s"indexDataSize=$indexDataSize, targetIndexDataFileSize=$targetIndexDataFileSize")
}
val repartitionedIndexData = indexData.repartition(math.max(1, numFiles.toInt))
repartitionedIndexData.write.mode(writeMode).parquet(ctx.indexDataPath.toString)
indexData.unpersist()
}
/**
* Returns a normalized column name valid for a Parquet format.
*/
private def getNormalizeColumnName(name: String): String = {
name.replaceAll("[ ,;{}()\n\t=]", "_")
}
@transient
private lazy val aggregateFunctions = sketches.flatMap { s =>
val aggrs = s.aggregateFunctions
assert(aggrs.nonEmpty)
aggrs.zipWithIndex.map {
case (aggr, idx) =>
new Column(aggr).as(getNormalizeColumnName(s"${s}_$idx"))
}
}
}
object DataSkippingIndex {
// $COVERAGE-OFF$ https://github.com/scoverage/scalac-scoverage-plugin/issues/125
final val kind = "DataSkippingIndex"
final val kindAbbr = "DS"
// $COVERAGE-ON$
}
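To make the file-sizing logic in writeImpl concrete, a worked example with hypothetical numbers (not taken from the commit):

// Suppose the cached index data is estimated at 1 GiB and the target is the 256 MiB default.
val indexDataSize = 1024L * 1024 * 1024          // from DataFrameUtils.getSizeInBytes
val targetIndexDataFileSize = 256L * 1024 * 1024 // IndexConstants default
val numFiles = indexDataSize / targetIndexDataFileSize // = 4
// The index data is then repartitioned into math.max(1, numFiles.toInt) = 4 Parquet files.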

View file

@@ -0,0 +1,76 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping
import scala.collection.mutable
import org.apache.spark.sql.DataFrame
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.{IndexConfigTrait, IndexerContext}
import com.microsoft.hyperspace.index.dataskipping.sketch.Sketch
import com.microsoft.hyperspace.index.dataskipping.util.ExpressionUtils
/**
* DataSkippingIndexConfig is used to create a [[DataSkippingIndex]] via
* [[Hyperspace.createIndex]].
*
* A sketch is a set of values computed per source data file that contains
* aggregated information about the columns to be indexed. For example, a
* MinMax sketch on a column X stores two values per file: min(X) and max(X).
*
* @param indexName Name of the index to create
* @param firstSketch Sketch to be used for the index
* @param moreSketches Additional sketches, if more than one is needed
*/
case class DataSkippingIndexConfig(
override val indexName: String,
firstSketch: Sketch,
moreSketches: Sketch*)
extends IndexConfigTrait {
checkDuplicateSketches(sketches)
/**
* Returns all sketches this config has.
*/
def sketches: Seq[Sketch] = firstSketch +: moreSketches
/**
* Returns the columns that the sketches reference.
*/
override def referencedColumns: Seq[String] = sketches.flatMap(_.referencedColumns)
override def createIndex(
ctx: IndexerContext,
sourceData: DataFrame,
properties: Map[String, String]): (DataSkippingIndex, DataFrame) = {
val resolvedSketches = ExpressionUtils.resolve(ctx.spark, sketches, sourceData)
checkDuplicateSketches(resolvedSketches)
val index = DataSkippingIndex(resolvedSketches, properties)
(index, index.index(ctx, sourceData))
}
private def checkDuplicateSketches(sketches: Seq[Sketch]): Unit = {
val uniqueSketches = sketches.toSet
if (uniqueSketches.size != sketches.size) {
val duplicateSketches = uniqueSketches.filter(s => sketches.count(_ == s) > 1)
throw HyperspaceException(
"Some sketches are specified multiple times: " +
s"${duplicateSketches.mkString(", ")}")
}
}
}
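A minimal end-to-end usage sketch, assuming an active SparkSession named spark and Parquet source data at an illustrative path:

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndexConfig
import com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch

val hs = new Hyperspace(spark)
val df = spark.read.parquet("/path/to/source/data") // illustrative path
// Create a data skipping index backed by a MinMax sketch on column A.
hs.createIndex(df, DataSkippingIndexConfig("myIndex", MinMaxSketch("A")))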

View file

@@ -0,0 +1,43 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping.sketch
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.{Max, Min}
import org.apache.spark.sql.types.DataType
/**
* Sketch based on minimum and maximum values for a given expression.
*
* @param expr Expression from which min/max values are calculated
* @param dataType Optional expected data type of the expression. If not
* specified, it is deduced automatically. If the actual data type of the
* expression differs from the specified one, an error is thrown. It is
* recommended to leave this parameter as None.
*/
case class MinMaxSketch(override val expr: String, override val dataType: Option[DataType] = None)
extends SingleExprSketch[MinMaxSketch](expr, dataType) {
override def name: String = "MinMax"
override def withNewExpression(newExpr: (String, Option[DataType])): MinMaxSketch = {
copy(expr = newExpr._1, dataType = newExpr._2)
}
override def aggregateFunctions: Seq[Expression] =
Min(parsedExpr).toAggregateExpression() :: Max(parsedExpr).toAggregateExpression() :: Nil
}
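As a small illustration of what this sketch computes (mirroring MinMaxSketchTest later in this commit), its two aggregate functions evaluate to the minimum and maximum of the expression over a DataFrame:

import org.apache.spark.sql.Column
import spark.implicits._ // an active SparkSession named spark is assumed

val sketch = MinMaxSketch("A")
val aggrs = sketch.aggregateFunctions.map(new Column(_))
val data = Seq(1, -1, 10, 2, 4).toDF("A")
data.select(aggrs: _*).show() // single row: min = -1, max = 10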

View file

@@ -0,0 +1,85 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping.sketch
import scala.reflect.ClassTag
import com.fasterxml.jackson.annotation.JsonProperty
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.DataType
/**
* Base class for sketches which are based on and can be used for a single
* expression.
*
* @param expr Expression this sketch is based on
*/
abstract class SingleExprSketch[T <: SingleExprSketch[T]](
val expr: String,
val dataType: Option[DataType])(implicit tag: ClassTag[T])
extends Sketch {
/**
* Returns the name of this sketch.
*/
def name: String
/**
* Parsed, unresolved expression of the expression string.
*
* Use this to build aggregate functions.
*/
@transient
protected final lazy val parsedExpr: Expression = {
SparkSession.getActiveSession.get.sessionState.sqlParser.parseExpression(expr)
}
/**
* Returns a copy of the sketch with an updated expression.
*/
def withNewExpression(newExpr: (String, Option[DataType])): T
final override def withNewExpressions(newExprs: Seq[(String, Option[DataType])]): T = {
assert(newExprs.length == 1)
withNewExpression(newExprs.head)
}
final override def expressions: Seq[(String, Option[DataType])] = (expr, dataType) :: Nil
final override def indexedColumns: Seq[String] =
parsedExpr.collect {
case attr: UnresolvedAttribute => attr.name
}
final override def referencedColumns: Seq[String] = indexedColumns
override def toString: String = s"$name($expr)"
final override def equals(that: Any): Boolean =
that match {
case other: T => expr == other.expr && dataType == other.dataType
case _ => false
}
final override def hashCode: Int = (tag.toString(), expr).hashCode
// Workaround for issue https://github.com/FasterXML/jackson-module-scala/issues/218
@JsonProperty("expr") private def _expr: String = expr
@JsonProperty("dataType") private def _dataType: Option[DataType] = dataType
}

View file

@@ -0,0 +1,78 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping.sketch
import com.fasterxml.jackson.annotation.JsonTypeInfo
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.DataType
/**
* Represents a sketch specification for data skipping indexes.
*
* Sketch implementations should support serialization with Jackson.
* Normally this should be done automatically without any special handling.
* When serialized as JSON objects, a special field called "type" is used
* to store the class name of the implementation. Therefore, the implementation
* must not have a field named "type".
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "type")
trait Sketch {
/**
* Returns the expressions this sketch is based on.
*/
def expressions: Seq[(String, Option[DataType])]
/**
* Returns the column names to which this sketch can be applied.
*/
def indexedColumns: Seq[String]
/**
* Returns column names that this sketch references.
*/
def referencedColumns: Seq[String]
/**
* Returns a copy of this sketch with updated expressions.
*/
def withNewExpressions(newExpressions: Seq[(String, Option[DataType])]): Sketch
/**
* Returns aggregate functions that can be used to compute the actual sketch
* values from source data.
*/
def aggregateFunctions: Seq[Expression]
/**
* Returns a human-readable string describing this sketch.
*/
def toString: String
/**
* Returns true if and only if this sketch equals that.
*
* Sketches should be considered equal when their types and expressions are
* equal.
*/
def equals(that: Any): Boolean
/**
* Returns the hash code for this sketch.
*/
def hashCode: Int
}
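For illustration only, a hypothetical sketch that keeps just the minimum value could follow the same pattern as MinMaxSketch by extending SingleExprSketch (MinSketch is not part of this commit):

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.Min
import org.apache.spark.sql.types.DataType

case class MinSketch(override val expr: String, override val dataType: Option[DataType] = None)
  extends SingleExprSketch[MinSketch](expr, dataType) {
  override def name: String = "Min"
  override def withNewExpression(newExpr: (String, Option[DataType])): MinSketch =
    copy(expr = newExpr._1, dataType = newExpr._2)
  // One aggregate value (the minimum) is stored per source data file.
  override def aggregateFunctions: Seq[Expression] =
    Min(parsedExpr).toAggregateExpression() :: Nil
}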

View file

@@ -0,0 +1,45 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping.util
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
object DataFrameUtils {
/**
* Returns an estimated size of a cached dataframe in bytes.
*
* The dataframe should have been fully cached to get an accurate result.
*
* This method relies on the RDD of the dataframe. DataFrame.rdd is a lazy
* val and doesn't change once set, but `cache()` creates new RDDs when a
* dataframe is unpersisted and then cached again. Therefore, you shouldn't
* call this function with a dataframe that has been unpersisted and cached
* again.
*/
def getSizeInBytes(df: DataFrame): Long = {
def dependencies(rdd: RDD[_]): Seq[Int] = {
rdd.id +: rdd.dependencies.flatMap(d => dependencies(d.rdd))
}
val deps = dependencies(df.rdd).toSet
df.rdd.context.getRDDStorageInfo
.filter(info => deps.contains(info.id))
.map(info => info.memSize + info.diskSize)
.sum
}
}
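A brief usage sketch that follows the caching contract described above (df is any DataFrame):

df.cache()
df.count() // force the cache to be fully materialized
val estimatedSizeInBytes = DataFrameUtils.getSizeInBytes(df)
df.unpersist()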

View file

@@ -0,0 +1,111 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping.util
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project, Window}
import org.apache.spark.sql.types.DataType
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.IndexUtils
import com.microsoft.hyperspace.index.dataskipping.sketch.Sketch
object ExpressionUtils {
/**
* Returns copies of the given sketches with the indexed columns replaced by
* resolved column names and data types.
*/
def resolve(spark: SparkSession, sketches: Seq[Sketch], sourceData: DataFrame): Seq[Sketch] = {
sketches.map { s =>
val dataTypes = checkExprs(s.expressions, sourceData)
val oldColumns = s.referencedColumns
val newColumns = IndexUtils.resolveColumns(sourceData, oldColumns).map(_.name)
val columnMapping = oldColumns.zip(newColumns).toMap
val newExprs = s.expressions.map {
case (expr, _) =>
spark.sessionState.sqlParser
.parseExpression(expr)
.transformUp {
case attr: UnresolvedAttribute => QuotedAttribute(columnMapping(attr.name))
}
.sql
}
s.withNewExpressions(newExprs.zip(dataTypes.map(Some(_))))
}
}
private def checkExprs(
exprWithExpectedDataTypes: Seq[(String, Option[DataType])],
sourceData: DataFrame): Seq[DataType] = {
val (exprs, expectedDataTypes) =
(exprWithExpectedDataTypes.map(_._1), exprWithExpectedDataTypes.map(_._2))
def throwNotSupportedIf(cond: Boolean, msg: => String) = {
if (cond) {
throw HyperspaceException(s"DataSkippingIndex does not support indexing $msg")
}
}
val plan = sourceData.selectExpr(exprs: _*).queryExecution.analyzed
throwNotSupportedIf(
plan.isInstanceOf[Aggregate],
"aggregate functions: " + exprs.mkString(", "))
throwNotSupportedIf(
plan.find(_.isInstanceOf[Window]).nonEmpty,
"window functions: " + exprs.mkString(", "))
val analyzedExprs = plan.asInstanceOf[Project].projectList
exprWithExpectedDataTypes.zip(analyzedExprs).map {
case ((expr, expectedDataType), analyzedExpr) =>
val e = analyzedExpr match {
case Alias(child, _) => child
case e => e
}
throwNotSupportedIf(!e.deterministic, s"an expression which is non-deterministic: $expr")
throwNotSupportedIf(e.foldable, s"an expression which is evaluated to a constant: $expr")
throwNotSupportedIf(
e.find(_.isInstanceOf[SubqueryExpression]).nonEmpty,
s"an expression which has a subquery: $expr")
throwNotSupportedIf(
e.find(_.isInstanceOf[AttributeReference]).isEmpty,
s"an expression which does not reference source columns: $expr")
if (expectedDataType.nonEmpty && expectedDataType.get != analyzedExpr.dataType) {
throw HyperspaceException(
"Specified and analyzed data types differ: " +
s"expr=$expr, specified=${expectedDataType.get}, analyzed=${analyzedExpr.dataType}")
}
analyzedExpr.dataType
}
}
/**
* Used to work around the issue where UnresolvedAttribute.sql() doesn't work as expected.
*/
private case class QuotedAttribute(name: String) extends LeafExpression {
override def sql: String = name
// $COVERAGE-OFF$ code never used
override def nullable: Boolean = throw new NotImplementedError
override def eval(input: InternalRow): Any = throw new NotImplementedError
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
throw new NotImplementedError
override def dataType: DataType = throw new NotImplementedError
// $COVERAGE-ON$
}
}
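To illustrate the effect of resolution (mirroring DataSkippingIndexConfigTest later in this commit): given source data with a column Foo of type long, a sketch declared on "foO" is rewritten with the resolved column name and data type:

import org.apache.spark.sql.types.LongType
import com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch

val sourceData = spark.range(10).toDF("Foo") // an active SparkSession named spark is assumed
val resolved = ExpressionUtils.resolve(spark, Seq(MinMaxSketch("foO")), sourceData)
// resolved == Seq(MinMaxSketch("Foo", Some(LongType)))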

View file

@@ -18,6 +18,7 @@ package com.microsoft.hyperspace.util
import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.IndexConstants
/**
@@ -105,6 +106,32 @@ object HyperspaceConf {
.toBoolean
}
object DataSkipping {
def targetIndexDataFileSize(spark: SparkSession): Long = {
// TODO: Consider using a systematic way to validate the config value
// like Spark's ConfigBuilder
val value = spark.conf
.get(
IndexConstants.DATASKIPPING_TARGET_INDEX_DATA_FILE_SIZE,
IndexConstants.DATASKIPPING_TARGET_INDEX_DATA_FILE_SIZE_DEFAULT)
val longValue =
try {
value.toLong
} catch {
case e: NumberFormatException =>
throw HyperspaceException(
s"${IndexConstants.DATASKIPPING_TARGET_INDEX_DATA_FILE_SIZE} " +
s"should be long, but was $value")
}
if (longValue <= 0) {
throw HyperspaceException(
s"${IndexConstants.DATASKIPPING_TARGET_INDEX_DATA_FILE_SIZE} " +
s"should be a positive number.")
}
longValue
}
}
/**
* Returns the config value whose key matches the first key given multiple keys. If no keys are
* matched, the given default value is returned.

View file

@@ -41,6 +41,8 @@ object JsonUtils {
new SimpleModule()
.addSerializer(classOf[StructType], new StructTypeSerializer)
.addDeserializer(classOf[StructType], new StructTypeDeserializer)
.addSerializer(classOf[DataType], new DataTypeSerializer)
.addDeserializer(classOf[DataType], new DataTypeDeserializer)
}
def toJson[T: Manifest](obj: T): String = {
@@ -69,4 +71,19 @@ object JsonUtils {
DataType.fromJson(p.readValueAsTree().toString).asInstanceOf[StructType]
}
}
private class DataTypeSerializer extends StdSerializer[DataType](classOf[DataType]) {
override def serialize(
value: DataType,
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
gen.writeRawValue(value.json)
}
}
private class DataTypeDeserializer extends StdDeserializer[DataType](classOf[DataType]) {
override def deserialize(p: JsonParser, ctx: DeserializationContext): DataType = {
DataType.fromJson(p.readValueAsTree().toString)
}
}
}

View file

@@ -29,6 +29,8 @@ import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
import com.microsoft.hyperspace.index.IndexLogEntryTags._
import com.microsoft.hyperspace.index.covering.JoinIndexRule
import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndexConfig
import com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, CandidateIndexCollector}
import com.microsoft.hyperspace.util.PathUtils
@@ -1003,6 +1005,22 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
assert(hyperspace.indexes.filter("name == 'myind' and state == 'DELETED'").count() === 1)
}
test("FilterIndexRule ignores unsupported indexes.") {
val df = spark.read.parquet(nonPartitionedDataPath)
hyperspace.createIndex(df, IndexConfig("myind1", Seq("c1"), Seq("c4")))
hyperspace.createIndex(df, DataSkippingIndexConfig("myind2", MinMaxSketch("c1")))
def query(): DataFrame = df.filter("c1 == '2019-10-03'").select("c4")
verifyIndexUsage(query, getIndexFilesPath("myind1"))
}
test("JoinIndexRule ignores unsupported indexes.") {
val df = spark.read.parquet(nonPartitionedDataPath)
hyperspace.createIndex(df, IndexConfig("myind1", Seq("c1"), Seq("c4")))
hyperspace.createIndex(df, DataSkippingIndexConfig("myind2", MinMaxSketch("c1")))
def query(): DataFrame = df.as("x").join(df.as("y"), "c1").select("x.c4")
verifyIndexUsage(query, getIndexFilesPath("myind1") ++ getIndexFilesPath("myind1"))
}
/**
* Verify that the query plan has the expected rootPaths.
*

View file

@@ -0,0 +1,105 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.{input_file_name, max, min}
import org.apache.spark.sql.types.{LongType, StringType}
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.IndexConstants
import com.microsoft.hyperspace.index.dataskipping.sketch._
class DataSkippingIndexConfigTest extends DataSkippingSuite {
test("indexName returns the index name.") {
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"))
assert(indexConfig.indexName === "myIndex")
}
test("sketches returns a single sketch.") {
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"))
assert(indexConfig.sketches === Seq(MinMaxSketch("A")))
}
test("sketches returns multiple sketches.") {
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"), MinMaxSketch("B"))
assert(indexConfig.sketches === Seq(MinMaxSketch("A"), MinMaxSketch("B")))
}
test("Duplicate sketches are not allowed.") {
val exception = intercept[HyperspaceException] {
DataSkippingIndexConfig("myIndex", MinMaxSketch("A"), MinMaxSketch("B"), MinMaxSketch("A"))
}
assert(exception.getMessage.contains("Some sketches are specified multiple times: MinMax(A)"))
}
test("Duplicate sketches are not allowed after the column resolution.") {
val sourceData = createSourceData(spark.range(10).toDF("A"))
val exception = intercept[HyperspaceException] {
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"), MinMaxSketch("a"))
indexConfig.createIndex(ctx, sourceData, Map())
}
assert(exception.getMessage.contains("Some sketches are specified multiple times: MinMax(A)"))
}
test("referencedColumns returns referenced columns of sketches.") {
val indexConfig = DataSkippingIndexConfig("MyIndex", MinMaxSketch("A"), MinMaxSketch("B"))
assert(indexConfig.referencedColumns === Seq("A", "B"))
}
test("createIndex works correctly with a MinMaxSketch.") {
val sourceData = createSourceData(spark.range(100).toDF("A"))
val indexConfig = DataSkippingIndexConfig("MyIndex", MinMaxSketch("A"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
assert(index.sketches === Seq(MinMaxSketch("A", Some(LongType))))
val expectedSketchValues = sourceData
.groupBy(input_file_name().as(fileNameCol))
.agg(min("A"), max("A"))
checkAnswer(indexData, withFileId(expectedSketchValues))
assert(
indexData.columns === Seq(IndexConstants.DATA_FILE_NAME_ID, "MinMax_A__0", "MinMax_A__1"))
}
test("createIndex works correctly with file paths with special characters.") {
assume(!Path.WINDOWS)
val sourceData = createSourceData(spark.range(100).toDF("A"), "table ,.;'`~!@#$%^&()_+|\"<>")
val indexConfig = DataSkippingIndexConfig("MyIndex", MinMaxSketch("A"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
val expectedSketchValues = sourceData
.groupBy(input_file_name().as(fileNameCol))
.agg(min("A"), max("A"))
checkAnswer(indexData, withFileId(expectedSketchValues))
}
test("createIndex resolves column names and data types.") {
val sourceData = createSourceData(spark.range(10).toDF("Foo"))
val indexConfig = DataSkippingIndexConfig("MyIndex", MinMaxSketch("foO"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
assert(index.sketches === Seq(MinMaxSketch("Foo", Some(LongType))))
}
test("createIndex throws an error if the data type is wrong.") {
val sourceData = createSourceData(spark.range(10).toDF("Foo"))
val indexConfig = DataSkippingIndexConfig("MyIndex", MinMaxSketch("foO", Some(StringType)))
val ex = intercept[HyperspaceException] {
indexConfig.createIndex(ctx, sourceData, Map())
}
assert(
ex.getMessage.contains("Specified and analyzed data types differ: " +
"expr=foO, specified=StringType, analyzed=LongType"))
}
}

View file

@@ -0,0 +1,91 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch
class DataSkippingIndexIntegrationTest extends DataSkippingSuite {
import spark.implicits._
override val numParallelism: Int = 10
test("Non-deterministic expression is blocked.") {
val df = createSourceData(spark.range(100).toDF("A"))
val ex = intercept[HyperspaceException](
hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A + rand()"))))
assert(
ex.msg.contains("DataSkippingIndex does not support indexing an expression " +
"which is non-deterministic: A + rand()"))
}
test("Subquery expression is blocked.") {
withTable("T") {
spark.range(100).toDF("B").write.saveAsTable("T")
val df = createSourceData(spark.range(100).toDF("A"))
val ex = intercept[HyperspaceException](
hs.createIndex(
df,
DataSkippingIndexConfig("myind", MinMaxSketch("A + (select max(B) from T)"))))
assert(
ex.msg.contains("DataSkippingIndex does not support indexing an expression " +
"which has a subquery: A + (select max(B) from T)"))
}
}
test("Foldable expression is blocked.") {
val df = createSourceData(spark.range(100).toDF("A"))
val ex = intercept[HyperspaceException](
hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("1+1"))))
assert(
ex.msg.contains("DataSkippingIndex does not support indexing an expression " +
"which is evaluated to a constant: 1+1"))
}
test("Aggregate function is blocked.") {
val df = createSourceData(spark.range(100).toDF("A"))
val ex = intercept[HyperspaceException](
hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("min(A)"))))
assert(
ex.msg.contains("DataSkippingIndex does not support indexing aggregate functions: " +
"min(A)"))
}
test("Window function is blocked.") {
val df = createSourceData(spark.range(100).toDF("A"))
val ex = intercept[HyperspaceException](
hs.createIndex(
df,
DataSkippingIndexConfig(
"myind",
MinMaxSketch("min(a) over (rows between 1 preceding and 1 following)"))))
assert(
ex.msg.contains("DataSkippingIndex does not support indexing window functions: " +
"min(a) over (rows between 1 preceding and 1 following)"))
}
test("Expression not referencing the source column is blocked.") {
val df = createSourceData(spark.range(100).toDF("A"))
val f = spark.udf.register("myfunc", () => 1)
val ex = intercept[HyperspaceException](
hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("myfunc()"))))
assert(
ex.msg.contains(
"DataSkippingIndex does not support indexing an expression which does not " +
"reference source columns: myfunc()"))
}
}

View file

@@ -0,0 +1,285 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{input_file_name, max, min}
import org.apache.spark.sql.types.IntegerType
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.{Content, FileInfo, Index, IndexConstants}
import com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch
import com.microsoft.hyperspace.util.JsonUtils
class DataSkippingIndexTest extends DataSkippingSuite {
override val numParallelism: Int = 3
test("""kind returns "DataSkippingIndex".""") {
val index = DataSkippingIndex(Seq(MinMaxSketch("A")))
assert(index.kind === "DataSkippingIndex")
}
test("""kindAbbr returns "DS".""") {
val index = DataSkippingIndex(Seq(MinMaxSketch("A")))
assert(index.kindAbbr === "DS")
}
test("indexedColumns returns indexed columns of sketches.") {
val index = DataSkippingIndex(Seq(MinMaxSketch("A"), MinMaxSketch("B")))
assert(index.indexedColumns === Seq("A", "B"))
}
test("referencedColumns returns indexed columns of sketches.") {
val index = DataSkippingIndex(Seq(MinMaxSketch("A"), MinMaxSketch("B")))
assert(index.referencedColumns === Seq("A", "B"))
}
test(
"withNewProperties returns a new index which copies the original index except the " +
"properties.") {
val index = DataSkippingIndex(Seq(MinMaxSketch("A")))
val newIndex = index.withNewProperties(Map("foo" -> "bar"))
assert(newIndex.properties === Map("foo" -> "bar"))
assert(newIndex.sketches === index.sketches)
}
test("statistics returns a string-formatted list of sketches.") {
val index = DataSkippingIndex(Seq(MinMaxSketch("A"), MinMaxSketch("B")))
assert(index.statistics() === Map("sketches" -> "MinMax(A), MinMax(B)"))
}
test("canHandleDeletedFiles returns true.") {
val index = DataSkippingIndex(Seq(MinMaxSketch("A")))
assert(index.canHandleDeletedFiles === true)
}
test("write writes the index data in a Parquet format.") {
val sourceData = createSourceData(spark.range(100).toDF("A"))
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
index.write(ctx, indexData)
val writtenIndexData = spark.read.parquet(indexDataPath.toString)
checkAnswer(writtenIndexData, indexData)
}
test("optimize reduces the number of index data files.") {
val targetIndexDataFileSize = 100000
val expectedNumIndexDataFiles = 1
withSQLConf(
IndexConstants.DATASKIPPING_TARGET_INDEX_DATA_FILE_SIZE ->
targetIndexDataFileSize.toString) {
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"))
val sourceData = createSourceData(spark.range(100).toDF("A"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
index.write(ctx, indexData)
// Create more index data files by refreshing the index incrementally.
val iterations = 5
val indexDataPaths = (1 until iterations).map { i =>
val appendedSourceData = createSourceData(
spark.range(i * 100, (i + 1) * 100).toDF("A"),
saveMode = SaveMode.Append,
appendedDataOnly = true)
val newIndexDataPath = new Path(inTempDir(s"Index$i"))
indexDataPathVar = newIndexDataPath
index.refreshIncremental(ctx, Some(appendedSourceData), Nil, emptyContent)
newIndexDataPath
}
// During refresh, index data files are put in different paths.
val indexDataFiles = listFiles(indexDataPaths: _*)
indexDataPathVar = new Path(inTempDir(s"Index$iterations"))
index.optimize(ctx, indexDataFiles.map(f => FileInfo(f, fileIdTracker.addFile(f), true)))
val optimizedIndexDataFiles = listFiles(indexDataPathVar).filter(isParquet)
assert(optimizedIndexDataFiles.length === expectedNumIndexDataFiles)
}
}
test("write throws an exception if target index data file size is too small.") {
withSQLConf(IndexConstants.DATASKIPPING_TARGET_INDEX_DATA_FILE_SIZE -> "1") {
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"))
val sourceData = createSourceData(spark.range(100).toDF("A"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
val mockIndexData = RDDTestUtils.getMockDataFrameWithFakeSize(spark, 4000000000L)
val ex = intercept[HyperspaceException](index.write(ctx, mockIndexData))
assert(
ex.getMessage.contains("Could not create index data files due to too many files: " +
"indexDataSize=4000000000, targetIndexDataFileSize=1"))
}
}
test("refreshIncremental works correctly for appended data.") {
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"))
val sourceData = createSourceData(spark.range(100).toDF("A"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
index.write(ctx, indexData)
val appendedSourceData = createSourceData(
spark.range(100, 200).toDF("A"),
saveMode = SaveMode.Append,
appendedDataOnly = true)
val indexDataPath2 = new Path(inTempDir("Index2"))
indexDataPathVar = indexDataPath2
val (newIndex, updateMode) =
index.refreshIncremental(ctx, Some(appendedSourceData), Nil, emptyContent)
assert(newIndex === index)
assert(updateMode === Index.UpdateMode.Merge)
val updatedIndexData = spark.read.parquet(indexDataPath.toString, indexDataPath2.toString)
val expectedSketchValues = sourceData
.union(appendedSourceData)
.groupBy(input_file_name().as(fileNameCol))
.agg(min("A"), max("A"))
checkAnswer(updatedIndexData, withFileId(expectedSketchValues))
}
test("refreshIncremental works correctly for deleted data.") {
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"))
val sourceData = createSourceData(spark.range(100).toDF("A"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
index.write(ctx, indexData)
val deletedFile = listFiles(dataPath()).filter(isParquet).head
deleteFile(deletedFile.getPath)
val indexDataPath2 = new Path(inTempDir("Index2"))
indexDataPathVar = indexDataPath2
val (newIndex, updateMode) =
index.refreshIncremental(
ctx,
None,
Seq(FileInfo(deletedFile, fileIdTracker.addFile(deletedFile), true)),
Content.fromDirectory(indexDataPath, fileIdTracker, new Configuration))
assert(newIndex === index)
assert(updateMode === Index.UpdateMode.Overwrite)
val updatedIndexData = spark.read.parquet(indexDataPath2.toString)
val expectedSketchValues = spark.read
.parquet(dataPath().toString)
.union(spark.read.parquet(dataPath().toString))
.groupBy(input_file_name().as(fileNameCol))
.agg(min("A"), max("A"))
checkAnswer(updatedIndexData, withFileId(expectedSketchValues))
}
test("refreshIncremental works correctly for appended and deleted data.") {
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"))
val sourceData = createSourceData(spark.range(100).toDF("A"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
index.write(ctx, indexData)
val deletedFile = listFiles(dataPath()).filter(isParquet).head
deleteFile(deletedFile.getPath)
val appendedSourceData = createSourceData(
spark.range(100, 200).toDF("A"),
saveMode = SaveMode.Append,
appendedDataOnly = true)
val indexDataPath2 = new Path(inTempDir("Index2"))
indexDataPathVar = indexDataPath2
val (newIndex, updateMode) =
index.refreshIncremental(
ctx,
Some(appendedSourceData),
Seq(FileInfo(deletedFile, fileIdTracker.addFile(deletedFile), true)),
Content.fromDirectory(indexDataPath, fileIdTracker, new Configuration))
assert(newIndex === index)
assert(updateMode === Index.UpdateMode.Overwrite)
val updatedIndexData = spark.read.parquet(indexDataPath2.toString)
val expectedSketchValues = spark.read
.parquet(dataPath().toString)
.union(spark.read.parquet(dataPath().toString))
.groupBy(input_file_name().as(fileNameCol))
.agg(min("A"), max("A"))
checkAnswer(updatedIndexData, withFileId(expectedSketchValues))
}
test("refreshFull works correctly for fully overwritten data.") {
val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A"))
val sourceData = createSourceData(spark.range(100).toDF("A"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
index.write(ctx, indexData)
val newSourceData = createSourceData(spark.range(200).toDF("A"))
val (newIndex, newIndexData) = index.refreshFull(ctx, newSourceData)
assert(newIndex === index)
val expectedSketchValues = newSourceData
.groupBy(input_file_name().as(fileNameCol))
.agg(min("A"), max("A"))
checkAnswer(newIndexData, withFileId(expectedSketchValues))
}
test("At least one sketch must be specified.") {
val ex = intercept[AssertionError](DataSkippingIndex(Nil))
assert(ex.getMessage().contains("At least one sketch is required"))
}
test("Indexes are equal if they have the same sketches and data types.") {
val ds1 = DataSkippingIndex(Seq(MinMaxSketch("A"), MinMaxSketch("B")))
val ds2 = DataSkippingIndex(Seq(MinMaxSketch("B"), MinMaxSketch("A")))
assert(ds1 === ds2)
assert(ds1.hashCode === ds2.hashCode)
}
test("Indexes are not equal to objects which are not indexes.") {
val ds = DataSkippingIndex(Seq(MinMaxSketch("A")))
assert(ds !== "ds")
}
test("Index can be serialized.") {
val ds = DataSkippingIndex(Seq(MinMaxSketch("A", Some(IntegerType))), Map("a" -> "b"))
val json = JsonUtils.toJson(ds)
assert(
json ===
"""|{
| "type" : "com.microsoft.hyperspace.index.dataskipping.DataSkippingIndex",
| "sketches" : [ {
| "type" : "com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch",
| "expr" : "A",
| "dataType" : "integer"
| } ],
| "properties" : {
| "a" : "b"
| }
|}""".stripMargin)
}
test("Index can be deserialized.") {
val json =
"""|{
| "type" : "com.microsoft.hyperspace.index.dataskipping.DataSkippingIndex",
| "sketches" : [ {
| "type" : "com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch",
| "expr" : "A",
| "dataType" : "integer"
| } ],
| "properties" : {
| "a" : "b"
| }
|}""".stripMargin
val ds = JsonUtils.fromJson[DataSkippingIndex](json)
assert(ds === DataSkippingIndex(Seq(MinMaxSketch("A", Some(IntegerType)))))
assert(ds.properties === Map("a" -> "b"))
}
}

View file

@@ -0,0 +1,114 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping
import scala.collection.AbstractIterator
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RemoteIterator}
import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode, SparkSession}
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.util.FileUtils
trait DataSkippingSuite extends QueryTest with HyperspaceSuite {
import spark.implicits._
val dataPathRoot = new Path(inTempDir("Data"))
val indexDataPath = new Path(inTempDir("Index"))
val fileNameCol = "input_file_name()"
val emptyContent = Content(Directory.createEmptyDirectory(new Path("/")))
val suite = this
val ctx = new IndexerContext {
override def spark: SparkSession = suite.spark
override def fileIdTracker: FileIdTracker = suite.fileIdTracker
override def indexDataPath: Path = suite.indexDataPathVar
}
var indexDataPathVar = indexDataPath
var fileIdTracker: FileIdTracker = _
var hs: Hyperspace = _
before {
indexDataPathVar = indexDataPath
fileIdTracker = new FileIdTracker
hs = new Hyperspace(spark)
}
after {
FileUtils.delete(tempDir)
}
def dataPath(path: String = "T"): Path = new Path(dataPathRoot, path)
def createSourceData(
originalData: DataFrame,
path: String = "T",
saveMode: SaveMode = SaveMode.Overwrite,
appendedDataOnly: Boolean = false): DataFrame = {
val p = dataPath(path)
val oldFiles = listFiles(p).toSet
originalData.write.mode(saveMode).parquet(p.toString)
updateFileIdTracker(p)
if (appendedDataOnly) {
val newFiles = listFiles(p).filterNot(oldFiles.contains)
spark.read.parquet(newFiles.map(_.getPath.toString): _*)
} else {
spark.read.parquet(p.toString)
}
}
def updateFileIdTracker(path: Path): Unit = {
listFiles(path).foreach(f => fileIdTracker.addFile(f))
}
def withFileId(indexData: DataFrame): DataFrame = {
val fileIdDf = fileIdTracker
.getIdToFileMapping(_.replace("file:/", "file:///"))
.toDF(IndexConstants.DATA_FILE_NAME_ID, fileNameCol)
indexData
.join(
fileIdDf,
IndexUtils.decodeInputFileName(indexData(fileNameCol)) === fileIdDf(fileNameCol))
.select(
IndexConstants.DATA_FILE_NAME_ID,
indexData.columns.filterNot(_ == fileNameCol).map(c => s"`$c`"): _*)
}
def listFiles(paths: Path*): Seq[FileStatus] = {
val fs = paths.head.getFileSystem(new Configuration)
paths
.filter(fs.exists)
.flatMap { p =>
val it = fs.listFiles(p, true)
case class IteratorAdapter[T](it: RemoteIterator[T]) extends AbstractIterator[T] {
def hasNext: Boolean = it.hasNext
def next(): T = it.next()
}
IteratorAdapter(it).toSeq
}
.sortBy(_.getPath.toString)
}
def deleteFile(path: Path): Unit = {
val fs = path.getFileSystem(new Configuration)
fs.delete(path, true)
}
def isParquet: FileStatus => Boolean = _.getPath.getName.endsWith(".parquet")
}

View file

@@ -0,0 +1,38 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.RDDInfo
import org.mockito.Mockito.{mock, spy, when}
object RDDTestUtils {
def getMockDataFrameWithFakeSize(spark: SparkSession, size: Long): DataFrame = {
val df = spy(spark.emptyDataFrame)
val rdd = spy(df.rdd)
val mockSparkContext = mock(classOf[SparkContext])
val mockRddStorageInfo = mock(classOf[RDDInfo])
when(df.rdd).thenReturn(rdd)
when(rdd.id).thenReturn(42)
when(rdd.context).thenReturn(mockSparkContext)
when(mockSparkContext.getRDDStorageInfo).thenReturn(Array[RDDInfo](mockRddStorageInfo))
when(mockRddStorageInfo.id).thenReturn(42)
when(mockRddStorageInfo.memSize).thenReturn(size)
df
}
}

View file

@@ -0,0 +1,66 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping.sketch
import org.apache.spark.sql.{Column, QueryTest}
import org.mockito.Mockito.mock
import com.microsoft.hyperspace.index.HyperspaceSuite
class MinMaxSketchTest extends QueryTest with HyperspaceSuite {
import spark.implicits._
test("indexedColumns returns the indexed column.") {
val sketch = MinMaxSketch("A")
assert(sketch.indexedColumns === Seq("A"))
}
test("referencedColumns returns the indexed column.") {
val sketch = MinMaxSketch("A")
assert(sketch.referencedColumns === Seq("A"))
}
test("aggregateFunctions returns min and max aggregation functions.") {
val sketch = MinMaxSketch("A")
val aggrs = sketch.aggregateFunctions.map(new Column(_))
val data = Seq(1, -1, 10, 2, 4).toDF("A")
checkAnswer(data.select(aggrs: _*), Seq((-1, 10)).toDF)
}
test("toString returns a reasonable string.") {
val sketch = MinMaxSketch("A")
assert(sketch.toString === "MinMax(A)")
}
test("Two sketches are equal if their columns are equal.") {
assert(MinMaxSketch("A") === MinMaxSketch("A"))
assert(MinMaxSketch("A") !== MinMaxSketch("a"))
assert(MinMaxSketch("b") !== MinMaxSketch("B"))
assert(MinMaxSketch("B") === MinMaxSketch("B"))
}
test("MinMaxSketch is different from other sketches.") {
val s1 = MinMaxSketch("A")
val s2 = mock(classOf[Sketch])
assert(s1 !== s2)
}
test("hashCode is reasonably implemented.") {
assert(MinMaxSketch("A").hashCode === MinMaxSketch("A").hashCode)
assert(MinMaxSketch("A").hashCode !== MinMaxSketch("a").hashCode)
}
}

View file

@@ -0,0 +1,38 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.dataskipping.util
import org.apache.spark.sql.functions.{count, max, min}
import com.microsoft.hyperspace.index.dataskipping.DataSkippingSuite
class DataFrameUtilsTest extends DataSkippingSuite {
test("getSizeInBytes returns an estimated size of the dataframe in bytes.") {
val df = spark
.range(100000000)
.selectExpr("id as A", "cast(id / 100 as int) as B")
.groupBy("B")
.agg(count("A").as("count"), min("A").as("min"), max("A").as("max"))
df.cache()
df.count() // force cache
val estimate = DataFrameUtils.getSizeInBytes(df)
df.repartition(1).write.parquet(dataPath().toString)
df.unpersist()
val real = listFiles(dataPath()).filter(isParquet).map(_.getLen).sum
assert(real / 2 <= estimate && estimate <= real * 2)
}
}