Add dev config for nested column support (#466)
This commit is contained in:
Родитель
7cb714c4ac
Коммит
1fe104f162
|
@ -56,12 +56,22 @@ class CreateAction(
|
|||
s"Source plan: ${df.queryExecution.sparkPlan}")
|
||||
}
|
||||
|
||||
// schema validity checks
|
||||
if (!isValidIndexSchema(indexConfig, df)) {
|
||||
// Schema validity checks
|
||||
|
||||
// Resolve index config columns from available column names present in the dataframe.
|
||||
val resolvedColumns = ResolverUtils
|
||||
.resolve(spark, indexConfig.referencedColumns, df.queryExecution.analyzed)
|
||||
if (resolvedColumns.isEmpty) {
|
||||
throw HyperspaceException("Index config is not applicable to dataframe schema.")
|
||||
}
|
||||
|
||||
// valid state check
|
||||
// TODO: Temporarily block creating indexes using nested columns until it's fully supported.
|
||||
if (!(HyperspaceConf.nestedColumnEnabled(spark) || resolvedColumns.get.forall(
|
||||
!_.isNested))) {
|
||||
throw HyperspaceException("Hyperspace does not support nested columns yet.")
|
||||
}
|
||||
|
||||
// Valid state check
|
||||
logManager.getLatestLog() match {
|
||||
case None => // valid
|
||||
case Some(entry) if entry.state.equals(DOESNOTEXIST) => // valid
|
||||
|
@ -71,16 +81,6 @@ class CreateAction(
|
|||
}
|
||||
}
|
||||
|
||||
private def isValidIndexSchema(config: IndexConfigTrait, dataFrame: DataFrame): Boolean = {
|
||||
// Resolve index config columns from available column names present in the dataframe.
|
||||
ResolverUtils
|
||||
.resolve(
|
||||
spark,
|
||||
config.referencedColumns,
|
||||
dataFrame.queryExecution.analyzed)
|
||||
.isDefined
|
||||
}
|
||||
|
||||
// TODO: The following should be protected, but RefreshAction is calling CreateAction.op().
|
||||
// This needs to be refactored to mark this as protected.
|
||||
final override def op(): Unit = index.write(this, indexData)
|
||||
|
|
|
@ -52,6 +52,10 @@ object IndexConstants {
|
|||
val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
|
||||
val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"
|
||||
|
||||
// TODO: Remove dev config when nested column is fully supported.
|
||||
val DEV_NESTED_COLUMN_ENABLED = "spark.hyperspace.dev.index.nestedColumn.enabled"
|
||||
val DEV_NESTED_COLUMN_ENABLED_DEFAULT = "false"
|
||||
|
||||
// Identifier injected to HadoopFsRelation as an option if an index is applied.
|
||||
// Currently, the identifier is added to options field of HadoopFsRelation.
|
||||
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
|
||||
|
|
|
@ -97,6 +97,14 @@ object HyperspaceConf {
|
|||
"avro,csv,json,orc,parquet,text")
|
||||
}
|
||||
|
||||
def nestedColumnEnabled(spark: SparkSession): Boolean = {
|
||||
spark.conf
|
||||
.get(
|
||||
IndexConstants.DEV_NESTED_COLUMN_ENABLED,
|
||||
IndexConstants.DEV_NESTED_COLUMN_ENABLED_DEFAULT)
|
||||
.toBoolean
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the config value whose key matches the first key given multiple keys. If no keys are
|
||||
* matched, the given default value is returned.
|
||||
|
|
|
@ -43,6 +43,7 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
|
|||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
|
||||
spark.conf.set(IndexConstants.DEV_NESTED_COLUMN_ENABLED, "true")
|
||||
hyperspace = new Hyperspace(spark)
|
||||
FileUtils.delete(new Path(testDir), isRecursive = true)
|
||||
|
||||
|
@ -57,6 +58,7 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
|
|||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
spark.conf.unset(IndexConstants.DEV_NESTED_COLUMN_ENABLED)
|
||||
FileUtils.delete(new Path(testDir), isRecursive = true)
|
||||
super.afterAll()
|
||||
}
|
||||
|
@ -98,8 +100,8 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
|
|||
"Indexed columns with wrong case are stored in metadata")
|
||||
assert(
|
||||
indexes.head
|
||||
.getAs[Map[String, String]]("additionalStats")("includedColumns") ==
|
||||
"__hs_nested.nested.leaf.cnt",
|
||||
.getAs[Map[String, String]]("additionalStats")(
|
||||
"includedColumns") == "__hs_nested.nested.leaf.cnt",
|
||||
"Included columns with wrong case are stored in metadata")
|
||||
}
|
||||
|
||||
|
@ -139,7 +141,10 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
|
|||
val dfB = nonPartitionedDataDF.as("B")
|
||||
val dfJoin = dfA
|
||||
.join(dfB, dfA("Query") === dfB("Query"))
|
||||
.select(dfA("RGUID"), dfA("Query"), dfA("nested.leaf.cnt"))
|
||||
.select(
|
||||
dfA("RGUID"),
|
||||
dfA("Query"),
|
||||
dfA("nested.leaf.cnt"))
|
||||
val exception = intercept[HyperspaceException] {
|
||||
hyperspace.createIndex(dfJoin, indexConfig1)
|
||||
}
|
||||
|
@ -204,6 +209,14 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
|
|||
}
|
||||
}
|
||||
|
||||
test("Disable index creation with nested columns until fully supported.") {
|
||||
spark.conf.set(IndexConstants.DEV_NESTED_COLUMN_ENABLED, "false")
|
||||
val exception = intercept[HyperspaceException] {
|
||||
hyperspace.createIndex(nonPartitionedDataDF, indexConfig1)
|
||||
}
|
||||
assert(exception.getMessage.contains("Hyperspace does not support nested columns yet."))
|
||||
}
|
||||
|
||||
test("Verify index creation with StructType column.") {
|
||||
val indexConfig = IndexConfig("index1", Seq("nested"), Seq("clicks"))
|
||||
val indexConfig2 = IndexConfig("index2", Seq("clicks"), Seq("nested"))
|
||||
|
@ -241,5 +254,4 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
|
|||
.contains("Hyperspace(Type: CI, Name: index2"))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -40,11 +40,13 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite {
|
|||
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
spark.conf.set(IndexConstants.DEV_NESTED_COLUMN_ENABLED, "true")
|
||||
hyperspace = new Hyperspace(spark)
|
||||
FileUtils.delete(new Path(testDir))
|
||||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
spark.conf.unset(IndexConstants.DEV_NESTED_COLUMN_ENABLED)
|
||||
FileUtils.delete(new Path(testDir))
|
||||
super.afterAll()
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче