From 1fe104f162bd2e0e0b1311fecaab0d8461f1f0ca Mon Sep 17 00:00:00 2001
From: EJ Song <51077614+sezruby@users.noreply.github.com>
Date: Tue, 22 Jun 2021 17:01:51 -0700
Subject: [PATCH] Add dev config for nested column support (#466)

---
 .../hyperspace/actions/CreateAction.scala     | 26 +++++++++----------
 .../hyperspace/index/IndexConstants.scala     |  4 +++
 .../hyperspace/util/HyperspaceConf.scala      |  8 ++++++
 .../index/CreateIndexNestedTest.scala         | 20 +++++++++++---
 .../index/RefreshIndexNestedTest.scala        |  2 ++
 5 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala
index 4c5e7c16..95d8f8c3 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala
@@ -56,12 +56,22 @@ class CreateAction(
         s"Source plan: ${df.queryExecution.sparkPlan}")
     }
 
-    // schema validity checks
-    if (!isValidIndexSchema(indexConfig, df)) {
+    // Schema validity checks
+
+    // Resolve index config columns from available column names present in the dataframe.
+    val resolvedColumns = ResolverUtils
+      .resolve(spark, indexConfig.referencedColumns, df.queryExecution.analyzed)
+    if (resolvedColumns.isEmpty) {
       throw HyperspaceException("Index config is not applicable to dataframe schema.")
     }
 
-    // valid state check
+    // TODO: Temporarily block creating indexes using nested columns until it's fully supported.
+    if (!(HyperspaceConf.nestedColumnEnabled(spark) || resolvedColumns.get.forall(
+        !_.isNested))) {
+      throw HyperspaceException("Hyperspace does not support nested columns yet.")
+    }
+
+    // Valid state check
     logManager.getLatestLog() match {
       case None => // valid
       case Some(entry) if entry.state.equals(DOESNOTEXIST) => // valid
@@ -71,16 +81,6 @@ class CreateAction(
     }
   }
 
-  private def isValidIndexSchema(config: IndexConfigTrait, dataFrame: DataFrame): Boolean = {
-    // Resolve index config columns from available column names present in the dataframe.
-    ResolverUtils
-      .resolve(
-        spark,
-        config.referencedColumns,
-        dataFrame.queryExecution.analyzed)
-      .isDefined
-  }
-
   // TODO: The following should be protected, but RefreshAction is calling CreateAction.op().
   // This needs to be refactored to mark this as protected.
   final override def op(): Unit = index.write(this, indexData)
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
index 5e32455d..ceee5a80 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -52,6 +52,10 @@ object IndexConstants {
   val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
   val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"
 
+  // TODO: Remove dev config when nested column is fully supported.
+  val DEV_NESTED_COLUMN_ENABLED = "spark.hyperspace.dev.index.nestedColumn.enabled"
+  val DEV_NESTED_COLUMN_ENABLED_DEFAULT = "false"
+
   // Identifier injected to HadoopFsRelation as an option if an index is applied.
   // Currently, the identifier is added to options field of HadoopFsRelation.
   // In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
index f587544b..291a101f 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -97,6 +97,14 @@ object HyperspaceConf {
       "avro,csv,json,orc,parquet,text")
   }
 
+  def nestedColumnEnabled(spark: SparkSession): Boolean = {
+    spark.conf
+      .get(
+        IndexConstants.DEV_NESTED_COLUMN_ENABLED,
+        IndexConstants.DEV_NESTED_COLUMN_ENABLED_DEFAULT)
+      .toBoolean
+  }
+
   /**
    * Returns the config value whose key matches the first key given multiple keys. If no keys are
    * matched, the given default value is returned.
diff --git a/src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala b/src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala
index 986b0fa0..8fc4f95b 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala
@@ -43,6 +43,7 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    spark.conf.set(IndexConstants.DEV_NESTED_COLUMN_ENABLED, "true")
     hyperspace = new Hyperspace(spark)
 
     FileUtils.delete(new Path(testDir), isRecursive = true)
@@ -57,6 +58,7 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
   }
 
   override def afterAll(): Unit = {
+    spark.conf.unset(IndexConstants.DEV_NESTED_COLUMN_ENABLED)
     FileUtils.delete(new Path(testDir), isRecursive = true)
     super.afterAll()
   }
@@ -98,8 +100,8 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
       "Indexed columns with wrong case are stored in metadata")
     assert(
       indexes.head
-        .getAs[Map[String, String]]("additionalStats")("includedColumns") ==
-        "__hs_nested.nested.leaf.cnt",
+        .getAs[Map[String, String]]("additionalStats")(
+          "includedColumns") == "__hs_nested.nested.leaf.cnt",
       "Included columns with wrong case are stored in metadata")
   }
 
@@ -139,7 +141,10 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
     val dfB = nonPartitionedDataDF.as("B")
     val dfJoin = dfA
       .join(dfB, dfA("Query") === dfB("Query"))
-      .select(dfA("RGUID"), dfA("Query"), dfA("nested.leaf.cnt"))
+      .select(
+        dfA("RGUID"),
+        dfA("Query"),
+        dfA("nested.leaf.cnt"))
     val exception = intercept[HyperspaceException] {
       hyperspace.createIndex(dfJoin, indexConfig1)
     }
@@ -204,6 +209,14 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
     }
   }
 
+  test("Disable index creation with nested columns until fully supported.") {
+    spark.conf.set(IndexConstants.DEV_NESTED_COLUMN_ENABLED, "false")
+    val exception = intercept[HyperspaceException] {
+      hyperspace.createIndex(nonPartitionedDataDF, indexConfig1)
+    }
+    assert(exception.getMessage.contains("Hyperspace does not support nested columns yet."))
+  }
+
   test("Verify index creation with StructType column.") {
     val indexConfig = IndexConfig("index1", Seq("nested"), Seq("clicks"))
     val indexConfig2 = IndexConfig("index2", Seq("clicks"), Seq("nested"))
@@ -241,5 +254,4 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
         .contains("Hyperspace(Type: CI, Name: index2"))
     }
   }
-
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala
index 48447e93..60546aac 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala
@@ -40,11 +40,13 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    spark.conf.set(IndexConstants.DEV_NESTED_COLUMN_ENABLED, "true")
     hyperspace = new Hyperspace(spark)
     FileUtils.delete(new Path(testDir))
   }
 
   override def afterAll(): Unit = {
+    spark.conf.unset(IndexConstants.DEV_NESTED_COLUMN_ENABLED)
     FileUtils.delete(new Path(testDir))
     super.afterAll()
  }
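
A minimal usage sketch of the new dev config (not part of the patch): it assumes an active SparkSession `spark` and a DataFrame `df` that contains a nested struct column; the column names mirror the tests above and are otherwise illustrative.

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.{IndexConfig, IndexConstants}

// Opt in while nested-column indexing is still behind the dev flag (default "false").
spark.conf.set(IndexConstants.DEV_NESTED_COLUMN_ENABLED, "true")

val hyperspace = new Hyperspace(spark)
// "nested.leaf.cnt" stands in for any nested field in the source data.
hyperspace.createIndex(df, IndexConfig("idx1", Seq("nested.leaf.cnt"), Seq("Query")))

// With the flag unset or "false", the same call fails in CreateAction with
// "Hyperspace does not support nested columns yet."
spark.conf.unset(IndexConstants.DEV_NESTED_COLUMN_ENABLED)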