Refactoring for an extensible Index API: Part 2 (#474)

Chungmin Lee 2021-06-29 06:51:17 +09:00 committed by GitHub
Parent 896f734d4d
Commit 7b594d4d66
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 41 deletions

View file

@@ -72,8 +72,14 @@ class RefreshIncrementalAction(
     } else {
       None
     }
-    updatedIndex = Some(previousIndexLogEntry.derivedDataset
-      .refreshIncremental(this, appendedSourceData, deletedFiles, previousIndexLogEntry.content))
+    val (updatedIndex, updateMode) =
+      previousIndexLogEntry.derivedDataset.refreshIncremental(
+        this,
+        appendedSourceData,
+        deletedFiles,
+        previousIndexLogEntry.content)
+    updatedIndexOpt = Some(updatedIndex)
+    updateModeOpt = Some(updateMode)
   }
 
   /**
@@ -95,7 +101,8 @@ class RefreshIncrementalAction(
     }
   }
 
-  private var updatedIndex: Option[Index] = None
+  private var updatedIndexOpt: Option[Index] = None
+  private var updateModeOpt: Option[Index.UpdateMode] = None
 
   /**
    * Create a log entry with all source data files, and all required index content. This contains
@@ -106,15 +113,11 @@ class RefreshIncrementalAction(
    * @return Refreshed index log entry.
    */
   override def logEntry: LogEntry = {
-    val index = updatedIndex.getOrElse(previousIndexLogEntry.derivedDataset)
+    val index = updatedIndexOpt.getOrElse(previousIndexLogEntry.derivedDataset)
     val entry =
       getIndexLogEntry(spark, df, previousIndexLogEntry.name, index, indexDataPath, endId)
 
-    // If there is no deleted files, there are index data files only for appended data in this
-    // version and we need to add the index data files of previous index version.
-    // Otherwise, as previous index data is rewritten in this version while excluding
-    // indexed rows from deleted files, all necessary index data files exist in this version.
-    if (deletedFiles.isEmpty) {
+    if (updateModeOpt.contains(Index.UpdateMode.Merge)) {
       // Merge new index files with old index files.
       val mergedContent = Content(previousIndexLogEntry.content.root.merge(entry.content.root))
       entry.copy(content = mergedContent)
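For context, the refresh action now keys the merge decision off the update mode reported by the index implementation instead of inspecting deletedFiles itself. A minimal sketch of the resulting logEntry flow; the else branch is assumed from the surrounding code and is not shown in this hunk:

override def logEntry: LogEntry = {
  val index = updatedIndexOpt.getOrElse(previousIndexLogEntry.derivedDataset)
  val entry =
    getIndexLogEntry(spark, df, previousIndexLogEntry.name, index, indexDataPath, endId)
  if (updateModeOpt.contains(Index.UpdateMode.Merge)) {
    // Merge: this version wrote index files only for appended data, so the previous
    // version's index files must be carried over as well.
    val mergedContent = Content(previousIndexLogEntry.content.root.merge(entry.content.root))
    entry.copy(content = mergedContent)
  } else {
    // Overwrite (assumed branch): the index data was rewritten in full, so the new
    // entry already lists every required file.
    entry
  }
}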

View file

@@ -22,7 +22,6 @@ import org.apache.spark.sql.functions.{col, input_file_name}
 import org.apache.spark.sql.hyperspace.utils.StructTypeUtils
 import org.apache.spark.sql.types.StructType
 
-import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.util.ResolverUtils
 import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
@@ -85,7 +84,7 @@ case class CoveringIndex(
       ctx: IndexerContext,
       appendedSourceData: Option[DataFrame],
       deletedSourceDataFiles: Seq[FileInfo],
-      indexContent: Content): CoveringIndex = {
+      indexContent: Content): (CoveringIndex, Index.UpdateMode) = {
     val updatedIndex = if (appendedSourceData.nonEmpty) {
       val (indexData, resolvedIndexedColumns, resolvedIncludedColumns) =
         CoveringIndex.createIndexData(
@@ -123,7 +122,17 @@ case class CoveringIndex(
           indexedColumns,
           writeMode)
     }
-    updatedIndex
+
+    // If there is no deleted files, there are index data files only for appended data in this
+    // version and we need to add the index data files of previous index version.
+    // Otherwise, as previous index data is rewritten in this version while excluding
+    // indexed rows from deleted files, all necessary index data files exist in this version.
+    val updatedMode = if (deletedSourceDataFiles.isEmpty) {
+      Index.UpdateMode.Merge
+    } else {
+      Index.UpdateMode.Overwrite
+    }
+    (updatedIndex, updatedMode)
   }
 
   override def refreshFull(
@@ -221,8 +230,8 @@ object CoveringIndex {
       includedColumns: Seq[String],
       hasLineageColumn: Boolean): (DataFrame, Seq[ResolvedColumn], Seq[ResolvedColumn]) = {
     val spark = ctx.spark
-    val (resolvedIndexedColumns, resolvedIncludedColumns) =
-      resolveConfig(sourceData, indexedColumns, includedColumns)
+    val resolvedIndexedColumns = IndexUtils.resolveColumns(sourceData, indexedColumns)
+    val resolvedIncludedColumns = IndexUtils.resolveColumns(sourceData, includedColumns)
     val projectColumns = (resolvedIndexedColumns ++ resolvedIncludedColumns).map(_.toColumn)
 
     val indexData =
@@ -267,25 +276,4 @@ object CoveringIndex {
     (indexData, resolvedIndexedColumns, resolvedIncludedColumns)
   }
 
-  private def resolveConfig(
-      df: DataFrame,
-      indexedColumns: Seq[String],
-      includedColumns: Seq[String]): (Seq[ResolvedColumn], Seq[ResolvedColumn]) = {
-    val spark = df.sparkSession
-    val plan = df.queryExecution.analyzed
-    val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, plan)
-    val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, plan)
-    (resolvedIndexedColumns, resolvedIncludedColumns) match {
-      case (Some(indexed), Some(included)) => (indexed, included)
-      case _ =>
-        val unresolvedColumns = (indexedColumns ++ includedColumns)
-          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_.name))))
-          .collect { case (c, r) if r.isEmpty => c }
-        throw HyperspaceException(
-          s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
-            s"from available source columns:\n${df.schema.treeString}")
-    }
-  }
 }
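With the new signature, callers of CoveringIndex.refreshIncremental receive both the refreshed index and the update mode as one pair. A hedged usage sketch, assuming ctx, appendedData, deletedFiles, and oldContent are already in scope:

val (refreshedIndex, updateMode) =
  coveringIndex.refreshIncremental(ctx, appendedData, deletedFiles, oldContent)
updateMode match {
  case Index.UpdateMode.Merge =>
    // Only appended data was indexed; the previous version's index files are still needed.
  case Index.UpdateMode.Overwrite =>
    // Index data was fully rewritten; the new files stand on their own.
}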

View file

@@ -118,14 +118,17 @@ trait Index {
    * @param deletedSourceDataFiles Source data files deleted from the source
    *                               since the creation of this index
    * @param indexContent Unrefreshed index data files
-   * @return Updated index resulting from the indexing operation, or this
-   *         index if no update is needed
+   * @return Pair of (updated index, update mode) where the first value is an
+   *         updated index resulting from the indexing operation or this index
+   *         if no update is needed, and the second value is whether the
+   *         updated index data needs to be merged to the existing index data
+   *         or the existing index data should be overwritten
    */
   def refreshIncremental(
       ctx: IndexerContext,
       appendedSourceData: Option[DataFrame],
       deletedSourceDataFiles: Seq[FileInfo],
-      indexContent: Content): Index
+      indexContent: Content): (Index, Index.UpdateMode)
 
   /**
    * Indexes the source data and returns an updated index and index data.
@@ -140,4 +143,24 @@ trait Index {
    *         index if no update is needed
    */
   def refreshFull(ctx: IndexerContext, sourceData: DataFrame): (Index, DataFrame)
+
+  /**
+   * Returns true if and only if this index equals to that.
+   *
+   * Indexes only differing in their [[properties]] should be considered equal.
+   */
+  def equals(that: Any): Boolean
+
+  /**
+   * Returns the hash code for this index.
+   */
+  def hashCode: Int
+}
+
+object Index {
+  sealed trait UpdateMode
+  object UpdateMode {
+    case object Merge extends UpdateMode
+    case object Overwrite extends UpdateMode
+  }
 }
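The trait now also pins down equality: indexes that differ only in their properties should compare equal. A minimal illustration of that contract using a hypothetical stand-in class, not a real Hyperspace index:

class SketchIndex(val indexedColumns: Seq[String], val properties: Map[String, String]) {
  override def equals(that: Any): Boolean = that match {
    // properties is deliberately left out of the comparison.
    case other: SketchIndex => indexedColumns == other.indexedColumns
    case _ => false
  }

  // hashCode must be consistent with equals, so it also ignores properties.
  override def hashCode: Int = indexedColumns.hashCode
}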

View file

@@ -16,6 +16,12 @@
 
 package com.microsoft.hyperspace.index
 
+import org.apache.spark.sql.DataFrame
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.util.ResolverUtils
+import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
+
 object IndexUtils {
 
   /**
@@ -32,4 +38,21 @@ object IndexUtils {
       .getOrElse(IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT)
       .toBoolean
   }
+
+  def resolveColumns(df: DataFrame, columns: Seq[String]): Seq[ResolvedColumn] = {
+    val spark = df.sparkSession
+    val plan = df.queryExecution.analyzed
+    val resolvedColumns = ResolverUtils.resolve(spark, columns, plan)
+    resolvedColumns match {
+      case Some(cs) => cs
+      case _ =>
+        val unresolvedColumns = columns
+          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_.name))))
+          .collect { case (c, r) if r.isEmpty => c }
+        throw HyperspaceException(
+          s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
+            s"from available source columns:\n${df.schema.treeString}")
+    }
+  }
 }
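resolveColumns centralizes the column resolution that was previously private to CoveringIndex. A hypothetical usage, assuming df is a DataFrame whose schema contains Date and RGUID:

val resolved: Seq[ResolvedColumn] = IndexUtils.resolveColumns(df, Seq("Date", "RGUID"))
// Any name that cannot be resolved raises HyperspaceException listing the offending
// columns together with df's schema, which is what the CreateActionTest changes below assert.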

View file

@@ -117,15 +117,34 @@ class CreateActionTest extends HyperspaceSuite with SQLHelper {
       ex.getMessage.contains(s"Another Index with name ${indexConfig.indexName} already exists"))
   }
 
-  test("op() fails if index config is of wrong case and spark is case-sensitive") {
+  test("op() fails if indexed column is of wrong case and spark is case-sensitive") {
     when(mockLogManager.getLatestLog()).thenReturn(Some(TestLogEntry(ACTIVE)))
-    val indexConfig = IndexConfig("index1", Seq("rgUID"), Seq("dATE"))
+    val indexConfig = IndexConfig("index1", Seq("rgUID"), Seq("Date"))
     val action = new CreateAction(spark, df, indexConfig, mockLogManager, mockDataManager)
     withSQLConf("spark.sql.caseSensitive" -> "true") {
       val ex = intercept[HyperspaceException](action.op())
       assert(
         ex.getMessage.contains(
-          "Columns 'rgUID,dATE' could not be resolved from available " +
+          "Columns 'rgUID' could not be resolved from available " +
+            "source columns:\n" +
+            "root\n " +
+            "|-- Date: string (nullable = true)\n " +
+            "|-- RGUID: string (nullable = true)\n " +
+            "|-- Query: string (nullable = true)\n " +
+            "|-- imprs: integer (nullable = true)\n " +
+            "|-- clicks: integer (nullable = true)"))
+    }
+  }
+
+  test("op() fails if included config is of wrong case and spark is case-sensitive") {
+    when(mockLogManager.getLatestLog()).thenReturn(Some(TestLogEntry(ACTIVE)))
+    val indexConfig = IndexConfig("index1", Seq("RGUID"), Seq("dATE"))
+    val action = new CreateAction(spark, df, indexConfig, mockLogManager, mockDataManager)
+    withSQLConf("spark.sql.caseSensitive" -> "true") {
+      val ex = intercept[HyperspaceException](action.op())
+      assert(
+        ex.getMessage.contains(
+          "Columns 'dATE' could not be resolved from available " +
             "source columns:\n" +
             "root\n " +
             "|-- Date: string (nullable = true)\n " +