Deterministic index selection for filter rule with no hybrid scan (#428)
This commit is contained in:
Parent: dafbb13b34
Commit: a373eef213
@@ -466,6 +466,11 @@ case class IndexLogEntry(
     sourceFileInfoSet.foldLeft(0L)(_ + _.size)
   }
 
+  @JsonIgnore
+  lazy val indexFilesSizeInBytes: Long = {
+    content.fileInfos.foldLeft(0L)(_ + _.size)
+  }
+
   def sourceUpdate: Option[Update] = {
     relations.head.data.properties.update
   }
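The new indexFilesSizeInBytes is a lazy val, so the sum is computed once on first access, and @JsonIgnore (Jackson) keeps the derived value out of the serialized index log entry. A minimal sketch of the same aggregation pattern, with a hypothetical FileInfo standing in for Hyperspace's class:

// Hypothetical stand-in for Hyperspace's FileInfo; only the size field matters here.
case class FileInfo(name: String, size: Long)

val fileInfos = Seq(FileInfo("f1.parquet", 10L), FileInfo("f2.parquet", 10L))

// Fold with a Long accumulator (0L) so large file sets cannot overflow an Int.
val totalSizeInBytes: Long = fileInfos.foldLeft(0L)(_ + _.size)
// totalSizeInBytes == 20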
@@ -54,7 +54,11 @@ object FilterIndexRanker {
     } else {
       // TODO: Add ranking algorithm to sort candidates.
       // See https://github.com/microsoft/hyperspace/issues/52
-      Some(candidates.head)
+
+      // Pick the index with the smallest size. If indexes with the same size are
+      // found, pick the one with the lexicographically smaller name. This is
+      // required for deterministic selection of indexes.
+      Some(candidates.minBy(index => (index.indexFilesSizeInBytes, index.name)))
     }
   }
 }
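minBy on a tuple key relies on Scala's implicit tuple Ordering, which compares componentwise: size first, name only on a tie. A self-contained sketch of the tie-breaking behavior (CandidateIndex is illustrative, not Hyperspace's type):

// Illustrative stand-in for IndexLogEntry with the two fields the ranker compares.
case class CandidateIndex(name: String, indexFilesSizeInBytes: Long)

val candidates = Seq(
  CandidateIndex("idxB", 30L),
  CandidateIndex("idxC", 10L),
  CandidateIndex("idxA", 10L))

// Tuple Ordering is componentwise: the smallest size wins; equal sizes fall back
// to the lexicographically smallest name, making the choice deterministic.
val picked = candidates.minBy(c => (c.indexFilesSizeInBytes, c.name))
// picked == CandidateIndex("idxA", 10L), regardless of the input order.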
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.types.{IntegerType, StringType}
 
-import com.microsoft.hyperspace.index.{FileInfo, IndexConstants}
+import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntryTags}
 import com.microsoft.hyperspace.index.rules.HyperspaceRuleSuite
 import com.microsoft.hyperspace.util.FileUtils
 
@@ -45,16 +45,36 @@ class FilterIndexRankerTest extends HyperspaceRuleSuite {
   val t2c1 = AttributeReference("t2c1", IntegerType)()
   val t2c2 = AttributeReference("t2c2", StringType)()
 
-  test("rank() should return the head of the list by default.") {
-    val ind1 = createIndexLogEntry("ind1", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false)
-    setCommonSourceSizeInBytesTag(ind1, tempPlan, Nil)
-    val ind2 = createIndexLogEntry("ind2", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false)
-    setCommonSourceSizeInBytesTag(ind2, tempPlan, Nil)
-    val ind3 = createIndexLogEntry("ind3", Seq(t2c1), Seq(t2c2), tempPlan, writeLog = false)
-    setCommonSourceSizeInBytesTag(ind3, tempPlan, Nil)
+  test("rank() should return the index with the smallest size by default.") {
+    // Index with 2 files of total size 20.
+    val ind1 = createIndexLogEntry(
+      "ind1",
+      Seq(t1c1),
+      Seq(t1c2),
+      tempPlan,
+      writeLog = false,
+      filenames = Seq("f1.parquet", "f2.parquet"))
+
+    // Index with 1 file of total size 10.
+    val ind2 = createIndexLogEntry(
+      "ind2",
+      Seq(t1c1),
+      Seq(t1c2),
+      tempPlan,
+      writeLog = false,
+      filenames = Seq("f1.parquet"))
+
+    // Index with 3 files of total size 30.
+    val ind3 = createIndexLogEntry(
+      "ind3",
+      Seq(t2c1),
+      Seq(t2c2),
+      tempPlan,
+      writeLog = false,
+      filenames = Seq("f1.parquet", "f2.parquet", "f3.parquet"))
 
     val indexes = Seq(ind1, ind2, ind3)
-    assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind1))
+    assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind2))
   }
 
   test(
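Each index data file in these tests is registered with a fixed length of 10 (see the new FileStatus(10, ...) call in HyperspaceRuleSuite below), so an index's total size is simply 10 times its file count. A quick check that the assertion's expectation follows:

// Sanity check of the sizes the test relies on; 10L matches the FileStatus
// length used by the test helper below.
val fileSize = 10L
val fileCounts = Map("ind1" -> 2, "ind2" -> 1, "ind3" -> 3)
val sizes = fileCounts.map { case (name, n) => name -> n * fileSize }
// sizes == Map("ind1" -> 20, "ind2" -> 10, "ind3" -> 30): ind2 is the smallest,
// so the ranker must pick it.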
@@ -16,6 +16,7 @@
 
 package com.microsoft.hyperspace.index.rules
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -27,9 +28,10 @@ import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.actions.Constants
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.Hdfs.Properties
+import com.microsoft.hyperspace.util.PathUtils
 
 trait HyperspaceRuleSuite extends HyperspaceSuite {
-  private val filenames = Seq("f1.parquet", "f2.parquet")
+  private val defaultFileNames = Seq("f1.parquet", "f2.parquet")
   def createIndexLogEntry(
       name: String,
       indexCols: Seq[AttributeReference],
@@ -37,7 +39,8 @@ trait HyperspaceRuleSuite extends HyperspaceSuite {
       plan: LogicalPlan,
       numBuckets: Int = 10,
       inputFiles: Seq[FileInfo] = Seq(),
-      writeLog: Boolean = true): IndexLogEntry = {
+      writeLog: Boolean = true,
+      filenames: Seq[String] = defaultFileNames): IndexLogEntry = {
     val signClass = new RuleTestHelper.TestSignatureProvider().getClass.getName
 
     LogicalPlanSignatureProvider.create(signClass).signature(plan) match {
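Because filenames defaults to defaultFileNames, existing call sites keep compiling unchanged, while new tests can vary the file count (and therefore the total index size), as the ranker test above does:

// Existing call site: filenames falls back to defaultFileNames (two files).
val e1 = createIndexLogEntry("ind1", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false)

// New-style call site: a single file, so the smallest possible index size.
val e2 = createIndexLogEntry(
  "ind2",
  Seq(t1c1),
  Seq(t1c2),
  tempPlan,
  writeLog = false,
  filenames = Seq("f1.parquet"))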
@@ -54,7 +57,7 @@ trait HyperspaceRuleSuite extends HyperspaceSuite {
       null,
       LogicalPlanFingerprint(LogicalPlanFingerprint.Properties(Seq(Signature(signClass, s)))))
 
-    val indexFiles = getIndexDataFilesPaths(name).map { path =>
+    val indexFiles = getIndexDataFilesPaths(name, filenames).map { path =>
       new FileStatus(10, false, 1, 10, 10, path)
     }
 
@@ -82,11 +85,13 @@ trait HyperspaceRuleSuite extends HyperspaceSuite {
     }
   }
 
-  def getIndexDataFilesPaths(indexName: String): Seq[Path] =
+  def getIndexDataFilesPaths(
+      indexName: String,
+      filenames: Seq[String] = defaultFileNames): Seq[Path] =
     filenames.map { f =>
       new Path(
         new Path(
-          new Path(systemPath, indexName),
+          getIndexRootPath(indexName),
           s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0"),
         f)
     }
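The nested Path constructors produce a layout of <indexRoot>/<versionPrefix>=0/<filename>. A sketch with plain Hadoop paths; the literal "v__" prefix is an assumption standing in for IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX:

import org.apache.hadoop.fs.Path

// "v__" is assumed here; real code should use IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX.
val indexRoot = new Path("/tmp/hyperspace/indexes/ind1")
val dataFile = new Path(new Path(indexRoot, "v__=0"), "f1.parquet")
// dataFile.toString == "/tmp/hyperspace/indexes/ind1/v__=0/f1.parquet"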
@@ -99,7 +104,7 @@ trait HyperspaceRuleSuite extends HyperspaceSuite {
       spark)
 
   def getIndexRootPath(indexName: String): Path =
-    new Path(systemPath, indexName)
+    PathUtils.makeAbsolute(new Path(systemPath, indexName), new Configuration)
 
   def setCommonSourceSizeInBytesTag(
       index: IndexLogEntry,
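Returning an absolute root path makes path comparisons stable no matter what the process working directory is. PathUtils.makeAbsolute is Hyperspace's own helper; a rough equivalent built only from stock Hadoop APIs (an assumption about its behavior, not its actual implementation) would be:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Assumed behavior: resolve a possibly relative path against the default
// FileSystem and return a fully qualified, absolute path.
def makeAbsoluteSketch(path: Path, conf: Configuration): Path = {
  val fs: FileSystem = path.getFileSystem(conf)
  fs.makeQualified(path)
}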