From 7b594d4d66edc9b4b493d35507a976e352af521d Mon Sep 17 00:00:00 2001
From: Chungmin Lee
Date: Tue, 29 Jun 2021 06:51:17 +0900
Subject: [PATCH] Refactoring for an extensible Index API: Part 2 (#474)

---
 .../actions/RefreshIncrementalAction.scala    | 21 +++++-----
 .../hyperspace/index/CoveringIndex.scala      | 40 +++++++------------
 .../microsoft/hyperspace/index/Index.scala    | 29 ++++++++++++--
 .../hyperspace/index/IndexUtils.scala         | 23 +++++++++++
 .../hyperspace/actions/CreateActionTest.scala | 25 ++++++++++--
 5 files changed, 97 insertions(+), 41 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala
index 46682af1..8450b307 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala
@@ -72,8 +72,14 @@ class RefreshIncrementalAction(
     } else {
       None
     }
-    updatedIndex = Some(previousIndexLogEntry.derivedDataset
-      .refreshIncremental(this, appendedSourceData, deletedFiles, previousIndexLogEntry.content))
+    val (updatedIndex, updateMode) =
+      previousIndexLogEntry.derivedDataset.refreshIncremental(
+        this,
+        appendedSourceData,
+        deletedFiles,
+        previousIndexLogEntry.content)
+    updatedIndexOpt = Some(updatedIndex)
+    updateModeOpt = Some(updateMode)
   }
 
   /**
@@ -95,7 +101,8 @@
     }
   }
 
-  private var updatedIndex: Option[Index] = None
+  private var updatedIndexOpt: Option[Index] = None
+  private var updateModeOpt: Option[Index.UpdateMode] = None
 
   /**
    * Create a log entry with all source data files, and all required index content. This contains
@@ -106,15 +113,11 @@
    * @return Refreshed index log entry.
    */
   override def logEntry: LogEntry = {
-    val index = updatedIndex.getOrElse(previousIndexLogEntry.derivedDataset)
+    val index = updatedIndexOpt.getOrElse(previousIndexLogEntry.derivedDataset)
     val entry =
       getIndexLogEntry(spark, df, previousIndexLogEntry.name, index, indexDataPath, endId)
-    // If there is no deleted files, there are index data files only for appended data in this
-    // version and we need to add the index data files of previous index version.
-    // Otherwise, as previous index data is rewritten in this version while excluding
-    // indexed rows from deleted files, all necessary index data files exist in this version.
-    if (deletedFiles.isEmpty) {
+    if (updateModeOpt.contains(Index.UpdateMode.Merge)) {
       // Merge new index files with old index files.
       val mergedContent = Content(previousIndexLogEntry.content.root.merge(entry.content.root))
       entry.copy(content = mergedContent)
diff --git a/src/main/scala/com/microsoft/hyperspace/index/CoveringIndex.scala b/src/main/scala/com/microsoft/hyperspace/index/CoveringIndex.scala
index 3f616774..fc3bf4bc 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/CoveringIndex.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/CoveringIndex.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.functions.{col, input_file_name}
 import org.apache.spark.sql.hyperspace.utils.StructTypeUtils
 import org.apache.spark.sql.types.StructType
 
-import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.util.ResolverUtils
 import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
@@ -85,7 +84,7 @@ case class CoveringIndex(
       ctx: IndexerContext,
       appendedSourceData: Option[DataFrame],
       deletedSourceDataFiles: Seq[FileInfo],
-      indexContent: Content): CoveringIndex = {
+      indexContent: Content): (CoveringIndex, Index.UpdateMode) = {
     val updatedIndex = if (appendedSourceData.nonEmpty) {
       val (indexData, resolvedIndexedColumns, resolvedIncludedColumns) =
         CoveringIndex.createIndexData(
@@ -123,7 +122,17 @@
         indexedColumns,
         writeMode)
     }
-    updatedIndex
+
+    // If there are no deleted files, this version contains index data files only for the
+    // appended data, so the index data files of the previous version need to be merged in.
+    // Otherwise, the previous index data is rewritten in this version, excluding the rows
+    // from the deleted files, so all necessary index data files exist in this version.
+    val updateMode = if (deletedSourceDataFiles.isEmpty) {
+      Index.UpdateMode.Merge
+    } else {
+      Index.UpdateMode.Overwrite
+    }
+    (updatedIndex, updateMode)
   }
 
   override def refreshFull(
@@ -221,8 +230,8 @@
       includedColumns: Seq[String],
       hasLineageColumn: Boolean): (DataFrame, Seq[ResolvedColumn], Seq[ResolvedColumn]) = {
     val spark = ctx.spark
-    val (resolvedIndexedColumns, resolvedIncludedColumns) =
-      resolveConfig(sourceData, indexedColumns, includedColumns)
+    val resolvedIndexedColumns = IndexUtils.resolveColumns(sourceData, indexedColumns)
+    val resolvedIncludedColumns = IndexUtils.resolveColumns(sourceData, includedColumns)
     val projectColumns = (resolvedIndexedColumns ++ resolvedIncludedColumns).map(_.toColumn)
 
     val indexData =
@@ -267,25 +276,4 @@
 
     (indexData, resolvedIndexedColumns, resolvedIncludedColumns)
   }
-
-  private def resolveConfig(
-      df: DataFrame,
-      indexedColumns: Seq[String],
-      includedColumns: Seq[String]): (Seq[ResolvedColumn], Seq[ResolvedColumn]) = {
-    val spark = df.sparkSession
-    val plan = df.queryExecution.analyzed
-    val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, plan)
-    val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, plan)
-
-    (resolvedIndexedColumns, resolvedIncludedColumns) match {
-      case (Some(indexed), Some(included)) => (indexed, included)
-      case _ =>
-        val unresolvedColumns = (indexedColumns ++ includedColumns)
-          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_.name))))
-          .collect { case (c, r) if r.isEmpty => c }
-        throw HyperspaceException(
-          s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
-            s"from available source columns:\n${df.schema.treeString}")
-    }
-  }
 }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/Index.scala b/src/main/scala/com/microsoft/hyperspace/index/Index.scala
index 52d416ac..fff588b3 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/Index.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/Index.scala
@@ -118,14 +118,17 @@
    * @param deletedSourceDataFiles Source data files deleted from the source
    *                               since the creation of this index
    * @param indexContent Unrefreshed index data files
-   * @return Updated index resulting from the indexing operation, or this
-   *         index if no update is needed
+   * @return Pair of (updated index, update mode), where the first value is the
+   *         updated index resulting from the indexing operation, or this index
+   *         if no update is needed, and the second value indicates whether the
+   *         updated index data should be merged into the existing index data
+   *         (Merge) or overwrite it (Overwrite)
    */
   def refreshIncremental(
       ctx: IndexerContext,
       appendedSourceData: Option[DataFrame],
       deletedSourceDataFiles: Seq[FileInfo],
-      indexContent: Content): Index
+      indexContent: Content): (Index, Index.UpdateMode)
 
   /**
    * Indexes the source data and returns an updated index and index data.
@@ -140,4 +143,24 @@
    *         index if no update is needed
    */
   def refreshFull(ctx: IndexerContext, sourceData: DataFrame): (Index, DataFrame)
+
+  /**
+   * Returns true if and only if this index equals that object.
+   *
+   * Indexes that differ only in their [[properties]] should be considered equal.
+   */
+  def equals(that: Any): Boolean
+
+  /**
+   * Returns the hash code for this index.
+   */
+  def hashCode: Int
+}
+
+object Index {
+  sealed trait UpdateMode
+  object UpdateMode {
+    case object Merge extends UpdateMode
+    case object Overwrite extends UpdateMode
+  }
 }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexUtils.scala
index 7d12dcec..b4fe6c6d 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexUtils.scala
@@ -16,6 +16,12 @@
 
 package com.microsoft.hyperspace.index
 
+import org.apache.spark.sql.DataFrame
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.util.ResolverUtils
+import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
+
 object IndexUtils {
 
   /**
@@ -32,4 +38,21 @@
       .getOrElse(IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT)
       .toBoolean
   }
+
+  def resolveColumns(df: DataFrame, columns: Seq[String]): Seq[ResolvedColumn] = {
+    val spark = df.sparkSession
+    val plan = df.queryExecution.analyzed
+    val resolvedColumns = ResolverUtils.resolve(spark, columns, plan)
+
+    resolvedColumns match {
+      case Some(cs) => cs
+      case _ =>
+        val unresolvedColumns = columns
+          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_.name))))
+          .collect { case (c, r) if r.isEmpty => c }
+        throw HyperspaceException(
+          s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
+            s"from available source columns:\n${df.schema.treeString}")
+    }
+  }
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala b/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala
index f5c8c6b3..46dd6458 100644
--- a/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala
@@ -117,15 +117,34 @@ class CreateActionTest extends HyperspaceSuite with SQLHelper {
         ex.getMessage.contains(s"Another Index with name ${indexConfig.indexName} already exists"))
   }
 
-  test("op() fails if index config is of wrong case and spark is case-sensitive") {
+  test("op() fails if indexed column is of wrong case and spark is case-sensitive") {
     when(mockLogManager.getLatestLog()).thenReturn(Some(TestLogEntry(ACTIVE)))
-    val indexConfig = IndexConfig("index1", Seq("rgUID"), Seq("dATE"))
+    val indexConfig = IndexConfig("index1", Seq("rgUID"), Seq("Date"))
     val action = new CreateAction(spark, df, indexConfig, mockLogManager, mockDataManager)
     withSQLConf("spark.sql.caseSensitive" -> "true") {
       val ex = intercept[HyperspaceException](action.op())
       assert(
         ex.getMessage.contains(
-          "Columns 'rgUID,dATE' could not be resolved from available " +
+          "Columns 'rgUID' could not be resolved from available " +
             "source columns:\n" +
             "root\n " +
             "|-- Date: string (nullable = true)\n " +
             "|-- RGUID: string (nullable = true)\n " +
             "|-- Query: string (nullable = true)\n " +
             "|-- imprs: integer (nullable = true)\n " +
             "|-- clicks: integer (nullable = true)"))
     }
   }
+
+  test("op() fails if included column is of wrong case and spark is case-sensitive") {
+    when(mockLogManager.getLatestLog()).thenReturn(Some(TestLogEntry(ACTIVE)))
+    val indexConfig = IndexConfig("index1", Seq("RGUID"), Seq("dATE"))
+    val action = new CreateAction(spark, df, indexConfig, mockLogManager, mockDataManager)
+    withSQLConf("spark.sql.caseSensitive" -> "true") {
+      val ex = intercept[HyperspaceException](action.op())
+      assert(
+        ex.getMessage.contains(
+          "Columns 'dATE' could not be resolved from available " +
+            "source columns:\n" +
+            "root\n " +
+            "|-- Date: string (nullable = true)\n " +
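
For third-party Index implementations, the new contract can be summarized with a minimal sketch, assuming the Hyperspace classpath; UpdateModeSketch, chooseUpdateMode, and describe below are illustrative names and are not part of this patch. Only the Index.UpdateMode values and the merge/overwrite rule come from the diff above.

    import com.microsoft.hyperspace.index.{FileInfo, Index}

    object UpdateModeSketch {
      // Mirrors the rule CoveringIndex applies in this patch: an appended-only
      // refresh produces partial index data that must be merged with the
      // previous version, while a refresh that drops deleted rows rewrites
      // the index data completely.
      def chooseUpdateMode(deletedSourceDataFiles: Seq[FileInfo]): Index.UpdateMode =
        if (deletedSourceDataFiles.isEmpty) Index.UpdateMode.Merge
        else Index.UpdateMode.Overwrite

      // How a caller reacts to the reported mode, as RefreshIncrementalAction
      // now does when building the refreshed log entry.
      def describe(mode: Index.UpdateMode): String = mode match {
        case Index.UpdateMode.Merge =>
          "merge new index files with the previous version's index files"
        case Index.UpdateMode.Overwrite =>
          "use only this version's index files; previous ones are superseded"
      }
    }

Returning the mode from refreshIncremental keeps the merge-vs-overwrite decision with the index implementation, so actions like RefreshIncrementalAction no longer need CoveringIndex-specific knowledge.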
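A brief usage sketch of the relocated column-resolution helper follows; the SparkSession and sample DataFrame are placeholders, not part of this patch.

    import org.apache.spark.sql.SparkSession

    import com.microsoft.hyperspace.index.IndexUtils

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    val df = spark.range(10).selectExpr("id", "id * 2 AS Value")

    // Succeeds: with Spark's default case-insensitive resolution,
    // "value" resolves to the "Value" column.
    val resolved = IndexUtils.resolveColumns(df, Seq("id", "value"))
    resolved.foreach(c => println(c.name))

    // With spark.sql.caseSensitive=true, "value" no longer resolves and
    // resolveColumns throws a HyperspaceException listing the unresolved
    // columns, which is exactly what the new CreateActionTest cases assert.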