This commit is contained in:
Chungmin Lee 2021-06-29 06:15:38 +09:00 коммит произвёл GitHub
Родитель 38bd40019e
Коммит 896f734d4d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
56 изменённых файлов: 570 добавлений и 561 удалений

Просмотреть файл

@ -1,3 +1,5 @@
version = 2.6.3
# The following configs are taken from https://github.com/apache/spark/blob/master/dev/.scalafmt.conf # The following configs are taken from https://github.com/apache/spark/blob/master/dev/.scalafmt.conf
align = none align = none
align.openParenDefnSite = false align.openParenDefnSite = false
@ -11,4 +13,4 @@ docstrings = JavaDoc
maxColumn = 98 maxColumn = 98
# The following are specific to Hyperspace. # The following are specific to Hyperspace.
importSelectors = singleLine importSelectors = singleLine

Просмотреть файл

@ -32,10 +32,11 @@ import com.microsoft.hyperspace.util.JavaConverters._
object IcebergShims { object IcebergShims {
def isIcebergRelation(plan: LogicalPlan): Boolean = plan match { def isIcebergRelation(plan: LogicalPlan): Boolean =
case DataSourceV2Relation(_: IcebergSource, _, _, _, _) => true plan match {
case _ => false case DataSourceV2Relation(_: IcebergSource, _, _, _, _) => true
} case _ => false
}
def loadIcebergTable(spark: SparkSession, plan: LogicalPlan): (Table, Option[Long]) = { def loadIcebergTable(spark: SparkSession, plan: LogicalPlan): (Table, Option[Long]) = {
val conf = spark.sessionState.newHadoopConf() val conf = spark.sessionState.newHadoopConf()

Просмотреть файл

@ -28,11 +28,12 @@ object IcebergShims {
// In Spark 3, the V2ScanRelationPushdown rule can convert DataSourceV2Relation into // In Spark 3, the V2ScanRelationPushdown rule can convert DataSourceV2Relation into
// DataSourceV2ScanRelation. // DataSourceV2ScanRelation.
def isIcebergRelation(plan: LogicalPlan): Boolean = plan match { def isIcebergRelation(plan: LogicalPlan): Boolean =
case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true plan match {
case DataSourceV2ScanRelation(_: SparkTable, _, _) => true case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true
case _ => false case DataSourceV2ScanRelation(_: SparkTable, _, _) => true
} case _ => false
}
def loadIcebergTable(spark: SparkSession, plan: LogicalPlan): (Table, Option[Long]) = def loadIcebergTable(spark: SparkSession, plan: LogicalPlan): (Table, Option[Long]) =
plan match { plan match {

Просмотреть файл

@ -26,11 +26,12 @@ import org.apache.spark.util.hyperspace.Utils
object IcebergShims { object IcebergShims {
def isIcebergRelation(plan: LogicalPlan): Boolean = plan match { def isIcebergRelation(plan: LogicalPlan): Boolean =
case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true plan match {
case DataSourceV2ScanRelation(DataSourceV2Relation(_: SparkTable, _, _, _, _), _, _) => true case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true
case _ => false case DataSourceV2ScanRelation(DataSourceV2Relation(_: SparkTable, _, _, _, _), _, _) => true
} case _ => false
}
def loadIcebergTable(spark: SparkSession, plan: LogicalPlan): (Table, Option[Long]) = def loadIcebergTable(spark: SparkSession, plan: LogicalPlan): (Table, Option[Long]) =
plan match { plan match {

Просмотреть файл

@ -78,25 +78,29 @@ trait ScalaObjectMapper {
if (isArray(clazz)) { if (isArray(clazz)) {
val typeArguments = m.typeArguments.map(constructType(_)).toArray val typeArguments = m.typeArguments.map(constructType(_)).toArray
if (typeArguments.length != 1) { if (typeArguments.length != 1) {
throw new IllegalArgumentException("Need exactly 1 type parameter for array like types ("+clazz.getName+")") throw new IllegalArgumentException(
"Need exactly 1 type parameter for array like types (" + clazz.getName + ")")
} }
getTypeFactory.constructArrayType(typeArguments(0)) getTypeFactory.constructArrayType(typeArguments(0))
} else if (isMapLike(clazz)) { } else if (isMapLike(clazz)) {
val typeArguments = m.typeArguments.map(constructType(_)).toArray val typeArguments = m.typeArguments.map(constructType(_)).toArray
if (typeArguments.length != 2) { if (typeArguments.length != 2) {
throw new IllegalArgumentException("Need exactly 2 type parameters for map like types ("+clazz.getName+")") throw new IllegalArgumentException(
"Need exactly 2 type parameters for map like types (" + clazz.getName + ")")
} }
getTypeFactory.constructMapLikeType(clazz, typeArguments(0), typeArguments(1)) getTypeFactory.constructMapLikeType(clazz, typeArguments(0), typeArguments(1))
} else if (isReference(clazz)) { // Option is a subclass of IterableOnce, so check it first } else if (isReference(clazz)) { // Option is a subclass of IterableOnce, so check it first
val typeArguments = m.typeArguments.map(constructType(_)).toArray val typeArguments = m.typeArguments.map(constructType(_)).toArray
if (typeArguments.length != 1) { if (typeArguments.length != 1) {
throw new IllegalArgumentException("Need exactly 1 type parameter for reference types ("+clazz.getName+")") throw new IllegalArgumentException(
"Need exactly 1 type parameter for reference types (" + clazz.getName + ")")
} }
getTypeFactory.constructReferenceType(clazz, typeArguments(0)) getTypeFactory.constructReferenceType(clazz, typeArguments(0))
} else if (isCollectionLike(clazz)) { } else if (isCollectionLike(clazz)) {
val typeArguments = m.typeArguments.map(constructType(_)).toArray val typeArguments = m.typeArguments.map(constructType(_)).toArray
if (typeArguments.length != 1) { if (typeArguments.length != 1) {
throw new IllegalArgumentException("Need exactly 1 type parameter for collection like types ("+clazz.getName+")") throw new IllegalArgumentException(
"Need exactly 1 type parameter for collection like types (" + clazz.getName + ")")
} }
getTypeFactory.constructCollectionLikeType(clazz, typeArguments(0)) getTypeFactory.constructCollectionLikeType(clazz, typeArguments(0))
} else { } else {
@ -355,7 +359,7 @@ trait ScalaObjectMapper {
c.isArray c.isArray
} }
private val MAP = classOf[collection.Map[_,_]] private val MAP = classOf[collection.Map[_, _]]
private def isMapLike(c: Class[_]): Boolean = { private def isMapLike(c: Class[_]): Boolean = {
MAP.isAssignableFrom(c) MAP.isAssignableFrom(c)
} }

Просмотреть файл

@ -156,8 +156,8 @@ class Hyperspace(spark: SparkSession) {
* @param redirectFunc optional function to redirect output of explain. * @param redirectFunc optional function to redirect output of explain.
* @param verbose Flag to enable verbose mode. * @param verbose Flag to enable verbose mode.
*/ */
def explain(df: DataFrame, verbose: Boolean = false)( def explain(df: DataFrame, verbose: Boolean = false)(implicit
implicit redirectFunc: String => Unit = print): Unit = { redirectFunc: String => Unit = print): Unit = {
redirectFunc(PlanAnalyzer.explainString(df, spark, indexManager.indexes, verbose)) redirectFunc(PlanAnalyzer.explainString(df, spark, indexManager.indexes, verbose))
} }

Просмотреть файл

@ -61,12 +61,7 @@ class CancelAction(final override protected val logManager: IndexLogManager) ext
*/ */
final override def op(): Unit = {} final override def op(): Unit = {}
final override protected def event( final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
appInfo: AppInfo, CancelActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
message: String): HyperspaceEvent = {
CancelActionEvent(
appInfo,
logEntry.asInstanceOf[IndexLogEntry],
message)
} }
} }

Просмотреть файл

@ -66,8 +66,7 @@ class CreateAction(
} }
// TODO: Temporarily block creating indexes using nested columns until it's fully supported. // TODO: Temporarily block creating indexes using nested columns until it's fully supported.
if (!(HyperspaceConf.nestedColumnEnabled(spark) || resolvedColumns.get.forall( if (!(HyperspaceConf.nestedColumnEnabled(spark) || resolvedColumns.get.forall(!_.isNested))) {
!_.isNested))) {
throw HyperspaceException("Hyperspace does not support nested columns yet.") throw HyperspaceException("Hyperspace does not support nested columns yet.")
} }

Просмотреть файл

@ -72,12 +72,8 @@ class RefreshIncrementalAction(
} else { } else {
None None
} }
updatedIndex = Some( updatedIndex = Some(previousIndexLogEntry.derivedDataset
previousIndexLogEntry.derivedDataset.refreshIncremental( .refreshIncremental(this, appendedSourceData, deletedFiles, previousIndexLogEntry.content))
this,
appendedSourceData,
deletedFiles,
previousIndexLogEntry.content))
} }
/** /**

Просмотреть файл

@ -146,10 +146,11 @@ case class CoveringIndex(
indexData) indexData)
} }
override def equals(o: Any): Boolean = o match { override def equals(o: Any): Boolean =
case that: CoveringIndex => comparedData == that.comparedData o match {
case _ => false case that: CoveringIndex => comparedData == that.comparedData
} case _ => false
}
override def hashCode: Int = { override def hashCode: Int = {
comparedData.hashCode comparedData.hashCode

Просмотреть файл

@ -28,11 +28,7 @@ import org.apache.spark.sql.DataFrame
* *
* The framework manages various types of indexes through this interface. * The framework manages various types of indexes through this interface.
*/ */
@JsonTypeInfo( @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "type")
use = JsonTypeInfo.Id.CLASS,
include = JsonTypeInfo.As.PROPERTY,
property = "type"
)
trait Index { trait Index {
/** /**
@ -126,10 +122,10 @@ trait Index {
* index if no update is needed * index if no update is needed
*/ */
def refreshIncremental( def refreshIncremental(
ctx: IndexerContext, ctx: IndexerContext,
appendedSourceData: Option[DataFrame], appendedSourceData: Option[DataFrame],
deletedSourceDataFiles: Seq[FileInfo], deletedSourceDataFiles: Seq[FileInfo],
indexContent: Content): Index indexContent: Content): Index
/** /**
* Indexes the source data and returns an updated index and index data. * Indexes the source data and returns an updated index and index data.

Просмотреть файл

@ -50,7 +50,9 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri
rec( rec(
new Path(root.name), new Path(root.name),
root, root,
(f, prefix) => (
f,
prefix) =>
FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet
} }
@ -101,9 +103,7 @@ object Content {
* @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids. * @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids.
* @return Content object with Directory tree from leaf files. * @return Content object with Directory tree from leaf files.
*/ */
def fromLeafFiles( def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Option[Content] = {
files: Seq[FileStatus],
fileIdTracker: FileIdTracker): Option[Content] = {
if (files.nonEmpty) { if (files.nonEmpty) {
Some(Content(Directory.fromLeafFiles(files, fileIdTracker))) Some(Content(Directory.fromLeafFiles(files, fileIdTracker)))
} else { } else {
@ -224,12 +224,10 @@ object Directory {
* @param files List of leaf files. * @param files List of leaf files.
* @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids. * @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids.
* Note: If a new leaf file is discovered, the input fileIdTracker gets * Note: If a new leaf file is discovered, the input fileIdTracker gets
* updated by adding it to the files it is tracking. * updated by adding it to the files it is tracking.
* @return Content object with Directory tree from leaf files. * @return Content object with Directory tree from leaf files.
*/ */
def fromLeafFiles( def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Directory = {
files: Seq[FileStatus],
fileIdTracker: FileIdTracker): Directory = {
require( require(
files.nonEmpty, files.nonEmpty,
s"Empty files list found while creating a ${Directory.getClass.getName}.") s"Empty files list found while creating a ${Directory.getClass.getName}.")
@ -308,13 +306,14 @@ object Directory {
// id is a unique identifier generated by Hyperspace, for each unique combination of // id is a unique identifier generated by Hyperspace, for each unique combination of
// file's name, size and modifiedTime. // file's name, size and modifiedTime.
case class FileInfo(name: String, size: Long, modifiedTime: Long, id: Long) { case class FileInfo(name: String, size: Long, modifiedTime: Long, id: Long) {
override def equals(o: Any): Boolean = o match { override def equals(o: Any): Boolean =
case that: FileInfo => o match {
name.equals(that.name) && case that: FileInfo =>
size.equals(that.size) && name.equals(that.name) &&
modifiedTime.equals(that.modifiedTime) size.equals(that.size) &&
case _ => false modifiedTime.equals(that.modifiedTime)
} case _ => false
}
override def hashCode(): Int = { override def hashCode(): Int = {
name.hashCode + size.hashCode + modifiedTime.hashCode name.hashCode + size.hashCode + modifiedTime.hashCode
@ -349,9 +348,7 @@ object LogicalPlanFingerprint {
* @param appendedFiles Appended files. * @param appendedFiles Appended files.
* @param deletedFiles Deleted files. * @param deletedFiles Deleted files.
*/ */
case class Update( case class Update(appendedFiles: Option[Content] = None, deletedFiles: Option[Content] = None)
appendedFiles: Option[Content] = None,
deletedFiles: Option[Content] = None)
// IndexLogEntry-specific Hdfs that represents the source data. // IndexLogEntry-specific Hdfs that represents the source data.
case class Hdfs(properties: Hdfs.Properties) { case class Hdfs(properties: Hdfs.Properties) {
@ -467,34 +464,28 @@ case class IndexLogEntry(
def toFileStatus(f: FileInfo) = { def toFileStatus(f: FileInfo) = {
new FileStatus(f.size, false, 0, 1, f.modifiedTime, new Path(f.name)) new FileStatus(f.size, false, 0, 1, f.modifiedTime, new Path(f.name))
} }
copy( copy(source = source.copy(plan = source.plan.copy(properties = source.plan.properties.copy(
source = source.copy( fingerprint = latestFingerprint,
plan = source.plan.copy( relations = Seq(
properties = source.plan.properties.copy( relations.head.copy(data = relations.head.data.copy(properties =
fingerprint = latestFingerprint, relations.head.data.properties.copy(update = Some(Update(
relations = Seq( appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker),
relations.head.copy( deletedFiles =
data = relations.head.data.copy( Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker)))))))))))
properties = relations.head.data.properties.copy(
update = Some(
Update(
appendedFiles =
Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker),
deletedFiles =
Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker)))))))))))
} }
override def equals(o: Any): Boolean = o match { override def equals(o: Any): Boolean =
case that: IndexLogEntry => o match {
name.equals(that.name) && case that: IndexLogEntry =>
derivedDataset.equals(that.derivedDataset) && name.equals(that.name) &&
signature.equals(that.signature) && derivedDataset.equals(that.derivedDataset) &&
content.root.equals(that.content.root) && signature.equals(that.signature) &&
source.equals(that.source) && content.root.equals(that.content.root) &&
properties.equals(that.properties) && source.equals(that.source) &&
state.equals(that.state) properties.equals(that.properties) &&
case _ => false state.equals(that.state)
} case _ => false
}
def indexedColumns: Seq[String] = derivedDataset.indexedColumns def indexedColumns: Seq[String] = derivedDataset.indexedColumns
@ -506,8 +497,9 @@ case class IndexLogEntry(
def hasParquetAsSourceFormat: Boolean = { def hasParquetAsSourceFormat: Boolean = {
relations.head.fileFormat.equals("parquet") || relations.head.fileFormat.equals("parquet") ||
derivedDataset.properties.getOrElse( derivedDataset.properties
IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean .getOrElse(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false")
.toBoolean
} }
@JsonIgnore @JsonIgnore
@ -594,8 +586,7 @@ object IndexLogEntry {
derivedDataset, derivedDataset,
content, content,
source, source,
properties + ((IndexConstants.HYPERSPACE_VERSION_PROPERTY, BuildInfo.version)) properties + ((IndexConstants.HYPERSPACE_VERSION_PROPERTY, BuildInfo.version)))
)
} }
} }
@ -608,10 +599,10 @@ class FileIdTracker {
// Combination of file properties, used as key, to identify a // Combination of file properties, used as key, to identify a
// unique file for which an id is generated. // unique file for which an id is generated.
type key = ( type key = (
String, // Full path. String, // Full path.
Long, // Size. Long, // Size.
Long // Modified time. Long // Modified time.
) )
private val fileToIdMap: mutable.HashMap[key, Long] = mutable.HashMap() private val fileToIdMap: mutable.HashMap[key, Long] = mutable.HashMap()
def getMaxFileId: Long = maxId def getMaxFileId: Long = maxId
@ -636,8 +627,7 @@ class FileIdTracker {
setSizeHint(files.size) setSizeHint(files.size)
files.foreach { f => files.foreach { f =>
if (f.id == IndexConstants.UNKNOWN_FILE_ID) { if (f.id == IndexConstants.UNKNOWN_FILE_ID) {
throw HyperspaceException( throw HyperspaceException(s"Cannot add file info with unknown id. (file: ${f.name}).")
s"Cannot add file info with unknown id. (file: ${f.name}).")
} }
val key = (f.name, f.size, f.modifiedTime) val key = (f.name, f.size, f.modifiedTime)
@ -665,8 +655,7 @@ class FileIdTracker {
*/ */
def addFile(file: FileStatus): Long = { def addFile(file: FileStatus): Long = {
fileToIdMap.getOrElseUpdate( fileToIdMap.getOrElseUpdate(
(file.getPath.toString, file.getLen, file.getModificationTime), (file.getPath.toString, file.getLen, file.getModificationTime), {
{
maxId += 1 maxId += 1
maxId maxId
}) })

Просмотреть файл

@ -47,12 +47,12 @@ object IndexLogEntryTags {
// INMEMORYFILEINDEX_HYBRID_SCAN stores InMemoryFileIndex including index data files and also // INMEMORYFILEINDEX_HYBRID_SCAN stores InMemoryFileIndex including index data files and also
// appended files for Hybrid Scan. // appended files for Hybrid Scan.
val INMEMORYFILEINDEX_HYBRID_SCAN: IndexLogEntryTag[InMemoryFileIndex] = val INMEMORYFILEINDEX_HYBRID_SCAN: IndexLogEntryTag[InMemoryFileIndex] =
IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScan") IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScan")
// INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED stores InMemoryFileIndex including only appended files // INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED stores InMemoryFileIndex including only appended files
// for Hybrid Scan. // for Hybrid Scan.
val INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED: IndexLogEntryTag[InMemoryFileIndex] = val INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED: IndexLogEntryTag[InMemoryFileIndex] =
IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScanAppended") IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScanAppended")
// FILTER_REASONS stores reason strings for disqualification. // FILTER_REASONS stores reason strings for disqualification.
val FILTER_REASONS: IndexLogEntryTag[Seq[String]] = val FILTER_REASONS: IndexLogEntryTag[Seq[String]] =

Просмотреть файл

@ -54,7 +54,7 @@ trait IndexLogManager {
} }
class IndexLogManagerImpl(indexPath: Path, hadoopConfiguration: Configuration = new Configuration) class IndexLogManagerImpl(indexPath: Path, hadoopConfiguration: Configuration = new Configuration)
extends IndexLogManager extends IndexLogManager
with Logging { with Logging {
// Use FileContext instead of FileSystem for atomic renames? // Use FileContext instead of FileSystem for atomic renames?
private lazy val fs: FileSystem = indexPath.getFileSystem(hadoopConfiguration) private lazy val fs: FileSystem = indexPath.getFileSystem(hadoopConfiguration)
@ -102,8 +102,9 @@ class IndexLogManagerImpl(indexPath: Path, hadoopConfiguration: Configuration =
if (entry.exists(e => Constants.STABLE_STATES.contains(e.state))) { if (entry.exists(e => Constants.STABLE_STATES.contains(e.state))) {
return entry return entry
} }
if (entry.exists(e => e.state.equals(Constants.States.CREATING) if (entry.exists(e =>
|| e.state.equals(Constants.States.VACUUMING))) { e.state.equals(Constants.States.CREATING)
|| e.state.equals(Constants.States.VACUUMING))) {
// Do not consider unrelated logs before creating or vacuuming state. // Do not consider unrelated logs before creating or vacuuming state.
return None return None
} }

Просмотреть файл

@ -55,12 +55,8 @@ private[hyperspace] case class IndexStatistics(
additionalStats: Map[String, String]) additionalStats: Map[String, String])
private[hyperspace] object IndexStatistics { private[hyperspace] object IndexStatistics {
val INDEX_SUMMARY_COLUMNS: Seq[String] = Seq( val INDEX_SUMMARY_COLUMNS: Seq[String] =
"name", Seq("name", "indexedColumns", "indexLocation", "state", "additionalStats")
"indexedColumns",
"indexLocation",
"state",
"additionalStats")
/** /**
* Create IndexStatistics instance for a given IndexLogEntry. * Create IndexStatistics instance for a given IndexLogEntry.
@ -146,7 +142,7 @@ private[hyperspace] object IndexStatistics {
var root = entry.content.root var root = entry.content.root
var prefix = entry.content.root.name var prefix = entry.content.root.name
while (root.subDirs.size == 1 && while (root.subDirs.size == 1 &&
!root.subDirs.head.name.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX)) { !root.subDirs.head.name.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX)) {
prefix += s"${root.subDirs.head.name}/" prefix += s"${root.subDirs.head.name}/"
root = root.subDirs.head root = root.subDirs.head
} }

Просмотреть файл

@ -26,9 +26,10 @@ import com.microsoft.hyperspace.index.plans.logical.BucketUnion
* to [[BucketUnionExec]] (Spark Plan) * to [[BucketUnionExec]] (Spark Plan)
*/ */
private[hyperspace] object BucketUnionStrategy extends SparkStrategy { private[hyperspace] object BucketUnionStrategy extends SparkStrategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { override def apply(plan: LogicalPlan): Seq[SparkPlan] =
case p: BucketUnion => plan match {
BucketUnionExec(p.children.map(planLater), p.bucketSpec) :: Nil case p: BucketUnion =>
case _ => Nil BucketUnionExec(p.children.map(planLater), p.bucketSpec) :: Nil
} case _ => Nil
}
} }

Просмотреть файл

@ -140,7 +140,7 @@ object PlanAnalyzer {
val usedPaths = new ListBuffer[String] val usedPaths = new ListBuffer[String]
sparkPlan.foreach { sparkPlan.foreach {
case ExtractFileSourceScanExecRelation( case ExtractFileSourceScanExecRelation(
HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _)) => HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _)) =>
usedPaths += location.rootPaths.head.getParent.toString usedPaths += location.rootPaths.head.getParent.toString
case other => case other =>
other.subqueries.foreach { subQuery => other.subqueries.foreach { subQuery =>

Просмотреть файл

@ -62,7 +62,7 @@ private[hyperspace] case class BucketUnion(children: Seq[LogicalPlan], bucketSpe
// compare the data types with the first child // compare the data types with the first child
child.output.zip(children.head.output).forall { child.output.zip(children.head.output).forall {
case (l, r) => l.dataType.equals(r.dataType) case (l, r) => l.dataType.equals(r.dataType)
}) })
children.length > 1 && childrenResolved && allChildrenCompatible children.length > 1 && childrenResolved && allChildrenCompatible
} }
} }

Просмотреть файл

@ -32,8 +32,8 @@ import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging
* Collect candidate indexes for each source plan. * Collect candidate indexes for each source plan.
*/ */
object CandidateIndexCollector extends ActiveSparkSession { object CandidateIndexCollector extends ActiveSparkSession {
private val sourceFilters private val sourceFilters: Seq[SourcePlanIndexFilter] =
: Seq[SourcePlanIndexFilter] = ColumnSchemaFilter :: FileSignatureFilter :: Nil ColumnSchemaFilter :: FileSignatureFilter :: Nil
private def initializePlanToIndexes( private def initializePlanToIndexes(
plan: LogicalPlan, plan: LogicalPlan,
@ -56,9 +56,11 @@ object CandidateIndexCollector extends ActiveSparkSession {
val planToIndexes = initializePlanToIndexes(plan, allIndexes) val planToIndexes = initializePlanToIndexes(plan, allIndexes)
planToIndexes.flatMap { planToIndexes.flatMap {
case (node, allIndexes) => case (node, allIndexes) =>
Some(node, sourceFilters.foldLeft(allIndexes) { (indexes, filter) => Some(
filter(node, indexes) node,
}).filter(_._2.nonEmpty) sourceFilters.foldLeft(allIndexes) { (indexes, filter) =>
filter(node, indexes)
}).filter(_._2.nonEmpty)
} }
} }
} }

Просмотреть файл

@ -128,7 +128,7 @@ object FilterRankFilter extends IndexRankFilter {
plan: LogicalPlan, plan: LogicalPlan,
applicableIndexes: PlanToIndexesMap): PlanToSelectedIndexMap = { applicableIndexes: PlanToIndexesMap): PlanToSelectedIndexMap = {
if (applicableIndexes.isEmpty || applicableIndexes.size != 1 if (applicableIndexes.isEmpty || applicableIndexes.size != 1
|| applicableIndexes.head._2.isEmpty) { || applicableIndexes.head._2.isEmpty) {
Map.empty Map.empty
} else { } else {
val relation = RuleUtils.getRelation(spark, plan).get val relation = RuleUtils.getRelation(spark, plan).get

Просмотреть файл

@ -44,8 +44,8 @@ trait IndexFilter extends ActiveSparkSession {
index: IndexLogEntry, index: IndexLogEntry,
reasonString: => String): Unit = { reasonString: => String): Unit = {
if (!condition && index if (!condition && index
.getTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED) .getTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED)
.getOrElse(false)) { .getOrElse(false)) {
val prevReason = val prevReason =
index.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS).getOrElse(Nil) index.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS).getOrElse(Nil)
index.setTagValue(plan, IndexLogEntryTags.FILTER_REASONS, prevReason :+ reasonString) index.setTagValue(plan, IndexLogEntryTags.FILTER_REASONS, prevReason :+ reasonString)

Просмотреть файл

@ -52,13 +52,15 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
val left = RuleUtils.getRelation(spark, l) val left = RuleUtils.getRelation(spark, l)
val right = RuleUtils.getRelation(spark, r) val right = RuleUtils.getRelation(spark, r)
if (!(left.isDefined && right.isDefined && !RuleUtils.isIndexApplied(left.get) && !RuleUtils if (!(left.isDefined && right.isDefined && !RuleUtils.isIndexApplied(
.isIndexApplied(right.get))) { left.get) && !RuleUtils
.isIndexApplied(right.get))) {
return Map.empty return Map.empty
} }
val leftAndRightIndexes = candidateIndexes.getOrElse(left.get.plan, Nil) ++ candidateIndexes val leftAndRightIndexes =
.getOrElse(right.get.plan, Nil) candidateIndexes.getOrElse(left.get.plan, Nil) ++ candidateIndexes
.getOrElse(right.get.plan, Nil)
val joinConditionCond = withFilterReasonTag( val joinConditionCond = withFilterReasonTag(
plan, plan,
@ -160,17 +162,17 @@ object JoinAttributeFilter extends QueryPlanIndexFilter {
} }
if (withFilterReasonTag( if (withFilterReasonTag(
plan, plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2, candidateIndexes.head._2 ++ candidateIndexes.last._2,
"Each join condition column should come from " + "Each join condition column should come from " +
"relations directly and attributes from left plan must exclusively have " + "relations directly and attributes from left plan must exclusively have " +
"one-to-one mapping with attributes from right plan. " + "one-to-one mapping with attributes from right plan. " +
"E.g. join(A = B and A = D) is not eligible.") { "E.g. join(A = B and A = D) is not eligible.") {
ensureAttributeRequirements( ensureAttributeRequirements(
JoinIndexRule.leftRelation.get, JoinIndexRule.leftRelation.get,
JoinIndexRule.rightRelation.get, JoinIndexRule.rightRelation.get,
JoinIndexRule.joinCondition.get) JoinIndexRule.joinCondition.get)
}) { }) {
candidateIndexes candidateIndexes
} else { } else {
Map.empty Map.empty
@ -206,7 +208,6 @@ object JoinAttributeFilter extends QueryPlanIndexFilter {
* E.g. (A = B and A = D) is not supported. A maps with both B and D. There isn't a one-to-one * E.g. (A = B and A = D) is not supported. A maps with both B and D. There isn't a one-to-one
* mapping. * mapping.
* *
*
* Background knowledge: * Background knowledge:
* An alias in a query plan is represented as [[Alias]] at the time of * An alias in a query plan is represented as [[Alias]] at the time of
* its creation. Unnecessary aliases get resolved and removed during query analysis phase by * its creation. Unnecessary aliases get resolved and removed during query analysis phase by
@ -329,13 +330,13 @@ object JoinColumnFilter extends QueryPlanIndexFilter {
val rRequiredAllCols = resolve(spark, allRequiredCols(r), rBaseAttrs).get val rRequiredAllCols = resolve(spark, allRequiredCols(r), rBaseAttrs).get
if (withFilterReasonTag( if (withFilterReasonTag(
plan, plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2, candidateIndexes.head._2 ++ candidateIndexes.last._2,
"Invalid query plan.") { "Invalid query plan.") {
// Make sure required indexed columns are subset of all required columns. // Make sure required indexed columns are subset of all required columns.
resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined && resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined &&
resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined
}) { }) {
val lIndexes = val lIndexes =
getUsableIndexes( getUsableIndexes(
plan, plan,
@ -350,13 +351,13 @@ object JoinColumnFilter extends QueryPlanIndexFilter {
rRequiredAllCols) rRequiredAllCols)
if (withFilterReasonTag( if (withFilterReasonTag(
plan, plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2, candidateIndexes.head._2 ++ candidateIndexes.last._2,
"No available indexes for left subplan.")(lIndexes.nonEmpty) && "No available indexes for left subplan.")(lIndexes.nonEmpty) &&
withFilterReasonTag( withFilterReasonTag(
plan, plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2, candidateIndexes.head._2 ++ candidateIndexes.last._2,
"No available indexes for right subplan.")(rIndexes.nonEmpty)) { "No available indexes for right subplan.")(rIndexes.nonEmpty)) {
Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes) Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes)
} else { } else {
Map.empty Map.empty

Просмотреть файл

@ -218,7 +218,7 @@ object RuleUtils {
val filesToRead = { val filesToRead = {
if (useBucketSpec || !index.hasParquetAsSourceFormat || filesDeleted.nonEmpty || if (useBucketSpec || !index.hasParquetAsSourceFormat || filesDeleted.nonEmpty ||
relation.partitionSchema.nonEmpty) { relation.partitionSchema.nonEmpty) {
// Since the index data is in "parquet" format, we cannot read source files // Since the index data is in "parquet" format, we cannot read source files
// in formats other than "parquet" using one FileScan node as the operator requires // in formats other than "parquet" using one FileScan node as the operator requires
// files in one homogenous format. To address this, we need to read the appended // files in one homogenous format. To address this, we need to read the appended
@ -388,16 +388,17 @@ object RuleUtils {
// Extract top level plan including all required columns for shuffle in its output. // Extract top level plan including all required columns for shuffle in its output.
object ExtractTopLevelPlanForShuffle { object ExtractTopLevelPlanForShuffle {
type returnType = (LogicalPlan, Seq[Option[Attribute]], Boolean) type returnType = (LogicalPlan, Seq[Option[Attribute]], Boolean)
def unapply(plan: LogicalPlan): Option[returnType] = plan match { def unapply(plan: LogicalPlan): Option[returnType] =
case p @ Project(_, Filter(_, LogicalRelation(_: HadoopFsRelation, _, _, _))) => plan match {
Some(p, getIndexedAttrs(p, bucketSpec.bucketColumnNames), true) case p @ Project(_, Filter(_, LogicalRelation(_: HadoopFsRelation, _, _, _))) =>
case p @ Project(_, LogicalRelation(_: HadoopFsRelation, _, _, _)) => Some(p, getIndexedAttrs(p, bucketSpec.bucketColumnNames), true)
Some(p, getIndexedAttrs(p, bucketSpec.bucketColumnNames), true) case p @ Project(_, LogicalRelation(_: HadoopFsRelation, _, _, _)) =>
case f @ Filter(_, LogicalRelation(_: HadoopFsRelation, _, _, _)) => Some(p, getIndexedAttrs(p, bucketSpec.bucketColumnNames), true)
Some(f, getIndexedAttrs(f, bucketSpec.bucketColumnNames), false) case f @ Filter(_, LogicalRelation(_: HadoopFsRelation, _, _, _)) =>
case r @ LogicalRelation(_: HadoopFsRelation, _, _, _) => Some(f, getIndexedAttrs(f, bucketSpec.bucketColumnNames), false)
Some(r, getIndexedAttrs(r, bucketSpec.bucketColumnNames), false) case r @ LogicalRelation(_: HadoopFsRelation, _, _, _) =>
} Some(r, getIndexedAttrs(r, bucketSpec.bucketColumnNames), false)
}
private def getIndexedAttrs( private def getIndexedAttrs(
plan: LogicalPlan, plan: LogicalPlan,

Просмотреть файл

@ -37,11 +37,13 @@ import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf}
*/ */
class FileBasedSourceProviderManager(spark: SparkSession) { class FileBasedSourceProviderManager(spark: SparkSession) {
private val sourceProviders: CacheWithTransform[String, Seq[FileBasedSourceProvider]] = private val sourceProviders: CacheWithTransform[String, Seq[FileBasedSourceProvider]] =
new CacheWithTransform[String, Seq[FileBasedSourceProvider]]({ () => new CacheWithTransform[String, Seq[FileBasedSourceProvider]](
HyperspaceConf.fileBasedSourceBuilders(spark) { () =>
}, { builderClassNames => HyperspaceConf.fileBasedSourceBuilders(spark)
buildProviders(builderClassNames) },
}) { builderClassNames =>
buildProviders(builderClassNames)
})
/** /**
* Returns true if the given logical plan is a supported relation. If all of the registered * Returns true if the given logical plan is a supported relation. If all of the registered

Просмотреть файл

@ -42,14 +42,15 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
/** /**
* Computes the signature of the current relation. * Computes the signature of the current relation.
*/ */
override def signature: String = plan.relation match { override def signature: String =
case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) => plan.relation match {
val result = filesFromIndex(location).sortBy(_.getPath.toString).foldLeft("") { case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) =>
(acc: String, f: FileStatus) => val result = filesFromIndex(location).sortBy(_.getPath.toString).foldLeft("") {
HashingUtils.md5Hex(acc + fingerprint(f)) (acc: String, f: FileStatus) =>
} HashingUtils.md5Hex(acc + fingerprint(f))
result }
} result
}
/** /**
* FileStatus list for all source files that the current relation references to. * FileStatus list for all source files that the current relation references to.
@ -62,28 +63,30 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
/** /**
* The partition schema of the current relation. * The partition schema of the current relation.
*/ */
override def partitionSchema: StructType = plan.relation match { override def partitionSchema: StructType =
case HadoopFsRelation(location: FileIndex, _, _, _, _, _) => plan.relation match {
location.partitionSchema case HadoopFsRelation(location: FileIndex, _, _, _, _, _) =>
} location.partitionSchema
}
/** /**
* The optional partition base path of the current relation. * The optional partition base path of the current relation.
*/ */
override def partitionBasePath: Option[String] = plan.relation match { override def partitionBasePath: Option[String] =
case HadoopFsRelation(p: PartitioningAwareFileIndex, _, _, _, _, _) plan.relation match {
if p.partitionSpec.partitions.nonEmpty => case HadoopFsRelation(p: PartitioningAwareFileIndex, _, _, _, _, _)
// For example, we could have the following in PartitionSpec: if p.partitionSpec.partitions.nonEmpty =>
// - partition columns = "col1", "col2" // For example, we could have the following in PartitionSpec:
// - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc. // - partition columns = "col1", "col2"
// , and going up the same number of directory levels as the number of partition columns // - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc.
// will compute the base path. Note that PartitionSpec.partitions will always contain // , and going up the same number of directory levels as the number of partition columns
// all the partitions in the path, so "partitions.head" is taken as an initial value. // will compute the base path. Note that PartitionSpec.partitions will always contain
val basePath = p.partitionSpec.partitionColumns // all the partitions in the path, so "partitions.head" is taken as an initial value.
.foldLeft(p.partitionSpec.partitions.head.path)((path, _) => path.getParent) val basePath = p.partitionSpec.partitionColumns
Some(basePath.toString) .foldLeft(p.partitionSpec.partitions.head.path)((path, _) => path.getParent)
case _ => None Some(basePath.toString)
} case _ => None
}
/** /**
* Creates [[HadoopFsRelation]] based on the current relation. * Creates [[HadoopFsRelation]] based on the current relation.
@ -93,10 +96,11 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
override def createHadoopFsRelation( override def createHadoopFsRelation(
location: FileIndex, location: FileIndex,
dataSchema: StructType, dataSchema: StructType,
options: Map[String, String]): HadoopFsRelation = plan.relation match { options: Map[String, String]): HadoopFsRelation =
case h: HadoopFsRelation => plan.relation match {
h.copy(location = location, dataSchema = dataSchema, options = options)(spark) case h: HadoopFsRelation =>
} h.copy(location = location, dataSchema = dataSchema, options = options)(spark)
}
/** /**
* Creates [[LogicalRelation]] based on the current relation. * Creates [[LogicalRelation]] based on the current relation.
@ -118,12 +122,12 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
override def createRelationMetadata(fileIdTracker: FileIdTracker): Relation = { override def createRelationMetadata(fileIdTracker: FileIdTracker): Relation = {
plan.relation match { plan.relation match {
case HadoopFsRelation( case HadoopFsRelation(
location: PartitioningAwareFileIndex, location: PartitioningAwareFileIndex,
_, _,
dataSchema, dataSchema,
_, _,
fileFormat, fileFormat,
options) => options) =>
val files = filesFromIndex(location) val files = filesFromIndex(location)
// Note that source files are currently fingerprinted when the optimized plan is // Note that source files are currently fingerprinted when the optimized plan is
// fingerprinted by LogicalPlanFingerprint. // fingerprinted by LogicalPlanFingerprint.
@ -215,10 +219,11 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
* *
* @return True if source files of the current relation are parquet. * @return True if source files of the current relation are parquet.
*/ */
def hasParquetAsSourceFormat: Boolean = plan.relation match { def hasParquetAsSourceFormat: Boolean =
case h: HadoopFsRelation => plan.relation match {
h.fileFormat.asInstanceOf[DataSourceRegister].shortName.equals("parquet") case h: HadoopFsRelation =>
} h.fileFormat.asInstanceOf[DataSourceRegister].shortName.equals("parquet")
}
/** /**
* For [[DefaultFileBasedRelation]], each file path should be in this format: * For [[DefaultFileBasedRelation]], each file path should be in this format:

Просмотреть файл

@ -36,11 +36,13 @@ import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf}
*/ */
class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider { class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider {
private val supportedFormats: CacheWithTransform[String, Set[String]] = private val supportedFormats: CacheWithTransform[String, Set[String]] =
new CacheWithTransform[String, Set[String]]({ () => new CacheWithTransform[String, Set[String]](
HyperspaceConf.supportedFileFormatsForDefaultFileBasedSource(spark) { () =>
}, { formats => HyperspaceConf.supportedFileFormatsForDefaultFileBasedSource(spark)
formats.toLowerCase(Locale.ROOT).split(",").map(_.trim).toSet },
}) { formats =>
formats.toLowerCase(Locale.ROOT).split(",").map(_.trim).toSet
})
/** /**
* Returns true if the given [[FileFormat]] is supported, false otherwise. * Returns true if the given [[FileFormat]] is supported, false otherwise.
@ -71,15 +73,16 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS
* @param plan Logical plan to check if it's supported. * @param plan Logical plan to check if it's supported.
* @return Some(true) if the given plan is a supported relation, otherwise None. * @return Some(true) if the given plan is a supported relation, otherwise None.
*/ */
def isSupportedRelation(plan: LogicalPlan): Option[Boolean] = plan match { def isSupportedRelation(plan: LogicalPlan): Option[Boolean] =
case LogicalRelation( plan match {
HadoopFsRelation(_: PartitioningAwareFileIndex, _, _, _, fileFormat, _), case LogicalRelation(
_, HadoopFsRelation(_: PartitioningAwareFileIndex, _, _, _, fileFormat, _),
_, _,
_) if isSupportedFileFormat(fileFormat) => _,
Some(true) _) if isSupportedFileFormat(fileFormat) =>
case _ => None Some(true)
} case _ => None
}
/** /**
* Returns the [[FileBasedRelation]] that wraps the given logical plan if the given * Returns the [[FileBasedRelation]] that wraps the given logical plan if the given

Просмотреть файл

@ -45,11 +45,12 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase
* @param plan Logical plan to check if it's supported. * @param plan Logical plan to check if it's supported.
* @return Some(true) if the given plan is a supported relation, otherwise None. * @return Some(true) if the given plan is a supported relation, otherwise None.
*/ */
def isSupportedRelation(plan: LogicalPlan): Option[Boolean] = plan match { def isSupportedRelation(plan: LogicalPlan): Option[Boolean] =
case LogicalRelation(HadoopFsRelation(_: TahoeLogFileIndex, _, _, _, _, _), _, _, _) => plan match {
Some(true) case LogicalRelation(HadoopFsRelation(_: TahoeLogFileIndex, _, _, _, _, _), _, _, _) =>
case _ => None Some(true)
} case _ => None
}
/** /**
* Returns the [[FileBasedRelation]] that wraps the given logical plan if the given * Returns the [[FileBasedRelation]] that wraps the given logical plan if the given

Просмотреть файл

@ -37,10 +37,11 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation)
/** /**
* Computes the signature of the current relation. * Computes the signature of the current relation.
*/ */
override def signature: String = plan.relation match { override def signature: String =
case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) => plan.relation match {
location.tableVersion + location.path.toString case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) =>
} location.tableVersion + location.path.toString
}
/** /**
* All the files that the current relation references to. * All the files that the current relation references to.
@ -57,11 +58,12 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation)
/** /**
* The optional partition base path of the current relation. * The optional partition base path of the current relation.
*/ */
override def partitionBasePath: Option[String] = plan.relation match { override def partitionBasePath: Option[String] =
case HadoopFsRelation(t: TahoeLogFileIndex, _, _, _, _, _) if t.partitionSchema.nonEmpty => plan.relation match {
Some(t.path.toString) case HadoopFsRelation(t: TahoeLogFileIndex, _, _, _, _, _) if t.partitionSchema.nonEmpty =>
case _ => None Some(t.path.toString)
} case _ => None
}
/** /**
* Creates [[Relation]] for IndexLogEntry using the current relation. * Creates [[Relation]] for IndexLogEntry using the current relation.
@ -179,8 +181,8 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation)
// TODO: Support time travel utilizing Hybrid Scan append-only. // TODO: Support time travel utilizing Hybrid Scan append-only.
// See https://github.com/microsoft/hyperspace/issues/408. // See https://github.com/microsoft/hyperspace/issues/408.
if (!(HyperspaceConf.hybridScanEnabled(spark) && if (!(HyperspaceConf.hybridScanEnabled(spark) &&
HyperspaceConf.hybridScanDeleteEnabled(spark) && HyperspaceConf.hybridScanDeleteEnabled(spark) &&
index.derivedDataset.canHandleDeletedFiles)) { index.derivedDataset.canHandleDeletedFiles)) {
return index return index
} }

Просмотреть файл

@ -47,9 +47,12 @@ class DeltaLakeRelationMetadata(metadata: Relation) extends FileBasedRelationMet
val deltaVerHistory = metadata.options.get("versionAsOf").map { deltaVersion => val deltaVerHistory = metadata.options.get("versionAsOf").map { deltaVersion =>
val newVersionMapping = s"$indexVersion:$deltaVersion" val newVersionMapping = s"$indexVersion:$deltaVersion"
DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY -> DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY ->
properties.get(DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY).map { prop => properties
s"$prop,$newVersionMapping" .get(DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY)
}.getOrElse(newVersionMapping) .map { prop =>
s"$prop,$newVersionMapping"
}
.getOrElse(newVersionMapping)
} }
properties ++ deltaVerHistory properties ++ deltaVerHistory
} }

Просмотреть файл

@ -64,7 +64,7 @@ package object hyperspace {
def isHyperspaceEnabled(): Boolean = { def isHyperspaceEnabled(): Boolean = {
val experimentalMethods = sparkSession.sessionState.experimentalMethods val experimentalMethods = sparkSession.sessionState.experimentalMethods
experimentalMethods.extraOptimizations.contains(ApplyHyperspace) && experimentalMethods.extraOptimizations.contains(ApplyHyperspace) &&
experimentalMethods.extraStrategies.contains(BucketUnionStrategy) experimentalMethods.extraStrategies.contains(BucketUnionStrategy)
} }
} }
} }

Просмотреть файл

@ -114,7 +114,7 @@ case class CancelActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: St
* @param message Message about event. * @param message Message about event.
*/ */
case class RefreshIncrementalActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) case class RefreshIncrementalActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
extends HyperspaceIndexCRUDEvent extends HyperspaceIndexCRUDEvent
/** /**
* Index Refresh Event for quick mode. Emitted when refresh is called on an index * Index Refresh Event for quick mode. Emitted when refresh is called on an index
@ -125,7 +125,7 @@ case class RefreshIncrementalActionEvent(appInfo: AppInfo, index: IndexLogEntry,
* @param message Message about event. * @param message Message about event.
*/ */
case class RefreshQuickActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) case class RefreshQuickActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
extends HyperspaceIndexCRUDEvent extends HyperspaceIndexCRUDEvent
/** /**
* Index Optimize Event for index files. * Index Optimize Event for index files.
@ -135,7 +135,7 @@ case class RefreshQuickActionEvent(appInfo: AppInfo, index: IndexLogEntry, messa
* @param message Message about event. * @param message Message about event.
*/ */
case class OptimizeActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) case class OptimizeActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
extends HyperspaceIndexCRUDEvent extends HyperspaceIndexCRUDEvent
/** /**
* Index usage event. This event is emitted when an index is picked instead of original data * Index usage event. This event is emitted when an index is picked instead of original data

Просмотреть файл

@ -94,7 +94,9 @@ object ResolverUtils {
*/ */
def apply(normalizedColumnName: String): ResolvedColumn = { def apply(normalizedColumnName: String): ResolvedColumn = {
if (normalizedColumnName.startsWith(NESTED_FIELD_PREFIX)) { if (normalizedColumnName.startsWith(NESTED_FIELD_PREFIX)) {
ResolvedColumn(normalizedColumnName.substring(NESTED_FIELD_PREFIX.length), isNested = true) ResolvedColumn(
normalizedColumnName.substring(NESTED_FIELD_PREFIX.length),
isNested = true)
} else { } else {
ResolvedColumn(normalizedColumnName, isNested = false) ResolvedColumn(normalizedColumnName, isNested = false)
} }
@ -212,20 +214,21 @@ object ResolverUtils {
private def getColumnNameFromSchema( private def getColumnNameFromSchema(
schema: StructType, schema: StructType,
resolvedColNameParts: Seq[String], resolvedColNameParts: Seq[String],
resolver: Resolver): Seq[String] = resolvedColNameParts match { resolver: Resolver): Seq[String] =
case h :: tail => resolvedColNameParts match {
val field = schema.find(f => resolver(f.name, h)).get case h :: tail =>
field match { val field = schema.find(f => resolver(f.name, h)).get
case StructField(name, s: StructType, _, _) => field match {
name +: getColumnNameFromSchema(s, tail, resolver) case StructField(name, s: StructType, _, _) =>
case StructField(_, _: ArrayType, _, _) => name +: getColumnNameFromSchema(s, tail, resolver)
// TODO: Nested arrays will be supported later case StructField(_, _: ArrayType, _, _) =>
throw HyperspaceException("Array types are not supported.") // TODO: Nested arrays will be supported later
case StructField(_, _: MapType, _, _) => throw HyperspaceException("Array types are not supported.")
// TODO: Nested maps will be supported later case StructField(_, _: MapType, _, _) =>
throw HyperspaceException("Map types are not supported") // TODO: Nested maps will be supported later
case f => Seq(f.name) throw HyperspaceException("Map types are not supported")
} case f => Seq(f.name)
case _ => Nil }
} case _ => Nil
}
} }

Просмотреть файл

@ -24,35 +24,83 @@ import org.apache.spark.sql.SparkSession
object SampleNestedData { object SampleNestedData {
val testData = Seq( val testData = Seq(
("2017-09-03", "810a20a2baa24ff3ad493bfbf064569a", "donde", 2, 1000, (
SampleNestedDataStruct("id1", SampleNestedDataLeaf("leaf_id1", 1))), "2017-09-03",
("2017-09-03", "fd093f8a05604515957083e70cb3dceb", "facebook", 1, 3000, "810a20a2baa24ff3ad493bfbf064569a",
SampleNestedDataStruct("id1", SampleNestedDataLeaf("leaf_id1", 2))), "donde",
("2017-09-03", "af3ed6a197a8447cba8bc8ea21fad208", "facebook", 1, 3000, 2,
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id7", 1))), 1000,
("2017-09-03", "975134eca06c4711a0406d0464cbe7d6", "facebook", 1, 4000, SampleNestedDataStruct("id1", SampleNestedDataLeaf("leaf_id1", 1))),
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id7", 2))), (
("2018-09-03", "e90a6028e15b4f4593eef557daf5166d", "ibraco", 2, 3000, "2017-09-03",
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id7", 5))), "fd093f8a05604515957083e70cb3dceb",
("2018-09-03", "576ed96b0d5340aa98a47de15c9f87ce", "facebook", 2, 3000, "facebook",
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id9", 1))), 1,
("2018-09-03", "50d690516ca641438166049a6303650c", "ibraco", 2, 1000, 3000,
SampleNestedDataStruct("id3", SampleNestedDataLeaf("leaf_id9", 10))), SampleNestedDataStruct("id1", SampleNestedDataLeaf("leaf_id1", 2))),
("2019-10-03", "380786e6495d4cd8a5dd4cc8d3d12917", "facebook", 2, 3000, (
SampleNestedDataStruct("id4", SampleNestedDataLeaf("leaf_id9", 12))), "2017-09-03",
("2019-10-03", "ff60e4838b92421eafc3e6ee59a9e9f1", "miperro", 2, 2000, "af3ed6a197a8447cba8bc8ea21fad208",
SampleNestedDataStruct("id5", SampleNestedDataLeaf("leaf_id9", 21))), "facebook",
("2019-10-03", "187696fe0a6a40cc9516bc6e47c70bc1", "facebook", 4, 3000, 1,
SampleNestedDataStruct("id6", SampleNestedDataLeaf("leaf_id9", 22)))) 3000,
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id7", 1))),
(
"2017-09-03",
"975134eca06c4711a0406d0464cbe7d6",
"facebook",
1,
4000,
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id7", 2))),
(
"2018-09-03",
"e90a6028e15b4f4593eef557daf5166d",
"ibraco",
2,
3000,
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id7", 5))),
(
"2018-09-03",
"576ed96b0d5340aa98a47de15c9f87ce",
"facebook",
2,
3000,
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id9", 1))),
(
"2018-09-03",
"50d690516ca641438166049a6303650c",
"ibraco",
2,
1000,
SampleNestedDataStruct("id3", SampleNestedDataLeaf("leaf_id9", 10))),
(
"2019-10-03",
"380786e6495d4cd8a5dd4cc8d3d12917",
"facebook",
2,
3000,
SampleNestedDataStruct("id4", SampleNestedDataLeaf("leaf_id9", 12))),
(
"2019-10-03",
"ff60e4838b92421eafc3e6ee59a9e9f1",
"miperro",
2,
2000,
SampleNestedDataStruct("id5", SampleNestedDataLeaf("leaf_id9", 21))),
(
"2019-10-03",
"187696fe0a6a40cc9516bc6e47c70bc1",
"facebook",
4,
3000,
SampleNestedDataStruct("id6", SampleNestedDataLeaf("leaf_id9", 22))))
def save( def save(
spark: SparkSession, spark: SparkSession,
path: String, path: String,
columns: Seq[String], columns: Seq[String],
partitionColumns: Option[Seq[String]] = None): Unit = { partitionColumns: Option[Seq[String]] = None): Unit = {
val df = spark.createDataFrame( val df = spark.createDataFrame(spark.sparkContext.parallelize(testData)).toDF(columns: _*)
spark.sparkContext.parallelize(testData)
).toDF(columns: _*)
partitionColumns match { partitionColumns match {
case Some(pcs) => case Some(pcs) =>
df.write.partitionBy(pcs: _*).parquet(path) df.write.partitionBy(pcs: _*).parquet(path)

Просмотреть файл

@ -58,7 +58,6 @@ import com.microsoft.hyperspace.util.SparkTestShims.SimpleExplainCommand
* The explain files are saved to help debug later, they are not checked. Only the simplified * The explain files are saved to help debug later, they are not checked. Only the simplified
* plans are checked (by string comparison). * plans are checked (by string comparison).
* *
*
* To run the entire test suite: * To run the entire test suite:
* {{{ * {{{
* sbt "test:testOnly *PlanStabilitySuite" * sbt "test:testOnly *PlanStabilitySuite"
@ -178,14 +177,15 @@ trait PlanStabilitySuite extends TPCDSBase with SQLHelper with Logging {
val exchangeIdMap = new mutable.HashMap[SparkPlan, Int]() val exchangeIdMap = new mutable.HashMap[SparkPlan, Int]()
val subqueriesMap = new mutable.HashMap[SparkPlan, Int]() val subqueriesMap = new mutable.HashMap[SparkPlan, Int]()
def getId(plan: SparkPlan): Int = plan match { def getId(plan: SparkPlan): Int =
case exchange: Exchange => exchangeIdMap.getOrElseUpdate(exchange, exchangeIdMap.size + 1) plan match {
case ReusedExchangeExec(_, exchange) => case exchange: Exchange => exchangeIdMap.getOrElseUpdate(exchange, exchangeIdMap.size + 1)
exchangeIdMap.getOrElseUpdate(exchange, exchangeIdMap.size + 1) case ReusedExchangeExec(_, exchange) =>
case subquery: SubqueryExec => exchangeIdMap.getOrElseUpdate(exchange, exchangeIdMap.size + 1)
subqueriesMap.getOrElseUpdate(subquery, subqueriesMap.size + 1) case subquery: SubqueryExec =>
case _ => -1 subqueriesMap.getOrElseUpdate(subquery, subqueriesMap.size + 1)
} case _ => -1
}
/** /**
* Some expression names have ExprId in them due to using things such as * Some expression names have ExprId in them due to using things such as
@ -273,7 +273,9 @@ trait PlanStabilitySuite extends TPCDSBase with SQLHelper with Logging {
*/ */
class TPCDSV1_4_SparkPlanStabilitySuite extends PlanStabilitySuite { class TPCDSV1_4_SparkPlanStabilitySuite extends PlanStabilitySuite {
override val goldenFilePath: String = { override val goldenFilePath: String = {
new File(baseResourcePath, s"spark-${BuildInfo.sparkShortVersion}/approved-plans-v1_4").getAbsolutePath new File(
baseResourcePath,
s"spark-${BuildInfo.sparkShortVersion}/approved-plans-v1_4").getAbsolutePath
} }
// Enable cross join because some queries fail during query optimization phase. // Enable cross join because some queries fail during query optimization phase.

Просмотреть файл

@ -536,8 +536,7 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
|`t_shift` STRING, |`t_shift` STRING,
|`t_sub_shift` STRING, |`t_sub_shift` STRING,
|`t_meal_time` STRING |`t_meal_time` STRING
""".stripMargin """.stripMargin)
)
val tableNames: Iterable[String] = tableColumns.keys val tableNames: Iterable[String] = tableColumns.keys
@ -546,8 +545,7 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
tableName: String, tableName: String,
format: String = "parquet", format: String = "parquet",
options: Seq[String] = Nil): Unit = { options: Seq[String] = Nil): Unit = {
spark.sql( spark.sql(s"""
s"""
|CREATE TABLE `$tableName` (${tableColumns(tableName)}) |CREATE TABLE `$tableName` (${tableColumns(tableName)})
|USING $format |USING $format
|${options.mkString("\n")} |${options.mkString("\n")}

Просмотреть файл

@ -109,7 +109,7 @@ class BucketUnionTest extends SparkFunSuite with SparkInvolvedSuite {
val rdd = new BucketUnionRDD[Row](spark.sparkContext, Seq(p1.rdd, p2.rdd), bucketSpec) val rdd = new BucketUnionRDD[Row](spark.sparkContext, Seq(p1.rdd, p2.rdd), bucketSpec)
assert( assert(
rdd.collect.sortBy(r => (r.getInt(0), r.getString(1))).map(r => r.toSeq.toList).toList rdd.collect.sortBy(r => (r.getInt(0), r.getString(1))).map(r => r.toSeq.toList).toList
== Seq(Seq(2, "name1"), Seq(2, "name3"), Seq(3, "name2"), Seq(3, "name4"))) == Seq(Seq(2, "name1"), Seq(2, "name3"), Seq(3, "name2"), Seq(3, "name4")))
assert(rdd.getPartitions.length == 10) assert(rdd.getPartitions.length == 10)
assert(rdd.partitions.head.isInstanceOf[BucketUnionRDDPartition]) assert(rdd.partitions.head.isInstanceOf[BucketUnionRDDPartition])

Просмотреть файл

@ -141,10 +141,7 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
val dfB = nonPartitionedDataDF.as("B") val dfB = nonPartitionedDataDF.as("B")
val dfJoin = dfA val dfJoin = dfA
.join(dfB, dfA("Query") === dfB("Query")) .join(dfB, dfA("Query") === dfB("Query"))
.select( .select(dfA("RGUID"), dfA("Query"), dfA("nested.leaf.cnt"))
dfA("RGUID"),
dfA("Query"),
dfA("nested.leaf.cnt"))
val exception = intercept[HyperspaceException] { val exception = intercept[HyperspaceException] {
hyperspace.createIndex(dfJoin, indexConfig1) hyperspace.createIndex(dfJoin, indexConfig1)
} }
@ -153,7 +150,8 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
"Only creating index over HDFS file based scan nodes is supported.")) "Only creating index over HDFS file based scan nodes is supported."))
} }
test("Check lineage in index records for partitioned data when partition key is not in config.") { test(
"Check lineage in index records for partitioned data when partition key is not in config.") {
withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
hyperspace.createIndex(partitionedDataDF, indexConfig2) hyperspace.createIndex(partitionedDataDF, indexConfig2)
val indexRecordsDF = spark.read.parquet( val indexRecordsDF = spark.read.parquet(

Просмотреть файл

@ -145,10 +145,7 @@ class CreateIndexTest extends HyperspaceSuite with SQLHelper {
val dfB = nonPartitionedDataDF.as("B") val dfB = nonPartitionedDataDF.as("B")
val dfJoin = dfA val dfJoin = dfA
.join(dfB, dfA("Query") === dfB("Query")) .join(dfB, dfA("Query") === dfB("Query"))
.select( .select(dfA("RGUID"), dfA("Query"), dfA("imprs"))
dfA("RGUID"),
dfA("Query"),
dfA("imprs"))
val exception = intercept[HyperspaceException] { val exception = intercept[HyperspaceException] {
hyperspace.createIndex(dfJoin, indexConfig1) hyperspace.createIndex(dfJoin, indexConfig1)
} }
@ -171,7 +168,8 @@ class CreateIndexTest extends HyperspaceSuite with SQLHelper {
} }
} }
test("Check lineage in index records for partitioned data when partition key is not in config.") { test(
"Check lineage in index records for partitioned data when partition key is not in config.") {
withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
hyperspace.createIndex(partitionedDataDF, indexConfig3) hyperspace.createIndex(partitionedDataDF, indexConfig3)
val indexRecordsDF = spark.read.parquet( val indexRecordsDF = spark.read.parquet(
@ -201,7 +199,8 @@ class CreateIndexTest extends HyperspaceSuite with SQLHelper {
} }
} }
test("Check lineage in index records for partitioned data when partition key is in load path.") { test(
"Check lineage in index records for partitioned data when partition key is in load path.") {
withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
val dataDF = val dataDF =
spark.read.parquet(s"$partitionedDataPath/${partitionKeys.head}=2017-09-03") spark.read.parquet(s"$partitionedDataPath/${partitionKeys.head}=2017-09-03")

Просмотреть файл

@ -549,10 +549,10 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
private def getDeltaLakeTableVersion(df: DataFrame): Long = { private def getDeltaLakeTableVersion(df: DataFrame): Long = {
df.queryExecution.optimizedPlan match { df.queryExecution.optimizedPlan match {
case LogicalRelation( case LogicalRelation(
HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _), HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _),
_, _,
_, _,
_) => _) =>
location.tableVersion location.tableVersion
} }
} }
@ -586,10 +586,10 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
def isIndexUsed(plan: LogicalPlan, expectedPathsSubStr: String*): Boolean = { def isIndexUsed(plan: LogicalPlan, expectedPathsSubStr: String*): Boolean = {
val rootPaths = plan.collect { val rootPaths = plan.collect {
case LogicalRelation( case LogicalRelation(
HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _), HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _),
_, _,
_, _,
_) => _) =>
location.rootPaths location.rootPaths
}.flatten }.flatten
rootPaths.nonEmpty && rootPaths.forall(p => rootPaths.nonEmpty && rootPaths.forall(p =>

Просмотреть файл

@ -435,7 +435,8 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
sortedRowsWithHyperspaceDisabled.sameElements(getSortedRows(dfAfterHyperspaceDisabled))) sortedRowsWithHyperspaceDisabled.sameElements(getSortedRows(dfAfterHyperspaceDisabled)))
} }
test("Verify JoinIndexRule utilizes indexes correctly after incremental refresh (append-only).") { test(
"Verify JoinIndexRule utilizes indexes correctly after incremental refresh (append-only).") {
withTempPathAsString { testPath => withTempPathAsString { testPath =>
// Setup. Create data. // Setup. Create data.
val indexConfig = IndexConfig("index", Seq("c2"), Seq("c4")) val indexConfig = IndexConfig("index", Seq("c2"), Seq("c4"))
@ -596,7 +597,8 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
verifyIndexUsage( verifyIndexUsage(
query, query,
getIndexFilesPath(indexConfig.indexName, Seq(1)) ++ // for Left getIndexFilesPath(indexConfig.indexName, Seq(1)) ++ // for Left
getIndexFilesPath(indexConfig.indexName, Seq(1))) // for Right getIndexFilesPath(indexConfig.indexName, Seq(1))
) // for Right
// Verify correctness of results. // Verify correctness of results.
spark.disableHyperspace() spark.disableHyperspace()
@ -659,7 +661,8 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
verifyIndexUsage( verifyIndexUsage(
query, query,
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles ++ getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles ++
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles) // for Right getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles
) // for Right
// Verify correctness of results. // Verify correctness of results.
spark.disableHyperspace() spark.disableHyperspace()
@ -1021,10 +1024,10 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
private def getAllRootPaths(optimizedPlan: LogicalPlan): Seq[Path] = { private def getAllRootPaths(optimizedPlan: LogicalPlan): Seq[Path] = {
optimizedPlan.collect { optimizedPlan.collect {
case LogicalRelation( case LogicalRelation(
HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _), HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _),
_, _,
_, _,
_) => _) =>
location.rootPaths location.rootPaths
}.flatten }.flatten
} }

Просмотреть файл

@ -58,14 +58,15 @@ class FileIdTrackerTest extends SparkFunSuite {
val tracker = new FileIdTracker val tracker = new FileIdTracker
tracker.addFileInfo(Set(FileInfo("def", 123, 555, 10))) tracker.addFileInfo(Set(FileInfo("def", 123, 555, 10)))
val ex = intercept[HyperspaceException] { val ex = intercept[HyperspaceException] {
implicit def ordering: Ordering[FileInfo] = new Ordering[FileInfo] { implicit def ordering: Ordering[FileInfo] =
override def compare(x: FileInfo, y: FileInfo): Int = { new Ordering[FileInfo] {
x.name.compareTo(y.name) override def compare(x: FileInfo, y: FileInfo): Int = {
x.name.compareTo(y.name)
}
} }
} tracker.addFileInfo(
tracker.addFileInfo(scala.collection.immutable.SortedSet( scala.collection.immutable
FileInfo("abc", 100, 555, 15), .SortedSet(FileInfo("abc", 100, 555, 15), FileInfo("def", 123, 555, 11)))
FileInfo("def", 123, 555, 11)))
} }
assert(ex.getMessage.contains("Adding file info with a conflicting id")) assert(ex.getMessage.contains("Adding file info with a conflicting id"))
assert(tracker.getFileId("abc", 100, 555).contains(15)) assert(tracker.getFileId("abc", 100, 555).contains(15))

Просмотреть файл

@ -142,14 +142,16 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
val deletedRatio = 1 - (afterDeleteSize / sourceSize.toFloat) val deletedRatio = 1 - (afterDeleteSize / sourceSize.toFloat)
withSQLConf(TestConfig.HybridScanEnabled: _*) { withSQLConf(TestConfig.HybridScanEnabled: _*) {
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD -> withSQLConf(
(deletedRatio + 0.1).toString) { IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD ->
(deletedRatio + 0.1).toString) {
val filter = filterQuery val filter = filterQuery
// As deletedRatio is less than the threshold, the index can be applied. // As deletedRatio is less than the threshold, the index can be applied.
assert(!basePlan.equals(filter.queryExecution.optimizedPlan)) assert(!basePlan.equals(filter.queryExecution.optimizedPlan))
} }
withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD -> withSQLConf(
(deletedRatio - 0.1).toString) { IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD ->
(deletedRatio - 0.1).toString) {
val filter = filterQuery val filter = filterQuery
// As deletedRatio is greater than the threshold, the index shouldn't be applied. // As deletedRatio is greater than the threshold, the index shouldn't be applied.
assert(basePlan.equals(filter.queryExecution.optimizedPlan)) assert(basePlan.equals(filter.queryExecution.optimizedPlan))

Просмотреть файл

@ -129,8 +129,8 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
}.flatten }.flatten
val deletedFilesList = plan collect { val deletedFilesList = plan collect {
case Filter( case Filter(
Not(EqualTo(left: Attribute, right: Literal)), Not(EqualTo(left: Attribute, right: Literal)),
LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
// Check new filter condition on lineage column. // Check new filter condition on lineage column.
val colName = left.toString val colName = left.toString
val deletedFile = right.toString val deletedFile = right.toString
@ -142,8 +142,8 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
assert(files.nonEmpty && files.forall(_.contains(indexName))) assert(files.nonEmpty && files.forall(_.contains(indexName)))
deleted deleted
case Filter( case Filter(
Not(InSet(attr, deletedFileIds)), Not(InSet(attr, deletedFileIds)),
LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
// Check new filter condition on lineage column. // Check new filter condition on lineage column.
assert(attr.toString.contains(IndexConstants.DATA_FILE_NAME_ID)) assert(attr.toString.contains(IndexConstants.DATA_FILE_NAME_ID))
val deleted = deletedFileIds.map(_.toString).toSeq val deleted = deletedFileIds.map(_.toString).toSeq
@ -156,8 +156,8 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
assert(files.nonEmpty && files.forall(_.contains(indexName))) assert(files.nonEmpty && files.forall(_.contains(indexName)))
deleted deleted
case Filter( case Filter(
Not(In(attr, deletedFileIds)), Not(In(attr, deletedFileIds)),
LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
// Check new filter condition on lineage column. // Check new filter condition on lineage column.
assert(attr.toString.contains(IndexConstants.DATA_FILE_NAME_ID)) assert(attr.toString.contains(IndexConstants.DATA_FILE_NAME_ID))
val deleted = deletedFileIds.map(_.toString) val deleted = deletedFileIds.map(_.toString)
@ -290,7 +290,8 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
case p @ BucketUnionExec(children, bucketSpec) => case p @ BucketUnionExec(children, bucketSpec) =>
assert(children.size === 2) assert(children.size === 2)
// children.head is always the index plan. // children.head is always the index plan.
assert(children.head.isInstanceOf[ProjectExec] || children.head.isInstanceOf[FilterExec]) assert(
children.head.isInstanceOf[ProjectExec] || children.head.isInstanceOf[FilterExec])
assert(children.last.isInstanceOf[ShuffleExchangeExec]) assert(children.last.isInstanceOf[ShuffleExchangeExec])
assert(bucketSpec.numBuckets === 200) assert(bucketSpec.numBuckets === 200)
p p
@ -582,7 +583,8 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
} }
} }
test("Delete-only: join rule, deleted files should be excluded from each index data relation.") { test(
"Delete-only: join rule, deleted files should be excluded from each index data relation.") {
withTempPathAsString { testPath => withTempPathAsString { testPath =>
val deletePath1 = testPath + "/delete1" val deletePath1 = testPath + "/delete1"
val deletePath2 = testPath + "/delete2" val deletePath2 = testPath + "/delete2"

Просмотреть файл

@ -376,10 +376,10 @@ class IcebergIntegrationTest extends QueryTest with HyperspaceSuite {
def isIndexUsed(plan: LogicalPlan, expectedPathsSubStr: String*): Boolean = { def isIndexUsed(plan: LogicalPlan, expectedPathsSubStr: String*): Boolean = {
val rootPaths = plan.collect { val rootPaths = plan.collect {
case LogicalRelation( case LogicalRelation(
HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _), HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _),
_, _,
_, _,
_) => _) =>
location.rootPaths location.rootPaths
}.flatten }.flatten
rootPaths.nonEmpty && rootPaths.forall(p => rootPaths.nonEmpty && rootPaths.forall(p =>

Просмотреть файл

@ -48,12 +48,7 @@ class IndexCacheTest extends HyperspaceSuite {
val entry = IndexLogEntry( val entry = IndexLogEntry(
"index1", "index1",
CoveringIndex( CoveringIndex(Seq("RGUID"), Seq("Date"), schema, 10, Map()),
Seq("RGUID"),
Seq("Date"),
schema,
10,
Map()),
Content(Directory(indexDir)), Content(Directory(indexDir)),
Source(SparkPlan(sourcePlanProperties)), Source(SparkPlan(sourcePlanProperties)),
Map()) Map())
@ -152,7 +147,7 @@ class IndexCacheTest extends HyperspaceSuite {
/** /**
* Mock for testing purposes so we can validate and invalidate entries based on time. * Mock for testing purposes so we can validate and invalidate entries based on time.
*
* @param time Current time. * @param time Current time.
*/ */
class MockClock(private var time: Long = 0L) extends Clock { class MockClock(private var time: Long = 0L) extends Clock {

Просмотреть файл

@ -47,12 +47,7 @@ class IndexCollectionManagerTest extends HyperspaceSuite {
val entry = IndexLogEntry( val entry = IndexLogEntry(
indexPath.toString, indexPath.toString,
CoveringIndex( CoveringIndex(Seq("RGUID"), Seq("Date"), new StructType(), 10, Map()),
Seq("RGUID"),
Seq("Date"),
new StructType(),
10,
Map()),
Content(Directory(s"$indexPath/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0")), Content(Directory(s"$indexPath/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0")),
Source(SparkPlan(sourcePlanProperties)), Source(SparkPlan(sourcePlanProperties)),
Map()) Map())
@ -98,12 +93,7 @@ class IndexCollectionManagerTest extends HyperspaceSuite {
val entry = IndexLogEntry( val entry = IndexLogEntry(
str, str,
CoveringIndex( CoveringIndex(Seq("RGUID"), Seq("Date"), new StructType(), 10, Map()),
Seq("RGUID"),
Seq("Date"),
new StructType(),
10,
Map()),
Content(Directory(s"$str/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0")), Content(Directory(s"$str/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0")),
Source(SparkPlan(sourcePlanProperties)), Source(SparkPlan(sourcePlanProperties)),
Map()) Map())

Просмотреть файл

@ -193,43 +193,24 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
Seq( Seq(
Relation( Relation(
Seq("rootpath"), Seq("rootpath"),
Hdfs( Hdfs(Hdfs.Properties(
Hdfs.Properties( Content(Directory(
Content( "test",
Directory( Seq(FileInfo("f1", 100L, 100L, 0), FileInfo("f2", 100L, 200L, 1)),
"test", Seq())),
Seq(FileInfo("f1", 100L, 100L, 0), FileInfo("f2", 100L, 200L, 1)), Some(Update(None, Some(Content(Directory("", Seq(FileInfo("f1", 10, 10, 2))))))))),
Seq()
)
),
Some(
Update(
None,
Some(Content(Directory("", Seq(FileInfo("f1", 10, 10, 2)))))
)
)
)
),
new StructType(), new StructType(),
"type", "type",
Map() Map())),
)
),
null, null,
null, null,
LogicalPlanFingerprint( LogicalPlanFingerprint(
LogicalPlanFingerprint LogicalPlanFingerprint
.Properties(Seq(Signature("provider", "signatureValue"))) .Properties(Seq(Signature("provider", "signatureValue")))))
))
val expected = IndexLogEntry.create( val expected = IndexLogEntry.create(
"indexName", "indexName",
CoveringIndex( CoveringIndex(Seq("col1"), Seq("col2", "col3"), schema, 200, Map()),
Seq("col1"),
Seq("col2", "col3"),
schema,
200,
Map()),
Content(Directory("rootContentPath")), Content(Directory("rootContentPath")),
Source(SparkPlan(expectedSourcePlanProperties)), Source(SparkPlan(expectedSourcePlanProperties)),
Map()) Map())
@ -242,15 +223,19 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
} }
test("Content.files api lists all files from Content object.") { test("Content.files api lists all files from Content object.") {
val content = Content(Directory("file:/", subDirs = val content = Content(
Seq( Directory(
Directory("a", "file:/",
files = Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)), subDirs = Seq(Directory(
"a",
files =
Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq( subDirs = Seq(
Directory("b", Directory(
files = "b",
Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID))))) files = Seq(
))) FileInfo("f3", 0, 0, UNKNOWN_FILE_ID),
FileInfo("f4", 0, 0, UNKNOWN_FILE_ID))))))))
val expected = val expected =
Seq("file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4").map(new Path(_)).toSet Seq("file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4").map(new Path(_)).toSet
@ -262,8 +247,9 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
val nestedDirPath = toPath(nestedDir) val nestedDirPath = toPath(nestedDir)
val expected = { val expected = {
val fileInfos = Seq(f3, f4).map(toFileStatus).map(f => val fileInfos = Seq(f3, f4)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) .map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
val nestedDirDirectory = Directory("nested", fileInfos) val nestedDirDirectory = Directory("nested", fileInfos)
val rootDirectory = createDirectory(nestedDirPath, nestedDirDirectory) val rootDirectory = createDirectory(nestedDirPath, nestedDirDirectory)
Content(rootDirectory, NoOpFingerprint()) Content(rootDirectory, NoOpFingerprint())
@ -277,8 +263,9 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
val nestedDirPath = toPath(nestedDir) val nestedDirPath = toPath(nestedDir)
val expected = { val expected = {
val fileInfos = Seq(f3, f4).map(toFileStatus).map(f => val fileInfos = Seq(f3, f4)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) .map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
val nestedDirDirectory = Directory("nested", fileInfos) val nestedDirDirectory = Directory("nested", fileInfos)
val rootDirectory = createDirectory(nestedDirPath, nestedDirDirectory) val rootDirectory = createDirectory(nestedDirPath, nestedDirDirectory)
Content(rootDirectory, NoOpFingerprint()) Content(rootDirectory, NoOpFingerprint())
@ -292,8 +279,9 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
val nestedDirPath = toPath(nestedDir) val nestedDirPath = toPath(nestedDir)
val expected = { val expected = {
val fileInfos = Seq(f3, f4).map(toFileStatus).map(f => val fileInfos = Seq(f3, f4)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) .map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
val nestedDirDirectory = Directory("nested", fileInfos) val nestedDirDirectory = Directory("nested", fileInfos)
createDirectory(nestedDirPath, nestedDirDirectory) createDirectory(nestedDirPath, nestedDirDirectory)
} }
@ -302,17 +290,21 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
assert(directoryEquals(actual, expected)) assert(directoryEquals(actual, expected))
} }
test("Directory.fromDirectory api creates the correct Directory objects, " + test(
"recursively listing all leaf files.") { "Directory.fromDirectory api creates the correct Directory objects, " +
"recursively listing all leaf files.") {
val testDirPath = toPath(testDir) val testDirPath = toPath(testDir)
val testDirLeafFiles = val testDirLeafFiles =
Seq(f1, f2).map(toFileStatus).map(f => Seq(f1, f2)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) .map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
val nestedDirLeafFiles = val nestedDirLeafFiles =
Seq(f3, f4).map(toFileStatus).map(f => Seq(f3, f4)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) .map(toFileStatus)
val testDirDirectory = Directory(name = "testDir", .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
val testDirDirectory = Directory(
name = "testDir",
files = testDirLeafFiles, files = testDirLeafFiles,
subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles))) subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles)))
val expected = createDirectory(testDirPath, testDirDirectory) val expected = createDirectory(testDirPath, testDirDirectory)
@ -326,12 +318,15 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
val testDirPath = toPath(testDir) val testDirPath = toPath(testDir)
val testDirLeafFiles = val testDirLeafFiles =
Seq(f1, f2).map(toFileStatus).map(f => Seq(f1, f2)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) .map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
val nestedDirLeafFiles = val nestedDirLeafFiles =
Seq(f3, f4).map(toFileStatus).map(f => Seq(f3, f4)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) .map(toFileStatus)
val testDirDirectory = Directory(name = "testDir", .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
val testDirDirectory = Directory(
name = "testDir",
files = testDirLeafFiles, files = testDirLeafFiles,
subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles))) subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles)))
@ -345,12 +340,15 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
test("Directory.fromLeafFiles api does not include other files in the directory.") { test("Directory.fromLeafFiles api does not include other files in the directory.") {
val testDirPath = toPath(testDir) val testDirPath = toPath(testDir)
val testDirLeafFiles = Seq(f1).map(toFileStatus).map(f => val testDirLeafFiles = Seq(f1)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) .map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
val nestedDirLeafFiles = val nestedDirLeafFiles =
Seq(f4).map(toFileStatus).map(f => Seq(f4)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) .map(toFileStatus)
val testDirDirectory = Directory(name = "testDir", .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
val testDirDirectory = Directory(
name = "testDir",
files = testDirLeafFiles, files = testDirLeafFiles,
subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles))) subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles)))
@ -389,8 +387,9 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
override def accept(path: Path): Boolean = path.getName.startsWith("f1") override def accept(path: Path): Boolean = path.getName.startsWith("f1")
} }
val testDirLeafFiles = Seq(f1).map(toFileStatus).map(f => val testDirLeafFiles = Seq(f1)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) .map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
val testDirDirectory = Directory(name = "testDir", files = testDirLeafFiles) val testDirDirectory = Directory(name = "testDir", files = testDirLeafFiles)
val expected = createDirectory(testDirPath, testDirDirectory) val expected = createDirectory(testDirPath, testDirDirectory)
@ -400,8 +399,9 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
assert(directoryEquals(actual, expected)) assert(directoryEquals(actual, expected))
} }
test("Directory.fromDirectory and fromLeafFileswhere files are at same level but different" + test(
"dirs.") { "Directory.fromDirectory and fromLeafFileswhere files are at same level but different" +
"dirs.") {
// File Structure // File Structure
// testDir/temp/a/f1 // testDir/temp/a/f1
// testDir/temp/b/f2 // testDir/temp/b/f2
@ -413,11 +413,17 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
val f2 = Files.createFile(Paths.get(b + "/f2")) val f2 = Files.createFile(Paths.get(b + "/f2"))
val aDirectory = val aDirectory =
Directory("a", Seq(f1).map(toFileStatus).map(f => Directory(
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) "a",
Seq(f1)
.map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
val bDirectory = val bDirectory =
Directory("b", Seq(f2).map(toFileStatus).map(f => Directory(
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) "b",
Seq(f2)
.map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
val tempDirectory = Directory("temp", subDirs = Seq(aDirectory, bDirectory)) val tempDirectory = Directory("temp", subDirs = Seq(aDirectory, bDirectory))
val tempDirectoryPath = toPath(tempDir) val tempDirectoryPath = toPath(tempDir)
@ -445,12 +451,18 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
val f2 = Files.createFile(Paths.get(c + "/f2")) val f2 = Files.createFile(Paths.get(c + "/f2"))
val cDirectory = val cDirectory =
Directory("c", Seq(f2).map(toFileStatus).map(f => Directory(
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) "c",
Seq(f2)
.map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
val bDirectory = Directory("b", subDirs = Seq(cDirectory)) val bDirectory = Directory("b", subDirs = Seq(cDirectory))
val aDirectory = val aDirectory =
Directory("a", Seq(f1).map(toFileStatus).map(f => Directory(
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) "a",
Seq(f1)
.map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
val tempDirectory = Directory("temp", subDirs = Seq(aDirectory, bDirectory)) val tempDirectory = Directory("temp", subDirs = Seq(aDirectory, bDirectory))
val tempDirectoryPath = toPath(tempDir) val tempDirectoryPath = toPath(tempDir)
@ -465,8 +477,9 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
FileUtils.deleteDirectory(tempDir.toFile) FileUtils.deleteDirectory(tempDir.toFile)
} }
test("Directory.fromDirectory and fromLeafFiles where files belong to multiple" + test(
"subdirectories.") { "Directory.fromDirectory and fromLeafFiles where files belong to multiple" +
"subdirectories.") {
// File Structure // File Structure
// testDir/temp/a/f1 // testDir/temp/a/f1
// testDir/temp/a/b/f2 // testDir/temp/a/b/f2
@ -481,17 +494,23 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
val f3 = Files.createFile(Paths.get(c + "/f3")) val f3 = Files.createFile(Paths.get(c + "/f3"))
val bDirectory = val bDirectory =
Directory("b", Seq(f2).map(toFileStatus).map(f => Directory(
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) "b",
Seq(f2)
.map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
val cDirectory = val cDirectory =
Directory("c", Seq(f3).map(toFileStatus).map(f => Directory(
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) "c",
Seq(f3)
.map(toFileStatus)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
val aDirectory = Directory( val aDirectory = Directory(
"a", "a",
Seq(f1).map(toFileStatus).map(f => Seq(f1)
FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)), .map(toFileStatus)
Seq(bDirectory, cDirectory) .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)),
) Seq(bDirectory, cDirectory))
val tempDirectory = Directory("temp", subDirs = Seq(aDirectory)) val tempDirectory = Directory("temp", subDirs = Seq(aDirectory))
val tempDirectoryPath = toPath(tempDir) val tempDirectoryPath = toPath(tempDir)
@ -511,11 +530,7 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
// a/f2 // a/f2
val directory1 = Directory( val directory1 = Directory(
name = "a", name = "a",
files = Seq( files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)))
FileInfo("f1", 100L, 100L, 1L),
FileInfo("f2", 100L, 100L, 2L)
)
)
// directory2: // directory2:
// a/b/f3 // a/b/f3
@ -525,13 +540,7 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
subDirs = Seq( subDirs = Seq(
Directory( Directory(
name = "b", name = "b",
files = Seq( files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L)))))
FileInfo("f3", 100L, 100L, 3L),
FileInfo("f4", 100L, 100L, 4L)
)
)
)
)
// Expected result of merging directory1 and directory2: // Expected result of merging directory1 and directory2:
// a/f1 // a/f1
@ -540,20 +549,11 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
// a/b/f4 // a/b/f4
val expected = Directory( val expected = Directory(
name = "a", name = "a",
files = Seq( files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)),
FileInfo("f1", 100L, 100L, 1L),
FileInfo("f2", 100L, 100L, 2L)
),
subDirs = Seq( subDirs = Seq(
Directory( Directory(
name = "b", name = "b",
files = Seq( files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L)))))
FileInfo("f3", 100L, 100L, 3L),
FileInfo("f4", 100L, 100L, 4L)
)
)
)
)
val actual1 = directory1.merge(directory2) val actual1 = directory1.merge(directory2)
val actual2 = directory2.merge(directory1) val actual2 = directory2.merge(directory1)
@ -569,14 +569,8 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
// a/b/f3 // a/b/f3
val directory1 = Directory( val directory1 = Directory(
name = "a", name = "a",
files = Seq( files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)),
FileInfo("f1", 100L, 100L, 1L), subDirs = Seq(Directory(name = "b", files = Seq(FileInfo("f3", 100L, 100L, 3L)))))
FileInfo("f2", 100L, 100L, 2L)
),
subDirs = Seq(
Directory(name = "b", files = Seq(FileInfo("f3", 100L, 100L, 3L)))
)
)
// directory2: // directory2:
// a/f4 // a/f4
@ -589,17 +583,8 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
subDirs = Seq( subDirs = Seq(
Directory( Directory(
name = "b", name = "b",
files = Seq( files = Seq(FileInfo("f5", 100L, 100L, 5L), FileInfo("f6", 100L, 100L, 6L)),
FileInfo("f5", 100L, 100L, 5L), subDirs = Seq(Directory(name = "c", files = Seq(FileInfo("f7", 100L, 100L, 7L)))))))
FileInfo("f6", 100L, 100L, 6L)
),
subDirs = Seq(Directory(
name = "c",
files = Seq(FileInfo("f7", 100L, 100L, 7L))
))
)
)
)
// Expected result of merging directory1 and directory2: // Expected result of merging directory1 and directory2:
// directory1: // directory1:
@ -615,23 +600,15 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
files = Seq( files = Seq(
FileInfo("f1", 100L, 100L, 1L), FileInfo("f1", 100L, 100L, 1L),
FileInfo("f2", 100L, 100L, 2L), FileInfo("f2", 100L, 100L, 2L),
FileInfo("f4", 100L, 100L, 4L) FileInfo("f4", 100L, 100L, 4L)),
),
subDirs = Seq( subDirs = Seq(
Directory( Directory(
name = "b", name = "b",
files = Seq( files = Seq(
FileInfo("f3", 100L, 100L, 3L), FileInfo("f3", 100L, 100L, 3L),
FileInfo("f5", 100L, 100L, 5L), FileInfo("f5", 100L, 100L, 5L),
FileInfo("f6", 100L, 100L, 6L) FileInfo("f6", 100L, 100L, 6L)),
), subDirs = Seq(Directory("c", files = Seq(FileInfo("f7", 100L, 100L, 7L)))))))
subDirs = Seq(
Directory("c",
files = Seq(FileInfo("f7", 100L, 100L, 7L)))
)
)
)
)
val actual1 = directory1.merge(directory2) val actual1 = directory1.merge(directory2)
val actual2 = directory2.merge(directory1) val actual2 = directory2.merge(directory1)
@ -646,19 +623,17 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
// a/f2 // a/f2
val directory1 = Directory( val directory1 = Directory(
name = "a", name = "a",
files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)) files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)))
)
// directory2: // directory2:
// b/f3 // b/f3
// b/f4 // b/f4
val directory2 = Directory( val directory2 = Directory(
name = "b", name = "b",
files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L)) files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L)))
)
val ex1 = intercept[HyperspaceException] (directory1.merge(directory2)) val ex1 = intercept[HyperspaceException](directory1.merge(directory2))
val ex2 = intercept[HyperspaceException] (directory2.merge(directory1)) val ex2 = intercept[HyperspaceException](directory2.merge(directory1))
assert(ex1.msg.contains("Merging directories with names a and b failed.")) assert(ex1.msg.contains("Merging directories with names a and b failed."))
assert(ex2.msg.contains("Merging directories with names b and a failed.")) assert(ex2.msg.contains("Merging directories with names b and a failed."))
@ -670,19 +645,18 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
private def directoryEquals(dir1: Directory, dir2: Directory): Boolean = { private def directoryEquals(dir1: Directory, dir2: Directory): Boolean = {
dir1.name.equals(dir2.name) && dir1.name.equals(dir2.name) &&
dir1.files.toSet.equals(dir2.files.toSet) && dir1.files.toSet.equals(dir2.files.toSet) &&
dir1.subDirs.size.equals(dir2.subDirs.size) && dir1.subDirs.size.equals(dir2.subDirs.size) &&
dir1.subDirs.sortBy(_.name).zip(dir2.subDirs.sortBy(_.name)).forall{ dir1.subDirs.sortBy(_.name).zip(dir2.subDirs.sortBy(_.name)).forall {
case (d1, d2) => directoryEquals(d1, d2) case (d1, d2) => directoryEquals(d1, d2)
} }
} }
// Using `directoryPath`, create a Directory tree starting from root and ending at // Using `directoryPath`, create a Directory tree starting from root and ending at
// `leafDirectory`. // `leafDirectory`.
private def createDirectory(directoryPath: Path, leafDirectory: Directory): Directory = { private def createDirectory(directoryPath: Path, leafDirectory: Directory): Directory = {
TestUtils.splitPath(directoryPath.getParent).foldLeft(leafDirectory) { TestUtils.splitPath(directoryPath.getParent).foldLeft(leafDirectory) { (accum, name) =>
(accum, name) => Directory(name, Seq(), Seq(accum))
Directory(name, Seq(), Seq(accum))
} }
} }
} }

Просмотреть файл

@ -417,8 +417,8 @@ class IndexManagerTest extends HyperspaceSuite with SQLHelper {
// Check emitted events. // Check emitted events.
MockEventLogger.emittedEvents match { MockEventLogger.emittedEvents match {
case Seq( case Seq(
OptimizeActionEvent(_, _, "Operation started."), OptimizeActionEvent(_, _, "Operation started."),
OptimizeActionEvent(_, _, msg)) => OptimizeActionEvent(_, _, msg)) =>
assert( assert(
msg.contains( msg.contains(
"Optimize aborted as no optimizable index files smaller than 1 found.")) "Optimize aborted as no optimizable index files smaller than 1 found."))
@ -477,8 +477,8 @@ class IndexManagerTest extends HyperspaceSuite with SQLHelper {
// Check emitted events. // Check emitted events.
MockEventLogger.emittedEvents match { MockEventLogger.emittedEvents match {
case Seq( case Seq(
OptimizeActionEvent(_, _, "Operation started."), OptimizeActionEvent(_, _, "Operation started."),
OptimizeActionEvent(_, _, msg)) => OptimizeActionEvent(_, _, msg)) =>
assert( assert(
msg.contains( msg.contains(
"Optimize aborted as no optimizable index files smaller than 268435456 found.")) "Optimize aborted as no optimizable index files smaller than 268435456 found."))
@ -751,16 +751,16 @@ class IndexManagerTest extends HyperspaceSuite with SQLHelper {
case Some(s) => case Some(s) =>
val relations = df.queryExecution.optimizedPlan.collect { val relations = df.queryExecution.optimizedPlan.collect {
case LogicalRelation( case LogicalRelation(
HadoopFsRelation( HadoopFsRelation(
location: PartitioningAwareFileIndex, location: PartitioningAwareFileIndex,
_,
dataSchema,
_,
fileFormat,
options),
_, _,
dataSchema,
_, _,
fileFormat, _) =>
options),
_,
_,
_) =>
val files = location.allFiles val files = location.allFiles
val sourceDataProperties = val sourceDataProperties =
Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get) Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get)

Просмотреть файл

@ -39,12 +39,7 @@ class IndexTest extends SparkFunSuite {
val entry = IndexLogEntry( val entry = IndexLogEntry(
config.indexName, config.indexName,
CoveringIndex( CoveringIndex(config.indexedColumns, config.includedColumns, schema, numBuckets, Map()),
config.indexedColumns,
config.includedColumns,
schema,
numBuckets,
Map()),
Content(Directory(path)), Content(Directory(path)),
Source(SparkPlan(sourcePlanProperties)), Source(SparkPlan(sourcePlanProperties)),
Map()) Map())

Просмотреть файл

@ -154,8 +154,8 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite {
// Check emitted events. // Check emitted events.
MockEventLogger.emittedEvents match { MockEventLogger.emittedEvents match {
case Seq( case Seq(
RefreshIncrementalActionEvent(_, _, "Operation started."), RefreshIncrementalActionEvent(_, _, "Operation started."),
RefreshIncrementalActionEvent(_, _, msg)) => RefreshIncrementalActionEvent(_, _, msg)) =>
assert(msg.contains("Refresh incremental aborted as no source data change found.")) assert(msg.contains("Refresh incremental aborted as no source data change found."))
case _ => fail() case _ => fail()
} }

Просмотреть файл

@ -152,8 +152,8 @@ class RefreshIndexTest extends QueryTest with HyperspaceSuite {
// Check emitted events. // Check emitted events.
MockEventLogger.emittedEvents match { MockEventLogger.emittedEvents match {
case Seq( case Seq(
RefreshIncrementalActionEvent(_, _, "Operation started."), RefreshIncrementalActionEvent(_, _, "Operation started."),
RefreshIncrementalActionEvent(_, _, msg)) => RefreshIncrementalActionEvent(_, _, msg)) =>
assert(msg.contains("Refresh incremental aborted as no source data change found.")) assert(msg.contains("Refresh incremental aborted as no source data change found."))
case _ => fail() case _ => fail()
} }

Просмотреть файл

@ -138,10 +138,10 @@ class ScoreBasedIndexPlanOptimizerTest extends QueryTest with HyperspaceSuite {
private def getAllRootPaths(optimizedPlan: LogicalPlan): Seq[Path] = { private def getAllRootPaths(optimizedPlan: LogicalPlan): Seq[Path] = {
optimizedPlan.collect { optimizedPlan.collect {
case LogicalRelation( case LogicalRelation(
HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _), HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _),
_, _,
_, _,
_) => _) =>
location.rootPaths location.rootPaths
}.flatten }.flatten
} }

Просмотреть файл

@ -116,7 +116,6 @@ class JoinIndexRankerTest extends HyperspaceRuleSuite with SQLHelper {
val r_20 = createIndexLogEntry("r2", Seq(t2c1), Seq(t2c2), rightPlan, 20, fileList1, false) val r_20 = createIndexLogEntry("r2", Seq(t2c1), Seq(t2c2), rightPlan, 20, fileList1, false)
setCommonSourceSizeInBytesTag(r_20, rightPlan, fileList1) setCommonSourceSizeInBytesTag(r_20, rightPlan, fileList1)
val indexPairs = Seq((l_10, r_10), (l_10, r_20), (l_20, r_20)) val indexPairs = Seq((l_10, r_10), (l_10, r_20), (l_20, r_20))
val expectedOrder = Seq((l_20, r_20), (l_10, r_10), (l_10, r_20)) val expectedOrder = Seq((l_20, r_20), (l_10, r_10), (l_10, r_20))
val actualOrder = JoinIndexRanker.rank(spark, leftPlan, rightPlan, indexPairs) val actualOrder = JoinIndexRanker.rank(spark, leftPlan, rightPlan, indexPairs)

Просмотреть файл

@ -138,7 +138,8 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite {
} }
} }
test("Verify FilterIndex rule does not apply if filter does not contain first indexed column.") { test(
"Verify FilterIndex rule does not apply if filter does not contain first indexed column.") {
val filterCondition = val filterCondition =
And(IsNotNull(c2), EqualTo(c2, Literal("RGUID_VALUE"))) // c2 is not first indexed column And(IsNotNull(c2), EqualTo(c2, Literal("RGUID_VALUE"))) // c2 is not first indexed column
val filterNode = Filter(filterCondition, scanNode) val filterNode = Filter(filterCondition, scanNode)

Просмотреть файл

@ -259,7 +259,8 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper {
} }
} }
test("Join rule does not update plan if index doesn't satisfy included columns from any side.") { test(
"Join rule does not update plan if index doesn't satisfy included columns from any side.") {
val t1FilterNode = Filter(IsNotNull(t1c1), t1ScanNode) val t1FilterNode = Filter(IsNotNull(t1c1), t1ScanNode)
val t2FilterNode = Filter(IsNotNull(t2c1), t2ScanNode) val t2FilterNode = Filter(IsNotNull(t2c1), t2ScanNode)

Просмотреть файл

@ -39,12 +39,7 @@ class JsonUtilsTest extends SparkFunSuite {
val index = IndexLogEntry( val index = IndexLogEntry(
"myIndex", "myIndex",
CoveringIndex( CoveringIndex(Seq("id"), Seq("name", "school"), schema, 10, Map()),
Seq("id"),
Seq("name", "school"),
schema,
10,
Map()),
Content(Directory("path")), Content(Directory("path")),
Source(SparkPlan(sourcePlanProperties)), Source(SparkPlan(sourcePlanProperties)),
Map()) Map())