Fix e2e tests on managed tables and enable external table tests. (#349)

Terry Kim 2021-02-09 10:38:13 -08:00 committed by GitHub
Parent 88f1b43147
Commit 097eb0fcc2
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 48 deletions
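Background for the fix (inferred from the diff below): the old managed-table test wrote its tables with an explicit `path` option on `DataFrameWriter`, which makes Spark register them as EXTERNAL, so the test never actually covered managed tables. The rewritten tests pin the intended kind down with `spark.catalog.getTable(...).tableType` assertions. A minimal standalone sketch of that distinction, assuming a local SparkSession (object name, table names, and the /tmp path are illustrative, not from the repo):

import org.apache.spark.sql.SparkSession

object TableTypeSketch extends App {
  val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName("table-type-sketch")
    .getOrCreate()
  import spark.implicits._

  val df = Seq(("a", "x", 1), ("b", "y", 2)).toDF("c1", "c3", "c4")

  // No path option: the catalog owns the data (stored under the warehouse
  // directory), so the table is MANAGED.
  df.select("c1", "c3").write.mode("overwrite").saveAsTable("t_managed")

  // Explicit path option: the data stays at the given location, so the table
  // is registered as EXTERNAL; this is what the old "managed" test was
  // unintentionally doing.
  df.select("c3", "c4")
    .write
    .mode("overwrite")
    .option("path", "/tmp/table_type_sketch/t_external")
    .saveAsTable("t_external")

  assert(spark.catalog.getTable("t_managed").tableType == "MANAGED")
  assert(spark.catalog.getTable("t_external").tableType == "EXTERNAL")

  spark.stop()
}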

build.sbt

@@ -37,8 +37,7 @@ libraryDependencies ++= Seq(
   "org.mockito" %% "mockito-scala" % "0.4.0" % "test",
   "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
   "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
-  "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
-  "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests"
+  "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests"
 )
 
 assemblyMergeStrategy in assembly := {
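The `spark-hive` test dependency goes away because the external-table test no longer relies on Hive DDL: the old, ignored test issued `CREATE EXTERNAL TABLE ... STORED AS PARQUET`, which needs a Hive-enabled session (its TODO noted it "depends on hive for testing"), while the rewritten test creates external tables with Spark's native `CREATE TABLE ... USING PARQUET LOCATION ...` or with `saveAsTable` plus an explicit `path` option.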

E2EHyperspaceRulesTest.scala

@@ -281,62 +281,84 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
     }
   }
 
-  ignore("E2E test for join query on external catalog tables") {
-    // TODO: Ignoring this test as it depends on hive for testing.
-    withTable("t1", "t2") {
-      // save tables on hive metastore as external tables
-      spark.sql(s"""
-          |CREATE EXTERNAL TABLE t1
-          |(c1 string, c3 string)
-          |STORED AS PARQUET
-          |LOCATION '$nonPartitionedDataPath'
-        """.stripMargin)
-      spark.sql(s"""
-          |CREATE EXTERNAL TABLE t2
-          |(c3 string, c4 int)
-          |STORED AS PARQUET
-          |LOCATION '$nonPartitionedDataPath'
-        """.stripMargin)
-
-      val leftDf = spark.table("t1")
-      val rightDf = spark.table("t2")
-      val leftDfIndexConfig = IndexConfig("leftIndex", Seq("c3"), Seq("c1"))
-      val rightDfIndexConfig = IndexConfig("rightIndex", Seq("c3"), Seq("c4"))
-      hyperspace.createIndex(leftDf, leftDfIndexConfig)
-      hyperspace.createIndex(rightDf, rightDfIndexConfig)
-
-      // Test whether indexes work with catalog tables or not
-      def query(): DataFrame = spark.sql("SELECT t1.c1, t2.c4 FROM t1, t2 WHERE t1.c3 = t2.c3")
-
-      verifyIndexUsage(
-        query,
-        getIndexFilesPath(leftDfIndexConfig.indexName) ++
-          getIndexFilesPath(rightDfIndexConfig.indexName))
-    }
-  }
+  test("E2E test for join query on external catalog tables") {
+    Seq(true, false).foreach { useDDL =>
+      withIndex("leftIndex", "rightIndex") {
+        withTable("t1", "t2") {
+          if (useDDL) {
+            spark.sql("CREATE TABLE t1 (c1 string, c3 string) USING PARQUET " +
+              s"LOCATION '$nonPartitionedDataPath'")
+            spark.sql("CREATE TABLE t2 (c3 string, c4 int) USING PARQUET " +
+              s"LOCATION '$nonPartitionedDataPath'")
+          } else {
+            val table1Location = testDir + "tables/t1"
+            val table2Location = testDir + "tables/t2"
+            val originalDf = spark.read.parquet(nonPartitionedDataPath)
+            originalDf.select("c1", "c3").write.option("path", table1Location).saveAsTable("t1")
+            originalDf.select("c3", "c4").write.option("path", table2Location).saveAsTable("t2")
+          }
+
+          val leftDf = spark.table("t1")
+          val rightDf = spark.table("t2")
+          assert(spark.catalog.getTable("t1").tableType == "EXTERNAL")
+          assert(spark.catalog.getTable("t2").tableType == "EXTERNAL")
+
+          val leftDfIndexConfig = IndexConfig("leftIndex", Seq("c3"), Seq("c1"))
+          val rightDfIndexConfig = IndexConfig("rightIndex", Seq("c3"), Seq("c4"))
+          hyperspace.createIndex(leftDf, leftDfIndexConfig)
+          hyperspace.createIndex(rightDf, rightDfIndexConfig)
+
+          // Test whether indexes work with catalog tables or not.
+          def query(): DataFrame =
+            spark.sql("SELECT t1.c1, t2.c4 FROM t1, t2 WHERE t1.c3 = t2.c3")
+
+          verifyIndexUsage(
+            query,
+            getIndexFilesPath(leftDfIndexConfig.indexName) ++
+              getIndexFilesPath(rightDfIndexConfig.indexName))
+        }
+      }
+    }
+  }
 
   test("E2E test for join query on managed catalog tables") {
-    withTable("t1", "t2") {
-      val table1Location = testDir + "tables/t1"
-      val table2Location = testDir + "tables/t2"
-      val originalDf = spark.read.parquet(nonPartitionedDataPath)
-      originalDf.select("c1", "c3").write.option("path", table1Location).saveAsTable("t1")
-      originalDf.select("c3", "c4").write.option("path", table2Location).saveAsTable("t2")
-
-      val leftDf = spark.table("t1")
-      val rightDf = spark.table("t2")
-      val leftDfIndexConfig = IndexConfig("leftIndex", Seq("c3"), Seq("c1"))
-      val rightDfIndexConfig = IndexConfig("rightIndex", Seq("c3"), Seq("c4"))
-      hyperspace.createIndex(leftDf, leftDfIndexConfig)
-      hyperspace.createIndex(rightDf, rightDfIndexConfig)
-
-      // Test whether indexes work with catalog tables or not.
-      def query(): DataFrame =
-        spark.sql("SELECT t1.c1, t2.c4 FROM t1, t2 WHERE t1.c3 = t2.c3")
-
-      verifyIndexUsage(
-        query,
-        getIndexFilesPath(leftDfIndexConfig.indexName) ++
-          getIndexFilesPath(rightDfIndexConfig.indexName))
-    }
-  }
+    Seq(true, false).foreach { useDDL =>
+      withIndex("leftIndex", "rightIndex") {
+        withTable("t1", "t2") {
+          if (useDDL) {
+            withView("tv1", "tv2") {
+              val originalDf = spark.read.parquet(nonPartitionedDataPath)
+              originalDf.select("c1", "c3").createOrReplaceTempView("tv1")
+              originalDf.select("c3", "c4").createOrReplaceTempView("tv2")
+              spark.sql("CREATE TABLE t1 USING PARQUET AS SELECT * FROM tv1")
+              spark.sql("CREATE TABLE t2 USING PARQUET AS SELECT * FROM tv2")
+            }
+          } else {
+            val originalDf = spark.read.parquet(nonPartitionedDataPath)
+            originalDf.select("c1", "c3").write.saveAsTable("t1")
+            originalDf.select("c3", "c4").write.saveAsTable("t2")
+          }
+
+          val leftDf = spark.table("t1")
+          val rightDf = spark.table("t2")
+          val leftDfIndexConfig = IndexConfig("leftIndex", Seq("c3"), Seq("c1"))
+          val rightDfIndexConfig = IndexConfig("rightIndex", Seq("c3"), Seq("c4"))
+          hyperspace.createIndex(leftDf, leftDfIndexConfig)
+          hyperspace.createIndex(rightDf, rightDfIndexConfig)
+          assert(spark.catalog.getTable("t1").tableType == "MANAGED")
+          assert(spark.catalog.getTable("t2").tableType == "MANAGED")
+
+          // Test whether indexes work with catalog tables or not
+          def query(): DataFrame = spark.sql("SELECT t1.c1, t2.c4 FROM t1, t2 WHERE t1.c3 = t2.c3")
+
+          verifyIndexUsage(
+            query,
+            getIndexFilesPath(leftDfIndexConfig.indexName) ++
+              getIndexFilesPath(rightDfIndexConfig.indexName))
+        }
+      }
+    }
+  }
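`verifyIndexUsage` and `getIndexFilesPath` are helpers defined in this test suite. As a rough illustration of what such a check can look like (a hypothetical stand-in, not the repo's implementation): once Hyperspace's optimizer rules rewrite the query, the join's physical plan scans index files instead of the original tables, so the index name should surface in the plan's scan locations.

import org.apache.spark.sql.DataFrame

// Hypothetical spot-check, not the suite's verifyIndexUsage helper: plan
// strings truncate long path lists, so treat a substring match only as a
// heuristic sanity check.
def planMentionsIndex(df: DataFrame, indexName: String): Boolean =
  df.queryExecution.executedPlan.toString.contains(indexName)

For example, `planMentionsIndex(query(), "leftIndex")` would be expected to hold after the indexes above are created and picked up.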