Chungmin Lee 2021-05-08 05:15:38 +09:00 committed by GitHub
Parent ba4e71273e
Commit 5aee0c8fb6
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 428 additions and 59 deletions

.gitignore vendored
View File

@@ -46,3 +46,4 @@ build.log
# for IntelliJ
/spark2.4/src/*
/spark3.0/src/*
/spark3.1/src/*

View File

@@ -81,6 +81,29 @@ jobs:
artifactName: 'hyperspace-core_spark3.0'
pathtoPublish: '$(Build.ArtifactStagingDirectory)/hyperspace-core_spark3.0/'
- job: Build_Spark3_1_2_12
displayName: 'Build sources and run unit tests for Spark 3.1 / Scala 2.12'
pool:
vmImage: 'ubuntu-18.04'
steps:
- script: sbt ++2.12.8 "project spark3_1" clean update compile test
displayName: 'Running $sbt clean & update & compile & test'
# If not a pull request, publish artifacts.
- ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}:
- script: sbt ++2.12.8 "project spark3_1" package
displayName: 'Running $sbt package'
- task: CopyFiles@2
displayName: 'Copy hyperspace-core JAR'
inputs:
sourceFolder: '$(Build.SourcesDirectory)/target/'
contents: '**/*.jar'
targetFolder: '$(Build.ArtifactStagingDirectory)/hyperspace-core_spark3.1/'
- task: PublishBuildArtifacts@1
displayName: 'Publish Hyperspace artifacts'
inputs:
artifactName: 'hyperspace-core_spark3.1'
pathtoPublish: '$(Build.ArtifactStagingDirectory)/hyperspace-core_spark3.1/'
- job: PythonTest
displayName: 'Run Python tests'
pool:

View File

@@ -29,7 +29,7 @@ ThisBuild / javaOptions += "-Xmx1024m"
// The root project is a virtual project aggregating the other projects.
// It cannot compile, as necessary utility code is only in those projects.
lazy val root = (project in file("."))
.aggregate(spark2_4, spark3_0)
.aggregate(spark2_4, spark3_0, spark3_1)
.settings(
compile / skip := true,
publish / skip := true,
@@ -55,6 +55,15 @@ lazy val spark3_0 = (project in file("spark3.0"))
inConfig(Compile)(addSparkVersionSpecificSourceDirectories),
inConfig(Test)(addSparkVersionSpecificSourceDirectories))
lazy val spark3_1 = (project in file("spark3.1"))
.enablePlugins(BuildInfoPlugin)
.settings(
commonSettings,
sparkVersion := Version(3, 1, 1),
crossScalaVersions := List(scala212), // Spark 3 doesn't support Scala 2.11
inConfig(Compile)(addSparkVersionSpecificSourceDirectories),
inConfig(Test)(addSparkVersionSpecificSourceDirectories))
lazy val sparkVersion = settingKey[Version]("sparkVersion")
// In addition to the usual scala/ and scala-<version>/ source directories,

View File

@@ -1,3 +1,3 @@
[alias]
replace-symlinks = "!__git_replace_symlinks() {\nif [ -n \"$WINDIR\" ]; then\n symlinks=`git ls-files -s | awk '/120000/{print $4}'`\n cwd=`pwd`\n for s in $symlinks; do\n if [ -f $s ]; then\n target=`cat $s`\n dir=`dirname $s`\n name=`basename $s`\n cd $dir\n cmd <<< \"mklink /J \\\"$name.link\\\" \\\"$target\\\"\" > /dev/null\n if [ ! -e $name.link ]; then\n echo \"Failed to replace \\\"$s\\\" with a junction\"\n exit 1\n fi\n rm $name\n mv $name.link $name\n git update-index --assume-unchanged $name\n echo \"Replaced symlink $s with a junction\"\n cd \"$cwd\"\n fi\n done\nfi\n}\n__git_replace_symlinks"
restore-symlinks = "!__git_restore_symlinks() {\nsymlinks=`git ls-files -s | awk '/120000/{print $4}'`\nfor s in $symlinks; do\n if [ -h $s ]; then\n git update-index --no-assume-unchanged $s\n rm $s\n git checkout -- $s\n echo \"Restored symlink $s\"\n fi\ndone\n}\n__git_restore_symlinks"
replace-symlinks = "!__git_replace_symlinks() {\nif [ -n \"$WINDIR\" ]; then\n symlinks=`git ls-files -s | awk '/120000/{print $4}'`\n cwd=`pwd`\n for s in $symlinks; do\n if [ -f $s ]; then\n target=`cat $s`\n dir=`dirname $s`\n name=`basename $s`\n cd $dir\n if [ -d $target ]; then\n cmd <<< \"mklink /J \\\"$name.link\\\" \\\"$target\\\"\" > /dev/null\n else\n cmd <<< \"mklink /H \\\"$name.link\\\" \\\"$target\\\"\" > /dev/null\n fi\n if [ ! -e $name.link ]; then\n echo \"Failed to replace \\\"$s\\\" with a junction/hardlink\"\n exit 1\n fi\n rm $name\n mv $name.link $name\n git update-index --assume-unchanged $name\n echo \"Replaced symlink $s with a junction/hardlink\"\n cd \"$cwd\"\n fi\n done\nfi\n}\n__git_replace_symlinks"
restore-symlinks = "!__git_restore_symlinks() {\nsymlinks=`git ls-files -s | awk '/120000/{print $4}'`\nfor s in $symlinks; do\n git update-index --no-assume-unchanged $s\n rm $s\n git checkout -- $s\n echo \"Restored symlink $s\"\ndone\n}\n__git_restore_symlinks"

View File

@@ -24,11 +24,14 @@ object Dependencies {
"org.apache.spark" %% "spark-core" % sv % "provided" withSources (),
"org.apache.spark" %% "spark-sql" % sv % "provided" withSources (),
// Test dependencies
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.mockito" %% "mockito-scala" % "0.4.0" % "test",
"org.apache.spark" %% "spark-catalyst" % sv % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sv % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sv % "test" classifier "tests") ++
(if (sparkVersion < Version(3, 1, 0))
Seq("org.scalatest" %% "scalatest" % "3.0.5" % "test")
else
Seq("org.scalatest" %% "scalatest" % "3.2.7" % "test")) ++
(if (sparkVersion < Version(3, 0, 0))
Seq(
"io.delta" %% "delta-core" % "0.6.1" % "provided" withSources (),

spark3.1/src Symbolic link
View File

@@ -0,0 +1 @@
../src

View File

@@ -0,0 +1,26 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.shim
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression}
object RepartitionByExpressionWithOptionalNumPartitions {
def unapply(r: RepartitionByExpression): Option[(Seq[Expression], LogicalPlan, Option[Int])] = {
Some(r.partitionExpressions, r.child, Some(r.numPartitions))
}
}
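The extractor above wraps the fixed numPartitions of the older RepartitionByExpression in an Option so call sites can match one shape on every supported Spark version; the Spark 3.1 side of the shim (further down) simply aliases the class's own companion, which already exposes an optional partition count. A minimal sketch of how a caller might use it; repartitionSummary is a hypothetical helper, not part of this commit:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.shim.RepartitionByExpressionWithOptionalNumPartitions

// Hypothetical helper: describe the first repartition node found in a plan, if any.
def repartitionSummary(plan: LogicalPlan): Option[String] =
  plan.collectFirst {
    case RepartitionByExpressionWithOptionalNumPartitions(exprs, _, numPartitions) =>
      val target = numPartitions.map(n => s" into $n partitions").getOrElse("")
      s"repartition by ${exprs.mkString(", ")}" + target
  }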

View File

@@ -0,0 +1 @@
../../../../../scala-spark2/com/microsoft/hyperspace/shim/RepartitionByExpression.scala

View File

@@ -0,0 +1,47 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.index.sources.iceberg
import org.apache.iceberg.Table
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.util.hyperspace.Utils
object IcebergShims {
def isIcebergRelation(plan: LogicalPlan): Boolean = plan match {
case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true
case DataSourceV2ScanRelation(DataSourceV2Relation(_: SparkTable, _, _, _, _), _, _) => true
case _ => false
}
def loadIcebergTable(spark: SparkSession, plan: LogicalPlan): (Table, Option[Long]) =
plan match {
case r @ DataSourceV2Relation(table: SparkTable, _, _, _, _) =>
(table.table, Option(r.options.get("snapshot-id")).map(_.toLong))
case DataSourceV2ScanRelation(
r @ DataSourceV2Relation(table: SparkTable, _, _, _, _),
_,
_) =>
(table.table, Option(r.options.get("snapshot-id")).map(_.toLong))
case _ =>
throw new IllegalArgumentException(s"Unexpected plan type: ${plan.getClass.toString}")
}
}
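For orientation, a sketch of how this shim could be consulted from a source provider; describeIcebergSource and df are assumptions for illustration, not code from this commit:

import org.apache.spark.sql.DataFrame
import com.microsoft.hyperspace.index.sources.iceberg.IcebergShims

// Hypothetical helper: print the Iceberg table location and the pinned snapshot,
// if the DataFrame is backed by an Iceberg DataSourceV2 relation.
def describeIcebergSource(df: DataFrame): Unit = {
  val plan = df.queryExecution.optimizedPlan
  if (IcebergShims.isIcebergRelation(plan)) {
    val (table, snapshotId) = IcebergShims.loadIcebergTable(df.sparkSession, plan)
    println(s"Iceberg table at ${table.location()}, snapshot: ${snapshotId.getOrElse("current")}")
  }
}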

View File

@@ -0,0 +1,23 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace
import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
package object shim {
val RepartitionByExpressionWithOptionalNumPartitions = RepartitionByExpression
}

View File

@@ -28,6 +28,7 @@ import org.apache.spark.sql.hyperspace.utils.logicalPlanToDataFrame
import com.microsoft.hyperspace.{HyperspaceException, Implicits}
import com.microsoft.hyperspace.index.IndexConstants
import com.microsoft.hyperspace.shim.ExtractFileSourceScanExecRelation
/**
* Provides helper methods for explain API.
@@ -138,14 +139,8 @@ object PlanAnalyzer {
private def getPaths(sparkPlan: SparkPlan): Seq[String] = {
val usedPaths = new ListBuffer[String]
sparkPlan.foreach {
case FileSourceScanExec(
HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _),
_,
_,
_,
_,
_,
_) =>
case ExtractFileSourceScanExecRelation(
HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _)) =>
usedPaths += location.rootPaths.head.getParent.toString
case other =>
other.subqueries.foreach { subQuery =>

View File

@@ -0,0 +1,33 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.shim
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
object ExtractFileSourceScanExecRelation {
def unapply(scan: FileSourceScanExec): Option[HadoopFsRelation] = {
Some(scan.relation)
}
}
object ExtractFileSourceScanExecFilters {
def unapply(scan: FileSourceScanExec): Option[(Seq[Expression], Seq[Expression])] = {
Some((scan.partitionFilters, scan.dataFilters))
}
}
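These extractors exist because the constructor arity of FileSourceScanExec differs across the supported Spark versions, so the positional seven-underscore patterns removed elsewhere in this commit (in PlanAnalyzer above and the suites below) no longer compile everywhere. A small sketch of the version-agnostic style they enable; scansFilteringOn is a hypothetical helper, not part of this commit:

import org.apache.spark.sql.execution.SparkPlan
import com.microsoft.hyperspace.shim.ExtractFileSourceScanExecFilters

// Hypothetical helper: collect file scans whose pushed-down filters mention a column.
def scansFilteringOn(plan: SparkPlan, column: String): Seq[SparkPlan] =
  plan.collect {
    case scan @ ExtractFileSourceScanExecFilters(partitionFilters, dataFilters)
        if (partitionFilters ++ dataFilters).exists(_.toString.contains(column)) =>
      scan
  }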

View File

@@ -0,0 +1,25 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.shim
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}
object ExtractUnionChildren {
def unapply(union: Union): Option[Seq[LogicalPlan]] = {
Some(union.children)
}
}

View File

@@ -0,0 +1,22 @@
$begin=============================================================
Plan with indexes:
=============================================================
Project [Col1#]
+- Filter (isnotnull(Col2#) AND (Col2# = 2))
+- ColumnarToRow
$highlightBegin+- FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#,Col1#] Batched: true, DataFilters: [isnotnull(Col2#), (Col2# = 2)], Format: Parquet, Location: $filterIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ReadSchema: struct<Col2:int,Col1:string>$highlightEnd
=============================================================
Plan without indexes:
=============================================================
Project [Col1#]
+- Filter (isnotnull(Col2#) AND (Col2# = 2))
+- ColumnarToRow
$highlightBegin+- FileScan parquet [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col2#), (Col2# = 2)], Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ReadSchema: struct<Col1:string,Col2:int>$highlightEnd
=============================================================
Indexes used:
=============================================================
filterIndex:$filterIndexPath
$end

View File

@@ -0,0 +1,50 @@
=============================================================
Plan with indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(1) Filter isnotnull(Col1#)---->
<----: +- *(1) ColumnarToRow---->
<----: +- FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
<----+- *(2) Filter isnotnull(Col1#)---->
<----+- *(2) ColumnarToRow---->
<----+- FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
=============================================================
Plan without indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(2) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----: +- Exchange hashpartitioning(Col1#, 5), ENSURE_REQUIREMENTS, [id=#]---->
<----: +- *(1) Filter isnotnull(Col1#)---->
<----: +- *(1) ColumnarToRow---->
<----: +- FileScan parquet [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>---->
<----+- *(4) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----+- ReusedExchange [Col1#, Col2#], Exchange hashpartitioning(Col1#, 5), ENSURE_REQUIREMENTS, [id=#]---->
=============================================================
Indexes used:
=============================================================
joinIndex:$joinIndexPath
=============================================================
Physical operator stats:
=============================================================
+----------------------------------------------------------+-------------------+------------------+----------+
| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+----------------------------------------------------------+-------------------+------------------+----------+
| *ColumnarToRow| 1| 2| 1|
| *Filter| 1| 2| 1|
| *InputAdapter| 5| 4| -1|
| *ReusedExchange| 1| 0| -1|
|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|
| *Scan parquet| 1| 0| -1|
| *ShuffleExchange| 1| 0| -1|
| *Sort| 2| 0| -2|
| *WholeStageCodegen (3)| 0| 1| 1|
| *WholeStageCodegen (4)| 1| 0| -1|
| *WholeStageCodegen (5)| 1| 0| -1|
| SortMergeJoin| 1| 1| 0|
| WholeStageCodegen (1)| 1| 1| 0|
| WholeStageCodegen (2)| 1| 1| 0|
+----------------------------------------------------------+-------------------+------------------+----------+

View File

@@ -0,0 +1,45 @@
=============================================================
Plan with indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(1) Filter isnotnull(Col1#)---->
<----: +- *(1) ColumnarToRow---->
<----: +- FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
<----+- *(2) Filter isnotnull(Col1#)---->
<----+- *(2) ColumnarToRow---->
<----+- FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $joinIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string,Col2:int>, SelectedBucketsCount: 200 out of 200---->
=============================================================
Plan without indexes:
=============================================================
SortMergeJoin [Col1#], [Col1#], Inner
<----:- *(1) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----: +- *(1) Filter isnotnull(Col1#)---->
<----: +- BatchScan[Col1#, Col2#] $icebergPath [filters=Col1 IS NOT NULL]---->
<----+- *(2) Sort [Col1# ASC NULLS FIRST], false, 0---->
<----+- *(2) Filter isnotnull(Col1#)---->
<----+- BatchScan[Col1#, Col2#] $icebergPath [filters=Col1 IS NOT NULL]---->
=============================================================
Indexes used:
=============================================================
joinIndex:$joinIndexPath
=============================================================
Physical operator stats:
=============================================================
+----------------------------------------------------------+-------------------+------------------+----------+
| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+----------------------------------------------------------+-------------------+------------------+----------+
| *BatchScan| 2| 0| -2|
| *ColumnarToRow| 0| 2| 2|
|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|
| *Sort| 2| 0| -2|
| Filter| 2| 2| 0|
| InputAdapter| 4| 4| 0|
| SortMergeJoin| 1| 1| 0|
| WholeStageCodegen (1)| 1| 1| 0|
| WholeStageCodegen (2)| 1| 1| 0|
| WholeStageCodegen (3)| 1| 1| 0|
+----------------------------------------------------------+-------------------+------------------+----------+

View File

@@ -0,0 +1,42 @@
=============================================================
Plan with indexes:
=============================================================
Filter (isnotnull(Col1#) AND (Col1# = Subquery scalar-subquery#, [id=#]))
: +- Subquery scalar-subquery#, [id=#]
: +- *(1) Project [Col1#]
: +- *(1) Filter (isnotnull(Col2#) AND (Col2# = 1))
: +- *(1) ColumnarToRow
<----: +- FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#,Col1#] Batched: true, DataFilters: [isnotnull(Col2#), (Col2# = 1)], Format: Parquet, Location: $filterIndexLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct<Col2:int,Col1:string>---->
+- ColumnarToRow
+- FileScan parquet [Col1#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string>
=============================================================
Plan without indexes:
=============================================================
Filter (isnotnull(Col1#) AND (Col1# = Subquery scalar-subquery#, [id=#]))
: +- Subquery scalar-subquery#, [id=#]
: +- *(1) Project [Col1#]
: +- *(1) Filter (isnotnull(Col2#) AND (Col2# = 1))
: +- *(1) ColumnarToRow
<----: +- FileScan parquet [Col1#,Col2#] Batched: true, DataFilters: [isnotnull(Col2#), (Col2# = 1)], Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct<Col1:string,Col2:int>---->
+- ColumnarToRow
+- FileScan parquet [Col1#] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: $sampleParquetDataLocation, PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct<Col1:string>
=============================================================
Indexes used:
=============================================================
filterIndex:$filterIndexPath
=============================================================
Physical operator stats:
=============================================================
+---------------------+-------------------+------------------+----------+
| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+---------------------+-------------------+------------------+----------+
| ColumnarToRow| 1| 1| 0|
| Filter| 1| 1| 0|
| InputAdapter| 1| 1| 0|
| Scan parquet| 1| 1| 0|
|WholeStageCodegen (1)| 1| 1| 0|
+---------------------+-------------------+------------------+----------+

View File

@@ -150,31 +150,35 @@ class CreateActionTest extends HyperspaceSuite with SQLHelper {
.write
.parquet(path2)
Seq(
(spark.read.format("parquet").load(path1), Seq(path1), 2),
(spark.read.format("parquet").load(path1, path2), Seq(path1, path2), 5),
(spark.read.parquet(path1), Seq(path1), 2),
(spark.read.parquet(path1, path2), Seq(path1, path2), 5),
(spark.read.parquet(path1, path1, path1), Seq(path1, path1, path1), 6),
(spark.read.format("parquet").option("path", path1).load(path1), Seq(path1), 2),
(spark.read.format("parquet").option("path", path1).load(path2), Seq(path2), 3),
(spark.read.option("path", path1).parquet(path1), Seq(path1, path1), 4),
(spark.read.option("path", path1).parquet(path2), Seq(path1, path2), 5),
(
spark.read.format("parquet").option("path", path1).load(path1, path2),
Seq(path1, path1, path2),
7),
(spark.read.option("path", path1).parquet(path1, path2), Seq(path1, path1, path2), 7))
.foreach {
case (df, expectedPaths, expectedCount) =>
val relation = CreateActionBaseWrapper.getSourceRelations(df).head
def normalize(path: String): String = {
new Path(path).toUri.getPath
}
assert(relation.rootPaths.map(normalize) == expectedPaths.map(normalize))
assert(df.count == expectedCount)
assert(!relation.options.isDefinedAt("path"))
}
// For Spark 3.1, pathOptionBehavior must be enabled manually
// to allow mixing option("path") with load(paths...)
withSQLConf("spark.sql.legacy.pathOptionBehavior.enabled" -> "true") {
Seq(
(spark.read.format("parquet").load(path1), Seq(path1), 2),
(spark.read.format("parquet").load(path1, path2), Seq(path1, path2), 5),
(spark.read.parquet(path1), Seq(path1), 2),
(spark.read.parquet(path1, path2), Seq(path1, path2), 5),
(spark.read.parquet(path1, path1, path1), Seq(path1, path1, path1), 6),
(spark.read.format("parquet").option("path", path1).load(path1), Seq(path1), 2),
(spark.read.format("parquet").option("path", path1).load(path2), Seq(path2), 3),
(spark.read.option("path", path1).parquet(path1), Seq(path1, path1), 4),
(spark.read.option("path", path1).parquet(path2), Seq(path1, path2), 5),
(
spark.read.format("parquet").option("path", path1).load(path1, path2),
Seq(path1, path1, path2),
7),
(spark.read.option("path", path1).parquet(path1, path2), Seq(path1, path1, path2), 7))
.foreach {
case (df, expectedPaths, expectedCount) =>
val relation = CreateActionBaseWrapper.getSourceRelations(df).head
def normalize(path: String): String = {
new Path(path).toUri.getPath
}
assert(relation.rootPaths.map(normalize) == expectedPaths.map(normalize))
assert(df.count == expectedCount)
assert(!relation.options.isDefinedAt("path"))
}
}
}
}
}
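For context on the new withSQLConf wrapper above: Spark 3.1 rejects reads that combine option("path", ...) with explicit load/parquet path arguments unless the legacy flag is set, so the test opts back into the old behavior before exercising those combinations. A minimal sketch under assumed placeholder paths ("/data/a", "/data/b"), not part of the commit:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Without the flag below, Spark 3.1 fails such a read with an analysis error
// that points at this very configuration; earlier versions accepted it.
spark.conf.set("spark.sql.legacy.pathOptionBehavior.enabled", "true")

// With the legacy behavior restored, the "path" option contributes an extra
// root path for this reader combination, which is what the test's expected
// rootPaths and row counts rely on.
val df = spark.read.option("path", "/data/a").parquet("/data/b")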

View File

@@ -135,12 +135,14 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper {
}
test("Index creation fails since the dataframe has a join node.") {
val dfJoin = nonPartitionedDataDF
.join(nonPartitionedDataDF, nonPartitionedDataDF("Query") === nonPartitionedDataDF("Query"))
val dfA = nonPartitionedDataDF.as("A")
val dfB = nonPartitionedDataDF.as("B")
val dfJoin = dfA
.join(dfB, dfA("Query") === dfB("Query"))
.select(
nonPartitionedDataDF("RGUID"),
nonPartitionedDataDF("Query"),
nonPartitionedDataDF("nested.leaf.cnt"))
dfA("RGUID"),
dfA("Query"),
dfA("nested.leaf.cnt"))
val exception = intercept[HyperspaceException] {
hyperspace.createIndex(dfJoin, indexConfig1)
}

View File

@@ -141,12 +141,14 @@ class CreateIndexTest extends HyperspaceSuite with SQLHelper {
}
test("Index creation fails since the dataframe has a join node.") {
val dfJoin = nonPartitionedDataDF
.join(nonPartitionedDataDF, nonPartitionedDataDF("Query") === nonPartitionedDataDF("Query"))
val dfA = nonPartitionedDataDF.as("A")
val dfB = nonPartitionedDataDF.as("B")
val dfJoin = dfA
.join(dfB, dfA("Query") === dfB("Query"))
.select(
nonPartitionedDataDF("RGUID"),
nonPartitionedDataDF("Query"),
nonPartitionedDataDF("imprs"))
dfA("RGUID"),
dfA("Query"),
dfA("imprs"))
val exception = intercept[HyperspaceException] {
hyperspace.createIndex(dfJoin, indexConfig1)
}

View File

@@ -30,6 +30,7 @@ import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig}
import com.microsoft.hyperspace.TestUtils.logManager
import com.microsoft.hyperspace.index.execution.BucketUnionExec
import com.microsoft.hyperspace.index.plans.logical.BucketUnion
import com.microsoft.hyperspace.shim.{ExtractFileSourceScanExecFilters, ExtractUnionChildren, RepartitionByExpressionWithOptionalNumPartitions}
import com.microsoft.hyperspace.util.FileUtils
trait HybridScanSuite extends QueryTest with HyperspaceSuite {
@@ -178,7 +179,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
val execPlan = spark.sessionState.executePlan(plan).executedPlan
val execNodes = execPlan collect {
case p @ FileSourceScanExec(_, _, _, _, _, dataFilters, _) =>
case p @ ExtractFileSourceScanExecFilters(_, dataFilters) =>
// Check deleted files.
assert(deletedFiles.forall(dataFilters.toString.contains))
p
@@ -213,10 +214,10 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
bucketSpec.bucketColumnNames.head === "clicks")
val childNodes = children.collect {
case r @ RepartitionByExpression(
case r @ RepartitionByExpressionWithOptionalNumPartitions(
attrs,
Project(_, Filter(_, LogicalRelation(fsRelation: HadoopFsRelation, _, _, _))),
numBucket) =>
Some(numBucket)) =>
assert(attrs.size === 1)
assert(attrs.head.asInstanceOf[Attribute].name.contains("clicks"))
@@ -250,10 +251,10 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
bucketSpec.bucketColumnNames.head === "clicks")
val childNodes = children.collect {
case r @ RepartitionByExpression(
case r @ RepartitionByExpressionWithOptionalNumPartitions(
attrs,
Project(_, Filter(_, LogicalRelation(fsRelation: HadoopFsRelation, _, _, _))),
numBucket) =>
Some(numBucket)) =>
assert(attrs.size === 1)
assert(attrs.head.asInstanceOf[Attribute].name.contains("clicks"))
@@ -293,7 +294,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
assert(children.last.isInstanceOf[ShuffleExchangeExec])
assert(bucketSpec.numBuckets === 200)
p
case p @ FileSourceScanExec(_, _, _, partitionFilters, _, dataFilters, _) =>
case p @ ExtractFileSourceScanExecFilters(partitionFilters, dataFilters) =>
// Check filter pushed down properly.
if (partitionFilters.nonEmpty) {
assert(filterConditions.forall(partitionFilters.toString.contains))
@@ -325,7 +326,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
if (expectedAppendedFiles.nonEmpty) {
val nodes = plan.collect {
case u @ Union(children) =>
case u @ ExtractUnionChildren(children) =>
val indexChild = children.head
indexChild collect {
case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) =>
@@ -355,7 +356,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
assert(children.head.isInstanceOf[ProjectExec]) // index data
assert(children.last.isInstanceOf[ProjectExec]) // appended data
p
case p @ FileSourceScanExec(_, _, _, partitionFilters, _, dataFilters, _) =>
case p @ ExtractFileSourceScanExecFilters(partitionFilters, dataFilters) =>
// Check filter pushed down properly.
if (partitionFilters.nonEmpty) {
assert(filterConditions.forall(partitionFilters.toString.contains))
@@ -404,7 +405,13 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
val baseQuery = joinQuery()
val basePlan = baseQuery.queryExecution.optimizedPlan
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") {
// The RemoveRedundantProjects rule causes HybridScanForIcebergTest to fail
// in Spark 3.1. It is either a bug in Spark 3.1 or something Iceberg needs
// to be updated for. Either way, the fix will take some time, so temporarily
// disable the rule here.
withSQLConf(
"spark.sql.autoBroadcastJoinThreshold" -> "-1",
"spark.sql.execution.removeRedundantProjects" -> "false") {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "false") {

View File

@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants, IndexLogEntryTags}
import com.microsoft.hyperspace.shim.JoinWithoutHint
import com.microsoft.hyperspace.shim.{JoinWithoutHint, RepartitionByExpressionWithOptionalNumPartitions}
import com.microsoft.hyperspace.util.{FileUtils, PathUtils}
class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
@@ -335,7 +335,10 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
// should be transformed to:
// Shuffle ("id") -> Project("id", "name") -> Filter ("id") -> Relation
assert(shuffled.collect {
case RepartitionByExpression(attrs, p: Project, numBuckets) =>
case RepartitionByExpressionWithOptionalNumPartitions(
attrs,
p: Project,
Some(numBuckets)) =>
assert(numBuckets == 100)
assert(attrs.size == 1)
assert(attrs.head.asInstanceOf[Attribute].name.contains("id"))
@@ -362,7 +365,12 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
bucketSpec2,
query2.queryExecution.optimizedPlan)
assert(shuffled2.collect {
case Project(_, RepartitionByExpression(attrs, _: Filter, numBuckets)) =>
case Project(
_,
RepartitionByExpressionWithOptionalNumPartitions(
attrs,
_: Filter,
Some(numBuckets))) =>
assert(numBuckets == 100)
assert(attrs.size == 1)
assert(attrs.head.asInstanceOf[Attribute].name.contains("age"))